Reactor重试操作
在响应式编程中,retry
和 retryWhen
是两个用于处理错误重试的重要操作符。它们允许你根据特定条件重新订阅上游的 Flux
,从而在发生错误时尝试恢复执行。下面将结合你我搜索到的资料,详细解释这两个操作符的使用方式和区别。
1. retry
操作符
- 作用:
retry
操作符用于在发生错误时重新订阅上游的Flux
,从而尝试再次执行。它会根据你指定的重试次数进行重试。 - 使用方式:
retry(n)
表示最多重试n
次。如果在n
次尝试后仍然失败,错误将被传播到下游。 - 示例:
在这个例子中,// 创建一个以 250 毫秒间隔发出事件的 Flux 发布者Flux.interval(Duration.ofMillis(250))// 对发出的事件进行映射处理.map(input -> {// 当事件次数小于 3 时,返回格式化后的字符串if (input < 3) {return "tick " + input;}// 当事件次数达到 3 时,抛出异常throw new RuntimeException("boom");})// 配置异常重试策略,重试 1 次.retry(1)// 测量并输出每个事件的处理时间.elapsed()// 订阅事件流,输出事件处理时间和事件内容或错误信息.subscribe(System.out::println, System.err::println);// 程序暂停 2100 毫秒,以观察事件流的输出Thread.sleep(2100);
retry(1)
表示最多重试一次。第一次执行时,input
为 0、1、2 时正常输出,当input
为 3 时抛出异常。由于设置了retry(1)
,系统会重新订阅interval
,从tick 0
重新开始。第二次执行时,input
为 0、1、2 时正常输出,但input
为 3 时仍然抛出异常,最终错误被传播到下游。
输出结果如下[263,tick 0] [250,tick 1] [253,tick 2] [522,tick 0] [250,tick 1] [251,tick 2] java.lang.RuntimeException: boom
2. retryWhen
操作符
-
作用:
retryWhen
是retry
的更高级版本,它允许你自定义重试逻辑。它通过一个“同伴”(companion)Flux<RetrySignal>
来判断是否应该重试。 -
使用方式:
retryWhen(companion -> ...)
接收一个函数,该函数返回一个Flux<RetrySignal>
,用于控制重试行为。 -
示例:
Flux<String> flux =Flux.<String>error(new RuntimeException("boom")).retryWhen(Retry.from(companion -> {companion.map(rs -> {if (rs.totalRetries() < 3) return rs.totalRetries();else throw Exceptions.propagate(rs.failure());});}));
在这个例子中,
retryWhen
接收一个Retry
构建器,该构建器通过companion
来跟踪重试次数。如果重试次数小于 3,返回当前重试次数;否则,抛出原始异常,停止重试。 -
关键点:
retryWhen
通过RetrySignal
提供了更细粒度的控制,例如可以访问错误信息、重试次数等。RetrySignal
包含了totalRetries()
和totalRetriesInARow()
等方法,用于跟踪重试状态。retryWhen
可以结合Retry.fixedDelay()
、Retry.exponentialBackoff()
等策略,实现更复杂的重试逻辑。
3. retryWhen 的工作流程
- (1) 错误发生时触发重试
当上游 Flux 发生错误时,retryWhen 会将 RetrySignal 发送到你定义的“同伴” Flux。
你可以在 RetrySignal 中访问错误信息和重试状态,从而决定是否进行重试。 - (2) 同伴 Flux 发出值 → 重试发生
如果“同伴” Flux 发出一个值(例如一个整数表示重试次数),则 retryWhen 会触发一次重试。
重试是通过重新订阅上游 Flux 实现的,而不是重新执行 Flux 中的逻辑(如 map、filter 等)。 - (3) 同伴 Flux 完成 → 重试停止
如果“同伴” Flux 完成(即没有更多值发出),则表示重试策略已经处理完毕。
此时,retryWhen 会停止重试循环,并将错误“吞掉”(即不再传播错误),最终生成的序列也会完成。 - (4) 同伴 Flux 抛出错误 → 重试终止
如果“同伴” Flux 抛出一个错误(e),则表示重试策略失败。
此时,retryWhen 会停止重试循环,并将该错误传播到下游,导致整个序列出错。
{// 创建一个故意出错的Flux,并设置错误处理和重试机制Flux<String> flux = Flux.<String>error(new IllegalArgumentException()).doOnError(System.out::println).retryWhen(Retry.from(companion ->companion.take(3)));// 订阅Flux,打印输出结果flux.subscribe(System.out::println);
}
{AtomicInteger errorCount = new AtomicInteger();Flux<String> flux =Flux.<String>error(new IllegalArgumentException()).doOnError(e -> errorCount.incrementAndGet()).retryWhen(Retry.from(companion ->// The companion emits RetrySignal objects, which bear number of retries so far and last failurecompanion.map(rs -> {if (rs.totalRetries() < 3) {// To allow for three retries, we consider indexes < 3 and return a value to emit (here we simply return the index)return rs.totalRetries();} else {// In order to terminate the sequence in error, we throw the original exception after these three retriesthrow Exceptions.propagate(rs.failure());}})));flux.subscribe(System.out::println);
}
4. retry
与 retryWhen
的区别
特性 | retry | retryWhen |
---|---|---|
重试次数 | 固定(如 retry(3) ) | 可自定义(通过 Retry 构建器) |
重试逻辑 | 无法自定义 | 支持自定义重试条件 |
重试信号 | 无 | 提供 RetrySignal 用于跟踪重试状态 |
适用场景 | 简单的重试需求 | 复杂的重试策略(如条件重试、指数退避等) |
5. 使用 retryWhen
的常见场景
-
条件重试:根据特定错误类型决定是否重试。
.retryWhen(Retry.any().filter(e -> e instanceof ConnectTimeoutException).retryMax(3).fixedBackoff(Duration.ofSeconds(1)) )
-
指数退避:根据重试次数动态调整重试间隔。
.retryWhen(companion -> companion.zipWith(Flux.range(1, MAX_RETRY), (error, index) -> {if (index < MAX_RETRY) {return index;}throw Exceptions.propagate(error);}).flatMap(index -> Mono.delay(Duration.ofMillis(index * 100))) )
-
自定义重试策略:结合
doOnRetry()
、doAfterRetry()
等方法,记录重试日志。.retryWhen(companion -> companion.doOnNext(System.out::println).zipWith(Flux.range(1, MAX_RETRY), (error, index) -> {System.out.println("Retry #" + index);if (index < MAX_RETRY) {return index;}throw Exceptions.propagate(error);}).flatMap(index -> Mono.delay(Duration.ofMillis(100))) )
6. 总结
retry
是一个简单但功能有限的操作符,适合简单的重试需求。retryWhen
是一个更强大、灵活的操作符,适合需要自定义重试逻辑的场景。- 两者都通过重新订阅上游
Flux
来实现重试,但retryWhen
提供了更细粒度的控制和更丰富的功能。
通过合理使用 retry
和 retryWhen
,你可以构建出健壮的响应式错误处理机制,确保在发生错误时能够自动恢复执行。