Java Stream API
最近在公司实习,发现项目中使用了很多Stream流。当时在学Java的时候并没有很深入的去了解,只是浅浅的看了一下,后面也很少去用(当时以为这个东西没什么用)。
现在通过一些视频(哔哩哔哩_bilibili@AlbertShen,讲的很详细,视频有点长,但是质量绝对好,是我看过最有水平的视频,大家可以去支持一下)和文档,总结出了这篇文章。
一.介绍
Stream API通过声明式的方式处理数据集合,比如列表、数组等,还能有效利用多核处理器进行并行操作,使得代码简洁高效。
下面举一个例子来直观感受一下。
我们要统计年龄大于等于18的学生:
List<Student> students=List.of(new Student(16,1),new Student(17,2),new Student(18,3),new Student(19,4)
);
List<Student> adults=new ArrayList<>();
for(Student student:students){if(student.getAge()>=18){adults.add(student);}
}
使用Stream API:
List<Student> students=List.of(new Student(16,1),new Student(17,2),new Student(18,3),new Student(19,4)
);
List<Student> adults= students.stream().filter(student -> student.getAge()>=18).collect(Collectors.toList());
Stream 流主要有三个步骤:
1)创建流
stream 流本身并不是数据结构,当我们创建一个stream流时,实际的数据仍然存储在原始的数据结构中。
2)中间操作
用于对流中的元素进行操作,每次调用中间操作时,都会创建一个新的流。中间操作是惰性执行,直到遇到终端操作才会真正执行。
3)终端操作
终端操作是整个流处理的实际执行部分,它会触发之前定义的中间操作,并生成最终结果。执行终端操作后,流中元素会被消费,流就不能再使用了。
二.创建流
1.从集合创建流
List<String> list = List.of("a", "b", "c");
Stream<String> stream = list.stream();
2.从数组创建流
String[] arr={"a","b","c"};
Stream<String> stream = Arrays.stream(arr);
3.使用Stream.of()
Stream<String> stream = Stream.of("a","b","c");
4.流的合并
Stream<String> stream1 = Stream.of("a","b","c");
Stream<String> stream2 = Stream.of("d","e","f");
Stream<String> concat = Stream.concat(stream1, stream2);
5.使用Stream.builder()
Stream.Builder<String> streamBuilder = Stream.builder();
streamBuilder.add("a");
streamBuilder.add("b");
Stream<String> stream = streamBuilder.build();
注意一旦进行build操作,就不能再往builder中添加元素了。
6.从文件创建流
Path path = Paths.get("file.txt");
try(Stream<String> lines= Files.lines(path)){lines.forEach(System.out::println);
} catch (IOException e){e.getStackTrace();
}
7.基础类型流创建
//基础类型流
IntStream intStream = IntStream.of(1, 2, 3);
IntStream intStream = IntStream.range(1, 4); [1,4)
IntStream intStream = IntStream.rangeClosed(1, 4); [1,4]
//对象流
Stream<Integer> stream = intStream.boxed();
8.无限流创建
generate 实现重复的值或随机数据:
//重复数据
Stream<String> constantStream = Stream.generate(() -> "hello").limit(5);
//随机数据
Stream<Double> limit = Stream.generate(Math::random).limit(5);
使用limit来限制输出的数量,防止无限循环的发生。
iterate 实现数学序列或迭代算法:
Stream<Integer> interateStream = Stream.iterate(0, n -> n + 2).limit(5);
三.中间操作
1.筛选和切片
用于过滤或缩减流中的元素数量。
1)filter 根据条件筛选元素:
students.stream().filter(student -> student.getAge() >= 18).forEach(System.out::println);
2)distinct 筛选重复的元素:
Stream.of("a","b","a","c").distinct().forEach(System.out::println);
这里注意,distinct的底层是基于元素的hashCode()和equals()来判断元素是否重复,如果要去重自定义的对象的话,要重写这两个方法。
3)limit 截取流中指定的前几个元素:
Stream.of("a","b","a","c").distinct().limit(2).forEach(System.out::println);
4)skip 跳过流中指定的前几个元素:
Stream.of("a","b","a","c").distinct().skip(2).forEach(System.out::println);
2.映射
转换流中的元素或提取元素的特定属性。
1)map 适用于单层结构的里,用于元素一对一转换:
Stream<Student> studentStream = students.stream();
Stream<Integer> idStream = studentStream.map(student -> student.getId());
2)flatMap 可以扁平化数据结构,将它们转换合并成一个单层流。具体步骤是这样的,先将多层数据创建流,再将每个数据结构转换成独立的流,最后将这些独立的流合并组成单层流。
List<List<Student>> studentGroups = List.of(List.of(new Student(16, 1),new Student(17, 2)),List.of(new Student(18, 3),new Student(19, 4))
);
Stream<List<Student>> studentGroupsStream = studentGroups.stream();
Stream<Student> studentStream = studentGroupsStream.flatMap(stduent -> stduent.stream());
3)mapToInt 转化成基础类型流:
IntStream idStream = students.stream().mapToInt(student -> student.getId());
3.排序
对流中的元素进行排序。
1)sorted 排序:
Stream.of("b","c","a").sorted().forEach(System.out::println);
四.终端操作
1.查找与匹配
属于短路操作,在找到符合条件的元素后,会立即返回结果。
1)anyMatch 有一个满足条件的就返回结果:
boolean match = students.stream().anyMatch(student -> student.getAge() >= 18);
2)allMatch 全部满足结果返回true:
boolean match = students.stream().allMatch(student -> student.getAge() >= 18);
3)findFirst 返回第一个元素:
Optional<Student> optionalStudent = students.stream().findFirst();
2.聚合操作
学过数据库的都知道,数据库SQL中有一些聚合函数,sum,max这些,Stream API也一样。
但是要注意,使用sum或average的时候,流要是基本类型流,不能是对象流。
这里就简单的介绍一个 count 吧:
long count = students.stream().count();
3.规约操作
上面的聚合操作本质上是规约操作的一种特殊形式。规约操作通过一个自定义的累加器函数对流中的所有元素进行迭代处理。
使用 reduce 实现 sum 的功能:
Stream<Integer> ageStream = students.stream().map(student -> student.getAge());
Integer sum = ageStream.reduce(0, (a, b) -> a + b);
进行字符串拼接:
String joined=students.stream().map(student -> student.getName()).reduce("",(a,b)->a+b+",");
4.收集操作
把流中处理完的数据汇集到新的数据结构中。
List<Student> adults = students.stream().filter(student -> student.getAge() >= 18).collect(Collectors.toList());
Map<Integer, Integer> adults = students.stream().filter(student -> student.getAge() >= 18).collect(Collectors.toMap(student -> student.getId(),student -> student.getAge()));
根据给出的条件进行分组:
Map<Integer, List<Student>> collect = students.stream().collect(Collectors.groupingBy(student -> student.getClassId()));
根据给出的条件进行分区:
Map<Boolean, List<Student>> collect = students.stream().collect(Collectors.partitioningBy(student -> student.getAge()>=18));
进行字符串拼接:
String join = students.stream().map(student -> student.getName()).collect(Collectors.joining(","));
对数据进行统计:
IntSummaryStatistics ageSummary = students.stream().collect(Collectors.summarizingInt(student -> student.getAge()));
System.out.println(ageSummary.getAverage());
System.out.println(ageSummary.getMax());
5.迭代操作
forEach 打印元素:
students.stream().forEach(item-> System.out.println(item));
五.并行流
1.介绍
并行流,Parallel Streams,借助多核处理器的并行计算能力加速数据处理,特别适合大型数据集或计算密集型任务。
并行流在开始时,分割迭代器会将数据分割成多个片段,分割过程通常采用递归的方式动态进行,以平衡子任务的工作负载,提高资源利用率。然后Fork/Join框架会将这些数据片段分配到多个线程和处理器核心上进行并行处理。处理完成后,结果将会被汇总合并,其核心是任务的分解Fork和结果的合并Join。
在处理上顺序流和并行流的中间操作和终端操作相同,这里就不过多赘述了。
2.并行流的顺序问题
并行流采用多线程并发处理,可以同时在多个核心上处理数据,但是不能保证元素的处理顺序。
1)迭代操作
如果使用forEach,无法保证输出的元素是有序的:
List.of("a","b","c","d").parallelStream().forEach(System.out::println);
使用下面方法可以保证输出的顺序性:
List.of("a","b","c","d").parallelStream().forEachOrdered(System.out::println);
为什么 forEachOrdered 可以保证输出的顺序性?
在处理数据时,对于有序数据源,比如List,分割迭代器会对数据源进行递归分割。分割通过划分数据源的索引范围来实现。每次分割都会生成新的分割迭代器实例,该实例内部维护了指向原数据的索引范围。这种分割机制可以让数据的出现顺序得以保证。
然后Fork/Join框架将分配后的数据块分配给不同的子任务执行,框架根据分割迭代器维护的顺序信息来调度方法的执行顺序。
2)收集操作
使用 collect 收集:
List<String> collect = List.of("a", "b", "c", "d").parallelStream().collect(Collectors.toList());
为什么 collect 可以保证输出的顺序性?
这个跟 forEachOrdered 有点相似,同样是依赖于分割迭代器和Fork/Join框架,只不过最后多了合并和收集操作。
每个并行执行的任务在完成处理后,会将其结果存储到一个临时数据结构中。Fork/Join框架会利用分割迭代器提供的区段顺序信息,引导这些临时结果按顺序合并。
3.并行流和顺序流一致性问题
实际上,通过系统内部精确的执行策略,绝大多数终端操作都能够保证一致性。
然而并非所有操作都能保证一致性,比如forEach和某些形式的 reduce。
reduce 是否能保证一致性,取决于使用的操作是否关联,如果操作不是关联的,那么结果可能就不一致。
举个例子:
Integer reduce = List.of(1, 2, 3, 4, 5).parallelStream().reduce(0, (a, b) -> a + b);
System.out.println(reduce);
减法操作就不是关联性的,像加法乘法是关联性的。