响应式编程框架Reactor【1】
文章目录
- 一、Reactor 框架概述与理论基础
- 1.1 响应式编程(Reactive Programming)是什么?
- 1.2 Reactive Streams 规范
- 1.3 响应式编程与 Reactor 的诞生
- 1.4 Reactor核心特性
- 1.5 Reactor与其它响应式框架比较
- 二、Reactor核心类型
- 2.1 Reactor 核心概念
- 2.2 核心类型
- 2.3 Mono【0个或者1个元素的流】
- 2.4 Flux【0到N个元素的流】
- 2.5 数据流生命周期
- 2.6 Reactor数据流模型
- 2.7 操作符链式调用
- 2.8 线程切换时序图
- 三、基础应用
- 3.1 基础Mono使用
- 3.2 基础Flux使用
- 3.3 异步与线程切换
- 3.4 背压(Backpressure)演示
- 3.5 错误处理
一、Reactor 框架概述与理论基础
官方文档
:
Project Reactor官网
Getting Started :: Reactor Core Reference Guide
https://www.reactive-streams.org/
https://www.reactive-streams.org/
https://projectreactor.io/
https://projectreactor.io/docs/core/release/reference/gettingStarted.html
1.1 响应式编程(Reactive Programming)是什么?
响应式编程
是一种面向数据流和变化传播的编程范式。它允许你声明式地定义数据流的转换、组合和处理逻辑,系统自动处理异步、背压、错误传播等复杂问题。
[!tip]
✅ 核心思想:数据流是第一公民,一切皆流(Everything is a Stream)。
1.2 Reactive Streams 规范
Reactor 实现了 Reactive Streams 规范,该规范定义了四个核心接口:
Publisher<T>
:发布者Subscriber<T>
:订阅者Subscription
:订阅关系(支持背压)Processor<T,R>
:处理器
有兴趣参照网址查看: reactive-streams.org
[!note]
🔗 Reactor 是 Project Reactor 的简称,由 Pivotal(现 VMware)开发,是 Spring WebFlux 的底层引擎。
1.3 响应式编程与 Reactor 的诞生
响应式编程(Reactive Programming) 是一种面向数据流和变化传播的编程范式,其核心思想是:将程序视为数据流的处理管道,通过异步非阻塞的方式传递和处理数据,并通过背压(Backpressure) 机制平衡生产者和消费者的速度差异。
在 Java 生态中,Reactor 框架是 Reactive Streams 规范的优秀实现,由 Pivotal 公司开发(与 Spring 同属一个团队),于 2013 年首次发布。它的诞生解决了以下核心问题:
- 传统同步阻塞 IO 在高并发场景下的性能瓶颈
- 异步编程中的 “回调地狱” 问题
- 缺乏标准化的背压机制导致的资源失控
- 与 Spring 生态(如 Spring WebFlux、Spring Cloud)的深度集成需求
Reactor 的核心理念是:“以声明式的方式处理异步数据流,同时保持代码的可读性和可维护性”。
1.4 Reactor核心特性
特性 | 说明 |
---|---|
异步非阻塞 | 基于事件驱动模型,避免线程阻塞,提高系统吞吐量 |
背压支持 | 消费者可主动告知生产者自己的处理能力,防止数据积压 |
声明式编程 | 通过操作符组合描述 “做什么”,而非 “怎么做” |
数据流组合 | 支持复杂的流组合(合并、连接、嵌套等) |
完善的错误处理 | 提供丰富的错误捕获、恢复和传递机制 |
与 Java 生态融合 | 兼容 Java 8 + 的 Stream API,支持 CompletableFuture 转换 |
轻量级 | 核心库体积小,无强依赖 |
1.5 Reactor与其它响应式框架比较
flowchart LRA[响应式框架] --> B[Reactor]A --> C[RxJava]A --> D[Akka Streams]B --> B1[与Spring生态深度集成]B --> B2[严格遵循Reactive Streams]B --> B3[专为Java 8+优化]B --> B4[更简洁的API设计]C --> C1[更早出现,生态成熟]C --> C2[支持多语言]C --> C3[操作符更丰富但复杂]D --> D1[基于Actor模型]D --> D2[分布式场景优势]D --> D3[学习曲线陡峭]
Reactor 的独特优势在于:
- 与 Spring WebFlux、Spring Cloud Gateway 等现代 Spring 组件无缝集成
- 对 Java 新特性(如虚拟线程、密封类)的原生支持
- 更简洁的 API 设计,降低响应式编程的学习门槛
二、Reactor核心类型
2.1 Reactor 核心概念
Reactor执行流程
2.2 核心类型
Reactor 提供了两个核心发布者类型:
类型 | 特点 | 适用场景 |
---|---|---|
Mono<T> | 0 或 1 个元素的异步序列 | 单个结果(如 HTTP 请求、数据库查询) |
Flux<T> | 0 到 N 个元素的异步序列 | 多个结果(如列表、事件流) |
2.3 Mono【0个或者1个元素的流】
Mono
用于表示包含 0 或 1 个元素的异步结果,适合处理单次操作(如数据库查询、HTTP 请求)的结果。
// 创建Mono【相当于事件的发布者】
Mono<String> mono = Mono.just("Hello Reactor"); // 直接值
Mono<String> emptyMono = Mono.empty(); // 空流
Mono<String> fromCallable = Mono.fromCallable(() -> "动态计算值"); // 延迟计算// 订阅Mono(触发执行)
mono.subscribe(value -> System.out.println("接收值:" + value), // 成功回调error -> System.err.println("错误:" + error), // 错误回调() -> System.out.println("完成") // 完成回调
);
2.4 Flux【0到N个元素的流】
Flux
用于表示包含 0 到多个元素的异步数据流,支持完整的生命周期(正常结束、错误终止)。常见场景:集合数据处理、事件流、批量操作等。
// 创建Flux
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5); // 固定元素
Flux<Integer> rangeFlux = Flux.range(1, 5); // 范围1-5
Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1)); // 每秒生成递增数(需手动取消订阅)// 订阅Flux
flux.map(x -> x * 2) // 转换操作符.filter(x -> x % 3 != 0) // 过滤操作符.subscribe(System.out::println, // 简化写法:仅处理成功事件Throwable::printStackTrace,() -> System.out.println("Flux完成"));
2.5 数据流生命周期
无论是Flux
还是Mono
,都遵循相同的生命周期:
- 正常事件:通过
onNext()
发送元素(Flux
可多次调用,Mono
最多调用一次) - 终止事件:
- 成功终止:
onComplete()
(无元素发送) - 错误终止:
onError(Throwable)
(携带异常信息)
- 成功终止:
2.6 Reactor数据流模型
2.7 操作符链式调用
2.8 线程切换时序图
三、基础应用
引入Maven依赖:
<dependencyManagement><dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-bom</artifactId><version>2024.0.6</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement><dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId></dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId><scope>test</scope></dependency>
</dependencies>
3.1 基础Mono使用
@Test
public void monoBasicTest() {// 1. 创建一个Mono对象(发射一个字符串)Mono<String> mono = Mono.just("Hello, Reactor!");// 2. 订阅并消费mono.subscribe(value -> System.out.println("✅ 接收到: " + value),error -> System.err.println("❌ 错误: " + error),() -> System.out.println("🎉 完成"),subscription -> {System.out.println("🔗 订阅建立");subscription.request(1); // 背压:请求 1 个});
}
3.2 基础Flux使用
@Test
public void fluxBasicTest() {// 创建一个Flux对象(发射多个字符串)Flux<String> flux = Flux.just("Hello", "Reactor", "Face", "Smail").map(String::toUpperCase).filter(s -> s.length() > 5).log();flux.subscribe(System.out::println,System.err::println,() -> System.out.println("流结束"));
}
🔍 log()
是调试利器,可查看所有信号(onNext, onError, onComplete)。
3.3 异步与线程切换
@Test
public void asyncTest(){Flux.just("张小三", "A", "B", "C").map(data -> {System.out.println("🔄 处理线程: " + Thread.currentThread().getName());return data + "-processed";}).subscribeOn(Schedulers.boundedElastic()) // 订阅在弹性线程池.publishOn(Schedulers.parallel()) // 发布在并行线程池.subscribe(result -> {System.out.println("📩 接收线程: " + Thread.currentThread().getName() + ", 数据: " + result);});System.out.println("MAIN THREAD: " + Thread.currentThread().getName());try {TimeUnit.MILLISECONDS.sleep(3000);}catch (InterruptedException e){e.printStackTrace();}
}
⚠️ subscribeOn()
影响上游执行线程,publishOn()
影响下游执行线程。
3.4 背压(Backpressure)演示
/*** 背压演示*/@Testpublic void backPressureTest() {Flux.range(1, 1000).onBackpressureDrop(item -> System.out.println("🗑️ 丢弃: " + item)) // 缓冲区满时丢弃.subscribe(new CoreSubscriber<Integer>() {private Subscription subscription;@Overridepublic void onSubscribe(Subscription s) {this.subscription = s;subscription.request(10); // 初始请求 10 个}@Overridepublic void onNext(Integer item) {System.out.println("✅ 接收: " + item);try {Thread.sleep(100);} catch (InterruptedException e) {}subscription.request(1); // 每处理一个再要一个}@Overridepublic void onError(Throwable t) {t.printStackTrace();}@Overridepublic void onComplete() {System.out.println("✅ 完成");}});try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}}
3.5 错误处理
/*** 错误处理*/
@Test
public void errorHandlerTest() {Flux.range(1, 5).map(i -> {if (i == 3) throw new RuntimeException("模拟错误");return "Item " + i;}).onErrorResume(e -> {System.err.println("⚠️ 捕获错误: " + e.getMessage());return Flux.just("Fallback 1", "Fallback 2"); // 错误后返回备用数据}).retryWhen(Retry.fixedDelay(2, Duration.ofSeconds(1))) // 重试 2 次.subscribe(System.out::println);
}