当前位置: 首页 > news >正文

Java stream 并发问题

在并行流中,可能各个线程处理的是同一个sink输出,导致并发问题。

forEach (无序并行):将同步责任交给用户

对于 stream.parallel().forEach(action)多个线程确实会并发地调用同一个 Sink 实例的 accept 方法。

我们来看 ForEachOp 的实现:

// ... existing code .../** Implementation class for reference streams */static final class OfRef<T> extends ForEachOp<T> {final Consumer<? super T> consumer;OfRef(Consumer<? super T> consumer, boolean ordered) {super(ordered);this.consumer = consumer;}@Overridepublic void accept(T t) {consumer.accept(t);}}
// ... existing code ...

可以看到,accept(T t) 方法直接调用了用户传入的 consumer.accept(t)。它自身没有任何同步措施

这就是 Stream 框架的“契约”forEach 是一个非常底层的操作,它追求极致的性能,因此把线程安全的责任完全交给了开发者。如果你在 forEach 的 Consumer 中操作一个共享的可变对象(比如往一个普通的 ArrayList 中添加元素),你必须自己处理同步,否则就会出现竞态条件、数据丢失或抛出 ConcurrentModificationException 等问题。

正确(但通常不推荐)的做法是:

List<String> sharedList = Collections.synchronizedList(new ArrayList<>());
stream.parallel().forEach(sharedList::add);

collect:框架负责线程安全(推荐方式)

这才是并行收集数据的正确且高效的方式。collect 操作远比 forEach 聪明,它专门设计用来解决并发问题。

collect 操作需要三个函数:

  1. Supplier (供应器)() -> new ArrayList<>()
  2. Accumulator (累加器)(list, item) -> list.add(item)
  3. Combiner (组合器)(list1, list2) -> { list1.addAll(list2); return list1; }

在并行执行时,collect 的工作流程如下:

  1. 分裂ForkJoinPool 将任务分裂给多个线程。
  2. 供应每个线程都会调用 Supplier 来创建自己私有的、局部的结果容器。例如,线程A得到 listA,线程B得到 listB。它们操作的不是同一个 ArrayList
  3. 累加:每个线程使用 Accumulator 将自己负责的元素累加到各自的局部容器中。线程A往 listA 里加,线程B往 listB 里加。因为操作的是线程私有对象,所以完全没有并发问题,速度极快。
  4. 组合:当所有线程都完成了自己的部分后,框架会使用 Combiner 将所有线程的局部结果合并成一个最终结果。例如,执行 listA.addAll(listB)。这个合并过程可能是串行的,也可能是分层并行的。

通过这种“分头累加,最后合并”的策略,collect 完美地避开了在核心并行阶段操作共享可变状态的问题,从而既保证了线程安全,又实现了高并发。

forEachOrdered (有序并行):通过缓冲和串行消费来保证安全

forEachOrdered 为了保证顺序,会先让各个并行任务处理数据并缓冲在各自的 Node 对象里。

最终,当轮到某个任务消费它的结果时,它是在一个确定的“happens-before”关系链中被触发的。这意味着对用户 action 的调用,实际上是串行化的,一个任务消费完了才会轮到下一个。因此,它也从根本上避免了对同一个 Sink 的并发写入问题。

总结

  • forEach:最快、最底层,但不安全。它把同步的烂摊子留给了你。如果你想并行收集到集合里,几乎永远都不应该用它。
  • collect并行收集的正确姿势。通过“本地容器+最终合并”的策略,由框架优雅地解决了并发问题,既安全又高效。
  • forEachOrdered:为了保证顺序,其最终消费阶段是串行化的,因此也是线程安全的。

http://www.dtcms.com/a/305498.html

相关文章:

  • 2025年6月电子学会青少年软件编程(C语言)等级考试试卷(二级)
  • 潇洒郎: Kafka Ubuntu 安装部署,命令行或者python生产数据与消费数据(kafka-python)
  • makefile中include *.d文件的作用
  • 安全和AI方向的学习路线
  • aws(学习笔记第五十课) ECS集中练习(2)
  • 项目目标如何拆解,才能提高执行效率和效果
  • 获取TensorRT引擎文件(.engine)版本号的几种方法
  • GitPython02-Git使用方式
  • 【Datawhale AI夏令营】科大讯飞AI大赛(大模型技术)/夏令营:让AI理解列车排期表(Task3)
  • Elasticsearch 全文检索与过滤
  • MyBatis Plus Wrapper 详细分析与原理
  • 设计模式十四:适配器模式(Adapter Pattern)
  • MCP提示词工程:上下文注入的艺术与科学
  • 【计算机视觉与代码大模型全景解析:从理论基础到学习路线】
  • VSCode高效集成开发全流程优化
  • [论文阅读] 人工智能 + 软件工程 | 增强RESTful API测试:针对MongoDB的搜索式模糊测试新方法
  • Jaeger理论、实战、问题记录
  • Python 中使用 OpenCV 库来捕获摄像头视频流并在窗口中显示
  • RAG实战指南 Day 28:RAG系统缓存与性能优化
  • Web3:赛道划分与发展趋势解析
  • JDBC编程笔记
  • 创建型设计模式-Builder
  • Git链接备用手册
  • 【NLP舆情分析】基于python微博舆情分析可视化系统(flask+pandas+echarts) 视频教程 - 微博内容IP地图可视化分析实现
  • 《设计模式之禅》笔记摘录 - 11.策略模式
  • 15 - 多模态大语言模型 — 图文 “牵线” 系统 “成长记”:借 CLIP 练本领,从图像与文字里精准 “搭鹊桥” 的全过程 (呆瓜版 - 2 号)
  • Java源码构建智能名片小程序
  • 短剧小程序系统开发:重塑影视内容消费格局
  • 北京理工大学医工交叉教学实践分享(1)|如何以实践破解数据挖掘教学痛点
  • 招工招聘小程序系统开发——打造一站式招聘服务平台