当前位置: 首页 > news >正文

响应式编程框架Reactor【1】

文章目录

  • 一、Reactor 框架概述与理论基础
    • 1.1 响应式编程(Reactive Programming)是什么?
    • 1.2 Reactive Streams 规范
    • 1.3 响应式编程与 Reactor 的诞生
    • 1.4 Reactor核心特性
    • 1.5 Reactor与其它响应式框架比较
  • 二、Reactor核心类型
    • 2.1 Reactor 核心概念
    • 2.2 核心类型
    • 2.3 Mono【0个或者1个元素的流】
    • 2.4 Flux【0到N个元素的流】
    • 2.5 数据流生命周期
    • 2.6 Reactor数据流模型
    • 2.7 操作符链式调用
    • 2.8 线程切换时序图
  • 三、基础应用
    • 3.1 基础Mono使用
    • 3.2 基础Flux使用
    • 3.3 异步与线程切换
    • 3.4 背压(Backpressure)演示
    • 3.5 错误处理

一、Reactor 框架概述与理论基础

官方文档:

Project Reactor官网

Getting Started :: Reactor Core Reference Guide

https://www.reactive-streams.org/

https://www.reactive-streams.org/
https://projectreactor.io/
https://projectreactor.io/docs/core/release/reference/gettingStarted.html

1.1 响应式编程(Reactive Programming)是什么?

响应式编程是一种面向数据流和变化传播的编程范式。它允许你声明式地定义数据流的转换、组合和处理逻辑,系统自动处理异步、背压、错误传播等复杂问题。

[!tip]

✅ 核心思想:数据流是第一公民,一切皆流(Everything is a Stream)。

1.2 Reactive Streams 规范

Reactor 实现了 Reactive Streams 规范,该规范定义了四个核心接口:

  • Publisher<T>:发布者
  • Subscriber<T>:订阅者
  • Subscription:订阅关系(支持背压)
  • Processor<T,R>:处理器

有兴趣参照网址查看: reactive-streams.org

[!note]

🔗 Reactor 是 Project Reactor 的简称,由 Pivotal(现 VMware)开发,是 Spring WebFlux 的底层引擎。

1.3 响应式编程与 Reactor 的诞生

响应式编程(Reactive Programming) 是一种面向数据流和变化传播的编程范式,其核心思想是:将程序视为数据流的处理管道,通过异步非阻塞的方式传递和处理数据,并通过背压(Backpressure) 机制平衡生产者和消费者的速度差异。

在 Java 生态中,Reactor 框架是 Reactive Streams 规范的优秀实现,由 Pivotal 公司开发(与 Spring 同属一个团队),于 2013 年首次发布。它的诞生解决了以下核心问题:

  • 传统同步阻塞 IO 在高并发场景下的性能瓶颈
  • 异步编程中的 “回调地狱” 问题
  • 缺乏标准化的背压机制导致的资源失控
  • 与 Spring 生态(如 Spring WebFlux、Spring Cloud)的深度集成需求

Reactor 的核心理念是:“以声明式的方式处理异步数据流,同时保持代码的可读性和可维护性”

1.4 Reactor核心特性

特性说明
异步非阻塞基于事件驱动模型,避免线程阻塞,提高系统吞吐量
背压支持消费者可主动告知生产者自己的处理能力,防止数据积压
声明式编程通过操作符组合描述 “做什么”,而非 “怎么做”
数据流组合支持复杂的流组合(合并、连接、嵌套等)
完善的错误处理提供丰富的错误捕获、恢复和传递机制
与 Java 生态融合兼容 Java 8 + 的 Stream API,支持 CompletableFuture 转换
轻量级核心库体积小,无强依赖

1.5 Reactor与其它响应式框架比较

flowchart LRA[响应式框架] --> B[Reactor]A --> C[RxJava]A --> D[Akka Streams]B --> B1[与Spring生态深度集成]B --> B2[严格遵循Reactive Streams]B --> B3[专为Java 8+优化]B --> B4[更简洁的API设计]C --> C1[更早出现,生态成熟]C --> C2[支持多语言]C --> C3[操作符更丰富但复杂]D --> D1[基于Actor模型]D --> D2[分布式场景优势]D --> D3[学习曲线陡峭]

Reactor 的独特优势在于:

  • 与 Spring WebFlux、Spring Cloud Gateway 等现代 Spring 组件无缝集成
  • 对 Java 新特性(如虚拟线程、密封类)的原生支持
  • 更简洁的 API 设计,降低响应式编程的学习门槛

