publishOn and subscribeOn operators
Reactor 提供了两种在响应式链中切换执行上下文(或调度器)的方法:publishOn
和 subscribeOn
。它们的核心作用是控制任务在哪个线程或线程池中执行,从而实现并发控制。理解它们的区别和使用方式是掌握 Reactor 并发模型的关键。
1. subscribeOn
的作用与特点
- 作用:
subscribeOn
用于指定整个响应式链的订阅操作在哪个调度器(Scheduler)上执行。它会从源头开始,影响整个操作链的执行上下文。 - 特点:
- 位置无关:无论
subscribeOn
出现在链中的哪个位置,它都会从源头开始生效,覆盖整个操作链。 - 订阅时生效:只有在调用
subscribe()
之后,subscribeOn
才会真正生效,开始调度任务。 - 适用于源头操作:
subscribeOn
通常用于修改源头操作的执行线程,例如将Flux.fromIterable()
或Mono.just()
的执行线程切换到指定的调度器。
- 位置无关:无论
示例:
Flux<String> flux = Flux.just("A", "B", "C").subscribeOn(Schedulers.elastic()) // 从源头开始,所有操作都在 elastic 线程上执行.map(String::toUpperCase).subscribe(System.out::println);
- 输出:
A
,B
,C
,但执行在elastic
线程中。
2. publishOn
的作用与特点
- 作用:
publishOn
用于指定响应式链中后续操作的执行线程。它不会影响操作链的源头,只影响其后的内容。 - 特点:
- 位置相关:
publishOn
的位置在链中非常重要,它只影响其后的内容。 - 订阅后生效:
publishOn
在订阅之后才生效,它不会影响订阅前的操作。 - 适用于中间操作:
publishOn
通常用于修改中间操作的执行线程,例如在map
、filter
等操作之后切换线程。
- 位置相关:
示例:
Flux<String> flux = Flux.just("A", "B", "C").map(String::toUpperCase) // 在 main 线程执行.publishOn(Schedulers.elastic()) // 从 `publishOn` 之后的操作开始,使用 elastic 线程.subscribe(System.out::println);
- 输出:
A
,B
,C
,但map
在main
线程执行,publishOn
之后的操作在elastic
线程中。
3. 两者的核心区别
特性 | subscribeOn | publishOn |
---|---|---|
影响范围 | 整个操作链 | 仅后续操作 |
位置相关性 | 无关 | 相关 |
订阅时机 | 订阅后生效 | 订阅后生效 |
适用场景 | 修改源头操作 | 修改中间操作 |
4. 为什么 subscribeOn
会覆盖 publishOn
?
- 在 Reactor 中,
subscribeOn
会从源头开始,覆盖整个操作链的执行上下文。如果在subscribeOn
之后又调用了publishOn
,那么publishOn
的效果会被subscribeOn
覆盖。 - 例如,如果先调用
subscribeOn(Schedulers.elastic())
,再调用publishOn(Schedulers.parallel())
,那么最终所有操作都会在elastic
线程上执行,而不是parallel
线程。
示例:
Flux<String> flux = Flux.just("A", "B", "C").subscribeOn(Schedulers.elastic()) // 从源头开始,使用 elastic 线程.publishOn(Schedulers.parallel()) // 之后的操作仍使用 elastic 线程.subscribe(System.out::println);
- 输出:
A
,B
,C
,所有操作都在elastic
线程中执行。
5. 总结
subscribeOn
用于指定整个操作链的订阅线程,从源头开始生效。publishOn
用于指定后续操作的执行线程,只影响其后的内容。- 理解两者的区别 是 Reactor 并发控制的关键。
subscribeOn
更加“全局”,而publishOn
更加“局部”。 - 在实际开发中,
subscribeOn
通常用于修改源头操作的执行线程,而publishOn
用于优化中间操作的执行效率。