当前位置: 首页 > news >正文

grpc 和限流Sentinel

基于gRPC的微服务通信模块技术方案书

1. 总体架构设计

长连接
gRPC客户端
gRPC服务端
Sentinel限流
业务逻辑处理
返回响应
轮询调度器

2. 技术栈说明

组件版本功能
gRPC1.58.0高性能RPC框架
Protocol Buffers3.24.4接口定义与序列化
Sentinel1.8.7流量控制与熔断降级
Netty4.1.100.Final网络通信基础
Spring Boot3.1.5应用框架

3. 详细设计方案

3.1 gRPC接口定义 (helloworld.proto)

syntax = "proto3";option java_multiple_files = true;
option java_package = "com.example.grpc";
option java_outer_classname = "HelloWorldProto";service Greeter {rpc SayHello (HelloRequest) returns (HelloReply) {}
}message HelloRequest {string name = 1;
}message HelloReply {string message = 1;
}

3.2 服务端实现

3.2.1 核心组件
包含
使用
实现
GrpcServer
+start() : void
+stop() : void
HelloServiceImpl
+sayHello(HelloRequest) : HelloReply
SentinelInterceptor
+interceptCall(ServerCall, Metadata, ServerCallHandler)
ServerInterceptor
3.2.2 限流配置
参数说明
资源名grpc_service:SayHelloSentinel资源标识
阈值类型QPS每秒请求数
单机阈值2每秒最大请求数
流控效果直接拒绝超限直接返回错误

3.3 客户端实现

3.3.1 连接管理策略
Client ChannelPool gRPC服务端 获取Channel 返回可用Channel 发起RPC调用 返回响应 归还Channel Client ChannelPool gRPC服务端
参数说明
连接池大小5最大连接数
空闲超时30分钟自动关闭空闲连接
心跳间隔60秒保持连接活跃

4. 代码实现

4.1 依赖配置 (pom.xml)

<dependencies><!-- gRPC --><dependency><groupId>io.grpc</groupId><artifactId>grpc-netty</artifactId><version>1.58.0</version></dependency><dependency><groupId>io.grpc</groupId><artifactId>grpc-protobuf</artifactId><version>1.58.0</version></dependency><dependency><groupId>io.grpc</groupId><artifactId>grpc-stub</artifactId><version>1.58.0</version></dependency><!-- Sentinel --><dependency><groupId>com.alibaba.csp</groupId><artifactId>sentinel-core</artifactId><version>1.8.7</version></dependency><dependency><groupId>com.alibaba.csp</groupId><artifactId>sentinel-grpc</artifactId><version>1.8.7</version></dependency><!-- 连接池 --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId><version>2.11.1</version></dependency>
</dependencies>

4.2 服务端实现

