Stream流编程

  • WebFlux 编程需要有 Stream 流编程基础,而 Lambda 表达式、函数式接口编程又是 Stream 流编程的基础。

Lambda表达式

  • Lambda表达式是JDK8中出现的新特性,其是函数接口的一种实现方式,用于代替匿名内部类。

概念

  • 函数式接口,Functional Interface,也称为功能性接口。简单来说,接口中可以包含多个方法,但仅能有一个自己的抽象方法,即接口的默认方法和静态方法并不影响一个接口成为函数式接口。例如,Java标准库中的java.lang.Runnable是典型的函数式接口。
  • 在JDK帮助文档中,FunctionInterface注解的说明中有关于函数接口的详细描述。
    【原文+翻译】An informative annotation type(信息注释类型) used to indicate(表明) that 【an interface type declaration is intended(意图) to be a functional interface as defined by the Java Language Specification(规范)】. Conceptually(从概念上讲), a functional interface has exactly(仅仅) one abstract method. Since(因为) default methods have an implementation, they are not abstract. If an interface declares an abstract method 【overriding one of the public method java.lang.Object】, that also does not count toward the interface’s abstract method count(其不计入接口抽象方法计数) since any implementation of the interface will have an implementation from java.lang.Object or elsewhere(因为任何一个该接口的实现都将有一个来自于java.lang.Object或其它地方的实现).

用法演示 01-lambda-stream

  • 创建一个 SpringBoot 工程,命名为 01-lambda-stream,在该工程中进行代码演示。
  • 无参数无返回值:
@FunctionalInterface
public interface NoParamNoReturn {
    void doSome();
}
  • 无参数有返回值:
@FunctionalInterface
public interface NoParamHasReturn {
    String doSome();
}
  • 有参数有返回值:
@FunctionalInterface
public interface HasParamHasReturn {
    String doSome(String a, int b);
}
  • 函数式接口与默认方法:只包含一个抽象方法的接口称为函数式接口,而接口中的默认方法并不是抽象方法,所以函数式接口中可以包含默认方法。即 Lambda 表达式可以实现包含有默认方法的函数式接口。
@FunctionalInterface
public interface HasDefault {
    String doSome(String a, String b);

    default void doOther(String a, String b) {
        System.out.println("执行默认方法 - " + a + b);
    }
}
  • 测试:
public class LambdaTest {
    @Test
    public void test01() {
        new NoParamNoReturn() {
            @Override
            public void doSome() {
                System.out.println("使用匿名内部类实现");
            }
        }.doSome();

        NoParamNoReturn lambda = () -> System.out.println("使用Lambda实现");
        lambda.doSome();
    }

    @Test
    public void test02() {
        System.out.println(new NoParamHasReturn() {
            @Override
            public String doSome() {
                return "匿名内部类";
            }
        }.doSome());

        NoParamHasReturn lambda = () -> "Lambda";
        System.out.println(lambda.doSome());
    }

    @Test
    public void test03() {
        System.out.println(new HasParamHasReturn() {
            @Override
            public String doSome(String a, int b) {
                return a + b;
            }
        }.doSome("Hello @FunctionalInterface, ", 2021));

        HasParamHasReturn lambda = (str, n) -> str + n;
        System.out.println(lambda.doSome("Hello Lambda, ", 2021));
    }

    @Test
    public void test04() {
        HasDefault lambda = (a, b) -> a + b;
        System.out.println(lambda.doSome("Hello ", "Lambda"));
        lambda.doOther("Hello", "default");
    }
}

函数式接口编程

  • JDK8 中就根据接口方法参数与返回值的不同,定义好了若干内置的函数接口。在使用Lambda 表达式时,无需再定义那么多其它接口了,只需根据情况选择内置接口即可。
内置接口参数返回类型说明
Predicate<T>TBoolean断言
Consumer<T>T-仅消费一个数据,没有返回值
Supplier<T>-T仅提供(返回)一个数据,无需输入
Function<T, R>TR仅输入一个数据,返回一个数据
UnaryOperator<T>TT输入输出类型相同的Function
BiFunction<T, U, R>(T, U)R输入两个数据,返回一个数据
BinaryOperator<T>(T, U)T两个输入与一个输出类型相同的BiFunction

Predicate

  • 该接口用于判断输入的对象是否符合某个条件。该接口中只有一个抽象方法 test(),三个默认方法 and(与)、or(或)、negate(非),还有一个静态方法 isEqual()。
  • test() 方法会将参数值应用于接口 Lambda 表达式,测试该值的断言。IntPredicate、DoublePredicate 与 Predicate 接口没有任何关系,是另外一种全新的接口,仅仅是为了使用方便。
  • Predicate 接口提供了三个默认方法 and(与)、or(或)、negate(非),它们用于对两个断言结果再次进行运算。
  • Predicate 接口中包含一个静态方法 isEqual(),该方法的参数将作为比较对象,与 test() 方法的参数进行相等性比较。
public class PredicateTest {
    @Test
    public void test01() {
        // Predicate.test()用法
        Predicate<Integer> pre = i -> i > 8;
        System.out.println(pre.test(9));    // true
        System.out.println(pre.test(7));    // false

        IntPredicate intPre = i -> i < 3;
        System.out.println(intPre.test(2));     // true
        System.out.println(intPre.test(5));     // false

        DoublePredicate doublePre = n -> n < 5;
        System.out.println(doublePre.test(5.2));    // false
        System.out.println(doublePre.test(4.8));    // true
    }

