当前位置: 首页 > news >正文

Java Stream ReduceOps

ReduceOps

ReduceOps 是一个纯粹的工厂/工具类。它的唯一职责就是通过其静态方法,创建用于执行“规约(Reduction)”操作的 TerminalOp 实例。在 Stream API 中,我们熟知的 reduce()collect()count()sum()max()min() 等都属于规约操作。这个 ReduceOps 类就是这些操作在底层的“总工厂”。

在分析具体方法前,我们必须理解这个类里反复出现的一个核心设计模式。几乎每一个 make... 方法都遵循这个模式:

  1. 定义一个内部类 ReducingSink:这个 Sink 是真正的“工人”。它知道具体如何接收元素并进行累加。它通常实现了 AccumulatingSink 接口。
  2. 返回一个 ReduceOp 实例:这是一个 TerminalOp(终端操作)的实现。它像一个“任务描述书”,描述了这是一个规约操作。这个 ReduceOp 对象最重要的一个方法就是 makeSink(),当 Stream 框架要执行这个操作时,就会调用 makeSink() 来获取一个全新的、可以工作的 ReducingSink 实例。

这个“任务描述(ReduceOp)”与“具体工人(ReducingSink)”相分离的设计,是理解 Stream 实现的关键。

    /*** A type of {@code TerminalSink} that implements an associative reducing* operation on elements of type {@code T} and producing a result of type* {@code R}.** @param <T> the type of input element to the combining operation* @param <R> the result type* @param <K> the type of the {@code AccumulatingSink}.*/private interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>>extends TerminalSink<T, R> {void combine(K other);}

现在,我们有逻辑地分析每一个 make... 工厂方法。

makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner)

  • 对应APIStream.reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner)
  • 作用: 这是最通用的 reduce 操作,可以改变结果的类型。
  • 代码分析:
    // ...
    class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> {@Overridepublic void begin(long size) {state = seed; // 1. 开始时,状态(state)被初始化为给定的种子(seed)}@Overridepublic void accept(T t) {// 2. 每接收一个元素t,就用reducer函数更新状态state = reducer.apply(state, t);}@Overridepublic void combine(ReducingSink other) {// 3. 在并行计算中,合并两个子任务的结果时,用combiner函数合并它们的状态state = combiner.apply(state, other.state);}
    }
    return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) {@Overridepublic ReducingSink makeSink() {return new ReducingSink(); // 返回一个全新的Sink实例}
    };
    // ...
    
    逻辑梳理: 这个工厂方法精确地将用户传入的 seedreducer 和 combiner 三个函数,分别对应到了 ReducingSink 的初始化、元素累加和结果合并这三个阶段。

makeRef(BinaryOperator<T> operator)

  • 对应APIStream.reduce(BinaryOperator<T> accumulator)
  • 作用: 这是不带初始值的 reduce,因此结果是一个 Optional<T>,因为流可能为空。
  • 代码分析:
    // ...
    class ReducingSinkimplements AccumulatingSink<T, Optional<T>, ReducingSink> {private boolean empty; // 标志位,判断是否接收过元素private T state;public void begin(long size) {empty = true; // 一开始是空的state = null;}@Overridepublic void accept(T t) {if (empty) { // 如果是第一个元素empty = false;state = t; // 直接作为初始状态} else { // 如果不是第一个state = operator.apply(state, t); // 用operator进行累加}}@Overridepublic Optional<T> get() {// 最终获取结果时,根据empty标志判断是返回Optional.empty()还是包装后的statereturn empty ? Optional.empty() : Optional.of(state);}// ...
    }
    // ...
    
    逻辑梳理: 这个 Sink 的设计巧妙地处理了没有初始值的情况。它用一个 empty 标志来区分“第一个元素”和“后续元素”,并在最后根据这个标志返回正确的 Optional 结果。

makeRef(Collector<? super T, I, ?> collector)

  • 对应APIStream.collect(Collector<? super T, A, R> collector)

  • 作用: 使用 Collector 进行规约的操作。

  • 代码分析:

    // ...
    public static <T, I> TerminalOp<T, I>makeRef(Collector<? super T, I, ?> collector) {
    // 1. 首先从 Collector 中提取出三个核心函数
    Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
    BiConsumer<I, ? super T> accumulator = collector.accumulator();
    BinaryOperator<I> combiner = collector.combiner();class ReducingSink extends Box<I>implements AccumulatingSink<T, I, ReducingSink> {@Overridepublic void begin(long size) {// 2. 初始化时,调用 supplier 创建一个新的结果容器state = supplier.get();}@Overridepublic void accept(T t) {// 3. 累加时,调用 accumulator 将元素添加到结果容器中accumulator.accept(state, t);}@Overridepublic void combine(ReducingSink other) {// 4. 合并时,调用 combiner 合并两个结果容器state = combiner.apply(state, other.state);}
    }
    return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {@Overridepublic ReducingSink makeSink() {return new ReducingSink(); // 同样,返回一个全新的Sink实例}// ...
    };
    // ...
    

    逻辑梳理: 这段代码是连接 collect 和 reduce 模型的关键桥梁。它清晰地表明,collect(collector) 在底层就是一种 reduce 操作Collector 接口只是一个“规格说明书”,它把 reduce 操作所需要的“三要素”(supplieraccumulatorcombiner)优雅地打包在了一起。

    请注意makeSink() 方法每次被调用都会通过 supplier.get() 创建一个全新的结果容器(比如一个新的 ArrayList)。这一点对于我们稍后讨论并发至关重要,因为它意味着每个并行任务都会有自己独立的存储空间。

其他make方法

  • makeIntmakeLongmakeDouble: 这些是针对原始类型流(IntStream 等)的优化版本。它们的逻辑与 makeRef 完全相同,只是操作的是 intlongdouble 等原始类型,避免了自动装箱/拆箱的性能开销。
  • makeRefCounting: 这是为 count() 操作定制的。它有一个重要的优化:在 evaluateSequential 和 evaluateParallel 方法里,它会先尝试调用 helper.exactOutputSizeIfKnown(spliterator)。如果流的大小是已知的(例如,源头是 ArrayList),它会直接返回大小,根本不会启动流的求值过程。只有在大小未知时,才会真正创建一个 CountingSink 来一个个地数。

 例如,makeInt(int identity, IntBinaryOperator operator) 对应 IntStream.reduce(int, IntBinaryOperator),其内部的 ReducingSink 直接操作 int state

// ...public static TerminalOp<Integer, Integer>makeInt(int identity, IntBinaryOperator operator) {Objects.requireNonNull(operator);class ReducingSinkimplements AccumulatingSink<Integer, Integer, ReducingSink>, Sink.OfInt {private int state;@Overridepublic void begin(long size) {state = identity;}@Overridepublic void accept(int t) {state = operator.applyAsInt(state, t);}
// ...

CountingSink<T> 类

这是一个非常典型的、为特定优化场景设计的内部类。

// ... existing code .../*** A sink that counts elements*/abstract static class CountingSink<T>extends Box<Long>implements AccumulatingSink<T, Long, CountingSink<T>> {long count;@Overridepublic void begin(long size) {count = 0L;}@Overridepublic Long get() {return count;}@Overridepublic void combine(CountingSink<T> other) {count += other.count;}
// ... existing code ...

从这段代码我们可以分析出它的核心职责和设计思想:

  1. 定位: 它的注释说的很清楚——"A sink that counts elements"(一个用来数元素的 Sink)。它的唯一目标就是实现 Stream.count() 操作。

  2. 继承与实现:

    • extends Box<Long>: 它继承了 Box<Long>,这意味着它内部有一个叫做 state 的字段,类型是 Long。但请注意,CountingSink 自己又定义了一个 long count 字段,并且 get() 方法返回的是 count 而不是 state。这里的继承主要是为了类型兼容,实际的状态存储在 count 字段中,这更高效,因为它避免了 Long 对象的包装。
    • implements AccumulatingSink<T, Long, CountingSink<T>>: 它实现了我们之前讨论过的 AccumulatingSink 接口。这表明它是一个标准的、可累加的终端 Sink。
      • T: 输入的元素类型。
      • Long: 输出的结果类型(计数值)。
      • CountingSink<T>: 合并时,另一个 Sink 的类型。
  3. 核心字段:

    • long count;: 这是最重要的字段,一个原始的 long 类型变量,用来存储计数值。使用 long 而不是 Long 是为了性能,避免了不必要的对象创建和拆箱装箱。
  4. 已实现的方法:

    • begin(long size): 在流处理开始前调用。它的作用是将计数器 count 初始化为 0。
    • get(): 在所有元素处理完毕后调用。它返回最终的计数值 count
    • combine(CountingSink<T> other): 在并行计算中,用于合并两个子任务的结果。它的逻辑非常简单直接:把另一个 CountingSink 的 count 值加到自己的 count 上。count += other.count;
  5. abstract 的原因: 这个类是抽象的,因为它没有实现 Sink 接口中的 accept 方法。accept 方法是用来接收单个元素的,但如何“接收”一个元素并更新计数器,对于引用类型 (T)、intlongdouble 是有细微差别的。因此,这个通用的父类把 accept 的具体实现留给了子类。

子类的职责:实现 accept

CountingSink 有四个静态内部子类,分别对应不同的流类型。它们的唯一职责就是提供一个最高效的 accept 方法实现。

  1. static final class OfRef<T> extends CountingSink<T>

    static final class OfRef<T> extends CountingSink<T> {@Overridepublic void accept(T t) {count++;}
    }
    
    • 作用: 用于 Stream<T>,即引用类型的流。
    • 实现accept(T t) 方法接收一个泛型对象 t。它完全忽略这个对象的内容,只是简单地将 count 加一。
  2. static final class OfInt extends CountingSink<Integer> implements Sink.OfInt

    static final class OfInt extends CountingSink<Integer> implements Sink.OfInt {@Overridepublic void accept(int t) {count++;}
    }
    
    • 作用: 用于 IntStream
    • 实现: 它实现了 Sink.OfInt 接口,这让它可以接收一个原始类型 intaccept(int t) 方法接收一个 int 值,同样忽略其内容,只将 count 加一。这避免了将 int 包装成 Integer 再传递,性能更高。
  3. static final class OfLong extends CountingSink<Long> implements Sink.OfLong

    static final class OfLong extends CountingSink<Long> implements Sink.OfLong {@Overridepublic void accept(long t) {count++;}
    }
    
    • 作用: 用于 LongStream
    • 实现: 与 OfInt 类似,它实现了 Sink.OfLong,直接处理 long 类型,避免了包装成 Long 对象。

总结

  • CountingSink (抽象父类) 搭建了 count() 操作的通用框架:

    • 定义了 count 字段用于存储结果。
    • 实现了 begin (初始化)、get (获取结果)、combine (并行合并) 的通用逻辑。
    • 将最核心的 accept (消费元素) 方法定义为抽象方法,交由子类实现。
  • CountingSink 的子类 (OfRefOfIntOfLongOfDouble) 的唯一职责是:

    • 为特定类型的流(引用、int、long、double)提供一个最高效的 accept 方法。
    • 通过实现 Sink.OfInt 等特定接口,使得它们可以直接处理原始数据类型,避免了自动装箱/拆箱的性能损耗,这是 Stream 框架中一个非常重要的优化手段。

ReduceOp类

首先看定义: private abstract static class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>> implements TerminalOp<T, R>

  1. private abstract static class:

    • private: 只能在 ReduceOps 内部被访问和继承。
    • static: 它不持有外部类 ReduceOps 的实例引用。
    • abstract: 它是一个抽象类,不能被直接实例化。ReduceOps 中的每个 make... 方法都会返回一个它的匿名子类实例。
  2. implements TerminalOp<T, R>:

    • 这明确了它的身份:一个终端操作(Terminal Operation)。这是 Stream 流水线的最后一环,负责触发整个流的计算并产生最终结果。
  3. 泛型参数解析:

    • T (Input Type): 流中元素的类型,即终端操作的输入类型。例如,在 Stream<String> 中,T 就是 String
    • R (Result Type): 规约操作最终的结果类型。例如,collect(Collectors.toList()) 的结果是 List<T>,那么 R 就是 List<T>count() 的结果是 LongR 就是 Long
    • S (Sink Type): 执行此规约操作的 Sink 的具体类型。这个 S 必须是 AccumulatingSink<T, R, S> 的子类。这保证了 S 知道如何接收 T 类型的元素,产生 R 类型的结果,并且知道如何与另一个 S 类型的 Sink 合并。

一句话总结定位ReduceOp 是一个抽象的、通用的终端操作描述符,它专门用于描述“使用一个可累加的 Sink (S) 来处理一个元素类型为 T 的流,并最终产生一个类型为 R 的结果”这类任务。


我们来看它的内部实现:

// ... existing code ...private abstract static class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>>implements TerminalOp<T, R> {private final StreamShape inputShape;/*** Create a {@code ReduceOp} of the specified stream shape which uses* the specified {@code Supplier} to create accumulating sinks.** @param shape The shape of the stream pipeline*/ReduceOp(StreamShape shape) {inputShape = shape;}public abstract S makeSink();@Overridepublic StreamShape inputShape() {return inputShape;}@Overridepublic <P_IN> R evaluateSequential(PipelineHelper<T> helper,Spliterator<P_IN> spliterator) {return helper.wrapAndCopyInto(makeSink(), spliterator).get();}@Overridepublic <P_IN> R evaluateParallel(PipelineHelper<T> helper,Spliterator<P_IN> spliterator) {return new ReduceTask<>(this, helper, spliterator).invoke().get();}}
// ... existing code ...
  1. private final StreamShape inputShape;:

    • 这个字段记录了该操作期望接收的流的“形状”(REFERENCE 引用类型, INT_VALUELONG_VALUEDOUBLE_VALUE)。这对于 Stream 框架进行类型检查和优化很重要。
  2. public abstract S makeSink();:

    • 这是 ReduceOp 最核心的抽象方法。它定义了一个契约:任何具体的 ReduceOp 实现都必须能制造出用于执行该操作的 Sink 实例 (S)。
    • 我们在之前的分析中看到,ReduceOps.makeRef(...) 等工厂方法返回的匿名子类,其唯一目的就是实现这个 makeSink() 方法,返回一个配置好的 ReducingSink 或 CountingSink
  3. evaluateSequential(...):

    • 这是执行串行规约操作的入口。
    • 逻辑非常清晰:
      1. makeSink()创建一个全新的 Sink 实例
      2. helper.wrapAndCopyInto(...): 这是一个关键的辅助方法。它会: a. wrapSink(...): 将这个全新的 Sink 传递给上游,让流水线上的所有中间操作(mapfilter等)对它进行层层包装,形成一个完整的 Sink 链。 b. copyInto(...): 驱动 Spliterator 遍历所有源数据,并将每个元素喂给包装好的 Sink 链的头部。
      3. .get(): 当所有元素都处理完毕后,从 Sink 中获取最终的规约结果 R
  4. evaluateParallel(...):

    • 这是执行并行规约操作的入口。
    • 它没有直接处理数据,而是将任务委托给了一个专门的 ForkJoinTask
      1. new ReduceTask<>(this, helper, spliterator): 创建一个 ReduceTask。注意,它把 this(也就是 ReduceOp 自身)传递了进去。
      2. .invoke(): 将这个任务提交给 ForkJoinPool 去执行。ForkJoin 框架会负责任务的分解(split)和执行。
      3. .get()ReduceTask 在其内部完成了所有并行计算和结果合并后,会持有一个最终的 Sink,调用 .get() 从这个 Sink 中获取最终结果。

ReduceTask 与 ReduceOp 的协作

在并行模式下,ReduceTask 会使用它持有的 ReduceOp。具体来说,在 ReduceTask 的 doLeaf() 方法(处理最小任务单元的方法)中,它会调用 op.makeSink()

// 在 ReduceTask 类中
@Override
protected S doLeaf() {return helper.wrapAndCopyInto(op.makeSink(), spliterator);
}

这再次印证了 makeSink() 的重要性:在并行计算中,每一个最小的叶子任务(leaf task)都会通过 op.makeSink() 创建一个自己专属的、全新的、隔离的 Sink 实例(例如,一个新的 ArrayList 或一个新的 long[])。

当叶子任务完成,它的父任务会在 onCompletion 方法中将两个子任务的 Sink 结果进行合并:

// 在 ReduceTask 类中
@Override
public void onCompletion(CountedCompleter<?> caller) {if (!isLeaf()) {S leftResult = leftChild.getLocalResult(); // 获取左子任务的SinkleftResult.combine(rightChild.getLocalResult()); // 调用Sink的combine方法合并setLocalResult(leftResult); // 将合并后的Sink作为当前任务的结果}// ...
}

总结

ReduceOp 是一个设计精良的抽象,它完美地封装了“规约”这一终端操作的共性:

  • 是什么 (What): 它通过泛型 TRS 和 inputShape 字段,清晰地描述了操作的输入、输出和内部机制。
  • 怎么做 (How):
    • 通过抽象方法 makeSink(),将创建具体工作单元(Sink)的责任委托给子类。
    • 通过 evaluateSequential 方法,定义了串行执行的标准流程:创建 Sink -> 包装 Sink -> 灌入数据 -> 获取结果。
    • 通过 evaluateParallel 方法,定义了并行执行的标准流程:创建并行任务(ReduceTask) -> 委托执行。

它作为连接上层 API (Stream.reduceStream.collect) 和底层执行框架 (SinkForkJoinTask) 的桥梁,是实现规约操作可扩展、可并行、类型安全的关键所在。

ReduceTask

这个类是 java.util.concurrent.ForkJoinTask 的一个子类,专门用于在 ForkJoinPool 中执行并行的规约操作。ReduceOp 在调用 evaluateParallel 时,就是创建并启动了这个任务。

首先看它的定义: 

private static final class ReduceTask<P_IN, P_OUT, R, 
S extends AccumulatingSink<P_OUT, R, S>> 
extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>>

  1. 泛型参数解析:

    • P_IN (Pipeline Input): 整个流水线最开始的输入元素类型。
    • P_OUT (Pipeline Output): 经过所有中间操作(mapfilter 等)后,到达这个终端操作的元素类型。这也就是 ReduceOp 中的 T
    • R (Result): 最终的规约结果类型。
    • S (Sink): 执行规约操作的 Sink 的具体类型。
  2. 继承关系:

    • 它继承自 AbstractTaskAbstractTask 是 Stream API 内部对 ForkJoinTask 的一个通用封装,处理了任务分裂(splitting)的通用逻辑。ReduceTask 在此基础上增加了规约操作的特定逻辑。

一句话总结定位ReduceTask 是一个具体的 ForkJoinTask 实现,它封装了如何将一个大的规约任务分解成小任务,并行执行,并最终将结果合并的全部逻辑。


我们来逐一分析它的代码实现。

// ... existing code ...@SuppressWarnings("serial")private static final class ReduceTask<P_IN, P_OUT, R,S extends AccumulatingSink<P_OUT, R, S>>extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> {private final ReduceOp<P_OUT, R, S> op;ReduceTask(ReduceOp<P_OUT, R, S> op,PipelineHelper<P_OUT> helper,Spliterator<P_IN> spliterator) {super(helper, spliterator);this.op = op;}ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent,Spliterator<P_IN> spliterator) {super(parent, spliterator);this.op = parent.op;}@Overrideprotected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) {return new ReduceTask<>(this, spliterator);}@Overrideprotected S doLeaf() {return helper.wrapAndCopyInto(op.makeSink(), spliterator);}@Overridepublic void onCompletion(CountedCompleter<?> caller) {if (!isLeaf()) {S leftResult = leftChild.getLocalResult();leftResult.combine(rightChild.getLocalResult());setLocalResult(leftResult);}// GC spliterator, left and right childsuper.onCompletion(caller);}}
// ... existing code ...
  1. private final ReduceOp<P_OUT, R, S> op;:

    • 这是 ReduceTask 最重要的字段。它持有一个 ReduceOp 的引用。我们之前分析过,ReduceOp 是一个“任务描述书”,ReduceTask 就是根据这个描述书来工作的。最关键的是,它需要通过 op.makeSink() 来创建 Sink
  2. 构造函数:

    • 有两个构造函数。第一个用于创建根任务(root task),它接收 ReduceOpPipelineHelper 和 Spliterator。第二个用于创建子任务(child task),它从父任务那里继承 op 和 helper,只接收一个新的、分裂过的 Spliterator
  3. protected ReduceTask<...> makeChild(...):

    • 这是 AbstractTask 要求子类实现的方法。当 ForkJoin 框架需要分裂任务时,会调用这个方法来创建一个新的子任务。它只是简单地调用了第二个构造函数。
  4. protected S doLeaf():

    • 这是并行执行的“叶子”逻辑。当任务被分裂到足够小,不能再继续分裂时(由 spliterator.trySplit() 返回 null 决定),ForkJoin 框架会调用 doLeaf() 方法来执行实际的工作。
    • 它的逻辑和我们之前分析的 evaluateSequential 非常相似:
      1. op.makeSink()调用 ReduceOp 的工厂方法,创建一个全新的、局部的 Sink 实例。这是并行安全的核心!每个叶子任务都有自己独立的、不被其他线程干扰的结果容器(比如一个独立的 ArrayList)。
      2. helper.wrapAndCopyInto(...): 将这个叶子任务所负责的一小部分数据,通过流水线灌入这个全新的 Sink 中。
      3. return ...: 返回这个已经包含了部分结果的 Sink
  5. public void onCompletion(CountedCompleter<?> caller):

    • 这是并行执行的“合并”逻辑CountedCompleter 是 ForkJoinTask 的一个子类,它支持在任务完成时触发一个回调,就是这个 onCompletion 方法。
    • 当一个任务的两个子任务(leftChild 和 rightChild)都完成了,它的 onCompletion 方法就会被调用。
    • 逻辑如下:
      1. if (!isLeaf()): 检查当前任务不是叶子任务(叶子任务没有子任务,不需要合并)。
      2. S leftResult = leftChild.getLocalResult();: 获取左子任务执行 doLeaf() 或它自己的 onCompletion 后得到的结果 Sink
      3. leftResult.combine(rightChild.getLocalResult());调用 Sink 的 combine 方法,将右子任务的结果 Sink 合并到左子任务的 Sink 中。例如,对于 collect(Collectors.toList()),这里执行的就是 list1.addAll(list2)
      4. setLocalResult(leftResult);: 将合并后的 Sink (leftResult) 设置为当前任务的本地结果。这个结果又可以被它的父任务在 onCompletion 中使用,如此层层向上合并。
    • super.onCompletion(caller);: 调用父类的 onCompletion,主要用于清理引用(比如 spliteratorleftChildrightChild),帮助垃圾回收。

collect 并行安全性

现在,我们可以将所有分析串联起来,分析 collect 并行安全性的问题:

  1. 起点stream.parallel().collect(collector) 调用 ReduceOps.makeRef(collector),创建了一个 ReduceOp。这个 ReduceOp 的 makeSink() 方法被实现为 { return new ReducingSink(); },而 ReducingSink 的 begin() 方法会调用 collector.supplier().get()

  2. 并行执行( evaluate( terminalOp)  ReduceOp 的 evaluateParallel 方法创建并启动了一个 ReduceTask

  3. 任务分裂 (Fork)ForkJoinPool 将 ReduceTask 不断分裂成更小的子任务,直到任务不可再分,成为“叶子任务”。

  4. 叶子任务处理 (Leaf Execution): 每个叶子任务在自己独立的线程中执行 doLeaf() 方法。在 doLeaf() 中,它通过 op.makeSink() -> new ReducingSink() -> supplier.get() 创建了一个全新的、线程私有的结果容器(如一个新的 ArrayList)。然后它把自己负责的一小部分数据累加到这个私有容器中。因为容器是私有的,所以完全没有线程安全问题。

  5. 结果合并 (Join/Combine): 当子任务完成,它们的父任务在 onCompletion 中被唤醒。父任务获取两个子任务的结果容器(两个独立的 ArrayList),并调用 leftResult.combine(rightResult),这实际上是调用了 collector.combiner()(例如 list1.addAll(list2))。这个合并操作将两个部分结果合并成一个。

  6. 层层合并: 这个合并过程从最底层的叶子任务的父任务开始,一路向上,直到根任务将所有部分结果都合 并成一个最终的结果容器。

这个 “分裂 -> 独立容器累加 -> 两两合并” 的模式,就是 MapReduce 思想的一种体现,也是 Fork/Join 框架的经典应用。它通过避免在并行阶段共享可变状态,从根本上保证了 collect 操作的线程安全和高效率。

http://www.dtcms.com/a/327208.html

相关文章:

  • 负载均衡详解
  • 小程序排名优化:用户行为数据背后的提升密码
  • PostgreSQL 范围、空间唯一性约束
  • 「ECG信号处理——(23)基于ECG和PPG信号的血压预测」2025年8月12日
  • SQL 生成日期与产品的所有组合:CROSS JOIN(笛卡尔积)
  • Linux 系统运维、网络、SQL Server常用命令
  • 机器学习 [白板推导](九)[变分推断]
  • DRAM、SRAM、NAND Flash、NOR Flash、EEPROM、MRAM存储器你分得清吗?
  • 用pom文件从nexus3拉依赖,无法拉取的一个问题
  • 逻辑删除 vs 物理删除:MyBatis-Plus 实现指南与实践
  • 可泛化逻辑推理Python编程作为医疗AI发展方向研究
  • 关于数据库的restful api接口工具SqlRest的使用
  • 如何在 Ubuntu 24.04 LTS Linux 中安装 JSON Server
  • 2025年国赛新规解读:8-12最新发布文件
  • 初识数据结构——优先级队列(堆!堆!堆!)
  • 偶遇冰狐智能辅助的录音
  • Python初学者笔记第二十四期 -- (面向对象编程)
  • 教程 | 用Parasoft SOAtest实现高效CI回归测试
  • 从零到一的 Python CI/CD 实战指南:用 GitHub Actions 与 Jenkins 打造稳定、可持续交付的工程力
  • 下一代防火墙技术
  • 【ad-hoc 最小生成树 构造】P8957 「CGOI-3」巫泡弹弹乐|普及+
  • 【Redis在智能健身镜中的作用:运动指导与用户数据同步】
  • 计算机网络摘星题库800题笔记 第6章 应用层
  • 使用正则中的sub实现获取我们匹配的字符串,然后追加指定字符
  • 计算机网络---防火墙(Firewall)
  • pyside控件_左右范围滑动控件
  • 深层神经网络
  • torch.max() 函数使用
  • uv 配置和简单使用
  • 6深度学习Pytorch-神经网络--过拟合欠拟合问题解决(Dropout、正则化、早停法、数据增强)、批量标准化