Java Flow API — Publisher、Subscriber 与 Processor 实战
在 Java 9 中,官方引入了 Flow API(java.util.concurrent.Flow
),它是对 Reactive Streams 规范 的标准实现,为 Java 提供了 响应式流编程 的原生支持。
本文将通过一个完整的示例,系统讲解 Flow API 的 四大核心角色,包括 Publisher、Subscriber、Subscription 和 Processor,并展示如何通过 Processor 处理器 构建一个数据加工的责任链。
一、Flow API 的四大核心角色
Flow API 定义了四个基础接口,分别对应响应式流中的不同角色:
-
Publisher(发布者)
数据的生产者,负责将数据推送给订阅者。 -
Subscriber(订阅者)
数据的消费者,定义了如何接收数据(onNext
)、完成通知(onComplete
)、异常处理(onError
)。 -
Subscription(订阅关系)
发布者与订阅者之间的桥梁,用于控制 背压(Backpressure) —— 订阅者可以通过request(n)
来告诉发布者:我准备好接收多少条数据。 -
Processor(处理器)
既是Subscriber
,又是Publisher
。它可以作为中间环节,对数据进行加工、转换,再继续向下游传递。
二、示例程序结构
下面的示例展示了一个典型的 发布者 → 多个处理器 → 订阅者 的责任链结构。
定义处理器 Processor
static class MyProcessor extends SubmissionPublisher<String> implements Flow.Processor<String, String> {private Flow.Subscription subscription; // 保存绑定关系@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("processor 订阅绑定完成");this.subscription = subscription;subscription.request(1); // 找上游要一个数据}// 数据到达,触发这个回调@Overridepublic void onNext(String item) {System.out.println("processor 拿到数据:" + item);// 再加工item += ": flux";submit(item); // 把我加工后的数据发出去subscription.request(1); // 再要新数据}@Overridepublic void onError(Throwable throwable) {// 将错误传播给下游this.closeExceptionally(throwable);}@Overridepublic void onComplete() {// 将完成事件传播给下游this.close();}}
这里 MyProcessor
的核心逻辑是:
- 从上游拿到数据,在内容后加上
: flux
,再传递给下游。 - 同时正确处理 背压、异常、完成信号,保证整个流能 有序、安全、可控地流动。
定义订阅者 Subscriber
Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println(Thread.currentThread() + "订阅开始了:" + subscription);this.subscription = subscription;subscription.request(1);}@Overridepublic void onNext(String item) {System.out.println(Thread.currentThread() + "订阅者,接收到数据:" + item);subscription.request(1);}@Overridepublic void onError(Throwable throwable) {System.out.println(Thread.currentThread() + "订阅者,接收到错误信号:" + throwable);}@Overridepublic void onComplete() {System.out.println(Thread.currentThread() + "订阅者,接收到完成信号");}
};
这个订阅者会逐条打印接收到的数据,并通过 request(1)
逐步拉取。
绑定责任链
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();MyProcessor myProcessor1 = new MyProcessor();
MyProcessor myProcessor2 = new MyProcessor();
MyProcessor myProcessor3 = new MyProcessor();// 发布者 -> 处理器1 -> 处理器2 -> 处理器3 -> 订阅者
publisher.subscribe(myProcessor1);
myProcessor1.subscribe(myProcessor2);
myProcessor2.subscribe(myProcessor3);
myProcessor3.subscribe(subscriber);
这里形成了一条典型的 责任链,数据会依次经过三个处理器加工,最终到达订阅者。
发布数据
for (int i = 0; i < 10; i++) {publisher.submit("hello-" + i);
}
publisher.close();
发布者提交 10 条数据,经过多个处理器逐步加工,最终被订阅者消费。
三、完整代码
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;public class FlowDemo {// 定义流中间操作处理器;只需要实现订阅者接口static class MyProcessor extends SubmissionPublisher<String> implements Flow.Processor<String, String> {private Flow.Subscription subscription; // 保存绑定关系@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("processor 订阅绑定完成");this.subscription = subscription;subscription.request(1); // 找上游要一个数据}// 数据到达,触发这个回调@Overridepublic void onNext(String item) {System.out.println("processor 拿到数据:" + item);// 再加工item += ": flux";submit(item); // 把我加工后的数据发出去subscription.request(1); // 再要新数据}@Overridepublic void onError(Throwable throwable) {// 将错误传播给下游this.closeExceptionally(throwable);}@Overridepublic void onComplete() {// 将完成事件传播给下游this.close();}}/*** 1. Publisher:发布者* 2. Subscriber:订阅者* 3. Subscription:订阅关系* 4. Processor:处理器** @param args*/// 发布订阅模式:观察者模式public static void main(String[] args) throws InterruptedException {// 1. 定义发布者:发布数据SubmissionPublisher<String> publisher = new SubmissionPublisher<>();// 2. 定一个中间操作:给每个元素加个 flux 标识MyProcessor myProcessor1 = new MyProcessor();MyProcessor myProcessor2 = new MyProcessor();MyProcessor myProcessor3 = new MyProcessor();// 3. 定义一个订阅者:订阅感兴趣的发布者数据Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {private Flow.Subscription subscription;// 在订阅时,onXXX:在XXX事件发生时,执行这个回调@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println(Thread.currentThread() + "订阅开始了:" + subscription);this.subscription = subscription;// 从上游请求一个数据subscription.request(1);}// 在下一个元素到达时:执行这个回调,即接受到了新数据@Overridepublic void onNext(String item) {System.out.println(Thread.currentThread() + "订阅者,接收到数据:" + item);subscription.request(1); // 背压模式,根据自身能力来控制接收的数据量}// 在错误时:执行这个回调@Overridepublic void onError(Throwable throwable) {System.out.println(Thread.currentThread() + "订阅者,接收到错误信号:" + throwable);}// 在完成时:执行这个回调@Overridepublic void onComplete() {System.out.println(Thread.currentThread() + "订阅者,接收到完成信号");}};// 4. 绑定发布者和订阅者publisher.subscribe(myProcessor1); // 此时处理器相当于订阅者myProcessor1.subscribe(myProcessor2);myProcessor2.subscribe(myProcessor3);myProcessor3.subscribe(subscriber); // 链表关系绑定责任链// 绑定操作;就是发布者记住了所有订阅者都有谁,有数据后,给所有订阅者把数据推送过去。for (int i = 0; i < 10; i++) {// 发布 10 条数据publisher.submit("hello-" + i);// publisher发布的所有数据在它的buffer区}// 发布者通道关闭publisher.close();// 发布者有数据,订阅者就能拿到System.out.println(Thread.currentThread());Thread.sleep(1500);}
}
运行程序后,我们会看到如下输出:
processor 订阅绑定完成
processor 订阅绑定完成
processor 订阅绑定完成
Thread[#1,main,5,main]
processor 拿到数据:hello-0
processor 拿到数据:hello-0: flux
processor 拿到数据:hello-1
processor 拿到数据:hello-0: flux: flux
processor 拿到数据:hello-2
processor 拿到数据:hello-1: flux
processor 拿到数据:hello-3
processor 拿到数据:hello-2: flux
processor 拿到数据:hello-1: flux: flux
processor 拿到数据:hello-4
processor 拿到数据:hello-3: flux
processor 拿到数据:hello-2: flux: flux
processor 拿到数据:hello-5
processor 拿到数据:hello-4: flux
processor 拿到数据:hello-3: flux: flux
processor 拿到数据:hello-6
processor 拿到数据:hello-5: flux
processor 拿到数据:hello-4: flux: flux
processor 拿到数据:hello-6: flux
processor 拿到数据:hello-7
processor 拿到数据:hello-5: flux: flux
processor 拿到数据:hello-7: flux
processor 拿到数据:hello-6: flux: flux
processor 拿到数据:hello-8
processor 拿到数据:hello-7: flux: flux
processor 拿到数据:hello-8: flux
processor 拿到数据:hello-9
processor 拿到数据:hello-8: flux: flux
processor 拿到数据:hello-9: flux
processor 拿到数据:hello-9: flux: flux
Thread[#30,ForkJoinPool.commonPool-worker-1,5,main]订阅开始了:java.util.concurrent.SubmissionPublisher$BufferedSubscription@658d57f7
Thread[#30,ForkJoinPool.commonPool-worker-1,5,main]订阅者,接收到数据:hello-0: flux: flux: flux
Thread[#30,ForkJoinPool.commonPool-worker-1,5,main]订阅者,接收到数据:hello-1: flux: flux: flux
Thread[#30,ForkJoinPool.commonPool-worker-1,5,main]订阅者,接收到数据:hello-2: flux: flux: flux
Thread[#30,ForkJoinPool.commonPool-worker-1,5,main]订阅者,接收到数据:hello-3: flux: flux: flux
Thread[#30,ForkJoinPool.commonPool-worker-1,5,main]订阅者,接收到数据:hello-4: flux: flux: flux
Thread[#30,ForkJoinPool.commonPool-worker-1,5,main]订阅者,接收到数据:hello-5: flux: flux: flux
Thread[#30,ForkJoinPool.commonPool-worker-1,5,main]订阅者,接收到数据:hello-6: flux: flux: flux
Thread[#30,ForkJoinPool.commonPool-worker-1,5,main]订阅者,接收到数据:hello-7: flux: flux: flux
Thread[#30,ForkJoinPool.commonPool-worker-1,5,main]订阅者,接收到数据:hello-8: flux: flux: flux
Thread[#30,ForkJoinPool.commonPool-worker-1,5,main]订阅者,接收到数据:hello-9: flux: flux: flux
Thread[#30,ForkJoinPool.commonPool-worker-1,5,main]订阅者,接收到完成信号
可以看到:
- 每条数据先被 Processor1 加工一次,再传给 Processor2,再传给 Processor3;
- 最终到订阅者时,数据已经多次加工,例如
"hello-0: flux: flux: flux"
; - 每次数据流动都遵循 背压机制(
request(1)
),不会无限制地淹没下游。
四、背压机制(Backpressure)
Flow API 最大的特点之一就是 支持背压。
- 订阅者主动调用
request(n)
来告诉上游:我只准备好接收n
条数据; - 发布者在推送时会遵循这个信号,避免内存膨胀或下游处理不过来。
这使得 Flow API 特别适合在 消息流处理、响应式系统 中使用。
五、责任链模式的优势
这种 Publisher → Processor → Subscriber 的链式结构有几个明显优势:
- 解耦:数据生产、加工、消费各司其职;
- 可扩展:Processor 可以无限插入,灵活组合;
- 异步并发:底层基于
ForkJoinPool
,支持异步回调; - 可控背压:避免订阅者被“压垮”。
这也是现代响应式编程框架(如 Reactor、RxJava)赖以构建的基础。
六、总结
本文通过 FlowDemo
示例展示了 Java Flow API 的完整使用流程:
- Publisher 负责发布数据;
- Processor 负责中间加工;
- Subscriber 负责消费;
- Subscription 负责协调背压;
- 多个 Processor 串联,形成一条责任链;
- 流式处理具备 异步 + 背压 特性,非常适合高并发、消息驱动的场景。
Flow API 的设计理念与 Reactive Streams 保持一致,现已成为响应式编程在 Java 平台的重要基石。