当前位置: 首页 > news >正文

Java Stream ForEach算子实现:ForEachOps

ForEachOps 的职责是为 Stream 流水线创建终端操作 (Terminal Operation),特别是 forEach 和 forEachOrdered 这两个操作。当在代码中调用 stream.forEach(...) 时,正是 ForEachOps 在幕后构建了相应的执行逻辑。

这个类是 final 且构造器私有,表明它不能被实例化或继承,只能通过其静态工厂方法来使用。

静态工厂方法 (Entry Points)

ForEachOps 提供了一系列 make 方法,作为创建 forEach 终端操作的入口。这些方法接收用户提供的操作(一个 Consumer)和 一个 boolean 标记 ordered,然后返回一个 TerminalOp 实例。

// ... existing code .../*** Constructs a {@code TerminalOp} that perform an action for every element* of a stream.** @param action the {@code Consumer} that receives all elements of a*        stream* @param ordered whether an ordered traversal is requested* @param <T> the type of the stream elements* @return the {@code TerminalOp} instance*/public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,boolean ordered) {Objects.requireNonNull(action);return new ForEachOp.OfRef<>(action, ordered);}/*** Constructs a {@code TerminalOp} that perform an action for every element* of an {@code IntStream}.** @param action the {@code IntConsumer} that receives all elements of a*        stream* @param ordered whether an ordered traversal is requested* @return the {@code TerminalOp} instance*/public static TerminalOp<Integer, Void> makeInt(IntConsumer action,boolean ordered) {Objects.requireNonNull(action);return new ForEachOp.OfInt(action, ordered);}// ... makeLong 和 makeDouble 的类似实现 ...
// ... existing code ...
  • makeRefmakeIntmakeLongmakeDouble: 分别对应 Stream<T>IntStreamLongStreamDoubleStream
  • action: 这是用户传入的 lambda 表达式或方法引用,定义了要对每个元素执行的操作。
  • ordered: 这个布尔值至关重要。
    • 当调用 stream.forEach(...) 时,ordered 为 false
    • 当调用 stream.forEachOrdered(...) 时,ordered 为 true。 这个标志将决定在并行流中如何执行操作。

这些 make 方法内部会 new 一个对应的 ForEachOp 内部类实例,例如 ForEachOp.OfRef

核心抽象:ForEachOp<T>

ForEachOp 是一个抽象的静态内部类,它是所有 forEach 操作的基类。它有一个非常巧妙的设计:它同时实现了 TerminalOp 和 TerminalSink 接口

// ... existing code ...abstract static class ForEachOp<T>implements TerminalOp<T, Void>, TerminalSink<T, Void> {private final boolean ordered;protected ForEachOp(boolean ordered) {this.ordered = ordered;}// ... TerminalOp 接口实现 ...@Overridepublic <S> Void evaluateSequential(PipelineHelper<T> helper,Spliterator<S> spliterator) {return helper.wrapAndCopyInto(this, spliterator).get();}@Overridepublic <S> Void evaluateParallel(PipelineHelper<T> helper,Spliterator<S> spliterator) {if (ordered)new ForEachOrderedTask<>(helper, spliterator, this).invoke();elsenew ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();return null;}// ... TerminalSink 接口实现 ...@Overridepublic Void get() {return null;}
// ... existing code ...
  • 双重身份TerminalOp 定义了“如何启动”一个终端操作,而 TerminalSink 定义了“如何消费”元素。ForEachOp 将两者合二为一,意味着操作本身就是最终的消费者。
  • evaluateSequential: 对于串行流,逻辑非常简单。它调用 helper.wrapAndCopyInto(this, spliterator)。这会组装好 Sink 链,然后把源 Spliterator 的所有元素一个个喂给 Sink 链的头部。由于 ForEachOp 自己就是最终的 Sink,元素最终会流到它自己的 accept 方法中。
  • evaluateParallel: 对于并行流,逻辑出现分化:
    • 如果 ordered 为 true (forEachOrdered),它会创建一个 ForEachOrderedTask 并执行。
    • 如果 ordered 为 false (forEach),它会创建一个 ForEachTask 并执行。 这两个 Task 都是 ForkJoinTask 的子类,负责并行处理。

具体实现:OfRefOfInt 等

