当前位置: 首页 > news >正文

响应式编程框架Reactor【5】

文章目录

  • 八、Reactor测试
    • 8.1 核心工具与依赖
    • 8.2 依赖引入
    • 8.3 StepVerifier
      • 8.3.1 基础用法:验证正常流与完成信号
      • 8.3.2 验证错误信息号
      • 8.3.3 验证条件与断言
      • 8.3.4 验证背压行为
      • 8.3.5 验证时间依赖操作(虚拟时间)
      • 8.3.6 验证资源清理与取消
    • 8.4 测试副作用与状态变化
      • 8.3.7 验证多个元素
    • 8.5 TestPublisher:手动控制信号发射
      • 8.5.1 模拟正常数据流
      • 8.5.2 模拟错误流
      • 8.5.3 模拟背压请求
      • 8.5.4 模拟错误处理
    • 8.6 并发与线程切换测试
    • 8.7 测试最佳实践
    • 8.8 StepVerifier工作流程
  • 九、实际应用场景
    • 9.1 Web应用中的响应式处理
    • 9.2 数据库响应式访问

八、Reactor测试

在 Reactor 响应式编程中,测试是确保流行为正确性的关键环节。由于响应式流的异步性、序列性和副作用特性,传统的单元测试方法(如直接断言返回值)难以适用。Reactor 提供了专门的测试库reactor-test,通过StepVerifier等工具实现对流的声明式验证,可精准测试元素序列、完成 / 错误信号、背压行为、时间依赖及副作用等场景。

8.1 核心工具与依赖

Reactor 的测试能力主要依赖reactor-test模块,其中:

  • StepVerifier:核心类,用于声明式验证流的行为(元素序列、信号、背压等)。

  • TestPublisher:用于手动发送信号(next/error/complete),测试订阅者对各种信号的响应。

  • VirtualTimeScheduler:虚拟时间调度器,用于模拟时间流逝,测试延迟、超时等时间依赖操作。

    1. StepVerifier
      1. 作用:用于测试 PublisherMono/Flux)的输出序列。
      2. 特点:声明式、链式调用、支持虚拟时间(Virtual Time)测试延迟操作。
    1. TestPublisher<T>
      1. 作用:一个可编程的 Publisher,用于模拟外部服务或测试背压、错误等场景。

8.2 依赖引入

需在项目中添加reactor-test依赖:

<!-- Maven -->
<dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId><version>3.5.9</version> <!-- 与Reactor核心版本一致 --><scope>test</scope>
</dependency>

8.3 StepVerifier

流行为的声明式验证

StepVerifier是 Reactor 测试的核心工具,它通过 “步骤式” 声明来验证流的完整生命周期:从订阅开始,到元素发射、信号(完成 / 错误)触发,再到背压交互。

8.3.1 基础用法:验证正常流与完成信号