二、Reactor核心类型

2.1 Reactor 核心概念

Reactive Streams
Reactor
Publisher
Flux: 0..N elements
Mono: 0..1 elements
Operators
Transformation
Filtering
Combination
Error Handling

Reactor执行流程

SubscriberPublisher (Flux/Mono)OperatorsSchedulersubscribe()创建操作链安排执行(如果需要)在指定线程执行onSubscribe(Subscription)request(n)请求数据onNext(data)应用转换/过滤onNext(processedData)request(m) (更多数据)onComplete() (数据完成)onComplete()错误处理路径onError(throwable)onError(throwable)SubscriberPublisher (Flux/Mono)OperatorsScheduler

2.2 核心类型

Reactor 提供了两个核心发布者类型:

类型特点适用场景
Mono<T>0 或 1 个元素的异步序列单个结果(如 HTTP 请求、数据库查询)
Flux<T>0 到 N 个元素的异步序列多个结果(如列表、事件流)

2.3 Mono【0个或者1个元素的流】

Mono用于表示包含 0 或 1 个元素的异步结果,适合处理单次操作(如数据库查询、HTTP 请求)的结果。

// 创建Mono【相当于事件的发布者】
Mono<String> mono = Mono.just("Hello Reactor"); // 直接值
Mono<String> emptyMono = Mono.empty(); // 空流
Mono<String> fromCallable = Mono.fromCallable(() -> "动态计算值"); // 延迟计算// 订阅Mono(触发执行)
mono.subscribe(value -> System.out.println("接收值:" + value), // 成功回调error -> System.err.println("错误:" + error), // 错误回调() -> System.out.println("完成") // 完成回调
);

2.4 Flux【0到N个元素的流】

Flux用于表示包含 0 到多个元素的异步数据流,支持完整的生命周期(正常结束、错误终止)。常见场景:集合数据处理、事件流、批量操作等。

// 创建Flux
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5); // 固定元素
Flux<Integer> rangeFlux = Flux.range(1, 5); // 范围1-5
Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1)); // 每秒生成递增数(需手动取消订阅)// 订阅Flux
flux.map(x -> x * 2) // 转换操作符.filter(x -> x % 3 != 0) // 过滤操作符.subscribe(System.out::println, // 简化写法:仅处理成功事件Throwable::printStackTrace,() -> System.out.println("Flux完成"));

2.5 数据流生命周期

无论是Flux还是Mono,都遵循相同的生命周期:

  • 正常事件:通过onNext()发送元素(Flux可多次调用,Mono最多调用一次)
  • 终止事件:
    • 成功终止:onComplete()(无元素发送)
    • 错误终止:onError(Throwable)(携带异常信息)
订阅(subscribe)
onNext(元素)
onComplete()
onError(异常)
初始化
运行中
完成
错误

2.6 Reactor数据流模型

subscribe
request(n)
onNext
onError
onComplete
Publisher
Subscriber

2.7 操作符链式调用

Flux.just(1,2,3)
.map(x*2)
.filter(>5)
.log()
.subscribe()

2.8 线程切换时序图

MainboundedElasticparallelsubscribeOn()map() 执行publishOn()subscribe() 回调MainboundedElasticparallel

三、基础应用

引入Maven依赖:

<dependencyManagement><dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-bom</artifactId><version>2024.0.6</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement><dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId></dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId><scope>test</scope></dependency>
</dependencies>

3.1 基础Mono使用

@Test
public void monoBasicTest() {// 1. 创建一个Mono对象(发射一个字符串)Mono<String> mono = Mono.just("Hello, Reactor!");// 2. 订阅并消费mono.subscribe(value -> System.out.println("✅ 接收到: " + value),error -> System.err.println("❌ 错误: " + error),() -> System.out.println("🎉 完成"),subscription -> {System.out.println("🔗 订阅建立");subscription.request(1); // 背压:请求 1 个});
}

在这里插入图片描述

3.2 基础Flux使用

@Test
public void fluxBasicTest() {// 创建一个Flux对象(发射多个字符串)Flux<String> flux = Flux.just("Hello", "Reactor", "Face", "Smail").map(String::toUpperCase).filter(s -> s.length() > 5).log();flux.subscribe(System.out::println,System.err::println,() -> System.out.println("流结束"));
}

在这里插入图片描述

🔍 log() 是调试利器,可查看所有信号(onNext, onError, onComplete)。

