当前位置: 首页 > news >正文

Spring WebFlux 原理与实践全解析

前言

传统的 Web 框架Spring MVC,通过阻塞式多线程处理每一个请求。每一个客户端连接都会占用一个线程:当请求到达时,系统从线程池中分配一个工作线程(worker 线程)负责处理,而这个线程会一直 “盯着” 任务,直到处理完成并返回结果后才会释放

这种模式下,线程资源与连接数直接挂钩。比如面对 1000 个同时发来的 “查询商品库存” 请求,Spring MVC会调度 1000 个线程,每个线程单独处理一个请求。如果查询数据库平均耗时1秒,这 1 秒里,每个线程只能等待,其既不能接新请求,也不能干别的,只能等待查询完成。一旦线程池里的线程被耗尽,新请求就会排队甚至失败,这也就是高并发场景下传统框架 “扛不住” 的原因。

而 Spring5 所推出的 Spring-WebFlux,则是一套全新的异步非阻塞 Web 模块。它的核心是通过 “事件轮询(event looping)” 机制,来实现一个线程就能处理多个请求,从而降低连接的资源消耗


回到之前的例子:同样面对 1000 个查询请求,当使用异步WebFlux后,程序内部不再需要创建 1000 个线程,而是利用非阻塞特性实现线程复用。假设我们的CPU 核心数为4,对于此类I/O密集型请求,线程池数量通常维护到 8(I/O 密集型任务中,线程数一般为 CPU 核心数的 2 倍,刚好匹配大量等待 IO 时的线程复用需求)。

此时,仅需 8 个线程即可完成 1000 个请求的处理。事件轮询机制作为核心调度组件,会首先接收所有请求并完成注册,随后通过持续的事件监听机制检测各请求的后续操作是否就绪。

当某请求需要执行数据库操作时,线程不会进入阻塞等待状态,而是通过注册回调函数预设结果处理逻辑,随即转向处理下一个就绪任务;待数据库操作完成并返回结果后,事件轮询会触发回调流程,由空闲线程继续执行该请求的后续处理步骤。

在此过程中,线程始终处于可执行状态,避免了因等待I/O操作导致的资源闲置。因此,1000 个请求同样可在约 1 秒内完成处理,且全程不会因线程资源耗尽引发新请求的阻塞。

WebFlux相关API

知晓了WebFlux的异步机制后,接下来则对WebFlux相关的API进行一个简单介绍。

具体来看,WebFlux的 API 围绕响应式编程模型设计,核心依赖于 Reactor 库中的Mono和Flux两种类型:

  • Mono用于表示包含 0 个或 1 个元素的异步数据流(如单个用户信息查询结果);

  • Flux用于表示包含 0 个或多个元素的异步数据流(如列表查询结果)。

一、创建型操作符

1. just(T…)

just操作可直接通过传入的元素创建一个响应式流。其中,Flux 可接收多个元素(0 个或多个),Mono 则仅能接收单个元素(若传入多个会报错,示例中Mono.just("A", "B", "C")为错误示范,正确应为Mono.just("A"))。

// 创建包含3个元素的Flux流(可正常运行)
Flux<String> flux = Flux.just("A", "B", "C");
// 正确示例:创建仅包含1个元素的Mono流
Mono<String> mono = Mono.just("A"); 

2. fromIterable(Iterable)

fromIterable、fromArray则分别将已有的Iterable类型集合(如List、Set),和数组转换为响应式流,自动遍历集合中的元素并发送到流中。适合需要处理已有数据集合的场景,避免手动逐个添加元素。

// fromIterable
List<Integer> list = Arrays.asList(1, 2, 3);
Flux<Integer> flux = Flux.fromIterable(list);
// fromArray
Integer[] arr = {10, 20, 30};
Flux<Integer> flux = Flux.fromArray(arr);
flux.subscribe(System.out::println);    

3. range(int start, int count)

range则适用于快速生成一段连续的整数流来构建测试用例。其中,第一个参数为起始值,第二个参数为元素个数。

Flux.range(1, 5).subscribe(System.out::println); // 输出 1~5

