当前位置: 首页 > news >正文

gRPC从0到1系列【24】

文章目录

  • gRPC拦截器
      • 7.4.6 如何注册ClientInterceptor?
      • 7.4.7 注意事项
      • 7.4.7 注意事项
      • 7.4.8 拦截器执行流程
      • 7.4.9 🆕 重试拦截器
      • 7.4.10 🚀超时控制拦截器
      • 7.4.11 拦截器最佳实践

gRPC拦截器

7.4.6 如何注册ClientInterceptor?

✅ 1. 对于阻塞/异步 Stub:

ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();MyServiceGrpc.MyServiceBlockingStub stub = MyServiceGrpc.newBlockingStub(channel).withInterceptors(new AuthInterceptor("my-token"), new LoggingInterceptor());

✅ 2. 多个拦截器顺序很重要!

  • 先注册的拦截器更靠近应用代码
  • 后注册的拦截器更靠近网络层

7.4.7 注意事项

  1. 线程安全:拦截器实例通常会被多个线程共享,应设计为无状态或线程安全。
  2. 性能影响:每个拦截器都会增加调用开销,避免在拦截器中做耗时操作。
  3. 异常处理:在 start()sendMessage() 中抛出异常会中断调用,需谨慎处理。
  4. 不要修改不可变对象:如 MethodDescriptor 是不可变的,不应尝试修改。

7.4.7 注意事项

  1. 线程安全:拦截器实例通常会被多个线程共享,应设计为无状态或线程安全。
  2. 性能影响:每个拦截器都会增加调用开销,避免在拦截器中做耗时操作。
  3. 异常处理:在 start()sendMessage() 中抛出异常会中断调用,需谨慎处理。
  4. 不要修改不可变对象:如 MethodDescriptor 是不可变的,不应尝试修改。

ClientInterceptor 是 gRPC 客户端实现关注点分离(Separation of Concerns)的关键工具。通过它,你可以将通用逻辑(如认证、日志、监控)从业务代码中剥离,提高代码的可维护性和复用性。

合理使用拦截器,能让你的 gRPC 客户端更健壮、可观测、安全。

7.4.8 拦截器执行流程

客户端代码日志拦截器认证拦截器重试拦截器超时拦截器gRPC服务器发起RPC调用记录开始时间生成请求ID传递调用添加认证头部添加客户端信息传递调用准备重试逻辑传递调用设置超时计时器实际网络调用返回响应检查超时状态传递响应检查是否需要重试传递响应传递响应记录结束时间计算耗时返回最终结果客户端代码日志拦截器认证拦截器重试拦截器超时拦截器gRPC服务器

7.4.9 🆕 重试拦截器

