当前位置: 首页 > news >正文

响应式编程框架Reactor【9】

文章目录

  • 一、Reactor核心目标
  • 二、Reactor 核心知识点
    • 2.1 响应式编程范式
    • 2.2 核心类型:Mono与Flux
    • 2.3 操作符
    • 2.4 背压(Backpressure)
    • 2.5 调度与线程模型(核心!)
        • `subscribeOn` vs `publishOn`
    • 2.6 错误处理
  • 三、底层实现原理
    • 3.1 响应式流协议(Reactive Streams)
    • 3.2 操作符的链式结构
    • 3.3 异步与线程切换原理
  • 四、典型应用场景
    • 4.1 WebFlux 微服务(Spring 生态)
    • 4.2 异步数据库访问(R2DBC)
    • 4.3 实时事件推送(WebSocket / SSE)
    • 4.4 微服务间通信
  • 四、实践经验与最佳实战
  • 五、总结

一、Reactor核心目标

在高并发、低延迟的现代系统中,阻塞式编程(如 synchronized, Thread.sleep())会导致线程资源耗尽,系统吞吐量急剧下降。

Reactor 基于 Reactive Streams 规范,提供了一套非阻塞、异步、支持背压的响应式编程模型,是构建高性能、高并发系统的利器。

核心目标:用更少的线程处理更多的请求。

二、Reactor 核心知识点

2.1 响应式编程范式

响应式编程是一种基于数据流和变化传播的编程范式。一切皆为流(Stream),你可以对流进行声明式操作(map, filter, flatMap 等)。

// 声明式:我关心“做什么”,而不是“怎么做”
Flux.just(1, 2, 3).map(x -> x * 2).filter(x -> x > 3).subscribe(System.out::println);
// 输出:4, 6

2.2 核心类型:Mono与Flux

类型元素数量用途
Mono<T>0 或 1 个单个结果(如 HTTP 请求、数据库查询)
Flux<T>0 到 N 个多个结果(如列表、事件流)
// ✅ Mono 示例
Mono<String> user = Mono.just("Alice");
Mono<Void> save = userRepository.save(userEntity); // 无返回值// ✅ Flux 示例
Flux<String> users = Flux.just("Alice", "Bob", "Charlie");
Flux<Integer> numbers = Flux.range(1, 5); // 1,2,3,4,5

2.3 操作符

Reactor 提供了丰富的操作符,所有操作符都是冷流(Cold Stream),即订阅时才执行

