当前位置: 首页 > news >正文

RPC 同步与异步之使用Spring WebFlux + WebClient或Netty + Reactor

1. RPC 同步与异步

  • 同步 RPC:调用方发起请求后,会一直阻塞等待远程方法返回结果。
  • 异步 RPC:调用方发起请求后,不会阻塞等待,而是通过回调、Future、Promise 或者其他方式来获取结果。

2. 响应式编程

  • 响应式编程是一种编程范式,强调 非阻塞、事件驱动、数据流处理
  • 响应式编程一般会用 Reactive Streams(如 Project Reactor、RxJava) 来处理异步数据流。

3. Netty 结合 RPC 的两种设计

如果使用 Netty 作为底层通信,可以设计两种 RPC 模式:

  1. 同步 RPC(基于 Netty,但仍然同步处理)
    • Netty 作为底层传输,但调用方仍然 同步等待 远程调用的返回值。
    • 适用于需要简单封装 Netty 但保持传统 RPC 体验的场景。
  2. 异步响应式 RPC(结合 Netty + 响应式编程)
    • Netty 本身是 事件驱动、异步的,可以与 响应式编程(如 Reactor) 结合,让 RPC 具备 流式、异步、非阻塞 特性。
    • 典型做法:
      • 使用 CompletableFuture、RxJava、Reactor(Mono/Flux) 等返回异步响应。
      • 例如 Dubbo3 支持 Reactive RPC,返回 Mono<T>

4. 选择哪种方式?

  • 如果你的调用方代码还是同步的,那即使 Netty 异步,最终调用也是同步的,不太能发挥 Netty 的异步特性。
  • 如果想提高吞吐量、减少线程阻塞,使用响应式(如 Reactor)可以提升性能,适合 高并发、流式数据处理的 RPC 调用

如果你要调用多个第三方接口,并希望以 响应式(Reactive) 的方式处理,通常可以使用 Reactor(Project Reactor)RxJava 来组合多个异步请求,而底层可以用 Netty + WebClient 或异步 HTTP 客户端 来处理非阻塞调用。


1. 传统方式(阻塞模式)

问题:如果有 3 个第三方接口,传统的 RestTemplate 或同步 HttpClient 需要依次等待它们返回,导致整体耗时长:

java复制编辑String result1 = callApi1();  // 耗时 100ms
String result2 = callApi2();  // 耗时 200ms
String result3 = callApi3();  // 耗时 150ms
// 总耗时 = 100 + 200 + 150 = 450ms

这种同步方式不能充分利用多线程,导致性能低。


2. 响应式方式(非阻塞 + 并发执行)

如果你使用 Spring WebFlux + WebClientNetty + Reactor,你可以让多个 API 并行调用,显著减少总耗时。

(1) 使用 WebClient(Spring WebFlux 方案)

Spring 提供的 WebClient 基于 Netty,支持 完全异步 & 非阻塞 调用:

import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

WebClient webClient = WebClient.create();

// 并行调用 3 个接口,返回 Mono(单值异步流)
Mono<String> result1 = webClient.get().uri("https://api.example.com/1").retrieve().bodyToMono(String.class);
Mono<String> result2 = webClient.get().uri("https://api.example.com/2").retrieve().bodyToMono(String.class);
Mono<String> result3 = webClient.get().uri("https://api.example.com/3").retrieve().bodyToMono(String.class);

// 合并多个 Mono,并行执行,合并结果
Mono<List<String>> combined = Mono.zip(result1, result2, result3)
    .map(tuple -> List.of(tuple.getT1(), tuple.getT2(), tuple.getT3()));

// 订阅并获取结果
combined.subscribe(results -> System.out.println("最终结果: " + results));

优点

  • 并行调用,多个 API 同时发出请求,减少等待时间。
  • 非阻塞,不会阻塞线程,提高吞吐量。

WebClient创建默认使用的是reactor.netty.http.client.HttpClient在reactor-netty-http-1.2.2.jar

在这里插入图片描述


(2) 使用 Netty + Reactor 直接发送异步请求

如果你不使用 Spring WebFlux,而是自己基于 Netty + Reactor 实现:

import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;

HttpClient client = HttpClient.create();

