流处理

流处理是Java程序中一种重要的数据处理手段,它用少量的代码就可以执行复杂的数据过滤、映射、查找和收集等功能。

流处理的中间操作和最终操作的入参基本都是Lambda表达式。 Lambda - 专业剑 - Confluence

流处理的缺点是代码的可读性不高。

什么是Stream流?


Stream流不是数据结构,也不保存数据。它是在原数据集上定义了一组操作

因为不保存数据,所以每个Stream流只能使用一次。

看源码,本质上是一个接口interface。

 

为什么需要Stream流?

对于一些常用的针对数组的操作,JDK库提供了封装好的工具类API,以Stream流的方式。

即:数组转换为Stream流,调用Java提供的默认的Stream流的方法,比如求和,去重,过滤等等。

优点是可以调用JDK封装好的API。支持并发。在多核情况下,可以使用并行Stream API来发挥多核优势。在单核的情况下,我们自己写的for性能不比Stream API 差多少。

缺点是Stream流不好调试。

 

使用Stream流可以更清楚地知道我们需要对数据集做什么操作,可读性强。可以轻松获取并行Stream流,而不用自己编写多线程代码,从而更专注于业务逻辑。

 

Stream流和Lambda表达式有什么关系?

Stream流的中间操作和最终操作的入参基本都是Lambda表达式。

 

如何使用Stream流?

Java8的Stream流详解_长风-CSDN博客_java stream流操作

手把手带你体验Stream流_编码博客控的博客-CSDN博客

使用Stream流分为三步:

1)创建Stream流;

2)通过Stream流对象执行中间操作

3)执行最终操作,得到结果;

例子:

int[] nums = {1, 2, 3};
int sum = IntStream.of(nums).sum();
log.info("sum: " + sum);

 

1)创建Stream流

1.Collection接口的stream()或parallelStream()方法

2.静态的Stream.of()、Stream.empty()方法

3.Arrays.stream(array, from, to)

4.静态的Stream.generate()方法生成无限流,接受一个不包含引元的函数

5.静态的Stream.iterate()方法生成无限流,接受一个种子值以及一个迭代函数

6.Pattern接口的splitAsStream(input)方法

7.静态的Files.lines(path)、Files.lines(path, charSet)方法

8.静态的Stream.concat()方法将两个流连接起来

 

例子:

List<String> stringList = Arrays.asList("A", "", "B", "abc");

stringList.stream(); // 从集合创建,Collection类提供了stream方法

 

IntStream intStream = Arrays.stream(new int[] {1, 2, 3}); // 从数组创建

 

intStream = IntStream.of(1, 2, 3); // 直接创建数字流

 

intStream = new Random().ints().limit(10); // 使用random创建

 

Java8的Stream流详解_长风-CSDN博客_java stream流操作

默认情况下,从有序集合、生成器、迭代器产生的Stream流,或者通过Stream.sorted()产生的流都是有序流有序流在并行处理完成后都会恢复原顺序。

unordered()方法可以解除有序流的顺序限制,更好地发挥并行处理的性能优势,例如distinct将保存任意一个唯一元素而不是第一个,limit将保留任意n个元素而不是前n个。

 

2)执行中间操作Intermediate

中间操作是对Stream流的数据的加工。注意,如果不执行最终操作,中间操作不会执行。

 

对Stream流执行中间操作返回的仍然是Stream流。所以可以多个中间操作叠加。

常见的中间操作包括filter,map,peek等。

 

其中peek操作主要用于debug,比如打印中间操作过程的结果。

Stream<T> peek(Consumer<? super T> action)

如果数据的类型T是String类型,对数据进行操作并不会改变最终结果。

比如:Stream.of(“one”, “two”, “three”).peek(s -> s.toUpperCase()).forEach(System.out::println);

而如果是对象类型,则会改变。

 

这些中间操作和最终操作的参数基本都是Lambda表达式。

 

1.filter(Predicate)

返回一个满足指定条件的流,将结果为false的元素过滤掉。

String str1 = "Hello My World.";
Stream.of(str1.split(" "))

.filter(s -> s.length() > 2)

.map(s -> s.length())

.forEach(System.out::println);

 

2.map(Function<? Super T, ? extends R> mapper)

转换元素的值,可以用方法引元或者lambda表达式。

对流中的元素调用mapper方法,产生包含这些元素的一个新的流。

返回值<R> Stream<R>

3.flatMap(function)

若元素是流,将流摊平为正常元素,再进行元素转换。

4.limit(n)

保留前n个元素。

5.skip(n)

去除前n个元素。

6.distinct()

去除重复元素。

7.sorted()

将Comparable元素的流排序。

8.sorted(Comparator)

将流元素按Comparator排序。

9.peek(function)

流不变,但会把每个元素传入fun执行,可以用作调试。

 

例子:

String str1 = "Hello My World.";
Stream.of(str1.split(" ")).filter(s -> s.length() > 2).map(s -> s.length()).forEach(System.out::println);

 

 

3)执行最终操作Terminal

最终操作是对Stream流的启动操作。