    @Test
    public void test02() {
        // Predicate默认(与或非)方法的用法
        Predicate<Integer> gt8 = i -> i > 8;
        Predicate<Integer> lt3 = i -> i < 3;

        System.out.println(gt8.and(lt3).test(9));   // false
        System.out.println(gt8.or(lt3).test(9));    // true
        System.out.println(gt8.negate().test(9));   // false
    }

    @Test
    public void test() {
        System.out.println(Predicate.isEqual("Hello").test("hello"));   // false
        System.out.println(Predicate.isEqual("Hello").test("Hello"));   // true
    }
}

Consumer

  • Consumer 消费者,只有一个输入没有输出的函数接口。该接口有一个抽象方法 accept(),与一个默认方法 andThen()。
  • accept() 方法用于将参数值应用于接口 Lambda 表达式。
  • andThen() 方法将两个 Consumer 表达式连接,先执行前面的,再执行后面的。
public class ConsumerTest {
    @Test
    public void test01() {
        Consumer<String> consumer = s -> System.out.println("Hello, " + s);
        consumer.accept("Consumer");    // Hello, Consumer
    }

    @Test
    public void test02() {
        Consumer<Integer> c1 = n -> System.out.println(n * 2);
        Consumer<Integer> c2 = n -> System.out.println(n * n);
        c1.andThen(c2).accept(5); // 10 25
    }
}

Supplier

  • Supplier 提供者,没有输入但有输出的函数接口。该接口只有一个抽象方法 get(),用于获取函数接口方法的返回值。
public class SupplierTest {
    @Test
    public void test01() {
        Supplier<String> supplier = () -> "Supplier lambda";
        System.out.println(supplier.get()); // Supplier lambda
    }
}

Function

  • 只有一个输入,且有一个输出的函数接口。该接口中有一个抽象方法 apply(),有两个默方法 andThen() 与 compose(),及一个静态方法 identity()。
  • apply() 方法用于将参数应用于接口方法。
  • andThen() 是先执行前面的接口Lambda表达式,再将前面的执行结果作为后面 Lambda 的输入参数执行后面的。
  • compose() 方法的执行恰好与andThen()的顺序相反。该方法会首先将 apply()方法的参数作为输入运行前面的接口 Lambda 表达式,然后,再将前面表达式的运算结果作为输入再运行后面的接口 Lambda 表达式。即,前面的作为后面的组成部分出现。
  • identity() 是 Function 接口包含的静态方法,其返回结果为该 Function 的输入参数值。
public class FunctionTest {
    @Test
    public void test01() {
        Function<Integer, String> func = n -> "I love you, " + n;
        System.out.println(func.apply(2021));   // I love you, 2021
    }

    @Test
    public void test02() {
        Function<Integer, Integer> func1 = x -> x * 2;
        Function<Integer, Integer> func2 = x -> x * x;

        // 先将 5 作为func1的参数,计算结果为 10;再将 func1 计算结果10作为 func2 的参数再计算
        System.out.println(func1.andThen(func2).apply(5)); // 100
        // 先将 5 作为func2的参数,计算结果为 25;再将 func2 计算结果25作为 func1 的参数再计算
        System.out.println(func1.compose(func2).apply(5)); // 50
    }

    @Test
    public void test03() {
        System.out.println(Function.identity().apply(5));       // 5
        System.out.println(Function.identity().apply(3 * 8));   // 24
    }
}

UnaryOperater

  • 与 Function 功能用法基本一样
public class UnaryOperaterTest {
    @Test
    public void test01() {
        UnaryOperator<String> uo = n -> "I love you, " + n;
        System.out.println(uo.apply("HuBei")); // I love you, HuBei
    }

    @Test
    public void test02() {
        UnaryOperator<Integer> uo1 = x -> x * 2;
        UnaryOperator<Integer> uo2 = x -> x * x;

        // 先将 5 作为uo1的参数,计算结果10,再讲uo1计算结果10作为uo2的参数再计算
        System.out.println(uo1.andThen(uo2).apply(5)); // 100
        // 先将 5 作为uo2的参数,计算结果25,再讲uo2计算结果10作为uo1的参数再计算
        System.out.println(uo1.compose(uo2).apply(5)); // 50
    }

    @Test
    public void test03() {
        System.out.println(UnaryOperator.identity().apply(5)); // 5
        System.out.println(UnaryOperator.identity().apply(3 * 8)); // 24
    }
}

BiFunction

  • 有两个输入与一个输出的函数接口。BiFunction 接口与 Function 接口没有任何关系。其有一个抽象方法 apply(),与一个默认方法 andThen()。这两个方法的意义与前面的相同。
public class BiFunctionTest {
    @Test
    public void test01() {
        BiFunction<Integer, Integer, String> biFunc = (x, y) -> x + " : " + y;
        System.out.println(biFunc.apply(3, 5)); // 3 : 5
    }