Mono<String> result1 = client.get().uri("https://api.example.com/1").responseContent().aggregate().asString();
Mono<String> result2 = client.get().uri("https://api.example.com/2").responseContent().aggregate().asString();
Mono<String> result3 = client.get().uri("https://api.example.com/3").responseContent().aggregate().asString();

Mono<List<String>> combined = Mono.zip(result1, result2, result3)
    .map(tuple -> List.of(tuple.getT1(), tuple.getT2(), tuple.getT3()));

combined.subscribe(results -> System.out.println("最终结果: " + results));

适用于高性能 Netty 服务器,完全异步 & 事件驱动


3. 处理多个响应的场景

在实际项目中,你可能需要:

  1. 等待所有 API 完成Mono.zip()
  2. 谁先返回就先处理Flux.merge()
  3. 某个 API 超时就降级.timeout(Duration.ofSeconds(3))
  4. 错误重试.retry(3)

示例:

import org.springframework.web.reactive.function.client.WebClient;
        WebClient webClient = WebClient.create();

        Mono<String> result1 = webClient.get().uri("https://api.example.com/1")
                .retrieve().bodyToMono(String.class)
                .timeout(Duration.ofSeconds(3))  // 超时
                .retry(3)  // 失败时重试 3 次
                .onErrorReturn("默认值1")  // 失败降级
                .doOnNext(res -> System.out.println("result1 成功: " + res))
                .doOnError(e -> System.err.println("result1 失败: " + e.getMessage()));

        Mono<String> result2 = webClient.get().uri("https://api.example.com/2")
                .retrieve().bodyToMono(String.class)
                .onErrorResume(e -> Mono.just("备用接口返回值"))  // 降级方案
                .doOnNext(res -> System.out.println("result2 成功: " + res))
                .doOnError(e -> System.err.println("result2 失败: " + e.getMessage()));

        Mono.zip(result1, result2)
                .doOnSuccess(results -> System.out.println("合并结果: " + results))
                .doOnError(e -> System.err.println("合并失败: " + e.getMessage()))
                .block(); // **同步阻塞,确保 main 线程不会提前结束**

总结

方案实现方式适用场景
同步调用(阻塞)RestTemplate简单但性能低
异步 + 并行调用WebClient推荐,高性能、适合 WebFlux
Netty + ReactorHttpClient(Netty)适用于高并发后端系统

如果你的项目本身是基于 Spring WebFlux,建议用 WebClient;如果是 Netty 服务器,可以直接用 Netty + Reactor。这样可以同时调用多个 API,提高吞吐量,并避免阻塞线程。

相关文章:

  • ruoyi-vue部署 linux 系统项目安装部署 oa 项目部署 (合集)
  • 2025年湖南建筑安全员B证备考资料
  • 【AI论文】LEGO拼图:大型语言模型在多步骤空间推理方面的表现如何?
  • 【SPP】蓝牙串口协议(SPP)深度解析:从 RS232 仿真到设备互联的技术实现
  • 基于深度学习的图像超分辨率技术研究与实现
  • 格雷码、汉明码,CRC校验的区别
  • Vue3.X项目中包依赖的解析与安装出现问题如何解决?
  • 21天Python计划:python下载和开发工具介绍
  • 【Linux】进程的详讲(上)
  • 开源测试用例管理平台
  • beanie.exceptions.CollectionWasNotInitialized
  • L2正则化:优化模型的平滑之道
  • JAVA 应用实现 APM 自动注入(Docker 篇)
  • HX324双运算放大器:赋能万物互联时代的信号处理基石
  • C++Primer学习(13.6 对象移动)
  • K8S学习之基础六十一:k8s中部署helm
  • Scala的数据类型
  • Jmeter-负载测试
  • HarmonyOS WebSocket全场景应用开发深度解析
  • vllm+openwebui,玩转私有化AI
  • 米拓建站免费模板/上海自媒体推广
  • 企业网站建设ppt介绍/网络营销的定义是什么
  • 厦门比较好的网站设计公司/数据分析软件哪个最好用
  • oss可以做视频网站吗/济南市新闻最新消息
  • 做二手货车网站/如何去除痘痘效果好
  • 现在网站开发语言有/泉州百度推广排名优化