Java22 stream 新特性 窗口算子 与 虚拟线程map操作:Gatherer 和 Gatherers工具类
gather()
是 Java 22 中引入的一个非常强大的新特性。
Gatherer 是什么
简单来说,Gatherer
是一个全新的、高度可定制的中间操作(Intermediate Operation)。它提供了一种比现有的 map
, filter
, flatMap
等操作更强大、更灵活的数据转换能力。
Gatherer
允许你实现 任意复杂的、有状态的、多对多(M-to-N) 的元素转换逻辑。
想象一下,流中的元素像传送带上的物品一个个流过来,Gatherer
就像一个在传送带旁边的复杂工作站。这个工作站可以:
- 查看一个或多个物品(元素),然后决定输出零个、一个或多个新物品。
map
只能一对一转换。filter
只能一对一或一对零转换。flatMap
是一对多,但是是无状态的。
- 维持自己的内部状态(State)。 比如,它可以记住之前看过的元素,然后根据历史信息来处理当前元素。
- 例如,实现
windowed
(窗口化)操作,将流中每 N 个元素分为一组。这必须记住当前窗口里已经收集了多少个元素。
- 例如,实现
- 在所有物品都处理完后,进行最终的清理或输出。
- 例如,一个
windowed
操作,在流末尾,即使最后一个窗口没满,也需要把这个不完整的窗口输出。
- 例如,一个
Gatherer
的四个核心组件:
Gatherer
的行为由四个函数式接口定义,这让它具备了极高的灵活性:
initializer()
: 状态初始化器。在处理第一个元素前调用,用于创建初始状态对象(比如,一个空的列表用于存放窗口元素)。integrator()
: 集成器/处理器。这是核心逻辑,每当一个新元素到来时,它会被调用。它接收当前状态、新元素,并决定是否更新状态、是否向下游输出新元素。combiner()
: 并行合并器。在并行流中,用于合并不同线程上的状态对象。finisher()
: 终结者。在所有元素处理完毕后调用,用于处理最终的状态,并可能输出最后的元素。
例子:Gatherers.windowFixed(3)
initializer
: 创建一个空的、大小为3的内部缓冲区(状态)。integrator
: 每来一个元素,就放入缓冲区。如果缓冲区满了,就把这个缓冲区包装成一个List
输出到下游,并清空缓冲区。finisher
: 当流结束时,如果缓冲区里还有元素(比如最后只剩1个或2个),就把这个不完整的缓冲区也包装成List
输出。
所以,Gatherer
的目标就是提供一个统一的、强大的接口,让开发者可以实现几乎任何自定义的流转换逻辑,而不仅仅局限于 map
/filter
等预设的操作。
使用场景
- 元素分组(如窗口函数)
- 去重连续相似元素
- 增量累加函数(前缀扫描)
- 增量重排序函数
jdk 24引入的Gathers工具类提供了一些默认实现,后面有分析
下面将展示如何使用 Gatherer 实现这些功能:
元素分组(窗口函数)
// 实现固定大小的窗口分组
public static <T> Gatherer<T, ?, List<T>> window(int size) {return Gatherer.of(() -> new ArrayList<T>(), // 初始化状态Gatherer.Integrator.ofGreedy((state, element, downstream) -> {state.add(element);if (state.size() == size) {downstream.push(new ArrayList<>(state));state.clear();}return true;}),(left, right) -> { // 合并器left.addAll(right);return left;},(state, downstream) -> { // 完成器if (!state.isEmpty()) {downstream.push(new ArrayList<>(state));}});
}// 使用示例
List<List<Integer>> windows = Stream.of(1,2,3,4,5,6).gather(window(2)).toList(); // [[1,2], [3,4], [5,6]]
去重连续相似元素
// 去重连续相似元素
public static <T> Gatherer<T, ?, T> distinctConsecutive() {return Gatherer.of(() -> new Object() { T last; boolean hasLast = false; },Gatherer.Integrator.ofGreedy((state, element, downstream) -> {if (!state.hasLast || !state.last.equals(element)) {downstream.push(element);state.last = element;state.hasLast = true;}return true;}));
}// 使用示例
List<Integer> distinct = Stream.of(1,1,2,2,2,3,2,2).gather(distinctConsecutive()).toList(); // [1,2,3,2]
增量累加函数(前缀扫描)
// 前缀扫描实现
public static <T, R> Gatherer<T, ?, R> scan(Supplier<R> initial,BiFunction<R, T, R> scanner) {return Gatherer.ofSequential(() -> new Object() { R current = initial.get(); },Gatherer.Integrator.ofGreedy((state, element, downstream) -> {state.current = scanner.apply(state.current, element);return downstream.push(state.current);}));
}// 使用示例
List<String> result = Stream.of(1,2,3,4,5).gather(scan(() -> "", (str, num) -> str + num)).toList(); // ["1", "12", "123", "1234", "12345"]
增量重排序函数
// 实现增量排序(维护一个固定大小的有序窗口)
public static <T> Gatherer<T, ?, T> incrementalSort(int windowSize, Comparator<T> comparator) {return Gatherer.of(() -> new PriorityQueue<>(comparator),Gatherer.Integrator.ofGreedy((state, element, downstream) -> {state.offer(element);if (state.size() > windowSize) {downstream.push(state.poll());}return true;}),(left, right) -> {left.addAll(right);return left;},(state, downstream) -> {while (!state.isEmpty()) {downstream.push(state.poll());}});
}// 使用示例
List<Integer> sorted = Stream.of(5,3,1,4,2).gather(incrementalSort(3, Comparator.naturalOrder())).toList(); // [1,3,4,2,5]
关键点说明
状态管理:
- 使用
initializer()
创建合适的状态容器 - 在
integrator()
中维护和更新状态 - 在
finisher()
中处理剩余元素
- 使用
性能考虑:
- 对于无状态操作使用
defaultInitializer()
- 对于可并行操作提供有效的
combiner()
- 使用
Greedy
优化非短路操作
- 对于无状态操作使用
组合使用:
- 可以使用
andThen()
组合多个 Gatherer - 预先组合的 Gatherer 比链式调用更高效
- 可以使用
这些实现展示了 Gatherer 的强大功能,它能够优雅地处理各种流处理场景,同时保持代码的清晰和可维护性。
核心方法
// 初始化器 - 创建中间状态
default Supplier<A> initializer()// 集成器 - 处理输入元素
Integrator<A, T, R> integrator()// 组合器 - 合并两个状态
default BinaryOperator<A> combiner()// 完成器 - 执行最终操作
default BiConsumer<A, Downstream<? super R>> finisher()
组合操作
// 将多个 Gatherer 组合在一起
default <RR> Gatherer<T, ?, RR> andThen(Gatherer<? super R, ?, ? extends RR> that)
静态工厂方法
// 创建无状态顺序 Gatherer
static <T, R> Gatherer<T, Void, R> ofSequential(Integrator<Void, T, R> integrator)// 创建可并行化的 Gatherer
static <T, A, R> Gatherer<T, A, R> of(Supplier<A> initializer,Integrator<A, T, R> integrator,BinaryOperator<A> combiner,BiConsumer<A, Downstream<? super R>> finisher
)
核心关联类
- Stream: 通过
gather(Gatherer)
方法使用 Gatherer - Gatherers: 提供常用 Gatherer 实现的工具类
- Downstream: 接收 Gatherer 输出的下一阶段
- Integrator: 处理元素集成的函数式接口
交互关系
// Stream 与 Gatherer 的关系
Stream<T>.gather(Gatherer<T,A,R>) -> Stream<R>// Gatherer 组合关系
Gatherer<T,A,R>.andThen(Gatherer<R,B,S>) -> Gatherer<T,?,S>
最佳实践
使用注意事项
状态管理:
- 无状态 Gatherer 使用
defaultInitializer()
- 有状态 Gatherer 需正确实现状态初始化和合并
- 无状态 Gatherer 使用
并行处理:
- 需要并行处理时提供有效的 combiner
- 仅顺序处理时使用
defaultCombiner()
资源清理:
- 在 finisher 中执行必要的清理操作
- 不需要清理时使用
defaultFinisher()
性能优化:
- 使用
Greedy
Integrator 优化无短路操作 - 合理使用预组合的 Gatherer 提高性能
- 使用
示例代码
// 创建 map 操作的 Gatherer
public static <T, R> Gatherer<T, ?, R> map(Function<? super T, ? extends R> mapper) {return Gatherer.of((unused, element, downstream) -> downstream.push(mapper.apply(element)));
}// 组合多个 Gatherer
Gatherer<Integer, ?, String> pipeline = map(i -> i + 1) // 增1.andThen(map(i -> i.toString())); // 转字符串
为什么需要 Gatherer.Downstream
?
Gatherer
的 integrator
和 finisher
需要一个回调机制来将它们处理后的结果(类型为 R
)发送出去。它们不能直接访问下游的 Sink
,因为这会破坏 Stream 的封装性。
Gatherer.Downstream
就是这个专门的回调接口,它充当了 Gatherer
内部逻辑和外部流水线之间的 “信使”。
// Gatherer 接口的一部分
interface Downstream<R> {boolean push(R r);// ...
}
GatherSink
同时实现了 Sink<T>
和 Gatherer.Downstream<R>
,这让它成为了一个完美的“双面适配器”:
- 作为
Sink<T>
:它面向上游,接收类型为T
的原始元素。这是它作为标准 Stream 操作的一部分的职责。 - 作为
Gatherer.Downstream<R>
:它面向Gatherer
的内部逻辑 (integrator
和finisher
)。当integrator
想要输出一个处理好的元素R
时,它不直接和下游Sink
对话,而是调用downstream.push(r)
。
我们来看 GatherSink
的代码,就能清晰地看到这个转发过程:
// ... existing code ...static final class GatherSink<T, A, R> implements Sink<T>, Gatherer.Downstream<R> {private final Sink<R> sink; // 这是真正的下游 Sink// ...// 当上游传来一个元素 T@Overridepublic void accept(T t) {// ...// 调用 integrator,并把 this (自己) 作为 Downstream 传进去proceed &= integrator.integrate(state, t, this);}// ...// 当 integrator 内部调用 downstream.push(r) 时,这个方法被触发@Overridepublic boolean push(R r) {var p = downstreamProceed;if (p)sink.accept(r); // 在这里,它把结果 r 转发给了真正的下游 sinkreturn !cancellationRequested(p);}}
// ... existing code ...
- 解耦(Decoupling):将
Gatherer
的内部业务逻辑(如何处理数据)与 Stream 的流水线机制(如何传递数据)分离开。Gatherer
的作者不需要关心下游是什么,只需要知道有一个Downstream
接口可以用来“推送”结果。 - 控制(Control):
Downstream
接口不仅有push
,还有isRejecting
等方法。这给了integrator
更多的控制权,比如在推送前检查下游是否还愿意接收数据,从而实现更复杂的短路(short-circuiting)逻辑。 - 封装(Encapsulation):
GatherSink
封装了所有复杂的逻辑,比如状态管理、短路信号的传递等,使得Gatherer
的实现者可以专注于核心的转换算法,而不用处理 Stream 底层的复杂机制。
Gatherer
是一个强大的自定义流处理工具,而 Gatherer.Downstream
接口及其在 GatherSink
中的实现,是连接这个自定义工具和标准 Stream 流水线的、设计精巧的桥梁。
gather
具体例子
gather
的能力最好通过例子来理解。JDK 在 java.util.stream.Gatherers
这个工具类里提供了一些预置的实现,这些例子能很好地展示它的威力:
a. 窗口化 (Windowing)
想象一下,你想把一个数字流按每3个一组进行分组: Stream.of(1,2,3,4,5,6,7,8)
-> [[1, 2, 3], [4, 5, 6], [7, 8]]
用传统操作很难做到,但用 gather
就非常简单:
// (代码来自 Gatherers.java 的 Javadoc 示例)
List<List<Integer>> windows =Stream.of(1,2,3,4,5,6,7,8).gather(Gatherers.windowFixed(3)).toList();
b. 滑动窗口 (Sliding Window)
或者,你想创建一个大小为2的滑动窗口: Stream.of(1,2,3,4,5,6,7,8)
-> [[1, 2], [2, 3], [3, 4], [4, 5], ...]
同样,gather
也能轻松实现:
// (代码来自 Gatherers.java 的 Javadoc 示例)
List<List<Integer>> windows2 =Stream.of(1,2,3,4,5,6,7,8).gather(Gatherers.windowSliding(2)).toList();
c. 其他复杂操作
Gatherers
类还提供了 fold
(类似 reduce
但严格串行)、scan
(前缀和扫描)等很多有用的操作。
GathererOp
的角色
GathererOp
就是 gather()
操作在 Stream 流水线中的内部表示和执行引擎。
当调用 stream.gather(...)
时,Java 内部就会创建一个 GathererOp
实例,并将其链接到流操作链上。
// ... existing code ...
final class GathererOp<T, A, R> extends ReferencePipeline<T, R> {@SuppressWarnings("unchecked")static <P_IN, P_OUT extends T, T, A, R> Stream<R> of(ReferencePipeline<P_IN, P_OUT> upstream,Gatherer<T, A, R> gatherer) {// ...if (upstream.getClass() == GathererOp.class) {// ... 融合优化 ...} else {return new GathererOp<>((ReferencePipeline<?, T>) upstream,gatherer);}}
// ... existing code ...
这个类非常复杂,因为它承担了所有脏活累活,包括:
- 操作融合:它非常智能,如果发现连续两个
gather()
操作,或者gather()
后面跟着collect()
,它会尝试将它们**融合(fuse) 成一个单一的操作来执行,极大地提升性能。 - 串行与并行执行:它内部包含了处理串行流(
GatherSink
)和并行流(GathererTask
,通过evaluate
方法调度)的完整逻辑。 - 状态管理和短路:它负责管理
Gatherer
的状态,并响应短路信号(当integrator
返回false
时)。
总结
gather()
是一个面向用户的功能,它为 Stream 提供了前所未有的灵活性,让你能定义任意复杂的有状态转换。Gatherer
是一个接口,通过实现它来定义gather()
的具体行为逻辑。GathererOp
是 JDK 内部的实现细节,是驱动Gatherer
逻辑在流中高效运行的复杂引擎。
Gatherer 接口实现分析
Gatherer<T, A, R>
接口定义了一个高度可定制的中间操作。它的核心语义是:“接收一个输入元素流(类型 T
),通过一个可选的中间状态(类型 A
),产生一个输出元素流(类型 R
)”。
与 map
或 filter
不同,Gatherer
不是简单的“一对一”或“一对零/一”的转换。它是一个 “多对多” 的转换器,可以:
- 有状态(Stateful): 它可以维护一个在元素处理过程中不断变化的状态。
- 缓冲(Buffering): 它可以接收多个输入元素后,才产生一个或多个输出。
- 短路(Short-circuiting): 它可以提前终止流的处理。
你可以把它想象成一个与 Collector
类似但更强大的概念。Collector
是一个终端操作,它消费整个流并产生一个最终结果。而 Gatherer
是一个中间操作,它消费一个流并产生另一个流。
Gatherer
的四个核心组件
Gatherer
的行为由四个核心函数定义,这四个函数共同描述了整个转换过程。
// ... existing code ...
public interface Gatherer<T, A, R> {/*** A function that produces an instance of the intermediate state used for* this gathering operation.* ...*/default Supplier<A> initializer() { /*...*/ };/*** A function which integrates provided elements, potentially using* the provided intermediate state, optionally producing output to the* provided {@link Downstream}.* ...*/Integrator<A, T, R> integrator();/*** A function which accepts two intermediate states and combines them into* one.* ...*/default BinaryOperator<A> combiner() { /*...*/ }/*** A function which accepts the final intermediate state* and a {@link Downstream} object, allowing to perform a final action at* the end of input elements.* ...*/default BiConsumer<A, Downstream<? super R>> finisher() { /*...*/ }
// ... existing code ...
a. initializer()
: 状态初始化器
语义:
Supplier<A> initializer()
作用: 创建并返回一个新的、可变的中间状态对象
A
。何时调用: 在处理一个新的数据分片(在并行流中)或整个流(在串行流中)的开始时调用。
默认行为: 如果不提供,则表示这是一个无状态的
Gatherer
,状态类型A
通常为Void
。
b. integrator()
: 核心处理器
语义:
Integrator<A, T, R> integrator()
作用: 这是
Gatherer
的核心逻辑。它接收当前的状态A
、一个新的输入元素T
和一个下游管道Downstream
。它负责:根据输入元素
t
更新状态state
。决定是否要向下游
downstream
推送(push
)零个、一个或多个结果R
。返回一个布尔值,
true
表示继续处理,false
表示短路,请求上游停止发送数据。
这是唯一必须提供的方法。
c. combiner()
: 并行合并器
语义:
BinaryOperator<A> combiner()
作用: 在并行流中,当两个子任务都处理完它们的数据分片后,此方法被调用来合并它们各自的中间状态
A
。何时调用: 仅在并行流中调用。
默认行为: 如果不提供,则表示该
Gatherer
不支持并行处理。
d. finisher()
: 收尾处理器
- 语义:
BiConsumer<A, Downstream<? super R>> finisher()
- 作用: 在所有输入元素都被
integrator
处理完毕后调用。它接收最终的状态A
,允许你执行一些收尾工作,例如,将状态中缓冲的最后一个元素或最终计算结果推送到下游。 - 何时调用: 在流的末尾调用一次。
- 默认行为: 如果不提供,表示没有收尾操作。
Downstream
接口:向下游推送结果
Downstream
是 integrator
和 finisher
用来与下游流水线通信的桥梁。
// ... existing code ...@FunctionalInterfaceinterface Downstream<T> {/*** Pushes, if possible, the provided element downstream -- to the next* stage in the pipeline.* ...*/boolean push(T element);
// ... existing code ...
它的核心方法是 push(T element)
。调用此方法会将一个结果元素 element
发送到下一个流操作。它也返回一个布尔值,如果下游也请求短路,它会返回 false
。
示例:用 Gatherer
实现 map
接口文档中的这个例子完美地诠释了 Gatherer
的语义:
// ... existing code ...* {@snippet lang = java:* public static <T, R> Gatherer<T, ?, R> map(Function<? super T, ? extends R> mapper) {* return Gatherer.of(* (unused, element, downstream) -> // integrator* downstream.push(mapper.apply(element))* );* }* }
// ... existing code ...
- 这是一个无状态的
Gatherer
,所以initializer
是默认的,状态A
是Void
(由unused
参数表示)。 - 它的
integrator
对于每个到来的element
,直接调用mapper
函数进行转换,然后通过downstream.push()
将结果推送到下游。 - 它不关心
push
的返回值,所以它会处理完所有元素(非短路)。 - 它没有
combiner
和finisher
。
Greedy接口
Gatherer.Integrator
接口的核心方法 integrate
返回一个 boolean
值,用于表示是否需要短路(short-circuit)。true
表示继续,false
表示停止。
然而,在很多场景下,比如 windowFixed
,我们知道 integrator
总是会处理完所有元素,永远不会提前返回 false
。这种 integrator
被称为 “贪婪的”(Greedy) 。
Integrator.ofGreedy(...)
就是一个静态工厂方法,它做两件事:
- 它接收一个不返回布尔值的
BiConsumer
(或者像这里的方法引用FixedWindow::integrate
,它的返回值会被忽略)。 - 它返回一个特殊的
Integrator
实现,这个实现被标记为Integrator.Greedy
接口的实例。
// 在 Gatherer 接口内部
interface Integrator<A, T, R> {// ...@FunctionalInterfaceinterface Greedy<A, T, R> extends Integrator<A, T, R> {// ...}
}
为什么这个信号很重要?
Stream 的执行引擎(即 GathererOp
)在处理流时,会检查 integrator
是否是 instanceof Integrator.Greedy
。
- 如果是
Greedy
:运行时就不需要在每次调用integrate
后都去检查它的返回值。这可以消除一个分支判断,在处理大量元素的紧凑循环中,这种微小的优化会累积起来,带来可观的性能提升。 - 如果不是
Greedy
:运行时就必须在每次调用后检查返回值,以正确处理可能的短路情况。
所以,ofGreedy
的包装本质上是一种静态的性能提示,它告诉运行时:“放心执行,我这个 integrator
不会短路,你可以走更快的代码路径。”
总结
Gatherer
接口通过其四个核心组件(initializer
, integrator
, combiner
, finisher
)提供了一个强大而灵活的框架,用于定义复杂的、有状态的流转换。它的语义核心在于 integrator
,它消费输入元素、更新状态,并通过 Downstream
按需产生输出,从而实现了远超传统流操作的强大功能。
Gatherers 类分析
Gatherers 是一个工具类,提供了常用的 Gatherer 实现和工厂方法,用于在流处理中执行窗口操作、并发映射、折叠和扫描等中间操作。
使用场景
- 窗口操作:将流元素分组为固定大小或滑动窗口
- 并发处理:使用虚拟线程并发执行映射操作
- 增量处理:执行折叠(fold)和前缀扫描(scan)操作
- 流转换:实现复杂的流到流的转换
窗口操作
// 固定大小窗口
public static <TR> Gatherer<TR, ?, List<TR>> windowFixed(int windowSize)
// 示例:将流元素分组为大小为3的窗口
List<List<Integer>> windows = Stream.of(1,2,3,4,5,6).gather(Gatherers.windowFixed(3)).toList(); // [[1,2,3], [4,5,6]]// 滑动窗口
public static <TR> Gatherer<TR, ?, List<TR>> windowSliding(int windowSize)
// 示例:创建大小为2的滑动窗口
List<List<Integer>> windows = Stream.of(1,2,3,4,5).gather(Gatherers.windowSliding(2)).toList(); // [[1,2], [2,3], [3,4], [4,5]]
并发处理
// 并发映射
public static <T, R> Gatherer<T,?,R> mapConcurrent(int maxConcurrency,Function<? super T, ? extends R> mapper
)
// 示例:并发执行映射操作
List<String> results = Stream.of(1,2,3,4,5).gather(Gatherers.mapConcurrent(4, i -> "Result" + i)).toList();
增量处理
// 折叠操作
public static <T, R> Gatherer<T, ?, R> fold(Supplier<R> initial,BiFunction<? super R, ? super T, ? extends R> folder
)
// 示例:将数字连接成字符串
Optional<String> result = Stream.of(1,2,3,4).gather(Gatherers.fold(() -> "", (str, num) -> str + num)).findFirst(); // "1234"// 前缀扫描
public static <T, R> Gatherer<T, ?, R> scan(Supplier<R> initial,BiFunction<? super R, ? super T, ? extends R> scanner
)
// 示例:生成累积字符串
List<String> results = Stream.of(1,2,3,4).gather(Gatherers.scan(() -> "", (str, num) -> str + num)).toList(); // ["1", "12", "123", "1234"]
核心关联类
- Gatherer:基础接口,定义了流处理中间操作的契约
- Stream:通过 gather() 方法使用 Gatherer
- FutureTask:在并发映射中用于任务管理
- ArrayDeque:用于管理待处理的任务队列
交互关系
// Stream 与 Gatherer 的关系
Stream<T>.gather(Gatherer<T,A,R>) -> Stream<R>// Gatherer 组合关系
Gatherer<T,A,R>.andThen(Gatherer<R,B,S>) -> Gatherer<T,?,S>// 内部实现类关系
Gatherers.Composite<T,A,R,AA,RR> implements Gatherer<T,Object,RR>
最佳实践
窗口操作:
- 注意大窗口可能占用大量内存
- 最后一个窗口可能小于指定大小
- 窗口内容是不可修改的列表
并发处理:
- 合理设置 maxConcurrency,避免过度并发
- 注意处理可能的异常情况
- 操作会保持流的顺序性
性能优化:
- 对于简单转换,优先使用内置方法
- 合理使用组合操作减少中间状态
- 注意内存使用,特别是大窗口操作
错误处理:
- 并发操作中的异常会被包装为 RuntimeException
- 确保转换函数是线程安全的
- 注意处理中断情况
Gatherers
如果说 Gatherer
接口定义了“是什么”(what),那么 Gatherers
类就提供了“怎么用”(how)的最佳范例。Gatherers
是一个 final
的、拥有私有构造函数的工具类。它的核心职责是提供一系列预先实现好的、常用的 Gatherer
实例的静态工厂方法。
public final class Gatherers {private Gatherers() { } // This class is not intended to be instantiated// Public built-in Gatherers and factory methods for them// ...
}
它的定位类似于 Collectors
或 Comparators
,为开发者提供了开箱即用的强大功能,避免了用户为了一些常见场景而不得不手动实现复杂的 Gatherer
接口。
Gatherers
类提供了多个非常有用的静态方法,每个方法都返回一个配置好的 Gatherer
。我们来分析几个关键的实现:
windowFixed(int windowSize)
- 固定窗口
- 功能: 将流中的元素按固定大小
windowSize
分组。最后一组可能小于windowSize
。 - 示例:
[1,2,3,4,5]
用windowFixed(2)
处理后得到[[1,2], [3,4], [5]]
。 - 实现分析:
Gatherers.java
// ... existing code ... public static <TR> Gatherer<TR, ?, List<TR>> windowFixed(int windowSize) {// ... 参数检查 ...class FixedWindow { // 状态类Object[] window;int at;FixedWindow() { /* 初始化 window 和 at */ }boolean integrate(TR element, Downstream<? super List<TR>> downstream) {window[at++] = element; // 元素存入数组if (at < windowSize) { // 窗口未满return true; // 继续} else { // 窗口已满// ... 创建新窗口,重置 at ...final var oldWindow = window;window = new Object[windowSize];at = 0;return downstream.push(SharedSecrets.getJavaUtilCollectionAccess().listFromTrustedArrayNullsAllowed(oldWindow));}}void finish(Downstream<? super List<TR>> downstream) {if (at > 0 && !downstream.isRejecting()) { // 如果最后还有剩余元素// ... 创建一个精确大小的数组并拷贝 ...downstream.push(/* 推送最后一个窗口 */);}}}return Gatherer.<TR, FixedWindow, List<TR>>ofSequential(FixedWindow::new, // InitializerIntegrator.ofGreedy(FixedWindow::integrate), // IntegratorFixedWindow::finish // Finisher); } // ... existing code ...
- 定义一个内部状态类
FixedWindow
,包含一个数组window
用于缓冲元素和一个计数器at
。 initializer
就是FixedWindow::new
。integrator
将元素放入数组,当数组满了就push
出去,并重置状态。finisher
负责处理流结束时可能还未满的最后一个窗口。- 它被标记为
ofSequential
,表示这个实现不支持并行。
- 定义一个内部状态类
SharedSecrets.getJavaUtilCollectionAccess().listFromTrustedArrayNullsAllowed(lastWindow)
为什么不直接用 List.of
或 Arrays.asList
? 这层包装是为了极致的性能和零拷贝(zero-copy)。
我们先看看替代方案有什么问题:
Arrays.asList(lastWindow)
: 这个方法会创建一个List
包装器,但它直接引用原始数组。如果你后续修改了这个数组,List
的内容也会变。在Gatherer
的实现中,状态对象(如FixedWindow
)可能会复用其内部数组,这就可能导致已经push
出去的List
的内容被意外修改,这是非常危险的。List.of(lastWindow)
: 这个方法会创建一个不可变的列表,这很好。但为了保证不可变性,它会创建一个内部数组的防御性拷贝(defensive copy)。也就是说,它会分配新内存并把lastWindow
的所有元素复制过去。
在 windowFixed
这种需要频繁创建列表的场景下,每次都进行数组拷贝会带来显著的性能开销和内存压力。
SharedSecrets
是 JDK 内部的一个机制,用于在不同包之间安全地共享一些不希望公开为公共 API 的功能。jdk.internal.access
包就是专门为此设计的。
这里的 listFromTrustedArrayNullsAllowed(lastWindow)
方法,正如其名,做了以下事情:
- Trusted Array (受信任的数组): 它“信任”调用者,相信传入的
lastWindow
数组后续不会再被修改。 - Zero-Copy: 基于这种信任,它直接使用这个数组作为内部存储来构造一个
List
,而不进行任何拷贝。 - Unmodifiable: 返回的
List
仍然是不可变的,任何修改操作都会抛出异常。
在 windowFixed
的实现中,lastWindow
是一个局部变量,创建后马上就用于生成 List
并被丢弃,它永远不会被再次修改。
// ... existing code ...void finish(Downstream<? super List<TR>> downstream) {if (at > 0 && !downstream.isRejecting()) {var lastWindow = new Object[at]; // 1. 创建一个新数组System.arraycopy(window, 0, lastWindow, 0, at); // 2. 拷贝内容window = null; // 3. 丢弃对旧状态数组的引用at = 0;downstream.push( // 4. 使用新数组创建 ListSharedSecrets.getJavaUtilCollectionAccess().listFromTrustedArrayNullsAllowed(lastWindow));}}
// ... existing code ...
这个场景完美符合 listFromTrustedArrayNullsAllowed
的使用条件,因此可以用这种零拷贝的方式来最大化性能。
windowSliding(int windowSize)
- 滑动窗口
- 功能: 创建一个在流上滑动的窗口。每一步,窗口都向前滑动一个元素。
- 示例:
[1,2,3,4]
用windowSliding(3)
处理后得到[[1,2,3], [2,3,4]]
。 - 实现分析:
// ... existing code ... public static <TR> Gatherer<TR, ?, List<TR>> windowSliding(int windowSize) {// ...class SlidingWindow {// ...boolean integrate(TR element, Downstream<? super List<TR>> downstream) {window[at++] = element;if (at < windowSize) { // 窗口未满return true;} else { // 窗口已满final var oldWindow = window;final var newWindow = new Object[windowSize];// 关键:将旧窗口的后 n-1 个元素拷贝到新窗口的前 n-1 个位置System.arraycopy(oldWindow,1, newWindow, 0, windowSize - 1);window = newWindow; // 状态更新为新窗口at -= 1;return downstream.push(/* 推送旧窗口 */);}}// ...}// ... } // ... existing code ...
windowFixed
类似,但integrate
的逻辑更复杂。当窗口满了之后,它不是清空窗口,而是通过System.arraycopy
将元素向前“滑动”,为下一个元素腾出位置。
Gatherers.fold
- 最终的折叠
fold
方法的行为非常类似于 Stream.reduce
,它将流中的所有元素聚合成一个最终结果。
- 目标: 对一个元素序列执行一个有状态的、顺序敏感的规约操作。
- 核心场景: 当你需要实现的规约操作不满足
reduce
所需的“结合律”(associativity),或者说,当元素的处理顺序至关重要,无法通过并行的combiner
函数来合并中间结果时,fold
就是最佳选择。
// ... existing code ...public static <T, R> Gatherer<T, ?, R> fold(Supplier<R> initial,BiFunction<? super R, ? super T, ? extends R> folder) {Objects.requireNonNull(initial, "'initial' must not be null");Objects.requireNonNull(folder, "'folder' must not be null");class State {R value = initial.get();State() {}}return Gatherer.ofSequential(State::new,Integrator.ofGreedy((state, element, downstream) -> {state.value = folder.apply(state.value, element);return true;}),(state, downstream) -> downstream.push(state.value));}
// ... existing code ...
- 参数:
initial
: 一个Supplier
,提供累积的初始值。folder
: 一个BiFunction
,接收当前的累积值和下一个元素,并返回新的累积值。
State
类:- 非常简单,只有一个字段
value
,用于在整个流的处理过程中保存当前的累积结果。它在Gatherer
初始化时通过initial.get()
获得初始值。
- 非常简单,只有一个字段
Integrator
:ofGreedy
表示它会贪婪地消费所有上游元素。state.value = folder.apply(state.value, element);
: 这是核心逻辑。每来一个element
,就用folder
函数更新state.value
。return true;
: 总是返回true
,表示它永远不会主动短路,会处理完所有元素。downstream
参数虽然存在,但在这里并未使用,因为fold
在处理过程中不产生任何中间结果。
Finisher
:(state, downstream) -> downstream.push(state.value)
: 这是fold
的关键。只有当所有元素都被integrator
处理完毕后,finisher
才会被调用。它只做一件事:将最终累积的state.value
推送给下游。因此,fold
操作的输出流中最多只有一个元素。
Gatherer.ofSequential
:fold
被定义为顺序的(没有combiner
),这意味着在并行流中,它会由Hybrid
策略执行,保证integrator
是按元素顺序调用的。
Gatherers.scan
- 增量的扫描(前缀和)
scan
方法与 fold
不同,它会为每一个输入元素都生成一个输出结果,这个结果是到当前元素为止的累积值。它也被称为“前缀和”(Prefix Sum)操作。
- 目标: 创建一个流,其中每个元素都是原始流到该点为止的累积结果。
- 核心场景: 当你需要观察一个累积过程的每一步中间状态时。例如,计算账户余额随每一笔交易的变化,或者在时间序列数据中计算移动平均值的每一步。
// ... existing code ...public static <T, R> Gatherer<T, ?, R> scan(Supplier<R> initial,BiFunction<? super R, ? super T, ? extends R> scanner) {Objects.requireNonNull(initial, "'initial' must not be null");Objects.requireNonNull(scanner, "'scanner' must not be null");class State {R current = initial.get();boolean integrate(T element, Downstream<? super R> downstream) {return downstream.push(current = scanner.apply(current, element));}}return Gatherer.ofSequential(State::new,Integrator.<State,T, R>ofGreedy(State::integrate));}
// ... existing code ...
- 参数: 与
fold
完全相同。 State
类:- 同样只有一个字段
current
,用于保存当前的累积值。
- 同样只有一个字段
Integrator
:return downstream.push(current = scanner.apply(current, element));
: 这是与fold
的根本区别。- 每当一个新
element
到来:current = scanner.apply(current, element)
: 首先,使用scanner
函数计算出新的累积值,并更新state.current
。downstream.push(...)
: 然后,立即将这个新计算出的累积值推送给下游。
- 因此,每处理一个输入元素,就会产生一个输出元素。
Finisher
:scan
方法没有提供finisher
。因为所有的结果都在integrator
中被实时推送了,当流结束时,没有额外的工作需要做。
mapConcurrent(...)
- 虚拟线程并发映射
public static <T, R> Gatherer<T,?,R> mapConcurrent(final int maxConcurrency,final Function<? super T, ? extends R> mapper)
mapConcurrent
是一个中间操作,它允许以受控的并发级别,对流中的每个元素应用一个函数。它主要用于 I/O 密集型或其它会引起阻塞的操作(例如,发起网络请求、查询数据库),通过利用虚拟线程(Virtual Threads)来显著提高吞吐量,同时保持流元素的原始顺序。
- 解决的问题: 标准的
Stream.map
是串行的。在并行流中,map
虽然并行,但使用的是重量级的平台线程(Platform Threads),这对于成千上万个会长时间阻塞的 I/O 任务来说,资源开销巨大且效率低下。 - 核心武器: 虚拟线程。虚拟线程是轻量级的,可以大量创建而不会耗尽系统资源,非常适合用于执行 I/O 阻塞任务。
- 设计要点:
- 并发执行: 对每个元素应用的
mapper
函数都在一个独立的虚拟线程中并发执行。 - 控制并发度: 通过
maxConcurrency
参数限制同时“在途”的任务数量,防止下游系统过载。 - 保持顺序: 尽管
mapper
函数是并发执行且完成时间不确定,但其结果必须按照原始流的顺序推送到下游。 - 支持短路: 当下游不再需要元素时(例如
findFirst()
),能够尽力取消正在进行和等待中的任务。
- 并发执行: 对每个元素应用的
mapConcurrent
的实现非常精巧,主要由三个部分构成:MapConcurrentTask
(任务封装)、State
(状态管理与核心逻辑)以及最终的 Gatherer
构建。
MapConcurrentTask
- 任务的封装
// ... existing code ...final class MapConcurrentTask extends FutureTask<R> {final Thread thread;private MapConcurrentTask(Callable<R> callable) {super(callable);this.thread = Thread.ofVirtual().unstarted(this);}}
// ... existing code ...
extends FutureTask<R>
: 每个任务都是一个FutureTask
,这使得我们可以异步地获取其执行结果(get()
),检查其状态(isDone()
),以及取消它(cancel()
)。final Thread thread
: 每个FutureTask
都关联一个专属的虚拟线程。Thread.ofVirtual().unstarted(this)
: 这是关键。它创建了一个虚拟线程,但并不立即启动。线程的Runnable
目标就是this
,即FutureTask
自身(因为FutureTask
实现了Runnable
)。当调用thread.start()
时,这个虚拟线程就会开始执行FutureTask
的run()
方法,进而执行我们传入的mapper
函数。
State
- 核心逻辑与状态机
这个 State
类是 mapConcurrent
这个 Gatherer
(收集器) 的核心,它封装了所有用于实现并发处理的状态和逻辑。mapConcurrent
的目标是接收上游的元素,然后并发地将一个 mapper
函数应用在每个元素上,最后将结果按顺序推送到下游。State
类就是这个过程中的“总调度室”。
State
类的主要职责是:
- 任务管理: 接收上游流(Stream)中的元素。
- 并发执行: 为每个元素创建一个独立的任务(使用虚拟线程),并启动它来执行
mapper
函数。 - 状态跟踪: 维护一个正在处理中的任务队列(work-in-progress)。
- 结果回写: 按顺序从完成的任务中获取结果,并将结果推送到下游。
- 异常与清理: 妥善处理执行过程中的异常,并确保所有启动的线程最终都能被清理。
成员变量 (Fields)
// ... existing code ...final class State {private final ArrayDeque<MapConcurrentTask> wip =new ArrayDeque<>(Math.min(maxConcurrency, 16));// ... existing code ...
State
类只有一个成员变量 wip
:
private final ArrayDeque<MapConcurrentTask> wip
:wip
是 "work-in-progress" 的缩写,意为“正在进行中的工作”。- 它是一个
ArrayDeque
(一种双端队列),用于存储所有已经启动但可能尚未完成的并发任务。 MapConcurrentTask
是FutureTask
的一个内部子类,它包装了mapper.apply(element)
这个调用。每个任务都关联一个虚拟线程。- 队列的初始容量被限制在
maxConcurrency
和 16 之间取较小值,这只是一个小的性能优化,ArrayDeque
可以根据需要自动扩容。 - 关键点: 使用队列(Queue)这种数据结构保证了任务的先进先出(FIFO)顺序。这意味着元素是按它们从上游到达的顺序被处理和向下游推送的,从而维持了流的有序性。
integrate
方法:接收元素并发起任务
// ... existing code ...boolean integrate(T element, Downstream<? super R> downstream) {// Prepare the next task and add it to the work-in-progressfinal var task = new MapConcurrentTask(() -> mapper.apply(element));wip.addLast(task);assert wip.peekLast() == task;assert wip.size() <= maxConcurrency;// Start the next tasktask.thread.start();// Flush at least 1 element if we're at capacityreturn flush(wip.size() < maxConcurrency ? 0 : 1, downstream);}
// ... existing code ...
integrate
方法是 Gatherer
的 integrator
(整合器) 的具体实现。每当上游有一个新元素 element
准备好被处理时,这个方法就会被调用。
它的工作流程如下:
- 创建任务:
new MapConcurrentTask(() -> mapper.apply(element))
创建一个新的MapConcurrentTask
。这个任务的核心工作就是调用用户提供的mapper
函数来处理元素。 - 入队:
wip.addLast(task)
将新创建的任务添加到wip
队列的末尾。 - 启动线程:
task.thread.start()
立即启动与该任务关联的虚拟线程。这意味着mapper
函数的执行会立刻在一个新的虚拟线程中开始,而integrate
方法本身不会等待它完成,从而实现了并发。 - 调用
flush
(冲刷/刷新): 这是实现背压 (back-pressure) 的关键。wip.size() < maxConcurrency
: 检查当前正在处理的任务数是否已经达到了用户设定的最大并发数maxConcurrency
。- 如果
wip.size()
小于maxConcurrency
,则调用flush(0, downstream)
。atLeastN
参数为 0 意味着flush
方法会尝试“非阻塞地”冲刷掉所有已经完成的任务,但如果队列头部的任务还没完成,它不会等待,会立刻返回。 - 如果
wip.size()
达到了maxConcurrency
,则调用flush(1, downstream)
。atLeastN
参数为 1 意味着flush
方法必须等待并冲刷掉至少一个任务的结果后才能返回。这起到了一个限流的作用:在处理能力达到上限时,它会阻塞上游,直到至少一个并发任务完成,腾出“空位”后,才允许上游继续提供新元素。
flush
方法:获取结果并推向下游
// ... existing code ...boolean flush(long atLeastN, Downstream<? super R> downstream) {boolean success = false, interrupted = false;try {boolean proceed = !downstream.isRejecting();MapConcurrentTask current;while (proceed&& (current = wip.peekFirst()) != null&& (current.isDone() || atLeastN > 0)) {R result;// Ensure that the task is done before proceedingfor (;;) {try {result = current.get();break;} catch (InterruptedException ie) {interrupted = true; // ignore for now, and restore later}}proceed &= downstream.push(result);atLeastN -= 1;final var correctRemoval = wip.pollFirst() == current;assert correctRemoval;}return (success = proceed); // Ensure that cleanup occurs if needed} catch (ExecutionException e) {
// ... existing code ...} finally {
// ... existing code ...}}
// ... existing code ...
flush
方法是 State
的另一个核心,负责从 wip
队列中取出已完成的任务,获取其结果,并将其推送到下游。
它的工作流程如下:
- 循环检查队首任务:
while
循环持续检查wip
队列的头部。proceed = !downstream.isRejecting()
: 首先检查下游是否还能接收元素。current = wip.peekFirst()
: 查看队首的任务,但不移除它。current.isDone() || atLeastN > 0
: 这是循环能否执行的关键条件。它表示:- 如果队首任务已经完成 (
isDone()
),则可以处理它。 - 或者,如果
atLeastN > 0
,意味着调用者(integrate
或finisher
)要求必须冲刷掉至少atLeastN
个元素。在这种情况下,即使队首任务尚未完成,循环也会继续,并在内部通过current.get()
阻塞等待它完成。
- 如果队首任务已经完成 (
- 获取结果:
result = current.get()
: 调用FutureTask
的get()
方法来获取任务的执行结果。这个调用是阻塞的,如果任务还没执行完,它会一直等待直到任务完成。InterruptedException
处理:如果在get()
等待期间线程被中断,它会捕获InterruptedException
,设置interrupted
标志位,然后继续尝试get()
。中断状态会在finally
块中被恢复。
- 推向下游:
proceed &= downstream.push(result)
将获取到的结果result
推送给下游。如果下游拒绝接收(例如,对于findFirst
这样的短路操作),push
会返回false
,proceed
也将变为false
,从而终止循环。 - 出队:
wip.pollFirst()
: 成功处理完一个任务后,将其从队列中移除。 - 异常处理与清理 (
finally
):catch (ExecutionException e)
: 如果任务在执行mapper
时抛出异常,get()
会将其包装在ExecutionException
中抛出。这里会解开包装,将原始的RuntimeException
重新抛出。finally
块确保了健壮性。如果flush
因为下游拒绝或异常而提前退出 (!success
),它会负责清理所有仍在wip
队列中的任务:task.cancel(true)
: 向所有未完成的任务发送取消信号(中断它们的执行线程)。next.thread.join()
: 等待所有任务的线程都真正终止,防止产生僵尸线程。Thread.currentThread().interrupt()
: 如果在执行过程中发生了中断,就在最后恢复当前线程的中断状态,这是良好的并发编程实践。
Gatherer
的构造
// ... existing code ...return Gatherer.ofSequential(State::new,Integrator.<State, T, R>ofGreedy(State::integrate),(state, downstream) -> state.flush(Long.MAX_VALUE, downstream));
// ... existing code ...
最后,mapConcurrent
方法通过 Gatherer.ofSequential
将 State
类的各个部分组装成一个完整的 Gatherer
:
State::new
: Initializer (初始化器)。当流开始处理时,会调用它来创建一个新的State
实例。State::integrate
: Integrator (整合器)。如前所述,每当有新元素时,会调用state.integrate(...)
。ofGreedy
表示这个整合器会尽可能多地处理元素,直到下游的push
返回false
。(state, downstream) -> state.flush(Long.MAX_VALUE, downstream)
: Finisher (完成器)。当上游所有元素都处理完毕后,会调用这个finisher
。它调用flush
并传入Long.MAX_VALUE
,这会强制flush
方法等待并冲刷掉wip
队列中所有剩余的任务,确保没有任何结果丢失。
总结
Gatherers.mapConcurrent
是一个高度优化的并发工具。它巧妙地将虚拟线程、FutureTask
和 Gatherer
的状态模型结合在一起,实现了:
- 高并发:利用虚拟线程处理阻塞操作。
- 顺序保留:通过一个先进先出的工作队列和阻塞式的
get()
调用来保证输出顺序。 - 资源可控:通过
maxConcurrency
参数实现背压,防止资源耗尽。 - 健壮性:具备完善的异常处理和任务取消机制。
它完美地展示了 Gatherer
接口的强大扩展能力,使得开发者可以在 Stream API 内部实现复杂的、有状态的、甚至是并发的流处理逻辑。
enum Value
enum Value
的核心目标是为 Gatherer
的各个函数式接口(initializer
, finisher
, combiner
)提供一个统一的、默认的、无操作的(no-op)实现。
它的存在主要是为了解决以下几个问题:
- 避免使用
null
: 在 API 设计中,使用null
来表示“不存在”或“未提供”某个功能,常常会导致代码中充斥着繁琐的null
检查,并且是NullPointerException
的主要来源。Value
枚举通过提供一个具体的对象实例来代表“默认行为”,从而完全避免了null
的使用。 - 提供可识别的默认值: 当
Gatherer
的实现者没有提供某个功能(例如,一个无状态的Gatherer
不需要initializer
),API 内部可以使用Value.DEFAULT
这个唯一的、可识别的“哨兵值”(Sentinel Value)来填充。 - 提升性能和代码简洁性: API 内部可以通过简单的引用比较(例如
if (finisher == Value.DEFAULT)
) 来判断是否需要执行某个操作,这比调用一个空的 lambda 表达式更高效,也让代码意图更清晰。
这种设计是软件工程中空对象模式(Null Object Pattern) 的一个经典应用。
代码逐段分析
a. 枚举定义与单例实例
// ... existing code ...enum Value implements Supplier, BinaryOperator, BiConsumer {DEFAULT;
// ... existing code ...
enum Value
: 将Value
定义为一个枚举类型。implements Supplier, BinaryOperator, BiConsumer
: 它同时实现了Gatherer
所需的三个核心函数式接口。这意味着Value
的实例本身就可以被当作这三种接口的任意一种来使用。DEFAULT;
: 这是该枚举的唯一实例。利用 Java 枚举的特性,DEFAULT
是一个天然的、线程安全的单例。在整个 JDK 中,所有需要“默认无操作行为”的地方,都会引用这同一个Value.DEFAULT
对象。
b. 特殊的 statelessCombiner
// ... existing code ...final BinaryOperator<Void> statelessCombiner = new BinaryOperator<>() {@Override public Void apply(Void left, Void right) { return null; }};
// ... existing code ...
- 这是一个非常特殊的字段。它是一个
BinaryOperator<Void>
,专门用于无状态(stateless) 的Gatherer
。 - 无状态的
Gatherer
其状态容器(State/Accumulator)的类型是Void
,在实践中通常用null
来表示。当并行执行流时,需要合并(combine)两个分支的状态。合并两个null
状态的结果,自然还是null
。 - 所以,
statelessCombiner
的apply
方法逻辑非常简单:接收两个Void
类型的参数(实际上都会是null
),然后返回null
。这为无状态的Gatherer
提供了一个正确且可执行的combiner
。
c. 接口方法的默认实现
// ... existing code ...// BiConsumer@Override public void accept(Object state, Object downstream) {}// BinaryOperator@Override public Object apply(Object left, Object right) {throw new UnsupportedOperationException("This combiner cannot be used!");}// Supplier@Override public Object get() { return null; }
// ... existing code ...
这里是 Value
作为三个接口的实例时,其方法的具体实现:
accept(Object state, Object downstream)
: 这是BiConsumer
接口的方法。它的实现是一个空方法体。当Value.DEFAULT
被用作finisher
(完成器) 时,表示在流的末尾不需要执行任何最终操作。调用它不会产生任何效果。get()
: 这是Supplier
接口的方法。它的实现是返回null
。当Value.DEFAULT
被用作initializer
(初始化器) 时,表示这是一个无状态的Gatherer
,其初始状态就是null
。apply(Object left, Object right)
: 这是BinaryOperator
接口的方法。它的实现是直接抛出UnsupportedOperationException
。这非常关键,它传达了一个明确的信息:这个默认的combiner
是一个“虚拟”的,它不能被实际调用。这通常用于那些本身就不支持并行化(non-concurrent)的Gatherer
。如果有人错误地尝试并行化一个这样的Gatherer
,就会立刻得到一个清晰的错误,而不是潜在的数据不一致问题。
d. 便捷的类型转换工厂方法
// ... existing code ...@ForceInline@SuppressWarnings("unchecked")<A> Supplier<A> initializer() { return (Supplier<A>)this; }@ForceInline@SuppressWarnings("unchecked")<T> BinaryOperator<T> combiner() { return (BinaryOperator<T>) this; }@ForceInline@SuppressWarnings("unchecked")<T, R> BiConsumer<T, Gatherer.Downstream<? super R>> finisher() {return (BiConsumer<T, Downstream<? super R>>) this;}
// ... existing code ...
这三个方法是 Value
枚举的点睛之笔。它们是便捷的工厂方法,用于获取类型安全的默认实例。
- 作用: 它们都只做一件事——返回
this
(即Value.DEFAULT
对象),但通过泛型和强制类型转换,将其“伪装”成调用者所期望的、带有正确泛型参数的函数式接口。 - 示例: 当
Gatherer.of(...)
工厂方法需要一个默认的initializer
时,它不需要自己写() -> null
,而是可以直接调用Value.DEFAULT.initializer()
。这样做既简洁,又能从编译器那里获得正确的类型Supplier<A>
。 @ForceInline
: 这个注解是给 JIT (Just-In-Time) 编译器的提示,建议它将这些方法的调用进行内联。因为方法体极其简单,内联后几乎没有额外的方法调用开销,性能极高。@SuppressWarnings("unchecked")
: 因为这里涉及到底层实现中的强制类型转换,所以用这个注解来抑制编译器关于“未经检查的转换”的警告。这是在明确知道类型安全可以由外部逻辑保证的情况下,一种常见的底层库编程技巧。
总结
enum Value
是 Gatherer
API 内部一个优雅而高效的实现细节。它通过一个单例枚举实例,完美地扮演了“空对象”的角色,为 Gatherer
的 initializer
、finisher
和 combiner
提供了统一、安全、高效的默认行为。
它的设计哲学体现了现代 Java API 的优秀实践:
- 用特定对象代替
null
,增强代码的健壮性。 - 利用接口和泛型,提供类型安全且灵活的 API。
- 提供可识别的哨兵值,允许进行高效的内部优化。
record GathererImpl<T, A, R>
GathererImpl
的设计目标非常纯粹:作为一个简单、透明、不可变的数据载体(Data Carrier),用于封装定义一个 Gatherer
所需的全部四个核心函数。
它利用了 Java 16 引入的 record
(记录)特性,该特性旨在简化用于“纯粹数据聚合”的类的创建。
a. record
定义与泛型参数
// ... existing code ...record GathererImpl<T, A, R>(@Override Supplier<A> initializer,@Override Integrator<A, T, R> integrator,@Override BinaryOperator<A> combiner,@Override BiConsumer<A, Downstream<? super R>> finisher) implements Gatherer<T, A, R> {
// ... existing code ...
record GathererImpl<T, A, R>
:record
关键字告诉编译器,这是一个记录类。编译器会自动为它生成:- 一个接收所有组件(
initializer
,integrator
等)的全参构造函数。 - 为每个组件生成一个同名的公共访问方法(例如
public Supplier<A> initializer()
)。 equals()
,hashCode()
和toString()
方法的默认实现。
- 一个接收所有组件(
- 这使得
GathererImpl
成为一个天然的、简洁的、不可变的(因为其组件都是final
的)数据聚合对象。
泛型参数:
T
: The type of input elements. (输入元素的类型)A
: The type of the mutable accumulator (or state). (可变累加器/状态的类型)R
: The type of result elements. (输出/结果元素的类型) 这三个泛型参数与Gatherer
接口的定义完全一致,贯穿了整个操作。
implements Gatherer<T, A, R>
:- 这明确表示
GathererImpl
是Gatherer
接口的一个具体实现。 - 因为
record
自动生成的访问器方法(initializer()
,integrator()
等)与Gatherer
接口中定义的方法签名完全匹配,所以GathererImpl
天然地满足了Gatherer
接口的契约。
- 这明确表示
b. 四个核心组件
GathererImpl
的主体就是它的四个组件,它们是定义一个 Gatherer
行为的“四要素”:
Supplier<A> initializer
(初始化器):- 职责: 创建并返回一个新的、可变的累加器(状态容器)。
- 调用时机: 在流处理开始时,或者在并行处理中为每个子任务开始时调用。
- 示例: 对于
toList()
,initializer
就是() -> new ArrayList<>()
。
Integrator<A, T, R> integrator
(整合器):- 职责: 这是核心处理逻辑。它定义了如何将一个输入元素
T
合并到累加器A
中,并有选择地将零个或多个结果R
推送到下游。 - 调用时机: 对于上游流中的每一个元素,都会调用此方法。
- 示例: 对于
map(f)
,integrator
的逻辑就是(state, element, downstream) -> downstream.push(f.apply(element))
。
- 职责: 这是核心处理逻辑。它定义了如何将一个输入元素
BinaryOperator<A> combiner
(合并器):- 职责: 在并行流(parallel stream)中,定义如何将两个并行的子任务产生的累加器
A
合并成一个。 - 调用时机: 当并行流的各个分支需要汇总结果时调用。
- 示例: 对于
toList()
,combiner
的逻辑就是(list1, list2) -> { list1.addAll(list2); return list1; }
。
- 职责: 在并行流(parallel stream)中,定义如何将两个并行的子任务产生的累加器
BiConsumer<A, Downstream<? super R>> finisher
(完成器):- 职责: 在所有输入元素都被整合(integrated)完毕后,执行最终的处理。这对于有状态的操作至关重要,例如,它可以在最后将累加器中剩余的元素全部推送到下游。
- 调用时机: 当上游流的所有元素都处理完成后,在流的末尾调用一次。
- 示例: 对于
windowFixed(size)
,当流结束时,如果累加器里还有未满一个窗口的元素,finisher
就会负责将这个最后的、不完整的窗口也推送出去。
c. 静态工厂方法 of
// ... existing code ...static <T, A, R> GathererImpl<T, A, R> of(Supplier<A> initializer,Integrator<A, T, R> integrator,BinaryOperator<A> combiner,BiConsumer<A, Downstream<? super R>> finisher) {return new GathererImpl<>(Objects.requireNonNull(initializer,"initializer"),Objects.requireNonNull(integrator, "integrator"),Objects.requireNonNull(combiner, "combiner"),Objects.requireNonNull(finisher, "finisher"));}
// ... existing code ...
- 职责: 这是创建
GathererImpl
实例的标准方式。它提供了一个清晰的、统一的构造入口。 Objects.requireNonNull(...)
: 这个工厂方法最重要的一个功能就是防御性编程。它对传入的每一个函数组件都进行了非空检查。- 这确保了任何一个
GathererImpl
实例内部的四个组件都绝对不会是null
。 - 这极大地简化了 JDK 内部处理
Gatherer
的代码,因为它们可以放心地调用gatherer.initializer()
等方法,而无需担心NullPointerException
。 - 如果用户在构造
Gatherer
时忘记提供某个必要的函数(并且没有使用Value.DEFAULT
这样的默认值),requireNonNull
会立即抛出异常,并带有明确的错误信息(如"initializer"
),这是一种“快速失败”(fail-fast)的设计原则,有助于及早发现和修复 bug。
- 这确保了任何一个
总结
GathererImpl
可以被看作是 Gatherer
接口的一个标准实现模板。它本身没有任何复杂的逻辑,只是一个“容器”。
- 角色: 它是一个不可变的元组(Tuple),忠实地持有定义
Gatherer
行为的四个函数。 - 优点:
- 简洁: 使用
record
大大减少了样板代码。 - 不可变: 一旦创建,其行为(即它包含的四个函数)就固定了,这使得它在并发环境中是线程安全的。
- 健壮: 通过
of
工厂方法的非空检查,保证了实例的有效性和内部状态的一致性。
- 简洁: 使用
当你调用 Gatherer.of(...)
或 Gatherers
类中任何一个预定义的工厂方法(如 map
, filter
, windowFixed
等)时,它们在内部创建并返回的,就是这样一个 GathererImpl
的实例,其中填充了实现特定操作所需的 initializer
, integrator
, combiner
和 finisher
。
Composite
Composite
类的唯一目标是:将两个 Gatherer
对象(我们称之为 left
和 right
)串联起来,形成一个全新的、功能上等同于两者顺序执行的单一 Gatherer
。
当用户写下 stream.gather(gatherer1.andThen(gatherer2))
时,andThen
方法在内部就会创建一个 Composite
实例,left
是 gatherer1
,right
是 gatherer2
。这个 Composite
对象本身也是一个 Gatherer
,可以被 stream.gather()
方法直接使用。
它实现了函数式编程中经典的函数组合(Function Composition)思想,但将其应用在了更复杂的 Gatherer
结构上。
// ... existing code ...static final class Composite<T, A, R, AA, RR> implements Gatherer<T, Object, RR> {private final Gatherer<T, A, ? extends R> left;private final Gatherer<? super R, AA, ? extends RR> right;// FIXME change `impl` to a computed constant when availableprivate GathererImpl<T, Object, RR> impl;
// ... existing code ...
implements Gatherer<T, Object, RR>
: 这是理解Composite
的关键。一个组合后的Gatherer
,其:- 输入类型
T
: 与第一个Gatherer
(left
) 的输入类型相同。 - 输出类型
RR
: 与第二个Gatherer
(right
) 的输出类型相同。 - 累加器类型
Object
: 这是一个难点。因为left
和right
各有自己的累加器(类型分别为A
和AA
),组合后的Gatherer
需要同时管理这两个累加器。为了类型擦除和实现的方便,这里统一使用Object
作为对外暴露的累加器类型,内部再通过一个包装对象来持有真正的A
和AA
实例。
- 输入类型
泛型参数:
Gatherer<T, A, R> left
: 第一个操作。它接收T
类型的输入,使用A
类型的累加器,产生R
类型的输出。Gatherer<R, AA, RR> right
: 第二个操作。它接收的输入类型R
必须是left
输出类型的超类,使用AA
类型的累加器,最终产生RR
类型的输出。
成员变量:
left
和right
: 分别存储了要组合的两个Gatherer
。private GathererImpl<T, Object, RR> impl
: 这是组合逻辑的核心。Composite
类本身只是一个外壳,它真正的Gatherer
行为(即initializer
,integrator
等四个核心函数)是被**懒加载(lazily computed)**并存储在这个impl
字段中的。FIXME
注释表明,开发者希望未来能用更现代的 Java 特性(如lazy
关键字)来优化这个懒加载过程。
构造与工厂方法
// ... existing code ...static <T, A, R, AA, RR> Composite<T, A, R, AA, RR> of(Gatherer<T, A, ? extends R> left,Gatherer<? super R, AA, ? extends RR> right) {return new Composite<>(left, right);}private Composite(Gatherer<T, A, ? extends R> left,Gatherer<? super R, AA, ? extends RR> right) {this.left = left;this.right = right;}
// ... existing code ...
of(...)
是一个标准的静态工厂方法,用于创建Composite
实例。- 构造函数非常简单,只是将传入的
left
和right
Gatherer
存储起来。真正的组合逻辑被推迟到了第一次使用时。
核心:impl()
懒加载方法
// ... existing code ...@SuppressWarnings("unchecked")private GathererImpl<T, Object, RR> impl() {// ATTENTION: this method currently relies on a "benign" data-race// as it should deterministically produce the same result even if// initialized concurrently on different threads.var i = impl;return i != null? i: (impl = (GathererImpl<T, Object, RR>)impl(left, right));}
// ... existing code ...
这是 Composite
类中最复杂也最核心的方法。它负责在第一次被需要时,动态地构建出那个代表了组合行为的 GathererImpl
。
懒加载模式:
var i = impl;
: 先读取一次impl
字段。i != null ? i : ...
: 如果impl
已经计算过了(不为null
),就直接返回它。... : (impl = ...)
: 如果impl
还是null
,就调用另一个impl(left, right)
静态方法(这是真正干活的方法)来计算出新的GathererImpl
,将其赋值给impl
字段,然后返回。
“良性数据竞争” (Benign Data-Race):
ATTENTION
注释解释了这里的并发处理方式。这个懒加载没有使用synchronized
或volatile
。- 这意味着,如果在多线程环境下,多个线程同时调用
impl()
并且impl
恰好是null
,那么impl(left, right)
这个计算过程可能会被执行多次。 - 但设计者认为这是“良性的”,因为
impl(left, right)
的计算是确定性的(deterministic)——无论它被计算多少次,对于相同的left
和right
,其结果都是一个等价的GathererImpl
。 - 最终,多个线程中的一个会成功地将计算结果写入
impl
字段,其他线程的写入可能会覆盖它,但这没关系,因为覆盖物和被覆盖物是相同的。这是一种以牺牲一点点计算资源为代价,来避免锁开销的常见优化技巧。
组合逻辑的实现(impl(left, right)
方法
这个方法是 Gatherer
组合魔法的真正实现者,它负责将两个独立的 Gatherer
(left
和 right
)“编织”成一个功能完备的、全新的 GathererImpl
。
这个方法的代码很长,因为它包含了针对不同情况的优化路径。我们可以将其分为两个主要部分:特殊优化路径和通用路径。
1. 准备工作:提取函数与特征检测
// ... existing code ...static final <T, A, R, AA, RR> GathererImpl<T, ?, RR> impl(Gatherer<T, A, R> left, Gatherer<? super R, AA, RR> right) {final var leftInitializer = left.initializer();final var leftIntegrator = left.integrator();
// ... existing code ...final var rightFinisher = right.finisher();final var leftStateless = leftInitializer == Gatherer.defaultInitializer();final var rightStateless = rightInitializer == Gatherer.defaultInitializer();final var leftGreedy = leftIntegrator instanceof Integrator.Greedy;final var rightGreedy = rightIntegrator instanceof Integrator.Greedy;
// ... existing code ...
在做任何事情之前,代码首先做了两件准备工作:
提取函数: 将
left
和right
Gatherer
的四个核心函数(initializer
,integrator
,combiner
,finisher
)分别提取到局部变量中。这可以提高代码可读性,并可能带来微小的性能提升(避免重复调用接口方法)。特征检测: 它检查了
left
和right
的几个重要特征:leftStateless
/rightStateless
: 通过比较它们的initializer
是否是默认的initializer
(Value.DEFAULT
),来判断它们是否是无状态的 (stateless)。无状态的Gatherer
不需要创建和维护状态对象。leftGreedy
/rightGreedy
: 通过instanceof
检查,判断它们的integrator
是否是贪婪的 (greedy)。贪婪的integrator
保证了它在处理一个元素时,要么向下游推送一个元素,要么不推送,但绝不会“短路”(即向下游返回false
来终止流)。
这些特征检测的结果,将决定接下来走哪条执行路径。
2. 特殊优化路径:双重无状态与贪婪 (Stateless & Greedy Fast Path)
// ... existing code ...if (leftStateless && rightStateless && leftGreedy && rightGreedy) {return new GathererImpl<>(Gatherer.defaultInitializer(),Gatherer.Integrator.ofGreedy((unused, element, downstream) ->leftIntegrator.integrate(null,element,r -> rightIntegrator.integrate(null, r, downstream))),
// ... existing code ...);} else {
// ... existing code ...
这是为最理想情况设计的“快速通道”。当两个 Gatherer
都是无状态且都是贪婪的时(例如 map(...).andThen(filter(...))
),代码会进行大幅优化。
initializer
: 因为两者都无状态,所以组合后的Gatherer
也是无状态的,直接使用默认的initializer
。integrator
: 这是优化的核心。它创建了一个新的、也是贪婪的integrator
。其逻辑非常直接:(unused, element, downstream) ->leftIntegrator.integrate(null, // 因为 left 无状态,状态对象是 nullelement,// 这里是关键:leftIntegrator 的下游(downstream)// 被一个 lambda 表达式替代了。r -> rightIntegrator.integrate(null, // 因为 right 无状态,状态对象是 nullr, // r 是 left 处理完 element 后的输出downstream // 最终的下游))
element
的过程,就是简单地调用leftIntegrator
,其结果r
又被立刻送入rightIntegrator
,最后的结果直接推送到真正的下游。整个过程没有任何状态管理、没有短路检查,开销极小。combiner
和finisher
: 也做了相应的简化,因为没有状态,所以逻辑变得非常简单。
3. 通用路径:有状态或非贪婪 (Stateful or Non-Greedy General Path)
如果 left
或 right
中任何一个是有状态的或非贪婪的,就必须走这条更复杂的通用路径。这条路径的核心是定义了一个新的内部类 State
。
a. State
类:组合状态的容器
// ... existing code ...} else {class State {final A leftState;final AA rightState;boolean leftProceed;boolean rightProceed;
// ... existing code ...
这个 State
类是组合后 Gatherer
的统一状态容器。它封装了:
final A leftState
:left
Gatherer
的状态对象。final AA rightState
:right
Gatherer
的状态对象。boolean leftProceed
/rightProceed
: 用于跟踪left
和right
integrator
是否已经短路。这是处理非贪婪Gatherer
的关键。如果left
短路了,就不应该再给它提供新元素。如果right
短路了,left
也不应该再把处理结果推送给它。
b. State
类的核心方法
State()
(构造函数):- 负责初始化。它会检查
left
和right
是否是无状态的,如果是,则其状态就是null
;否则,就调用它们各自的initializer
来创建初始状态。 leftProceed
和rightProceed
初始都为true
。
- 负责初始化。它会检查
integrate(T t, Downstream<? super RR> c)
:- 这是组合后的
integrator
的核心逻辑。 - 它调用
leftIntegrator.integrate(...)
,但巧妙地将left
的下游替换为了this::rightIntegrate
。 - 这意味着,
left
处理完元素后,不会直接推送到最终下游,而是会调用State
类的rightIntegrate
方法。 - 它还负责检查和更新
leftProceed
和rightProceed
这两个短路标志位。
- 这是组合后的
rightIntegrate(R r, Downstream<? super RR> downstream)
:- 这个方法充当了
left
和right
之间的“管道”。 - 当
left
产生一个中间结果r
时,这个方法被调用。 - 它会检查
right
是否还能继续(rightProceed
),如果可以,就调用rightIntegrator.integrate(rightState, r, downstream)
,将r
送入right
Gatherer
进行处理,并将最终结果推送到下游。
- 这个方法充当了
finish(Downstream<? super RR> c)
:- 组合后的
finisher
。 - 它会依次调用
left.finisher()
和right.finisher()
。 - 调用
left.finisher()
时,同样会将其下游重定向到rightIntegrate
,以确保left
在结束时产生的任何剩余元素也能被right
正确处理。
- 组合后的
c. 构建最终的 GathererImpl
// ... existing code ...return new GathererImpl<T, State, RR>(State::new,(leftGreedy && rightGreedy)? Integrator.<State, T, RR>ofGreedy(State::integrate): Integrator.<State, T, RR>of(State::integrate),// ...State::finish);
// ... existing code ...
最后,通用路径会使用 State
类的方法引用来构建一个新的 GathererImpl
:
initializer
:State::new
integrator
:State::integrate
(并根据left
和right
是否都为贪婪来决定组合后的integrator
是否也为贪婪)combiner
:State::joinLeft
finisher
:State::finish
总结
Composite.impl
静态方法是一个高度工程化的杰作。它展示了如何在提供一个通用、强大功能(组合 Gatherer
)的同时,不牺牲性能。
- 它首先识别特殊情况:当两个
Gatherer
都是无状态和贪婪时,它会生成一个极度优化的、几乎没有额外开销的函数调用链。 - 然后提供通用解决方案:对于其他所有情况,它通过创建一个新的
State
类来封装两个Gatherer
的状态和短路逻辑,确保即使在复杂的有状态和非贪婪场景下,组合行为也能正确无误地执行。
这种“快速通道 + 通用后备”的设计模式,是高性能库中非常常见的技术,它确保了简单用例的性能最大化,同时保证了复杂用例的正确性。
andThen
提供了一种流畅的、函数式的方式来将两个 Gatherer
串联(compose)起来。如果说 Composite
类是实现 Gatherer
组合的“幕后黑手”,那么 andThen
方法就是暴露给开发者使用的、优雅的公开 API。
它的核心思想是函数组合:创建一个新的 Gatherer
,它会先用第一个 Gatherer
(this
)处理输入流,然后将第一个 Gatherer
的输出作为第二个 Gatherer
(that
)的输入,最终新 Gatherer
的输出是第二个 Gatherer
的输出。这完全等同于 g2(g1(input))
的概念。
1. andThen
方法源码
这是 Gatherer
接口中 andThen
方法的实现:
// ... existing code ...default <R_OUT> Gatherer<T, ?, R_OUT> andThen(Gatherer<? super R, ?, ? extends R_OUT> after) {Objects.requireNonNull(after);return new Composite<>(this, after);}// ... existing code ...
可以看到,它的实现非常简洁:
- 非空检查:使用
Objects.requireNonNull(after)
确保传入的Gatherer
不为null
。 - 创建 Composite 实例:直接
new Composite<>(this, after)
,将当前的Gatherer
(this
) 和传入的Gatherer
(after
) 作为构造函数参数,创建一个Composite
对象。Composite
类会处理所有复杂的组合逻辑,正如我们之前详细分析的那样。
2. 泛型解析
andThen
方法的泛型设计确保了类型安全,我们来解读一下:
<R_OUT> Gatherer<T, ?, R_OUT> andThen(Gatherer<? super R, ?, ? extends R_OUT> after)
this
:调用andThen
的Gatherer
实例,其类型是Gatherer<T, A, R>
。T
: 它的输入元素类型。R
: 它的输出结果类型。
after
: 作为参数传入的Gatherer
,其类型是Gatherer<? super R, ?, ? extends R_OUT>
。? super R
:after
的输入类型必须是this
输出类型R
的超类(或者是R
本身)。这保证了this
的输出可以安全地作为after
的输入。? extends R_OUT
:after
的输出类型是R_OUT
或其子类。
- 返回值:
Gatherer<T, ?, R_OUT>
T
: 组合后Gatherer
的输入类型与this
的输入类型相同。?
: 组合后Gatherer
的中间累加器类型被隐藏了。这是因为Composite
内部会管理两个Gatherer
各自的累加器,这个复杂的内部状态对外部调用者是不可见的,用?
表示最为合适。R_OUT
: 组合后Gatherer
的最终输出类型与after
的输出类型相同。
andThen
方法是 Gatherer
API 可组合性的关键体现。它优雅地隐藏了 Composite
类的实现细节,为开发者提供了一个符合直觉的、链式调用的接口来构建复杂的流处理管道。
通过 gatherer1.andThen(gatherer2).andThen(gatherer3)
这样的调用,开发者可以像拼接乐高积木一样,将简单、专一的 Gatherer
组合成功能强大的复合 Gatherer
,这正是函数式编程和现代 API 设计所推崇的模式。
总结
Composite
类是 Gatherer
API 可组合性的基石。它本身是一个轻量级的包装器,通过懒加载的方式,在需要时动态地将两个 Gatherer
的核心行为函数(四要素)进行精密的“编织”,生成一个新的、代表了两者串联行为的 GathererImpl
。
它完美地展示了如何将复杂的操作分解为可组合的、独立的单元,并通过一个通用的组合器(Composite
)将它们无缝地连接起来,这正是函数式和声明式编程强大威力的体现。
总结
Gatherers
类是 Stream.gather()
功能不可或缺的一部分。它不仅提供了多个即插即用的高效 Gatherer
实现,解决了窗口、扫描、并发映射等常见但复杂的流处理问题,而且其源代码本身就是学习如何正确、高效地实现 Gatherer
接口的最佳教材。通过分析 windowFixed
、scan
、mapConcurrent
等方法的实现,我们可以深刻理解 Gatherer
接口中状态、集成、收尾等概念的实际应用。