3.3 异步与线程切换

@Test
public void asyncTest(){Flux.just("张小三", "A", "B", "C").map(data -> {System.out.println("🔄 处理线程: " + Thread.currentThread().getName());return data + "-processed";}).subscribeOn(Schedulers.boundedElastic()) // 订阅在弹性线程池.publishOn(Schedulers.parallel()) // 发布在并行线程池.subscribe(result -> {System.out.println("📩 接收线程: " + Thread.currentThread().getName() + ", 数据: " + result);});System.out.println("MAIN THREAD: " + Thread.currentThread().getName());try {TimeUnit.MILLISECONDS.sleep(3000);}catch (InterruptedException e){e.printStackTrace();}
}

在这里插入图片描述

⚠️ subscribeOn() 影响上游执行线程,publishOn() 影响下游执行线程。

3.4 背压(Backpressure)演示

/*** 背压演示*/@Testpublic void backPressureTest() {Flux.range(1, 1000).onBackpressureDrop(item -> System.out.println("🗑️ 丢弃: " + item)) // 缓冲区满时丢弃.subscribe(new CoreSubscriber<Integer>() {private Subscription subscription;@Overridepublic void onSubscribe(Subscription s) {this.subscription = s;subscription.request(10);  // 初始请求 10 个}@Overridepublic void onNext(Integer item) {System.out.println("✅ 接收: " + item);try {Thread.sleep(100);} catch (InterruptedException e) {}subscription.request(1); // 每处理一个再要一个}@Overridepublic void onError(Throwable t) {t.printStackTrace();}@Overridepublic void onComplete() {System.out.println("✅ 完成");}});try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}}

在这里插入图片描述

3.5 错误处理

/*** 错误处理*/
@Test
public void errorHandlerTest() {Flux.range(1, 5).map(i -> {if (i == 3) throw new RuntimeException("模拟错误");return "Item " + i;}).onErrorResume(e -> {System.err.println("⚠️ 捕获错误: " + e.getMessage());return Flux.just("Fallback 1", "Fallback 2"); // 错误后返回备用数据}).retryWhen(Retry.fixedDelay(2, Duration.ofSeconds(1))) // 重试 2 次.subscribe(System.out::println);
}

在这里插入图片描述

http://www.dtcms.com/a/355220.html

相关文章:

  • React 类生命周期 和 React Hooks 比对
  • 算力沸腾时代,如何保持“冷静”?国鑫液冷SY4108G-G4解锁AI服务器的“绿色空调”!
  • 第五章:Go运行时、内存管理与性能优化之性能分析与pprof工具
  • 配置windows下apache+PHP环境
  • 前端技术之---复制文本
  • docker安装kafka、zookeeper详细步骤
  • 【TEC045-KIT】基于复旦微 FMQL45T900 的全国产化 ARM 开发套件
  • COLMAP 和 SFM的关系是什么?
  • 微服务即时通信系统(十三)--- 项目部署
  • 第十七章 Java基础-常用API-System
  • ArkTS 与 TypeScript 的关系及鸿蒙开发常见错误案例
  • Upload Symbols Failed
  • 万字详解架构设计:业务架构、应用架构、数据架构、技术架构、单体、分布式、微服务都是什么?
  • 只用三招,无招重启钉钉
  • Video Ocean 接入 GPT-5
  • GeoScene Maps 开发-核心地图-标记点管理-用户交互弹窗
  • 大白话拆解力扣算法 HOT 100 - 哈希/双指针/滑动窗口
  • Mac Pro M4芯片 安装 VMware Fusion 和 windows
  • Vue Router 路由守卫详解与面试指南
  • 实体门店怎么利用小程序做好分销
  • 目标检测领域基本概念
  • 【Python】QT(PySide2、PyQt5):Qt Designer,VS Code使用designer,可能的报错
  • 发那科机器人弧焊电源气体省气装置
  • esp32c2 at 请问通过HTTPS进行OTA升级的AT命令流程有吗?
  • 专项智能练习(多媒体概述)
  • 如果已经安装了electron的一个版本,再次使用命令npm install electron不指定electron版本时,会下载安装新版本么?
  • VS2022+QT6.7+Multimedia(捕获Windows音频数据,生成实时频谱)
  • Day16_【机器学习建模流程】
  • Python备份实战专栏第2/6篇:30分钟搭建企业级API认证系统,安全性吊打90%的方案
  • R语言贝叶斯方法在生态环境领域中的高阶技术应用