Java22 stream 新特性 窗口算子:GathererOp 和 GatherSink
GathererOp
GathererOp
是 Java Stream API 中 gather()
操作的核心运行时实现。它负责将用户定义的 Gatherer
应用于流中的元素,并生成新的流。
final class GathererOp<T, A, R> extends ReferencePipeline<T, R> {
<T, A, R>
: 这是类的泛型参数:T
: 上游流中元素的类型,也是Gatherer
消费的元素类型。A
:Gatherer
内部状态累加器的类型。R
:Gatherer
产生的结果元素的类型,也是下游流中元素的类型。
extends ReferencePipeline<T, R>
:GathererOp
继承自ReferencePipeline
。ReferencePipeline
是 JDK Stream API 内部用于处理对象类型元素流的基类之一。- 这意味着
GathererOp
本身就是一个流操作阶段,它可以连接到其他流操作之前或之后。 - 它从上游接收类型为
T
的元素(实际上是P_OUT extends T
,如of
方法所示),并向下游产生类型为R
的元素。
根据类注释和代码结构,GathererOp
的主要作用是:
- 实现
gather
操作: 它是Stream.gather(Gatherer)
方法背后的实际执行者。 - 管理
Gatherer
的生命周期: 包括初始化、整合元素、合并(并行时)和完成。 - 支持顺序和并行执行:
GathererOp
能够根据流的执行模式(顺序或并行)选择合适的求值策略。 - 融合优化:
gather().gather()
融合: 如果一个gather
操作紧跟着另一个gather
操作,GathererOp
会尝试将这两个Gatherer
合并成一个,以减少中间流和操作开销。这是通过gatherer.andThen(gatherer)
实现的。gather().collect()
融合: 如果gather
操作后紧跟着一个collect
终端操作,GathererOp
会覆盖collect
方法,将Gatherer
的逻辑与Collector
的逻辑融合执行,避免生成中间集合,从而提高性能,尤其是在并行流中。
- 性能: 注释中提到 "The performance-critical code below contains some more complicated encodings",表明其内部实现对性能有较高要求,并可能采用了一些复杂的编码技巧。
静态工厂方法 of
@SuppressWarnings("unchecked")static <P_IN, P_OUT extends T, T, A, R> Stream<R> of(ReferencePipeline<P_IN, P_OUT> upstream,Gatherer<T, A, R> gatherer) {// 当一个 gather 操作附加到另一个 gather 操作上时,// 我们可以将它们融合成一个if (upstream.getClass() == GathererOp.class) {return new GathererOp<>(((GathererOp<P_IN, Object, P_OUT>) upstream).gatherer.andThen(gatherer),(GathererOp<?, ?, P_IN>) upstream);} else {return new GathererOp<>((ReferencePipeline<?, T>) upstream,gatherer);}}
- 这是创建
GathererOp
实例的入口。当在流上调用.gather()
时,内部会调用此方法。 - 参数:
upstream
: 上游的ReferencePipeline
,即前一个流操作。gatherer
: 用户提供的Gatherer
接口的实现。
- 逻辑:
- 它首先检查上游操作是否也是一个
GathererOp
。 - 如果是 (
upstream.getClass() == GathererOp.class
): 这意味着连续调用了.gather()
。此时,它会使用upstream.gatherer.andThen(gatherer)
将两个Gatherer
组合成一个新的Gatherer
,然后用这个组合后的Gatherer
和更上游的流(upstream.upstream()
,通过第二个构造函数间接获取)创建一个新的GathererOp
。这就是gather().gather()
融合。 - 如果不是: 则直接使用传入的
upstream
和gatherer
创建一个新的GathererOp
。
- 它首先检查上游操作是否也是一个
构造函数
GathererOp
有两个私有构造函数:
private GathererOp(ReferencePipeline<?, T> upstream, Gatherer<T, A, R> gatherer)
:- 用于处理第一个(或非融合的)
.gather()
调用。 - 它调用父类
ReferencePipeline
的构造函数,并根据gatherer.integrator()
的类型(是否为Integrator.Greedy
)设置操作标志(opFlags
),例如是否为短路操作。 - 保存传入的
gatherer
。
- 用于处理第一个(或非融合的)
private GathererOp(Gatherer<T, A, R> gatherer, GathererOp<?, ?, T> upstream)
:- 用于融合连续的
.gather()
调用。此时,gatherer
参数是已经通过andThen
组合后的Gatherer
。 upstream
参数是前一个GathererOp
。它会追溯到这个upstream GathererOp
的上游(即upstream.upstream()
)作为新GathererOp
的直接上游。- 同样设置操作标志并保存
gatherer
。
- 用于融合连续的
核心成员变量
final Gatherer<T, A, R> gatherer;
: 存储与此GathererOp
关联的Gatherer
实例。
内部类
a. NodeBuilder<X>
static final class NodeBuilder<X> implements Consumer<X> {// ...
}
- 作用: 在并行流评估中,
NodeBuilder
充当元素的惰性累加器。每个并行任务可能会使用一个NodeBuilder
来收集其处理的元素。最终,这些NodeBuilder
的内容会被构建成Node<X>
对象(Stream API 内部用于表示一批数据的结构),然后这些Node
可以被合并。 - 主要方法:
accept(X x)
: 向构建器中添加一个元素。它会维护一个rightMost
的SpinedBuffer.Builder
来高效追加。join(NodeBuilder<X> that)
: 将另一个NodeBuilder
的内容合并到当前构建器。为了避免创建过深的Node
树(不平衡的 Concat-trees),对于小的NodeBuilder
,它会直接将元素追加到当前rightMost
构建器中,而不是总是创建一个Nodes.conc
节点。LINEAR_APPEND_MAX
控制这个阈值。build()
: 从累积的元素中构建一个Node<X>
。
b. GatherSink<T, A, R>
static final class GatherSink<T, A, R> implements Sink<T>, Gatherer.Downstream<R> {// ...
}
- 作用:
GatherSink
是连接Gatherer
逻辑和下游流操作(或终端操作)的桥梁。它实现了Sink<T>
接口来接收上游元素,并实现了Gatherer.Downstream<R>
接口供Gatherer
的integrator
和finisher
推送结果。 - 实现接口:
Sink<T>
: 使其能够作为流操作链中的一个环节,接收上游元素。begin(long size)
: 在处理开始前调用,用于初始化Gatherer
的状态 (state = gatherer.initializer().get()
) 和下游Sink
。accept(T t)
: 每当上游产生一个元素t
时调用。它会调用gatherer.integrator().integrate(state, t, this)
来处理元素并更新状态。proceed
标志会根据integrate
的返回值更新,用于支持短路。cancellationRequested()
: 判断是否应停止处理。这是实现短路的关键,它会检查自身的proceed
和downstreamProceed
标志以及下游sink.cancellationRequested()
。end()
: 在所有元素处理完毕后调用。它会调用gatherer.finisher().accept(state, this)
来执行最终处理,并通知下游sink.end()
。
Gatherer.Downstream<R>
: 作为Gatherer
的输出通道。isRejecting()
: 返回!downstreamProceed
,表示下游是否已拒绝接收更多元素。push(R r)
:Gatherer
通过此方法将处理结果r
推送出去。GatherSink
会将r
传递给下游的sink.accept(r)
。
- 状态变量:
sink
: 下游的Sink
。gatherer
: 当前的Gatherer
。integrator
: 缓存的gatherer.integrator()
,用于优化。state
:Gatherer
的内部状态。proceed
: 表示Gatherer
本身是否希望继续处理。downstreamProceed
: 表示下游Sink
是否希望继续接收元素。
关键方法的覆盖
opIsStateful()
:boolean opIsStateful() {// TODOreturn true; }
目前总是返回
true
,表示GathererOp
是一个有状态的操作。注释中提到了未来可能的优化方向,即根据Gatherer
的具体特性(是否有initializer
、combiner
、finisher
)来判断其是否真正有状态。有状态操作在并行流中通常需要更复杂的处理(例如,不能简单地流水线化)。opWrapSink(int flags, Sink<R> downstream)
:Sink<T> opWrapSink(int flags, Sink<R> downstream) {return new GatherSink<>(gatherer, downstream); }
这是
AbstractPipeline
中的一个核心方法,用于将下游的Sink
包装成当前操作需要的Sink
。在这里,它创建并返回一个GatherSink
实例。opEvaluateParallel(PipelineHelper<R> unused1, Spliterator<I> spliterator, IntFunction<R[]> unused2)
: 此方法定义了GathererOp
在并行流中的求值逻辑。它利用了evaluate
辅助方法,并使用NodeBuilder
来收集并行任务的结果,最后将这些结果合并成一个Node<R>
。opEvaluateParallelLazy(PipelineHelper<R> helper, Spliterator<P_IN> spliterator)
: 用于并行流的惰性求值。注释提到,只有非常特定类型的Gatherer
(无初始化器、有组合器、无完成器)才能直接、高效地表示为Spliterator
。目前的实现是先通过opEvaluateParallel
完全求值得到一个Node
,然后再从这个Node
创建Spliterator
,这意味着它不是真正的惰性并行。collect(...)
方法 (两个重载):@Override public <CR, CA> CR collect(Collector<? super R, CA, CR> c) {// ...return evaluate(...); }@Override public <RR> RR collect(Supplier<RR> supplier,BiConsumer<RR, ? super R> accumulator,BiConsumer<RR, RR> combiner) {// ...return evaluate(...); }
这两个方法覆盖了
Stream
接口的collect
终端操作。它们实现了gather().collect()
的融合。通过调用内部的evaluate
方法,将Gatherer
的逻辑和Collector
的逻辑一起执行,避免了先完成gather
生成中间结果再进行collect
的开销。
私有辅助方法 evaluate
evaluate
方法是 GathererOp
的“大脑”和“执行引擎”。它位于整个操作的核心,负责统筹和调度,根据不同的运行环境(串行/并行)和 Gatherer
的特性,选择最合适的执行策略来完成 gather
和下游 collect
操作。
它的核心职责可以概括为:接收一个数据源 (Spliterator
) 和一个终端收集器 (Collector
) 的组件,然后智能地选择并执行一条最优路径,最终返回收集器的结果。
这个方法是 GathererOp
中两个主要的终端操作 collect
的最终实现。你会发现 collect
方法的主要工作就是解析 Collector
,然后将所有组件(spliterator
, gatherer
, collector
的各个部分)打包传递给 evaluate
。
// ... existing code ...private <CA, CR> CR evaluate(final Spliterator<T> spliterator,final boolean parallel,final Gatherer<T, A, R> gatherer,final Supplier<CA> collectorSupplier,final BiConsumer<CA, ? super R> collectorAccumulator,final BinaryOperator<CA> collectorCombiner,final Function<CA, CR> collectorFinisher) {
// ... existing code ...
spliterator
: 数据源。parallel
: 一个布尔标志,指示当前流是否处于并行模式。这是决定执行路径的第一个关键分叉点。gatherer
: 用户提供的Gatherer
实例。collectorSupplier
,collectorAccumulator
,collectorCombiner
,collectorFinisher
: 这四个参数是下游Collector
的四个核心函数,被解构后传入。evaluate
方法将它们与Gatherer
的逻辑进行融合。
evaluate
的执行逻辑与决策树
evaluate
内部的执行逻辑可以看作一个清晰的决策树:
evaluate(...)
|
+-- 1. 是并行流吗 (is parallel?)|+-- 否 (No) -> **串行路径**| || +-- 创建一个 `Sequential` 实例| || +-- 调用 `sequential.evaluateUsing(spliterator)`| || +-- 调用 `sequential.get()` 返回最终结果|+-- 是 (Yes) -> **并行路径**|+-- 2. Gatherer 是否可合并 (has combiner?)|+-- 否 (No) -> **Hybrid 策略**| || +-- 创建一个 `Hybrid` 任务| || +-- `invoke()` 执行任务| || +-- `get()` 获取共享的 `Sequential` 实例| || +-- 调用 `sequential.get()` 返回最终结果|+-- 是 (Yes) -> **Parallel 策略**|+-- 创建一个 `Parallel` 任务|+-- `invoke()` 执行任务|+-- `get()` 获取根任务合并后的 `Sequential` 实例|+-- 调用 `sequential.get()` 返回最终结果
现在我们把这个决策树和代码对应起来看:
1. 串行路径 (!parallel
)
// ... existing code ...if (!parallel)return new Sequential().evaluateUsing(spliterator).get();
// ... existing code ...
这是最简单直接的路径。如果流不是并行的,evaluate
会:
new Sequential()
: 创建一个Sequential
实例。在这个实例的构造函数中,Gatherer
的状态 (state
) 和Collector
的容器 (collectorState
) 都被初始化了。.evaluateUsing(spliterator)
: 驱动Sequential
实例开始处理spliterator
中的所有元素。如我们之前分析的,这个过程将Gatherer
的输出直接推送给Collector
的累加器。.get()
: 在所有元素处理完毕后,调用Sequential
的get
方法,该方法会依次执行Gatherer
的finisher
和Collector
的finisher
,产出最终结果。
2. 并行路径 (parallel
)
当 parallel
为 true
时,evaluate
进入并行处理逻辑。它首先要做的就是判断应该使用 Hybrid
还是 Parallel
策略。
// ... existing code ...// Parallel section starts here:final var combiner = gatherer.combiner();// ... (Hybrid 和 Parallel 类的定义) ...if (combiner == Gatherer.defaultCombiner()) {// NO COMBINER -> HYBRIDreturn new Hybrid(spliterator).invoke().get();} else {// HAS COMBINER -> PARALLELreturn new Parallel(spliterator).invoke().get();}
// ... existing code ...
(注:为了清晰,将 Hybrid
和 Parallel
的调用逻辑移到了一起,实际代码中它们的定义在调用之前)
gatherer.combiner()
: 这是决策的关键。evaluate
获取Gatherer
的合并器。if (combiner == Gatherer.defaultCombiner())
: 判断Gatherer
是否提供了有效的合并器。true
(不可合并): 选择Hybrid
策略。创建一个Hybrid
根任务,并通过invoke()
启动ForkJoin
计算。计算完成后,invoke()
返回根任务,我们再调用.get()
获取最终的Sequential
实例(在Hybrid
模式下,所有任务共享这一个实例),最后调用Sequential
实例的.get()
方法获取最终结果。false
(可合并): 选择Parallel
策略。创建一个Parallel
根任务,并通过invoke()
启动。在计算过程中,每个叶子任务有自己的Sequential
实例,结果会逐级向上合并。invoke()
完成后,返回的根任务中包含了完全合并后的Sequential
实例。最后同样调用这个实例的.get()
方法获取最终结果。
总结
evaluate
方法是 GathererOp
实现高性能和高灵活性的基石。它通过对 parallel
标志和 gatherer.combiner()
的判断,精确地将执行流导向三个专门设计的执行器之一:
Sequential
: 用于串行流,实现了Gatherer
和Collector
的零开销融合。Hybrid
: 用于并行的、但Gatherer
不可合并的场景。它通过“上游并行,下游串行”的混合模式,在保证正确性的前提下最大化并行度。Parallel
: 用于并行的、且Gatherer
可合并的场景。它实现了彻底的分治并行计算,将性能发挥到极致。
这个方法完美地体现了Java Stream API在设计上的深思熟虑,即如何将一个复杂的操作分解,并为不同的场景提供高度优化的执行路径。
GatherSink
GatherSink
是 Gatherer
操作在串行流(Sequential Stream)或作为并行处理中叶子节点的执行体。它扮演着一个“适配器”和“状态管理器”的关键角色,连接上游的元素流和下游的 Sink
。
- 角色:它是一个
Sink
(接收器)。在 Stream 的流水线模型中,每个操作都会包装下游的Sink
,形成一个链条。GatherSink
正是gather()
操作的Sink
实现。 - 双重身份:它实现了两个关键接口:
Sink<T>
: 使其能被上游操作调用,接收上游流过来的元素T
。Gatherer.Downstream<R>
: 使其能被Gatherer
的integrator
和finisher
调用,将处理后的结果R
推送给下游。
- 核心目标:
- 管理
Gatherer
的生命周期:调用initializer
创建初始状态,在接收每个元素时调用integrator
,在流结束时调用finisher
。 - 管理
Gatherer
的状态(state
)。 - 处理短路(Short-Circuiting):当
Gatherer
或下游Sink
不再需要更多元素时,能够有效地向上游传递“取消”信号。
- 管理
我们来逐个部分解析 GatherSink
的实现。
// ... existing code ...static final class GatherSink<T, A, R> implements Sink<T>, Gatherer.Downstream<R> {private final Sink<R> sink;private final Gatherer<T, A, R> gatherer;private final Integrator<A, T, R> integrator; // Optimization: reuseprivate A state;private boolean proceed = true;private boolean downstreamProceed = true;GatherSink(Gatherer<T, A, R> gatherer, Sink<R> sink) {this.gatherer = gatherer;this.sink = sink;this.integrator = gatherer.integrator(); // 提前获取,避免重复调用}
// ... existing code ...
sink
: 指向下游的Sink
。GatherSink
处理完元素后,通过这个sink
将结果传递下去。gatherer
: 当前操作所使用的Gatherer
实例。integrator
: 这是从gatherer
中提前获取的integrator
。这是一个性能优化,因为integrator
是accept
方法中的热点代码,提前缓存可以避免在每次调用时都通过gatherer.integrator()
访问。state
:Gatherer
的状态对象,由initializer
创建。proceed
: 一个布尔标志,表示当前Gatherer
是否还想继续处理上游的元素。如果integrator
返回false
,这个标志就会被设为false
。downstreamProceed
: 一个布尔标志,表示下游Sink
是否还想接收元素。它通过调用sink.cancellationRequested()
来更新。
这两个 proceed
标志是实现短路机制的核心。
Sink<T>
接口实现 - 与上游交互
这是 GatherSink
作为标准 Sink
的行为。
// ... existing code ...// java.util.stream.Sink contract below:@Overridepublic void begin(long size) {final var initializer = gatherer.initializer();if (initializer != Gatherer.defaultInitializer()) // 优化:如果不是默认的无操作initializerstate = initializer.get();sink.begin(-1); // GathererOp 通常不知道输出大小,所以传递-1}@Overridepublic void accept(T t) {// ... 性能优化注释 ...proceed &= integrator.integrate(state, t, this);}@Overridepublic boolean cancellationRequested() {return cancellationRequested(proceed && downstreamProceed);}private boolean cancellationRequested(boolean knownProceed) {// 高性能敏感区域return !(knownProceed && (!sink.cancellationRequested() || (downstreamProceed = false)));}@Overridepublic void end() {final var finisher = gatherer.finisher();if (finisher != Gatherer.<A, R>defaultFinisher()) // 优化:如果不是默认的无操作finisherfinisher.accept(state, this);sink.end();state = null; // 帮助GC}
// ... existing code ...
begin(long size)
: 在流处理开始时调用。它会调用gatherer
的initializer
来创建初始状态state
,然后通知下游sink
处理开始。accept(T t)
: 这是最核心的方法,每当上游传来一个元素t
时被调用。- 它调用
integrator.integrate(state, t, this)
。integrator
会处理这个元素,可能会更新state
,也可能会通过this
(即Gatherer.Downstream
)向下游推送零个或多个结果。 integrator
返回一个布尔值,表示它自己是否还想接收更多元素。proceed &= ...
这个写法非常巧妙。它确保一旦proceed
变为false
,它就再也不会变回true
。这是一个比if (!integrator.integrate(...)) proceed = false;
更高效的无分支写法。
- 它调用
cancellationRequested()
: 上游操作会调用这个方法来查询是否应该停止发送元素。- 它同时检查
proceed
(自己是否想继续) 和downstreamProceed
(下游是否想继续)。 cancellationRequested(boolean knownProceed)
内部的逻辑!(knownProceed && (!sink.cancellationRequested() || (downstreamProceed = false)))
稍微复杂,我们分解一下:- 如果
knownProceed
是false
(意味着proceed
或downstreamProceed
已经是false
),则直接返回true
(请求取消)。 - 如果
knownProceed
是true
,则检查!sink.cancellationRequested()
。 - 如果下游没有请求取消,
!sink.cancellationRequested()
为true
,整个&&
表达式为true
,取反后为false
(不请求取消)。 - 如果下游请求了取消,
!sink.cancellationRequested()
为false
。由于||
的短路特性,会执行downstreamProceed = false
,将下游的取消状态缓存起来。然后||
表达式结果为false
,整个&&
表达式为false
,取反后为true
(请求取消)。
- 如果
- 它同时检查
end()
: 在流处理结束时调用。它会调用gatherer
的finisher
来处理最终的状态,可能会向下游推送最后的元素。然后通知下游sink
处理结束。
Gatherer.Downstream<R>
接口实现 - 与 Gatherer
内部交互
这是 GatherSink
作为 integrator
和 finisher
的回调通道的行为。
// ... existing code ...// Gatherer.Sink contract below:@Overridepublic boolean isRejecting() {return !downstreamProceed;}@Overridepublic boolean push(R r) {var p = downstreamProceed;if (p)sink.accept(r); // 将结果推给下游return !cancellationRequested(p);}}
// ... existing code ...
push(R r)
:integrator
或finisher
通过调用这个方法来产出结果。- 它首先检查
downstreamProceed
,如果下游还在接收,就调用sink.accept(r)
将结果r
推送下去。 - 然后它返回一个布尔值,告诉调用者(
integrator
/finisher
)处理完这次push
后,整个流水线是否还希望继续。这个返回值基于cancellationRequested
的结果,这样integrator
就可以在push
之后立即知道是否应该短路。
- 它首先检查
isRejecting()
: 允许integrator
查询下游是否已经拒绝接收更多元素。这对于某些复杂的Gatherer
很有用,它们可能在push
之前就想知道下游的状态。
总结
GatherSink
是连接 Stream
流水线和 Gatherer
逻辑的核心桥梁。它通过实现 Sink
和 Gatherer.Downstream
两个接口,完美地扮演了双重角色:
- 对上游,它是一个标准的
Sink
,遵循begin -> accept* -> end
的生命周期,并能通过cancellationRequested
向上游传递短路信号。 - 对
Gatherer
内部,它是一个Downstream
回调,为integrator
和finisher
提供了推送结果 (push
) 和查询下游状态 (isRejecting
) 的能力。
它的设计充满了性能优化的考量,例如缓存 integrator
、使用无分支的 &=
操作、以及精巧的 cancellationRequested
逻辑,确保了 gather
操作在串行模式下的高效执行。
Sequential
Sequential
类是 GathererOp
中 evaluate
方法的核心组件之一,它专门用于 串行(Sequential) 执行模式。它的设计目标是将一个 Gatherer
和一个下游的 Collector
融合在一起,形成一个单一的、高效的串行处理单元。
- 角色:它是一个融合了
Gatherer
逻辑和Collector
逻辑的处理器。它同时扮演了多个角色,通过实现Consumer<T>
和Gatherer.Downstream<R>
接口来完成。 - 核心目标:
- 避免中间结果物化:常规的
stream.gather(...).collect(...)
会先由gather
操作完全处理完所有元素,生成一个中间的Stream<R>
,然后再由collect
操作来消费这个中间流。Sequential
类通过融合,使得Gatherer
产生一个结果R
后,能立即被Collector
的accumulator
消费,从而避免了创建和存储整个中间结果集,极大地提升了效率和降低了内存消耗。 - 统一处理逻辑:它为串行流和
Hybrid
并行模式的串行处理阶段提供了一个统一的、可复用的执行体。
- 避免中间结果物化:常规的
我们来逐个部分解析 Sequential
的实现。
// ... existing code ...// Sequential is the fusion of a Gatherer and a Collector which can// be evaluated sequentially.final class Sequential implements Consumer<T>, Gatherer.Downstream<R> {A state;CA collectorState;boolean proceed;Sequential() {if (initializer != Gatherer.defaultInitializer())state = initializer.get();collectorState = collectorSupplier.get();proceed = true;}
// ... existing code ...
implements Consumer<T>, Gatherer.Downstream<R>
: 这个类的双重身份。Consumer<T>
: 使其accept(T)
方法可以被Spliterator.forEachRemaining
或tryAdvance
调用,从而接收上游流过来的原始元素T
。Gatherer.Downstream<R>
: 使其push(R)
方法可以被Gatherer
的integrator
调用,从而接收Gatherer
处理后产生的结果R
。
state
:Gatherer
的状态对象,由gatherer.initializer()
创建。collectorState
:Collector
的状态对象(即累加容器),由collectorSupplier.get()
创建。proceed
: 一个布尔标志,用于处理非greedy
模式下的短路。当integrator
返回false
时,它会被设为false
,从而终止evaluateUsing
中的循环。
evaluateUsing(Spliterator<T> spliterator)
- 驱动方法
// ... existing code ...@ForceInlineSequential evaluateUsing(Spliterator<T> spliterator) {if (greedy)spliterator.forEachRemaining(this);elsedo {} while (proceed && spliterator.tryAdvance(this));return this;}
// ... existing code ...
- 这是驱动整个串行处理流程的入口。
if (greedy)
: 如果Gatherer
是greedy
的(即不会短路),就一次性调用spliterator.forEachRemaining(this)
。这会遍历spliterator
中的所有元素,并对每个元素调用this.accept(T)
方法。这是最高效的处理方式。else
: 如果Gatherer
可能会短路,则使用do-while
循环和spliterator.tryAdvance(this)
。tryAdvance
一次只处理一个元素,并且循环条件会检查proceed
标志。一旦proceed
变为false
,循环就会立即终止,实现短路。
accept(T t)
- 实现 Consumer<T>
// ... existing code ...@Overridepublic void accept(T t) {/** ...*/var ignore = integrator.integrate(state, t, this)|| (!greedy && (proceed = false));}
// ... existing code ...
- 当
evaluateUsing
拉取上游元素时,此方法被调用。 integrator.integrate(state, t, this)
: 这是核心调用。它将上游元素t
、Gatherer
的当前状态state
交给integrator
处理。this
作为Downstream
被传入,以便integrator
可以通过push
方法输出结果。|| (!greedy && (proceed = false))
: 这是一个精巧的短路实现。integrator.integrate
返回一个布尔值,true
表示希望继续,false
表示不希望继续。- 如果
integrator
返回true
,由于||
的短路特性,后面的部分不会执行,proceed
保持true
。 - 如果
integrator
返回false
,则会执行||
后面的部分。!greedy
条件确保这只在非贪婪模式下发生,然后proceed = false
被执行,进行短路。
push(R r)
- 实现 Gatherer.Downstream<R>
// ... existing code ...@Overridepublic boolean push(R r) {collectorAccumulator.accept(collectorState, r);return true;}
// ... existing code ...
- 当
integrator
内部决定要产出一个结果r
时,它会调用downstream.push(r)
,也就是这个方法。 collectorAccumulator.accept(collectorState, r)
: 这就是融合的关键所在。Gatherer
产生的结果r
,没有被放入任何中间集合,而是被直接传递给了Collector
的累加器。return true;
: 在这个融合的场景下,Collector
本身是不能短路的(这是Collector
的规范),所以它总是告诉Gatherer
的integrator
“可以继续推数据给我”。
get()
- 获取最终结果
// ... existing code ...@SuppressWarnings("unchecked")public CR get() {final var finisher = gatherer.finisher();if (finisher != Gatherer.<A, R>defaultFinisher())finisher.accept(state, this);// IF collectorFinisher == null -> IDENTITY_FINISHreturn (collectorFinisher == null)? (CR) collectorState: collectorFinisher.apply(collectorState);}
// ... existing code ...
- 在所有元素都被
evaluateUsing
处理完毕后,调用此方法来获取最终结果。 gatherer.finisher().accept(state, this)
: 首先,调用Gatherer
的finisher
,让它可以处理最终的状态,并可能通过push
输出最后的元素。collectorFinisher.apply(collectorState)
: 然后,调用Collector
的finisher
,对累加容器collectorState
进行最终的转换,得到最终结果CR
。如果Collector
有IDENTITY_FINISH
特性,则直接返回累加容器本身。
总结
Sequential
类是一个高度优化的、专门用于串行执行的融合处理器。它通过巧妙地实现 Consumer
和 Downstream
接口,将 Gatherer
的输出无缝对接到 Collector
的输入,避免了不必要的中间数据结构,并将 Gatherer
和 Collector
的整个生命周期(初始化、累加、终结)紧密地结合在一起,形成了一个高效的单一处理循环。这对于提升串行流的性能至关重要。
Hybrid
Hybrid
是 Gatherer
操作在并行流(Parallel Stream)中,针对没有提供 combiner
的有状态 Gatherer
的一种特殊执行策略。它的名字“Hybrid”(混合)恰如其分地描述了它的工作模式:上游并行处理,下游串行。
Hybrid
的设计借鉴了 ForEachOrderedTask
,它将任务分解成一个链表式的结构,确保了即使任务被 ForkJoinPool
中的不同线程并行执行,最终结果的处理顺序也和原始 Spliterator
的顺序一致。
其工作流程可以概括为:
- 任务分裂(
compute
):将原始的Spliterator
递归地分裂成更小的块,形成一个任务树。关键在于,它会维护一个leftPredecessor
(左前驱)的引用,构建出一个逻辑上的任务链表。 - 上游并行处理(
compute
-greedy
分支):对于叶子任务,如果Gatherer
是greedy
的(即它会消耗所有上游元素),Hybrid
会先执行完所有上游操作(如map
,filter
),并将结果缓冲到一个NodeBuilder
中。这一步是完全并行的。 - 串行化处理(
onCompletion
):当一个任务完成时(无论是完成了上游处理,还是它本身就是叶子节点),onCompletion
方法会被调用。这个方法会按照任务链表的顺序,依次执行localResult.evaluateUsing(s)
,即调用Sequential
实例来串行地处理自己持有的那一小块数据。 - 短路支持(
cancelled
):通过一个共享的AtomicBoolean cancelled
标志,实现非greedy
模式下的短路。一旦下游的Sequential
处理器表示不再需要数据 (proceed
变为false
),就会设置cancelled
标志,后续的任务在执行前会检查此标志,从而避免不必要的计算。
为什么需要Hybrid
场景:一个无法并行合并的 Gatherer
让我们以 Gatherers.windowSliding(3)
为例。这是一个典型的有状态且无法并行合并的 Gatherer
。
- 有状态:它需要维护一个内部队列(大小最多为3),记录最近看到的元素,以便生成滑动窗口。
- 无法并行合并:假设我们有一个流
[1, 2, 3, 4, 5, 6]
。- 线程A处理
[1, 2, 3]
,它的最终状态是队列[1, 2, 3]
,并输出了窗口[1, 2, 3]
。 - 线程B处理
[4, 5, 6]
,它的最终状态是队列[4, 5, 6]
,并输出了窗口[4, 5, 6]
。 - 现在,我们如何合并这两个结果?我们丢失了
[2, 3, 4]
和[3, 4, 5]
这两个跨越了数据块边界的窗口。没有一个通用的combiner
能猜到需要这样去合并。因此,它没有提供combiner
。
- 线程A处理
对于这样一个 Gatherer
,如果没有 Hybrid
策略,在并行流中,我们只有两个选择,但都不理想:
完全串行执行:
- 做法:放弃并行,整个流从头到尾都在一个线程里执行。
- 优点:能得到正确的结果。
- 缺点:性能极差。如果流是这样的:
source.filter(...).map(...).gather(windowSliding(3))
,那么filter
和map
这两个本可以高效并行的无状态操作,也被迫在单线程里执行,完全浪费了多核CPU的优势。
常规的并行执行(会出错):
- 做法:像处理
map
或filter
一样,把数据分块,让每个线程独立处理自己的那部分gather
逻辑。 - 优点:速度快。
- 缺点:结果是错误的!就像上面例子展示的,我们会丢失所有跨越数据块边界的窗口。
- 做法:像处理
这就是 Hybrid
策略要解决的核心矛盾:我们既想要利用多核CPU并行处理 filter
和 map
等上游操作来提升性能,又必须保证 windowSliding
这个 Gatherer
本身是按顺序处理元素的以确保结果正确。
Hybrid
(混合)策略的名字完美地诠释了它的解决方案:把一个流处理过程拆分成两部分,分别对待。
上游并行(Upstream Parallelism):
Hybrid
任务在ForkJoinPool
中被分裂成许多小任务,分布到不同CPU核心上。- 每个任务负责一小块数据。它会首先执行完所有上游的无状态操作(如
filter
,map
)。比如,线程A处理source
的前1000个元素,它会先对这1000个元素进行filter
和map
。线程B同时对后1000个元素做同样的事。 - 这一步是完全并行的,极大地利用了CPU资源。
- 处理完的结果被临时缓冲起来(存入
NodeBuilder
)。
下游串行(Downstream Serial):
- 当上游的并行计算完成后,
Hybrid
任务会进入onCompletion
阶段。 Hybrid
内部通过一个精巧的链表结构 (next
指针) 保证了,即使各个任务块的并行计算完成时间是乱序的,它们向下游提交数据(即执行gather
逻辑)的顺序也严格遵循原始数据流的顺序。- 所有任务共享同一个
Sequential
实例,这个实例包含了Gatherer
的状态。 - 任务A处理完后,把它的缓冲结果交给
Sequential
实例处理;然后,任务B才能把它缓冲的结果交过来。这就确保了windowSliding
看到的元素流是[..., element_N, element_N+1, ...]
这样连续不断的,从而可以正确地生成所有窗口。
- 当上游的并行计算完成后,
所以,Hybrid
策略的必要性在于:
- 对于那些有状态且不可合并的
Gatherer
,我们不能使用常规的并行模式,否则结果会出错。 - 但我们又不想因为这一个
Gatherer
操作而放弃整个流的并行化潜力,让map
、filter
等无辜的操作也跟着串行执行,这会造成巨大的性能损失。
Hybrid
提供了一个两全其美的方案:它像一个聪明的调度器,将可以并行的部分(上游无状态操作)充分并行化,然后将必须串行的部分(有状态的 Gatherer
逻辑)严格串行化,并在这两者之间建立了一座桥梁,从而在保证结果正确性的前提下,最大化地压榨了多核CPU的性能。
类的定义与关键成员变量
// ... existing code ...@SuppressWarnings("serial")final class Hybrid extends CountedCompleter<Sequential> {private final long targetSize;private final Hybrid leftPredecessor;private final AtomicBoolean cancelled;private final Sequential localResult;private Spliterator<T> spliterator;private Hybrid next;private static final VarHandle NEXT = MhUtil.findVarHandle(MethodHandles.lookup(), "next", Hybrid.class);
// ... existing code ...
extends CountedCompleter<Sequential>
: 继承自CountedCompleter
,这是ForkJoin
框架中用于处理完成依赖关系的核心类。它的结果类型是Sequential
,即下游的串行处理器。targetSize
:ForkJoin
任务分裂的建议目标大小。leftPredecessor
: 指向其在逻辑顺序上的前一个任务。这是保证顺序执行的关键。cancelled
: 一个共享的AtomicBoolean
,用于在非greedy
模式下实现短路。所有分裂出的任务共享同一个cancelled
实例。localResult
: 下游的串行处理器。所有任务也共享同一个Sequential
实例,因为处理逻辑必须是串行的。spliterator
: 当前任务负责处理的数据片段。next
: 指向其在逻辑顺序上的后一个任务。通过VarHandle
进行原子更新,用于在任务完成时触发下一个任务的执行。
compute()
- 任务分裂与上游并行化
Hybrid.compute()
的执行流程可以分为两个主要阶段:
- 任务分裂阶段(Task Splitting):通过一个
while
循环,递归地将大的数据块(Spliterator
)分解成更小的、适合并行处理的子任务。这个阶段的核心是构建一个逻辑上的任务链表,为后续的有序处理做准备。 - 叶子任务处理阶段(Leaf Task Processing):当任务块小到不再适合继续分裂时,循环停止。此时,当前任务成为“叶子任务”,它需要真正地处理自己所持有的那一小块数据。“存入临时
nb
”就发生在这个阶段。
// ... existing code ...@Overridepublic void compute() {var task = this;Spliterator<T> rightSplit = task.spliterator, leftSplit;long sizeThreshold = task.targetSize;boolean forkRight = false;
// ... existing code ...
- 初始化:
task
指向当前执行的Hybrid
实例。rightSplit
是当前任务需要处理的数据源。sizeThreshold
是决定是否继续分裂的阈值。
1. 任务分裂阶段 (while
循环)
// ... existing code ...while ((greedy || !cancelled.get())&& rightSplit.estimateSize() > sizeThreshold&& (leftSplit = rightSplit.trySplit()) != null) {var leftChild = new Hybrid(task, leftSplit, task.leftPredecessor);var rightChild = new Hybrid(task, rightSplit, leftChild);/* leftChild and rightChild were just created and not* fork():ed yet so no need for a volatile write*/leftChild.next = rightChild;// ... (pending count 和 fork 逻辑 和 ForEachOrdered一样) ...// 这部分是 ForkJoin 框架的控制逻辑,用于管理任务依赖和执行// 核心思想是交替地 fork 左/右子任务,让当前线程继续处理另一半if (forkRight) {rightSplit = leftSplit;task = leftChild;rightChild.fork();} else {task = rightChild;leftChild.fork();}forkRight = !forkRight;}
// ... existing code ...
- 循环条件:
(greedy || !cancelled.get())
: 如果不是greedy
模式(即可能会短路),则每次分裂前都检查是否已被取消。rightSplit.estimateSize() > sizeThreshold
: 数据块足够大,值得分裂。(leftSplit = rightSplit.trySplit()) != null
: 数据源支持分裂。
- 创建子任务:创建
leftChild
和rightChild
两个新的Hybrid
任务。 - 构建链表:
leftChild.next = rightChild;
这一行至关重要。它在任务树的兄弟节点之间建立了一个逻辑上的先后关系。这保证了即使leftChild
和rightChild
被不同线程并行执行,我们也能在将来按正确的顺序处理它们的结果。 fork()
:rightChild.fork()
或leftChild.fork()
将一个子任务提交给ForkJoinPool
,让其他空闲线程可以“窃取”并执行它。当前线程则继续在循环中处理剩下的那一半数据。
这个分裂过程会一直持续,直到数据块小得不能再分,此时 while
循环退出,进入叶子任务处理阶段。
2. 叶子任务处理阶段 (关键部分)
// ... existing code .../** ...* IMPORTANT: Currently we only perform the processing of this* upstream data if we know the operation is greedy -- as we cannot* safely speculate on the cost/benefit ratio of parallelizing* the pre-processing of upstream data under short-circuiting.*/if (greedy && task.getPendingCount() > 0) {// Upstream elements are bufferedNodeBuilder<T> nb = new NodeBuilder<>();rightSplit.forEachRemaining(nb); // Run the upstreamtask.spliterator = nb.build().spliterator();}task.tryComplete();}
// ... existing code ...
if (greedy && ...)
: 这个优化只在greedy
模式下进行。greedy
意味着Gatherer
必须处理完所有上游元素才能完成,不会短路。这使得我们可以安全地预处理所有数据。NodeBuilder<T> nb = new NodeBuilder<>();
: 创建一个临时的缓冲区。rightSplit.forEachRemaining(nb);
: 这就是实现上游并行的魔法所在!rightSplit
是什么?它不是原始的数据源,而是经过了上游所有操作(如filter
,map
)层层包装后的Spliterator
。forEachRemaining(nb)
的作用是:拉动rightSplit
,让它开始吐出元素。每吐出一个元素,上游的map
,filter
等操作就会被执行。nb
是一个Sink
(通过accept
方法),它接收这些经过上游操作处理后的结果,并把它们缓冲在内部。- 为什么能并行? 因为此时,多个不同的叶子任务(
Hybrid
实例)正在不同的CPU核心上同时执行它们的compute
方法。每个叶子任务都在调用forEachRemaining
,从而驱动它所负责的那一小块数据流过上游的map
/filter
流水线。这就实现了对上游无状态操作的并行处理。
task.spliterator = nb.build().spliterator();
: 处理完成后,nb
中包含了当前任务块所有处理好的元素。我们用nb.build()
将其固化成一个Node
,然后替换掉任务原来的spliterator
。现在,task.spliterator
指向的是一块已经过上游并行处理、结果被缓存的内存数据。task.tryComplete();
: 通知ForkJoin
框架,当前任务的compute
阶段已经完成。这会触发onCompletion
的执行。
总结与流程梳理
我们把整个流程串起来看:
- 一个大的
Hybrid
任务开始执行compute
。 while
循环将任务分裂成许多小的叶子任务,这些任务被fork()
到ForkJoinPool
中,由不同的线程并行执行。同时,通过next
指针构建了一个逻辑顺序链。- 并行阶段:每个叶子任务在其
compute
方法中,调用forEachRemaining
,并行地执行gather
之前的所有map
,filter
等操作,并将结果缓冲到各自的NodeBuilder
中。 - 串行阶段的准备:当一个叶子任务的
compute
完成后,它调用tryComplete()
,这会触发onCompletion
。 - 串行阶段:
onCompletion
方法是按next
链表顺序被调用的。它会把自己缓冲好的数据(task.spliterator
)交给下游唯一的、共享的Sequential
处理器进行串行处理。
所以,“存入临时 nb
”这个步骤,本质上是将上游的并行计算和下游的串行处理解耦开。它允许我们在上游尽情地利用多核并行计算,然后把计算结果(乱序到达的)先暂存起来,最后再由 onCompletion
机制像接力赛一样,一棒一棒地、按正确的顺序,把这些暂存的结果交给下游的串行处理器。
onCompletion()
- 串行化处理与任务串联
// ... existing code ...@Overridepublic void onCompletion(CountedCompleter<?> caller) {var s = spliterator;spliterator = null; // GC// 1. 执行串行处理if (s != null&& (greedy || !cancelled.get())&& !localResult.evaluateUsing(s).proceed // 调用 Sequential 处理器&& !greedy)cancelled.set(true); // 如果需要,设置短路标志// 2. 触发下一个任务@SuppressWarnings("unchecked")var leftDescendant = (Hybrid) NEXT.getAndSet(this, null);if (leftDescendant != null) {leftDescendant.tryComplete();}}
// ... existing code ...
- 执行串行处理:这是
Hybrid
模式的核心。当一个任务完成其compute
阶段后,onCompletion
被调用。它会调用共享的localResult
(一个Sequential
实例) 的evaluateUsing
方法,把自己持有的spliterator
(对于greedy
模式,这是缓冲后的Node
的spliterator
) 交给它处理。由于所有任务共享同一个localResult
,并且通过next
指针的串联保证了onCompletion
的调用顺序,这里的处理实际上是串行的。 - 触发下一个任务:
NEXT.getAndSet(this, null)
是一个原子操作,它获取并清空next
引用。如果next
任务(即leftDescendant
)存在,就调用它的tryComplete()
。这就像一个接力赛,当前任务完成处理后,拍一下下一个任务的肩膀,让它开始处理。
总结
Hybrid
类是一个非常精巧的设计,它在面对“有状态且不可并行合并”的 Gatherer
时,找到了一条兼顾并行性能和顺序要求的中间道路:
- 它将问题分解为**上游(Upstream)和下游(Downstream)**两部分。
- 上游(
gather
之前的操作)通过ForkJoin
和NodeBuilder
缓冲,实现了最大程度的并行计算。 - 下游(
gather
和collect
操作)通过共享的Sequential
实例和CountedCompleter
的有序完成机制,实现了严格的串行处理。
Hybrid
模式是 GathererOp
能够高效融入并行流体系,同时又不破坏复杂有状态操作的顺序依赖性的关键所在。
Parallel
Parallel
类是 GathererOp
为可并行合并的 Gatherer
操作量身定制的并行执行策略。当一个 Gatherer
提供了有效的 combiner
(合并器)时,就会采用此策略。它借鉴了 AbstractShortCircuitTask
的设计思想,实现了高效的并行处理和结果合并,并支持短路操作。
- 角色:一个基于
ForkJoin
框架的并行任务,用于执行那些状态可以被并行计算然后合并的Gatherer
。 - 核心目标:
- 完全并行化:与
Hybrid
不同,Parallel
策略旨在将Gatherer
的逻辑(包括状态的累积)也完全并行化。每个工作线程都会有一个独立的Gatherer
状态。 - 状态合并:在并行计算完成后,通过用户提供的
gatherer.combiner()
和下游collector.combiner()
将各个线程的独立状态合并成最终结果。 - 支持短路:能够处理非
greedy
的Gatherer
,当某个分支决定短路时,能有效地取消后续不必要的计算。
- 完全并行化:与
我们来逐个部分解析 Parallel
的实现。
// ... existing code ...@SuppressWarnings("serial")final class Parallel extends CountedCompleter<Sequential> {private Spliterator<T> spliterator;private Parallel leftChild; // Only non-null if rightChild isprivate Parallel rightChild; // Only non-null if leftChild isprivate Sequential localResult;private volatile boolean canceled;private long targetSize; // lazily initialized// ... 构造函数 ...
// ... existing code ...
extends CountedCompleter<Sequential>
: 继承自CountedCompleter
,这是ForkJoin
框架中用于管理依赖任务完成计数的关键类。Sequential
是其最终的计算结果类型。spliterator
: 当前任务需要处理的数据分片。leftChild
,rightChild
: 指向分裂出的左右子任务,用于构建任务树。localResult
: 每个Parallel
任务实例都拥有一个独立的Sequential
实例。这个Sequential
实例负责处理当前任务分片的数据,并持有该分片的局部Gatherer
状态和Collector
状态。这是与Hybrid
模式(共享同一个localResult
)的根本区别。canceled
: 一个volatile
的布尔值,用于实现短路。当一个任务需要取消时,它会通知其兄弟任务和父任务。targetSize
: 任务分裂的阈值,延迟初始化。
compute()
- 任务分裂与执行
// ... existing code ...@Overridepublic void compute() {Spliterator<T> rs = spliterator, ls;long sizeEstimate = rs.estimateSize();final long sizeThreshold = getTargetSize(sizeEstimate);Parallel task = this;boolean forkRight = false;boolean proceed;while ((proceed = (greedy || !task.isRequestedToCancel()))&& sizeEstimate > sizeThreshold&& (ls = rs.trySplit()) != null) {final var leftChild = task.leftChild = new Parallel(task, ls);final var rightChild = task.rightChild = new Parallel(task, rs);task.setPendingCount(1);if (forkRight) {rs = ls;task = leftChild;rightChild.fork();} else {task = rightChild;leftChild.fork();}forkRight = !forkRight;sizeEstimate = rs.estimateSize();}if (proceed)task.doProcess();task.tryComplete();}
// ... existing code ...
- 任务分裂循环 (
while
):- 逻辑与
Hybrid
类似,通过trySplit
将大任务递归地分解成小任务。 isRequestedToCancel()
: 在每次分裂前检查是否已被其他任务请求取消,以实现短路。fork()
: 将其中一个子任务提交给ForkJoinPool
,让其他线程窃取执行,当前线程继续处理另一半。
- 逻辑与
- 叶子任务处理:
- 当任务不再分裂时,循环退出。
if (proceed)
: 如果没有被取消,则调用task.doProcess()
。
doProcess()
:
private void doProcess() {if (!(localResult = new Sequential()).evaluateUsing(spliterator).proceed&& !greedy)cancelLaterTasks();
}
localResult = new Sequential()
: 关键点! 每个叶子任务都会创建一个全新的Sequential
实例来处理自己的数据分片。evaluateUsing(spliterator)
: 调用Sequential
的方法,完成对当前数据分片的Gatherer
和Collector
的处理。cancelLaterTasks()
: 如果evaluateUsing
返回false
(表示当前分支短路了),则调用此方法去通知其他相关的任务取消执行。
merge(Sequential l, Sequential r)
- 结果合并
// ... existing code ...Sequential merge(Sequential l, Sequential r) {/** Only join the right if the left side didn't short-circuit,* or when greedy*/if (greedy || (l != null && r != null && l.proceed)) {l.state = combiner.apply(l.state, r.state);l.collectorState =collectorCombiner.apply(l.collectorState, r.collectorState);l.proceed = r.proceed;return l;}return (l != null) ? l : r;}
// ... existing code ...
- 这是
Parallel
策略的核心。当左右子任务都完成后,父任务会调用此方法来合并它们的结果。 if (greedy || ...)
: 检查是否需要合并。如果不是greedy
模式,且左边的任务已经短路了 (l.proceed
为false
),则无需合并右边的结果。l.state = combiner.apply(l.state, r.state)
: 调用用户提供的Gatherer
合并器,合并两个子任务的Gatherer
状态。l.collectorState = collectorCombiner.apply(...)
: 调用下游Collector
的合并器,合并两个子任务的Collector
状态。
onCompletion
// ... existing code ...@Overridepublic void onCompletion(CountedCompleter<?> caller) {spliterator = null; // GC assistanceif (leftChild != null) {/* ... */localResult = merge(leftChild.localResult, rightChild.localResult);leftChild = rightChild = null; // GC assistance}}
// ... existing code ...
- 当一个任务的所有子任务都完成时(即
pendingCount
减到0),ForkJoin
框架会调用此方法。 if (leftChild != null)
: 这判断当前任务是否是一个“父任务”(即它曾经分裂过)。叶子任务的leftChild
为null
,不会进入此逻辑。localResult = merge(...)
: 调用merge
方法,将左右子任务的localResult
合并,并将合并后的结果存入当前父任务的localResult
中。这个过程会从叶子节点开始,逐级向上合并,直到根任务。
总结
Parallel
类实现了一种经典的分治(Divide and Conquer)并行计算模式:
- 分解 (Divide):
compute
方法将数据源递归地分裂成小块,构建出一个任务树。 - 解决 (Conquer): 树的每个叶子节点独立地、并行地处理自己的数据块,生成一个局部的结果(一个包含局部状态的
Sequential
对象)。 - 合并 (Combine):
onCompletion
和merge
方法协同工作,将子任务的局部结果两两合并,自底向上地在任务树中传递,直到根节点产生最终的、完全合并的结果。
这种策略的适用前提是 Gatherer
和 Collector
都必须是可结合的(Associative),即 (a op b) op c
等价于 a op (b op c)
,这样才能保证无论分裂和合并的顺序如何,最终结果都是一致的。