4.2.1 gRPC服务实现 (HelloServiceImpl.java)
public class HelloServiceImpl extends GreeterGrpc.GreeterImplBase {private static final Logger logger = LoggerFactory.getLogger(HelloServiceImpl.class);@Overridepublic void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {String name = request.getName();String message = "Hello, " + name + "!";HelloReply reply = HelloReply.newBuilder().setMessage(message).build();responseObserver.onNext(reply);responseObserver.onCompleted();logger.info("Processed request for: {}", name);}
}
4.2.2 Sentinel拦截器 (SentinelInterceptor.java)
public class SentinelInterceptor implements ServerInterceptor {private static final String RESOURCE_NAME = "grpc_service:SayHello";static {// 初始化限流规则:QPS=2FlowRule rule = new FlowRule();rule.setResource(RESOURCE_NAME);rule.setGrade(RuleConstant.FLOW_GRADE_QPS);rule.setCount(2);rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_DEFAULT);FlowRuleManager.loadRules(Collections.singletonList(rule));}@Overridepublic <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,Metadata headers,ServerCallHandler<ReqT, RespT> next) {// 资源名称根据方法名动态生成String resourceName = RESOURCE_NAME + ":" + call.getMethodDescriptor().getFullMethodName();Entry entry = null;try {entry = SphU.entry(resourceName, EntryType.IN);return next.startCall(call, headers);} catch (BlockException e) {// 限流处理call.close(Status.RESOURCE_EXHAUSTED.withDescription("Request blocked by Sentinel"), new Metadata());return new ServerCall.Listener<>() {};} finally {if (entry != null) {entry.exit();}}}
}
4.2.3 gRPC服务启动器 (GrpcServer.java)
public class GrpcServer {private Server server;public void start() throws IOException {int port = 50051;server = ServerBuilder.forPort(port).addService(new HelloServiceImpl()).intercept(new SentinelInterceptor()) // 添加Sentinel拦截器.build().start();Runtime.getRuntime().addShutdownHook(new Thread(() -> {GrpcServer.this.stop();}));}public void stop() {if (server != null) {server.shutdown();}}public void blockUntilShutdown() throws InterruptedException {if (server != null) {server.awaitTermination();}}
}

4.3 客户端实现

4.3.1 连接池管理 (ChannelPoolFactory.java)
public class ChannelPoolFactory {private static final GenericObjectPool<ManagedChannel> channelPool;static {PooledObjectFactory<ManagedChannel> factory = new BasePooledObjectFactory<>() {@Overridepublic ManagedChannel create() {return ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().idleTimeout(30, TimeUnit.MINUTES) // 30分钟空闲超时.keepAliveTime(60, TimeUnit.SECONDS) // 60秒心跳.build();}@Overridepublic PooledObject<ManagedChannel> wrap(ManagedChannel channel) {return new DefaultPooledObject<>(channel);}@Overridepublic void destroyObject(PooledObject<ManagedChannel> p) {p.getObject().shutdown();}};GenericObjectPoolConfig<ManagedChannel> config = new GenericObjectPoolConfig<>();config.setMaxTotal(5); // 最大连接数config.setMinIdle(1);  // 最小空闲连接config.setMaxWaitMillis(3000); // 获取连接超时时间channelPool = new GenericObjectPool<>(factory, config);}public static ManagedChannel getChannel() throws Exception {return channelPool.borrowObject();}public static void returnChannel(ManagedChannel channel) {channelPool.returnObject(channel);}
}
4.3.2 客户端轮询逻辑 (GrpcClient.java)
public class GrpcClient {private static final Random random = new Random();private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);public static void main(String[] args) {// 启动10个客户端线程模拟并发for (int i = 0; i < 10; i++) {scheduler.scheduleAtFixedRate(() -> makeRequest(), 0, 500 + random.nextInt(1500), TimeUnit.MILLISECONDS);}}private static void makeRequest() {ManagedChannel channel = null;try {channel = ChannelPoolFactory.getChannel();GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(channel);HelloRequest request = HelloRequest.newBuilder().setName("Client-" + Thread.currentThread().getId()).build();try {HelloReply response = stub.sayHello(request);System.out.printf("[%s] Received: %s%n", Thread.currentThread().getName(), response.getMessage());} catch (StatusRuntimeException e) {if (e.getStatus().getCode() == Status.Code.RESOURCE_EXHAUSTED) {System.err.printf("[%s] Request blocked by rate limiting%n", Thread.currentThread().getName());} else {e.printStackTrace();}}} catch (Exception e) {e.printStackTrace();} finally {if (channel != null) {ChannelPoolFactory.returnChannel(channel);}}}
}

5. 性能优化策略

5.1 连接管理优化

策略实现方式效果
连接预热启动时创建最小空闲连接避免首次请求延迟
动态扩容监控连接等待队列自动增加连接池大小
健康检查定期ping空闲连接及时发现失效连接

5.2 Sentinel高级配置

// 添加热点参数限流
ParamFlowRule rule = new ParamFlowRule(RESOURCE_NAME).setParamIdx(0) // 第一个参数.setCount(5).setGrade(RuleConstant.FLOW_GRADE_QPS).setDurationInSec(1).setParamFlowItemList(Collections.singletonList(new ParamFlowItem().setObject("highPriority").setClassType(String.class.getName()).setCount(10)));
ParamFlowRuleManager.loadRules(Collections.singletonList(rule));

5.3 监控与告警

Metrics
Logs
gRPC服务
Prometheus
Grafana
ELK
告警

监控指标:

  1. 请求QPS与响应时间
  2. 限流拒绝次数
  3. 连接池使用率
  4. 线程池活跃度

6. 部署方案

6.1 容器化部署 (Dockerfile)

FROM openjdk:17-jdk-slimWORKDIR /appCOPY target/grpc-service.jar /app/app.jarEXPOSE 50051ENTRYPOINT ["java", "-jar", "app.jar"]

6.2 Kubernetes部署 (deployment.yaml)

apiVersion: apps/v1
kind: Deployment
metadata:name: grpc-service
spec:replicas: 3selector:matchLabels:app: grpc-servicetemplate:metadata:labels:app: grpc-servicespec:containers:- name: grpc-serviceimage: registry.example.com/grpc-service:1.0.0ports:- containerPort: 50051resources:limits:memory: "512Mi"cpu: "500m"
---
apiVersion: v1
kind: Service
metadata:name: grpc-service
spec:selector:app: grpc-serviceports:- protocol: TCPport: 50051targetPort: 50051

7. 测试方案

7.1 性能测试脚本 (test.sh)

#!/bin/bash# 启动服务端
java -jar grpc-server.jar &# 等待服务启动
sleep 5# 启动客户端测试
for i in {1..10}
dojava -jar grpc-client.jar > client-$i.log &
done# 监控限流情况
watch -n 1 "grep 'blocked' *.log | wc -l"

7.2 测试结果验证

测试场景预期结果验证方法
正常请求(QPS<2)全部成功响应成功率100%
限流触发(QPS>2)部分拒绝错误日志包含"blocked"
长连接保持连接复用连接创建日志次数<请求次数
高并发压力服务稳定CPU/内存波动在安全范围

8. 项目优势总结

  1. 高性能通信:基于gRPC HTTP/2协议,支持多路复用和头部压缩
  2. 精准流量控制:Sentinel实现毫秒级QPS限流
  3. 资源高效利用:连接池管理减少TCP握手开销
  4. 弹性扩展:无状态设计支持水平扩展
  5. 生产就绪:集成健康检查、指标监控等生产级特性

部署说明:项目启动顺序为:1. 启动gRPC服务端 2. 启动gRPC客户端。Sentinel限流规则会在服务端启动时自动初始化。

http://www.dtcms.com/a/265717.html

相关文章:

  • STC8G 8051内核单片机开发(GPIO)
  • 2025年6月微短剧备案分析:都市题材占四成,20-29集成主流体量
  • OS15.【Linux】gdb调试器的简单使用
  • 修改文件属主
  • 活体检测api集成方案-炫彩活体检测助力身份核验
  • 马斯克脑机接口(Neuralink)技术进展,已经实现瘫痪患者通过BCI控制电脑、玩视频游戏、学习编程,未来盲人也能恢复视力了
  • [极客时间]LangChain 实战课 -----|(10) 链(下):想学“育花”还是“插花”?用RouterChain确定客户意图
  • 预警:病毒 “黑吃黑”,GitHub 开源远控项目暗藏后门
  • 2024年INS SCI2区,强化搜索自适应大邻域搜索算法RSALNS+无人机扩展型协作多任务分配,深度解析+性能实测
  • 实现如何利用 Kafka 延时删除 用户邮箱的验证码(如何发送邮箱+源码) - 第一期
  • 前缀和算法详解
  • FASTAPI+VUE3平价商贸管理系统
  • React自学 基础一
  • 基于大语言模型进行Prompt优化
  • 深入解析 AAC AudioSpecificConfig 在 RTSP/RTMP 播放器中的核心作用
  • PDF的图片文字识别工具
  • Spring AI ETL Pipeline使用指南
  • Java中的volatile到底是什么来路
  • OpenCV CUDA模块设备层-----在 GPU上高效地执行两个uint类型值的最小值比较函数vmin2()
  • 《人生顶层设计》读书笔记6
  • 开源无广告面板mdserver-web:替代宝塔实现服务器轻松管理
  • 地下管线安全的智能监测先锋:智能标志桩图像监测装置解析​
  • 矩阵批量剪辑源码搭建定制化开发:支持OEM
  • 爬虫技术-获取浏览器身份认证信息(如 Cookie、Token、Session 等)
  • Python 中如何使用 Conda 管理版本和创建 Django 项目
  • 【Docker】如何设置 `wiredTigerCacheSizeGB` 和 `resources.limits.memory`
  • BenchmarkSQL 测试 PostgreSQL 时遇到 numeric field overflow 报错的原因与解决方案
  • 请求未达服务端?iOS端HTTPS链路异常的多工具抓包排查记录
  • 区块链真的会是未来吗?
  • TCP粘包、拆包、解决