    @Test
    public void test02() {
        BiFunction<Integer, Integer, String> biFunc = (x, y) -> x + " : " + y;
        UnaryOperator<String> uo = s -> "键值对为 " + s;
        // 将(3, 5)应用于biFunc上,再讲biFunc的运算结果作为uo的参数进行运算
        System.out.println(biFunc.andThen(uo).apply(3, 5)); // 键值对为 3 : 5
    }

    @Test
    public void test03() {
        BiFunction<Integer, Integer, Integer> biFunc = (x, y) -> x + y;
        Function<Integer, String> func = n -> "结果为:" + n;
        // 将(3, 5)应用于biFunc上,再讲biFunc的运算结果作为func的参数进行运算
        System.out.println(biFunc.andThen(func).apply(3, 5)); // 结果为:8
    }
}

BinaryOperater

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Student {
    private String name;
    private int age;
}
public class StudentComparator implements Comparator<Student> {
    @Override
    public int compare(Student s1, Student s2) {
        return s1.getAge() - s2.getAge();
    }
}
public class BinaryOperatorTest {
    @Test
    public void test01() {
        BinaryOperator<Integer> bo = (x, y) -> x * y;
        System.out.println(bo.apply(3, 5)); // 15
    }

    @Test
    public void test02() {
        BinaryOperator<Integer> bo = (x, y) -> x * y;
        Function<Integer, String> func = n -> "结果为:" + n;
        // 将(3, 5)应用于bo上,再将bo的运算结果作为func的参数进行运算
        System.out.println(bo.andThen(func).apply(3, 5));   // 结果为:15
    }

    @Test
    public void test03() {
        Student stu3 = new Student("张三", 23);
        Student stu4 = new Student("李四", 24);

        StudentComparator comparator = new StudentComparator();
        Student minStu = BinaryOperator.minBy(comparator).apply(stu3, stu4);
        Student maxStu = BinaryOperator.maxBy(comparator).apply(stu3, stu4);

        System.out.println(minStu); // Student(name=张三, age=23)
        System.out.println(maxStu); // Student(name=李四, age=24)
    }
}

Lambda方法引用

  • Lambda 方法引用是指,借助内置函数式接口对一个类中的静态方法、实例方法、构造方法进行使用的方式。

准备

public class Person {
    private String name;
    public Person() {
    }
    public Person(String name) {
        this.name = name;
    }
    // 静态方法
    public static void sleeping(int hours) {
        System.out.println("人每天需要睡眠 " + hours + " 小时");
    }
    // 实例方法:在第一个参数位置存在一个隐藏参数this
//    public String play(Person this, int minutes) {
    public String play(int minutes) {
        return name + "已经玩了" + minutes + "分钟了。";
    }

    public void study(String course) {
        System.out.println(name + "正在学习" + course);
    }

    @Override
    public String toString() {
        return "Person{name = " + name +"}";
    }
}

测试代码

public class MethodRefTest {
    @Test
    public void test01() {
        Person person = new Person("张三");
        System.out.println(person.play(5)); // 张三已经玩了5分钟了。
        person.study("WebFlux");    // 张三正在学习WebFlux
    }

    @Test
    public void test02() {
        // Lambda静态方法引用   类名::静态方法名
        // sleeping()方法只有一个输入,没有输出,符合函数式接口Consumer的定义
        Consumer<Integer> consumer = Person::sleeping;
        consumer.accept(8); // 相当于  Person.sleeping(8)
    }

    @Test
    public void test03() {
        // Lambda实例方法引用   实例名::实例方法名
        Person person = new Person("李四");
        // play()方法只有一个输入,且有输出,符合函数式接口Function的定义
        Function<Integer, String> func = person::play;
        System.out.println(func.apply(5));  // 相当于person.play(5)
    }

    @Test
    public void test04() {
        // Lambda实例方法引用   类名::实例方法名
        Person person = new Person("李四");
        // play()方法有一个隐藏参数,于是就有两个输入,且有输出,符合函数式接口BiFunction的定义
        BiFunction<Person, Integer, String> bf = Person::play;
        System.out.println(bf.apply(person, 5));  // 相当于person.play(5)
    }

    @Test
    public void test05() {
        // Lambda实例方法引用   实例名::实例方法名
        Person person = new Person("李四");
        // study()方法只有一个输入,没有输出,符合函数式接口Consumer的定义
        Consumer<String> consumer = person::study;
        consumer.accept("WebFlux");    // 相当于person.study("WebFlux")
    }

    @Test
    public void test06() {
        // Lambda无参构造器引用   类名::new
        // 无参构造器是没有输入,但有输出,其符合函数式接口Supplier的定义
        Supplier<Person> supplier = Person::new;
        System.out.println(supplier.get());  // 相当于 new Person()
    }

    @Test
    public void test07() {
        // Lambda 带参构造器引用   类名::new
        // 带参构造器是仅有一个输入,且有输出,其符合函数式接口Function的定义
        Function<String, Person> func = Person::new;
        System.out.println(func.apply("王五"));  // 相当于 new Person("王五")
    }
}

Stream流编程

  • Stream 流编程是一种函数式编程。

