Reactor Hot Versus Cold
这段文字详细解释了 Reactor 中 热发布者(Hot Publisher) 和 冷发布者(Cold Publisher) 的区别,并通过示例展示了它们的行为差异。以下是对其含义的总结和解释:
1. 冷发布者(Cold Publisher)
-
定义:冷发布者在订阅时才开始生成数据。如果没有订阅者,数据不会被生成。
-
行为:每个订阅者都会独立地触发数据的生成和处理流程。
-
类比:就像 HTTP 请求,每个订阅者都会触发一次新的请求,即使之前已经有人订阅过。
-
示例:
Flux<String> source = Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple")).map(String::toUpperCase); source.subscribe(d -> System.out.println("Subscriber 1: " + d)); source.subscribe(d -> System.out.println("Subscriber 2: " + d));
输出结果:
Subscriber 1: BLUE Subscriber 1: GREEN Subscriber 1: ORANGE Subscriber 1: PURPLE Subscriber 2: BLUE Subscriber 2: GREEN Subscriber 2: ORANGE Subscriber 2: PURPLE
每个订阅者都会接收到所有数据,因为每个订阅都会重新执行整个操作链 。
2. 热发布者(Hot Publisher)
-
定义:热发布者在创建时就开始发布数据,不依赖于订阅者的数量。即使没有订阅者,数据也会持续发布。
-
行为:订阅者只会看到从订阅开始之后发布的数据。如果在订阅之前已经发布了数据,新订阅者不会看到这些数据。
-
类比:就像一个股票价格发布者,一旦价格发生变化,所有订阅者都会收到更新,但新订阅者只会看到之后的价格变化。
-
示例:
Sinks.Many<String> hotSource = Sinks.unsafe().many().multicast().directBestEffort(); Flux<String> hotFlux = hotSource.asFlux().map(String::toUpperCase); hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: " + d)); hotSource.emitNext("blue", FAIL_FAST); hotSource.tryEmitNext("green").orThrow(); hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: " + d)); hotSource.emitNext("orange", FAIL_FAST); hotSource.tryEmitNext("purple").orThrow();
输出结果:
Subscriber 1 to Hot Source: BLUE Subscriber 1 to Hot Source: GREEN Subscriber 1 to Hot Source: ORANGE Subscriber 1 to Hot Source: PURPLE Subscriber 2 to Hot Source: ORANGE Subscriber 2 to Hot Source: PURPLE
第二个订阅者只看到 “orange” 和 “purple”,因为它们是在第一个订阅者之后发布的 。
3. 如何将冷发布者转换为热发布者
share()
:将冷发布者转换为热发布者,多个订阅者可以共享同一个发布者。第一个订阅者触发发布,后续订阅者共享数据。replay(n)
:将冷发布者转换为热发布者,并保留最近的n
个元素,新订阅者可以接收到这些元素。Sinks.Many
:通过Sinks.Many
手动控制数据的发布,可以模拟热发布者的行为 。
4. 如何将热发布者转换为冷发布者
defer()
:将热发布者(如just
)转换为冷发布者,延迟执行,直到有订阅者订阅时才生成数据 。
5. 总结
- 冷发布者:每个订阅者都会独立地触发数据的生成和处理。
- 热发布者:数据在创建时就开始发布,订阅者只会看到从订阅开始之后的数据。
- 转换方法:
- 冷 → 热:
share()
、replay()
、Sinks.Many
- 热 → 冷:
defer()
- 冷 → 热:
这种区分对于构建高效、可扩展的响应式系统非常重要,尤其是在处理大量并发请求或实时数据流时 。