当前位置: 首页 > news >正文

publishOn and subscribeOn operators

Reactor 提供了两种在响应式链中切换执行上下文(或调度器)的方法:publishOnsubscribeOn。它们的核心作用是控制任务在哪个线程或线程池中执行,从而实现并发控制。理解它们的区别和使用方式是掌握 Reactor 并发模型的关键。


1. subscribeOn 的作用与特点

  • 作用subscribeOn 用于指定整个响应式链的订阅操作在哪个调度器(Scheduler)上执行。它会从源头开始,影响整个操作链的执行上下文。
  • 特点
    • 位置无关:无论 subscribeOn 出现在链中的哪个位置,它都会从源头开始生效,覆盖整个操作链。
    • 订阅时生效:只有在调用 subscribe() 之后,subscribeOn 才会真正生效,开始调度任务。
    • 适用于源头操作subscribeOn 通常用于修改源头操作的执行线程,例如将 Flux.fromIterable()Mono.just() 的执行线程切换到指定的调度器。
示例:
Flux<String> flux = Flux.just("A", "B", "C").subscribeOn(Schedulers.elastic()) // 从源头开始,所有操作都在 elastic 线程上执行.map(String::toUpperCase).subscribe(System.out::println);
  • 输出A, B, C,但执行在 elastic 线程中。

2. publishOn 的作用与特点

  • 作用publishOn 用于指定响应式链中后续操作的执行线程。它不会影响操作链的源头,只影响其后的内容。
  • 特点
    • 位置相关publishOn 的位置在链中非常重要,它只影响其后的内容。
    • 订阅后生效publishOn 在订阅之后才生效,它不会影响订阅前的操作。
    • 适用于中间操作publishOn 通常用于修改中间操作的执行线程,例如在 mapfilter 等操作之后切换线程。
示例:
Flux<String> flux = Flux.just("A", "B", "C").map(String::toUpperCase) // 在 main 线程执行.publishOn(Schedulers.elastic()) // 从 `publishOn` 之后的操作开始,使用 elastic 线程.subscribe(System.out::println);
  • 输出A, B, C,但 mapmain 线程执行,publishOn 之后的操作在 elastic 线程中。

3. 两者的核心区别

特性subscribeOnpublishOn
影响范围整个操作链仅后续操作
位置相关性无关相关
订阅时机订阅后生效订阅后生效
适用场景修改源头操作修改中间操作

4. 为什么 subscribeOn 会覆盖 publishOn

  • 在 Reactor 中,subscribeOn 会从源头开始,覆盖整个操作链的执行上下文。如果在 subscribeOn 之后又调用了 publishOn,那么 publishOn 的效果会被 subscribeOn 覆盖。
  • 例如,如果先调用 subscribeOn(Schedulers.elastic()),再调用 publishOn(Schedulers.parallel()),那么最终所有操作都会在 elastic 线程上执行,而不是 parallel 线程。
示例:
Flux<String> flux = Flux.just("A", "B", "C").subscribeOn(Schedulers.elastic()) // 从源头开始,使用 elastic 线程.publishOn(Schedulers.parallel())   // 之后的操作仍使用 elastic 线程.subscribe(System.out::println);
  • 输出A, B, C,所有操作都在 elastic 线程中执行。

5. 总结

  • subscribeOn 用于指定整个操作链的订阅线程,从源头开始生效。
  • publishOn 用于指定后续操作的执行线程,只影响其后的内容。
  • 理解两者的区别 是 Reactor 并发控制的关键。subscribeOn 更加“全局”,而 publishOn 更加“局部”。
  • 在实际开发中,subscribeOn 通常用于修改源头操作的执行线程,而 publishOn 用于优化中间操作的执行效率。

相关文章:

  • 算法第48天|单调栈:42. 接雨水、84.柱状图中最大的矩形
  • Java——Spring 非注解开发:IoC/DI 与 Bean 管理实战(含第三方组件整合)
  • 【机器学习深度学习】交互式线性回归 demo
  • day48-硬件学习之GPT定时器、UART及I2C
  • 【开源工具】Windows一键配置防火墙阻止策略(禁止应用联网)| 附完整Python源码
  • 事件循环(Event Loop)机制对比:Node.js vs 浏览器​
  • ethers.js express vue2 定时任务每天凌晨2点监听合约地址数据同步到Mysql整理
  • 【CMake基础入门教程】第六课:构建静态库 / 动态库 与安装规则(install)
  • MySQL至KES迁移最佳实践
  • 用 Spark 优化亿级用户画像计算:Delta Lake 增量更新策略详解
  • vue3 json 转 实体
  • 2.1、STM32 CAN外设简介
  • Vue3 中 Axios 深度整合指南:从基础到高级实践引言总结
  • MR30分布式IO:产线改造省时 70%
  • 22. 括号生成
  • AI编程工具深度对比:腾讯云代码助手CodeBuddy、Cursor与通义灵码
  • ubuntu20.04如何给appImage创建快捷方式
  • EXILIUM×亚矩云手机:重构Web3虚拟生存法则,开启多端跨链元宇宙自由征途
  • 【JeecgBoot AIGC】打造智能AI应用
  • 51c~嵌入式~PLC~三菱~合集1