Stream概述

  • 函数式编程是一种编程范式,是如何编写程序的一种方法论。它属于结构化编程的一种,主要思想是把对数据的一连串操作通过链式函数调用的方式来实现。其属于 Fluent 风格编程范畴。
  • JDK8 中,Stream 是一个接口,其中包含了很多的方法,这些方法可以对流中的数据进行操作。这些方法共分为三类:
    • 创建 Stream 接口对象的方法,这些方法一般都需要通过输入原始操作数据,然后创建数据池;
    • 返回Stream类型的中间操作方法;
    • 返回其它类型的方法,这些方法即终止操作方法。
  • Stream 流编程是一种函数式编程,或者说,函数式编程是 Stream 流编程的实现方式。Stream 流编程的具体实现步骤如下:
    • 输入要操作的数据,将数据放入到“数据池”,而 Stream 就是这个“数据池”;
    • Stream 接连调用中间操作函数,这些函数均返回 Stream 类型;
    • Stream 调用终止操作函数,使链式调用结束,让其返回最终的操作结果,即“数据池”中最终数据。
  • Stream 流编程需要注意的问题
    • 中间操作返回的都是Stream,可以保证“链式调用”;
    • 这些函数都有一个共同的任务:对Stream流中的每一个数据都进行操作;
    • 一个中间函数对于数据池的操作结果将作为下一个中间操作函数的数据输入;
    • 所有操作都是不可逆的,一旦操作就会影响数据池中的数据。
  • Stream 流编程中对于中间方法的执行,存在一个惰性求值问题:多个中间操作可以连接起来形成一个链式调用,除非链式调用的最后执行了终止操作,否则中间操作是不会执行任何处理的。即只有当终止操作执行时才会触发链式调用的执行,这种方法调用方式称为惰性求值。
public class StreamTest {
    @Test
    public void test01() {
        int[] nums = {1, 2, 3};
        int sum = IntStream.of(nums)
                .map(i -> i * 2)    // 中间操作 {2, 4, 6}
                .map(i -> i * i)    // 中间操作 {4, 16, 36}
                .sum();             // 终止操作 56
        System.out.println(sum);
    }

    @Test
    public void test02() {
        int[] nums = {1, 2, 3};
        int sum = IntStream.of(nums)
                .map(i -> i * 2)    // 中间操作 {2, 4, 6}
                .map(i -> {
                    System.out.println(i + " 进行乘方");
                    return i * i;
                })    // 中间操作 {4, 16, 36}
                .sum();             // 终止操作 56
        System.out.println(sum);
//        2 进行乘方
//        4 进行乘方
//        6 进行乘方
//        56
    }

    @Test
    public void test03() {
        int[] nums = {1, 2, 3};
        int sum = IntStream.of(nums)
                .map(i -> i * 2)    // 中间操作 {2, 4, 6}
                .map(StreamTest::compute)    // 中间操作 {4, 16, 36}
                .sum();             // 终止操作 56
        System.out.println(sum);
    }

    // 静态方法,对指定元素进行乘方
    private static int compute(int n) {
        System.out.println(n + " 进行乘方");
        return n * n;
    }

    @Test
    public void test04() {
        int[] nums = {1, 2, 3};
        IntStream stream = IntStream.of(nums)
                .map(i -> i * 2)    // 中间操作 {2, 4, 6}
                .map(StreamTest::compute);// 中间操作 {4, 16, 36}
        System.out.println(Arrays.toString(nums)); // [1, 2, 3]
    }
}

Stream流的创建

  • 使用数组创建流:使用 java.util.Array 的静态方法 stream()可以创建流,即数据池中存放的是数组中的数据。
  • 使用集合创建流:使用 java.util.Collection 接口的方法可以创建集合流,即数据池中存放的是集合中的数据。
  • 创建数字流:通过“数字类型 Stream”类的 of()或 rangeClosed()方法可以创建包含指定数字的 Stream,即数据池中存放的是指定的数据。注:rangeClosed,闭区间。需要注意,这些数字流接口,例如 IntStream、LongStream、DoubleStream 等与 Stream 接口并无继承关系。但它们可以通过装箱方法 boxed()转换为 Stream 类型。当然,也可以通过 Random 的方法创建包含指定个数随机数的流。
  • 自定义流:通过 Stream 类的 generate()方法自定义流,其中 generate()方法中的参数为提供者 Supplier,即没有输入只有输出的接口。该流中的数据池中的数据就来自于这个提供者。
public class BuildStreamTest {
    @Test
    public void test01() {
        // 使用数组创建流
        int[] nums = {1, 2, 3};
        IntStream stream1 = IntStream.of(nums);
        IntStream stream2 = Arrays.stream(nums);

        System.out.println(IntStream.of(nums).sum());   // 6
        System.out.println(Arrays.stream(nums).sum());  // 6
    }

    @Test
    public void test02() {
        // 使用集合创建流
        List<String> list = new ArrayList<>();
        Stream<String> listStream = list.stream();

        Set<Integer> set = new HashSet<>();
        Stream<Integer> setStream = set.stream();
    }

    @Test
    public void test03() {
        // 创建数字流
        // 创建一个包含1, 2, 3的stream
        IntStream intStream = IntStream.of(1, 2, 3);

        // 创建一个包含[1, 5)范围的stream
        IntStream rangeStream = IntStream.range(1, 5);
        // 创建一个包含[1, 5]范围的stream
        IntStream rangeClosedStream = IntStream.rangeClosed(1, 5);
        System.out.println(rangeStream.sum());          // 10
        System.out.println(rangeClosedStream.sum());    // 15

        // new Random().ints() 创建一个无限流,limit(5)限制流中元素个数为5个
        IntStream randomLimitStream = new Random().ints().limit(5);
//        IntStream randomLimitStream = new Random().ints(5);   // 与上面等价
    }