4. create(FluxSink)

通过FluxSink对象可手动发送元素(next)、结束信号(complete)或错误信号(error),灵活控制流的产生过程。

该API适合用于从异步回调、事件监听中获取数据等场景。

// 手动创建流,通过sink发送元素并结束
Flux.create(sink -> {sink.next("Hello"); // 发送第一个元素sink.next("WebFlux"); // 发送第二个元素sink.complete(); // 标记流结束(不再发送元素)
}).subscribe(System.out::println); // 输出:Hello WebFlux

二、转换型操作符

1. map(Function)

map的操作则主要对流中的每个元素执行指定的转换操作。即输入一个元素,输出一个转换后的元素,保持流的元素数量不变。适合简单的同步转换场景。

// 对流中每个字符串执行"转大写"操作
Flux<String> flux = Flux.just("apple", "banana").map(String::toUpperCase); // 调用String的toUpperCase()方法
flux.subscribe(System.out::println); // 输出:APPLE BANANA

2. flatMap(Function>)

与map不同,flatMap 则接收一个元素T,返回一个新的响应式流Publisher(如Flux或Mono)。 即整个过程是 "元素→流" 的映射,会将子流 "扁平化" 合并为一个新流,输出流的元素数量可能比输入流多(或少)。

// 将每个字符串按字符拆分,转换为包含单个字符的子流,再合并
Flux<String> flux = Flux.just("hello", "world").flatMap(s -> Flux.fromArray(s.split(""))); // 拆分后子流为 ["h","e","l","l","o"] 和 ["w","o","r","l","d"]
flux.subscribe(System.out::println); 
// 可能输出:h w e o l r l l d(顺序不固定,因两个子流并行处理)

举个更实际的例子,如 "一个订单包含多个商品,需要根据订单 ID 查询所有商品" 的场景(1 个订单→多个商品组成的流),或需要在转换中调用异步操作可使用flatMap来进行操作。

// 模拟"根据用户ID查询多个订单"的异步操作
Flux.just(1001, 1002) // 用户ID流.flatMap(userId -> orderService.findOrdersByUserId(userId)) // 每个用户ID→订单流.subscribe(order -> System.out.println("订单:" + order));

3. concatMap(Function>)

与flatMap类似,concatMap则是将每个元素转换为子流后合并,但严格按照原元素的顺序处理子流(前一个子流完全处理完才会处理下一个),因此最终流的元素顺序与原元素对应的子流顺序一致。适合需要保证顺序的场景(如按顺序处理批量任务)。

// 同样拆分字符串为字符子流,但按原顺序合并
Flux<String> flux = Flux.just("hello", "world").concatMap(s -> Flux.fromArray(s.split(""))); // 先处理"hello"的子流,再处理"world"的子流
flux.subscribe(System.out::println); 
// 固定输出:h e l l o w o r l d(严格遵循原元素顺序)

三、过滤型操作符

1. filter(Predicate)

javaFlux<Integer> flux = Flux.range(1, 10).filter(i -> i % 2 == 0);
flux.subscribe(System.out::println);

2. distinct()

对流中所有元素进行去重处理,保留首次出现的元素,后续重复元素会被过滤。

java深色版本Flux.just(1, 2, 2, 3).distinct().subscribe(System.out::println);

3. limitRate(int rate)

控制流从上游数据源获取元素的速率,每次向上游请求 rate 个元素,处理完后再请求下一批,避免一次性加载过多数据导致内存压力(类似 “分批拉取”)。常用于流中元素数量极大的场景(如处理百万级数据),平衡内存占用与处理效率。

// 生成1~100的整数流,每次从上游获取10个元素后再继续请求
Flux.range(1, 100)
.limitRate(10).subscribe(System.out::println); 
// 内部过程:先请求1~10,处理完后再请求11~20,直到所有元素处理完毕

四、错误处理操作符

1. onErrorReturn(T fallback)

当流中发生错误(如抛出异常)时,会立即终止原错误流,并返回一个预设的默认值作为流的最终结果,避免错误直接传递到订阅者导致程序中断。