import reactor.core.publisher.Flux;
import java.time.Duration;public class OperatorsExample {public static void main(String[] args) throws InterruptedException {Flux.range(1, 10).filter(n -> n % 2 == 0)                    // 过滤偶数.map(n -> "Item-" + n)                      // 转换为字符串.delayElements(Duration.ofMillis(100))      // 每 100ms 发一个.doOnNext(System.out::println)              // 副作用:打印.take(3)                                    // 只取前 3 个.subscribe();Thread.sleep(500); // 等待输出}
}
// 输出:Item-2, Item-4, Item-6

🔍 delayElements 证明了非阻塞特性:不会阻塞主线程

2.4 背压(Backpressure)

问题:生产者太快,消费者太慢,导致内存溢出。

解决方案:背压 —— 消费者主动控制请求的数据量。

Flux.range(1, 1000).onBackpressureBuffer(100) // 缓冲区最多 100 个.onBackpressureDrop(item -> System.out.println("丢弃: " + item)) // 超出则丢弃.subscribe(new BaseSubscriber<Integer>() {@Overrideprotected void hookOnSubscribe(Subscription subscription) {request(10); // 初始请求 10 个}@Overrideprotected void hookOnNext(Integer value) {System.out.println("处理: " + value);request(1); // 处理完一个,再要一个}});

2.5 调度与线程模型(核心!)

subscribeOn vs publishOn
Flux.just("A", "B").map(data -> {System.out.println("上游线程: " + Thread.currentThread().getName());return data + "-1";}).subscribeOn(Schedulers.boundedElastic()) // 影响上游执行线程.map(data -> {System.out.println("下游线程: " + Thread.currentThread().getName());return data + "-2";}).publishOn(Schedulers.parallel())         // 切换下游执行线程.subscribe(result -> System.out.println("订阅线程: " + Thread.currentThread().getName() + " => " + result));

🎯 关键区别

  • subscribeOn影响整个上游(从源头到当前位置)
  • publishOn只影响其后的下游操作

2.6 错误处理

Flux.just(1, 2, 3).map(n -> {if (n == 2) throw new RuntimeException("出错了");return "Result-" + n;}).onErrorResume(e -> {System.err.println("捕获错误: " + e.getMessage());return Flux.just("Fallback-1", "Fallback-2");}).retry(2) // 重试 2 次.subscribe(System.out::println);

三、底层实现原理

3.1 响应式流协议(Reactive Streams)

Reactor 实现了 Publisher, Subscriber, Subscription, Processor 四大接口。

public interface Publisher<T> {void subscribe(Subscriber<? super T> s);
}public interface Subscriber<T> {void onSubscribe(Subscription s);void onNext(T t);void onError(Throwable t);void onComplete();
}

3.2 操作符的链式结构

Flux.just
.filter
.map
.delayElements
.take
.subscribe

🔍 订阅时,链表逆向建立订阅关系,从 subscribe 回溯到源头。

3.3 异步与线程切换原理

  • Schedulers 封装了 ExecutorService
  • publishOn 内部使用 Queue + Worker 实现线程切换。
  • subscribeOn 在源头就切换执行线程。

四、典型应用场景

4.1 WebFlux 微服务(Spring 生态)

@RestController
public class UserController {@GetMapping("/users/{id}")public Mono<User> getUser(@PathVariable String id) {return userService.findById(id); // 非阻塞返回}@GetMapping("/users")public Flux<User> getAllUsers() {return userService.findAll(); // 流式返回}
}

✅ 优势:高并发下内存占用低,吞吐量高。

4.2 异步数据库访问(R2DBC)

@Repository
public class UserRepository {@Autowiredprivate DatabaseClient client;public Mono<User> findById(String id) {return client.sql("SELECT * FROM users WHERE id = $1").bind(0, id).map(row -> User.from(row)).one();}
}

✅ 替代 JDBC,实现真正的非阻塞数据库访问。

4.3 实时事件推送(WebSocket / SSE)

@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamEvents() {return eventBus.getEventFlux() // 事件流.map(event -> "Event: " + event.getData());
}

4.4 微服务间通信

WebClient client = WebClient.create("https://api.example.com");Mono<User> user = client.get().uri("/users/123").retrieve().bodyToMono(User.class); // 非阻塞调用

✅ 替代 RestTemplate,异步非阻塞。

四、实践经验与最佳实战

✅ 正确实践

  • I/O 操作Schedulers.boundedElastic()

  • CPU 计算Schedulers.parallel()

  • 避免在 map 中阻塞

  • 使用 StepVerifier 测试响应式流

❌ 常见陷阱

优点缺点
✅ 高并发、低延迟❌ 学习曲线陡峭
✅ 资源利用率高(线程少)❌ 调试困难(异步栈)
✅ 支持背压,防止 OOM❌ 与阻塞库集成复杂
✅ 与 Spring 深度集成❌ 团队技能要求高

五、总结

  • Reactor 是 Java 响应式编程的事实标准,其核心价值在于:

  • 非阻塞异步:提升系统吞吐量

    背压机制:保障系统稳定性

    声明式编程:代码更简洁、可读

  • 与 Spring 生态无缝集成

🚀 适用场景:高并发 Web 服务、实时系统、微服务、事件驱动架构。

⚠️ 不适用场景:简单 CRUD、低并发、团队无响应式经验。

掌握 Reactor,意味着你掌握了构建现代高性能系统的“核武器”。但切记:不要为了响应式而响应式,选择合适的工具解决合适的问题

http://www.dtcms.com/a/360892.html

相关文章:

  • 《论文阅读》从心到词:通过综合比喻语言和语义上下文信号产生同理心反应 2025 ACL findings
  • 【HTML】draggable 属性:解锁网页交互新维度
  • SpringAI模型评估
  • python爬虫之requests库的使用(小白五分钟从入门到精通)
  • Selenium 自动化测试实战:绕过登录直接获取 Cookie
  • 如何用AI视频增强清晰度软件解决画质模糊问题
  • 血缘元数据采集开放标准:OpenLineage Guides 使用 Apache Airflow® 和 OpenLineage + Marquez 入门
  • IPC 进程间通信 interprocess communicate
  • 【macOS】垃圾箱中文件无法清理的“含特殊字符文件名”的方法
  • 应用平台更新:可定制目录、基于Git的密钥管理与K8s项目自动化管理
  • Python 爬虫基础教学
  • C#/.NET/.NET Core技术前沿周刊 | 第 52 期(2025年8.25-8.31)
  • C++ 面试高频考点 力扣 35. 搜索插入位置 二分查找 左右端点查找 题解 每日一题
  • RocksDB 在 macOS M 系列 上运行时报错的解决方案
  • 【公告】更新预告
  • vite基础讲解
  • 超越Transformer:语言模型未来的认知革命与架构重构
  • Golang之GoWorld深度解析:基于Go语言的分布式游戏服务器框架
  • 新启航技术白皮书:激光频率梳如何实现 130mm 深孔 2μm 级无遮挡 3D 轮廓测量
  • OpenCV-Python Tutorial : A Candy from Official Main Page(五)
  • 使用Spring Boot对接印度股票市场API开发实践
  • Burp Suite 插件 | 提供强大的框架自动化安全扫描功能。目前支持1000+POC、支持动态加载POC、指定框架扫描。
  • 一体化运维平台自动化模块:3 大场景解放运维双手
  • 开发中使用——鸿蒙CoreSpeechKit语音识别
  • 复杂计算任务的智能轮询优化实战
  • 教育项目管理工具新趋势:可视化与自动化如何提升效率?
  • 使用ansible的playbook完成以下操作
  • TFS-2010《Fuzzy PCA-Guided Robust k-Means Clustering》
  • macOS中Homebrew安装PHP的详细步骤(五)
  • React学习教程,从入门到精通, React 入门指南:创建 React 应用程序的语法知识点(7)