    @Test
    public void test04() {
        // 自定义流: 生成一个无限流,然后限定元素个数
        Stream<Double> stream = Stream.generate(Math::random).limit(5);
        System.out.println(Arrays.toString(stream.toArray()));
    }
}

并行处理与串行处理

  • 我们前面的操作都是由一个线程以串行的方式逐个对流中的元素进行处理的。为了提高处理效率,Stream 支持以并行的方式对流中的元素进行处理。
  • 使用 Stream 流式编程对于并行操作非常简单,无需自己创建 Thread 多线程,只需调用parallel()方法即可实现多线程环境。默认情况下为串行流。
public class SerialParallelTest {
    @Test
    public void test01() {
        // 串行处理
        IntStream.range(1, 100)
//                .sequential()   // 默认
                .peek(SerialParallelTest::printA)
                .count();
    }

    @Test
    public void test02() {
        // 并行处理
        IntStream.range(1, 100)
                .parallel()
                .peek(SerialParallelTest::printA)
                .count();
    }

    @Test
    public void test03() {
        // 串并行混合处理:先并行后串行,最终执行效果为后者:串行处理
        IntStream.range(1, 100)
                .parallel()
                .peek(SerialParallelTest::printA)
                .sequential()
                .peek(SerialParallelTest::printB)
                .count();
    }

    @Test
    public void test04() {
        // 串并行混合处理:先串行后并行,最终执行效果为后者:并行处理
        IntStream.range(1, 100)
                .sequential()
                .peek(SerialParallelTest::printA)
                .parallel()
                .peek(SerialParallelTest::printB)
                .count();
    }

    @Test
    public void test05() {
        // 串行处理仅有一个main线程
        IntStream.range(1, 100)
//                .sequential()
                .peek(SerialParallelTest::printThread)
                .count();
    }

    @Test
    public void test06() {
        // 并行处理的默认线程数量
        IntStream.range(1, 100)
                .parallel()
                .peek(SerialParallelTest::printThread)
                .count();
    }

    @Test
    public void test07() {
        // 修改默认线程池中的线程数量
        // 指定默认线程池中的数量为32,其中包含指定的31个ForkJoinPool.commonPool-worker与main线程
        String key = "java.util.concurrent.ForkJoinPool.common.parallelism";
        System.setProperty(key, "31");
        IntStream.range(1, 100)
                .parallel()
                .peek(SerialParallelTest::printThread)
                .count();
    }

    @Test
    public void test08() {
        // 使用自定义线程池
        // 创建线程池,包含4个线程
        ForkJoinPool pool = new ForkJoinPool(4);
        // 定义并行任务
        Callable<Long> task = () -> IntStream.range(1, 100)
                .parallel()
                .peek(SerialParallelTest::printThread)
                .count();
        // 想线程池提交并行任务
        pool.submit(task);
        // wait()、notify()、notifyAll()方法必须在同步方法或同步代码块中被调用,
        // 且哪个对象调用了这些方法,哪个对应就要充当同步锁
        synchronized (pool) {
            try {
                // 将pool阻塞,即会阻塞main线程的执行
                pool.wait();
            } catch (Exception ignore) {
            }
        }
    }

    private static void printA(int x) {
        System.out.println(x + " A");
        sleep();
    }

    private static void printB(int x) {
        System.out.println(x + " B");
        sleep();
    }

    private static void printThread(int x) {
        String name = Thread.currentThread().getName();
        System.out.println(x + " -- " + name);
        sleep();

    }

    private static void sleep() {
        try {
            TimeUnit.MILLISECONDS.sleep(50);
        } catch (Exception ignore) {
        }
    }
}

Stream流的中间操作

  • 中间操作分类:返回 Stream 接口类型的方法称为中间操作。根据“这些操作对当前数据池中的某一元素进行操作时是否需要了解对之前元素操作的结果”标准,可以将中间操作划分为无状态操作与有状态操作。
  • 无状态操作:
    • map(Function<T, R> action):功能是将流中的元素映射为另一个值,其参数为 Function,有一个输入与输出。由于有输出,所以 map() 操作会对流中元素产生影响。
    • mapToXxx():其功能是将流中的元素映射为指定类型的元素,不同的流其可映射为的元素类型是不同的,即其所拥有的 mapToXxx()方法是不同的。
    • flatMap(Function<T, Stream> action):其功能是将流中的元素映射为多个值,即扁平化 map。其适用场景为流中原来的每个元素为集合,该方法用于将每个集合元素全部打散,然后添加到流中。由于其参数为 Function,有输入与输出,所以 flatMap()操作会对流中元素产生影响。需要注意的是,该 Function 的输出类型为 Stream。
    • filter(Predicate<T> action):用于过滤掉不适合指定条件的流中的元素。其参数为 Predicate 断言,用于设置过滤条件。
  • 有状态操作:
    • distinct():过滤掉流中重复元素,无参数。
    • sorted() 或 sorted(Comparator c):对流中的元素进行排序。没有参数的 sorted()默认是按照字典序排序的,即按照 ASCII排序的。
    • skip(long):从流中去除指定个数的元素。
