Project Reactor响应式编程简介
前言:Reactor 是一种事件驱动的高性能网络编程模型,主要用于处理高并发的网络 I/O 请求。其核心思想是通过一个或多个线程监听事件,并将事件分发给相应的处理程序,从而实现高效的并发处理。在响应式编程(如 Project Reactor)中,理解 发布(Publish)与订阅(Subscribe)、生产者(Producer)与消费者(Consumer) 的概念非常重要。它们是构建异步、非阻塞数据流的基础模型。
一、Reactor基本概念
1. 发布者(Publisher)
-
是数据的提供方。
-
在 Project Reactor 中,
Flux
和Mono
都实现了Publisher<T>
接口。 -
它不主动发送数据,而是等待被订阅后才开始发射数据。
类比:就像一个电台频道,在没有人收听时它不会“广播”内容,只有当有人打开收音机(订阅),才会开始播放节目。
Flux<String> publisher = Flux.just("A", "B", "C"); // Publisher
2. 订阅者(Subscriber)
-
是数据的接收方。
-
实现
Subscriber<T>
接口,或者使用.subscribe()
方法作为简化方式。 -
订阅者会通过回调方法接收数据(
onNext
)、异常(onError
)或完成信号(onComplete
)。
publisher.subscribe(data -> System.out.println("Received: " + data), // onNexterr -> System.err.println("Error: " + err), // onError() -> System.out.println("Done!") // onComplete
);
3. 订阅(Subscription)
-
是连接
Publisher
和Subscriber
的桥梁。 -
每次调用
.subscribe()
都会创建一个新的Subscription
。 -
支持背压(backpressure)控制:消费者可以告诉生产者“我一次只能处理 N 个元素”。
publisher.subscribe(new Subscriber<>() {private Subscription subscription;@Overridepublic void onSubscribe(Subscription s) {this.subscription = s;subscription.request(1); // 请求第一个数据}@Overridepublic void onNext(String t) {System.out.println("Got: " + t);subscription.request(1); // 继续请求下一个}@Overridepublic void onError(Throwable t) {t.printStackTrace();}@Overridepublic void onComplete() {System.out.println("Completed");}
});
二、生产者和消费者模型(Producer/Consumer)
角色 | 描述 |
---|---|
生产者(Producer) | 提供数据流的一方,即 Publisher (如 Flux , Mono ) |
消费者(Consumer) | 接收并处理数据的一方,即 Subscriber |
-
数据从生产者流向消费者。
-
这种模型支持异步非阻塞的数据传输。
-
可以通过 背压机制 控制流量,避免消费者被过量数据淹没。
三、Reactor 中的发布与订阅流程
[Publisher] --> (onSubscribe) --> [Subscriber]↓(request)↓
[Publisher emits data via onNext]↓
[Subscriber receives data]↓
[Eventually onComplete or onError]
流程说明:
-
订阅建立:
-
调用
.subscribe()
后,Publisher
会调用onSubscribe(Subscription)
。
-
-
请求数据:
-
Subscriber
调用subscription.request(n)
表示希望接收 n 个数据。
-
-
数据发射:
-
Publisher
发射数据项,调用onNext(T)
。
-
-
结束或错误:
-
成功结束:调用
onComplete()
。 -
出错:调用
onError(Throwable)
。
-
四、实际应用举例
示例:模拟生产者和消费者的协作(带背压)
Flux.range(1, 100).subscribe(new Subscriber<>() {private Subscription subscription;private int count = 0;@Overridepublic void onSubscribe(Subscription s) {this.subscription = s;subscription.request(5); // 初始请求5个数据}@Overridepublic void onNext(Integer integer) {System.out.println("Consuming: " + integer);count++;if (count % 5 == 0) {subscription.request(5); // 每消费5个再请求5个}}@Overridepublic void onError(Throwable throwable) {throwable.printStackTrace();}@Overridepublic void onComplete() {System.out.println("All items consumed!");}});
五、常见误区
错误理解 | 正确理解 |
---|---|
Flux.just(...) 会立即发射数据 | 不会,除非有订阅者才会发射 |
Flux 是热源(Hot) | 默认是冷源(Cold),每次订阅都会重新开始 |
subscribe() 返回值无关紧要 | 可用于取消订阅(返回 Disposable ) |
所有操作符都是同步的 | 很多操作符是异步的,比如 flatMap , delayElements 等 |
六、总结
概念 | 说明 |
---|---|
Publisher | 数据源,如 Flux 或 Mono |
Subscriber | 数据消费者,实现 onNext , onError , onComplete |
Subscription | 控制数据流动的接口,支持背压 |
生产者/消费者模型 | 数据从生产者流向消费者,由订阅驱动 |
背压(Backpressure) | 消费者可以控制生产者的发射速率 |
冷流 vs 热流 | 冷流每次订阅都从头开始;热流共享数据流(如 ConnectableFlux ) |
如果你正在使用 Spring WebFlux、RSocket、Kafka Streams 或其他响应式框架,理解这些核心概念将帮助你更好地设计和调试异步系统。