当前位置: 首页 > news >正文

【响应式编程】Reactor 常用操作符与使用指南

文章目录

    • 一、创建操作符
      • 1. `just` —— 创建包含指定元素的流
      • 2. `fromIterable` —— 从集合创建 Flux
      • 3. `empty` —— 创建空的 Flux 或 Mono
      • 4. `fromArray` —— 从数组创建 Flux
      • 5. `fromStream` —— 从 Java 8 Stream 创建 Flux
      • 6. `create` —— 使用 FluxSink 手动发射元素
      • 7. `generate` —— 使用状态生成元素,适用于同步场景
      • 8. `fromFuture` —— 从 CompletableFuture 创建 Mono
      • 9. `interval` —— 创建周期性发射元素的 Flux
      • 10. `timer` —— 创建延迟发射的 Mono
    • 二、转换操作符
      • 1. `map` —— 映射每个元素为新值
      • 2. `flatMap` —— 扁平化异步流,将每个元素映射为异步 Publisher
      • 3. `concatMap` —— 顺序执行映射为 Publisher 的异步流
    • 三、过滤操作符
      • 1. `filter` —— 按条件过滤元素
      • 2. `take` —— 获取前 N 个元素
      • 3. `skip` —— 跳过前 N 个元素
    • 四、组合操作符
      • 1. `concat` —— 按顺序合并多个 Flux
      • 2. `merge` —— 并发合并多个 Flux(无序)
      • 3. `zip` —— 按索引组合多个 Flux 的元素
    • 五、错误处理操作符
      • 1. `onErrorReturn` —— 出错时返回默认值
      • 2. `onErrorResume` —— 出错时切换备用流
      • 3. `retry` —— 出错时重试指定次数
    • 六、延迟执行与懒加载:`Mono.defer` 和 `Flux.defer`:被订阅时才执行
      • `Mono.defer` —— 懒加载 Mono,直到subscribe时才创建执行
      • `Flux.defer` —— 懒加载 Flux,每次订阅时重新执行逻辑

Reactor 是一个用于构建反应式应用程序的 Java 库,提供了丰富的操作符(算子)来处理反应式流(FluxMono)。本文详细介绍了 Reactor 中常用的创建、转换、过滤、组合和错误处理操作符,以及一些高级用法示例。


一、创建操作符

1. just —— 创建包含指定元素的流

Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);
Mono<String> mono = Mono.just("Hello");

2. fromIterable —— 从集合创建 Flux

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
Flux<Integer> flux = Flux.fromIterable(list);

3. empty —— 创建空的 Flux 或 Mono

Flux<Integer> emptyFlux = Flux.empty();
Mono<String> emptyMono = Mono.empty();

4. fromArray —— 从数组创建 Flux

Integer[] numbers = {1, 2, 3, 4, 5};
Flux<Integer> flux = Flux.fromArray(numbers);

5. fromStream —— 从 Java 8 Stream 创建 Flux

Stream<Integer> stream = Arrays.asList(1, 2, 3, 4, 5).stream();
Flux<Integer> flux = Flux.fromStream(stream);

6. create —— 使用 FluxSink 手动发射元素

Flux<Integer> flux = Flux.create(sink -> {
    for (int i = 0; i < 5; i++) {
        sink.next(i);
    }
    sink.complete();
});

7. generate —— 使用状态生成元素,适用于同步场景

Flux<Integer> flux = Flux.generate(() -> 0, (state, sink) -> {
    sink.next(state);
    if (state == 4) sink.complete();
    return state + 1;
});

8. fromFuture —— 从 CompletableFuture 创建 Mono

CompletableFuture<String> future = CompletableFuture.completedFuture("Hello");
Mono<String> mono = Mono.fromFuture(future);

9. interval —— 创建周期性发射元素的 Flux

Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1));

10. timer —— 创建延迟发射的 Mono

Mono<Long> timerMono = Mono.timer(Duration.ofSeconds(2));

 

二、转换操作符

1. map —— 映射每个元素为新值

Flux<Integer> squared = Flux.just(1, 2, 3).map(n -> n * n);

2. flatMap —— 扁平化异步流,将每个元素映射为异步 Publisher

Flux<Integer> result = Flux.just(1, 2, 3).flatMap(n -> Mono.just(n * 2));

3. concatMap —— 顺序执行映射为 Publisher 的异步流

Flux<Integer> result = Flux.just(1, 2, 3).concatMap(n -> Mono.just(n * 2));

 

三、过滤操作符

1. filter —— 按条件过滤元素