这些是 ForEachOp 的具体子类,它们的实现非常直接:

// ... existing code .../** Implementation class for reference streams */static final class OfRef<T> extends ForEachOp<T> {final Consumer<? super T> consumer;OfRef(Consumer<? super T> consumer, boolean ordered) {super(ordered);this.consumer = consumer;}@Overridepublic void accept(T t) {consumer.accept(t);}}
// ... existing code ...

它们的主要作用就是持有一个用户传入的 Consumer,并实现 accept 方法。当元素流经整个流水线最终到达这个 Sink 时,accept 方法被调用,进而执行用户定义的操作 consumer.accept(t)

并行执行核心:ForEachTask 与 ForEachOrderedTask

这是 ForEachOps 中最复杂也最精华的部分,它揭示了 forEach 和 forEachOrdered 在并行模式下的巨大差异。两者都基于 ForkJoinPool 和 CountedCompleter(一种可编排完成顺序的 ForkJoinTask)。

ForEachTask (对应 forEach)

// ... existing code ...static final class ForEachTask<S, T> extends CountedCompleter<Void> {// ...public void compute() {Spliterator<S> rightSplit = spliterator, leftSplit;// ...while (!isShortCircuit || !taskSink.cancellationRequested()) {if (sizeEstimate <= sizeThreshold ||(leftSplit = rightSplit.trySplit()) == null) {// 当任务足够小,不再分裂,直接处理task.helper.copyInto(taskSink, rightSplit);break;}// ...// 创建左子任务并 forkForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);task.addToPendingCount(1);// ...taskToFork.fork();// ...}// ...}}
// ... existing code ...
  • 目标:最大化并行度,不保证顺序。
  • 策略:这是一个经典的 Fork/Join 分治模型。任务不断地将 Spliterator 对半分割 (trySplit),然后将其中一半 fork 出去成为一个新任务,自己继续处理另一半。
  • 关键点:所有被 fork 出去的子任务,以及父任务,都共享同一个 sink 实例(即 ForEachOp 实例)。这意味着多个线程会并发地调用 sink.accept(t)。因此,对于 stream.parallel().forEach(...),用户提供的操作必须是线程安全的。

System.out.println 通过 synchronize 保证了并发安全

ForEachOrderedTask (用于 forEachOrdered)

ForEachOrderedTask 是 Stream 并行处理中一个极为精巧和复杂的组件。它的核心使命是:在并行执行 forEachOrdered 操作时,既要利用多核 CPU 实现并行计算,又要严格保证最终消费元素的顺序与原始流的相遇顺序 (encounter order) 一致。

这是一个巨大的挑战,因为并行本身就是无序的,而它必须在无序中重建有序。下面我们分步解析它的实现原理。

定位与角色

  • extends CountedCompleter<Void>ForEachOrderedTask 是 ForkJoinTask 的一个特殊子类 CountedCompleterCountedCompleter 是一种带有完成计数的任务,非常适合用于协调具有依赖关系的一组任务。当一个任务的所有子任务都完成时,它会自动触发 onCompletion 回调,这正是 ForEachOrderedTask 实现有序的关键机制。
  • 服务于 forEachOrdered: 它是专门为 stream.forEachOrdered(action) 的并行执行而设计的。当你在一个并行流上调用 forEachOrdered 时,底层就会创建一个 ForEachOrderedTask 来驱动整个计算过程。

ForEachOrderedTask 的工作可以分为两个阶段:

  1. 分治 (Divide and Conquer): 和普通的 ForEachTask 一样,它会递归地将数据源 (Spliterator) 分裂成更小的块,为每个块创建一个子任务,并提交到 ForkJoinPool 中并行执行。
  2. 有序重组 (Ordered Reassembly): 这是它与 ForEachTask 根本不同的地方。它不能在处理完自己的数据块后就立即消费(调用 action),因为这会打乱顺序。相反,它必须确保在它前面的所有任务都处理并消费完它们的数据后,才能轮到自己。

在 forEachOrdered 中,并没有将多个 Node 合并成一个大 Node 的步骤。

如果还有等待任务,会把数据缓存在Node中

