当前位置: 首页 > news >正文

Java Flow API — Publisher、Subscriber 与 Processor 实战

Java 9 中,官方引入了 Flow APIjava.util.concurrent.Flow),它是对 Reactive Streams 规范 的标准实现,为 Java 提供了 响应式流编程 的原生支持。

本文将通过一个完整的示例,系统讲解 Flow API 的 四大核心角色,包括 Publisher、Subscriber、Subscription 和 Processor,并展示如何通过 Processor 处理器 构建一个数据加工的责任链。


一、Flow API 的四大核心角色

Flow API 定义了四个基础接口,分别对应响应式流中的不同角色:

  1. Publisher(发布者)
    数据的生产者,负责将数据推送给订阅者。

  2. Subscriber(订阅者)
    数据的消费者,定义了如何接收数据(onNext)、完成通知(onComplete)、异常处理(onError)。

  3. Subscription(订阅关系)
    发布者与订阅者之间的桥梁,用于控制 背压(Backpressure) —— 订阅者可以通过 request(n) 来告诉发布者:我准备好接收多少条数据。

  4. Processor(处理器)
    既是 Subscriber,又是 Publisher。它可以作为中间环节,对数据进行加工、转换,再继续向下游传递。


二、示例程序结构

下面的示例展示了一个典型的 发布者 → 多个处理器 → 订阅者 的责任链结构。

在这里插入图片描述

定义处理器 Processor

static class MyProcessor extends SubmissionPublisher<String> implements Flow.Processor<String, String> {private Flow.Subscription subscription;  // 保存绑定关系@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("processor 订阅绑定完成");this.subscription = subscription;subscription.request(1);  // 找上游要一个数据}// 数据到达,触发这个回调@Overridepublic void onNext(String item) {System.out.println("processor 拿到数据:" + item);// 再加工item += ": flux";submit(item); // 把我加工后的数据发出去subscription.request(1); // 再要新数据}@Overridepublic void onError(Throwable throwable) {// 将错误传播给下游this.closeExceptionally(throwable);}@Overridepublic void onComplete() {// 将完成事件传播给下游this.close();}}

这里 MyProcessor 的核心逻辑是:

  • 从上游拿到数据,在内容后加上 : flux,再传递给下游。
  • 同时正确处理 背压、异常、完成信号,保证整个流能 有序、安全、可控地流动。

在这里插入图片描述


定义订阅者 Subscriber

Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println(Thread.currentThread() + "订阅开始了:" + subscription);this.subscription = subscription;subscription.request(1);}@Overridepublic void onNext(String item) {System.out.println(Thread.currentThread() + "订阅者,接收到数据:" + item);subscription.request(1);}@Overridepublic void onError(Throwable throwable) {System.out.println(Thread.currentThread() + "订阅者,接收到错误信号:" + throwable);}@Overridepublic void onComplete() {System.out.println(Thread.currentThread() + "订阅者,接收到完成信号");}
};

这个订阅者会逐条打印接收到的数据,并通过 request(1) 逐步拉取。


绑定责任链

SubmissionPublisher<String> publisher = new SubmissionPublisher<>();MyProcessor myProcessor1 = new MyProcessor();
MyProcessor myProcessor2 = new MyProcessor();
MyProcessor myProcessor3 = new MyProcessor();// 发布者 -> 处理器1 -> 处理器2 -> 处理器3 -> 订阅者
publisher.subscribe(myProcessor1);
myProcessor1.subscribe(myProcessor2);
myProcessor2.subscribe(myProcessor3);
myProcessor3.subscribe(subscriber);

这里形成了一条典型的 责任链,数据会依次经过三个处理器加工,最终到达订阅者。


发布数据

for (int i = 0; i < 10; i++) {publisher.submit("hello-" + i);
}
publisher.close();

发布者提交 10 条数据,经过多个处理器逐步加工,最终被订阅者消费。


