响应式编程之Flow框架
文章目录
- 一、技术背景与产生原因
- 1.1 响应式编程的兴起
- 1.2 响应式流规范(Reactive Streams)
- 1.3 解决的问题
- 1.4 响应式编程
- 二、Flow API核心组件
- 2.1 核心概念
- 2.2 接口关系图
- 2.2 接口详解
- 2.3 背压机制
- 三、完整示例
- 3.1 入门示例
- 3.2 基础发布-订阅示例
- 3.3 带背压控制的完整示例
- 3.4 Processor处理器示例
- 四、应用场景与实践
- 4.1 适用场景
- 4.2 实践经验总结
- 4.2.1 背压策略选择
- 4.2.2 错误处理最佳实践
- 4.2.3 性能监控指标
- 4.2.4 资源管理建议
- 4.3 与其它技术对比
一、技术背景与产生原因
1.1 响应式编程的兴起
在分布式系统和大数据时代,应用程序需要处理:
- 高并发请求和海量数据流
- 异步和非阻塞I/O操作
- 实时数据处理需求
1.2 响应式流规范(Reactive Streams)
Java Flow API 是 Java 9 对响应式流规范(Reactive Streams Specification)的实现,该规范定义了:
- 异步流处理的标准
- 非阻塞背压(backpressure)机制
- 四个核心接口:Publisher、Subscriber、Subscription、Processor
1.3 解决的问题
- 背压管理:防止快速生产者淹没慢速消费者
- 资源控制:允许消费者控制数据流量
- 统一标准:提供跨库的互操作性
1.4 响应式编程
- 底层: 基于数据缓冲队列 + 消息驱动模型 + 异步回调机制(事件驱动)
- 编码: 流式编程 + 链式调用 + 声明式API
- 效果: 优雅全异步 + 消息实时处理 + 高吞量 + 占用少量资源
二、Flow API核心组件
2.1 核心概念
- Publisher: 发布者,负责发布一系列元素给订阅者。
- Subscriber: 订阅者,接收来自发布者的元素。
- Subscription: 订阅关系,定义了发布者和订阅者之间的联系,包括请求元素的数量和取消订阅的操作。
- Processor: 处理器,同时作为发布者和订阅者,可以修改或转换数据流中的元素。
组件 | 角色 | 核心方法 | 作用 |
---|---|---|---|
Flow.Publisher | 数据生产者 | void subscribe(Subscriber<? super T> subscriber) | 接收订阅者,负责生产并发送数据 |
Flow.Subscriber | 数据消费者 | onSubscribe(Subscription s) 、onNext(T item) 、onError(Throwable t) 、onComplete() | 接收并处理数据,通过Subscription 反馈处理能力 |
Flow.Subscription | 订阅关系 | void request(long n) 、void cancel() | 管理生产者与消费者的关系,实现背压(通过request(n) 请求 n 个数据) |
Flow.Processor | 数据处理器 | 继承Publisher 和Subscriber | 同时作为消费者和生产者,用于数据转换、过滤等中间处理 |
2.2 接口关系图
2.2 接口详解
Flow
框架由四个核心接口构成,遵循“发布-订阅”模式:
1. Publisher<T> 数据发布者
发布者:负责生成数据并发送给订阅者
@FunctionalInterface
public static interface Publisher<T> {void subscribe(Subscriber<? super T> subscriber);
}
Publisher
是数据的源头,通过 subscribe()
方法注册订阅者。当有新的订阅时,会调用订阅者的 onSubscribe()
方法并传入 Subscription
对象。
另外它还有一个较为常用的实现类SubmissionPublisher
public class SubmissionPublisher<T> implements Publisher<T>,AutoCloseable {}
2. Subscriber<T> 数据订阅者
订阅者:接收并处理数据。
public interface Subscriber<T> {// 订阅建立时调用,接收Subscription用于控制流void onSubscribe(Subscription subscription);// 接收数据项void onNext(T item);// 错误通知void onError(Throwable throwable);// 完成通知void onComplete();
}
3. Subscription 订阅控制
订阅关系:连接发布者和订阅者,支持背压控制。
public interface Subscription {// 请求n个数据项void request(long n);// 取消订阅void cancel();
}
Processor<T,R> 处理器(既是发布者又是订阅者)
处理器:既是订阅者又是发布者,用于数据转换。
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {// 同时具备 Subscriber 和 Publisher 的方法
}
Processor
是数据处理的中间节点,既可以订阅上游数据,也可以向下游发布处理后的数据
2.3 背压机制
背压(Backpressure)是 Flow API 的核心特性,允许**订阅者控制接收数据的速率,防止被数据淹没。**通过 Subscription.request(n)
方法,订阅者可以明确告知发布者自己准备接收多少数据。
在异步数据流中,如果数据生产的速度超过了消费的速度,可能会导致系统资源被耗尽,如内存溢出等问题。背压机制提供了一种方式,使得消费者能够向生产者发出信号,告诉生产者减慢数据生产的速率或者暂时停止发送数据,直到消费者准备好处理更多的数据为止。
背压工作原理
- 流量控制:背压机制本质上是一种流量控制的方法。它允许消费者根据自己的处理能力来调整从生产者那里接收数据的速率。这可以防止因快速产生的数据淹没消费者而导致的问题。
- 反馈环路:当消费者的处理能力达到上限时,它会通过某种机制向生产者发送一个信号,表明自己不能再接受更多的数据。这个信号构成了一个反馈环路,允许生产者知道何时应该放慢数据产生速度或暂停发送数据。
- 策略选择:不同的响应式编程库或框架可能提供了不同的背压策略供开发者选择,例如:
- 缓冲:临时存储无法立即处理的数据项。
- 丢弃:简单地丢弃超出消费者处理能力的数据项。
- 错误通知:当超过一定的阈值时,向生产者发送错误通知。
三、完整示例
3.1 入门示例
package cn.tcmeta.flow;import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;/*** @author: laoren* @description: 基础发布-订阅(含背压)* @version: 1.0.0*/
public class BasicFlowExample {static void main() {// 1. 创建发布者try(SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {// 2. 创建订阅者Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {// 订阅控制private Flow.Subscription subscription;// 第一步:建立连接@Overridepublic void onSubscribe(Flow.Subscription subscription) {this.subscription = subscription;System.out.println("✅ 订阅成功,请求 2 个数据");subscription.request(2); // 背压: 先要2个数据}// 第二步:接收数据@Overridepublic void onNext(String item) {System.out.println("📩 收到数据: " + item);// 模拟处理耗时操作try {TimeUnit.MILLISECONDS.sleep(500);}catch (InterruptedException e){e.printStackTrace();}System.out.println("👉 请求下一个");subscription.request(1);}@Overridepublic void onError(Throwable throwable) {System.err.println("❌ 错误: " + throwable.getMessage());}@Overridepublic void onComplete() {System.out.println("🎉 数据流结束");}};// 3. 订阅publisher.subscribe(subscriber);// 4. 发送数据List<String> dataList = List.of("A", "B", "C", "D", "E", "F");for (String data : dataList) {// 发布数据, 返回值: 订阅者之间的估计最大延迟int accepted = publisher.submit(data);System.out.println("数据发布: " + data + ", 延迟: " + accepted);}// 5. 关闭发布者(触发onComplete)// publisher.close();} // 6. 自动调用publisher.close()try {TimeUnit.MILLISECONDS.sleep(3000);}catch (InterruptedException e){e.printStackTrace();}}
}
🔑 关键点:
- 订阅者通过
request(n)
控制消费速度(背压) submit()
非阻塞,可能失败(缓冲区满)try-with-resources
自动关闭发布者 → 触发onComplete
3.2 基础发布-订阅示例
定义发布者
package cn.tcmeta.flow;import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;/*** @author: laoren* @description: 简单数据发布者示例* @version: 1.0.0*/
public class SimplePublisher {private final SubmissionPublisher<Integer> publisher;public SimplePublisher() {this.publisher = new SubmissionPublisher<>(Runnable::run,Flow.defaultBufferSize());}public void startPublishing(){// 发布100个数据项IntStream.range(1, 101).forEach(i -> {System.out.println("数据发布: " + i);publisher.submit(i);try {TimeUnit.MILLISECONDS.sleep(30);}catch (InterruptedException e){e.printStackTrace();}});publisher.close();}public Flow.Publisher<Integer> getPublisher(){return publisher;}
}
定义订阅者
package cn.tcmeta.flow;import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;/*** @author: laoren* @description: Flow.Subscriber 订阅者* @version: 1.0.0*/
public class SimpleSubscriber implements Flow.Subscriber<Integer> {private Flow.Subscription subscription;private final String name;private final long requestSize;private long receivedCount = 0;public SimpleSubscriber(String name, long requestSize) {this.name = name;this.requestSize = requestSize;}/*** 订阅事件源, 在事件发生时,会回调这个函数* @param subscription a new subscription*/@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println(name + ": 订阅建立!");this.subscription = subscription;// 初始请求一批数据subscription.request(requestSize);}/*** 接收到数据时,会回调这个函数* @param item the item*/@Overridepublic void onNext(Integer item) {receivedCount ++;System.out.println("-----------------------------------------");System.out.println(name + ": 接收到数据: " + item);// 每处理requestSize个数据后请求下一批if(receivedCount % requestSize == 0){System.out.println(name + ": 请求下一批数据 - (" + requestSize + " )" + " 个");subscription.request(requestSize);}// 模拟处理延迟try {TimeUnit.MILLISECONDS.sleep(1000);}catch (InterruptedException e){e.printStackTrace();}}/*** 当数据处理异常时调用* @param throwable the exception*/@Overridepublic void onError(Throwable throwable) {System.out.println(name + ": 发生错误: " + throwable.getMessage());}/*** 当数据处理完成时调用*/@Overridepublic void onComplete() {System.out.println(name + ": 数据处理完成,共接收: " + receivedCount + "个数据");}
}
测试代码
package cn.tcmeta.flow;import java.util.concurrent.TimeUnit;/*** @author: laoren* @description: TODO* @version: 1.0.0*/
public class FlowBaseExample {static void main() {// 1. 创建发布者SimplePublisher publisher = new SimplePublisher();// 2. 创建订阅者SimpleSubscriber simpleSubscriber1 = new SimpleSubscriber("订阅者1", 5);SimpleSubscriber simpleSubscriber2 = new SimpleSubscriber("订阅者2", 3);// 3. 建立订阅关系publisher.getPublisher().subscribe(simpleSubscriber1);publisher.getPublisher().subscribe(simpleSubscriber2);// 4. 启动发布者publisher.startPublishing();try {TimeUnit.MILLISECONDS.sleep(4000);}catch (InterruptedException e){e.printStackTrace();}}
}
应用场景:
- 实时数据处理:例如股票市场数据、社交媒体更新等需要即时处理的信息流。
- 高并发Web应用:使用Flow API可以有效地管理大量的并发连接,减少服务器资源消耗。
- 事件驱动系统:适用于需要监听并响应特定事件的应用程序
实践经验:
- 避免阻塞操作:确保在
onNext
,onSubscribe
,onError
和onComplete
方法中不执行耗时或阻塞操作,以免影响整个流的性能。 - 合理设置请求数量:通过
request(long n)
方法控制从发布者获取元素的速度,防止内存溢出或性能下降。 - 错误处理机制:良好的错误处理策略是保证系统稳定性的关键,应该正确地利用
onError
方法处理异常情况。
3.3 带背压控制的完整示例
背压是 Flow 框架的核心特性,允许消费者根据自身处理能力控制生产者的发送速度。下面示例展示当消费者处理速度慢于生产者时,背压如何避免数据积压。
package cn.tcmeta.flow;import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;public class BackpressureExample {static void main(String[] args) throws InterruptedException {// 创建发布者(使用JDK提供的SubmissionPublisher)try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {// 创建慢消费者(处理速度慢于生产速度)SlowSubscriber subscriber = new SlowSubscriber();publisher.subscribe(subscriber);// 快速生产数据(生产速度快于消费速度)System.out.println("开始快速生产数据...");for (int i = 1; i <= 10; i++) {// submit()返回当前未处理的消息数(背压作用下会逐渐增加)int pending = publisher.submit(i);System.out.println("生产数据: " + i + ",当前未处理数: " + pending);TimeUnit.MILLISECONDS.sleep(100); // 快速生产(100ms/个)}// 等待所有数据处理完成while (publisher.hasSubscribers()) {TimeUnit.MILLISECONDS.sleep(500);}}}// 慢消费者(处理速度慢,展示背压效果)static class SlowSubscriber implements Flow.Subscriber<Integer> {private Flow.Subscription subscription;private int bufferSize = 2; // 消费者缓冲区大小(一次最多处理2个数据)@Overridepublic void onSubscribe(Flow.Subscription subscription) {this.subscription = subscription;// 初始请求bufferSize个数据(告知生产者自己的处理能力)System.out.println("消费者初始化,请求" + bufferSize + "个数据");subscription.request(bufferSize);}@Overridepublic void onNext(Integer item) {System.out.println("消费者开始处理数据: " + item);// 模拟慢速处理(500ms/个,慢于生产速度)try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {Thread.currentThread().interrupt();return;}System.out.println("消费者完成处理: " + item);// 每处理1个数据,再请求1个(维持缓冲区大小)subscription.request(1);}@Overridepublic void onError(Throwable throwable) {System.err.println("处理错误: " + throwable.getMessage());}@Overridepublic void onComplete() {System.out.println("所有数据处理完成");}}
}
3.4 Processor处理器示例
使用Processor
时行数据转换
package cn.tcmeta.flow;import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;/*** @author: laoren* @description: 自定义 Processor:将 Integer 转为 String* @version: 1.0.0*/
public class NumberToStringProcessor extends SubmissionPublisher<String>implements Flow.Processor<Integer, String> {// 绑定关系private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {this.subscription = subscription;subscription.request(1); // 向上游请求数据}@Overridepublic void onNext(Integer item) {// 转换数据发布submit("Number: " + item);subscription.request(1);}@Overridepublic void onError(Throwable throwable) {closeExceptionally(throwable);}@Overridepublic void onComplete() {close();}
}
测试基本功能
package cn.tcmeta.flow;import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;/*** @author: laoren* @description: Processor* @version: 1.0.0*/
public class ProcessorExample2 {static void main() {try (var publisher = new SubmissionPublisher<Integer>();var processor = new NumberToStringProcessor()) {// 构建数据流:Publisher → Processor → Subscriber// 这一步是将发布者(publisher)与处理器(processor)关联起来,从而实现数据流转。publisher.subscribe(processor);// Processor -> Subscriberprocessor.subscribe(new Flow.Subscriber<>() {private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription s) {this.subscription = s;s.request(1);System.out.println("Processor -> Subscriber");}@Overridepublic void onNext(String item) {System.out.println("✅ " + item);this.subscription.request(1);}@Overridepublic void onError(Throwable t) {t.printStackTrace();}@Overridepublic void onComplete() {System.out.println("✨ 完成");}});// 发送消息IntStream.rangeClosed(1, 5).forEach(a -> {publisher.submit(a);System.out.println("数据发送了~~~ " + a);});} // 自动关闭try {TimeUnit.MILLISECONDS.sleep(2000);} catch (InterruptedException e) {}}
}
Processor
是连接生产者和消费者的中间组件,同时实现Publisher
和Subscriber
接口,用于数据转换、过滤或聚合。下面示例实现一个数据过滤和转换的 Processor。
package cn.tcmeta.flow;import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;public class ProcessorExample {static void main(String[] args) throws InterruptedException {// 1. 创建原始数据发布者(发布整数)try (SubmissionPublisher<Integer> sourcePublisher = new SubmissionPublisher<>()) {// 2. 创建处理器(过滤偶数,并转换为字符串)FilterAndConvertProcessor processor = new FilterAndConvertProcessor();// 3. 建立发布者->处理器->消费者的连接sourcePublisher.subscribe(processor); // 发布者订阅到处理器SimpleStringSubscriber subscriber = new SimpleStringSubscriber();processor.subscribe(subscriber); // 处理器订阅到消费者// 4. 发布原始数据(1-10的整数)for (int i = 1; i <= 10; i++) {sourcePublisher.submit(i);System.out.println("发布原始数据: " + i);TimeUnit.MILLISECONDS.sleep(100);}// 等待处理完成while (sourcePublisher.hasSubscribers() || processor.hasSubscribers()) {TimeUnit.MILLISECONDS.sleep(200);}}}// 自定义处理器:过滤偶数,将奇数转换为字符串static class FilterAndConvertProcessor extends SubmissionPublisher<String> implements Flow.Processor<Integer, String> {private Flow.Subscription subscription; // 上游订阅关系@Overridepublic void onSubscribe(Flow.Subscription subscription) {this.subscription = subscription;// 向上游请求数据(初始请求3个)subscription.request(3);}@Overridepublic void onNext(Integer item) {System.out.println("处理器接收原始数据: " + item);// 过滤:只处理奇数if (item % 2 != 0) {// 转换:整数->字符串String converted = "奇数-" + item;// 向下游发布处理后的数据this.submit(converted);}// 每处理1个,再向上游请求1个subscription.request(1);}@Overridepublic void onError(Throwable throwable) {System.err.println("处理器出错: " + throwable.getMessage());// 向下游传递错误this.closeExceptionally(throwable);}@Overridepublic void onComplete() {System.out.println("处理器完成处理");// 向下游传递完成信号this.close();}}// 字符串消费者(接收处理器转换后的数据)static class SimpleStringSubscriber implements Flow.Subscriber<String> {private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {this.subscription = subscription;// 向处理器请求2个数据subscription.request(2);}@Overridepublic void onNext(String item) {System.out.println("消费者接收处理后的数据: " + item);// 处理完成后再请求1个subscription.request(1);}@Overridepublic void onError(Throwable throwable) {System.err.println("消费者出错: " + throwable.getMessage());}@Overridepublic void onComplete() {System.out.println("消费者处理完成");}}
}
流程图
时序图
四、应用场景与实践
4.1 适用场景
- 实时数据流处理
- 金融市场价格流
- IoT设备数据采集
- 实时日志处理
- 大数据处理管道
- ETL数据处理
- 数据转换和 enrichment
- 流水线式计算
- 高并发消息系统
- 消息队列消费者
- 事件驱动架构
- 微服务间通信
- 响应式Web应用
- Server-Sent Events
- WebSocket数据流
- 实时UI更新
4.2 实践经验总结
4.2.1 背压策略选择
// 策略1: 丢弃最新数据(适合实时性要求高的场景)
class DropNewestStrategy implements Flow.Subscriber<Data> {public void onNext(Data item) {if (!isOverloaded()) {process(item);subscription.request(1);}// 否则丢弃数据,不请求新数据}
}// 策略2: 缓冲数据(适合数据完整性要求高的场景)
class BufferStrategy implements Flow.Subscriber<Data> {private final Queue<Data> buffer = new ConcurrentLinkedQueue<>();public void onNext(Data item) {buffer.offer(item);if (canProcess()) {process(buffer.poll());}subscription.request(1);}
}
4.2.2 错误处理最佳实践
class ResilientSubscriber implements Flow.Subscriber<Data> {private Subscription subscription;private int retryCount = 0;public void onError(Throwable throwable) {if (retryCount < MAX_RETRIES) {retryCount++;System.out.println("重试 #" + retryCount);// 重新建立订阅// 需要保存对原始Publisher的引用} else {System.err.println("达到最大重试次数");}}
}
4.2.3 性能监控指标
class MonitoredPublisher<T> implements Flow.Publisher<T> {private final AtomicLong publishCount = new AtomicLong();private final AtomicLong requestCount = new AtomicLong();public void subscribe(Flow.Subscriber<? super T> subscriber) {// 包装订阅者以收集指标originalPublisher.subscribe(new Flow.Subscriber<T>() {public void onNext(T item) {publishCount.incrementAndGet();subscriber.onNext(item);}public void onSubscribe(Flow.Subscription subscription) {subscriber.onSubscribe(new Flow.Subscription() {public void request(long n) {requestCount.addAndGet(n);subscription.request(n);}});}});}public double getBackpressureRatio() {return (double) requestCount.get() / publishCount.get();}
}
4.2.4 资源管理建议
- 总是使用try-with-resources或确保调用close()
- 监控订阅者处理时间,避免阻塞onNext()
- 为不同的流类型使用不同的线程池
4.3 与其它技术对比
特性 | Java Flow API | Reactor | RxJava | Java Streams |
---|---|---|---|---|
背压支持 | ✅ 内置 | ✅ 内置 | ✅ 内置 | ❌ 无 |
异步支持 | ✅ | ✅ | ✅ | ❌ 同步 |
操作符丰富度 | 基础 | 丰富 | 极其丰富 | 中等 |
学习曲线 | 平缓 | 中等 | 陡峭 | 平缓 |
与Java集成 | 最好(标准库) | 很好 | 好 | 最好 |