// 模拟一个直接抛出异常的流,出错时返回默认值0
Flux<Integer> flux = Flux.error(new RuntimeException("Oops!")).onErrorReturn(0); // 异常发生时,用0作为替代结果
flux.subscribe(System.out::println); // 输出:0(而非抛出异常)

2. doOnError(Consumer)

当流中发生错误时,会触发传入的消费函数(Consumer),可用于记录错误日志、发送告警等 “副作用操作”。

// 模拟一个抛出异常的流,监听错误并打印信息
Flux.error(new RuntimeException("Oops!")).doOnError(e -> System.out.println("Error occurred: " + e.getMessage())) // 错误发生时执行打印.subscribe(data -> System.out.println("Data: " + data), // 正常数据处理(此处不会执行)error -> System.out.println("Subscriber caught error") // 订阅者仍会收到错误);
// 输出:
// Error occurred: Oops!
// Subscriber caught error

五、时间控制操作符

1. delayElements(Duration duration)

delayElements主要让流中的每个元素都延迟指定时间后再发射,相当于给每个元素的发送增加一个统一的 “等待期”。

// 生成1~3的整数流,每个元素延迟1秒后发送
Flux.range(1, 3).delayElements(Duration.ofSeconds(1)).subscribe(System.out::println); // 依次间隔1秒输出:1、2、3
Thread.sleep(5000); // 主线程休眠5秒,防止程序提前退出(否则看不到完整输出)

2. timeout(Duration timeout)

timeout主要为流设置超时阈值,若流在指定时间内没有发射新元素(或未完成),则会触发超时错误(TimeoutException)。适合需要限制操作响应时间的场景(如接口调用超时控制)。

// 生成1~3的整数流,每个元素延迟500毫秒发送,同时设置超时时间为300毫秒
Flux.range(1, 3).delayElements(Duration.ofMillis(500)) // 元素发送间隔500ms.timeout(Duration.ofMillis(300)) // 超过300ms未收到新元素则超时.subscribe(System.out::println, // 正常元素处理(此处第一个元素就会超时,不会执行)Throwable::printStackTrace // 捕获并打印超时异常);
Thread.sleep(2000); // 主线程休眠,确保异常能被捕获
// 输出:TimeoutException(因第一个元素需500ms发送,超过300ms超时阈值)

六、订阅操作符

1. subscribe(Consumer)

subscribe是响应式流的 “启动开关”—— 只有调用该方法,流才会开始发射元素。此重载方法接收一个消费函数(Consumer),用于处理流中发射的每个元素,是最基础的订阅方式。

// 创建包含"A"、"B"的流,订阅并打印每个元素
Flux.just("A", "B").subscribe(System.out::println); 
// 输出:A(处理第一个元素)、B(处理第二个元素)
// 注:若不调用subscribe,流不会执行任何操作

2. doOnNext(Consumer)

在流中的每个元素被发射到订阅者之前,触发指定的消费函数(如日志记录、数据预处理等),但不会改变元素本身或流的结构。

// 生成1~3的整数流,发射前打印提示,再将元素发送给订阅者
Flux.range(1, 3).doOnNext(i -> System.out.println("Before emit: " + i)) // 发射前执行:打印提示.subscribe(System.out::println); // 订阅者接收并打印元素
// 输出:
// Before emit: 1 → 发射前操作
// 1 → 订阅者处理
// Before emit: 2
// 2
// Before emit: 3
// 3

3. doOnComplete(Runnable)

流正常结束(所有元素发射完毕且无错误)时,触发指定的无参任务(Runnable),可用于执行流结束后的收尾操作(如释放资源、打印完成日志等)。

// 创建包含1、2的流,完成时打印"Done",并订阅(无需处理元素,仅触发完成回调)
Flux.just(1, 2).doOnComplete(() -> System.out.println("Done")) // 流正常结束时执行.subscribe(); // 订阅启动流
// 输出:Done(当1和2都发射完毕后,触发完成回调)

更多操作可参考如下表格:

