Java 声明式编程- Stream API 实战
Java 8 引入的 Stream API 极大地提升了集合操作的表达力。借助 函数式编程 与 声明式风格,我们能够更简洁、更直观地处理集合数据。本文从 Java Stream API 的核心概念 出发,结合代码示例逐步讲解:流的创建、流的中间操作与终端操作、并行流、多样化收集器、声明式编程风格,帮助读者掌握 Stream 的使用。
一、Stream 的三大核心组成
一个完整的流式处理一般包括三部分:
- 数据源:集合、数组、文件、IO 通道等;
- 中间操作:返回 Stream 本身,用于加工数据(
filter
、map
、flatMap
、distinct
、limit
、sorted
等); - 终端操作:触发流执行,返回具体结果(
forEach
、collect
、count
、max
、min
等)。
示例:
list.stream() // 数据流.filter(i -> i % 2 == 0) // 中间操作:筛选偶数.max(Integer::compareTo) // 终端操作:取最大值.ifPresent(System.out::println); // 输出结果
二、流的创建方式
Stream 的数据源非常灵活,常见方式包括:
// 1. 直接创建
Stream<Integer> stream = Stream.of(1, 2, 3);// 2. 拼接
Stream<Integer> concat = Stream.concat(Stream.of(1, 2), Stream.of(3, 4));// 3. 使用 builder 构建
Stream<Object> build = Stream.builder().add("11").add("22").build();// 4. 从集合中获取
List<Integer> list = List.of(1, 2);
Stream<Integer> stream1 = list.stream();Set<Integer> set = Set.of(1, 2);
Stream<Integer> stream2 = set.stream();Map<String, Integer> map = Map.of("a", 1, "b", 2);
map.keySet().stream();
map.values().stream();
几乎所有集合类都支持 .stream()
方法,方便快捷。
三、并行流与线程模型
默认情况下,Stream 是顺序执行的。我们也可以使用 并行流 来加速数据处理:
long count = Stream.of(1, 2, 3, 4, 5).parallel() // 开启并行流.filter(i -> {System.out.println("当前线程:" + Thread.currentThread());return i > 2;}).count();
System.out.println(count);
并行流原理:底层基于 ForkJoinPool.commonPool,把数据拆分后并行处理,再合并结果。
⚠️ 注意:并行流需要关注 线程安全 和 任务拆分开销,适用于 CPU 密集型场景,不适合所有情况。
四、常见中间操作示例
在 StreamDemo
中,我们演示了多种流操作(完整代码附在文末):
1. filter
:条件过滤
List<Integer> collect = List.of(1, 2, 3, 4, 5, 6).stream().filter(i -> i > 2).collect(Collectors.toList());
System.out.println(collect); // [3, 4, 5, 6]
2. takeWhile
:满足条件则保留,否则终止流
List<Integer> collect1 = List.of(1, 2, 3, 4, 5, 6).stream().takeWhile(i -> i > 2).collect(Collectors.toList());
System.out.println(collect1); // []
因为首元素 1
不满足条件,流立刻终止。
3. map
与 flatMap
personList.stream().map(Person::name) // 映射为姓名.flatMap(name -> Stream.of(name.split(" "))) // 拆分字符串.distinct() // 去重.forEach(System.out::println);
4. peek
:调试/监视
personList.stream().filter(p -> p.age() > 18).peek(p -> System.out.println("filter peek:" + p)).map(Person::name).peek(name -> System.out.println("map peek:" + name)).forEach(System.out::println);
五、分组与收集
Stream 的强大之处在于内置了丰富的收集器,最常见的就是 Collectors.groupingBy
:
Map<String, List<Person>> listMap = personList.stream().filter(person -> person.age() > 15).collect(Collectors.groupingBy(Person::gender));System.out.println(listMap);
输出结果:
{女=[Person[name=李 四, gender=女, age=19], Person[name=王 六, gender=女, age=21]], 男=[Person[name=张三, gender=男, age=18], Person[name=王 五, gender=男, age=20], Person[name=孙 七, gender=男, age=16]]}
六、Stream 与传统迭代的区别
- 传统 for 循环:命令式,开发者需要逐个控制迭代过程。
- Stream API:声明式,开发者只需声明“做什么”,至于“怎么做”交给框架。
模型差异:推 vs 拉
- 迭代器(拉模型):调用者主动拉取数据;
- Stream(推模型):上游有数据时自动推给下游。
这让 Stream 更适合函数式编程、响应式编程的场景。
七、完整示例代码
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;public class StreamDemo {public static void main(String[] args) {// 1. 挑出最大偶数List<Integer> list = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);// StreamAPI:// 1). 把数据封装成流;要到数据流;集合类.stream// 2). 定义流式操作// 3). 获取最终结果list.stream().filter(i -> i % 2 == 0).max(Integer::compareTo).ifPresent(System.out::println);// 流的三大部分:// 1. 数据流 2. N个中间操作 3. 一个最终操作// 1)、创建流Stream<Integer> stream = Stream.of(1, 2, 3);Stream<Integer> concat = Stream.concat(stream, Stream.of(4, 5, 6));Stream<Object> build = Stream.builder().add("11").add("22").build();// 2)、从集合容器中获取这个流,List Set MapList<Integer> integers = List.of(1, 2);Stream<Integer> stream1 = integers.stream();Set<Integer> integers1 = Set.of(1, 2);Stream<Integer> stream2 = integers1.stream();Map<Object, Object> objectMap = Map.of();objectMap.keySet().stream();objectMap.values().stream();System.out.println("--------------------------------------------------");System.out.println("主线程:" + Thread.currentThread());// 流是不是并发?和for有什么区别?// 流也是逐个处理,默认不并发,也可以并发// 并发后,要自行解决多线程安全问题// 推荐:流的所有操作都是无状态,数据状态仅在此函数内有效long count = Stream.of(1, 2, 3, 4, 5).parallel() // intermediate operation 并发流.filter(i -> {System.out.println("当前线程:" + Thread.currentThread());System.out.println("正在filter:" + i);return i > 2;}).count();System.out.println(count);System.out.println("--------------------------------------------------");List<Person> personList = List.of(new Person("张三", "男", 18),new Person("李 四", "女", 19),new Person("王 五", "男", 20),new Person("王 六", "女", 21),new Person("孙 七", "男", 16));// 声明式编程:基于事件机制的回调// 定义一系列的回调函数,回调函数并非程序员自己调用,而是发生这个事件的时候由系统进行调用// 1. 挑出年龄大于18的人// 拿到集合其实就是拿到集合的深拷贝的值,流的所有操作都是流的元素引用,不会改变集合的值personList.stream().limit(3) // 流只会从 personList 里取前 3 个元素参与后续处理。.filter(person -> person.age() > 18).peek(person -> System.out.println("filter peek:" + person)).map(person -> person.name()).peek(name -> System.out.println("map peek:" + name)).flatMap(name -> Stream.of(name.split(" "))).distinct()
// .limit(3)
// .sorted((o1, o2) -> o1.compareTo(o2)).forEach(System.out::println);System.out.println("--------------------------------------------------");List<Integer> collect = List.of(1, 2, 3, 4, 5, 6).stream().filter(i -> i > 2) // 无条件遍历流中的每一个元素.collect(Collectors.toList());System.out.println(collect);List<Integer> collect1 = List.of(1, 2, 3, 4, 5, 6).stream().takeWhile(i -> i > 2) // 当满足条件,拿到这个元素,不满足直接结束流操作.collect(Collectors.toList());System.out.println(collect1);// 数据是自流动的,而不是靠迭代被动流动// 推拉模型:// 推:流模式,上游有数据,自动推给下游// 拉:迭代器,自己遍历,自己拉取Map<String, List<Person>> listMap = personList.stream().filter(person -> person.age() > 15).collect(Collectors.groupingBy(Person::gender));System.out.println(listMap);// 让少量线程一直忙,而不是让大量线程一直切换/等待}public record Person(String name, String gender, Integer age) {}
}
八、总结
- Stream API 提供了声明式的数据处理方式,避免繁琐的 for 循环;
- 流分为 数据源 → 中间操作 → 终端操作 三部分;
- 支持 并行流,利用多核 CPU 提升性能;
Collectors
提供丰富的收集器,能方便地做分组、统计、聚合;- 与传统迭代相比,Stream 更贴近 函数式编程 和 响应式编程 思路。
借助 Stream,我们可以让 Java 代码更简洁、更优雅、更现代化。