Spring异步编程- 浅谈 Reactor 核心操作符
在响应式编程(Reactive Programming)中,数据流(Flux/Mono)的组合与变换是最常见的工作。Reactor 提供了大量操作符来应对不同场景,帮助我们以声明式的方式处理异步数据。
本文介绍了 Reactor 中的几个核心操作符,如 zip、merge、concat、flatMap、filter、transform、defaultIfEmpty / switchIfEmpty 等。
1. zip 与 zipWith:流的配对压缩
示例 1:Flux.zip
Flux.zip(Flux.just(1, 2, 3),Flux.just("a", "b", "c"),Flux.just("B", "C", "D", "E")).log().subscribe(v -> System.out.println("v = " + v));
特点
- 按位结对:zip 会等待每个流都有一个元素后,再组成一个元组。
- 短流优先:当某个流结束,结果流也随之结束。
- 支持最多 8 个流。
输出
20:37:42.952 [main] INFO reactor.Flux.Zip.1 -- onSubscribe(FluxZip.ZipCoordinator)
20:37:42.956 [main] INFO reactor.Flux.Zip.1 -- request(unbounded)
20:37:42.958 [main] INFO reactor.Flux.Zip.1 -- onNext([1,a,B])
v = [1,a,B]
20:37:42.959 [main] INFO reactor.Flux.Zip.1 -- onNext([2,b,C])
v = [2,b,C]
20:37:42.959 [main] INFO reactor.Flux.Zip.1 -- onNext([3,c,D])
v = [3,c,D]
20:37:42.959 [main] INFO reactor.Flux.Zip.1 -- onComplete()
示例 2:zipWith
Flux.just(1, 2, 3).zipWith(Flux.just("a", "b", "c", "d")).map(tuple -> tuple.getT1() + " -> " + tuple.getT2()).log().subscribe(System.out::println);
特点
- 双流压缩的语法糖,更直观。
- 元组通过
getT1()
和getT2()
获取元素。 - 当一方比另一方长,额外的元素会被舍弃。
输出
20:40:26.809 [main] INFO reactor.Flux.Map.1 -- onSubscribe(FluxMap.MapSubscriber)
20:40:26.813 [main] INFO reactor.Flux.Map.1 -- request(unbounded)
20:40:26.817 [main] INFO reactor.Flux.Map.1 -- onNext(1 -> a)
1 -> a
20:40:26.817 [main] INFO reactor.Flux.Map.1 -- onNext(2 -> b)
2 -> b
20:40:26.818 [main] INFO reactor.Flux.Map.1 -- onNext(3 -> c)
3 -> c
20:40:26.818 [main] INFO reactor.Flux.Map.1 -- onComplete()
2. merge 与 mergeWith、mergeSequential
示例 1:mergeSequential
Flux.mergeSequential(Flux.just(3, 4, 5),Flux.just(6, 7, 8),Flux.just(1, 2, 3)).log().subscribe();
特点
- merge:按照时间交错合并,顺序不可预测。
- mergeWith:两个流合并,类型必须相同。
- mergeSequential:整体流按顺序拼接,流内顺序也保证。
输出
20:42:20.140 [main] INFO reactor.Flux.MergeSequential.1 -- onSubscribe(FluxMergeSequential.MergeSequentialMain)
20:42:20.144 [main] INFO reactor.Flux.MergeSequential.1 -- request(unbounded)
20:42:20.144 [main] INFO reactor.Flux.MergeSequential.1 -- onNext(3)
20:42:20.145 [main] INFO reactor.Flux.MergeSequential.1 -- onNext(4)
20:42:20.145 [main] INFO reactor.Flux.MergeSequential.1 -- onNext(5)
20:42:20.145 [main] INFO reactor.Flux.MergeSequential.1 -- onNext(6)
20:42:20.145 [main] INFO reactor.Flux.MergeSequential.1 -- onNext(7)
20:42:20.145 [main] INFO reactor.Flux.MergeSequential.1 -- onNext(8)
20:42:20.145 [main] INFO reactor.Flux.MergeSequential.1 -- onNext(1)
20:42:20.145 [main] INFO reactor.Flux.MergeSequential.1 -- onNext(2)
20:42:20.145 [main] INFO reactor.Flux.MergeSequential.1 -- onNext(3)
20:42:20.145 [main] INFO reactor.Flux.MergeSequential.1 -- onComplete()
示例 2:mergeWith
Flux.just(1, 2, 3).mergeWith(Flux.just(4, 5, 6)).log().subscribe();
输出:
20:44:19.163 [main] INFO reactor.Flux.Merge.1 -- onSubscribe(FluxFlatMap.FlatMapMain)
20:44:19.166 [main] INFO reactor.Flux.Merge.1 -- request(unbounded)
20:44:19.167 [main] INFO reactor.Flux.Merge.1 -- onNext(1)
20:44:19.167 [main] INFO reactor.Flux.Merge.1 -- onNext(2)
20:44:19.167 [main] INFO reactor.Flux.Merge.1 -- onNext(3)
20:44:19.168 [main] INFO reactor.Flux.Merge.1 -- onNext(4)
20:44:19.168 [main] INFO reactor.Flux.Merge.1 -- onNext(5)
20:44:19.168 [main] INFO reactor.Flux.Merge.1 -- onNext(6)
20:44:19.168 [main] INFO reactor.Flux.Merge.1 -- onComplete()
示例 3:带延迟的 merge
Flux.merge(Flux.just(1, 2, 3).delayElements(Duration.ofSeconds(1)),Flux.just("a", "b").delayElements(Duration.ofMillis(1500)),Flux.just("haha", "hehe", "heihei", "xixi").delayElements(Duration.ofMillis(500))).log().subscribe();
特点
- 多源数据根据时间流入同一个结果流。
- 输出顺序取决于元素的实际到达时间,而不是写在代码里的顺序。
输出:
20:45:29.872 [main] INFO reactor.Flux.Merge.1 -- onSubscribe(FluxFlatMap.FlatMapMain)
20:45:29.876 [main] INFO reactor.Flux.Merge.1 -- request(unbounded)
20:45:30.394 [parallel-3] INFO reactor.Flux.Merge.1 -- onNext(haha)
20:45:30.893 [parallel-1] INFO reactor.Flux.Merge.1 -- onNext(1)
20:45:30.909 [parallel-4] INFO reactor.Flux.Merge.1 -- onNext(hehe)
20:45:31.394 [parallel-2] INFO reactor.Flux.Merge.1 -- onNext(a)
20:45:31.412 [parallel-6] INFO reactor.Flux.Merge.1 -- onNext(heihei)
20:45:31.907 [parallel-5] INFO reactor.Flux.Merge.1 -- onNext(2)
20:45:31.923 [parallel-8] INFO reactor.Flux.Merge.1 -- onNext(xixi)
20:45:32.906 [parallel-7] INFO reactor.Flux.Merge.1 -- onNext(b)
20:45:32.921 [parallel-9] INFO reactor.Flux.Merge.1 -- onNext(3)
20:45:32.922 [parallel-9] INFO reactor.Flux.Merge.1 -- onComplete()
3. defaultIfEmpty 与 switchIfEmpty:兜底策略
示例 1:defaultIfEmpty
Mono<String> haha() {
// return Mono.just("a");return Mono.empty();
}haha().defaultIfEmpty("x").subscribe(System.out::println);
输出:
x
如果流没有元素(
Mono.empty()
),返回兜底的静态值。
示例 2:switchIfEmpty
Mono<String> hehe() {return Mono.just("兜底数据...");
}haha().switchIfEmpty(hehe()).subscribe(System.out::println);
输出:
兜底数据...
如果流没有数据,可以切换到另一个 Publisher,实现动态兜底。
4. transform 与 transformDeferred
AtomicInteger counter = new AtomicInteger(0);Flux<String> flux = Flux.just("a", "b", "c", "d").transform(values -> {if (counter.incrementAndGet() == 1) {// 如果是第一次调用,老流中的所有元素转为大写return values.map(String::toUpperCase);} else {// 如果不是第一次调用,原封不动返回return values;}});// transform无 Defer,只调用一次// transform有 Defer,有多少订阅者就调用多少次flux.subscribe(v -> System.out.println("订阅者1:v = " + v));flux.subscribe(v -> System.out.println("订阅者2:v = " + v));
输出:
订阅者1:v = A
订阅者1:v = B
订阅者1:v = C
订阅者1:v = D
订阅者2:v = A
订阅者2:v = B
订阅者2:v = C
订阅者2:v = D
AtomicInteger counter = new AtomicInteger(0);Flux<String> flux = Flux.just("a", "b", "c", "d").transformDeferred(values -> {if (counter.incrementAndGet() == 1) {return values.map(String::toUpperCase);} else {return values;}});flux.subscribe(v -> System.out.println("订阅者1:v = " + v));
flux.subscribe(v -> System.out.println("订阅者2:v = " + v));
输出:
订阅者1:v = A
订阅者1:v = B
订阅者1:v = C
订阅者1:v = D
订阅者2:v = a
订阅者2:v = b
订阅者2:v = c
订阅者2:v = d
区别
- transform:只会在流构建时应用一次变换逻辑。
- transformDeferred:每个订阅者都会触发一次变换逻辑。
5. concat、concatWith 与 concatMap
示例 1:concatWith
Flux.just(1, 2).concatWith(Flux.just(3, 4)).concatWith(Flux.just(5, 6)).log().subscribe();
输出:
20:53:55.690 [main] INFO reactor.Flux.ConcatArray.1 -- onSubscribe(FluxConcatArray.ConcatArraySubscriber)
20:53:55.694 [main] INFO reactor.Flux.ConcatArray.1 -- request(unbounded)
20:53:55.695 [main] INFO reactor.Flux.ConcatArray.1 -- onNext(1)
20:53:55.695 [main] INFO reactor.Flux.ConcatArray.1 -- onNext(2)
20:53:55.695 [main] INFO reactor.Flux.ConcatArray.1 -- onNext(3)
20:53:55.695 [main] INFO reactor.Flux.ConcatArray.1 -- onNext(4)
20:53:55.695 [main] INFO reactor.Flux.ConcatArray.1 -- onNext(5)
20:53:55.695 [main] INFO reactor.Flux.ConcatArray.1 -- onNext(6)
20:53:55.695 [main] INFO reactor.Flux.ConcatArray.1 -- onComplete()
示例 2:Flux.concat
Flux.concat(Flux.just(1, 2),Flux.just("h", "j"),Flux.just("haha", "hehe")).log().subscribe();
输出:
20:55:06.356 [main] INFO reactor.Flux.ConcatArray.1 -- onSubscribe(FluxConcatArray.ConcatArraySubscriber)
20:55:06.361 [main] INFO reactor.Flux.ConcatArray.1 -- request(unbounded)
20:55:06.361 [main] INFO reactor.Flux.ConcatArray.1 -- onNext(1)
20:55:06.361 [main] INFO reactor.Flux.ConcatArray.1 -- onNext(2)
20:55:06.361 [main] INFO reactor.Flux.ConcatArray.1 -- onNext(h)
20:55:06.361 [main] INFO reactor.Flux.ConcatArray.1 -- onNext(j)
20:55:06.361 [main] INFO reactor.Flux.ConcatArray.1 -- onNext(haha)
20:55:06.363 [main] INFO reactor.Flux.ConcatArray.1 -- onNext(hehe)
20:55:06.363 [main] INFO reactor.Flux.ConcatArray.1 -- onComplete()
特点:
- 静态调用,拼接多个流。
- 元素类型可以不同。
示例 3:concatMap
Flux.just(1, 2).concatMap(s -> Flux.just(s + " -> a")).log().subscribe();
输出:
20:56:42.857 [main] INFO reactor.Flux.ConcatMapNoPrefetch.1 -- onSubscribe(FluxConcatMapNoPrefetch.FluxConcatMapNoPrefetchSubscriber)
20:56:42.861 [main] INFO reactor.Flux.ConcatMapNoPrefetch.1 -- request(unbounded)
20:56:42.863 [main] INFO reactor.Flux.ConcatMapNoPrefetch.1 -- onNext(1 -> a)
20:56:42.863 [main] INFO reactor.Flux.ConcatMapNoPrefetch.1 -- onNext(2 -> a)
20:56:42.863 [main] INFO reactor.Flux.ConcatMapNoPrefetch.1 -- onComplete()
特点:
- 类似 flatMap,但保证顺序。
- 一对多映射,串行执行。
6. flatMap:并行化的扁平化映射
Flux.just("zhang san", "li si").flatMap(v -> Flux.fromArray(v.split(" "))).log().subscribe();
输出:
20:57:36.333 [main] INFO reactor.Flux.FlatMap.1 -- onSubscribe(FluxFlatMap.FlatMapMain)
20:57:36.337 [main] INFO reactor.Flux.FlatMap.1 -- request(unbounded)
20:57:36.338 [main] INFO reactor.Flux.FlatMap.1 -- onNext(zhang)
20:57:36.338 [main] INFO reactor.Flux.FlatMap.1 -- onNext(san)
20:57:36.338 [main] INFO reactor.Flux.FlatMap.1 -- onNext(li)
20:57:36.338 [main] INFO reactor.Flux.FlatMap.1 -- onNext(si)
20:57:36.338 [main] INFO reactor.Flux.FlatMap.1 -- onComplete()
特点:可以把一个元素映射为多个元素(比如数组展开)。
7. filter:筛选元素
Flux.just(1, 2, 3, 4).log().filter(s -> s % 2 == 0) // 过滤偶数,消费上面的流,request(1).subscribe(); // 最终消费者 request(unbounded)
输出:
20:59:58.585 [main] INFO reactor.Flux.Array.1 -- | onSubscribe([Synchronous Fuseable] FluxArray.ArrayConditionalSubscription)
20:59:58.590 [main] INFO reactor.Flux.Array.1 -- | request(unbounded)
20:59:58.591 [main] INFO reactor.Flux.Array.1 -- | onNext(1)
20:59:58.591 [main] INFO reactor.Flux.Array.1 -- | request(1)
20:59:58.591 [main] INFO reactor.Flux.Array.1 -- | onNext(2)
20:59:58.592 [main] INFO reactor.Flux.Array.1 -- | onNext(3)
20:59:58.592 [main] INFO reactor.Flux.Array.1 -- | request(1)
20:59:58.592 [main] INFO reactor.Flux.Array.1 -- | onNext(4)
20:59:58.592 [main] INFO reactor.Flux.Array.1 -- | onComplete()
特点:
-
类似 Java Stream 的 filter。
-
结合
.log()
可以清晰看到事件流转:onSubscribe
request(unbounded)
onNext(...)
onComplete()
总结
本文介绍了 Reactor 中最常用的操作符,完整对比如下:
操作符 | 功能 | 特点 |
---|---|---|
zip / zipWith | 多流结对压缩 | 短流优先,支持元组(最多 8 个流) |
merge / mergeWith | 多流按时间合并 | merge:可以合并不同类型,结果类型为 Object;mergeWith:要求相同类型。两者都可能乱序(取决于数据到达时间)。 |
mergeSequential | 流整体顺序合并 | 串行但保留内部顺序 |
concat / concatWith | 串联多个流 | 保证顺序,静态/动态调用 |
concatMap | 一对多映射 | 保证顺序,串行处理 |
flatMap | 一对多映射 | flatMap 的“并行”是由内部流的调度方式决定的。如果内部流是同步的,flatMap 结果看起来就像单线程串行;如果内部流用到了异步调度器,那就可能多线程并发。 |
defaultIfEmpty | 静态兜底 | 返回固定值 |
switchIfEmpty | 动态兜底 | 当源流没有任何元素时,会订阅并切换到备用 Publisher。 |
transform | 变换 | 只应用一次,所有订阅者共享 |
transformDeferred | 变换 | 每个订阅者独立执行 |
filter | 元素过滤 | 类似 Java Stream |
通过这些操作符的灵活组合,我们可以构建出强大、优雅的响应式数据流管道。