操作符类型描述
just, fromIterable, empty, range, generate, create创建构建流
map, flatMap, concatMap, switchMap, buffer, window转换映射或拆分
filter, take, skip, distinct, limitRate过滤控制元素
mergeWith, concatWith, zipWith, combineLatest组合合并流
onErrorResume, onErrorReturn, doOnError错误处理异常捕获
delayElements, timeout时间控制延迟/超时
defaultIfEmpty, hasElement, all, any条件逻辑判断
reduce, collect, count聚合统计计算
subscribe, doOnNext, doOnComplete副作用监听
log, checkpoint日志调试

调用实践

在深入剖析完WebFlux相关 API,对其异步非阻塞特性及响应式编程模型有了清晰认知后,为进一步巩固理解并探索其实践边界,我们将借助Trae搭建一款基于WebFlux的远程调用框架。

框架整体调用逻辑如下:


在设计过程中,我们主要借鉴@FeignClient远程调用的原理,首先定义一个@FeignWebFlux用以标注到接口上,然后我们的框架自动解析标有@FeignWebFlux的注解,并生成代理类,进而底层通过WebFlux来实现对服务的远程调用。

我们整个项目结构如下:

 

  • web-flux-remote-client:定义远程调用注解,并完成相关注解代理类的构建

  • client-consumer: 客户端消费者

  • client-provider: 客户端服务提供者

注解构建

依据我们之前描述,我们首先需要定义一个@FeignWebFlux的注解,此处我们借助Trae来实现。


此处,Trae的思考过程还是比较完善的,其对@FeignClient注解的理解基本是到位的,给出的代码也是比较详细的,但其中的fallback,fallbackFactory等其实我们本次并不需要,所以对于其给出的代码,我们优化后代码如下。此次,我们仅保留url相关配置。

package org.webflux.remote.client.annotation;import java.lang.annotation.*;@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface FeignWebFlux {/*** 直接指定服务URL,优先级高于value*/String url() default "";/*** 所有方法的基础路径前缀*/String path() default "";}

在完成基础注解定以后,如果想实现对标有@FeignWebFlux的类实现代理,则需要我们扫描其对应包路径,而为了实现这一逻辑,通常我们会定义一个@Enablexxx的注解,然后借助
ImportBeanDefinitionRegistrar这一扩展点来实现。

package org.webflux.remote.client.annotation;import org.springframework.context.annotation.Import;
import org.webflux.remote.client.register.FeignWebFluxRegistrar;import java.lang.annotation.*;@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(FeignWebFluxRegistrar.class) // 导入扫描注册器
public @interface EnableFeignWevFluxClients {String[] basePackages() default {};
}

在EnableFeignWevFluxClients注解内容,我们参照自动注入的原理,通过@Import注入我们自定义的Bean扫描器。