所谓的“合并”,其实是一个时间维度上的串行化过程:每个任务在轮到自己时,将自己持有的数据片段“拼接”到执行流中,然后把“接力棒”传给下一个任务。这是一种行为上的有序重组,而非数据结构上的合并。

ForEachOrderedTask 通过几个关键的成员变量和 CountedCompleter 的特性来构建一个“有序执行链”。

// ... existing code ...static final class ForEachOrderedTask<S, T> extends CountedCompleter<Void> {// ...private final PipelineHelper<T> helper;private Spliterator<S> spliterator; // 当前任务负责的数据分片private final Sink<T> action; // 最终要执行的消费动作private final ForEachOrderedTask<S, T> leftPredecessor; // 指向其左侧的“前驱”任务private Node<T> node; // 用于暂存当前任务的处理结果private ForEachOrderedTask<S, T> next; // 指向其在有序链中的“后继”任务// ...}
// ... existing code ...
  • spliterator: 每个任务负责处理的一小块数据。
  • action: 所有任务共享的、用户提供的最终 Consumer
  • node结果缓冲区。当一个任务处理完它的数据分片后,它会把结果(经过流水线中间操作处理后的元素)收集到一个 Node 对象中暂存起来,而不是立即消费。
  • leftPredecessor 和 next: 这两个引用是构建有序链的关键,我们通过一个例子来理解。

构建“Happens-Before”依赖链

ForEachOrderedTask 的 Javadoc 中给出了一个经典的树形结构图,这对于理解其工作原理至关重要。

      a/ \b   c/ \ / \d  e f  g

元素的处理顺序应该是 d -> e -> f -> gForEachOrderedTask 必须保证这个顺序。

它是如何做到的?通过 CountedCompleter 的完成计数和 next 指针构建依赖关系:

  1. 任务分裂: 当任务 b 分裂成 d 和 e 时,它会设置 d.next = e。这意味着 d 是 e 的直接前驱。
  2. 依赖建立:
    • 任务 e 的 pendingCount 会加 1,表示它依赖于某个任务的完成。
    • d 会通过 NEXT.compareAndSet 尝试将 b 的前驱任务(leftPredecessor)的 next 指针指向自己。
    • 通过这一系列原子操作,最终会形成一条逻辑上的链:d -> eb -> ff -> g
  3. 执行与等待:
    • 一个任务(比如 e)在 compute 方法的最后,会调用 tryComplete()
    • 由于它的 pendingCount 大于 0(因为它依赖 d),tryComplete() 不会立即触发 onCompletion。它会进入等待状态。
  4. 触发完成 (onCompletion):
    • 当任务 d 完成时,它的 onCompletion 方法会被调用。
    • 在 onCompletion 中,它会处理自己的 node(即消费暂存的元素),然后获取它的后继任务 next(也就是 e),并调用 e.tryComplete()
    • 这次调用会使 e 的 pendingCount 减 1,变为 0。现在 e 满足了完成条件,于是 e 的 onCompletion 被触发。
    • 这个过程像多米诺骨牌一样,沿着 d -> e -> f -> g 的顺序依次触发,从而保证了元素的有序消费。

compute 和 onCompletion 的协作

  • compute() 方法:

    • 职责: 主要是分治处理数据
    • 它递归地分裂任务,直到任务块足够小。
    • 对于无法再分裂的叶子任务,它会执行流水线操作 (helper.wrapAndCopyInto),并将结果暂存到 node 成员变量中
    • 最后调用 tryComplete(),尝试将自己标记为完成。
  • onCompletion() 方法:

    • 职责消费数据触发后继任务
    • 当一个任务被其前驱任务“唤醒”并最终完成时,onCompletion 被调用。
    • 它首先检查自己的 node 是否有数据,如果有,就调用 node.forEach(action)此时才真正消费元素
    • 然后,它会找到自己的后继任务 next,并调用其 tryComplete() 方法,将执行的接力棒传下去。

compute() 方法分析

compute 方法是 ForkJoinTask 的执行入口。这里它简单地调用了静态的 doCompute 方法,这是一种将实例方法逻辑委托给静态辅助方法的常见模式,有时可以简化逻辑或避免某些 this 相关的混淆。

doCompute 的核心职责是任务分解(分治)建立依赖关系

1. 循环分解任务