最终操作返回的不再是Stream流对象。

最终操作只能有一个。

 

约简操作

1.max(Comparator<? Super T> comparator)

根据指定比较器规则,获取流中最大元素。

2.min(Comparator)

 

3.count()

返回流中元素个数。

4.findFirst()

返回第一个元素。

5.findAny()

返回任意元素。

6.anyMatch(Predicate)

任意元素匹配时返回true

7.allMatch(Predicate)

所有元素匹配时返回true。

8.noneMatch(Predicate)

没有元素匹配时返回true。

9.reduce(fun)

从流中计算某个值,接受一个二元函数作为累积器,从前两个元素开始持续应用它,累积器的中间结果作为第一个参数,流元素作为第二个参数。

10.reduce(a, fun)

a为幺元值,作为累积器的起点。

11.reduce(a, fun1, fun2)

与二元变形类似,并发操作中,当累积器的第一个参数与第二个参数都为流元素类型时,可以对各个中间结果也应用累积器进行合并,但是当累积器的第一个参数不是流元素类型而是类型T的时候,各个中间结果也为类型T,需要fun2来将各个中间结果进行合并。

 

收集操作

1.iterator()

2.forEach(Consumer<? Super T> action)

遍历流中的每一个元素,执行action动作。

顺序执行还是并行执行,取决于流是串行还是并行。

3.forEachOrdered(fun)

可以应用在并行流上以保持元素顺序。

4.toArray()

5.toArray(T[] :: new)

返回正确的元素类型

6.collect(Collector)

7.collect(fun1, fun2, fun3)

fun1转换流元素;fun2为累积器,将fun1的转换结果累积起来;fun3为组合器,将并行处理过程中累积器的各个结果组合起来

 

以上的操作可以分为3类

数据过滤:filter(), distinct(), limit(), skip()

数据映射:map()

数据查找:allMatch(), anyMatch(), nonmatch(), findFirst()

 

问题

Stream流的这些函数是并行还是串行执行?

乐字节-Java8核心特性实战之Stream(流) - 简书 (jianshu.com)

在 Java 8 中, 集合接口有两个方法来生成流:

stream() − 为集合创建串行流。

parallelStream() − 为集合创建并行流。

所以看创建的是串行流还是并行流。

 

性能比较

Stream并行流详解_我是七月呀的博客-CSDN博客_stream并行流

1)基本类型

性能消耗: Stream串行>for循环>Stream并行

2)对象

性能消耗:Stream串行>for循环>Stream并行

3)复杂对象

性能消耗:for循环>Stream串行>Stream并行

 

Stream流的这些函数是阻塞的吗?

有阻塞作用,只有当最终命令执行时才会一起链式执行,这得益于Lamba的阻塞作用。

常见问题

1.不使用终端操作

错误:忘记调用终端操作(如collect()forEach()reduce()),这会导致流没有执行。

public static void main(String[] args) {
    List<String> names = Arrays.asList("Alice", "Bob", "Charlie", "David");

    // 创建流但没有调用终端操作
    names.stream()
    .filter(name -> name.startsWith("A")); // 这里没有调用终端操作

    // 由于流没有执行,什么都不会打印
    System.out.println("Stream operations have not been executed.");
}

解决方案:始终以终端操作结束,以触发流的处理。

public static void main(String[] args) {
    List<String> names = Arrays.asList("Alice", "Bob", "Charlie", "David");

    // 创建流并调用终端操作
    names.stream()
    .filter(name -> name.startsWith("A")) // 中间操作
    .forEach(System.out::println); // 终端操作

    // 这将打印 "Alice",因为流被执行了
}

2.修改源数据

错误:在处理流时修改源数据结构(如List)可能导致未知的结果。

public static void main(String[] args) {
    List<String> names = Arrays.asList("Alice", "Bob", "Charlie", "David");
    // 尝试在流处理时修改源列表
    names.stream()
    .filter(name -> {
        if (name.startsWith("B")) {
            names.remove(name); // 修改源列表
        }
        return true;
    })
    .forEach(System.out::println);
    // 由于并发修改,输出可能不符合预期
    System.out.println("Remaining names: " + names);
}

解决方案:不要在流操作期间修改源数据,而是使用流创建新的集合。

public static void main(String[] args) {
    List<String> names = Arrays.asList("Alice", "Bob", "Charlie", "David");
    // 基于过滤结果创建一个新列表
    List<String> filteredNames = names.stream()
    .filter(name -> name.startsWith("B")) // 过滤出以 'B' 开头的名字
    .collect(Collectors.toList());
    // 显示过滤后的列表
    System.out.println("Filtered names: " + filteredNames);
    System.out.println("Original names remain unchanged: " + names);
}

3.忽略并行流的开销

错误:认为并行流总是能提高性能,而不考虑上下文,例如小数据集或轻量级操作。

public static void main(String[] args) {
    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5); // 小数据集
    // 在小数据集上使用并行流
    numbers.parallelStream()
    .map(n -> {
        // 模拟轻量级操作
        System.out.println(Thread.currentThread().getName() + " processing: " + n);
        return n * n;
    })
    .forEach(System.out::println);
    // 输出可能显示为简单任务创建了不必要的线程
}