public class MiddleOperationTest {
    // 在所有操作都是无状态时,流中元素对于操作的执行
    //      并非是将流中所有元素按照顺序先执行完一个操作,再让所有元素执行完第二个操作
    //      而是逐个拿出元素,将所有操作执行完毕后,再拿出一个元素,将所有操作再执行完毕
    @Test
    public void test01() {
        String words = "I Love China Welcome";
        Stream.of(words.split(" ")) // 当前流中的元素为各个单词
                .peek(System.out::println)
                .map(String::length) // 当前流中的元素为各个单词的长度
                .forEach(System.out::println);
    }

    @Test
    public void test02() {
        IntStream.range(1, 10)
                .mapToObj(i -> "No." + i)
                .forEach(System.out::println);
    }

    @Test
    public void test03() {
        String[] nums = {"111", "222", "333"};
        Arrays.stream(nums)     // "111", "222", "333"
                .mapToInt(Integer::valueOf)     // 111, 222, 333
                .forEach(System.out::println);
    }

    @Test
    public void test04() {
        String words = "I Love China Welcome";
        Stream.of(words.split(" "))
                // flatMap中参数为Function,且要求返回类型为Stream
                .flatMap(word -> word.chars().boxed())
//                .forEach(System.out::print);
                .forEach(c -> System.out.print((char) (c.intValue())));
    }

    @Test
    public void test05() {
        String words = "I Love China Welcome";
        Stream.of(words.split(" "))
                // 最终形成的流中的元素为各个单词的字母
                .flatMap(word -> Stream.of(word.split("")))
                .forEach(System.out::println);
    }

    @Test
    public void test06() {
        String words = "I Love China Welcome";
        Stream.of(words.split(" "))
                // 当过滤条件为true时,当前元素会保留在流中,否则从流中删除
                .filter(word -> word.length() > 4)
                .forEach(System.out::println);
    }

    @Test
    public void test07() {
        String words = "I Love China Welcome";
        Stream.of(words.split(" "))
                .flatMap(word -> Stream.of(word.split("")))
                .distinct() // 取出重复字母
                .sorted()   // 按照字典序进行排序
                .forEach(System.out::println);
    }

    @Test
    public void test08() {
        String words = "I Love China Welcome";
        Stream.of(words.split(" "))
                .flatMap(word -> Stream.of(word.split("")))
                .distinct()
                // 按照逆字典序进行排序
                .sorted((o1, o2) -> (int) o2.charAt(0) - (int) o1.charAt(0))
                .forEach(System.out::println);
    }

    @Test
    public void test09() {
        String words = "I Love China Welcome";
        Stream.of(words.split(" "))
                .flatMap(word -> Stream.of(word.split("")))
                .distinct()
                .sorted()
                .skip(4)    // 指定跳过(去除)4个元素
                .forEach(System.out::print);

        Stream.of(words.split(" "))
                .skip(3)
                .forEach(System.out::println);
    }

    @Test
    public void test10() {
        String words = "I Love China Welcome";
        Stream.of(words.split(" "))
                .flatMap(word -> Stream.of(word.split("")))
                .distinct()
                .peek(System.out::print)
                .sorted()
                .forEach(System.err::print);
    }
}

Stream流的终止操作

  • 终止操作分类:终止操作的结束意味着 Stream 流操作结束。根据操作是否需要遍历流中的所有元素,可以将终止操作划分为短路操作与非短路操作。
  • 非短路操作:
    • forEach(Consumer<T> action):在并行操作下会打乱原顺序。
    • forEachOrdered(Consumer<T> action):遍历流中的元素,若流中的元素原本具有顺序,则按照原顺序排序。
    • collect(Collector col):将流中的最终数据收集到集合中。其参数为 Collector 收集器,通过 Collectors 的静态方法 toList()、toSet()等可以获取到 Collector 对象。
    • toArray():将流中的最终数据收集到数组中。
    • count():统计流中的元素个数。
    • reduce(BinaryOperator<T> bo):该方法的作用是将集合流最终转换为一个指定类型的数据。其参数为二元接口 BinaryOperator,即两个输入一个输出,且类型相同。由两个输入最终变为了一个输出,就达到了缩减 reduce 的效果了。
      • reduce()方法还有一个包含两个参数的重载方法,其中第一个参数为默认值,即在流中元素为空时所使用的值。该方法可以避免 NoSuchElementException 异常的抛出。
    • max(Comparator com):在流中找出最大值。
    • min(Comparator com):在流中找出最小值。
  • 短路操作:
    • allMatch(Predicate p):用于判断是否所有元素都符合指定的条件,只要有一个不符合,马上结束匹配并返回false。只有所有都匹配上了才会返回 true。
    • anyMatch(Predicate p):用于判断是否存在符合条件的元素,只要找到一个符合的元素,马上结束匹配并返回true。
    • noneMatch(Predicate p):用于判断是否全部不符合条件,只要找到一个符合的,马上返回 false。只有所有都不符合才会返回 true。
    • findFirst()与findAny():findFirst()与 findAny()方法均会将从流中查找到的元素封装到 Optional 容器对象中。若没有找到,则 Optional 容器对象中的值会为空,Optional 对象的 isPresent()方法返回值会为 false。
