gRPC从0到1系列【24】
文章目录
- gRPC拦截器
- 7.4.6 如何注册ClientInterceptor?
- 7.4.7 注意事项
- 7.4.7 注意事项
- 7.4.8 拦截器执行流程
- 7.4.9 🆕 重试拦截器
- 7.4.10 🚀超时控制拦截器
- 7.4.11 拦截器最佳实践
gRPC拦截器
7.4.6 如何注册ClientInterceptor?
✅ 1. 对于阻塞/异步 Stub:
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();MyServiceGrpc.MyServiceBlockingStub stub = MyServiceGrpc.newBlockingStub(channel).withInterceptors(new AuthInterceptor("my-token"), new LoggingInterceptor());
✅ 2. 多个拦截器顺序很重要!
- 先注册的拦截器更靠近应用代码;
- 后注册的拦截器更靠近网络层。
7.4.7 注意事项
- 线程安全:拦截器实例通常会被多个线程共享,应设计为无状态或线程安全。
- 性能影响:每个拦截器都会增加调用开销,避免在拦截器中做耗时操作。
- 异常处理:在
start()
或sendMessage()
中抛出异常会中断调用,需谨慎处理。 - 不要修改不可变对象:如
MethodDescriptor
是不可变的,不应尝试修改。
7.4.7 注意事项
- 线程安全:拦截器实例通常会被多个线程共享,应设计为无状态或线程安全。
- 性能影响:每个拦截器都会增加调用开销,避免在拦截器中做耗时操作。
- 异常处理:在
start()
或sendMessage()
中抛出异常会中断调用,需谨慎处理。 - 不要修改不可变对象:如
MethodDescriptor
是不可变的,不应尝试修改。
ClientInterceptor
是 gRPC 客户端实现关注点分离(Separation of Concerns)的关键工具。通过它,你可以将通用逻辑(如认证、日志、监控)从业务代码中剥离,提高代码的可维护性和复用性。
合理使用拦截器,能让你的 gRPC 客户端更健壮、可观测、安全。
7.4.8 拦截器执行流程
7.4.9 🆕 重试拦截器
package com.example.grpc.interceptor;import io.grpc.*;
import java.util.concurrent.TimeUnit;/*** 重试拦截器* 在遇到可重试的失败时自动重试*/
public class RetryClientInterceptor implements ClientInterceptor {private final int maxRetries;private final long retryDelayMs;public RetryClientInterceptor(int maxRetries, long retryDelayMs) {this.maxRetries = maxRetries;this.retryDelayMs = retryDelayMs;}@Overridepublic <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,CallOptions callOptions,Channel next) {return new ClientCall<ReqT, RespT>() {private ClientCall<ReqT, RespT> delegate;private int retryCount = 0;private Listener<RespT> originalListener;private Metadata originalHeaders;private ReqT firstMessage;@Overridepublic void start(Listener<RespT> responseListener, Metadata headers) {this.originalListener = responseListener;this.originalHeaders = headers;this.delegate = next.newCall(method, callOptions);// 包装监听器以处理重试逻辑Listener<RespT> retryListener = new Listener<RespT>() {@Overridepublic void onHeaders(Metadata headers) {originalListener.onHeaders(headers);}@Overridepublic void onMessage(RespT message) {originalListener.onMessage(message);}@Overridepublic void onClose(Status status, Metadata trailers) {if (shouldRetry(status) && retryCount < maxRetries) {retryCount++;System.out.printf("🔄 [重试拦截器] 第 %d 次重试,状态: %s%n", retryCount, status);// 延迟后重试try {Thread.sleep(retryDelayMs);} catch (InterruptedException e) {Thread.currentThread().interrupt();originalListener.onClose(status, trailers);return;}// 重新创建调用并重试delegate = next.newCall(method, callOptions);delegate.start(this, originalHeaders);if (firstMessage != null) {delegate.sendMessage(firstMessage);}delegate.halfClose();} else {originalListener.onClose(status, trailers);}}@Overridepublic void onReady() {originalListener.onReady();}};delegate.start(retryListener, headers);}@Overridepublic void sendMessage(ReqT message) {// 保存第一条消息用于重试if (firstMessage == null) {firstMessage = message;}delegate.sendMessage(message);}@Overridepublic void halfClose() {delegate.halfClose();}@Overridepublic void cancel(String message, Throwable cause) {delegate.cancel(message, cause);}@Overridepublic void request(int numMessages) {delegate.request(numMessages);}/*** 判断是否应该重试*/private boolean shouldRetry(Status status) {return status.getCode() == Status.Code.UNAVAILABLE ||status.getCode() == Status.Code.RESOURCE_EXHAUSTED ||status.getCode() == Status.Code.INTERNAL;}};}
}
7.4.10 🚀超时控制拦截器
package com.example.grpc.interceptor;import io.grpc.*;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** 超时控制拦截器* 为 RPC 调用添加超时控制*/
public class TimeoutClientInterceptor implements ClientInterceptor {private final ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(1);private final long defaultTimeoutMs;public TimeoutClientInterceptor(long defaultTimeoutMs) {this.defaultTimeoutMs = defaultTimeoutMs;}@Overridepublic <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,CallOptions callOptions,Channel next) {// 获取调用选项中的超时设置,如果没有则使用默认值Long timeoutMs = callOptions.getDeadline() != null ? callOptions.getDeadline().timeRemaining(TimeUnit.MILLISECONDS) : defaultTimeoutMs;return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {private boolean cancelled = false;@Overridepublic void start(Listener<RespT> responseListener, Metadata headers) {// 包装监听器以处理超时Listener<RespT> timeoutListener = new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {@Overridepublic void onClose(Status status, Metadata trailers) {if (!cancelled) {super.onClose(status, trailers);}}};// 设置超时任务scheduler.schedule(() -> {if (!cancelled) {cancelled = true;String message = String.format("请求超时,方法: %s, 超时时间: %dms", method.getFullMethodName(), timeoutMs);System.out.println("⏰ [超时拦截器] " + message);cancel(message, null);}}, timeoutMs, TimeUnit.MILLISECONDS);super.start(timeoutListener, headers);}@Overridepublic void cancel(String message, Throwable cause) {cancelled = true;super.cancel(message, cause);}};}/*** 关闭调度器*/public void shutdown() {scheduler.shutdown();}
}
7.4.11 拦截器最佳实践
✅ 1. 拦截器执行顺序
// 正确的顺序:从外到内执行
.intercept(new LoggingClientInterceptor(), // 最先执行,最后结束new AuthClientInterceptor("token"), // 认证信息添加new RetryClientInterceptor(3, 1000), // 重试逻辑new TimeoutClientInterceptor(5000), // 超时控制new MetricsClientInterceptor() // 指标收集(最内层)
)
✅ 2. 性能考虑
- 避免在拦截器中执行阻塞操作
- 使用异步处理耗时任务
- 合理设置超时时间
✅ 3. 错误处理
- 确保拦截器不会掩盖原始异常
- 在适当的时候进行重试
- 记录详细的错误信息