场景:测试一个简单流(如Flux.range(1,3))是否按预期发射元素并正常完成。

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import org.junit.Test;public class StepVerifierBasicTest {@Testpublic void testNormalFlux() {// 待测试的流:发射1,2,3后完成Flux<Integer> flux = Flux.range(1, 3);// StepVerifier验证流程StepVerifier.create(flux)// 预期第一个元素为1.expectNext(1)// 预期第二个元素为2.expectNext(2)// 预期第三个元素为3.expectNext(3)// 预期流正常完成.expectComplete()// 执行验证.verify();}
}

流程分析

测试代码StepVerifier被测试Fluxcreate(flux) 绑定待测试流订阅(subscribe)发射元素1验证expectNext(1)通过发射元素2验证expectNext(2)通过发射元素3验证expectNext(3)通过发送complete信号验证expectComplete()通过verify() 执行完成,测试通过测试代码StepVerifier被测试Flux

8.3.2 验证错误信息号

场景:测试流在特定条件下是否抛出预期的错误(如RuntimeException)。

   @Testpublic void testErrorFlux() {// 待测试的流:发射1后抛出异常Flux<Integer> errorFlux = Flux.range(1, 2).map(num -> {if (num == 2) {throw new RuntimeException("模拟错误");}return num;});StepVerifier.create(errorFlux).expectNext(1) // 预期第一个元素为1// 预期抛出RuntimeException,且消息匹配.expectErrorMatches(error ->error instanceof RuntimeException &&error.getMessage().equals("模拟错误")).verify(); // 执行验证}

8.3.3 验证条件与断言

@Test
public void testExpectPredicate() {Flux<Integer> flux = Flux.just(2, 4, 6, 8);StepVerifier.create(flux).expectNextMatches(n -> n % 2 == 0)  // 验证元素满足条件.expectNextMatches(n -> n > 0).thenConsumeWhile(n -> n < 10)       // 消费并验证所有后续元素 < 10.expectComplete().verify();
}

8.3.4 验证背压行为

场景:测试流是否正确响应背压(消费者通过request(n)控制元素请求量)。

@Test
public void testBackpressure() {// 待测试的流:发射1-5Flux<Integer> flux = Flux.range(1, 5);StepVerifier.create(flux)// 模拟消费者初始请求2个元素.thenRequest(2).expectNext(1, 2) // 验证前2个元素// 再请求2个元素.thenRequest(2).expectNext(3, 4) // 验证接下来2个元素// 最后请求1个元素.thenRequest(1).expectNext(5) // 验证最后1个元素.expectComplete().verify();
}

8.3.5 验证时间依赖操作(虚拟时间)

场景:测试含延迟的流(如delayElements),使用虚拟时间避免实际等待,加速测试。

@Test
public void testDelayElements() {// 待测试的流:每个元素延迟1秒发射(共3个元素)Flux<Integer> delayedFlux = Flux.range(1, 3).delayElements(Duration.ofSeconds(1));// 使用虚拟时间调度器VirtualTimeScheduler.getOrSet();StepVerifier.withVirtualTime(() -> delayedFlux).expectSubscription() // 验证订阅成功// 向前推进3秒(覆盖所有延迟).thenAwait(Duration.ofSeconds(3)).expectNext(1, 2, 3) // 验证所有元素被发射.expectComplete().verify();
}

关键说明

  • withVirtualTime(Supplier<Publisher>):启用虚拟时间,流的时间操作(如delay)会基于虚拟时钟而非真实时间。
  • thenAwait(Duration):手动推进虚拟时间,无需实际等待,大幅提升测试速度。

真实时间超时

@Test
public void testWithTimeout() {Flux<Long> slowFlux = Flux.interval(Duration.ofSeconds(2)) // 每 2 秒发一个.take(1); // 只取一个StepVerifier.create(slowFlux, StepVerifierOptions.create().withVirtualTime()).expectNoEvent(Duration.ofSeconds(1))     // 1 秒内无事件.expectNextCount(1)                       // 之后收到 1 个.expectComplete().verify(Duration.ofSeconds(3));           // 整体超时 3 秒
}

虚拟时间优势:测试 interval, delay, timeout 等操作符时,无需真实等待,大幅提升测试速度。

8.3.6 验证资源清理与取消

@Test
public void testCancel() {Flux<Long> intervalFlux = Flux.interval(Duration.ofMillis(100));StepVerifier.create(intervalFlux).expectNextCount(2)           // 收到 2 个.thenCancel()                 // 主动取消订阅.verify();                    // 验证取消成功
}

8.4 测试副作用与状态变化

响应式流中常包含副作用操作(如doOnNextdoOnErrordoFinally),需验证这些副作用是否按预期执行(如日志记录、状态更新)

@Test
public void testSideEffects() {List<Integer> processed = new ArrayList<>(); // 记录副作用执行结果// 待测试的流:发射1-3,每次发射后执行副作用(添加到list)Flux<Integer> fluxWithSideEffect = Flux.range(1, 3).doOnNext(num -> processed.add(num)) // 副作用:记录处理的元素.doOnComplete(() -> processed.add(-1)); // 完成时添加标记StepVerifier.create(fluxWithSideEffect).expectNext(1, 2, 3).expectComplete().verify();// 验证副作用是否按预期执行assert processed.equals(List.of(1, 2, 3, -1));
}

8.3.7 验证多个元素

@Test
public void testExpectNexts() {Flux<Integer> flux = Flux.range(1, 3); // 1, 2, 3StepVerifier.create(flux).expectNext(1, 2, 3)              // 一次性验证多个元素.expectComplete().verify();// 或者使用集合StepVerifier.create(flux).expectNextSequence(List.of(1, 2, 3)).expectComplete().verify();
}

8.5 TestPublisher:手动控制信号发射

TestPublisher 用于模拟一个 Publisher,常用于单元测试中替换外部依赖(如数据库、HTTP 客户端)

TestPublisher用于手动发送信号next/error/complete),适合测试订阅者(或操作符)对异常信号序列的响应(如重复发射、提前完成等)。

8.5.1 模拟正常数据流

@Test
public void testWithTestPublisher() {// 1. 创建 TestPublisherTestPublisher<String> testPublisher = TestPublisher.create();// 2. 被测试的服务使用这个 publisherServiceUsingPublisher service = new ServiceUsingPublisher(testPublisher);// 3. 手动发送数据testPublisher.next("Data1", "Data2");testPublisher.complete();// 4. 验证服务行为(假设服务有回调或状态)// 例如:assertThat(service.getProcessedData()).contains("Data1", "Data2");
}

8.5.2 模拟错误流

@Test
public void testErrorWithTestPublisher() {TestPublisher<String> testPublisher = TestPublisher.create();ServiceUsingPublisher service = new ServiceUsingPublisher(testPublisher);testPublisher.error(new IOException("Network failed"));// 验证服务正确处理错误// assertThat(service.hasError()).isTrue();
}

8.5.3 模拟背压请求

@Test
public void testBackpressureWithTestPublisher() {TestPublisher<String> testPublisher = TestPublisher.create();StepVerifier.create(testPublisher.flux()).thenRequest(1)               // 主动请求 1 个.expectNext("dummy")          // TestPublisher 默认发送 dummy 值.then(() -> testPublisher.next("A")) // 发送真实数据.expectNext("A").thenRequest(1).then(() -> testPublisher.next("B")).expectNext("B").thenCancel().verify();
}

8.5.4 模拟错误处理

@Test
public void testInvalidSignalSequence() {// 创建一个严格模式的TestPublisher(不允许无效信号序列)TestPublisher<Integer> testPublisher = TestPublisher.create();// 手动发送信号:先发射元素,再发送错误,最后尝试发射元素(无效操作)testPublisher.next(1);testPublisher.error(new RuntimeException("测试错误"));testPublisher.next(2); // 错误后发射元素是无效的// 验证订阅者是否能捕获无效信号导致的错误Flux<Integer> flux = testPublisher.flux();StepVerifier.create(flux).expectNext(1).expectError(RuntimeException.class)// 验证错误消息(包含无效信号的描述).verify();
}

在这里插入图片描述

关键说明

  • TestPublisher.createStrict():严格模式,会检查信号序列的合法性(如错误 / 完成后不能再发射元素),非法操作会抛出异常。
  • TestPublisher.createNonStrict():非严格模式,允许非法信号序列,适合测试容错逻辑。

8.6 并发与线程切换测试

测试涉及线程切换(publishOn/subscribeOn)的流时,需确保操作在预期线程上执行,且并发场景下行为正确。

示例:验证线程切换后的执行线程

@Test
public void testThreadSwitch() {// 待测试的流:先在boundedElastic读取,再在parallel处理Flux<Integer> flux = Flux.range(1, 2).subscribeOn(Schedulers.boundedElastic()).doOnNext(num -> {// 验证上游操作在boundedElastic线程执行assert Thread.currentThread().getName().startsWith("boundedElastic");}).publishOn(Schedulers.parallel()).doOnNext(num -> {// 验证下游操作在parallel线程执行assert Thread.currentThread().getName().startsWith("parallel");});StepVerifier.create(flux).expectNext(1, 2).expectComplete().verify();
}

8.7 测试最佳实践

  1. 优先使用StepVerifier:它覆盖了 90% 以上的测试场景,声明式语法清晰且能验证完整流生命周期。
  2. 虚拟时间加速测试:对含delaytimeout的流,用withVirtualTime避免实际等待,测试速度提升 10 倍以上。
  3. 验证副作用:通过外部状态(如ListAtomicInteger)记录副作用,确保doOnXXX操作按预期执行。
  4. 严格模式测试边界情况:用TestPublisher的严格模式测试非法信号(如重复完成、错误后发射元素),验证流的容错性。
  5. 隔离测试:每个测试方法应独立,避免共享状态(如调度器、计数器)导致测试相互干扰。
  6. 断言具体化:使用expectErrorMatches而非expectError(),精准验证错误类型和消息;用expectNextSequence验证批量元素。

8.8 StepVerifier工作流程

expectNext
expectError
expectComplete
expectNoEvent
创建 StepVerifier
定义期望序列
期望类型
验证 onNext 信号
验证 onError 信号
验证 onComplete 信号
验证超时期内无事件
继续链式调用
调用 verify()
执行验证
通过?
测试成功
抛出 AssertionError

九、实际应用场景

9.1 Web应用中的响应式处理

import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;public class WebApplicationExamples {// 使用WebClient进行HTTP调用public Flux<User> getUsersWithPosts() {WebClient webClient = WebClient.create("https://api.example.com");return webClient.get().uri("/users").retrieve().bodyToFlux(User.class).flatMap(user -> webClient.get().uri("/users/{id}/posts", user.getId()).retrieve().bodyToFlux(Post.class).collectList().map(posts -> {user.setPosts(posts);return user;})).timeout(Duration.ofSeconds(5)).onErrorResume(e -> {// 记录错误但继续处理其他用户System.err.println("Error fetching posts: " + e.getMessage());return Mono.just(new User()); // 返回空用户或默认值});}// 批量处理与缓冲public Flux<Result> processInBatches(Flux<Item> items) {return items.buffer(100) // 每100个元素一批.delayElements(Duration.ofMillis(500)) // 控制速率.flatMap(batch -> processBatch(batch).subscribeOn(Schedulers.parallel()));}private Mono<Result> processBatch(List<Item> batch) {return Mono.fromCallable(() -> {// 模拟批量处理return new Result("Processed " + batch.size() + " items");});}// 实时数据流处理public Flux<Event> processRealTimeEvents(Flux<Event> eventStream) {return eventStream.window(Duration.ofSeconds(1)) // 1秒窗口.flatMap(window -> window.groupBy(Event::getType).flatMap(group -> group.reduce((e1, e2) -> new Event(e1.getType(), e1.getValue() + e2.getValue()))));}
}// 简单的数据模型
class User {private String id;private List<Post> posts;// getters and setters
}class Post {private String id;private String content;// getters and setters
}class Item {private String id;// getters and setters
}class Result {private String message;// constructor, getters
}class Event {private String type;private int value;// constructor, getters
}

9.2 数据库响应式访问

伪代码,需要引入其它的依赖项

import org.springframework.data.r2dbc.core.R2dbcEntityTemplate;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;public class DatabaseExamples {private final R2dbcEntityTemplate template;public Flux<User> findActiveUsers() {return template.select(User.class).from("users").matching(where("active").is(true)).all().delayElements(Duration.ofMillis(10)) // 控制数据库压力.onBackpressureBuffer(1000); // 缓冲背压}public Mono<Void> saveUsersInTransaction(Flux<User> users) {return template.inTransaction(() -> users.buffer(100) // 每100个用户一批提交.flatMap(batch -> template.insert(batch).then()));}public Flux<User> findUsersWithRetry() {return template.select(User.class).from("users").all().retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).onErrorResume(e -> {System.err.println("Database unavailable: " + e.getMessage());return Flux.empty(); // 返回空流而不是错误});}
}
http://www.dtcms.com/a/357894.html

相关文章:

  • PostgreSQL表空间(Tablespace)作用(管理数据库对象的存储位置)(pg_default、pg_global)
  • STL库——list(类模拟实现)
  • 将LLM模型“钉”在电路板上:用电阻矩阵实现物理推理引擎
  • Nacos-3.0.3 适配PostgreSQL数据库
  • openGauss笔记
  • rabbitMQ延时队列实现,怎么保证消息的幂等
  • HTML 核心元素实战:超链接、iframe 框架与 form 表单全面解析
  • 【WDG协议栈】AUTOSAR架构下WDG模块软硬件功能详解
  • 基于单片机指纹考勤系统/智能考勤
  • ⸢ 叁 ⸥ ⤳ 默认安全:概述与建设思路
  • 【Day 33】Linux-MySQL 备份与恢复详解
  • 从分子工具到技术革新:链霉亲和素 - 生物素系统与 M13 噬菌体展示的交叉应用解析
  • 针对 “TCP 数据传输机制” 的攻击
  • vue2下拉菜单
  • 服务器托管多少钱一年?服务器托管收费标准
  • C++day2作业
  • TuringComplete游戏攻略(2.2存储器)
  • 【C++】类和对象(终章)
  • 数值分析——误差的来源与分类、误差的基本概念(绝对误差、相对误差、有效数字)
  • 世界模型的典型框架与分类
  • react性能优化有哪些
  • 卷积神经网络项目:基于CNN实现心律失常(ECG)的小颗粒度分类系统
  • 拆解《AUTOSAR Adaptive Platform Core》(Core.pdf)—— 汽车电子的 “基础技术说明书”
  • 开发指南136-设置零值不显示
  • Java中使用JSONUtil处理JSON数据:从前端到后端的完美转换
  • docker命令(二)
  • vue+Django 双推荐算法旅游大数据可视化系统Echarts mysql数据库 带爬虫
  • 指纹云手机网络环境隔离技术:筑牢海外社媒多账号运营安全屏障
  • Git与DevOps实战:从版本控制到自动化部署
  • jsqlparser(六):TablesNamesFinder 深度解析与 SQL 格式化实现