当前位置: 首页 > news >正文

Reactor 事件流 vs. Spring 事件 (ApplicationEvent)

Reactor 事件流 vs. Spring 事件 ApplicationEvent

  • Reactor 事件流 vs. Spring 事件 (`ApplicationEvent`)
    • 1️⃣ 核心区别
    • 2️⃣ Spring 事件 (`ApplicationEvent`)
      • ✅ 示例:Spring 事件发布 & 监听
        • 1️⃣ 定义事件
        • 2️⃣ 发布事件
        • 3️⃣ 监听事件
        • 🔹 进阶:异步监听
    • 3️⃣ Reactor 事件流(`Flux` / `Mono` / `Sinks.Many`)
      • ✅ 示例:事件流处理
        • 1️⃣ 冷流(每个订阅者独立获取数据)
          • ✅ 方法 1:使用 `Flux.create()`,手动推送数据
          • **✅ 方法 2:使用 `Flux.generate()`,按需推送数据**
          • **✅ 方法 3:使用 `Sinks.many().multicast()`,支持多个订阅者**
          • **✅ 结论**
        • 2️⃣ 热流(共享事件流)
        • **📌 ReplayProcessor:可重放历史事件的 Reactor 处理器**
        • **📌 1️⃣ 关键特点**
        • **📌 2️⃣ 基本使用**
        • **📌 运行结果**
        • **📌 3️⃣ `ReplayProcessor` vs `Sinks.Many`**
        • **📌 4️⃣ 适用场景**
    • 4️⃣ 什么时候用哪个?
    • 5️⃣ 总结

Reactor 事件流 vs. Spring 事件 (ApplicationEvent)

1️⃣ 核心区别

