(九)Spring Webflux
底层基于Netty实现的Web容器与请求/响应处理机制
参照:Spring WebFlux :: Spring Framework https://docs.spring.io/spring-framework/reference/6.0/web/webflux.html
https://docs.spring.io/spring-framework/reference/6.0/web/webflux.html
一、组件对比
| API功能 | Servlet-阻塞式Web | WebFlux-响应式Web | 
| 前端控制器 | DispatcherServlet | DispatcherHandler | 
| 处理器 | Controller | WebHandler/Controller | 
| 请求、响应 | ServletRequest、ServletResponse | ServerWebExchange: | 
| 过滤器 | Filter(HttpFilter) | WebFilter | 
| 异常处理器 | HandlerExceptionResolver | DispatchExceptionHandler | 
| Web配置 | @EnableWebMvc | @EnableWebFlux | 
| 自定义配置 | WebMvcConfigurer | WebFluxConfigurer | 
| 返回结果 | 任意 | Mono、Flux、任意 | 
| 发送REST请求 | RestTemplate | WebClient | 
Mono: 返回0|1 数据流
Flux:返回N数据流
二、依赖
maven可以找到 Spring Webflux 的依赖,自行选择版本添加到pom.xml文件中。


<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-webflux -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <version>3.4.4</version>
</dependency>三、Reactor Core
1、HttpHandler、HttpServer
public class Main{
        public static void main(String[] args) throws IOException {
        //快速自己编写一个能处理请求的服务器
        //1、创建一个能处理Http请求的处理器。 参数:请求、响应; 返回值:Mono<Void>:代表处理完成的信号
        HttpHandler handler = (ServerHttpRequest request,
                                   ServerHttpResponse response)->{
            URI uri = request.getURI();
            System.out.println(Thread.currentThread()+"请求进来:"+uri);
            //编写请求处理的业务,给浏览器写一个内容 URL + "Hello~!"
//            response.getHeaders(); //获取响应头
//            response.getCookies(); //获取Cookie
//            response.getStatusCode(); //获取响应状态码;
//            response.bufferFactory(); //buffer工厂
//            response.writeWith() //把xxx写出去
//            response.setComplete(); //响应结束
            //数据的发布者:Mono<DataBuffer>、Flux<DataBuffer>
            //创建 响应数据的 DataBuffer
            DataBufferFactory factory = response.bufferFactory();
            //数据Buffer
            DataBuffer buffer = factory.wrap(new String(uri.toString() + " ==> Hello!").getBytes());
            // 需要一个 DataBuffer 的发布者
            return response.writeWith(Mono.just(buffer));
        };
        //2、启动一个服务器,监听8080端口,接受数据,拿到数据交给 HttpHandler 进行请求处理
        ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
        //3、启动Netty服务器
        HttpServer.create()
                .host("localhost")
                .port(80)
                .handle(adapter) //用指定的处理器处理请求
                .bindNow(); //现在就绑定
        System.out.println("服务器启动完成....监听80,接受请求");
        System.in.read();
        System.out.println("服务器停止....");
    }
}2、DispatcherHandler
SpringMVC: DispatcherServlet;
SpringWebFlux: DispatcherHandler
请求流程:
- HandlerMapping:请求映射处理器; 保存每个请求由哪个方法进行处理
- HandlerAdapter:处理器适配器;反射执行目标方法
- HandlerResultHandler:处理器结果处理器;
SpringMVC: DispatcherServlet 有一个 doDispatch() 方法,来处理所有请求;
WebFlux: DispatcherHandler 有一个 handle() 方法,来处理所有请求;
public Mono<Void> handle(ServerWebExchange exchange) { 
		if (this.handlerMappings == null) {
			return createNotFoundError();
		}
		if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {
			return handlePreFlight(exchange);
		}
		return Flux.fromIterable(this.handlerMappings) //拿到所有的 handlerMappings
				.concatMap(mapping -> mapping.getHandler(exchange)) //找每一个mapping看谁能处理请求
				.next() //直接触发获取元素; 拿到流的第一个元素; 找到第一个能处理这个请求的handlerAdapter
				.switchIfEmpty(createNotFoundError()) //如果没拿到这个元素,则响应404错误;
				.onErrorResume(ex -> handleDispatchError(exchange, ex)) //异常处理,一旦前面发生异常,调用处理异常
				.flatMap(handler -> handleRequestWith(exchange, handler)); //调用方法处理请求,得到响应结果
	}- 1、请求和响应都封装在 ServerWebExchange 对象中,由handle方法进行处理