public class TerminateOperationTest {
    private static final String WORDS = "Shenzhen is a vibrant city";

    // 对于流的并行操作,forEach()处理结果是无序的
    @Test
    public void test01() {
        Stream.of((WORDS.split(" ")))
                .parallel()
                .forEach(System.out::println);
    }

    // 对于流的并行操作,forEachOrdered()处理结果是无序的
    @Test
    public void test02() {
        Stream.of((WORDS.split(" ")))
                .parallel()
                .forEachOrdered(System.out::println);
    }

    // 将流中的元素收集到一个集合中
    @Test
    public void test03() {
        List<String> list = Stream.of((WORDS.split(" ")))
                .collect(Collectors.toList());
        System.out.println(list);   // [Shenzhen, is, a, vibrant, city]
    }

    // 将流中的元素收集到一个数组中
    @Test
    public void test04() {
        Object[] array = Stream.of((WORDS.split(" ")))
                .toArray();
        System.out.println(Arrays.toString(array)); // [Shenzhen, is, a, vibrant, city]
    }

    // 统计流中元素的个数
    @Test
    public void test05() {
        long count = Stream.of((WORDS.split(" "))).count();
        System.out.println(count); // 5
    }

    // 计算流中所有单词的长度之和
    @Test
    public void test06() {
        Optional<Integer> reduce = Stream.of((WORDS.split(" ")))
                .map(String::length)
                .reduce(Integer::sum);
        // Optional的get()方法在其封装的对象为空时会抛出异常
        System.out.println(reduce.get()); // 22
    }

    // 计算流中所有单词的长度之和
    @Test
    public void test07() {
        Optional<Integer> reduce = Stream.of((WORDS.split(" ")))
                .map(String::length)
//                .filter(n -> n > 200)
                .reduce(Integer::sum);
        // Optional的orElse()方法在正常情况下会返回正常的值
        // 当其封装的对象为空时会返回参数指定的值
        System.out.println(reduce.orElse(-1));
//        System.out.println(reduce);
    }

    // 将流中的元素使用逗号相连接
    @Test
    public void test08() {
        String reduce = Stream.of(WORDS.split(" "))
                .reduce(null, (s1, s2) -> s1 + ", " + s2);
        System.out.println(reduce); // null, Shenzhen, is, a, vibrant, city
    }

    // 从流中找出长度最长的那个单词
    @Test
    public void test09() {
        String max = Stream.of(WORDS.split(" "))
//                .filter(s -> s.length() > 200)
                .max((s1, s2) -> s1.length() - s2.length())
                // 等价于
//                .min((s1, s2) -> s2.length() - s1.length())
                .orElse("当前流中没有元素");
        System.out.println(max);    // Shenzhen
    }

    // 从流中找出长度最短的那个单词
    @Test
    public void test10() {
        String max = Stream.of(WORDS.split(" "))
//                .filter(s -> s.length() > 200)
                .max((s1, s2) -> s2.length() - s1.length())
                // 等价于
//                .min((s1, s2) -> s1.length() - s2.length())
                .orElse("当前流中没有元素");
        System.out.println(max);    // a
    }

    // 判断所有单词长度是否大于3
    // allMatch():只要找到一个不符合条件的元素马上结束匹配工作
    @Test
    public void test11() {
        boolean allMatch = Stream.of(WORDS.split(" "))
                .allMatch(word -> word.length() > 3);
        System.out.println(allMatch);   // false
    }

    // 判断是否存在单词长度大于3的单词
    // anyMatch():只要找到一个符合条件的元素马上结束匹配工作,返回true
    @Test
    public void test12() {
        boolean anyMatch = Stream.of(WORDS.split(" "))
                .anyMatch(word -> word.length() > 3);
        System.out.println(anyMatch);   // true
    }

    // 判断是否不存在单词长度大于3的单词
    // noneMatch():只要找到一个符合条件的元素马上结束匹配工作,返回false
    @Test
    public void test13() {
        boolean noneMatch = Stream.of(WORDS.split(" "))
                .noneMatch(word -> word.length() > 3);
        System.out.println(noneMatch);   // false
    }

    // findFirst():只要找到第一个元素,马上结束查找
    @Test
     public void test14() {
        String ele = Stream.of(WORDS.split(" "))
                .findFirst()
                .orElse("这里没有一个元素");
        System.out.println(ele);    // Shenzhen
    }

    // findAny():只要找到任何一个元素,马上结束查找
    @Test
     public void test15() {
        String ele = Stream.of(WORDS.split(" "))
                .findAny()
                .orElse("这里没有一个元素");
        System.out.println(ele);    // Shenzhen
    }
}