// ...
while (rightSplit.estimateSize() > sizeThreshold &&(leftSplit = rightSplit.trySplit()) != null) {// ... 循环体 ...
}
// ...

这是经典的分治模式。只要当前任务的数据块(rightSplit)还足够大(大于 sizeThreshold)并且可以被成功分割(trySplit()),循环就会继续。在每次循环中,一个父任务会被分解成左右两个子任务。

2. 创建子任务并建立直接依赖

// ...
ForEachOrderedTask<S, T> leftChild =new ForEachOrderedTask<>(task, leftSplit, task.leftPredecessor);
ForEachOrderedTask<S, T> rightChild =new ForEachOrderedTask<>(task, rightSplit, leftChild);leftChild.next = rightChild;task.addToPendingCount(1);
rightChild.addToPendingCount(1);
// ...
  • 创建子任务: 创建了 leftChild 和 rightChild。注意 rightChild 的构造函数传入了 leftChild 作为它的“左侧前驱”(leftPredecessor)。
  • leftChild.next = rightChild: 建立了左右兄弟之间的直接顺序关系。leftChild 完成后需要通知 rightChild
  • task.addToPendingCount(1): 父任务的完成依赖于其子任务。这里将父任务的待完成计数加 1。当子任务完成时,会递减这个计数。
  • rightChild.addToPendingCount(1): 右子任务的完成依赖于左子任务。这是保证 d -> e 这种顺序的关键。

3. 建立跨分支的依赖关系

这是整个算法最复杂的部分,用于处理像 b -> f (左孩子要完成需要晚于左边)这样的跨分支依赖。

// ...
if (task.leftPredecessor != null) {leftChild.addToPendingCount(1);if (NEXT.compareAndSet(task.leftPredecessor, task, leftChild)) {task.addToPendingCount(-1);} else {leftChild.addToPendingCount(-1);}
}
// ...
  • if (task.leftPredecessor != null): 这段逻辑只对那些“非最左侧”的任务生效。在 a-g 的树中,任务 c 和它的子任务 fg 就属于这种情况。
  • leftChild.addToPendingCount(1): 预先增加左子节点的待完成计数,因为它需要等待其“叔叔”节点(task.leftPredecessor)完成。
  • NEXT.compareAndSet(...): 这是一个原子操作,尝试将“叔叔”节点(task.leftPredecessor)的 next 指针从指向父任务 task 更新为指向 leftChild
    • 成功: 如果更新成功,意味着“叔叔”节点还没完成,依赖关系成功建立。此时父任务 task 不再需要等待“叔叔”节点,所以将自己的待完成计数减 1。
    • 失败: 如果失败,意味着“叔叔”节点已经完成了,并且已经触发了 task 的完成流程。在这种情况下,leftChild 不再需要等待,所以把它刚刚加上的待完成计数减回去。

4. Fork 子任务

// ...
if (forkRight) {// ...taskToFork = rightChild;
}
else {// ...taskToFork = leftChild;
}
taskToFork.fork();
// ...

这是一个小的优化,交替地 fork 左右子任务,有助于在 ForkJoinPool 中保持更好的负载均衡。一个任务被 fork 出去异步执行,当前线程则继续循环处理剩下的那个任务。

5. 处理叶子节点并尝试完成

当循环结束时,意味着当前 task 已经是一个无法再分解的叶子节点。

// ...
if (task.getPendingCount() > 0) {// ...task.node = task.helper.wrapAndCopyInto(nb, rightSplit).build();task.spliterator = null;
}
task.tryComplete();
  • if (task.getPendingCount() > 0): 如果待完成计数大于 0,说明它依赖于其他前驱任务,不能立即消费数据。
  • task.node = ...: 此时,它会执行流水线操作,并将结果缓冲到 task.node 中。
  • task.tryComplete(): 尝试完成自己。由于 pendingCount > 0,这个调用不会立即触发 onCompletion,任务会进入等待状态。如果 pendingCount 为 0(说明它是一个没有前驱依赖的叶子节点),tryComplete 会直接触发 onCompletion

onCompletion() 方法分析

当一个任务的所有依赖(子任务和前驱任务)都完成后,onCompletion 方法会被 ForkJoinPool 自动回调。