Flux<Integer> evens = Flux.just(1, 2, 3, 4).filter(n -> n % 2 == 0);

2. take —— 获取前 N 个元素

Flux<Integer> firstThree = Flux.just(1, 2, 3, 4, 5).take(3);

3. skip —— 跳过前 N 个元素

Flux<Integer> skipped = Flux.just(1, 2, 3, 4, 5).skip(2);

 

四、组合操作符

1. concat —— 按顺序合并多个 Flux

Flux<Integer> combined = Flux.concat(Flux.just(1, 2), Flux.just(3, 4));

2. merge —— 并发合并多个 Flux(无序)

Flux<Integer> merged = Flux.merge(Flux.just(1, 2), Flux.just(3, 4));

3. zip —— 按索引组合多个 Flux 的元素

Flux<String> zipped = Flux.zip(Flux.just(1, 2), Flux.just(3, 4), (a, b) -> a + ":" + b);

 

五、错误处理操作符

1. onErrorReturn —— 出错时返回默认值

Flux<Integer> result = Flux.just(1, 2, 3)
    .map(n -> {
        if (n == 2) throw new RuntimeException("error");
        return n;
    })
    .onErrorReturn(-1);

2. onErrorResume —— 出错时切换备用流

Flux<Integer> result = Flux.just(1, 2, 3)
    .map(n -> {
        if (n == 2) throw new RuntimeException("error");
        return n;
    })
    .onErrorResume(e -> Mono.just(-1));

3. retry —— 出错时重试指定次数

Flux<Integer> result = Flux.just(1, 2, 3)
    .map(n -> {
        if (n == 2) throw new RuntimeException("error");
        return n;
    })
    .retry(2);

 

六、延迟执行与懒加载:Mono.deferFlux.defer:被订阅时才执行

Mono.defer —— 懒加载 Mono,直到subscribe时才创建执行

Mono<String> deferredMono = Mono.defer(() -> {
    System.out.println("Generating value...");
    return Mono.just("Deferred Result");
});

只有当 subscribe() 被调用时,Mono.defer 中的逻辑才会真正执行。这对于需要确保执行时机晚于前一步完成场景特别重要,比如:

Mono.defer(() -> readQaResultType())
    .subscribe(result -> System.out.println("QA Result: " + result));

在这段代码中,读取 qaResultType 的操作只会在前面的步骤(例如数据预处理)完全完成后才被触发

Flux.defer —— 懒加载 Flux,每次订阅时重新执行逻辑

Flux<Integer> deferredFlux = Flux.defer(() -> {
    System.out.println("Evaluating source...");
    return Flux.just(1, 2, 3);
});

每次订阅都会重新生成数据,适用于带有状态的源或依赖最新上下文的处理逻辑。


相关文章:

  • 资深词源学家提示词
  • VirtualBox虚拟机转换到VMware
  • 波束形成(BF)从算法仿真到工程源码实现-第六节-广义旁瓣消除算法(GSC)
  • Android Compose 权限申请完整指南
  • Embracing your shadows reveals the wholeness of your light.
  • Spring Cloud-负载均衡
  • docker进行打包
  • Vue3+Element Plus如何实现左树右表页面案例:即根据左边的树筛选右侧表功能实现
  • DIP支付方式改革下各种疾病医疗费用的影响以及分析方法研究综述
  • 【XCP实战】AUTOSAR架构下XCP从0到1开发配置实践
  • SDHC接口协议底层传输数据是安全的
  • Git 远程仓库
  • 设计模式(8)——SOLID原则之依赖倒置原则
  • 39.[前端开发-JavaScript高级]Day04-函数增强-argument-额外知识-对象增强
  • docker创建容器添加启动--restart选项
  • 复刻系列-星穹铁道 3.2 版本先行展示页
  • 前端在线工具 CodePen 和 JSFiddle
  • 智能工厂调度系统设计方案研究报告
  • 《AI大模型应知应会100篇》第15篇:大模型训练资源需求:算力、数据与成本分析
  • ConcurrentHashMap 源码分析
  • 哪条线路客流最大?哪个站点早高峰人最多?上海地铁一季度客流报告出炉
  • 夜读丨什么样的前程值得把春天错过
  • 科普|认识谵妄:它有哪些表现?患者怎样走出“迷雾”?
  • 英德宣布开发射程超2000公里导弹,以防务合作加强安全、促进经济
  • 董军同德国国防部长举行会谈
  • 体坛联播|博洛尼亚时隔51年再夺意杯,皇马逆转马洛卡