解决方案:谨慎使用并行流,尤其是对于大数据集的 CPU 密集型任务。

public static void main(String[] args) {
    List<Integer> numbers = IntStream.rangeClosed(1, 1_000_000) // 大数据集
    .boxed()
    .collect(Collectors.toList());
    // 在大数据集上使用并行流进行 CPU 密集型操作
    List<Integer> squareNumbers = numbers.parallelStream()
    .map(n -> {
        // 模拟 CPU 密集型操作
        return n * n;
    })
    .collect(Collectors.toList());
    // 打印前 10 个结果
    System.out.println("First 10 squared numbers: " + squareNumbers.subList(0, 10));
}


4.过度使用中间操作

错误:链式调用过多的中间操作(如filter()map())可能会引入性能开销。

public static void main(String[] args) {
    List<String> names = Arrays.asList("Alice", "Bob", "Charlie", "David", "Eve");

    // 过度使用中间操作
    List<String> result = names.stream()
    .filter(name -> name.startsWith("A")) // 第一个中间操作
    .filter(name -> name.length() > 3) // 第二个中间操作
    .map(String::toUpperCase) // 第三个中间操作
    .map(name -> name + " is a name") // 第四个中间操作
    .toList(); // 终端操作

    // 输出结果
    System.out.println(result);
}


解决方案:尽量减少流管道中的中间操作,并在可能的情况下使用流融合。

public static void main(String[] args) {
    List<String> names = Arrays.asList("Alice", "Bob", "Charlie", "David", "Eve");

    // 优化流管道
    List<String> result = names.stream()
    .filter(name -> name.startsWith("A") && name.length() > 3) // 将过滤器合并为一个
    .map(name -> name.toUpperCase() + " is a name") // 合并 map 操作
    .toList(); // 终端操作

    // 输出结果
    System.out.println(result);
}


5.不处理 Optional 值

错误:在使用findFirst()reduce()等操作时,没有正确处理Optional结果。

public static void main(String[] args) {
    List<String> names = Arrays.asList("Alice", "Bob", "Charlie");
    // 尝试查找以 "Z" 开头的名字(不存在)
    String firstNameStartingWithZ = names.stream()
    .filter(name -> name.startsWith("Z")) 
    .findFirst() // 返回一个 Optional
    .get(); // 如果 Optional 为空,这将抛出 NoSuchElementException
    // 输出结果
    System.out.println(firstNameStartingWithZ);
}


解决方案:在访问Optional的值之前,始终检查它是否存在,以避免NoSuchElementException

public static void main(String[] args) {
    List<String> names = Arrays.asList("Alice", "Bob", "Charlie");
    // 正确处理 Optional
    Optional<String> firstNameStartingWithZ = names.stream()
    .filter(name -> name.startsWith("Z")) 
    .findFirst(); // 返回一个 Optional
    // 检查 Optional 是否存在
    if (firstNameStartingWithZ.isPresent()) {
        System.out.println(firstNameStartingWithZ.get());
    } else {
        System.out.println("No name starts with 'Z'");
    }
}


6.忽略线程安全

错误:在并行流中使用共享的可变状态可能导致竞态条件和不一致的结果。

public static void main(String[] args) {
    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
    List<Integer> results = new ArrayList<>(); // 共享的可变状态
    // 在并行流中使用共享的可变状态
    numbers.parallelStream().forEach(number -> {
        results.add(number * 2); // 这可能导致竞态条件
    });
    // 输出结果
    System.out.println("Results: " + results);
}


解决方案:避免共享可变状态;使用线程安全的集合或局部变量。

public static void main(String[] args) {
    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
    List<Integer> results = new CopyOnWriteArrayList<>(); // 线程安全的集合
    // 在并行流中使用线程安全的集合
    numbers.parallelStream().forEach(number -> {
        results.add(number * 2); // 避免竞态条件
    });
    // 输出结果
    System.out.println("Results: " + results);
}


7.混淆中间操作和终端操作

错误:不清楚中间操作(返回新流)和终端操作(产生结果)之间的区别。

public static void main(String[] args) {
    List<String> names = Arrays.asList("Alice", "Bob", "Charlie", "David");
    // 错误:尝试将中间操作用作终端操作
    // 这将无法编译,因为 'filter' 返回一个 Stream,而不是一个 List
    names.stream().filter(name -> name.startsWith("A")).forEach(System.out::println); // 这里正确使用了终端操作
}


解决方案:熟悉每种操作类型的特性,以避免代码中的逻辑错误。

public static void main(String[] args) {
    List<String> names = Arrays.asList("Alice", "Bob", "Charlie", "David");
    // 正确使用中间操作和终端操作
    List<String> filteredNames = names.stream()
        .filter(name -> name.startsWith("A")) // 中间操作
        .collect(Collectors.toList()); // 终端操作

    // 输出过滤后的名字
    System.out.println("Filtered Names: " + filteredNames);
}




Attachments: