Java Stream ReduceOps
ReduceOps
ReduceOps
是一个纯粹的工厂/工具类。它的唯一职责就是通过其静态方法,创建用于执行“规约(Reduction)”操作的 TerminalOp
实例。在 Stream API 中,我们熟知的 reduce()
, collect()
, count()
, sum()
, max()
, min()
等都属于规约操作。这个 ReduceOps
类就是这些操作在底层的“总工厂”。
在分析具体方法前,我们必须理解这个类里反复出现的一个核心设计模式。几乎每一个 make...
方法都遵循这个模式:
- 定义一个内部类
ReducingSink
:这个Sink
是真正的“工人”。它知道具体如何接收元素并进行累加。它通常实现了AccumulatingSink
接口。 - 返回一个
ReduceOp
实例:这是一个TerminalOp
(终端操作)的实现。它像一个“任务描述书”,描述了这是一个规约操作。这个ReduceOp
对象最重要的一个方法就是makeSink()
,当 Stream 框架要执行这个操作时,就会调用makeSink()
来获取一个全新的、可以工作的ReducingSink
实例。
这个“任务描述(ReduceOp
)”与“具体工人(ReducingSink
)”相分离的设计,是理解 Stream 实现的关键。
/*** A type of {@code TerminalSink} that implements an associative reducing* operation on elements of type {@code T} and producing a result of type* {@code R}.** @param <T> the type of input element to the combining operation* @param <R> the result type* @param <K> the type of the {@code AccumulatingSink}.*/private interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>>extends TerminalSink<T, R> {void combine(K other);}
现在,我们有逻辑地分析每一个 make...
工厂方法。
makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner)
- 对应API:
Stream.reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner)
- 作用: 这是最通用的
reduce
操作,可以改变结果的类型。 - 代码分析:
// ... class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> {@Overridepublic void begin(long size) {state = seed; // 1. 开始时,状态(state)被初始化为给定的种子(seed)}@Overridepublic void accept(T t) {// 2. 每接收一个元素t,就用reducer函数更新状态state = reducer.apply(state, t);}@Overridepublic void combine(ReducingSink other) {// 3. 在并行计算中,合并两个子任务的结果时,用combiner函数合并它们的状态state = combiner.apply(state, other.state);} } return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) {@Overridepublic ReducingSink makeSink() {return new ReducingSink(); // 返回一个全新的Sink实例} }; // ...
seed
、reducer
和combiner
三个函数,分别对应到了ReducingSink
的初始化、元素累加和结果合并这三个阶段。
makeRef(BinaryOperator<T> operator)
- 对应API:
Stream.reduce(BinaryOperator<T> accumulator)
- 作用: 这是不带初始值的
reduce
,因此结果是一个Optional<T>
,因为流可能为空。 - 代码分析:
// ... class ReducingSinkimplements AccumulatingSink<T, Optional<T>, ReducingSink> {private boolean empty; // 标志位,判断是否接收过元素private T state;public void begin(long size) {empty = true; // 一开始是空的state = null;}@Overridepublic void accept(T t) {if (empty) { // 如果是第一个元素empty = false;state = t; // 直接作为初始状态} else { // 如果不是第一个state = operator.apply(state, t); // 用operator进行累加}}@Overridepublic Optional<T> get() {// 最终获取结果时,根据empty标志判断是返回Optional.empty()还是包装后的statereturn empty ? Optional.empty() : Optional.of(state);}// ... } // ...
Sink
的设计巧妙地处理了没有初始值的情况。它用一个empty
标志来区分“第一个元素”和“后续元素”,并在最后根据这个标志返回正确的Optional
结果。
makeRef(Collector<? super T, I, ?> collector)
对应API:
Stream.collect(Collector<? super T, A, R> collector)
作用: 使用
Collector
进行规约的操作。代码分析:
// ... public static <T, I> TerminalOp<T, I>makeRef(Collector<? super T, I, ?> collector) { // 1. 首先从 Collector 中提取出三个核心函数 Supplier<I> supplier = Objects.requireNonNull(collector).supplier(); BiConsumer<I, ? super T> accumulator = collector.accumulator(); BinaryOperator<I> combiner = collector.combiner();class ReducingSink extends Box<I>implements AccumulatingSink<T, I, ReducingSink> {@Overridepublic void begin(long size) {// 2. 初始化时,调用 supplier 创建一个新的结果容器state = supplier.get();}@Overridepublic void accept(T t) {// 3. 累加时,调用 accumulator 将元素添加到结果容器中accumulator.accept(state, t);}@Overridepublic void combine(ReducingSink other) {// 4. 合并时,调用 combiner 合并两个结果容器state = combiner.apply(state, other.state);} } return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {@Overridepublic ReducingSink makeSink() {return new ReducingSink(); // 同样,返回一个全新的Sink实例}// ... }; // ...
逻辑梳理: 这段代码是连接
collect
和reduce
模型的关键桥梁。它清晰地表明,collect(collector)
在底层就是一种reduce
操作。Collector
接口只是一个“规格说明书”,它把reduce
操作所需要的“三要素”(supplier
,accumulator
,combiner
)优雅地打包在了一起。请注意:
makeSink()
方法每次被调用都会通过supplier.get()
创建一个全新的结果容器(比如一个新的ArrayList
)。这一点对于我们稍后讨论并发至关重要,因为它意味着每个并行任务都会有自己独立的存储空间。
其他make方法
makeInt
,makeLong
,makeDouble
: 这些是针对原始类型流(IntStream
等)的优化版本。它们的逻辑与makeRef
完全相同,只是操作的是int
,long
,double
等原始类型,避免了自动装箱/拆箱的性能开销。makeRefCounting
: 这是为count()
操作定制的。它有一个重要的优化:在evaluateSequential
和evaluateParallel
方法里,它会先尝试调用helper.exactOutputSizeIfKnown(spliterator)
。如果流的大小是已知的(例如,源头是ArrayList
),它会直接返回大小,根本不会启动流的求值过程。只有在大小未知时,才会真正创建一个CountingSink
来一个个地数。
例如,makeInt(int identity, IntBinaryOperator operator)
对应 IntStream.reduce(int, IntBinaryOperator)
,其内部的 ReducingSink
直接操作 int state
。
// ...public static TerminalOp<Integer, Integer>makeInt(int identity, IntBinaryOperator operator) {Objects.requireNonNull(operator);class ReducingSinkimplements AccumulatingSink<Integer, Integer, ReducingSink>, Sink.OfInt {private int state;@Overridepublic void begin(long size) {state = identity;}@Overridepublic void accept(int t) {state = operator.applyAsInt(state, t);}
// ...
CountingSink<T>
类
这是一个非常典型的、为特定优化场景设计的内部类。
// ... existing code .../*** A sink that counts elements*/abstract static class CountingSink<T>extends Box<Long>implements AccumulatingSink<T, Long, CountingSink<T>> {long count;@Overridepublic void begin(long size) {count = 0L;}@Overridepublic Long get() {return count;}@Overridepublic void combine(CountingSink<T> other) {count += other.count;}
// ... existing code ...
从这段代码我们可以分析出它的核心职责和设计思想:
定位: 它的注释说的很清楚——"A sink that counts elements"(一个用来数元素的 Sink)。它的唯一目标就是实现
Stream.count()
操作。继承与实现:
extends Box<Long>
: 它继承了Box<Long>
,这意味着它内部有一个叫做state
的字段,类型是Long
。但请注意,CountingSink
自己又定义了一个long count
字段,并且get()
方法返回的是count
而不是state
。这里的继承主要是为了类型兼容,实际的状态存储在count
字段中,这更高效,因为它避免了Long
对象的包装。implements AccumulatingSink<T, Long, CountingSink<T>>
: 它实现了我们之前讨论过的AccumulatingSink
接口。这表明它是一个标准的、可累加的终端 Sink。T
: 输入的元素类型。Long
: 输出的结果类型(计数值)。CountingSink<T>
: 合并时,另一个 Sink 的类型。
核心字段:
long count;
: 这是最重要的字段,一个原始的long
类型变量,用来存储计数值。使用long
而不是Long
是为了性能,避免了不必要的对象创建和拆箱装箱。
已实现的方法:
begin(long size)
: 在流处理开始前调用。它的作用是将计数器count
初始化为 0。get()
: 在所有元素处理完毕后调用。它返回最终的计数值count
。combine(CountingSink<T> other)
: 在并行计算中,用于合并两个子任务的结果。它的逻辑非常简单直接:把另一个CountingSink
的count
值加到自己的count
上。count += other.count;
abstract
的原因: 这个类是抽象的,因为它没有实现Sink
接口中的accept
方法。accept
方法是用来接收单个元素的,但如何“接收”一个元素并更新计数器,对于引用类型 (T
)、int
、long
、double
是有细微差别的。因此,这个通用的父类把accept
的具体实现留给了子类。
子类的职责:实现 accept
CountingSink
有四个静态内部子类,分别对应不同的流类型。它们的唯一职责就是提供一个最高效的 accept
方法实现。
static final class OfRef<T> extends CountingSink<T>
static final class OfRef<T> extends CountingSink<T> {@Overridepublic void accept(T t) {count++;} }
- 作用: 用于
Stream<T>
,即引用类型的流。 - 实现:
accept(T t)
方法接收一个泛型对象t
。它完全忽略这个对象的内容,只是简单地将count
加一。
- 作用: 用于
static final class OfInt extends CountingSink<Integer> implements Sink.OfInt
static final class OfInt extends CountingSink<Integer> implements Sink.OfInt {@Overridepublic void accept(int t) {count++;} }
- 作用: 用于
IntStream
。 - 实现: 它实现了
Sink.OfInt
接口,这让它可以接收一个原始类型int
。accept(int t)
方法接收一个int
值,同样忽略其内容,只将count
加一。这避免了将int
包装成Integer
再传递,性能更高。
- 作用: 用于
static final class OfLong extends CountingSink<Long> implements Sink.OfLong
static final class OfLong extends CountingSink<Long> implements Sink.OfLong {@Overridepublic void accept(long t) {count++;} }
- 作用: 用于
LongStream
。 - 实现: 与
OfInt
类似,它实现了Sink.OfLong
,直接处理long
类型,避免了包装成Long
对象。
- 作用: 用于
总结
CountingSink
(抽象父类) 搭建了count()
操作的通用框架:- 定义了
count
字段用于存储结果。 - 实现了
begin
(初始化)、get
(获取结果)、combine
(并行合并) 的通用逻辑。 - 将最核心的
accept
(消费元素) 方法定义为抽象方法,交由子类实现。
- 定义了
CountingSink
的子类 (OfRef
,OfInt
,OfLong
,OfDouble
) 的唯一职责是:- 为特定类型的流(引用、int、long、double)提供一个最高效的
accept
方法。 - 通过实现
Sink.OfInt
等特定接口,使得它们可以直接处理原始数据类型,避免了自动装箱/拆箱的性能损耗,这是 Stream 框架中一个非常重要的优化手段。
- 为特定类型的流(引用、int、long、double)提供一个最高效的
ReduceOp类
首先看定义: private abstract static class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>> implements TerminalOp<T, R>
private abstract static class
:private
: 只能在ReduceOps
内部被访问和继承。static
: 它不持有外部类ReduceOps
的实例引用。abstract
: 它是一个抽象类,不能被直接实例化。ReduceOps
中的每个make...
方法都会返回一个它的匿名子类实例。
implements TerminalOp<T, R>
:- 这明确了它的身份:一个终端操作(Terminal Operation)。这是 Stream 流水线的最后一环,负责触发整个流的计算并产生最终结果。
泛型参数解析:
T
(Input Type): 流中元素的类型,即终端操作的输入类型。例如,在Stream<String>
中,T
就是String
。R
(Result Type): 规约操作最终的结果类型。例如,collect(Collectors.toList())
的结果是List<T>
,那么R
就是List<T>
;count()
的结果是Long
,R
就是Long
。S
(Sink Type): 执行此规约操作的Sink
的具体类型。这个S
必须是AccumulatingSink<T, R, S>
的子类。这保证了S
知道如何接收T
类型的元素,产生R
类型的结果,并且知道如何与另一个S
类型的Sink
合并。
一句话总结定位:ReduceOp
是一个抽象的、通用的终端操作描述符,它专门用于描述“使用一个可累加的 Sink (S
) 来处理一个元素类型为 T
的流,并最终产生一个类型为 R
的结果”这类任务。
我们来看它的内部实现:
// ... existing code ...private abstract static class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>>implements TerminalOp<T, R> {private final StreamShape inputShape;/*** Create a {@code ReduceOp} of the specified stream shape which uses* the specified {@code Supplier} to create accumulating sinks.** @param shape The shape of the stream pipeline*/ReduceOp(StreamShape shape) {inputShape = shape;}public abstract S makeSink();@Overridepublic StreamShape inputShape() {return inputShape;}@Overridepublic <P_IN> R evaluateSequential(PipelineHelper<T> helper,Spliterator<P_IN> spliterator) {return helper.wrapAndCopyInto(makeSink(), spliterator).get();}@Overridepublic <P_IN> R evaluateParallel(PipelineHelper<T> helper,Spliterator<P_IN> spliterator) {return new ReduceTask<>(this, helper, spliterator).invoke().get();}}
// ... existing code ...
private final StreamShape inputShape;
:- 这个字段记录了该操作期望接收的流的“形状”(
REFERENCE
引用类型,INT_VALUE
,LONG_VALUE
,DOUBLE_VALUE
)。这对于 Stream 框架进行类型检查和优化很重要。
- 这个字段记录了该操作期望接收的流的“形状”(
public abstract S makeSink();
:- 这是
ReduceOp
最核心的抽象方法。它定义了一个契约:任何具体的ReduceOp
实现都必须能制造出用于执行该操作的Sink
实例 (S
)。 - 我们在之前的分析中看到,
ReduceOps.makeRef(...)
等工厂方法返回的匿名子类,其唯一目的就是实现这个makeSink()
方法,返回一个配置好的ReducingSink
或CountingSink
。
- 这是
evaluateSequential(...)
:- 这是执行串行规约操作的入口。
- 逻辑非常清晰:
makeSink()
: 创建一个全新的 Sink 实例。helper.wrapAndCopyInto(...)
: 这是一个关键的辅助方法。它会: a.wrapSink(...)
: 将这个全新的Sink
传递给上游,让流水线上的所有中间操作(map
,filter
等)对它进行层层包装,形成一个完整的Sink
链。 b.copyInto(...)
: 驱动Spliterator
遍历所有源数据,并将每个元素喂给包装好的Sink
链的头部。.get()
: 当所有元素都处理完毕后,从Sink
中获取最终的规约结果R
。
evaluateParallel(...)
:- 这是执行并行规约操作的入口。
- 它没有直接处理数据,而是将任务委托给了一个专门的
ForkJoinTask
:new ReduceTask<>(this, helper, spliterator)
: 创建一个ReduceTask
。注意,它把this
(也就是ReduceOp
自身)传递了进去。.invoke()
: 将这个任务提交给ForkJoinPool
去执行。ForkJoin
框架会负责任务的分解(split)和执行。.get()
:ReduceTask
在其内部完成了所有并行计算和结果合并后,会持有一个最终的Sink
,调用.get()
从这个Sink
中获取最终结果。
ReduceTask
与 ReduceOp
的协作
在并行模式下,ReduceTask
会使用它持有的 ReduceOp
。具体来说,在 ReduceTask
的 doLeaf()
方法(处理最小任务单元的方法)中,它会调用 op.makeSink()
。
// 在 ReduceTask 类中
@Override
protected S doLeaf() {return helper.wrapAndCopyInto(op.makeSink(), spliterator);
}
这再次印证了 makeSink()
的重要性:在并行计算中,每一个最小的叶子任务(leaf task)都会通过 op.makeSink()
创建一个自己专属的、全新的、隔离的 Sink
实例(例如,一个新的 ArrayList
或一个新的 long[]
)。
当叶子任务完成,它的父任务会在 onCompletion
方法中将两个子任务的 Sink
结果进行合并:
// 在 ReduceTask 类中
@Override
public void onCompletion(CountedCompleter<?> caller) {if (!isLeaf()) {S leftResult = leftChild.getLocalResult(); // 获取左子任务的SinkleftResult.combine(rightChild.getLocalResult()); // 调用Sink的combine方法合并setLocalResult(leftResult); // 将合并后的Sink作为当前任务的结果}// ...
}
总结
ReduceOp
是一个设计精良的抽象,它完美地封装了“规约”这一终端操作的共性:
- 是什么 (What): 它通过泛型
T
,R
,S
和inputShape
字段,清晰地描述了操作的输入、输出和内部机制。 - 怎么做 (How):
- 通过抽象方法
makeSink()
,将创建具体工作单元(Sink)的责任委托给子类。 - 通过
evaluateSequential
方法,定义了串行执行的标准流程:创建 Sink -> 包装 Sink -> 灌入数据 -> 获取结果。 - 通过
evaluateParallel
方法,定义了并行执行的标准流程:创建并行任务(ReduceTask
) -> 委托执行。
- 通过抽象方法
它作为连接上层 API (Stream.reduce
, Stream.collect
) 和底层执行框架 (Sink
, ForkJoinTask
) 的桥梁,是实现规约操作可扩展、可并行、类型安全的关键所在。
ReduceTask
这个类是 java.util.concurrent.ForkJoinTask
的一个子类,专门用于在 ForkJoinPool
中执行并行的规约操作。ReduceOp
在调用 evaluateParallel
时,就是创建并启动了这个任务。
首先看它的定义:
private static final class ReduceTask<P_IN, P_OUT, R,
S extends AccumulatingSink<P_OUT, R, S>>
extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>>
泛型参数解析:
P_IN
(Pipeline Input): 整个流水线最开始的输入元素类型。P_OUT
(Pipeline Output): 经过所有中间操作(map
,filter
等)后,到达这个终端操作的元素类型。这也就是ReduceOp
中的T
。R
(Result): 最终的规约结果类型。S
(Sink): 执行规约操作的Sink
的具体类型。
继承关系:
- 它继承自
AbstractTask
。AbstractTask
是 Stream API 内部对ForkJoinTask
的一个通用封装,处理了任务分裂(splitting)的通用逻辑。ReduceTask
在此基础上增加了规约操作的特定逻辑。
- 它继承自
一句话总结定位:ReduceTask
是一个具体的 ForkJoinTask
实现,它封装了如何将一个大的规约任务分解成小任务,并行执行,并最终将结果合并的全部逻辑。
我们来逐一分析它的代码实现。
// ... existing code ...@SuppressWarnings("serial")private static final class ReduceTask<P_IN, P_OUT, R,S extends AccumulatingSink<P_OUT, R, S>>extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> {private final ReduceOp<P_OUT, R, S> op;ReduceTask(ReduceOp<P_OUT, R, S> op,PipelineHelper<P_OUT> helper,Spliterator<P_IN> spliterator) {super(helper, spliterator);this.op = op;}ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent,Spliterator<P_IN> spliterator) {super(parent, spliterator);this.op = parent.op;}@Overrideprotected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) {return new ReduceTask<>(this, spliterator);}@Overrideprotected S doLeaf() {return helper.wrapAndCopyInto(op.makeSink(), spliterator);}@Overridepublic void onCompletion(CountedCompleter<?> caller) {if (!isLeaf()) {S leftResult = leftChild.getLocalResult();leftResult.combine(rightChild.getLocalResult());setLocalResult(leftResult);}// GC spliterator, left and right childsuper.onCompletion(caller);}}
// ... existing code ...
private final ReduceOp<P_OUT, R, S> op;
:- 这是
ReduceTask
最重要的字段。它持有一个ReduceOp
的引用。我们之前分析过,ReduceOp
是一个“任务描述书”,ReduceTask
就是根据这个描述书来工作的。最关键的是,它需要通过op.makeSink()
来创建Sink
。
- 这是
构造函数:
- 有两个构造函数。第一个用于创建根任务(root task),它接收
ReduceOp
,PipelineHelper
和Spliterator
。第二个用于创建子任务(child task),它从父任务那里继承op
和helper
,只接收一个新的、分裂过的Spliterator
。
- 有两个构造函数。第一个用于创建根任务(root task),它接收
protected ReduceTask<...> makeChild(...)
:- 这是
AbstractTask
要求子类实现的方法。当ForkJoin
框架需要分裂任务时,会调用这个方法来创建一个新的子任务。它只是简单地调用了第二个构造函数。
- 这是
protected S doLeaf()
:- 这是并行执行的“叶子”逻辑。当任务被分裂到足够小,不能再继续分裂时(由
spliterator.trySplit()
返回null
决定),ForkJoin
框架会调用doLeaf()
方法来执行实际的工作。 - 它的逻辑和我们之前分析的
evaluateSequential
非常相似:op.makeSink()
: 调用ReduceOp
的工厂方法,创建一个全新的、局部的Sink
实例。这是并行安全的核心!每个叶子任务都有自己独立的、不被其他线程干扰的结果容器(比如一个独立的ArrayList
)。helper.wrapAndCopyInto(...)
: 将这个叶子任务所负责的一小部分数据,通过流水线灌入这个全新的Sink
中。return ...
: 返回这个已经包含了部分结果的Sink
。
- 这是并行执行的“叶子”逻辑。当任务被分裂到足够小,不能再继续分裂时(由
public void onCompletion(CountedCompleter<?> caller)
:- 这是并行执行的“合并”逻辑。
CountedCompleter
是ForkJoinTask
的一个子类,它支持在任务完成时触发一个回调,就是这个onCompletion
方法。 - 当一个任务的两个子任务(
leftChild
和rightChild
)都完成了,它的onCompletion
方法就会被调用。 - 逻辑如下:
if (!isLeaf())
: 检查当前任务不是叶子任务(叶子任务没有子任务,不需要合并)。S leftResult = leftChild.getLocalResult();
: 获取左子任务执行doLeaf()
或它自己的onCompletion
后得到的结果Sink
。leftResult.combine(rightChild.getLocalResult());
: 调用Sink
的combine
方法,将右子任务的结果Sink
合并到左子任务的Sink
中。例如,对于collect(Collectors.toList())
,这里执行的就是list1.addAll(list2)
。setLocalResult(leftResult);
: 将合并后的Sink
(leftResult
) 设置为当前任务的本地结果。这个结果又可以被它的父任务在onCompletion
中使用,如此层层向上合并。
super.onCompletion(caller);
: 调用父类的onCompletion
,主要用于清理引用(比如spliterator
,leftChild
,rightChild
),帮助垃圾回收。
- 这是并行执行的“合并”逻辑。
collect
并行安全性
现在,我们可以将所有分析串联起来,分析 collect
并行安全性的问题:
起点:
stream.parallel().collect(collector)
调用ReduceOps.makeRef(collector)
,创建了一个ReduceOp
。这个ReduceOp
的makeSink()
方法被实现为{ return new ReducingSink(); }
,而ReducingSink
的begin()
方法会调用collector.supplier().get()
。并行执行( evaluate( terminalOp) ):
ReduceOp
的evaluateParallel
方法创建并启动了一个ReduceTask
。任务分裂 (Fork):
ForkJoinPool
将ReduceTask
不断分裂成更小的子任务,直到任务不可再分,成为“叶子任务”。叶子任务处理 (Leaf Execution): 每个叶子任务在自己独立的线程中执行
doLeaf()
方法。在doLeaf()
中,它通过op.makeSink()
->new ReducingSink()
->supplier.get()
创建了一个全新的、线程私有的结果容器(如一个新的ArrayList
)。然后它把自己负责的一小部分数据累加到这个私有容器中。因为容器是私有的,所以完全没有线程安全问题。结果合并 (Join/Combine): 当子任务完成,它们的父任务在
onCompletion
中被唤醒。父任务获取两个子任务的结果容器(两个独立的ArrayList
),并调用leftResult.combine(rightResult)
,这实际上是调用了collector.combiner()
(例如list1.addAll(list2)
)。这个合并操作将两个部分结果合并成一个。层层合并: 这个合并过程从最底层的叶子任务的父任务开始,一路向上,直到根任务将所有部分结果都合 并成一个最终的结果容器。
这个 “分裂 -> 独立容器累加 -> 两两合并” 的模式,就是 MapReduce 思想的一种体现,也是 Fork/Join
框架的经典应用。它通过避免在并行阶段共享可变状态,从根本上保证了 collect
操作的线程安全和高效率。