三、完整代码

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;public class FlowDemo {// 定义流中间操作处理器;只需要实现订阅者接口static class MyProcessor extends SubmissionPublisher<String> implements Flow.Processor<String, String> {private Flow.Subscription subscription;  // 保存绑定关系@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("processor 订阅绑定完成");this.subscription = subscription;subscription.request(1);  // 找上游要一个数据}// 数据到达,触发这个回调@Overridepublic void onNext(String item) {System.out.println("processor 拿到数据:" + item);// 再加工item += ": flux";submit(item); // 把我加工后的数据发出去subscription.request(1); // 再要新数据}@Overridepublic void onError(Throwable throwable) {// 将错误传播给下游this.closeExceptionally(throwable);}@Overridepublic void onComplete() {// 将完成事件传播给下游this.close();}}/*** 1. Publisher:发布者* 2. Subscriber:订阅者* 3. Subscription:订阅关系* 4. Processor:处理器** @param args*/// 发布订阅模式:观察者模式public static void main(String[] args) throws InterruptedException {// 1. 定义发布者:发布数据SubmissionPublisher<String> publisher = new SubmissionPublisher<>();// 2. 定一个中间操作:给每个元素加个 flux 标识MyProcessor myProcessor1 = new MyProcessor();MyProcessor myProcessor2 = new MyProcessor();MyProcessor myProcessor3 = new MyProcessor();// 3. 定义一个订阅者:订阅感兴趣的发布者数据Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {private Flow.Subscription subscription;// 在订阅时,onXXX:在XXX事件发生时,执行这个回调@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println(Thread.currentThread() + "订阅开始了:" + subscription);this.subscription = subscription;// 从上游请求一个数据subscription.request(1);}// 在下一个元素到达时:执行这个回调,即接受到了新数据@Overridepublic void onNext(String item) {System.out.println(Thread.currentThread() + "订阅者,接收到数据:" + item);subscription.request(1);  // 背压模式,根据自身能力来控制接收的数据量}// 在错误时:执行这个回调@Overridepublic void onError(Throwable throwable) {System.out.println(Thread.currentThread() + "订阅者,接收到错误信号:" + throwable);}// 在完成时:执行这个回调@Overridepublic void onComplete() {System.out.println(Thread.currentThread() + "订阅者,接收到完成信号");}};// 4. 绑定发布者和订阅者publisher.subscribe(myProcessor1);  // 此时处理器相当于订阅者myProcessor1.subscribe(myProcessor2);myProcessor2.subscribe(myProcessor3);myProcessor3.subscribe(subscriber);  // 链表关系绑定责任链// 绑定操作;就是发布者记住了所有订阅者都有谁,有数据后,给所有订阅者把数据推送过去。for (int i = 0; i < 10; i++) {// 发布 10 条数据publisher.submit("hello-" + i);// publisher发布的所有数据在它的buffer区}// 发布者通道关闭publisher.close();// 发布者有数据,订阅者就能拿到System.out.println(Thread.currentThread());Thread.sleep(1500);}
}

运行程序后,我们会看到如下输出:

processor 订阅绑定完成
processor 订阅绑定完成
processor 订阅绑定完成
Thread[#1,main,5,main]
processor 拿到数据:hello-0
processor 拿到数据:hello-0: flux
processor 拿到数据:hello-1
processor 拿到数据:hello-0: flux: flux
processor 拿到数据:hello-2
processor 拿到数据:hello-1: flux
processor 拿到数据:hello-3
processor 拿到数据:hello-2: flux
processor 拿到数据:hello-1: flux: flux
processor 拿到数据:hello-4
processor 拿到数据:hello-3: flux
processor 拿到数据:hello-2: flux: flux
processor 拿到数据:hello-5
processor 拿到数据:hello-4: flux
processor 拿到数据:hello-3: flux: flux
processor 拿到数据:hello-6
processor 拿到数据:hello-5: flux
processor 拿到数据:hello-4: flux: flux
processor 拿到数据:hello-6: flux
processor 拿到数据:hello-7
processor 拿到数据:hello-5: flux: flux
processor 拿到数据:hello-7: flux
processor 拿到数据:hello-6: flux: flux
processor 拿到数据:hello-8
processor 拿到数据:hello-7: flux: flux
processor 拿到数据:hello-8: flux
processor 拿到数据:hello-9
processor 拿到数据:hello-8: flux: flux
processor 拿到数据:hello-9: flux
processor 拿到数据:hello-9: flux: flux
Thread[#30,ForkJoinPool.commonPool-worker-1,5,main]订阅开始了:java.util.concurrent.SubmissionPublisher$BufferedSubscription@658d57f7
Thread[#30,ForkJoinPool.commonPool-worker-1,5,main]订阅者,接收到数据:hello-0: flux: flux: flux
Thread[#30,ForkJoinPool.commonPool-worker-1,5,main]订阅者,接收到数据:hello-1: flux: flux: flux
Thread[#30,ForkJoinPool.commonPool-worker-1,5,main]订阅者,接收到数据:hello-2: flux: flux: flux
Thread[#30,ForkJoinPool.commonPool-worker-1,5,main]订阅者,接收到数据:hello-3: flux: flux: flux
Thread[#30,ForkJoinPool.commonPool-worker-1,5,main]订阅者,接收到数据:hello-4: flux: flux: flux
Thread[#30,ForkJoinPool.commonPool-worker-1,5,main]订阅者,接收到数据:hello-5: flux: flux: flux
Thread[#30,ForkJoinPool.commonPool-worker-1,5,main]订阅者,接收到数据:hello-6: flux: flux: flux
Thread[#30,ForkJoinPool.commonPool-worker-1,5,main]订阅者,接收到数据:hello-7: flux: flux: flux
Thread[#30,ForkJoinPool.commonPool-worker-1,5,main]订阅者,接收到数据:hello-8: flux: flux: flux
Thread[#30,ForkJoinPool.commonPool-worker-1,5,main]订阅者,接收到数据:hello-9: flux: flux: flux
Thread[#30,ForkJoinPool.commonPool-worker-1,5,main]订阅者,接收到完成信号

可以看到:

  1. 每条数据先被 Processor1 加工一次,再传给 Processor2,再传给 Processor3
  2. 最终到订阅者时,数据已经多次加工,例如 "hello-0: flux: flux: flux"
  3. 每次数据流动都遵循 背压机制request(1)),不会无限制地淹没下游。

四、背压机制(Backpressure)

Flow API 最大的特点之一就是 支持背压

  • 订阅者主动调用 request(n) 来告诉上游:我只准备好接收 n 条数据;
  • 发布者在推送时会遵循这个信号,避免内存膨胀或下游处理不过来。

这使得 Flow API 特别适合在 消息流处理、响应式系统 中使用。


五、责任链模式的优势

这种 Publisher → Processor → Subscriber 的链式结构有几个明显优势:

  1. 解耦:数据生产、加工、消费各司其职;
  2. 可扩展:Processor 可以无限插入,灵活组合;
  3. 异步并发:底层基于 ForkJoinPool,支持异步回调;
  4. 可控背压:避免订阅者被“压垮”。

这也是现代响应式编程框架(如 Reactor、RxJava)赖以构建的基础。


六、总结

本文通过 FlowDemo 示例展示了 Java Flow API 的完整使用流程:

  • Publisher 负责发布数据;
  • Processor 负责中间加工;
  • Subscriber 负责消费;
  • Subscription 负责协调背压;
  • 多个 Processor 串联,形成一条责任链;
  • 流式处理具备 异步 + 背压 特性,非常适合高并发、消息驱动的场景。

Flow API 的设计理念与 Reactive Streams 保持一致,现已成为响应式编程在 Java 平台的重要基石。


文章转载自:

http://ct9pgSSK.wdgsp.cn
http://0JB63KUl.wdgsp.cn
http://eMammLbC.wdgsp.cn
http://FEetNrDS.wdgsp.cn
http://E6Kyf8mo.wdgsp.cn
http://2TX09O8x.wdgsp.cn
http://LwLol25H.wdgsp.cn
http://WbDOcY2z.wdgsp.cn
http://emRs2Xm9.wdgsp.cn
http://V31DbQxx.wdgsp.cn
http://qytNSDc9.wdgsp.cn
http://YHC1hFvO.wdgsp.cn
http://3VdZAuQR.wdgsp.cn
http://tc0Ffffl.wdgsp.cn
http://RCTnOMNo.wdgsp.cn
http://plxOhrTX.wdgsp.cn
http://jxEWRK8h.wdgsp.cn
http://pKIoHD4k.wdgsp.cn
http://s4GDUd6f.wdgsp.cn
http://HuzLSYYf.wdgsp.cn
http://4xCLtrSM.wdgsp.cn
http://nxNE2GHT.wdgsp.cn
http://heapgGQa.wdgsp.cn
http://XesjZonx.wdgsp.cn
http://fDTjIbnh.wdgsp.cn
http://JzbU2ij2.wdgsp.cn
http://RTJTN2L6.wdgsp.cn
http://uq8rBBv7.wdgsp.cn
http://4o7zSovQ.wdgsp.cn
http://8tCuKL6P.wdgsp.cn
http://www.dtcms.com/a/376692.html

相关文章:

  • 基于POI-TL实现动态Word模板数据填充(含图表):从需求到落地的完整开发实践
  • 【大模型-写作】STORM提升文章深度
  • (纯新手教学)计算机视觉(opencv)实战十四——模板与多个对象匹配
  • 论文阅读:arxiv 2024 Large Language Model Enhanced Recommender Systems: A Survey
  • 微店平台商品详情接口技术实现:从接口解析到数据结构化全方案
  • (12)使用 Vicon 室内定位系统(一)
  • 疯狂星期四文案网第65天运营日记
  • 【从零开始】12. 一切回归原点
  • JavaSE之深入浅出 IO 流:字节流、字符流与序列化流详解(含完整代码示例)
  • 【大模型推理】Qwen2.5模型硬件要求与4090Ti多并发推理方案
  • Node 中进程与子进程的区别及使用场景
  • 【C++进阶系列】:万字详解红黑树(附模拟实现的源码)
  • 以供应链思维为钥,启数字化转型之门——读《供应链思维》有感
  • 体验访答浏览器
  • Zynq开发实践(FPGA之spi实现)
  • 2025年度总结
  • Redis 哨兵模式详解:实现高可用的自动故障转移方案
  • 电动汽车充电系统(EVCS)的入侵检测
  • 自定义事件发布器
  • 零基础学AI大模型之从0到1调用大模型API
  • vue3:调用接口的时候怎么只传递一个数组进去,得到一个key-value数据
  • Transformer 训不动:注意力 Mask 用反 / 广播错位
  • Prometheus部署监控实战
  • vue3引入海康监控视频组件并实现非分屏需求一个页面同时预览多个监控视频(2)
  • AGV 智能车驱动仓储效率提升:应用场景,智慧物流自动化实践指南
  • 【全栈实战】Elasticsearch 8.15.2 高可用集群部署与AI搜索全特性指南
  • Django REST Framework 构建安卓应用后端API:从开发到部署的完整实战指南
  • neo4j数据库创建范例(SQL文)
  • [rStar] docs | 求解协调器
  • WPF迁移avalonia之触发器