Java Stream ForEach算子实现:ForEachOps
ForEachOps
的职责是为 Stream 流水线创建终端操作 (Terminal Operation),特别是 forEach
和 forEachOrdered
这两个操作。当在代码中调用 stream.forEach(...)
时,正是 ForEachOps
在幕后构建了相应的执行逻辑。
这个类是 final
且构造器私有,表明它不能被实例化或继承,只能通过其静态工厂方法来使用。
静态工厂方法 (Entry Points)
ForEachOps
提供了一系列 make
方法,作为创建 forEach
终端操作的入口。这些方法接收用户提供的操作(一个 Consumer
)和 一个 boolean
标记 ordered
,然后返回一个 TerminalOp
实例。
// ... existing code .../*** Constructs a {@code TerminalOp} that perform an action for every element* of a stream.** @param action the {@code Consumer} that receives all elements of a* stream* @param ordered whether an ordered traversal is requested* @param <T> the type of the stream elements* @return the {@code TerminalOp} instance*/public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,boolean ordered) {Objects.requireNonNull(action);return new ForEachOp.OfRef<>(action, ordered);}/*** Constructs a {@code TerminalOp} that perform an action for every element* of an {@code IntStream}.** @param action the {@code IntConsumer} that receives all elements of a* stream* @param ordered whether an ordered traversal is requested* @return the {@code TerminalOp} instance*/public static TerminalOp<Integer, Void> makeInt(IntConsumer action,boolean ordered) {Objects.requireNonNull(action);return new ForEachOp.OfInt(action, ordered);}// ... makeLong 和 makeDouble 的类似实现 ...
// ... existing code ...
makeRef
,makeInt
,makeLong
,makeDouble
: 分别对应Stream<T>
,IntStream
,LongStream
,DoubleStream
。action
: 这是用户传入的 lambda 表达式或方法引用,定义了要对每个元素执行的操作。ordered
: 这个布尔值至关重要。- 当调用
stream.forEach(...)
时,ordered
为false
。 - 当调用
stream.forEachOrdered(...)
时,ordered
为true
。 这个标志将决定在并行流中如何执行操作。
- 当调用
这些 make
方法内部会 new
一个对应的 ForEachOp
内部类实例,例如 ForEachOp.OfRef
。
核心抽象:ForEachOp<T>
ForEachOp
是一个抽象的静态内部类,它是所有 forEach
操作的基类。它有一个非常巧妙的设计:它同时实现了 TerminalOp
和 TerminalSink
接口。
// ... existing code ...abstract static class ForEachOp<T>implements TerminalOp<T, Void>, TerminalSink<T, Void> {private final boolean ordered;protected ForEachOp(boolean ordered) {this.ordered = ordered;}// ... TerminalOp 接口实现 ...@Overridepublic <S> Void evaluateSequential(PipelineHelper<T> helper,Spliterator<S> spliterator) {return helper.wrapAndCopyInto(this, spliterator).get();}@Overridepublic <S> Void evaluateParallel(PipelineHelper<T> helper,Spliterator<S> spliterator) {if (ordered)new ForEachOrderedTask<>(helper, spliterator, this).invoke();elsenew ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();return null;}// ... TerminalSink 接口实现 ...@Overridepublic Void get() {return null;}
// ... existing code ...
- 双重身份:
TerminalOp
定义了“如何启动”一个终端操作,而TerminalSink
定义了“如何消费”元素。ForEachOp
将两者合二为一,意味着操作本身就是最终的消费者。 evaluateSequential
: 对于串行流,逻辑非常简单。它调用helper.wrapAndCopyInto(this, spliterator)
。这会组装好 Sink 链,然后把源Spliterator
的所有元素一个个喂给 Sink 链的头部。由于ForEachOp
自己就是最终的 Sink,元素最终会流到它自己的accept
方法中。evaluateParallel
: 对于并行流,逻辑出现分化:- 如果
ordered
为true
(forEachOrdered
),它会创建一个ForEachOrderedTask
并执行。 - 如果
ordered
为false
(forEach
),它会创建一个ForEachTask
并执行。 这两个 Task 都是ForkJoinTask
的子类,负责并行处理。
- 如果
具体实现:OfRef
, OfInt
等
这些是 ForEachOp
的具体子类,它们的实现非常直接:
// ... existing code .../** Implementation class for reference streams */static final class OfRef<T> extends ForEachOp<T> {final Consumer<? super T> consumer;OfRef(Consumer<? super T> consumer, boolean ordered) {super(ordered);this.consumer = consumer;}@Overridepublic void accept(T t) {consumer.accept(t);}}
// ... existing code ...
它们的主要作用就是持有一个用户传入的 Consumer
,并实现 accept
方法。当元素流经整个流水线最终到达这个 Sink 时,accept
方法被调用,进而执行用户定义的操作 consumer.accept(t)
。
并行执行核心:ForEachTask
与 ForEachOrderedTask
这是 ForEachOps
中最复杂也最精华的部分,它揭示了 forEach
和 forEachOrdered
在并行模式下的巨大差异。两者都基于 ForkJoinPool
和 CountedCompleter
(一种可编排完成顺序的 ForkJoinTask
)。
ForEachTask
(对应 forEach
)
// ... existing code ...static final class ForEachTask<S, T> extends CountedCompleter<Void> {// ...public void compute() {Spliterator<S> rightSplit = spliterator, leftSplit;// ...while (!isShortCircuit || !taskSink.cancellationRequested()) {if (sizeEstimate <= sizeThreshold ||(leftSplit = rightSplit.trySplit()) == null) {// 当任务足够小,不再分裂,直接处理task.helper.copyInto(taskSink, rightSplit);break;}// ...// 创建左子任务并 forkForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);task.addToPendingCount(1);// ...taskToFork.fork();// ...}// ...}}
// ... existing code ...
- 目标:最大化并行度,不保证顺序。
- 策略:这是一个经典的 Fork/Join 分治模型。任务不断地将
Spliterator
对半分割 (trySplit
),然后将其中一半fork
出去成为一个新任务,自己继续处理另一半。 - 关键点:所有被
fork
出去的子任务,以及父任务,都共享同一个sink
实例(即ForEachOp
实例)。这意味着多个线程会并发地调用sink.accept(t)
。因此,对于stream.parallel().forEach(...)
,用户提供的操作必须是线程安全的。
System.out.println 通过 synchronize 保证了并发安全
ForEachOrderedTask
(用于 forEachOrdered
)
ForEachOrderedTask
是 Stream
并行处理中一个极为精巧和复杂的组件。它的核心使命是:在并行执行 forEachOrdered
操作时,既要利用多核 CPU 实现并行计算,又要严格保证最终消费元素的顺序与原始流的相遇顺序 (encounter order) 一致。
这是一个巨大的挑战,因为并行本身就是无序的,而它必须在无序中重建有序。下面我们分步解析它的实现原理。
定位与角色
extends CountedCompleter<Void>
:ForEachOrderedTask
是ForkJoinTask
的一个特殊子类CountedCompleter
。CountedCompleter
是一种带有完成计数的任务,非常适合用于协调具有依赖关系的一组任务。当一个任务的所有子任务都完成时,它会自动触发onCompletion
回调,这正是ForEachOrderedTask
实现有序的关键机制。- 服务于
forEachOrdered
: 它是专门为stream.forEachOrdered(action)
的并行执行而设计的。当你在一个并行流上调用forEachOrdered
时,底层就会创建一个ForEachOrderedTask
来驱动整个计算过程。
ForEachOrderedTask
的工作可以分为两个阶段:
- 分治 (Divide and Conquer): 和普通的
ForEachTask
一样,它会递归地将数据源 (Spliterator
) 分裂成更小的块,为每个块创建一个子任务,并提交到ForkJoinPool
中并行执行。 - 有序重组 (Ordered Reassembly): 这是它与
ForEachTask
根本不同的地方。它不能在处理完自己的数据块后就立即消费(调用action
),因为这会打乱顺序。相反,它必须确保在它前面的所有任务都处理并消费完它们的数据后,才能轮到自己。
在 forEachOrdered 中,并没有将多个 Node 合并成一个大 Node 的步骤。
如果还有等待任务,会把数据缓存在Node中
所谓的“合并”,其实是一个时间维度上的串行化过程:每个任务在轮到自己时,将自己持有的数据片段“拼接”到执行流中,然后把“接力棒”传给下一个任务。这是一种行为上的有序重组,而非数据结构上的合并。
ForEachOrderedTask
通过几个关键的成员变量和 CountedCompleter
的特性来构建一个“有序执行链”。
// ... existing code ...static final class ForEachOrderedTask<S, T> extends CountedCompleter<Void> {// ...private final PipelineHelper<T> helper;private Spliterator<S> spliterator; // 当前任务负责的数据分片private final Sink<T> action; // 最终要执行的消费动作private final ForEachOrderedTask<S, T> leftPredecessor; // 指向其左侧的“前驱”任务private Node<T> node; // 用于暂存当前任务的处理结果private ForEachOrderedTask<S, T> next; // 指向其在有序链中的“后继”任务// ...}
// ... existing code ...
spliterator
: 每个任务负责处理的一小块数据。action
: 所有任务共享的、用户提供的最终Consumer
。node
: 结果缓冲区。当一个任务处理完它的数据分片后,它会把结果(经过流水线中间操作处理后的元素)收集到一个Node
对象中暂存起来,而不是立即消费。leftPredecessor
和next
: 这两个引用是构建有序链的关键,我们通过一个例子来理解。
构建“Happens-Before”依赖链
ForEachOrderedTask
的 Javadoc 中给出了一个经典的树形结构图,这对于理解其工作原理至关重要。
a/ \b c/ \ / \d e f g
元素的处理顺序应该是 d -> e -> f -> g
。ForEachOrderedTask
必须保证这个顺序。
它是如何做到的?通过 CountedCompleter
的完成计数和 next
指针构建依赖关系:
- 任务分裂: 当任务
b
分裂成d
和e
时,它会设置d.next = e
。这意味着d
是e
的直接前驱。 - 依赖建立:
- 任务
e
的pendingCount
会加 1,表示它依赖于某个任务的完成。 d
会通过NEXT.compareAndSet
尝试将b
的前驱任务(leftPredecessor
)的next
指针指向自己。- 通过这一系列原子操作,最终会形成一条逻辑上的链:
d -> e
,b -> f
,f -> g
。
- 任务
- 执行与等待:
- 一个任务(比如
e
)在compute
方法的最后,会调用tryComplete()
。 - 由于它的
pendingCount
大于 0(因为它依赖d
),tryComplete()
不会立即触发onCompletion
。它会进入等待状态。
- 一个任务(比如
- 触发完成 (
onCompletion
):- 当任务
d
完成时,它的onCompletion
方法会被调用。 - 在
onCompletion
中,它会处理自己的node
(即消费暂存的元素),然后获取它的后继任务next
(也就是e
),并调用e.tryComplete()
。 - 这次调用会使
e
的pendingCount
减 1,变为 0。现在e
满足了完成条件,于是e
的onCompletion
被触发。 - 这个过程像多米诺骨牌一样,沿着
d -> e -> f -> g
的顺序依次触发,从而保证了元素的有序消费。
- 当任务
compute
和 onCompletion
的协作
compute()
方法:- 职责: 主要是分治和处理数据。
- 它递归地分裂任务,直到任务块足够小。
- 对于无法再分裂的叶子任务,它会执行流水线操作 (
helper.wrapAndCopyInto
),并将结果暂存到node
成员变量中。 - 最后调用
tryComplete()
,尝试将自己标记为完成。
onCompletion()
方法:- 职责: 消费数据和触发后继任务。
- 当一个任务被其前驱任务“唤醒”并最终完成时,
onCompletion
被调用。 - 它首先检查自己的
node
是否有数据,如果有,就调用node.forEach(action)
,此时才真正消费元素。 - 然后,它会找到自己的后继任务
next
,并调用其tryComplete()
方法,将执行的接力棒传下去。
compute()
方法分析
compute
方法是 ForkJoinTask
的执行入口。这里它简单地调用了静态的 doCompute
方法,这是一种将实例方法逻辑委托给静态辅助方法的常见模式,有时可以简化逻辑或避免某些 this
相关的混淆。
doCompute
的核心职责是任务分解(分治)和建立依赖关系。
1. 循环分解任务
// ...
while (rightSplit.estimateSize() > sizeThreshold &&(leftSplit = rightSplit.trySplit()) != null) {// ... 循环体 ...
}
// ...
这是经典的分治模式。只要当前任务的数据块(rightSplit
)还足够大(大于 sizeThreshold
)并且可以被成功分割(trySplit()
),循环就会继续。在每次循环中,一个父任务会被分解成左右两个子任务。
2. 创建子任务并建立直接依赖
// ...
ForEachOrderedTask<S, T> leftChild =new ForEachOrderedTask<>(task, leftSplit, task.leftPredecessor);
ForEachOrderedTask<S, T> rightChild =new ForEachOrderedTask<>(task, rightSplit, leftChild);leftChild.next = rightChild;task.addToPendingCount(1);
rightChild.addToPendingCount(1);
// ...
- 创建子任务: 创建了
leftChild
和rightChild
。注意rightChild
的构造函数传入了leftChild
作为它的“左侧前驱”(leftPredecessor
)。 leftChild.next = rightChild
: 建立了左右兄弟之间的直接顺序关系。leftChild
完成后需要通知rightChild
。task.addToPendingCount(1)
: 父任务的完成依赖于其子任务。这里将父任务的待完成计数加 1。当子任务完成时,会递减这个计数。rightChild.addToPendingCount(1)
: 右子任务的完成依赖于左子任务。这是保证d -> e
这种顺序的关键。
3. 建立跨分支的依赖关系
这是整个算法最复杂的部分,用于处理像 b -> f
(左孩子要完成需要晚于左边)这样的跨分支依赖。
// ...
if (task.leftPredecessor != null) {leftChild.addToPendingCount(1);if (NEXT.compareAndSet(task.leftPredecessor, task, leftChild)) {task.addToPendingCount(-1);} else {leftChild.addToPendingCount(-1);}
}
// ...
if (task.leftPredecessor != null)
: 这段逻辑只对那些“非最左侧”的任务生效。在a-g
的树中,任务c
和它的子任务f
,g
就属于这种情况。leftChild.addToPendingCount(1)
: 预先增加左子节点的待完成计数,因为它需要等待其“叔叔”节点(task.leftPredecessor
)完成。NEXT.compareAndSet(...)
: 这是一个原子操作,尝试将“叔叔”节点(task.leftPredecessor
)的next
指针从指向父任务task
更新为指向leftChild
。- 成功: 如果更新成功,意味着“叔叔”节点还没完成,依赖关系成功建立。此时父任务
task
不再需要等待“叔叔”节点,所以将自己的待完成计数减 1。 - 失败: 如果失败,意味着“叔叔”节点已经完成了,并且已经触发了
task
的完成流程。在这种情况下,leftChild
不再需要等待,所以把它刚刚加上的待完成计数减回去。
- 成功: 如果更新成功,意味着“叔叔”节点还没完成,依赖关系成功建立。此时父任务
4. Fork 子任务
// ...
if (forkRight) {// ...taskToFork = rightChild;
}
else {// ...taskToFork = leftChild;
}
taskToFork.fork();
// ...
这是一个小的优化,交替地 fork
左右子任务,有助于在 ForkJoinPool
中保持更好的负载均衡。一个任务被 fork
出去异步执行,当前线程则继续循环处理剩下的那个任务。
5. 处理叶子节点并尝试完成
当循环结束时,意味着当前 task
已经是一个无法再分解的叶子节点。
// ...
if (task.getPendingCount() > 0) {// ...task.node = task.helper.wrapAndCopyInto(nb, rightSplit).build();task.spliterator = null;
}
task.tryComplete();
if (task.getPendingCount() > 0)
: 如果待完成计数大于 0,说明它依赖于其他前驱任务,不能立即消费数据。task.node = ...
: 此时,它会执行流水线操作,并将结果缓冲到task.node
中。task.tryComplete()
: 尝试完成自己。由于pendingCount > 0
,这个调用不会立即触发onCompletion
,任务会进入等待状态。如果pendingCount
为 0(说明它是一个没有前驱依赖的叶子节点),tryComplete
会直接触发onCompletion
。
onCompletion()
方法分析
当一个任务的所有依赖(子任务和前驱任务)都完成后,onCompletion
方法会被 ForkJoinPool
自动回调。
@Override
public void onCompletion(CountedCompleter<?> caller) {if (node != null) {node.forEach(action);node = null;}else if (spliterator != null) {helper.wrapAndCopyInto(action, spliterator);spliterator = null;}var leftDescendant = (ForEachOrderedTask<S, T>)NEXT.getAndSet(this, null);if (leftDescendant != null)leftDescendant.tryComplete();
}
1. 消费数据
// ...
if (node != null) {node.forEach(action);node = null;
}
else if (spliterator != null) {helper.wrapAndCopyInto(action, spliterator);spliterator = null;
}
// ...
这是真正执行用户 action
的地方。
- 如果
node
不为null
,说明数据之前被缓冲了,现在就从node
中取出并消费。 - 如果
node
为null
但spliterator
不为null
,这对应于那些没有前驱依赖的叶子节点,它们在compute
阶段没有缓冲数据,所以在这里直接处理spliterator
。
2. 触发后继任务
// ...
var leftDescendant = (ForEachOrderedTask<S, T>)NEXT.getAndSet(this, null);
if (leftDescendant != null)leftDescendant.tryComplete();
这是多米诺骨牌效应的关键一步。
NEXT.getAndSet(this, null)
: 原子地获取当前任务的后继任务(next
指针指向的那个),并将其设置为null
。if (leftDescendant != null)
: 如果存在后继任务。leftDescendant.tryComplete()
: 触发后继任务的完成流程。这个调用会使后继任务的pendingCount
减 1。如果减到 0,就会轮到后继任务执行它的onCompletion
方法,从而将有序执行链延续下去。
总结
ForEachOps
是 forEach
和 forEachOrdered
终端操作的实现细节所在。
- 它通过一个
ordered
标志,优雅地在同一个框架下支持了两种不同的forEach
语义。 - 它展示了 Stream API 在并行处理上的两种典型模式:
- 无序并行 (
forEach
):简单、高效,最大化吞吐量,但要求操作本身线程安全且不关心顺序。 - 有序并行 (
forEachOrdered
):保证了与串行执行相同的结果顺序,但通过引入缓冲和复杂的任务协调机制,牺牲了部分性能和增加了内存开销。
- 无序并行 (