- 2、如果没有任何的请求映射器; 直接返回一个: 创建一个未找到的错误; 404; 返回Mono.error;终结流
- 3、跨域工具,是否跨域请求,跨域请求检查是否复杂跨域,需要预检请求;
- 4、Flux流式操作,先找到HandlerMapping,再获取handlerAdapter,再用Adapter处理请求,期间的错误由onErrorResume触发回调进行处理;
源码中的核心两个:
- handleRequestWith: 编写了handlerAdapter怎么处理请求
- handleResult: String、User、ServerSendEvent、Mono、Flux ...
四、注解开发
1.方法入参
Method Arguments :: Spring Framework https://docs.spring.io/spring-framework/reference/6.0/web/webflux/controller/ann-methods/arguments.html
https://docs.spring.io/spring-framework/reference/6.0/web/webflux/controller/ann-methods/arguments.html
| Controller method argument | Description | 
| ServerWebExchange | 封装了请求和响应对象的对象; 自定义获取数据、自定义响应 | 
| ServerHttpRequest, ServerHttpResponse | 请求、响应 | 
| WebSession | 访问Session对象 | 
| java.security.Principal | |
| org.springframework.http.HttpMethod | 请求方式 | 
| java.util.Locale | 国际化 | 
| java.util.TimeZone + java.time.ZoneId | 时区 | 
| @PathVariable | 路径变量 | 
| @MatrixVariable | 矩阵变量 | 
| @RequestParam | 请求参数 | 
| @RequestHeader | 请求头; | 
| @CookieValue | 获取Cookie | 
| @RequestBody | 获取请求体,Post、文件上传 | 
| HttpEntity<B> | 封装后的请求对象 | 
| @RequestPart | 获取文件上传的数据 multipart/form-data. | 
| java.util.Map, org.springframework.ui.Model, and org.springframework.ui.ModelMap. | Map、Model、ModelMap | 
| @ModelAttribute | |
| Errors, BindingResult | 数据校验,封装错误 | 
| SessionStatus + class-level @SessionAttributes | |
| UriComponentsBuilder | For preparing a URL relative to the current request’s host, port, scheme, and context path. See URI Links. | 
| @SessionAttribute | |
| @RequestAttribute | 转发请求的请求域数据 | 
| Any other argument | 所有对象都能作为参数: | 
2、方法返回值
sse和websocket区别:
 ● SSE:单工;请求过去以后,等待服务端源源不断的数据
 ● websocket:双工: 连接建立后,可以任何交互;
| Controller method return value | Description | 
| @ResponseBody | 把响应数据写出去,如果是对象,可以自动转为json | 
| HttpEntity<B>, ResponseEntity<B> | ResponseEntity:支持快捷自定义响应内容 | 
| HttpHeaders | 没有响应内容,只有响应头 | 
| ErrorResponse | 快速构建错误响应 | 
| ProblemDetail | SpringBoot3; | 
| String | 就是和以前的使用规则一样; | 
| View | 直接返回视图对象 | 
| java.util.Map, org.springframework.ui.Model | 以前一样 | 
| @ModelAttribute | 以前一样 | 
| Rendering | 新版的页面跳转API; 不能标注 @ResponseBody 注解 | 
| void | 仅代表响应完成信号 | 
| Flux<ServerSentEvent>, Observable<ServerSentEvent>, or other reactive type | 使用 text/event-stream 完成SSE效果 | 
| Other return values | 未在上述列表的其他返回值,都会当成给页面的数据; | 
五、文件上传
Multipart Content :: Spring Framework https://docs.spring.io/spring-framework/reference/6.0/web/webflux/controller/ann-methods/multipart-forms.html使用注解@RequestPart,可简便处理复杂的表单
https://docs.spring.io/spring-framework/reference/6.0/web/webflux/controller/ann-methods/multipart-forms.html使用注解@RequestPart,可简便处理复杂的表单
@PostMapping("/")
public String handle(@RequestPart("host") String host, 
		@RequestPart("file") FilePart file) { 
	// ...
}六、自定义Flux配置-WebFluxConfigurer
容器中注入这个类型的组件,重写底层逻辑
@Configuration
public class MyWebConfiguration {
    //配置底层
    @Bean
    public WebFluxConfigurer webFluxConfigurer(){
        return new WebFluxConfigurer() {
            @Override
            public void addCorsMappings(CorsRegistry registry) {
                registry.addMapping("/**")
                        .allowedHeaders("*")
                        .allowedMethods("*")
                        .allowedOrigins("localhost");
            }
        };
    }
}七、过滤-Filter
@Component
public class MyWebFilter implements WebFilter {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        ServerHttpResponse response = exchange.getResponse();
        System.out.println("请求处理放行到目标方法之前...");
        Mono<Void> filter = chain.filter(exchange); //放行
        //流一旦经过某个操作就会变成新流
        Mono<Void> voidMono = filter.doOnError(err -> {
                    System.out.println("目标方法异常以后...");
                }) // 目标方法发生异常后做事
                .doFinally(signalType -> {
                    System.out.println("目标方法执行以后...");
                });// 目标方法执行之后
        //上面执行不花时间。
        return voidMono; //看清楚返回的是谁!!!
    }
}八、错误处理
这些是 Java 中常见的响应式编程错误处理方法,通过 Project Reactor 来处理流中的错误。你可以根据实际情况选择不同的错误处理策略,确保你的应用在出现问题时不会崩溃,而是能优雅地恢复或者处理错误。
1. 使用 onErrorResume 处理错误
 