public class FeignWebFluxRegistrar implements ImportBeanDefinitionRegistrar, EnvironmentAware{private Environment environment;@Overridepublic void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {// 1. 获取@EnableFeignWevFluxClients(扫描路径)String[] basePackages = (String[]) importingClassMetadata.getAnnotationAttributes(EnableFeignWevFluxClients.class.getName()).get("basePackages");if (basePackages.length == 0) {// 默认扫描启动类所在包String defaultPackage = ClassUtils.getPackageName(importingClassMetadata.getClassName());basePackages = new String[]{defaultPackage};}// 2. 扫描指定包下所有标注@WebFluxClient的接口ClassPathScanningCandidateComponentProvider scanner = this.getScanner();scanner.addIncludeFilter(new AnnotationTypeFilter(FeignWebFlux.class)); // 只扫描带@WebFluxClient的接口for (String basePackage : basePackages) {Set<BeanDefinition> candidateComponents = scanner.findCandidateComponents(basePackage);for (BeanDefinition beanDefinition : candidateComponents) {if (beanDefinition instanceof AnnotatedBeanDefinition) {AnnotatedBeanDefinition annotatedBeanDefinition = (AnnotatedBeanDefinition) beanDefinition;AnnotationMetadata metadata = annotatedBeanDefinition.getMetadata();// 只处理接口if (metadata.isInterface()) {registerFeignClient(registry, metadata);}}}}}protected ClassPathScanningCandidateComponentProvider getScanner() {return new ClassPathScanningCandidateComponentProvider(false, this.environment) {@Overrideprotected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {boolean isCandidate = false;if (beanDefinition.getMetadata().isIndependent() && !beanDefinition.getMetadata().isAnnotation()) {isCandidate = true;}return isCandidate;}};}// 注册FeignWebFlxu客户端(通过工厂Bean生成代理)private void registerFeignClient(BeanDefinitionRegistry registry, AnnotationMetadata metadata) {String className = metadata.getClassName();// 构建工厂Bean的定义:FeignWebFluxClientFactoryBeanBeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(FeignWebFluxClientFactoryBean.class);builder.addPropertyValue("type", className); // 接口类型// 注册BeanDefinition(Bean名称为接口全类名)registry.registerBeanDefinition(className, builder.getBeanDefinition());}@Overridepublic void setEnvironment(Environment environment) {this.environment = environment;}
}

远程调用工具构建

我们此处所定义的@FeignWebFlux其会接收到远程调用的地址信息,在接收到地址信息后,我们底层则WebClient来实现远程服务调用。

 

public class WebClientRestHandler implements RestHandler {private WebClient webClient;@Overridepublic void init(ServerInfo serverInfo) {this.webClient = WebClient.create(serverInfo.getUrl());}@Overridepublic Object invokeRest(MethodInfo methodInfo) {// 参数校验if (methodInfo == null) {return Mono.error(new IllegalArgumentException("MethodInfo must not be null"));}if (methodInfo.getHttpMethod() == null) {return Mono.error(new IllegalArgumentException("HttpMethod must not be null"));}if (methodInfo.getUrl() == null || methodInfo.getUrl().trim().isEmpty()) {return Mono.error(new IllegalArgumentException("Url must not be empty"));}checkWebClientInitialized();WebClient.RequestBodyUriSpec methodSpec = webClient.method(methodInfo.getHttpMethod());// 构建URI(支持路径参数和查询参数)methodSpec.uri(uriBuilder -> {if (methodInfo.getParams() != null && !methodInfo.getParams().isEmpty()) {return uriBuilder.path(methodInfo.getUrl()).build(methodInfo.getParams());} else {return uriBuilder.path(methodInfo.getUrl()).build();}});//        // 构建URI(支持路径参数和查询参数)
//        methodSpec.uri(uriBuilder -> {
//            // 处理路径参数
//            if (methodInfo.getParams() != null) {
//                uriBuilder = uriBuilder.path(methodInfo.getUrl()).build(methodInfo.getParams());
//            } else {
//                uriBuilder = uriBuilder.path(methodInfo.getUrl());
//            }
//            // 处理查询参数(如果MethodInfo支持)
//            if (methodInfo.getQueryParams() != null && !methodInfo.getQueryParams().isEmpty()) {
//                methodInfo.getQueryParams().forEach(uriBuilder::queryParam);
//            }
//            return uriBuilder.build();
//        });// 设置请求体WebClient.ResponseSpec responseSpec;if (methodInfo.getBody() != null) {responseSpec = methodSpec.contentType(MediaType.APPLICATION_JSON).body(methodInfo.getBody(), methodInfo.getBodyElementType()).accept(MediaType.APPLICATION_JSON).retrieve();} else {responseSpec = methodSpec.accept(MediaType.APPLICATION_JSON).retrieve();}// 处理HTTP错误状态码responseSpec = responseSpec.onStatus(HttpStatus::isError, clientResponse ->clientResponse.bodyToMono(String.class).flatMap(errorBody -> Mono.error(new WebClientResponseException(clientResponse.statusCode().value(),"Request failed with status code: " + clientResponse.statusCode(),clientResponse.headers().asHttpHeaders(),errorBody.getBytes(),null))));// 处理响应类型并统一异常处理if (methodInfo.isReturnFlux()) {return responseSpec.bodyToFlux(methodInfo.getReturnElementType()).onErrorResume(e -> Flux.error(convertException(e)));} else {return responseSpec.bodyToMono(methodInfo.getReturnElementType()).onErrorResume(e -> Mono.error(convertException(e)));}}// 异常转换辅助方法private Throwable convertException(Throwable e) {if (e instanceof WebClientResponseException) {return e;}return new RuntimeException("Remote service invocation failed: " + e.getMessage(), e);}// WebClient初始化检查private void checkWebClientInitialized() {if (this.webClient == null) {this.webClient = WebClient.builder().build();}}}

此处值得注意的是,Trae在构建代码时,出现了两个问题


问题1:在使用UriBuilder参数时,其中的build方法方法其实是会返回URL的类型,对于返回的URL则不能继续在进行构建,所以代码47行会报错。

问题2: Trae在构建代码时,对于已经提供形参的方法,其貌似并不会读取形参中的变量信息,而是会根据自己的思考,假设其中有某些变量,这一点需要多轮对话来解决。

代理构建

实现了注解的以及底层网络调用的构建,下一步则是对于@FeignWebFlux注解所标注类代理的构建。

public class JdkDynamicAopProxy implements ProxyCreator{RestHandler restHandler = new WebClientRestHandler();@Overridepublic Object createProxy(Class<?> type) {// 抽取服务器信息ServerInfo serverInfo = extractServerInfo(type);log.info("serverInfo :{}", serverInfo);// 初始化服务器信息(webClient)restHandler.init(serverInfo);return Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{type}, new InvocationHandler() {@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {// 抽取所有请求信息封装到 MethodInfo 对象MethodInfo methodInfo = extractMethodInfo(method, args);log.info("methodInfo :{}", methodInfo);// 调用Rest客户端 处理请求return restHandler.invokeRest(methodInfo);}// .... 省略其他代码

此处,借助JDK的动态代理,生成对应的代理来,然后借助之前我们构建的WebClientRestHandler完成远程调用。
详细代码可参考:

https://gitee.com/ThanksCode/webflux-remote-client

http://www.dtcms.com/a/454099.html

相关文章:

  • 网站建设学生作业成品如何做外文网站
  • 中学网站源码展示页网站怎么做排名
  • 江苏省建设工程协会网站外贸网站推广建站
  • 【C++】运算符替代写法全解析
  • 广州建设网站哪家好支付招聘网站怎么做费用
  • 免费扑克网站代码商城网站建设一般需要多少钱
  • kanass入门到实战(10) - 如何做好测试管理?
  • 网站制作呼和浩特12315可以查询营业执照吗
  • 全球主要城市时间对比,实时更新|昼夜区分|时间对比
  • 栈:“后进先出” 的艺术,撑起程序世界的底层骨架
  • JAVA算法练习题day32
  • 网站添加设置着陆页唐山网站优化
  • 做婚纱网站的目的seo的主要工作是什么
  • 上海哪家做网站校园无线网络设计方案
  • 深入解析Doris ZoneMap索引机制
  • (1)SwiftUI基础入门教程
  • wordpress整站搬家教程河池做网站
  • 怎么改版一个网站白酒网站设计
  • windows显示驱动开发-节能与 VSync 控制
  • 安徽省住房和城乡建设厅官方网站网站访问量大
  • 好的平面设计网站有哪些易云巢做营销型网站
  • 从0死磕全栈之Next.js 中的错误处理机制详解(App Router)
  • c语言贪吃蛇游戏开发
  • 360网站seo优化怎么做简单网站
  • 小说网站静态模板设计师设计费一般多少
  • 网站主机要怎么做跨境电商交3980元培训费可信吗
  • Coze源码分析-资源库-编辑知识库-后端源码-流程/技术/总结
  • 哺乳动物双等位基因表达的 “守护者”--解析 MSL2对基因剂量平衡与疾病机制的新启示--文献精读164
  • 阿里云网站建设需要多少钱开发软件公司都有哪些
  • CTFHub SQL注入通关笔记4:布尔盲注(手注法+脚本法)