当前位置: 首页 > news >正文

Project Reactor响应式编程简介

前言:Reactor 是一种事件驱动的高性能网络编程模型,主要用于处理高并发的网络 I/O 请求。其核心思想是通过一个或多个线程监听事件,并将事件分发给相应的处理程序,从而实现高效的并发处理。在响应式编程(如 Project Reactor)中,理解 发布(Publish)与订阅(Subscribe)、生产者(Producer)与消费者(Consumer) 的概念非常重要。它们是构建异步、非阻塞数据流的基础模型。


一、Reactor基本概念

1. 发布者(Publisher)

  • 是数据的提供方。

  • 在 Project Reactor 中,Flux 和 Mono 都实现了 Publisher<T> 接口。

  • 它不主动发送数据,而是等待被订阅后才开始发射数据。

类比:就像一个电台频道,在没有人收听时它不会“广播”内容,只有当有人打开收音机(订阅),才会开始播放节目。

Flux<String> publisher = Flux.just("A", "B", "C"); // Publisher

2. 订阅者(Subscriber)

  • 是数据的接收方。

  • 实现 Subscriber<T> 接口,或者使用 .subscribe() 方法作为简化方式。

  • 订阅者会通过回调方法接收数据(onNext)、异常(onError)或完成信号(onComplete)。

publisher.subscribe(data -> System.out.println("Received: " + data), // onNexterr -> System.err.println("Error: " + err),      // onError() -> System.out.println("Done!")                // onComplete
);

3. 订阅(Subscription)

  • 是连接 Publisher 和 Subscriber 的桥梁。

  • 每次调用 .subscribe() 都会创建一个新的 Subscription

  • 支持背压(backpressure)控制:消费者可以告诉生产者“我一次只能处理 N 个元素”。

publisher.subscribe(new Subscriber<>() {private Subscription subscription;@Overridepublic void onSubscribe(Subscription s) {this.subscription = s;subscription.request(1); // 请求第一个数据}@Overridepublic void onNext(String t) {System.out.println("Got: " + t);subscription.request(1); // 继续请求下一个}@Overridepublic void onError(Throwable t) {t.printStackTrace();}@Overridepublic void onComplete() {System.out.println("Completed");}
});

二、生产者和消费者模型(Producer/Consumer)

角色描述
生产者(Producer)提供数据流的一方,即 Publisher(如 FluxMono
消费者(Consumer)接收并处理数据的一方,即 Subscriber
  • 数据从生产者流向消费者。

  • 这种模型支持异步非阻塞的数据传输。

  • 可以通过 背压机制 控制流量,避免消费者被过量数据淹没。


三、Reactor 中的发布与订阅流程

[Publisher] --> (onSubscribe) --> [Subscriber]↓(request)↓
[Publisher emits data via onNext]↓
[Subscriber receives data]↓
[Eventually onComplete or onError]

流程说明:

  1. 订阅建立

    • 调用 .subscribe() 后,Publisher 会调用 onSubscribe(Subscription)

  2. 请求数据

    • Subscriber 调用 subscription.request(n) 表示希望接收 n 个数据。

  3. 数据发射

    • Publisher 发射数据项,调用 onNext(T)

  4. 结束或错误

    • 成功结束:调用 onComplete()

    • 出错:调用 onError(Throwable)


 四、实际应用举例

示例:模拟生产者和消费者的协作(带背压)

Flux.range(1, 100).subscribe(new Subscriber<>() {private Subscription subscription;private int count = 0;@Overridepublic void onSubscribe(Subscription s) {this.subscription = s;subscription.request(5); // 初始请求5个数据}@Overridepublic void onNext(Integer integer) {System.out.println("Consuming: " + integer);count++;if (count % 5 == 0) {subscription.request(5); // 每消费5个再请求5个}}@Overridepublic void onError(Throwable throwable) {throwable.printStackTrace();}@Overridepublic void onComplete() {System.out.println("All items consumed!");}});

 五、常见误区

错误理解正确理解
Flux.just(...) 会立即发射数据不会,除非有订阅者才会发射
Flux 是热源(Hot)默认是冷源(Cold),每次订阅都会重新开始
subscribe() 返回值无关紧要可用于取消订阅(返回 Disposable
所有操作符都是同步的很多操作符是异步的,比如 flatMapdelayElements 等

六、总结

概念说明
Publisher数据源,如 Flux 或 Mono
Subscriber数据消费者,实现 onNextonErroronComplete
Subscription控制数据流动的接口,支持背压
生产者/消费者模型数据从生产者流向消费者,由订阅驱动
背压(Backpressure)消费者可以控制生产者的发射速率
冷流 vs 热流冷流每次订阅都从头开始;热流共享数据流(如 ConnectableFlux

如果你正在使用 Spring WebFluxRSocketKafka Streams 或其他响应式框架,理解这些核心概念将帮助你更好地设计和调试异步系统。

相关文章:

  • Leetcode刷题(91~95)
  • 商品中心—3.商品可采可补可售的技术文档上
  • 与AI联手,ModbusTCP 转Ethercat控制系统升级解决刚需新思路
  • MyBatis-Plus 混合使用 XML 和注解
  • 一个教学项目pom.xml杂记
  • DevOps软件开发流程规范
  • 【笔记】NVIDIA AI Workbench 中安装 PyTorch
  • 山东大学软件学院项目实训-基于大模型的模拟面试系统-面试对话标题自动总结
  • 【计算机存储架构】层次化存储架构
  • JAVA-springboot Filter过滤器
  • Amazon Linux 2023 系统上 Radius 部署文档
  • 1Panel 部署 OpenResty + Redis 实现 IP 动态封禁教程
  • gbase8s数据库获取jdbc/odbc协议的几种方式
  • 合同管理登记台账是什么?合同管理登记台账有哪些功能?
  • 基于GA遗传优化的PID控制器最优控制参数整定matlab仿真
  • Matlab解决无法读取路径中的空格
  • 前端实战:用 HTML+JS 打造可拖动图像对比滑块,提升视觉交互体
  • 硬件行业职业规划四篇
  • (功能测试Charles)如何抓取手机http的报文
  • 软件测试之基于博客系统项目的功能测试
  • 建设求职网站/昆明自动seo
  • php网站开发实例报告/360广告投放平台
  • 做兼职翻译的网站/网站链接交易
  • 备案的时候网站要建设好吗/cpa广告联盟平台
  • 网站广告怎么做/国内搜索引擎排名2022
  • 网站开发能不能用win7系统/seo实战培训王乃用