onErrorResume 可以捕获错误并恢复一个备用值(如另一个流),避免整个流崩溃。
public class ErrorHandlingExample {
    public static void main(String[] args) {
        Mono.error(new RuntimeException("Something went wrong"))
            .onErrorResume(error -> {
                System.out.println("Caught error: " + error.getMessage());
                return Mono.just("Fallback data"); // 返回备用值
            })
            .subscribe(
                data -> System.out.println("Received: " + data),
                error -> System.out.println("This will not be executed"),
                () -> System.out.println("Completed")
            );
    }
}
//输出:
//Caught error: Something went wrong
//Received: Fallback data
//Completed2. 使用 retry 重试逻辑
 
retry 操作符可以在发生错误时自动重试,通常用于临时故障的恢复。
public class RetryExample {
    public static void main(String[] args) {
        Mono<String> source = Mono.fromCallable(() -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("Temporary error");
            }
            return "Success!";
        });
        source
            .retry(3)  // 重试3次
            .doOnRetry(retrySignal -> System.out.println("Retry attempt: " + retrySignal.totalRetries()))
            .onErrorResume(error -> Mono.just("Failed after retries"))
            .subscribe(
                data -> System.out.println("Received: " + data),
                error -> System.out.println("Error: " + error.getMessage()),
                () -> System.out.println("Completed")
            );
    }
}
//输出:
//Retry attempt: 1
//Retry attempt: 2
//Received: Success!
//Completed3. 使用 onErrorMap 转换错误
 
onErrorMap 允许你将捕获到的错误转换为不同类型的错误或自定义错误。
public class ErrorMappingExample {
    public static void main(String[] args) {
        Mono.error(new RuntimeException("Original error"))
            .onErrorMap(error -> new IllegalArgumentException("Mapped error: " + error.getMessage()))
            .subscribe(
                data -> System.out.println("Received: " + data),
                error -> System.out.println("Caught error: " + error.getMessage()),
                () -> System.out.println("Completed")
            );
    }
}
//输出:
//caught error: Mapped error: Original error4. 使用 doOnError 进行额外的错误处理
 
doOnError 可以用来在流中出现错误时执行额外的副作用,比如日志记录。
public class DoOnErrorExample {
    public static void main(String[] args) {
        Mono.error(new RuntimeException("Something went wrong"))
            .doOnError(error -> System.out.println("Error occurred: " + error.getMessage()))
            .onErrorReturn("Fallback data") // 错误时返回备用数据
            .subscribe(
                data -> System.out.println("Received: " + data),
                error -> System.out.println("This will not be executed"),
                () -> System.out.println("Completed")
            );
    }
}
//输出:
//Error occurred: Something went wrong
//Received: Fallback data
//Completed5. 使用 doFinally 清理资源
 
doFinally 会在流完成时执行,无论是正常完成还是发生错误,常用于资源清理。
public class DoFinallyExample {
    public static void main(String[] args) {
         Mono.just("Hello")
            .doFinally(signalType -> System.out.println("Cleanup after signal: " + signalType))
            .subscribe(
                data -> System.out.println("Received: " + data),
                error -> System.out.println("Error: " + error.getMessage()),
                () -> System.out.println("Completed")
            );
    }
}
//输出:
//Received: Hello
//Cleanup after signal: COMPLETED
//Completed6. 使用 onErrorReturn 返回默认值
 
onErrorReturn 可以在发生错误时返回一个默认的值,避免流崩溃。
public class OnErrorReturnExample {
    public static void main(String[] args) {
        Mono.error(new RuntimeException("An error occurred"))
            .onErrorReturn("Default value")  // 错误时返回默认值
            .subscribe(
                data -> System.out.println("Received: " + data),
                error -> System.out.println("This will not be executed"),
                () -> System.out.println("Completed")
            );
    }
}
//输出:
//Received: Default value
//Completed什么问题都可以评论区留言,看见都会回复的
如果你觉得本篇文章对你有所帮助的,把“文章有帮助的”打在评论区
多多支持吧!!!
点赞加藏评论,是对小编莫大的肯定。抱拳了!
