沧州网站运营自己做网站需要什么材料
GathererOp
GathererOp 是 Java Stream API 中 gather() 操作的核心运行时实现。它负责将用户定义的 Gatherer 应用于流中的元素,并生成新的流。
final class GathererOp<T, A, R> extends ReferencePipeline<T, R> {
<T, A, R>: 这是类的泛型参数:T: 上游流中元素的类型,也是Gatherer消费的元素类型。A:Gatherer内部状态累加器的类型。R:Gatherer产生的结果元素的类型,也是下游流中元素的类型。
extends ReferencePipeline<T, R>:GathererOp继承自ReferencePipeline。ReferencePipeline是 JDK Stream API 内部用于处理对象类型元素流的基类之一。- 这意味着
GathererOp本身就是一个流操作阶段,它可以连接到其他流操作之前或之后。 - 它从上游接收类型为
T的元素(实际上是P_OUT extends T,如of方法所示),并向下游产生类型为R的元素。
根据类注释和代码结构,GathererOp 的主要作用是:
- 实现
gather操作: 它是Stream.gather(Gatherer)方法背后的实际执行者。 - 管理
Gatherer的生命周期: 包括初始化、整合元素、合并(并行时)和完成。 - 支持顺序和并行执行:
GathererOp能够根据流的执行模式(顺序或并行)选择合适的求值策略。 - 融合优化:
gather().gather()融合: 如果一个gather操作紧跟着另一个gather操作,GathererOp会尝试将这两个Gatherer合并成一个,以减少中间流和操作开销。这是通过gatherer.andThen(gatherer)实现的。gather().collect()融合: 如果gather操作后紧跟着一个collect终端操作,GathererOp会覆盖collect方法,将Gatherer的逻辑与Collector的逻辑融合执行,避免生成中间集合,从而提高性能,尤其是在并行流中。
- 性能: 注释中提到 "The performance-critical code below contains some more complicated encodings",表明其内部实现对性能有较高要求,并可能采用了一些复杂的编码技巧。
静态工厂方法 of
@SuppressWarnings("unchecked")static <P_IN, P_OUT extends T, T, A, R> Stream<R> of(ReferencePipeline<P_IN, P_OUT> upstream,Gatherer<T, A, R> gatherer) {// 当一个 gather 操作附加到另一个 gather 操作上时,// 我们可以将它们融合成一个if (upstream.getClass() == GathererOp.class) {return new GathererOp<>(((GathererOp<P_IN, Object, P_OUT>) upstream).gatherer.andThen(gatherer),(GathererOp<?, ?, P_IN>) upstream);} else {return new GathererOp<>((ReferencePipeline<?, T>) upstream,gatherer);}}
- 这是创建
GathererOp实例的入口。当在流上调用.gather()时,内部会调用此方法。 - 参数:
upstream: 上游的ReferencePipeline,即前一个流操作。gatherer: 用户提供的Gatherer接口的实现。
- 逻辑:
- 它首先检查上游操作是否也是一个
GathererOp。 - 如果是 (
upstream.getClass() == GathererOp.class): 这意味着连续调用了.gather()。此时,它会使用upstream.gatherer.andThen(gatherer)将两个Gatherer组合成一个新的Gatherer,然后用这个组合后的Gatherer和更上游的流(upstream.upstream(),通过第二个构造函数间接获取)创建一个新的GathererOp。这就是gather().gather()融合。 - 如果不是: 则直接使用传入的
upstream和gatherer创建一个新的GathererOp。
- 它首先检查上游操作是否也是一个
构造函数
GathererOp 有两个私有构造函数:
private GathererOp(ReferencePipeline<?, T> upstream, Gatherer<T, A, R> gatherer):- 用于处理第一个(或非融合的)
.gather()调用。 - 它调用父类
ReferencePipeline的构造函数,并根据gatherer.integrator()的类型(是否为Integrator.Greedy)设置操作标志(opFlags),例如是否为短路操作。 - 保存传入的
gatherer。
- 用于处理第一个(或非融合的)
private GathererOp(Gatherer<T, A, R> gatherer, GathererOp<?, ?, T> upstream):- 用于融合连续的
.gather()调用。此时,gatherer参数是已经通过andThen组合后的Gatherer。 upstream参数是前一个GathererOp。它会追溯到这个upstream GathererOp的上游(即upstream.upstream())作为新GathererOp的直接上游。- 同样设置操作标志并保存
gatherer。
- 用于融合连续的
核心成员变量
final Gatherer<T, A, R> gatherer;: 存储与此GathererOp关联的Gatherer实例。
内部类
a. NodeBuilder<X>
static final class NodeBuilder<X> implements Consumer<X> {// ...
}
- 作用: 在并行流评估中,
NodeBuilder充当元素的惰性累加器。每个并行任务可能会使用一个NodeBuilder来收集其处理的元素。最终,这些NodeBuilder的内容会被构建成Node<X>对象(Stream API 内部用于表示一批数据的结构),然后这些Node可以被合并。 - 主要方法:
accept(X x): 向构建器中添加一个元素。它会维护一个rightMost的SpinedBuffer.Builder来高效追加。join(NodeBuilder<X> that): 将另一个NodeBuilder的内容合并到当前构建器。为了避免创建过深的Node树(不平衡的 Concat-trees),对于小的NodeBuilder,它会直接将元素追加到当前rightMost构建器中,而不是总是创建一个Nodes.conc节点。LINEAR_APPEND_MAX控制这个阈值。build(): 从累积的元素中构建一个Node<X>。
b. GatherSink<T, A, R>
static final class GatherSink<T, A, R> implements Sink<T>, Gatherer.Downstream<R> {// ...
}
- 作用:
GatherSink是连接Gatherer逻辑和下游流操作(或终端操作)的桥梁。它实现了Sink<T>接口来接收上游元素,并实现了Gatherer.Downstream<R>接口供Gatherer的integrator和finisher推送结果。 - 实现接口:
Sink<T>: 使其能够作为流操作链中的一个环节,接收上游元素。begin(long size): 在处理开始前调用,用于初始化Gatherer的状态 (state = gatherer.initializer().get()) 和下游Sink。accept(T t): 每当上游产生一个元素t时调用。它会调用gatherer.integrator().integrate(state, t, this)来处理元素并更新状态。proceed标志会根据integrate的返回值更新,用于支持短路。cancellationRequested(): 判断是否应停止处理。这是实现短路的关键,它会检查自身的proceed和downstreamProceed标志以及下游sink.cancellationRequested()。end(): 在所有元素处理完毕后调用。它会调用gatherer.finisher().accept(state, this)来执行最终处理,并通知下游sink.end()。
Gatherer.Downstream<R>: 作为Gatherer的输出通道。isRejecting(): 返回!downstreamProceed,表示下游是否已拒绝接收更多元素。push(R r):Gatherer通过此方法将处理结果r推送出去。GatherSink会将r传递给下游的sink.accept(r)。
- 状态变量:
sink: 下游的Sink。gatherer: 当前的Gatherer。integrator: 缓存的gatherer.integrator(),用于优化。state:Gatherer的内部状态。proceed: 表示Gatherer本身是否希望继续处理。downstreamProceed: 表示下游Sink是否希望继续接收元素。
关键方法的覆盖
opIsStateful():boolean opIsStateful() {// TODOreturn true; }目前总是返回
true,表示GathererOp是一个有状态的操作。注释中提到了未来可能的优化方向,即根据Gatherer的具体特性(是否有initializer、combiner、finisher)来判断其是否真正有状态。有状态操作在并行流中通常需要更复杂的处理(例如,不能简单地流水线化)。opWrapSink(int flags, Sink<R> downstream):Sink<T> opWrapSink(int flags, Sink<R> downstream) {return new GatherSink<>(gatherer, downstream); }这是
AbstractPipeline中的一个核心方法,用于将下游的Sink包装成当前操作需要的Sink。在这里,它创建并返回一个GatherSink实例。opEvaluateParallel(PipelineHelper<R> unused1, Spliterator<I> spliterator, IntFunction<R[]> unused2): 此方法定义了GathererOp在并行流中的求值逻辑。它利用了evaluate辅助方法,并使用NodeBuilder来收集并行任务的结果,最后将这些结果合并成一个Node<R>。opEvaluateParallelLazy(PipelineHelper<R> helper, Spliterator<P_IN> spliterator): 用于并行流的惰性求值。注释提到,只有非常特定类型的Gatherer(无初始化器、有组合器、无完成器)才能直接、高效地表示为Spliterator。目前的实现是先通过opEvaluateParallel完全求值得到一个Node,然后再从这个Node创建Spliterator,这意味着它不是真正的惰性并行。collect(...)方法 (两个重载):@Override public <CR, CA> CR collect(Collector<? super R, CA, CR> c) {// ...return evaluate(...); }@Override public <RR> RR collect(Supplier<RR> supplier,BiConsumer<RR, ? super R> accumulator,BiConsumer<RR, RR> combiner) {// ...return evaluate(...); }这两个方法覆盖了
Stream接口的collect终端操作。它们实现了gather().collect()的融合。通过调用内部的evaluate方法,将Gatherer的逻辑和Collector的逻辑一起执行,避免了先完成gather生成中间结果再进行collect的开销。
私有辅助方法 evaluate
evaluate 方法是 GathererOp 的“大脑”和“执行引擎”。它位于整个操作的核心,负责统筹和调度,根据不同的运行环境(串行/并行)和 Gatherer 的特性,选择最合适的执行策略来完成 gather 和下游 collect 操作。
它的核心职责可以概括为:接收一个数据源 (Spliterator) 和一个终端收集器 (Collector) 的组件,然后智能地选择并执行一条最优路径,最终返回收集器的结果。
这个方法是 GathererOp 中两个主要的终端操作 collect 的最终实现。你会发现 collect 方法的主要工作就是解析 Collector,然后将所有组件(spliterator, gatherer, collector 的各个部分)打包传递给 evaluate。
// ... existing code ...private <CA, CR> CR evaluate(final Spliterator<T> spliterator,final boolean parallel,final Gatherer<T, A, R> gatherer,final Supplier<CA> collectorSupplier,final BiConsumer<CA, ? super R> collectorAccumulator,final BinaryOperator<CA> collectorCombiner,final Function<CA, CR> collectorFinisher) {
// ... existing code ...
spliterator: 数据源。parallel: 一个布尔标志,指示当前流是否处于并行模式。这是决定执行路径的第一个关键分叉点。gatherer: 用户提供的Gatherer实例。collectorSupplier,collectorAccumulator,collectorCombiner,collectorFinisher: 这四个参数是下游Collector的四个核心函数,被解构后传入。evaluate方法将它们与Gatherer的逻辑进行融合。
evaluate 的执行逻辑与决策树
evaluate 内部的执行逻辑可以看作一个清晰的决策树:
evaluate(...)
|
+-- 1. 是并行流吗 (is parallel?)|+-- 否 (No) -> **串行路径**| || +-- 创建一个 `Sequential` 实例| || +-- 调用 `sequential.evaluateUsing(spliterator)`| || +-- 调用 `sequential.get()` 返回最终结果|+-- 是 (Yes) -> **并行路径**|+-- 2. Gatherer 是否可合并 (has combiner?)|+-- 否 (No) -> **Hybrid 策略**| || +-- 创建一个 `Hybrid` 任务| || +-- `invoke()` 执行任务| || +-- `get()` 获取共享的 `Sequential` 实例| || +-- 调用 `sequential.get()` 返回最终结果|+-- 是 (Yes) -> **Parallel 策略**|+-- 创建一个 `Parallel` 任务|+-- `invoke()` 执行任务|+-- `get()` 获取根任务合并后的 `Sequential` 实例|+-- 调用 `sequential.get()` 返回最终结果
现在我们把这个决策树和代码对应起来看:
1. 串行路径 (!parallel)
// ... existing code ...if (!parallel)return new Sequential().evaluateUsing(spliterator).get();
// ... existing code ...
这是最简单直接的路径。如果流不是并行的,evaluate 会:
new Sequential(): 创建一个Sequential实例。在这个实例的构造函数中,Gatherer的状态 (state) 和Collector的容器 (collectorState) 都被初始化了。.evaluateUsing(spliterator): 驱动Sequential实例开始处理spliterator中的所有元素。如我们之前分析的,这个过程将Gatherer的输出直接推送给Collector的累加器。.get(): 在所有元素处理完毕后,调用Sequential的get方法,该方法会依次执行Gatherer的finisher和Collector的finisher,产出最终结果。
2. 并行路径 (parallel)
当 parallel 为 true 时,evaluate 进入并行处理逻辑。它首先要做的就是判断应该使用 Hybrid 还是 Parallel 策略。
// ... existing code ...// Parallel section starts here:final var combiner = gatherer.combiner();// ... (Hybrid 和 Parallel 类的定义) ...if (combiner == Gatherer.defaultCombiner()) {// NO COMBINER -> HYBRIDreturn new Hybrid(spliterator).invoke().get();} else {// HAS COMBINER -> PARALLELreturn new Parallel(spliterator).invoke().get();}
// ... existing code ...
(注:为了清晰,将 Hybrid 和 Parallel 的调用逻辑移到了一起,实际代码中它们的定义在调用之前)
gatherer.combiner(): 这是决策的关键。evaluate获取Gatherer的合并器。if (combiner == Gatherer.defaultCombiner()): 判断Gatherer是否提供了有效的合并器。true(不可合并): 选择Hybrid策略。创建一个Hybrid根任务,并通过invoke()启动ForkJoin计算。计算完成后,invoke()返回根任务,我们再调用.get()获取最终的Sequential实例(在Hybrid模式下,所有任务共享这一个实例),最后调用Sequential实例的.get()方法获取最终结果。false(可合并): 选择Parallel策略。创建一个Parallel根任务,并通过invoke()启动。在计算过程中,每个叶子任务有自己的Sequential实例,结果会逐级向上合并。invoke()完成后,返回的根任务中包含了完全合并后的Sequential实例。最后同样调用这个实例的.get()方法获取最终结果。
总结
evaluate 方法是 GathererOp 实现高性能和高灵活性的基石。它通过对 parallel 标志和 gatherer.combiner() 的判断,精确地将执行流导向三个专门设计的执行器之一:
Sequential: 用于串行流,实现了Gatherer和Collector的零开销融合。Hybrid: 用于并行的、但Gatherer不可合并的场景。它通过“上游并行,下游串行”的混合模式,在保证正确性的前提下最大化并行度。Parallel: 用于并行的、且Gatherer可合并的场景。它实现了彻底的分治并行计算,将性能发挥到极致。
这个方法完美地体现了Java Stream API在设计上的深思熟虑,即如何将一个复杂的操作分解,并为不同的场景提供高度优化的执行路径。
GatherSink
GatherSink 是 Gatherer 操作在串行流(Sequential Stream)或作为并行处理中叶子节点的执行体。它扮演着一个“适配器”和“状态管理器”的关键角色,连接上游的元素流和下游的 Sink。
- 角色:它是一个
Sink(接收器)。在 Stream 的流水线模型中,每个操作都会包装下游的Sink,形成一个链条。GatherSink正是gather()操作的Sink实现。 - 双重身份:它实现了两个关键接口:
Sink<T>: 使其能被上游操作调用,接收上游流过来的元素T。Gatherer.Downstream<R>: 使其能被Gatherer的integrator和finisher调用,将处理后的结果R推送给下游。
- 核心目标:
- 管理
Gatherer的生命周期:调用initializer创建初始状态,在接收每个元素时调用integrator,在流结束时调用finisher。 - 管理
Gatherer的状态(state)。 - 处理短路(Short-Circuiting):当
Gatherer或下游Sink不再需要更多元素时,能够有效地向上游传递“取消”信号。
- 管理
我们来逐个部分解析 GatherSink 的实现。
// ... existing code ...static final class GatherSink<T, A, R> implements Sink<T>, Gatherer.Downstream<R> {private final Sink<R> sink;private final Gatherer<T, A, R> gatherer;private final Integrator<A, T, R> integrator; // Optimization: reuseprivate A state;private boolean proceed = true;private boolean downstreamProceed = true;GatherSink(Gatherer<T, A, R> gatherer, Sink<R> sink) {this.gatherer = gatherer;this.sink = sink;this.integrator = gatherer.integrator(); // 提前获取,避免重复调用}
// ... existing code ...
sink: 指向下游的Sink。GatherSink处理完元素后,通过这个sink将结果传递下去。gatherer: 当前操作所使用的Gatherer实例。integrator: 这是从gatherer中提前获取的integrator。这是一个性能优化,因为integrator是accept方法中的热点代码,提前缓存可以避免在每次调用时都通过gatherer.integrator()访问。state:Gatherer的状态对象,由initializer创建。proceed: 一个布尔标志,表示当前Gatherer是否还想继续处理上游的元素。如果integrator返回false,这个标志就会被设为false。downstreamProceed: 一个布尔标志,表示下游Sink是否还想接收元素。它通过调用sink.cancellationRequested()来更新。
这两个 proceed 标志是实现短路机制的核心。
Sink<T> 接口实现 - 与上游交互
这是 GatherSink 作为标准 Sink 的行为。
// ... existing code ...// java.util.stream.Sink contract below:@Overridepublic void begin(long size) {final var initializer = gatherer.initializer();if (initializer != Gatherer.defaultInitializer()) // 优化:如果不是默认的无操作initializerstate = initializer.get();sink.begin(-1); // GathererOp 通常不知道输出大小,所以传递-1}@Overridepublic void accept(T t) {// ... 性能优化注释 ...proceed &= integrator.integrate(state, t, this);}@Overridepublic boolean cancellationRequested() {return cancellationRequested(proceed && downstreamProceed);}private boolean cancellationRequested(boolean knownProceed) {// 高性能敏感区域return !(knownProceed && (!sink.cancellationRequested() || (downstreamProceed = false)));}@Overridepublic void end() {final var finisher = gatherer.finisher();if (finisher != Gatherer.<A, R>defaultFinisher()) // 优化:如果不是默认的无操作finisherfinisher.accept(state, this);sink.end();state = null; // 帮助GC}
// ... existing code ...
begin(long size): 在流处理开始时调用。它会调用gatherer的initializer来创建初始状态state,然后通知下游sink处理开始。accept(T t): 这是最核心的方法,每当上游传来一个元素t时被调用。- 它调用
integrator.integrate(state, t, this)。integrator会处理这个元素,可能会更新state,也可能会通过this(即Gatherer.Downstream)向下游推送零个或多个结果。 integrator返回一个布尔值,表示它自己是否还想接收更多元素。proceed &= ...这个写法非常巧妙。它确保一旦proceed变为false,它就再也不会变回true。这是一个比if (!integrator.integrate(...)) proceed = false;更高效的无分支写法。
- 它调用
cancellationRequested(): 上游操作会调用这个方法来查询是否应该停止发送元素。- 它同时检查
proceed(自己是否想继续) 和downstreamProceed(下游是否想继续)。 cancellationRequested(boolean knownProceed)内部的逻辑!(knownProceed && (!sink.cancellationRequested() || (downstreamProceed = false)))稍微复杂,我们分解一下:- 如果
knownProceed是false(意味着proceed或downstreamProceed已经是false),则直接返回true(请求取消)。 - 如果
knownProceed是true,则检查!sink.cancellationRequested()。 - 如果下游没有请求取消,
!sink.cancellationRequested()为true,整个&&表达式为true,取反后为false(不请求取消)。 - 如果下游请求了取消,
!sink.cancellationRequested()为false。由于||的短路特性,会执行downstreamProceed = false,将下游的取消状态缓存起来。然后||表达式结果为false,整个&&表达式为false,取反后为true(请求取消)。
- 如果
- 它同时检查
end(): 在流处理结束时调用。它会调用gatherer的finisher来处理最终的状态,可能会向下游推送最后的元素。然后通知下游sink处理结束。
Gatherer.Downstream<R> 接口实现 - 与 Gatherer 内部交互
这是 GatherSink 作为 integrator 和 finisher 的回调通道的行为。
// ... existing code ...// Gatherer.Sink contract below:@Overridepublic boolean isRejecting() {return !downstreamProceed;}@Overridepublic boolean push(R r) {var p = downstreamProceed;if (p)sink.accept(r); // 将结果推给下游return !cancellationRequested(p);}}
// ... existing code ...
push(R r):integrator或finisher通过调用这个方法来产出结果。- 它首先检查
downstreamProceed,如果下游还在接收,就调用sink.accept(r)将结果r推送下去。 - 然后它返回一个布尔值,告诉调用者(
integrator/finisher)处理完这次push后,整个流水线是否还希望继续。这个返回值基于cancellationRequested的结果,这样integrator就可以在push之后立即知道是否应该短路。
- 它首先检查
isRejecting(): 允许integrator查询下游是否已经拒绝接收更多元素。这对于某些复杂的Gatherer很有用,它们可能在push之前就想知道下游的状态。
总结
GatherSink 是连接 Stream 流水线和 Gatherer 逻辑的核心桥梁。它通过实现 Sink 和 Gatherer.Downstream 两个接口,完美地扮演了双重角色:
- 对上游,它是一个标准的
Sink,遵循begin -> accept* -> end的生命周期,并能通过cancellationRequested向上游传递短路信号。 - 对
Gatherer内部,它是一个Downstream回调,为integrator和finisher提供了推送结果 (push) 和查询下游状态 (isRejecting) 的能力。
它的设计充满了性能优化的考量,例如缓存 integrator、使用无分支的 &= 操作、以及精巧的 cancellationRequested 逻辑,确保了 gather 操作在串行模式下的高效执行。
Sequential
Sequential 类是 GathererOp 中 evaluate 方法的核心组件之一,它专门用于 串行(Sequential) 执行模式。它的设计目标是将一个 Gatherer 和一个下游的 Collector 融合在一起,形成一个单一的、高效的串行处理单元。
- 角色:它是一个融合了
Gatherer逻辑和Collector逻辑的处理器。它同时扮演了多个角色,通过实现Consumer<T>和Gatherer.Downstream<R>接口来完成。 - 核心目标:
- 避免中间结果物化:常规的
stream.gather(...).collect(...)会先由gather操作完全处理完所有元素,生成一个中间的Stream<R>,然后再由collect操作来消费这个中间流。Sequential类通过融合,使得Gatherer产生一个结果R后,能立即被Collector的accumulator消费,从而避免了创建和存储整个中间结果集,极大地提升了效率和降低了内存消耗。 - 统一处理逻辑:它为串行流和
Hybrid并行模式的串行处理阶段提供了一个统一的、可复用的执行体。
- 避免中间结果物化:常规的
我们来逐个部分解析 Sequential 的实现。
// ... existing code ...// Sequential is the fusion of a Gatherer and a Collector which can// be evaluated sequentially.final class Sequential implements Consumer<T>, Gatherer.Downstream<R> {A state;CA collectorState;boolean proceed;Sequential() {if (initializer != Gatherer.defaultInitializer())state = initializer.get();collectorState = collectorSupplier.get();proceed = true;}
// ... existing code ...
implements Consumer<T>, Gatherer.Downstream<R>: 这个类的双重身份。Consumer<T>: 使其accept(T)方法可以被Spliterator.forEachRemaining或tryAdvance调用,从而接收上游流过来的原始元素T。Gatherer.Downstream<R>: 使其push(R)方法可以被Gatherer的integrator调用,从而接收Gatherer处理后产生的结果R。
state:Gatherer的状态对象,由gatherer.initializer()创建。collectorState:Collector的状态对象(即累加容器),由collectorSupplier.get()创建。proceed: 一个布尔标志,用于处理非greedy模式下的短路。当integrator返回false时,它会被设为false,从而终止evaluateUsing中的循环。
evaluateUsing(Spliterator<T> spliterator) - 驱动方法
// ... existing code ...@ForceInlineSequential evaluateUsing(Spliterator<T> spliterator) {if (greedy)spliterator.forEachRemaining(this);elsedo {} while (proceed && spliterator.tryAdvance(this));return this;}
// ... existing code ...
- 这是驱动整个串行处理流程的入口。
if (greedy): 如果Gatherer是greedy的(即不会短路),就一次性调用spliterator.forEachRemaining(this)。这会遍历spliterator中的所有元素,并对每个元素调用this.accept(T)方法。这是最高效的处理方式。else: 如果Gatherer可能会短路,则使用do-while循环和spliterator.tryAdvance(this)。tryAdvance一次只处理一个元素,并且循环条件会检查proceed标志。一旦proceed变为false,循环就会立即终止,实现短路。
accept(T t) - 实现 Consumer<T>
// ... existing code ...@Overridepublic void accept(T t) {/** ...*/var ignore = integrator.integrate(state, t, this)|| (!greedy && (proceed = false));}
// ... existing code ...
- 当
evaluateUsing拉取上游元素时,此方法被调用。 integrator.integrate(state, t, this): 这是核心调用。它将上游元素t、Gatherer的当前状态state交给integrator处理。this作为Downstream被传入,以便integrator可以通过push方法输出结果。|| (!greedy && (proceed = false)): 这是一个精巧的短路实现。integrator.integrate返回一个布尔值,true表示希望继续,false表示不希望继续。- 如果
integrator返回true,由于||的短路特性,后面的部分不会执行,proceed保持true。 - 如果
integrator返回false,则会执行||后面的部分。!greedy条件确保这只在非贪婪模式下发生,然后proceed = false被执行,进行短路。
push(R r) - 实现 Gatherer.Downstream<R>
// ... existing code ...@Overridepublic boolean push(R r) {collectorAccumulator.accept(collectorState, r);return true;}
// ... existing code ...
- 当
integrator内部决定要产出一个结果r时,它会调用downstream.push(r),也就是这个方法。 collectorAccumulator.accept(collectorState, r): 这就是融合的关键所在。Gatherer产生的结果r,没有被放入任何中间集合,而是被直接传递给了Collector的累加器。return true;: 在这个融合的场景下,Collector本身是不能短路的(这是Collector的规范),所以它总是告诉Gatherer的integrator“可以继续推数据给我”。
get() - 获取最终结果
// ... existing code ...@SuppressWarnings("unchecked")public CR get() {final var finisher = gatherer.finisher();if (finisher != Gatherer.<A, R>defaultFinisher())finisher.accept(state, this);// IF collectorFinisher == null -> IDENTITY_FINISHreturn (collectorFinisher == null)? (CR) collectorState: collectorFinisher.apply(collectorState);}
// ... existing code ...
- 在所有元素都被
evaluateUsing处理完毕后,调用此方法来获取最终结果。 gatherer.finisher().accept(state, this): 首先,调用Gatherer的finisher,让它可以处理最终的状态,并可能通过push输出最后的元素。collectorFinisher.apply(collectorState): 然后,调用Collector的finisher,对累加容器collectorState进行最终的转换,得到最终结果CR。如果Collector有IDENTITY_FINISH特性,则直接返回累加容器本身。
总结
Sequential 类是一个高度优化的、专门用于串行执行的融合处理器。它通过巧妙地实现 Consumer 和 Downstream 接口,将 Gatherer 的输出无缝对接到 Collector 的输入,避免了不必要的中间数据结构,并将 Gatherer 和 Collector 的整个生命周期(初始化、累加、终结)紧密地结合在一起,形成了一个高效的单一处理循环。这对于提升串行流的性能至关重要。
Hybrid
Hybrid 是 Gatherer 操作在并行流(Parallel Stream)中,针对没有提供 combiner 的有状态 Gatherer 的一种特殊执行策略。它的名字“Hybrid”(混合)恰如其分地描述了它的工作模式:上游并行处理,下游串行。
Hybrid 的设计借鉴了 ForEachOrderedTask,它将任务分解成一个链表式的结构,确保了即使任务被 ForkJoinPool 中的不同线程并行执行,最终结果的处理顺序也和原始 Spliterator 的顺序一致。
其工作流程可以概括为:
- 任务分裂(
compute):将原始的Spliterator递归地分裂成更小的块,形成一个任务树。关键在于,它会维护一个leftPredecessor(左前驱)的引用,构建出一个逻辑上的任务链表。 - 上游并行处理(
compute-greedy分支):对于叶子任务,如果Gatherer是greedy的(即它会消耗所有上游元素),Hybrid会先执行完所有上游操作(如map,filter),并将结果缓冲到一个NodeBuilder中。这一步是完全并行的。 - 串行化处理(
onCompletion):当一个任务完成时(无论是完成了上游处理,还是它本身就是叶子节点),onCompletion方法会被调用。这个方法会按照任务链表的顺序,依次执行localResult.evaluateUsing(s),即调用Sequential实例来串行地处理自己持有的那一小块数据。 - 短路支持(
cancelled):通过一个共享的AtomicBoolean cancelled标志,实现非greedy模式下的短路。一旦下游的Sequential处理器表示不再需要数据 (proceed变为false),就会设置cancelled标志,后续的任务在执行前会检查此标志,从而避免不必要的计算。
为什么需要Hybrid
场景:一个无法并行合并的 Gatherer
让我们以 Gatherers.windowSliding(3) 为例。这是一个典型的有状态且无法并行合并的 Gatherer。
- 有状态:它需要维护一个内部队列(大小最多为3),记录最近看到的元素,以便生成滑动窗口。
- 无法并行合并:假设我们有一个流
[1, 2, 3, 4, 5, 6]。- 线程A处理
[1, 2, 3],它的最终状态是队列[1, 2, 3],并输出了窗口[1, 2, 3]。 - 线程B处理
[4, 5, 6],它的最终状态是队列[4, 5, 6],并输出了窗口[4, 5, 6]。 - 现在,我们如何合并这两个结果?我们丢失了
[2, 3, 4]和[3, 4, 5]这两个跨越了数据块边界的窗口。没有一个通用的combiner能猜到需要这样去合并。因此,它没有提供combiner。
- 线程A处理
对于这样一个 Gatherer,如果没有 Hybrid 策略,在并行流中,我们只有两个选择,但都不理想:
完全串行执行:
- 做法:放弃并行,整个流从头到尾都在一个线程里执行。
- 优点:能得到正确的结果。
- 缺点:性能极差。如果流是这样的:
source.filter(...).map(...).gather(windowSliding(3)),那么filter和map这两个本可以高效并行的无状态操作,也被迫在单线程里执行,完全浪费了多核CPU的优势。
常规的并行执行(会出错):
- 做法:像处理
map或filter一样,把数据分块,让每个线程独立处理自己的那部分gather逻辑。 - 优点:速度快。
- 缺点:结果是错误的!就像上面例子展示的,我们会丢失所有跨越数据块边界的窗口。
- 做法:像处理
这就是 Hybrid 策略要解决的核心矛盾:我们既想要利用多核CPU并行处理 filter 和 map 等上游操作来提升性能,又必须保证 windowSliding 这个 Gatherer 本身是按顺序处理元素的以确保结果正确。
Hybrid(混合)策略的名字完美地诠释了它的解决方案:把一个流处理过程拆分成两部分,分别对待。
上游并行(Upstream Parallelism):
Hybrid任务在ForkJoinPool中被分裂成许多小任务,分布到不同CPU核心上。- 每个任务负责一小块数据。它会首先执行完所有上游的无状态操作(如
filter,map)。比如,线程A处理source的前1000个元素,它会先对这1000个元素进行filter和map。线程B同时对后1000个元素做同样的事。 - 这一步是完全并行的,极大地利用了CPU资源。
- 处理完的结果被临时缓冲起来(存入
NodeBuilder)。
下游串行(Downstream Serial):
- 当上游的并行计算完成后,
Hybrid任务会进入onCompletion阶段。 Hybrid内部通过一个精巧的链表结构 (next指针) 保证了,即使各个任务块的并行计算完成时间是乱序的,它们向下游提交数据(即执行gather逻辑)的顺序也严格遵循原始数据流的顺序。- 所有任务共享同一个
Sequential实例,这个实例包含了Gatherer的状态。 - 任务A处理完后,把它的缓冲结果交给
Sequential实例处理;然后,任务B才能把它缓冲的结果交过来。这就确保了windowSliding看到的元素流是[..., element_N, element_N+1, ...]这样连续不断的,从而可以正确地生成所有窗口。
- 当上游的并行计算完成后,
所以,Hybrid 策略的必要性在于:
- 对于那些有状态且不可合并的
Gatherer,我们不能使用常规的并行模式,否则结果会出错。 - 但我们又不想因为这一个
Gatherer操作而放弃整个流的并行化潜力,让map、filter等无辜的操作也跟着串行执行,这会造成巨大的性能损失。
Hybrid 提供了一个两全其美的方案:它像一个聪明的调度器,将可以并行的部分(上游无状态操作)充分并行化,然后将必须串行的部分(有状态的 Gatherer 逻辑)严格串行化,并在这两者之间建立了一座桥梁,从而在保证结果正确性的前提下,最大化地压榨了多核CPU的性能。
类的定义与关键成员变量
// ... existing code ...@SuppressWarnings("serial")final class Hybrid extends CountedCompleter<Sequential> {private final long targetSize;private final Hybrid leftPredecessor;private final AtomicBoolean cancelled;private final Sequential localResult;private Spliterator<T> spliterator;private Hybrid next;private static final VarHandle NEXT = MhUtil.findVarHandle(MethodHandles.lookup(), "next", Hybrid.class);
// ... existing code ...
extends CountedCompleter<Sequential>: 继承自CountedCompleter,这是ForkJoin框架中用于处理完成依赖关系的核心类。它的结果类型是Sequential,即下游的串行处理器。targetSize:ForkJoin任务分裂的建议目标大小。leftPredecessor: 指向其在逻辑顺序上的前一个任务。这是保证顺序执行的关键。cancelled: 一个共享的AtomicBoolean,用于在非greedy模式下实现短路。所有分裂出的任务共享同一个cancelled实例。localResult: 下游的串行处理器。所有任务也共享同一个Sequential实例,因为处理逻辑必须是串行的。spliterator: 当前任务负责处理的数据片段。next: 指向其在逻辑顺序上的后一个任务。通过VarHandle进行原子更新,用于在任务完成时触发下一个任务的执行。
compute() - 任务分裂与上游并行化
Hybrid.compute() 的执行流程可以分为两个主要阶段:
- 任务分裂阶段(Task Splitting):通过一个
while循环,递归地将大的数据块(Spliterator)分解成更小的、适合并行处理的子任务。这个阶段的核心是构建一个逻辑上的任务链表,为后续的有序处理做准备。 - 叶子任务处理阶段(Leaf Task Processing):当任务块小到不再适合继续分裂时,循环停止。此时,当前任务成为“叶子任务”,它需要真正地处理自己所持有的那一小块数据。“存入临时
nb”就发生在这个阶段。
// ... existing code ...@Overridepublic void compute() {var task = this;Spliterator<T> rightSplit = task.spliterator, leftSplit;long sizeThreshold = task.targetSize;boolean forkRight = false;
// ... existing code ...
- 初始化:
task指向当前执行的Hybrid实例。rightSplit是当前任务需要处理的数据源。sizeThreshold是决定是否继续分裂的阈值。
1. 任务分裂阶段 (while 循环)
// ... existing code ...while ((greedy || !cancelled.get())&& rightSplit.estimateSize() > sizeThreshold&& (leftSplit = rightSplit.trySplit()) != null) {var leftChild = new Hybrid(task, leftSplit, task.leftPredecessor);var rightChild = new Hybrid(task, rightSplit, leftChild);/* leftChild and rightChild were just created and not* fork():ed yet so no need for a volatile write*/leftChild.next = rightChild;// ... (pending count 和 fork 逻辑 和 ForEachOrdered一样) ...// 这部分是 ForkJoin 框架的控制逻辑,用于管理任务依赖和执行// 核心思想是交替地 fork 左/右子任务,让当前线程继续处理另一半if (forkRight) {rightSplit = leftSplit;task = leftChild;rightChild.fork();} else {task = rightChild;leftChild.fork();}forkRight = !forkRight;}
// ... existing code ...
- 循环条件:
(greedy || !cancelled.get()): 如果不是greedy模式(即可能会短路),则每次分裂前都检查是否已被取消。rightSplit.estimateSize() > sizeThreshold: 数据块足够大,值得分裂。(leftSplit = rightSplit.trySplit()) != null: 数据源支持分裂。
- 创建子任务:创建
leftChild和rightChild两个新的Hybrid任务。 - 构建链表:
leftChild.next = rightChild;这一行至关重要。它在任务树的兄弟节点之间建立了一个逻辑上的先后关系。这保证了即使leftChild和rightChild被不同线程并行执行,我们也能在将来按正确的顺序处理它们的结果。 fork():rightChild.fork()或leftChild.fork()将一个子任务提交给ForkJoinPool,让其他空闲线程可以“窃取”并执行它。当前线程则继续在循环中处理剩下的那一半数据。
这个分裂过程会一直持续,直到数据块小得不能再分,此时 while 循环退出,进入叶子任务处理阶段。
2. 叶子任务处理阶段 (关键部分)
// ... existing code .../** ...* IMPORTANT: Currently we only perform the processing of this* upstream data if we know the operation is greedy -- as we cannot* safely speculate on the cost/benefit ratio of parallelizing* the pre-processing of upstream data under short-circuiting.*/if (greedy && task.getPendingCount() > 0) {// Upstream elements are bufferedNodeBuilder<T> nb = new NodeBuilder<>();rightSplit.forEachRemaining(nb); // Run the upstreamtask.spliterator = nb.build().spliterator();}task.tryComplete();}
// ... existing code ...
if (greedy && ...): 这个优化只在greedy模式下进行。greedy意味着Gatherer必须处理完所有上游元素才能完成,不会短路。这使得我们可以安全地预处理所有数据。NodeBuilder<T> nb = new NodeBuilder<>();: 创建一个临时的缓冲区。rightSplit.forEachRemaining(nb);: 这就是实现上游并行的魔法所在!rightSplit是什么?它不是原始的数据源,而是经过了上游所有操作(如filter,map)层层包装后的Spliterator。forEachRemaining(nb)的作用是:拉动rightSplit,让它开始吐出元素。每吐出一个元素,上游的map,filter等操作就会被执行。nb是一个Sink(通过accept方法),它接收这些经过上游操作处理后的结果,并把它们缓冲在内部。- 为什么能并行? 因为此时,多个不同的叶子任务(
Hybrid实例)正在不同的CPU核心上同时执行它们的compute方法。每个叶子任务都在调用forEachRemaining,从而驱动它所负责的那一小块数据流过上游的map/filter流水线。这就实现了对上游无状态操作的并行处理。
task.spliterator = nb.build().spliterator();: 处理完成后,nb中包含了当前任务块所有处理好的元素。我们用nb.build()将其固化成一个Node,然后替换掉任务原来的spliterator。现在,task.spliterator指向的是一块已经过上游并行处理、结果被缓存的内存数据。task.tryComplete();: 通知ForkJoin框架,当前任务的compute阶段已经完成。这会触发onCompletion的执行。
总结与流程梳理
我们把整个流程串起来看:
- 一个大的
Hybrid任务开始执行compute。 while循环将任务分裂成许多小的叶子任务,这些任务被fork()到ForkJoinPool中,由不同的线程并行执行。同时,通过next指针构建了一个逻辑顺序链。- 并行阶段:每个叶子任务在其
compute方法中,调用forEachRemaining,并行地执行gather之前的所有map,filter等操作,并将结果缓冲到各自的NodeBuilder中。 - 串行阶段的准备:当一个叶子任务的
compute完成后,它调用tryComplete(),这会触发onCompletion。 - 串行阶段:
onCompletion方法是按next链表顺序被调用的。它会把自己缓冲好的数据(task.spliterator)交给下游唯一的、共享的Sequential处理器进行串行处理。
所以,“存入临时 nb”这个步骤,本质上是将上游的并行计算和下游的串行处理解耦开。它允许我们在上游尽情地利用多核并行计算,然后把计算结果(乱序到达的)先暂存起来,最后再由 onCompletion 机制像接力赛一样,一棒一棒地、按正确的顺序,把这些暂存的结果交给下游的串行处理器。
onCompletion() - 串行化处理与任务串联
// ... existing code ...@Overridepublic void onCompletion(CountedCompleter<?> caller) {var s = spliterator;spliterator = null; // GC// 1. 执行串行处理if (s != null&& (greedy || !cancelled.get())&& !localResult.evaluateUsing(s).proceed // 调用 Sequential 处理器&& !greedy)cancelled.set(true); // 如果需要,设置短路标志// 2. 触发下一个任务@SuppressWarnings("unchecked")var leftDescendant = (Hybrid) NEXT.getAndSet(this, null);if (leftDescendant != null) {leftDescendant.tryComplete();}}
// ... existing code ...
- 执行串行处理:这是
Hybrid模式的核心。当一个任务完成其compute阶段后,onCompletion被调用。它会调用共享的localResult(一个Sequential实例) 的evaluateUsing方法,把自己持有的spliterator(对于greedy模式,这是缓冲后的Node的spliterator) 交给它处理。由于所有任务共享同一个localResult,并且通过next指针的串联保证了onCompletion的调用顺序,这里的处理实际上是串行的。 - 触发下一个任务:
NEXT.getAndSet(this, null)是一个原子操作,它获取并清空next引用。如果next任务(即leftDescendant)存在,就调用它的tryComplete()。这就像一个接力赛,当前任务完成处理后,拍一下下一个任务的肩膀,让它开始处理。
总结
Hybrid 类是一个非常精巧的设计,它在面对“有状态且不可并行合并”的 Gatherer 时,找到了一条兼顾并行性能和顺序要求的中间道路:
- 它将问题分解为**上游(Upstream)和下游(Downstream)**两部分。
- 上游(
gather之前的操作)通过ForkJoin和NodeBuilder缓冲,实现了最大程度的并行计算。 - 下游(
gather和collect操作)通过共享的Sequential实例和CountedCompleter的有序完成机制,实现了严格的串行处理。
Hybrid 模式是 GathererOp 能够高效融入并行流体系,同时又不破坏复杂有状态操作的顺序依赖性的关键所在。
Parallel
Parallel 类是 GathererOp 为可并行合并的 Gatherer 操作量身定制的并行执行策略。当一个 Gatherer 提供了有效的 combiner(合并器)时,就会采用此策略。它借鉴了 AbstractShortCircuitTask 的设计思想,实现了高效的并行处理和结果合并,并支持短路操作。
- 角色:一个基于
ForkJoin框架的并行任务,用于执行那些状态可以被并行计算然后合并的Gatherer。 - 核心目标:
- 完全并行化:与
Hybrid不同,Parallel策略旨在将Gatherer的逻辑(包括状态的累积)也完全并行化。每个工作线程都会有一个独立的Gatherer状态。 - 状态合并:在并行计算完成后,通过用户提供的
gatherer.combiner()和下游collector.combiner()将各个线程的独立状态合并成最终结果。 - 支持短路:能够处理非
greedy的Gatherer,当某个分支决定短路时,能有效地取消后续不必要的计算。
- 完全并行化:与
我们来逐个部分解析 Parallel 的实现。
// ... existing code ...@SuppressWarnings("serial")final class Parallel extends CountedCompleter<Sequential> {private Spliterator<T> spliterator;private Parallel leftChild; // Only non-null if rightChild isprivate Parallel rightChild; // Only non-null if leftChild isprivate Sequential localResult;private volatile boolean canceled;private long targetSize; // lazily initialized// ... 构造函数 ...
// ... existing code ...
extends CountedCompleter<Sequential>: 继承自CountedCompleter,这是ForkJoin框架中用于管理依赖任务完成计数的关键类。Sequential是其最终的计算结果类型。spliterator: 当前任务需要处理的数据分片。leftChild,rightChild: 指向分裂出的左右子任务,用于构建任务树。localResult: 每个Parallel任务实例都拥有一个独立的Sequential实例。这个Sequential实例负责处理当前任务分片的数据,并持有该分片的局部Gatherer状态和Collector状态。这是与Hybrid模式(共享同一个localResult)的根本区别。canceled: 一个volatile的布尔值,用于实现短路。当一个任务需要取消时,它会通知其兄弟任务和父任务。targetSize: 任务分裂的阈值,延迟初始化。
compute() - 任务分裂与执行
// ... existing code ...@Overridepublic void compute() {Spliterator<T> rs = spliterator, ls;long sizeEstimate = rs.estimateSize();final long sizeThreshold = getTargetSize(sizeEstimate);Parallel task = this;boolean forkRight = false;boolean proceed;while ((proceed = (greedy || !task.isRequestedToCancel()))&& sizeEstimate > sizeThreshold&& (ls = rs.trySplit()) != null) {final var leftChild = task.leftChild = new Parallel(task, ls);final var rightChild = task.rightChild = new Parallel(task, rs);task.setPendingCount(1);if (forkRight) {rs = ls;task = leftChild;rightChild.fork();} else {task = rightChild;leftChild.fork();}forkRight = !forkRight;sizeEstimate = rs.estimateSize();}if (proceed)task.doProcess();task.tryComplete();}
// ... existing code ...
- 任务分裂循环 (
while):- 逻辑与
Hybrid类似,通过trySplit将大任务递归地分解成小任务。 isRequestedToCancel(): 在每次分裂前检查是否已被其他任务请求取消,以实现短路。fork(): 将其中一个子任务提交给ForkJoinPool,让其他线程窃取执行,当前线程继续处理另一半。
- 逻辑与
- 叶子任务处理:
- 当任务不再分裂时,循环退出。
if (proceed): 如果没有被取消,则调用task.doProcess()。
doProcess():
private void doProcess() {if (!(localResult = new Sequential()).evaluateUsing(spliterator).proceed&& !greedy)cancelLaterTasks();
}
localResult = new Sequential(): 关键点! 每个叶子任务都会创建一个全新的Sequential实例来处理自己的数据分片。evaluateUsing(spliterator): 调用Sequential的方法,完成对当前数据分片的Gatherer和Collector的处理。cancelLaterTasks(): 如果evaluateUsing返回false(表示当前分支短路了),则调用此方法去通知其他相关的任务取消执行。
merge(Sequential l, Sequential r) - 结果合并
// ... existing code ...Sequential merge(Sequential l, Sequential r) {/** Only join the right if the left side didn't short-circuit,* or when greedy*/if (greedy || (l != null && r != null && l.proceed)) {l.state = combiner.apply(l.state, r.state);l.collectorState =collectorCombiner.apply(l.collectorState, r.collectorState);l.proceed = r.proceed;return l;}return (l != null) ? l : r;}
// ... existing code ...
- 这是
Parallel策略的核心。当左右子任务都完成后,父任务会调用此方法来合并它们的结果。 if (greedy || ...): 检查是否需要合并。如果不是greedy模式,且左边的任务已经短路了 (l.proceed为false),则无需合并右边的结果。l.state = combiner.apply(l.state, r.state): 调用用户提供的Gatherer合并器,合并两个子任务的Gatherer状态。l.collectorState = collectorCombiner.apply(...): 调用下游Collector的合并器,合并两个子任务的Collector状态。
onCompletion
// ... existing code ...@Overridepublic void onCompletion(CountedCompleter<?> caller) {spliterator = null; // GC assistanceif (leftChild != null) {/* ... */localResult = merge(leftChild.localResult, rightChild.localResult);leftChild = rightChild = null; // GC assistance}}
// ... existing code ...
- 当一个任务的所有子任务都完成时(即
pendingCount减到0),ForkJoin框架会调用此方法。 if (leftChild != null): 这判断当前任务是否是一个“父任务”(即它曾经分裂过)。叶子任务的leftChild为null,不会进入此逻辑。localResult = merge(...): 调用merge方法,将左右子任务的localResult合并,并将合并后的结果存入当前父任务的localResult中。这个过程会从叶子节点开始,逐级向上合并,直到根任务。
总结
Parallel 类实现了一种经典的分治(Divide and Conquer)并行计算模式:
- 分解 (Divide):
compute方法将数据源递归地分裂成小块,构建出一个任务树。 - 解决 (Conquer): 树的每个叶子节点独立地、并行地处理自己的数据块,生成一个局部的结果(一个包含局部状态的
Sequential对象)。 - 合并 (Combine):
onCompletion和merge方法协同工作,将子任务的局部结果两两合并,自底向上地在任务树中传递,直到根节点产生最终的、完全合并的结果。
这种策略的适用前提是 Gatherer 和 Collector 都必须是可结合的(Associative),即 (a op b) op c 等价于 a op (b op c),这样才能保证无论分裂和合并的顺序如何,最终结果都是一致的。