特性Spring ApplicationEventReactor Flux /Sinks.Many
数据处理方式一次性事件(同步或异步)流式处理(持续事件流)
是否支持多个订阅者✅ 支持(多个 @EventListener✅ 支持(Sinks.Many 广播)
是否支持流式操作❌ 不支持支持map(), filter(), zip()
是否支持回放历史事件❌ 不支持❌(默认不支持,但可用 ReplayProcessor
适用场景业务事件通知(用户注册、订单支付)高吞吐数据流处理(日志、消息队列、WebFlux)

2️⃣ Spring 事件 (ApplicationEvent)

🔹 特点

  • 适用于应用内部组件通信,解耦业务逻辑。
  • 默认同步,可使用 @Async 进行异步处理。
  • 一次性事件,无法流式处理。

✅ 示例:Spring 事件发布 & 监听

1️⃣ 定义事件
public class UserRegisteredEvent extends ApplicationEvent {
    private final String username;
    public UserRegisteredEvent(Object source, String username) {
        super(source);
        this.username = username;
    }
    public String getUsername() { return username; }
}
2️⃣ 发布事件
@Component
public class UserService {
    @Autowired
    private ApplicationEventPublisher eventPublisher;
    public void registerUser(String username) {
        eventPublisher.publishEvent(new UserRegisteredEvent(this, username));
    }
}
3️⃣ 监听事件
@Component
public class WelcomeEmailListener {
    @EventListener
    public void handleUserRegisteredEvent(UserRegisteredEvent event) {
        System.out.println("📧 发送欢迎邮件给: " + event.getUsername());
    }
}

可多个 @EventListener 监听同一个事件,同时触发

@Component
public class LoggingListener {
    @EventListener
    public void logUserRegisteredEvent(UserRegisteredEvent event) {
        System.out.println("📜 记录日志: 用户 " + event.getUsername() + " 已注册");
    }
}
@Component
public class PointsRewardListener {
    @EventListener
    public void giveWelcomePoints(UserRegisteredEvent event) {
        System.out.println("🎁 赠送 100 积分给: " + event.getUsername());
    }
}
🔹 进阶:异步监听

🔹 1️⃣ 监听器可以指定异步 需要启用 Spring 异步,@EnableAsync

@Async
@EventListener
public void sendWelcomeEmail(UserRegisteredEvent event) {
    System.out.println("📧 发送欢迎邮件给: " + event.getUsername() + " [异步]");
}

🔹 2️⃣ 监听多个事件 如果多个事件需要相同的处理逻辑,你可以用 classes 监听多个事件:

@EventListener(classes = {UserRegisteredEvent.class, OrderPlacedEvent.class})
public void handleMultipleEvents(Object event) {
    System.out.println("📢 事件触发: " + event.getClass().getSimpleName());
}

🔹 3️⃣ 条件监听 可以使用 condition 属性,让监听器只处理 符合条件 的事件:

@EventListener(condition = "#event.username.startsWith('A')")
public void handleUserStartingWithA(UserRegisteredEvent event) {
    System.out.println("🎯 处理用户名以 A 开头的用户: " + event.getUsername());
}

🔹 适用场景 ✅ 适用于业务事件通知(如用户注册、订单支付)。 ❌ 不适用于流式数据处理

3️⃣ Reactor 事件流(Flux / Mono / Sinks.Many

🔹 特点

  • 异步 & 流式 处理,可以并行、合并、过滤、转换数据。
  • 冷流(Flux、Mono) 每个订阅者独立处理数据
  • 热流(Sinks.Many) 可用于事件广播

✅ 示例:事件流处理

1️⃣ 冷流(每个订阅者独立获取数据)
Flux<String> flux = Flux.just("事件1", "事件2", "事件3");
flux.subscribe(event -> System.out.println("订阅者 1 收到: " + event));
flux.subscribe(event -> System.out.println("订阅者 2 收到: " + event));
✅ 方法 1:使用 Flux.create(),手动推送数据
 import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CustomFluxExample {
    public static void main(String[] args) throws InterruptedException {
        Flux<String> customFlux = Flux.create(emitter -> {
            Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
                String event = "自定义事件:" + System.currentTimeMillis();
                System.out.println("发布:" + event);
                emitter.next(event);
            }, 0, 1, TimeUnit.SECONDS);
        }, FluxSink.OverflowStrategy.BUFFER);

        // 订阅者 1
        customFlux.subscribe(event -> System.out.println("订阅者 1 收到:" + event));

        Thread.sleep(5000); // 5 秒后再添加订阅者

        // 订阅者 2
        customFlux.subscribe(event -> System.out.println("订阅者 2 收到:" + event));

        Thread.sleep(10000); // 让主线程等待一会儿,看效果
    }
}

🔹 运行结果

python-repl复制编辑发布:自定义事件:1712101234567
订阅者 1 收到:自定义事件:1712101234567
发布:自定义事件:1712101235567
订阅者 1 收到:自定义事件:1712101235567
发布:自定义事件:1712101236567
订阅者 1 收到:自定义事件:1712101236567
发布:自定义事件:1712101237567
订阅者 1 收到:自定义事件:1712101237567
发布:自定义事件:1712101238567
订阅者 1 收到:自定义事件:1712101238567
(5 秒后,订阅者 2 加入)
订阅者 2 收到:自定义事件:1712101239567
订阅者 1 收到:自定义事件:1712101239567
发布:自定义事件:1712101240567
订阅者 2 收到:自定义事件:1712101240567
订阅者 1 收到:自定义事件:1712101240567
...

📌 特点

  1. 你可以随时手动推送数据(每 1 秒发布一次)。

  2. 新订阅者不会收到历史数据,只会接收到之后的事件(如果你想让新订阅者也能收到历史数据,可以用 .replay())。

    示例:

    Flux<String> flux = Flux.just("事件A", "事件B", "事件C")
                            .replay(2)  // 缓存最后 2 个事件
                            .autoConnect();  // 至少一个订阅者连接后开始发布
    
    flux.subscribe(event -> System.out.println("订阅者 1 收到: " + event));
    
    // 新的订阅者会从缓存中接收事件
    flux.subscribe(event -> System.out.println("订阅者 2 收到: " + event));
    
    

    输出:

    订阅者 1 收到: 事件A
    订阅者 1 收到: 事件B
    订阅者 1 收到: 事件C
    订阅者 2 收到: 事件B
    订阅者 2 收到: 事件C
    
  3. 不会自动结束,Flux 会一直运行

✅ 方法 2:使用 Flux.generate(),按需推送数据

如果你的数据是基于前一个数据计算出来的,可以使用 Flux.generate()

import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

public class GenerateFluxExample {
    public static void main(String[] args) throws InterruptedException {
        Flux<String> generatedFlux = Flux.generate(
                () -> new AtomicInteger(1),  // 初始状态
                (state, sink) -> {
                    String event = "事件 " + state.getAndIncrement();
                    System.out.println("发布:" + event);
                    sink.next(event);
                    try { Thread.sleep(1000); } catch (InterruptedException e) {}
                    return state;
                });

        // 订阅者 1
        generatedFlux.subscribe(event -> System.out.println("订阅者 1 收到:" + event));

        Thread.sleep(5000); // 5 秒后再添加订阅者

        // 订阅者 2
        generatedFlux.subscribe(event -> System.out.println("订阅者 2 收到:" + event));

        Thread.sleep(10000);
    }
}

🔹 运行结果

python-repl复制编辑发布:事件 1
订阅者 1 收到:事件 1
发布:事件 2
订阅者 1 收到:事件 2
发布:事件 3
订阅者 1 收到:事件 3
发布:事件 4
订阅者 1 收到:事件 4
发布:事件 5
订阅者 1 收到:事件 5
(5 秒后,订阅者 2 加入)
发布:事件 6
订阅者 1 收到:事件 6
订阅者 2 收到:事件 6
发布:事件 7
订阅者 1 收到:事件 7
订阅者 2 收到:事件 7
...

📌 区别

  • Flux.generate() 一次只能推送一个数据,适合基于状态逐步生成数据
  • Flux.create() 可以异步推送多个数据,适合事件流、网络请求等异步数据
✅ 方法 3:使用 Sinks.many().multicast(),支持多个订阅者

如果你希望多个订阅者可以同时消费,并且可以随时加入

import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import java.time.Duration;

public class SinkExample {
    public static void main(String[] args) throws InterruptedException {
        Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
        Flux<String> flux = sink.asFlux();

        // 订阅者 1
        flux.subscribe(event -> System.out.println("订阅者 1 收到:" + event));

        // 模拟定时推送数据
        new Thread(() -> {
            int i = 1;
            while (true) {
                String event = "事件 " + i++;
                System.out.println("发布:" + event);
                sink.tryEmitNext(event);
                try { Thread.sleep(1000); } catch (InterruptedException e) { }
            }
        }).start();

        Thread.sleep(5000); // 5 秒后再添加订阅者

        // 订阅者 2
        flux.subscribe(event -> System.out.println("订阅者 2 收到:" + event));

        Thread.sleep(10000);
    }
}

🔹 运行结果

python-repl复制编辑发布:事件 1
订阅者 1 收到:事件 1
发布:事件 2
订阅者 1 收到:事件 2
发布:事件 3
订阅者 1 收到:事件 3
发布:事件 4
订阅者 1 收到:事件 4
发布:事件 5
订阅者 1 收到:事件 5
(5 秒后,订阅者 2 加入)
发布:事件 6
订阅者 1 收到:事件 6
订阅者 2 收到:事件 6
发布:事件 7
订阅者 1 收到:事件 7
订阅者 2 收到:事件 7
...

📌 特点

  • Sinks.many().multicast() 允许多个订阅者同时消费
  • 适用于 WebSocket、事件总线、消息队列等场景

✅ 结论
方式特点适用场景
Flux.create()手动推送数据,支持异步适合事件流、消息队列、WebSocket
Flux.generate()按需推送,每次一个适合基于前一个状态生成新数据
Sinks.many().multicast()支持多个订阅者,实时推送适合多订阅者共享数据
2️⃣ 热流(共享事件流)
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
Flux<String> hotFlux = sink.asFlux();

hotFlux.subscribe(event -> System.out.println("订阅者 1 收到: " + event));
hotFlux.subscribe(event -> System.out.println("订阅者 2 收到: " + event));

sink.tryEmitNext("全局事件 1");
sink.tryEmitNext("全局事件 2");

🔹 适用场景 ✅ 适用于高吞吐、异步、多订阅者的事件流。 ✅ 适用于数据流式处理(如 WebFlux、消息队列、日志流)。 ❌ 不适用于简单的业务事件通知

📌 ReplayProcessor:可重放历史事件的 Reactor 处理器

ReplayProcessor<T> 是 Reactor 提供的一种 热流(Hot Publisher),它允许新的订阅者 回放之前发送的事件。适用于 日志系统、消息队列、数据缓存 等场景。


📌 1️⃣ 关键特点

存储历史事件,新的订阅者可以看到之前的事件。
类似 RxJava 的 ReplaySubject,可以 指定缓存大小
适用于需要回放数据的场景,如 日志系统、WebSocket 消息队列


📌 2️⃣ 基本使用
import reactor.core.publisher.ReplayProcessor;

public class ReplayProcessorExample {
    public static void main(String[] args) {
        // 创建 ReplayProcessor,缓存 2 条数据
        ReplayProcessor<String> processor = ReplayProcessor.create(2);

        // 订阅者 1 订阅
        processor.subscribe(data -> System.out.println("订阅者 1 收到:" + data));

        // 发送数据
        processor.onNext("事件 A");
        processor.onNext("事件 B");
        processor.onNext("事件 C");

        // 订阅者 2 订阅
        processor.subscribe(data -> System.out.println("订阅者 2 收到:" + data));

        // 发送更多数据
        processor.onNext("事件 D");
    }
}
📌 运行结果
mathematica复制编辑订阅者 1 收到:事件 A
订阅者 1 收到:事件 B
订阅者 1 收到:事件 C
订阅者 2 收到:事件 B  // 只回放最近 2 条(事件 B 和 C)
订阅者 2 收到:事件 C
订阅者 1 收到:事件 D
订阅者 2 收到:事件 D

📌 说明

  • ReplayProcessor.create(2) 最多缓存 2 条数据,新订阅者只能收到最近的 2 条事件。
  • 订阅者 1 先订阅,它会收到所有事件。
  • 订阅者 2 后订阅,但它可以收到最近的 2 条缓存(事件 B 和 C)。

📌 3️⃣ ReplayProcessor vs Sinks.Many
特性ReplayProcessorSinks.Many(Multicast)
是否缓存历史数据,可指定缓存大小,不会缓存历史数据
新订阅者是否能收到旧数据可以不能
适用场景回放数据,如日志、历史消息实时消息推送,不存储历史

📌 4️⃣ 适用场景

日志回放(新订阅者也能看到之前的日志)。
聊天系统(新加入的用户可以看到最近的聊天记录)。
数据缓存(保存最近 N 条数据,避免重复请求)。

🚀 如果你需要 缓存历史数据,并让新的订阅者能收到过去的事件ReplayProcessor 是一个很好的选择!

4️⃣ 什么时候用哪个?

使用 ApplicationEvent(Spring 事件)

  • 简单的应用事件通知(如用户注册、邮件通知、订单完成)。
  • 解耦业务逻辑,但不需要流式处理

使用 Reactor 事件流

  • 高吞吐、并发的数据流(日志流、消息流、WebFlux)。
  • 多个订阅者同时消费事件,如广播消息、实时数据推送

结合使用(最佳实践)

@EventListener
public void handleUserRegisteredEvent(UserRegisteredEvent event) {
    Flux.just(event)
        .map(e -> "处理事件: " + e.getUsername())
        .doOnNext(System.out::println)
        .subscribe();
}

5️⃣ 总结

方案订阅者是否独立事件是否广播适用场景
Spring ApplicationEvent✅ 每个订阅者独立❌ 不是广播业务事件通知
Reactor Flux(冷流)✅ 每个订阅者独立❌ 不是广播流式数据处理
Reactor Sinks.Many(热流)❌ 共享数据流✅ 所有订阅者都收到事件驱动架构

🚀 如果你是做 WebFlux、日志流、消息队列,选 Reactor!如果是应用内部事件解耦,选 Spring ApplicationEvent 🎯

相关文章:

  • [cpp] cpp11--condition_variable(条件变量)
  • 【ESP32】VSCode配置ESP-IDF问题及解决方法
  • Promise的状态和方法是什么?
  • OpenHarmony子系统开发 - init启动引导组件(八)
  • 【AI编程学习之Python】第一天:Python的介绍
  • Python_电商erp自动拆分组合编码
  • Kafka中的消息是如何存储的?
  • 软件工程面试题(九)
  • CXL UIO Direct P2P学习
  • Python 服务器部署全解析:API 调用、数据处理与展示
  • 头歌 | Linux之用户高级管理
  • MYTOOL-笔记
  • Linux系统编程 | 线程的基本概念
  • 安装Webpack并创建vue项目
  • 深入理解 `git pull --rebase` 与 `--allow-unrelated-histories`:区别、原理与实战指南
  • 中医卫气营血辨证
  • STM32基础教程——旋转编码器测速
  • Django实战:打造美观的管理后台
  • 【银河麒麟高级服务器操作系统 】虚拟机运行数据库存储异常现象分析及处理全流程
  • Python np.vectorize函数介绍
  • 定制网站开发公司/河南郑州网站推广优化外包
  • 哪里有广告设计制作的培训/seo新方法
  • 怎样把自己做的网站发布/武汉百度网站优化公司
  • 自己建设影视网站/除了91还有什么关键词
  • 网站改版具体建议/百度账号登录入口
  • wordpress 挂马 清除/泸州网站seo