当前位置: 首页 > news >正文

Reactor重试操作

在响应式编程中,retryretryWhen 是两个用于处理错误重试的重要操作符。它们允许你根据特定条件重新订阅上游的 Flux,从而在发生错误时尝试恢复执行。下面将结合你我搜索到的资料,详细解释这两个操作符的使用方式和区别。


1. retry 操作符

  • 作用retry 操作符用于在发生错误时重新订阅上游的 Flux,从而尝试再次执行。它会根据你指定的重试次数进行重试。
  • 使用方式retry(n) 表示最多重试 n 次。如果在 n 次尝试后仍然失败,错误将被传播到下游。
  • 示例
          // 创建一个以 250 毫秒间隔发出事件的 Flux 发布者Flux.interval(Duration.ofMillis(250))// 对发出的事件进行映射处理.map(input -> {// 当事件次数小于 3 时,返回格式化后的字符串if (input < 3) {return "tick " + input;}// 当事件次数达到 3 时,抛出异常throw new RuntimeException("boom");})// 配置异常重试策略,重试 1 次.retry(1)// 测量并输出每个事件的处理时间.elapsed()// 订阅事件流,输出事件处理时间和事件内容或错误信息.subscribe(System.out::println, System.err::println);// 程序暂停 2100 毫秒,以观察事件流的输出Thread.sleep(2100);
    
    在这个例子中,retry(1) 表示最多重试一次。第一次执行时,input 为 0、1、2 时正常输出,当 input 为 3 时抛出异常。由于设置了 retry(1),系统会重新订阅 interval,从 tick 0 重新开始。第二次执行时,input 为 0、1、2 时正常输出,但 input 为 3 时仍然抛出异常,最终错误被传播到下游。
    输出结果如下
    [263,tick 0]
    [250,tick 1]
    [253,tick 2]
    [522,tick 0]
    [250,tick 1]
    [251,tick 2]
    java.lang.RuntimeException: boom
    

2. retryWhen 操作符

  • 作用retryWhenretry 的更高级版本,它允许你自定义重试逻辑。它通过一个“同伴”(companion)Flux<RetrySignal> 来判断是否应该重试。

  • 使用方式retryWhen(companion -> ...) 接收一个函数,该函数返回一个 Flux<RetrySignal>,用于控制重试行为。

  • 示例

    Flux<String> flux =Flux.<String>error(new RuntimeException("boom")).retryWhen(Retry.from(companion -> {companion.map(rs -> {if (rs.totalRetries() < 3) return rs.totalRetries();else throw Exceptions.propagate(rs.failure());});}));
    

    在这个例子中,retryWhen 接收一个 Retry 构建器,该构建器通过 companion 来跟踪重试次数。如果重试次数小于 3,返回当前重试次数;否则,抛出原始异常,停止重试。

  • 关键点

    • retryWhen 通过 RetrySignal 提供了更细粒度的控制,例如可以访问错误信息、重试次数等。
    • RetrySignal 包含了 totalRetries()totalRetriesInARow() 等方法,用于跟踪重试状态。
    • retryWhen 可以结合 Retry.fixedDelay()Retry.exponentialBackoff() 等策略,实现更复杂的重试逻辑。