package com.example.grpc.interceptor;import io.grpc.*;
import java.util.concurrent.TimeUnit;/*** 重试拦截器* 在遇到可重试的失败时自动重试*/
public class RetryClientInterceptor implements ClientInterceptor {private final int maxRetries;private final long retryDelayMs;public RetryClientInterceptor(int maxRetries, long retryDelayMs) {this.maxRetries = maxRetries;this.retryDelayMs = retryDelayMs;}@Overridepublic <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,CallOptions callOptions,Channel next) {return new ClientCall<ReqT, RespT>() {private ClientCall<ReqT, RespT> delegate;private int retryCount = 0;private Listener<RespT> originalListener;private Metadata originalHeaders;private ReqT firstMessage;@Overridepublic void start(Listener<RespT> responseListener, Metadata headers) {this.originalListener = responseListener;this.originalHeaders = headers;this.delegate = next.newCall(method, callOptions);// 包装监听器以处理重试逻辑Listener<RespT> retryListener = new Listener<RespT>() {@Overridepublic void onHeaders(Metadata headers) {originalListener.onHeaders(headers);}@Overridepublic void onMessage(RespT message) {originalListener.onMessage(message);}@Overridepublic void onClose(Status status, Metadata trailers) {if (shouldRetry(status) && retryCount < maxRetries) {retryCount++;System.out.printf("🔄 [重试拦截器] 第 %d 次重试,状态: %s%n", retryCount, status);// 延迟后重试try {Thread.sleep(retryDelayMs);} catch (InterruptedException e) {Thread.currentThread().interrupt();originalListener.onClose(status, trailers);return;}// 重新创建调用并重试delegate = next.newCall(method, callOptions);delegate.start(this, originalHeaders);if (firstMessage != null) {delegate.sendMessage(firstMessage);}delegate.halfClose();} else {originalListener.onClose(status, trailers);}}@Overridepublic void onReady() {originalListener.onReady();}};delegate.start(retryListener, headers);}@Overridepublic void sendMessage(ReqT message) {// 保存第一条消息用于重试if (firstMessage == null) {firstMessage = message;}delegate.sendMessage(message);}@Overridepublic void halfClose() {delegate.halfClose();}@Overridepublic void cancel(String message, Throwable cause) {delegate.cancel(message, cause);}@Overridepublic void request(int numMessages) {delegate.request(numMessages);}/*** 判断是否应该重试*/private boolean shouldRetry(Status status) {return status.getCode() == Status.Code.UNAVAILABLE ||status.getCode() == Status.Code.RESOURCE_EXHAUSTED ||status.getCode() == Status.Code.INTERNAL;}};}
}

7.4.10 🚀超时控制拦截器

package com.example.grpc.interceptor;import io.grpc.*;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** 超时控制拦截器* 为 RPC 调用添加超时控制*/
public class TimeoutClientInterceptor implements ClientInterceptor {private final ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(1);private final long defaultTimeoutMs;public TimeoutClientInterceptor(long defaultTimeoutMs) {this.defaultTimeoutMs = defaultTimeoutMs;}@Overridepublic <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,CallOptions callOptions,Channel next) {// 获取调用选项中的超时设置,如果没有则使用默认值Long timeoutMs = callOptions.getDeadline() != null ? callOptions.getDeadline().timeRemaining(TimeUnit.MILLISECONDS) : defaultTimeoutMs;return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {private boolean cancelled = false;@Overridepublic void start(Listener<RespT> responseListener, Metadata headers) {// 包装监听器以处理超时Listener<RespT> timeoutListener = new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {@Overridepublic void onClose(Status status, Metadata trailers) {if (!cancelled) {super.onClose(status, trailers);}}};// 设置超时任务scheduler.schedule(() -> {if (!cancelled) {cancelled = true;String message = String.format("请求超时,方法: %s, 超时时间: %dms", method.getFullMethodName(), timeoutMs);System.out.println("⏰ [超时拦截器] " + message);cancel(message, null);}}, timeoutMs, TimeUnit.MILLISECONDS);super.start(timeoutListener, headers);}@Overridepublic void cancel(String message, Throwable cause) {cancelled = true;super.cancel(message, cause);}};}/*** 关闭调度器*/public void shutdown() {scheduler.shutdown();}
}

7.4.11 拦截器最佳实践

✅ 1. 拦截器执行顺序

// 正确的顺序:从外到内执行
.intercept(new LoggingClientInterceptor(),    // 最先执行,最后结束new AuthClientInterceptor("token"), // 认证信息添加new RetryClientInterceptor(3, 1000), // 重试逻辑new TimeoutClientInterceptor(5000),  // 超时控制new MetricsClientInterceptor()      // 指标收集(最内层)
)

✅ 2. 性能考虑

  • 避免在拦截器中执行阻塞操作
  • 使用异步处理耗时任务
  • 合理设置超时时间

✅ 3. 错误处理

  • 确保拦截器不会掩盖原始异常
  • 在适当的时候进行重试
  • 记录详细的错误信息
http://www.dtcms.com/a/453251.html

相关文章:

  • 无锡设计师网站又拍云wordpress全站cdn
  • LeetCode 刷题【106. 从中序与后序遍历序列构造二叉树】
  • 大模型的核心原理
  • 用友u8 erp和免费生产排程软件isuperaps通过sql实现数据集成示例
  • 车载诊断架构 --- 车载ECU故障类型详解(下)
  • 解读IEC 60502-2 2014
  • 引流网站建设c 转网站开发
  • 新建网站如何做关键词wordpress一句话插件
  • 通过git拉取前端项目
  • 建设诚信网站儒枫网网站建设
  • AI - 自然语言处理(NLP) - part 3 - 语言模型
  • 人工智能与数据领域700+职位数据集:支持就业市场分析、NLP训练与推荐系统开发的高质量研究资源
  • 律师手机网站模板北京市工程建设交易信息网站
  • 0基础学CV(4)|目标检测模型之yolov8训练自己的数据集
  • 网站产品 模块青岛做网站哪家公司好
  • rule 5 permit source 192.168.1.0 0.0.0.127 特定子网 概念及实验
  • 网站建设 重庆三合一网站建设推广
  • - custom_action_cpp: 自定义动作创建与调用示例
  • 如何修改iptables+wg实现双层网络转发到工业现场设备
  • 《投资-78》价值投资者的认知升级与交易规则重构 - 架构
  • 做断桥铝窗户的网站网站开发信息发布
  • 基于【讯飞星火 Spark Lite】轻量级大语言模型的【PySide6应用】开发与实践
  • MySql(SQL)
  • 做网站赚外快镇安县住房和城乡建设部网站
  • 【STM32项目开源】基于STM32的智能家居环境(空气质量)检测系统
  • Photoshop - Photoshop 工具栏(8)魔棒工具
  • 虹口专业做网站wordpress分类目录第二页
  • 保险业多模态数据融合与智能化运营架构:技术演进、应用实践与发展趋势
  • AI大模型微调教程7
  • Docker 完整教程(5,6) | 容器编译与编排