收集器

  • Stream 的 collect()方法可以将流中的最终数据收集到集合中,其为一个非短路终止操作。其参数为 Collector 收集器,一般都是通过调用 Collectors 的静态方法获取收集器对象的。不过,收集器的功能远比将数据收集到集合要强大的多,还可以对收集的数据进行统计、分组等操作。根据不同的需求调用 Collectors 的不同静态方法,获取到不同的 Collector 收集器。
  • 流转集合:可以将流中的元素转换为 List 或 Set 集合。
    • toList():将流中的元素收集为一个 List。
    • toSet():将流中的元素收集为一个 Set。
    • toCollection(Supplier sup):收集器默认使用的是无序的 HashSet,若要指定使用有序的 TreeSet,则可通过toCollection()方法指定。
  • 分组:
    • 仅分组:分组是指,按照流中指定的元素的属性值进行分组。
    • 分组后统计:统计是对元素个数的统计,所以无需指定属性用于统计。
  • 布尔分块:
    • 仅分块:布尔分块是按照指定断言的 true 与 false 结果进行分组,其只会划分为两组,且 key 只能是 true 或 false。
    • 分块后求平均值:分块后,对两块内容按照指定的属性求平均值。
  • 获取汇总统计数据:汇总统计数据指的是对流中某一数据汇总后所统计出的最大值、最小值、平均值等数据。该方法只能适用于数值型数据统计。
public class CollectorsTest {
    private List<Student> students;

    @Before
    public void before() {
        students = Arrays.asList(
                new Student("周零", "清华大学", "男", 20, 95.5),
                new Student("郑一", "清华大学", "女", 21, 98.5),
                new Student("吴二", "北京大学", "男", 22, 91.0),
                new Student("张三", "复旦大学", "男", 23, 90.5),
                new Student("李四", "清华大学", "女", 21, 92.0),
                new Student("王五", "北京大学", "男", 23, 93.5),
                new Student("赵六", "清华大学", "男", 22, 96.5),
                new Student("钱七", "复旦大学", "女", 20, 97.5),
                new Student("秦八", "复旦大学", "男", 21, 99.0),
                new Student("段九", "北京大学", "男", 20, 92.5)
        );
    }

    // 获取所有学生姓名列表
    @Test
    public void test01() {
        List<String> names = students.stream().map(Student::getName)
                .collect(Collectors.toList());
        System.out.println(names);
    }

    // 获取所有学校名单
    @Test
    public void test02() {
        Set<String> schools = students.stream().map(Student::getSchool)
                .collect(Collectors.toSet());
        // 或者
//        Set<String> schools = students.stream()
//                .map(Student::getSchool)
//                .collect(Collectors.toCollection(TreeSet::new));
        System.out.println(schools);
    }

    // 获取各个学校的学生(按照学校进行分组)
    @Test
    public void test03() {
        Map<String, List<Student>> schoolGroup = students.stream()
                .collect(Collectors.groupingBy(Student::getSchool));
        // 显示格式可读性很差
        System.out.println(schoolGroup);

        // 使用工具类显示map中的Key-Value
        MapUtils.verbosePrint(System.out, "学校", schoolGroup);

        // 获取key为“清华大学”的value,即获取所有清华大学的选手
        System.out.println(schoolGroup.get("清华大学"));
    }

    // 统计各个学校人数
    @Test
    public void test04() {
        Map<String, Long> schoolCount = students.stream()
                .collect(Collectors.groupingBy(Student::getSchool, Collectors.counting()));
        System.out.println(schoolCount);
        // 获取清华大学人数
        System.out.println(schoolCount.get("清华大学"));
    }

    // 按照性别分组
    @Test
    public void test05() {
        Map<String, List<Student>> genderGroup = students.stream()
                .collect(Collectors.groupingBy(Student::getGender));
        MapUtils.verbosePrint(System.out, "性别", genderGroup);
        // 获取所有男生
        System.out.println(genderGroup.get("男"));
    }

    // 按照性别分组
    @Test
    public void test06() {
        Map<Boolean, List<Student>> genderGroup = students.stream()
                .collect(Collectors.partitioningBy(s -> "男".equals(s.getGender())));
        MapUtils.verbosePrint(System.out, "性别", genderGroup);

        // 获取所有男生
        System.out.println(genderGroup.get(true));
    }

    // 以95为标准按照成绩将所有参赛选手分组,分为大于95的组及不大于95的组
    @Test
    public void test07() {
        Map<Boolean, List<Student>> genderGroup = students.stream()
                .collect(Collectors.partitioningBy(s -> s.getScore() > 95));
        MapUtils.verbosePrint(System.out, "95划分成绩", genderGroup);
        // 获取所有成绩大于95的学生
        System.out.println(genderGroup.get(true));
    }

    // 以95为标准按照成绩将所有参赛选手分组,分为大于95的组及不大于95的组
    // 对这两组分别计算其平均分
    @Test
    public void test08() {
        Map<Boolean, Double> scoreGroupAvg = students.stream()
                .collect(Collectors.partitioningBy(s -> s.getScore() > 95, Collectors.averagingDouble(Student::getScore)));
        System.out.println(scoreGroupAvg);
        // 获取所有成绩大于95的所有学生的平均分
        System.out.println(scoreGroupAvg.get(true));
    }

    // 获取成绩相关的统计数据
    @Test
    public void test09() {
        DoubleSummaryStatistics scoreSummary = students.stream()
                .collect(Collectors.summarizingDouble(Student::getScore));
        System.out.println(scoreSummary);
        // 输出成绩的数量
        System.out.println("成绩个数:" + scoreSummary.getCount());
        // 输出成绩中的最大值
        System.out.println("最大成绩:" + scoreSummary.getMax());
    }
}
Logo

尧米是由西云算力与CSDN联合运营的AI算力和模型开源社区品牌,为基于DaModel智算平台的AI应用企业和泛AI开发者提供技术交流与成果转化平台。

更多推荐