SpringWebFlux:响应式Web框架
目录
一、概念
1.1 解决的问题
1.2 核心定义
1.3 核心特性
1.4 适用场景与不适用场景
1.5 与SpringMVC对比
二、架构与原理
2.1 核心架构概览
2.2 核心组件
2.3 请求处理流程
三、使用
3.1 项目搭建
3.2 创建响应式 REST API
3.2.1 方式1:基于注解的控制器(推荐初学者)
3.2.2 方式2:函数式端点(更灵活的配置)
3.3 配置 MongoDB(可选)
3.4 测试
3.5 运行和部署
3.6 性能优化建议
3.7 监控和指标
一、概念
1.1 解决的问题
-
高并发与高吞吐量:传统的Spring MVC基于Servlet API,是同步阻塞I/O模型。每个请求都会占用一个线程。当有大量并发请求,特别是那些涉及慢速I/O操作(如数据库调用、外部API请求)时,线程池中的线程会被迅速耗尽,导致性能瓶颈和延迟增加。
-
资源效率:创建和切换线程是有成本的。在传统的“一个请求一个线程”模型中,线程数量是有限的(通常几百到几千个),大量线程会导致内存消耗和CPU上下文切换开销巨大。
-
背压(Backpressure):在数据流处理中,当生产者的数据产生速度远大于消费者的处理速度时,如何防止消费者被“压垮”?这就是背压问题,传统的编程模型很难优雅地处理。响应式流规范(Reactive Streams)提供了背压机制,允许消费者主动告知生产者自己能处理多少数据,从而实现流量的动态控制。这是传统编程模型难以做到的。
WebFlux就是Spring提供的解决上述问题的答案。它旨在用少量的、固定的线程来处理高并发请求,从而提高系统的可伸缩性。
1.2 核心定义
-
一个完全非阻塞、异步的Web框架:它建立在Reactive Streams规范之上,允许以声明式的方式编写异步代码,从而高效地处理背压。
-
运行在非Servlet容器上:它不依赖于Servlet API,可以运行在Netty、Undertow等支持NIO的服务器上。当然,它也可以运行在支持Servlet 3.1+的非阻塞I/O的Servlet容器(如Tomcat, Jetty)上。
-
Reactive Streams的实现:其核心是Project Reactor库,它提供了
Flux
和Mono
这两个响应式发布者(Publisher)类型。
核心目标:提供一个非阻塞的、异步的 Web 框架,能够用少量、固定的线程来处理高并发请求,从而在相同硬件资源下提供更高的吞吐量和可伸缩性。
核心思想: 从命令式、阻塞的编程模型(一步一步执行)转向声明式、非阻塞的响应式编程模型(描述数据流应该做什么)。自己定义的是一个处理数据的流水线,数据到来时才触发执行。
类比:就像用Java 8的Stream API,但Stream是用于处理内存中的集合(拉模式),而Reactive Streams是处理异步数据流(推模式)。
1.3 核心特性
-
非阻塞I/O:基于事件循环,单线程处理大量并发连接
-
背压(Backpressure):消费者控制数据流速,防止内存溢出
-
函数式编程:支持Router Functions函数式端点定义
1.4 适用场景与不适用场景
适用场景(WebFlux 大放异彩的地方):
-
高并发、低延迟的 I/O 密集型应用:如实时通信、聊天系统、实时数据流处理、API 网关等。
-
需要处理长时间异步任务:如服务器推送(SSE)、WebSocket。
-
需要精细控制数据流的背压:如处理来自 Kafka、RabbitMQ 等消息中间件的高速数据流。
-
已使用响应式技术栈:如果你的数据层(如 MongoDB, Cassandra)本身就支持响应式驱动,使用 WebFlux 可以构建端到端的非阻塞应用。
不适用场景(Spring MVC 依然是更好的选择):
-
CPU 密集型应用:非阻塞模型对 CPU 密集型任务没有优势,复杂的计算会阻塞事件循环线程,反而降低性能。
-
强依赖阻塞式技术栈:如果应用严重依赖 JDBC、JPA(Hibernate)等阻塞式技术,切换到 WebFlux 的成本很高,且收益有限。
-
团队不熟悉响应式编程:响应式编程的调试和问题排查比命令式编程更复杂,需要团队具备相应的技能。
1.5 与SpringMVC对比
特性 | Spring MVC | Spring WebFlux |
---|---|---|
编程范式 | 命令式、同步、阻塞 | 声明式、异步、非阻塞 |
并发模型 | 每个请求一个线程(Thread-per-request) | 事件循环(Event Loop),少量线程处理大量请求 |
核心返回值 | Object , List<T> , HttpEntity<T> | Mono<T> , Flux<T> |
背压支持 | 不支持 | 原生支持 |
数据访问 | 同步 JDBC, JPA (Hibernate) | 响应式驱动,如 R2DBC, Reactive MongoDB, Cassandra |
服务器要求 | Servlet 容器 (Tomcat, Jetty) | Netty, Servlet 3.1+ 容器 |
学习曲线 | 相对平缓,符合传统 Java 开发习惯 | 较陡峭,需要理解响应式编程概念 |
二、架构与原理
2.1 核心架构概览
Spring WebFlux 的架构是围绕响应式流(Reactive Streams) 和函数式编程思想构建的。其核心分层如下:
-
底层容器:不再依赖传统的 Servlet 容器(Tomcat, Jetty),而是支持 Netty、Undertow 以及 Servlet 3.1+ 非阻塞 I/O 容器。Netty 是其默认和首选,因为它从设计之初就是事件驱动、异步非阻塞的。
-
Web 核心层:这是 WebFlux 框架的核心,定义了处理 HTTP 请求的抽象 API。
-
响应式编程模型:基于 Project Reactor(
Mono
和Flux
类型),贯穿整个架构。 -
编程模型:提供两种风格来定义端点(Endpoint):
-
基于注解的模型:与 Spring MVC 类似的
@Controller
,@RequestMapping
等注解。 -
函数式端点模型:基于 Lambda 的轻量级、函数式编程模型。
-
2.2 核心组件
-
HttpHandler
:这是最底层的、与容器无关的 HTTP 请求处理契约。它只有一个方法Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response)
。不同的服务器适配器(Netty, Undertow, Servlet)会将自己的请求/响应转换为ServerHttpRequest
和ServerHttpResponse
,然后调用HttpHandler
。 -
WebHandler
API:这是 Spring WebFlux 的核心抽象。DispatcherHandler
是WebHandler
的主要实现,其角色类似于 Spring MVC 中的DispatcherServlet
,是请求处理的中央调度器。 -
DispatcherHandler
:它是请求处理的“大脑”,不直接处理业务逻辑,而是协调以下三个核心组件:-
HandlerMapping
:根据请求信息(如URL、方法、头等)找到对应的处理器(Handler)。对于注解控制器,处理器是一个方法;对于函数式端点,是一个RouterFunction
。 -
HandlerAdapter
:调用实际的处理器。它知道如何与不同类型的处理器交互(例如,调用一个@Controller
方法,或执行一个RouterFunction
)。 -
HandlerResultHandler
:将处理器返回的结果(一个HandlerResult
对象,通常包含Mono
或Flux
数据)与适当的渲染器 结合,最终写入到ServerHttpResponse
中。
-
2.3 请求处理流程
以一个 GET /users
请求为例,追踪其在 WebFlux 中的完整生命周期:
-
1、接收请求:
-
Netty(或其他服务器)接收到一个 HTTP 请求。
-
服务器适配器将原生请求/响应对象包装成 Spring 的
ServerHttpRequest
和ServerHttpResponse
。 -
调用顶层的
HttpHandler
的handle
方法。
-
-
2、调度阶段(
DispatcherHandler
):-
DispatcherHandler
被调用,开始其工作。 -
步骤 A:查找处理器(
HandlerMapping
)-
DispatcherHandler
遍历所有HandlerMapping
列表,询问:“谁能处理这个/users
请求?” -
假设我们使用的是注解控制器,一个
RequestMappingHandlerMapping
会找到匹配的@GetMapping("/users")
方法,并返回一个表示该方法的HandlerMethod
对象。
-
-
-
3、执行阶段(
HandlerAdapter
):-
步骤 B:获取适配器
-
DispatcherHandler
遍历HandlerAdapter
列表,询问:“谁能执行这个HandlerMethod
类型的处理器?” -
RequestMappingHandlerAdapter
会响应说“我能”。
-
-
步骤 C:执行处理器
-
RequestMappingHandlerAdapter
开始工作:-
进行数据绑定、验证、类型转换等(参数解析器
HandlerMethodArgumentResolver
负责将请求数据转换为方法参数)。 -
关键点:它异步地调用控制器方法。控制器方法返回一个
Mono<List<User>>
,而不是一个实际的List<User>
。此时,数据还没有被获取。 -
它将这个
Mono
包装成一个HandlerResult
对象并返回。
-
-
-
-
4、结果处理阶段(
HandlerResultHandler
):-
步骤 D:处理结果
-
DispatcherHandler
遍历HandlerResultHandler
列表,询问:“谁能处理这个返回类型为Mono<List<User>>
的HandlerResult
?” -
ResponseBodyResultHandler
(或ViewResolutionResultHandler
)会响应。因为我们通常返回@ResponseBody
或@RestController
。
-
-
步骤 E:异步渲染与写入
-
这是最核心、最体现非阻塞特性的地方。
-
ResponseBodyResultHandler
并不会阻塞等待Mono
发出数据。相反,它订阅(Subscribe) 这个Mono
,并注册一系列回调函数:-
onNext:当
Mono
发出一个List<User>
数据时,调用此回调。渲染器(如 Jackson JSON 编码器)会将这个 List 转换为 JSON 字节流。 -
onError:如果过程中发生错误(如数据库异常),调用此回调来处理错误。
-
onComplete:当整个数据流结束时,调用此回调,最终关闭响应。
-
-
这些字节流通过非阻塞 I/O 被写入到
ServerHttpResponse
的底层通道中。
-
-
-
5、背压(Backpressure):
-
如果控制器返回的是
Flux<User>
(一个包含多个元素的流),响应式框架的背压机制会发挥作用。 -
当网络客户端处理速度较慢时,它会通过 TCP 流量控制间接地施加背压。WebFlux 和 Netty 能感知到这一点,从而通知上游的
Flux
放慢数据发射速度,防止服务器端积压过多数据导致内存溢出。
-
三、使用
3.1 项目搭建
步骤1:创建项目
使用 Spring Initializr 创建项目:
# 使用 curl 创建项目
curl https://start.spring.io/starter.zip \-d dependencies=webflux \-d type=maven-project \-d groupId=com.example \-d artifactId=webflux-demo \-o webflux-demo.zip# 解压并进入项目目录
unzip webflux-demo.zip
cd webflux-demo
或者手动创建 pom.xml
:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.0</version><relativePath/></parent><groupId>com.example</groupId><artifactId>webflux-demo</artifactId><version>0.0.1-SNAPSHOT</version><properties><java.version>11</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>
步骤2:创建主应用类
package com.example.webfluxdemo;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class WebfluxDemoApplication {public static void main(String[] args) {SpringApplication.run(WebfluxDemoApplication.class, args);}
}
3.2 创建响应式 REST API
3.2.1 方式1:基于注解的控制器(推荐初学者)
创建实体类
package com.example.webfluxdemo.model;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;@Data
@NoArgsConstructor
@AllArgsConstructor
@Document(collection = "users")
public class User {@Idprivate String id;private String name;private String email;private Integer age;
}
创建 Repository(使用 Spring Data MongoDB Reactive)
package com.example.webfluxdemo.repository;import com.example.webfluxdemo.model.User;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;@Repository
public interface UserRepository extends ReactiveMongoRepository<User, String> {Flux<User> findByName(String name);Flux<User> findByAgeGreaterThan(Integer age);
}
创建 Controller
package com.example.webfluxdemo.controller;import com.example.webfluxdemo.model.User;
import com.example.webfluxdemo.repository.UserRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;import java.time.Duration;@RestController
@RequestMapping("/api/users")
public class UserController {@Autowiredprivate UserRepository userRepository;// 获取所有用户@GetMapping(produces = MediaType.APPLICATION_JSON_VALUE)public Flux<User> getAllUsers() {return userRepository.findAll();}// 根据ID获取用户@GetMapping("/{id}")public Mono<ResponseEntity<User>> getUserById(@PathVariable String id) {return userRepository.findById(id).map(user -> ResponseEntity.ok(user)).defaultIfEmpty(ResponseEntity.notFound().build());}// 创建用户@PostMapping@ResponseStatus(HttpStatus.CREATED)public Mono<User> createUser(@RequestBody User user) {return userRepository.save(user);}// 更新用户@PutMapping("/{id}")public Mono<ResponseEntity<User>> updateUser(@PathVariable String id, @RequestBody User user) {return userRepository.findById(id).flatMap(existingUser -> {existingUser.setName(user.getName());existingUser.setEmail(user.getEmail());existingUser.setAge(user.getAge());return userRepository.save(existingUser);}).map(updatedUser -> ResponseEntity.ok(updatedUser)).defaultIfEmpty(ResponseEntity.notFound().build());}// 删除用户@DeleteMapping("/{id}")public Mono<ResponseEntity<Void>> deleteUser(@PathVariable String id) {return userRepository.findById(id).flatMap(existingUser ->userRepository.delete(existingUser).then(Mono.just(ResponseEntity.ok().<Void>build()))).defaultIfEmpty(ResponseEntity.notFound().build());}// 流式端点 - 每秒发送一个用户@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<User> streamUsers() {return userRepository.findAll().delayElements(Duration.ofSeconds(1));}
}
3.2.2 方式2:函数式端点(更灵活的配置)
创建 Handler
package com.example.webfluxdemo.handler;import com.example.webfluxdemo.model.User;
import com.example.webfluxdemo.repository.UserRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;import static org.springframework.web.reactive.function.BodyInserters.fromValue;@Component
public class UserHandler {@Autowiredprivate UserRepository userRepository;public Mono<ServerResponse> getAllUsers(ServerRequest request) {return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(userRepository.findAll(), User.class);}public Mono<ServerResponse> getUserById(ServerRequest request) {String id = request.pathVariable("id");return userRepository.findById(id).flatMap(user -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(fromValue(user))).switchIfEmpty(ServerResponse.notFound().build());}public Mono<ServerResponse> createUser(ServerRequest request) {return request.bodyToMono(User.class).flatMap(user -> userRepository.save(user)).flatMap(savedUser -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(fromValue(savedUser)));}public Mono<ServerResponse> deleteUser(ServerRequest request) {String id = request.pathVariable("id");return userRepository.findById(id).flatMap(existingUser ->userRepository.delete(existingUser).then(ServerResponse.ok().build())).switchIfEmpty(ServerResponse.notFound().build());}
}
配置 Router
package com.example.webfluxdemo.config;import com.example.webfluxdemo.handler.UserHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;import static org.springframework.web.reactive.function.server.RequestPredicates.*;@Configuration
public class RouterConfig {@Autowiredprivate UserHandler userHandler;@Beanpublic RouterFunction<ServerResponse> userRoutes() {return RouterFunctions.route(GET("/functional/users").and(accept(MediaType.APPLICATION_JSON)), userHandler::getAllUsers).andRoute(GET("/functional/users/{id}").and(accept(MediaType.APPLICATION_JSON)), userHandler::getUserById).andRoute(POST("/functional/users").and(accept(MediaType.APPLICATION_JSON)), userHandler::createUser).andRoute(DELETE("/functional/users/{id}"), userHandler::deleteUser);}
}
3.3 配置 MongoDB(可选)
添加 MongoDB 依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
配置 application.yml
spring:data:mongodb:uri: mongodb://localhost:27017/webflux_demodatabase: webflux_demoserver:port: 8080logging:level:org.springframework.data.mongodb.core.ReactiveMongoTemplate: DEBUG
3.4 测试
单元测试
package com.example.webfluxdemo.controller;import com.example.webfluxdemo.model.User;
import com.example.webfluxdemo.repository.UserRepository;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.http.MediaType;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;import java.util.Arrays;
import java.util.List;import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;@WebFluxTest(UserController.class)
class UserControllerTest {@Autowiredprivate WebTestClient webTestClient;@MockBeanprivate UserRepository userRepository;@Testvoid shouldGetAllUsers() {User user1 = new User("1", "John", "john@example.com", 30);User user2 = new User("2", "Jane", "jane@example.com", 25);List<User> users = Arrays.asList(user1, user2);given(userRepository.findAll()).willReturn(Flux.fromIterable(users));webTestClient.get().uri("/api/users").exchange().expectStatus().isOk().expectBodyList(User.class).hasSize(2).contains(user1, user2);}@Testvoid shouldGetUserById() {User user = new User("1", "John", "john@example.com", 30);given(userRepository.findById("1")).willReturn(Mono.just(user));webTestClient.get().uri("/api/users/1").exchange().expectStatus().isOk().expectBody().jsonPath("$.name").isEqualTo("John").jsonPath("$.email").isEqualTo("john@example.com");}@Testvoid shouldCreateUser() {User user = new User(null, "John", "john@example.com", 30);User savedUser = new User("1", "John", "john@example.com", 30);given(userRepository.save(any(User.class))).willReturn(Mono.just(savedUser));webTestClient.post().uri("/api/users").contentType(MediaType.APPLICATION_JSON).bodyValue(user).exchange().expectStatus().isCreated().expectBody().jsonPath("$.id").isEqualTo("1").jsonPath("$.name").isEqualTo("John");}
}
集成测试
package com.example.webfluxdemo;import com.example.webfluxdemo.model.User;
import com.example.webfluxdemo.repository.UserRepository;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Mono;import java.util.Arrays;
import java.util.List;@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class WebfluxDemoApplicationTests {@Autowiredprivate WebTestClient webTestClient;@Autowiredprivate UserRepository userRepository;@BeforeEachvoid setUp() {userRepository.deleteAll().block();}@Testvoid testUserCRUDOperations() {User user = new User(null, "Test User", "test@example.com", 30);// CreateUser createdUser = webTestClient.post().uri("/api/users").contentType(MediaType.APPLICATION_JSON).body(Mono.just(user), User.class).exchange().expectStatus().isCreated().expectBody(User.class).returnResult().getResponseBody();// ReadwebTestClient.get().uri("/api/users/{id}", createdUser.getId()).exchange().expectStatus().isOk().expectBody().jsonPath("$.name").isEqualTo("Test User");// Updateuser.setName("Updated User");webTestClient.put().uri("/api/users/{id}", createdUser.getId()).contentType(MediaType.APPLICATION_JSON).body(Mono.just(user), User.class).exchange().expectStatus().isOk().expectBody().jsonPath("$.name").isEqualTo("Updated User");// DeletewebTestClient.delete().uri("/api/users/{id}", createdUser.getId()).exchange().expectStatus().isOk();}
}
3.5 运行和部署
运行应用
// 主类已包含,直接运行
@SpringBootApplication
public class WebfluxDemoApplication {public static void main(String[] args) {SpringApplication.run(WebfluxDemoApplication.class, args);}
}
使用 Maven 运行
mvn spring-boot:run
打包部署
mvn clean package
java -jar target/webflux-demo-0.0.1-SNAPSHOT.jar
3.6 性能优化建议
配置优化
server:port: 8080netty:connection-timeout: 2000msidle-timeout: 30000msspring:codec:max-in-memory-size: 10MBlogging:level:reactor.netty: DEBUG
背压处理
@GetMapping("/users/backpressure")
public Flux<User> getUsersWithBackpressure() {return userRepository.findAll().onBackpressureBuffer(1000) // 设置缓冲区大小.delayElements(Duration.ofMillis(10)); // 控制发射速率
}
3.7 监控和指标
添加 Actuator
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
配置监控端点
management:endpoints:web:exposure:include: health,info,metricsendpoint:health:show-details: always