@Override
public void onCompletion(CountedCompleter<?> caller) {if (node != null) {node.forEach(action);node = null;}else if (spliterator != null) {helper.wrapAndCopyInto(action, spliterator);spliterator = null;}var leftDescendant = (ForEachOrderedTask<S, T>)NEXT.getAndSet(this, null);if (leftDescendant != null)leftDescendant.tryComplete();
}

1. 消费数据

// ...
if (node != null) {node.forEach(action);node = null;
}
else if (spliterator != null) {helper.wrapAndCopyInto(action, spliterator);spliterator = null;
}
// ...

这是真正执行用户 action 的地方

  • 如果 node 不为 null,说明数据之前被缓冲了,现在就从 node 中取出并消费。
  • 如果 node 为 null 但 spliterator 不为 null,这对应于那些没有前驱依赖的叶子节点,它们在 compute 阶段没有缓冲数据,所以在这里直接处理 spliterator

2. 触发后继任务

// ...
var leftDescendant = (ForEachOrderedTask<S, T>)NEXT.getAndSet(this, null);
if (leftDescendant != null)leftDescendant.tryComplete();

这是多米诺骨牌效应的关键一步。

  • NEXT.getAndSet(this, null): 原子地获取当前任务的后继任务(next 指针指向的那个),并将其设置为 null
  • if (leftDescendant != null): 如果存在后继任务。
  • leftDescendant.tryComplete(): 触发后继任务的完成流程。这个调用会使后继任务的 pendingCount 减 1。如果减到 0,就会轮到后继任务执行它的 onCompletion 方法,从而将有序执行链延续下去。

总结

ForEachOps 是 forEach 和 forEachOrdered 终端操作的实现细节所在。

  • 它通过一个 ordered 标志,优雅地在同一个框架下支持了两种不同的 forEach 语义。
  • 它展示了 Stream API 在并行处理上的两种典型模式:
    • 无序并行 (forEach):简单、高效,最大化吞吐量,但要求操作本身线程安全且不关心顺序。
    • 有序并行 (forEachOrdered):保证了与串行执行相同的结果顺序,但通过引入缓冲和复杂的任务协调机制,牺牲了部分性能和增加了内存开销。

http://www.dtcms.com/a/335520.html

相关文章:

  • 半敏捷卫星观测调度系统的设计与实现
  • Git登录配置的详细方法
  • CSS中linear-gradient 的用法
  • Python字符串净化完全指南:专业级字符清理技术与实战
  • 开发者说 | EmbodiedGen:为具身智能打造可交互3D世界生成引擎
  • 区块链练手项目(持续更新)
  • Linux入门指南:基础开发工具---vim
  • 飞算AI 3.2.0实战评测:10分钟搭建企业级RBAC权限系统
  • ZKmall开源商城的移动商城搭建:Uni-app+Vue3 实现多端购物体验
  • PostgreSQL——用户管理
  • 轻松配置NAT模式让虚拟机上网
  • Go语言企业级权限管理系统设计与实现
  • Bootstrap4 轮播详解
  • Apollo 凭什么能 “干掉” 本地配置?
  • 使用Ansys Fluent进行倒装芯片封装Theta-JA热阻表征
  • Spring Cloud整合Eureka、ZooKeeper、原理分析
  • 牛客周赛 Round 104(小红的矩阵不动点/小红的不动点权值)
  • 【Netty核心解密】Channel与ChannelHandlerContext:网络编程的双子星
  • 适用监测农作物长势和病虫害的高光谱/多光谱相机有哪些?
  • Mac(四)自定义按键工具 Hammerspoon 的安装和使用
  • Baumer高防护相机如何通过YoloV8深度学习模型实现行人跌倒的检测识别(C#代码UI界面版)
  • Redis入门1
  • 【Leetcode hot 100】189.轮转数组
  • 【C2000】C2000例程使用介绍
  • 24. 什么是不可变对象,好处是什么
  • Flink作业执行的第一步:DataFlow graph的构建
  • 一周学会Matplotlib3 Python 数据可视化-多子图及布局实现
  • 【传奇开心果系列】Flet框架实现的家庭记账本示例自定义模板
  • Python可视化工具-Bokeh:动态显示数据
  • 【Golang】:错误处理