3. retryWhen 的工作流程

  • (1) 错误发生时触发重试
    当上游 Flux 发生错误时,retryWhen 会将 RetrySignal 发送到你定义的“同伴” Flux。
    你可以在 RetrySignal 中访问错误信息和重试状态,从而决定是否进行重试。
  • (2) 同伴 Flux 发出值 → 重试发生
    如果“同伴” Flux 发出一个值(例如一个整数表示重试次数),则 retryWhen 会触发一次重试。
    重试是通过重新订阅上游 Flux 实现的,而不是重新执行 Flux 中的逻辑(如 map、filter 等)。
  • (3) 同伴 Flux 完成 → 重试停止
    如果“同伴” Flux 完成(即没有更多值发出),则表示重试策略已经处理完毕。
    此时,retryWhen 会停止重试循环,并将错误“吞掉”(即不再传播错误),最终生成的序列也会完成。
  • (4) 同伴 Flux 抛出错误 → 重试终止
    如果“同伴” Flux 抛出一个错误(e),则表示重试策略失败。
    此时,retryWhen 会停止重试循环,并将该错误传播到下游,导致整个序列出错。
{// 创建一个故意出错的Flux,并设置错误处理和重试机制Flux<String> flux = Flux.<String>error(new IllegalArgumentException()).doOnError(System.out::println).retryWhen(Retry.from(companion ->companion.take(3)));// 订阅Flux,打印输出结果flux.subscribe(System.out::println);
}
{AtomicInteger errorCount = new AtomicInteger();Flux<String> flux =Flux.<String>error(new IllegalArgumentException()).doOnError(e -> errorCount.incrementAndGet()).retryWhen(Retry.from(companion ->// The companion emits RetrySignal objects, which bear number of retries so far and last failurecompanion.map(rs -> {if (rs.totalRetries() < 3) {// To allow for three retries, we consider indexes < 3 and return a value to emit (here we simply return the index)return rs.totalRetries();} else {// In order to terminate the sequence in error, we throw the original exception after these three retriesthrow Exceptions.propagate(rs.failure());}})));flux.subscribe(System.out::println);
}

4. retryretryWhen 的区别

特性retryretryWhen
重试次数固定(如 retry(3)可自定义(通过 Retry 构建器)
重试逻辑无法自定义支持自定义重试条件
重试信号提供 RetrySignal 用于跟踪重试状态
适用场景简单的重试需求复杂的重试策略(如条件重试、指数退避等)

5. 使用 retryWhen 的常见场景

  • 条件重试:根据特定错误类型决定是否重试。

    .retryWhen(Retry.any().filter(e -> e instanceof ConnectTimeoutException).retryMax(3).fixedBackoff(Duration.ofSeconds(1))
    )
    
  • 指数退避:根据重试次数动态调整重试间隔。

    .retryWhen(companion -> companion.zipWith(Flux.range(1, MAX_RETRY), (error, index) -> {if (index < MAX_RETRY) {return index;}throw Exceptions.propagate(error);}).flatMap(index -> Mono.delay(Duration.ofMillis(index * 100)))
    )
    
  • 自定义重试策略:结合 doOnRetry()doAfterRetry() 等方法,记录重试日志。

    .retryWhen(companion -> companion.doOnNext(System.out::println).zipWith(Flux.range(1, MAX_RETRY), (error, index) -> {System.out.println("Retry #" + index);if (index < MAX_RETRY) {return index;}throw Exceptions.propagate(error);}).flatMap(index -> Mono.delay(Duration.ofMillis(100)))
    )
    

6. 总结

  • retry 是一个简单但功能有限的操作符,适合简单的重试需求。
  • retryWhen 是一个更强大、灵活的操作符,适合需要自定义重试逻辑的场景。
  • 两者都通过重新订阅上游 Flux 来实现重试,但 retryWhen 提供了更细粒度的控制和更丰富的功能。

通过合理使用 retryretryWhen,你可以构建出健壮的响应式错误处理机制,确保在发生错误时能够自动恢复执行。

http://www.dtcms.com/a/263049.html

相关文章:

  • 十大排序算法汇总
  • 2025年06月30日Github流行趋势
  • 创客匠人解析强 IP 时代创始人 IP 打造的底层逻辑与破局之道
  • Java开发新变革!飞算JavaAI深度剖析与实战指南
  • 一文讲清楚React中类组件与函数组件的区别与联系
  • 手机屏暗点缺陷修复及相关液晶线路激光修复原理
  • 类图+案例+代码详解:软件设计模式----生成器模式(建造者模式)
  • Franka机器人赋能RoboCulture研究,打造生物实验室智能解决方案
  • Vue防抖节流
  • 最新版 JT/T808 终端模拟器,协议功能验证、平台对接测试、数据交互调试
  • Spring Cloud Bus 和 Spring Cloud Stream
  • HarmonyOS NEXT仓颉开发语言实战案例:外卖App
  • NAT 类型及 P2P 穿透
  • 人工智能和云计算对金融未来的影响
  • Docker 入门教程(九):容器网络与通信机制
  • Qt 前端开发
  • (3)pytest的setup/teardown
  • 文心大模型 4.5 系列开源首发:技术深度解析与应用指南
  • Python 数据分析与可视化 Day 12 - 建模前准备与数据集拆分
  • 【C语言 | 字符串处理】sscanf 用法(星号*、集合%[]等)详细介绍、使用例子源码
  • 嵌入式SoC多线程架构迁移多进程架构开发技巧
  • C++ std::list详解:深入理解双向链表容器
  • uniapp小程序蓝牙打印通用版(集成二维码打印)
  • 深度学习04 卷积神经网络CNN
  • 【Python】 Function
  • 计算整数二进制中1的个数
  • 障碍感知 | 基于3D激光雷达的三维膨胀栅格地图构建(附ROS C++仿真)
  • day47 注意力热图可视化
  • 展示折线图的后端数据连接
  • leetcode427.建立四叉树