当前位置: 首页 > news >正文

使用gRPC实现分片服务的心跳机制

一、什么是 RPC(Remote Procedure Call)?

概念解析

RPC 是 Remote Procedure Call 的缩写,即远程过程调用。本质上,RPC 允许你在一台机器上调用另一台机器上的函数或方法,就像调用本地函数一样。

它包括两大核心要素:

  1. 传输协议:如 TCP、HTTP 等,决定了数据如何在网络上传输。

  2. 编码协议(序列化):如 JSON、Protobuf 等,决定了数据如何在网络上传递前被转化为二进制流。

二、为什么用 gRPC 做心跳?

gRPC 是 Google 推出的一个高性能、开源、通用的远程调用框架。它可以看作是“现代版的 RPC”。

它的三大技术特性:

  1. 基于 HTTP/2 协议:支持多路复用、头部压缩、双向流等高级特性。

  2. 使用 Protocol Buffers(Protobuf)作为默认序列化协议:更小更快。

  3. 支持多语言:目前支持 C++, Java, Go, Python, C#, Node.js 等主流语言。

下图展示了 gRPC 的基本调用流程:

gRPC基本调用流程解析

心跳(heartbeat)是分布式系统中常见的机制,用来检测远端服务/节点是否仍然存活、测量延迟、保持连接活性并触发故障恢复。常见实现方式包括 HTTP poll、TCP 心跳、消息队列心跳等。选择 gRPC 的好处:

  • 高性能:gRPC 基于 HTTP/2,支持长连接、多路复用,序列化通常使用 Protobuf(二进制),支持二进制通信,相比 HTTP+JSON 更轻量

  • 双向流:gRPC 原生支持双向流(bidirectional streaming),非常适合连续心跳的场景(客户端持续发送,服务端可回应或单向监听)。

  • 跨语言/IDL:使用 Protobuf 定义接口,支持多种语言的客户端/服务器代码。

  • 丰富特性:拦截器、Deadline、Keepalive、TLS、元数据(metadata)等,方便做安全与可观测性。

  • 生态成熟:与监控/追踪/负载均衡等集成良好(Prometheus、OpenTelemetry、Envoy 等)

gRPC 的详细通信过程

为了更好地理解 gRPC 是如何工作的,请参考下图的详细调用过程:

详细过程

图中展示了从客户端发起请求,到服务器处理响应,再返回结果的全过程。可以看到:

  • 客户端通过 gRPC Stub 发起调用;
  • 消息经过 Protobuf 序列化;
  • 基于 HTTP/2 发送到服务端;
  • 服务端反序列化后处理并响应。

经验提示:如果你的系统中存在大量内网调用,且通信频繁,gRPC 能极大提升吞吐量和响应速度。

三、实现目标

构建一个**ResourceManager(RM) + 多个 Worker(节点)**的分布式系统,支持:

  • 节点注册与双向心跳(gRPC 双向流)

  • RM 维护活跃节点列表并下发分片信息(总分片数、节点编号)

  • Worker 根据分片规则(taskId % totalShard == shardId)处理自己负责的任务(数据分片执行)

  • 节点上下线触发稳定性检测与安全重平衡(避免分片抖动导致重复处理)

  • 数据获取接口:Worker 向 RM 请求某个分片的数据块进行处理(gRPC unary 或 server streaming)

本文给出设计 + 关键代码片段(proto、Server、Client、分片逻辑、重平衡策略、Spring Boot 配置)。


四、设计要点(简要)

  1. 通信协议:gRPC,使用 Protobuf 描述服务。采用双向流(bidirectional streaming)用于心跳(低延迟、长连接、双向 ACK),并用 unary / server streaming 做数据获取(按业务需求)。

  2. 分片策略:静态模运算 shard = id % totalShards(简单、无状态);可扩展为一致性哈希或范围分片。

  3. 分片稳定性:节点上下线会变更 totalShards 或节点编号,需暂停 affected jobs 并等待分片稳定(例如:等 N 秒或连续 K 次无变更)。

  4. 容错:RM 定期扫描 lastSeen,若节点超过超时时间则标记下线并触发重平衡。

  5. 扩展性:当节点很多或业务复杂时,把分片信息持久化到轻量 KV(Redis/etcd),并对 RM 做分片(leader)选举或 HA。


五、proto(heartbeat + control + data fetch)

文件 shard.proto(关键字段、双向流心跳 + 分片获取 + 数据流):

syntax = "proto3";
package shard;option java_package = "com.example.shard";
option java_outer_classname = "ShardProto";// 心跳消息
message Heartbeat {string nodeId = 1;int64 ts = 2;int32 load = 3;
}// 心跳 ACK (RM->Node)
message HeartbeatAck {string nodeId = 1;int64 ts = 2;bool accepted = 3;string message = 4;// 可携带最新分片信息(可选)int32 totalShards = 10;int32 shardId = 11;
}// 请求分片信息(unary)
message GetShardRequest {string nodeId = 1;
}
message GetShardResponse {int32 totalShards = 1;int32 shardId = 2;int64 version = 3; // 分片版本号用于稳定性判断
}// 请求某分片的数据(stream)
message FetchShardDataRequest {int32 shardId = 1;int32 batchSize = 2;
}
message DataRecord {string id = 1;bytes payload = 2;
}service ShardService {// 双向心跳流:双方可以在同一流中互相发送心跳/ackrpc HeartbeatStream(stream Heartbeat) returns (stream HeartbeatAck);// 获取分片信息rpc GetShardInfo(GetShardRequest) returns (GetShardResponse);// 拉取分片数据(server streaming)rpc FetchShardData(FetchShardDataRequest) returns (stream DataRecord);
}

说明:HeartbeatStream 用于持续心跳(Node->RM)并让 RM 在同一流上下发 ACK 或分片变更通知(极适合实时下发)。GetShardInfo 用于节点启动或重平衡期间明确拉取当前分片信息。FetchShardData 用于 Worker 在处理其分片时,按需从 RM 或数据服务拉数据(也可以直接访问数据库/对象存储)。


六、Spring Boot + gRPC 集成建议

  • 推荐使用 yidongnan/grpc-spring-boot-starterlognet/grpc-spring-boot-starter 以简化 server/client 注入与配置。GitHub

  • Spring Boot 项目结构:

    • shard-proto 模块:放 .proto + Maven protobuf 插件生成代码。

    • rm-service 模块:实现 ShardService 的服务端(@GrpcService)。

    • worker-service 模块:实现 gRPC 客户端(@GrpcClient)并定时发送心跳、拉取数据。


RM(ResourceManager)核心实现要点(伪代码/关键片段)

RM 内存结构

class ResourceManager {ConcurrentMap<String, Long> lastSeen = new ConcurrentHashMap<>();List<String> nodes = new CopyOnWriteArrayList<>();volatile int totalShards = 0;volatile long shardVersion = 0;// node -> shardId mapping can be computed by sorting nodes list
}

心跳流实现(使用 grpc-spring-boot-starter)

@GrpcService
public class ShardServiceImpl extends ShardServiceGrpc.ShardServiceImplBase {@Overridepublic StreamObserver<Heartbeat> heartbeatStream(StreamObserver<HeartbeatAck> responseObserver) {return new StreamObserver<Heartbeat>() {@Overridepublic void onNext(Heartbeat hb) {lastSeen.put(hb.getNodeId(), hb.getTs());// ensure node in listif (!nodes.contains(hb.getNodeId())) {nodes.add(hb.getNodeId());recomputeShards(); // 更新 totalShards & shardVersion}int shardId = computeShardId(hb.getNodeId());HeartbeatAck ack = HeartbeatAck.newBuilder().setNodeId(hb.getNodeId()).setTs(System.currentTimeMillis()).setAccepted(true).setTotalShards(totalShards).setShardId(shardId).build();responseObserver.onNext(ack);}@Override public void onError(Throwable t) { /* cleanup */ }@Override public void onCompleted() { responseObserver.onCompleted(); }};}
}

分片计算(简单 deterministic 排序)

private void recomputeShards() {Collections.sort(nodes);totalShards = nodes.size();shardVersion++;
}
// 节点的 shardId = index of nodeId in sorted nodes
private int computeShardId(String nodeId) {return nodes.indexOf(nodeId);
}

说明:这里采用简单策略 —— 将 nodes 按 ID 排序并用索引作为 shardId,配合 totalShards = nodes.size(),Worker 使用 taskId % totalShards == shardId 进行筛选。优点:实现简单、可重复计算;缺点:节点加入/离开会导致大量分片重映射。若需更平滑的重平衡,请考虑 consistent hashing 或虚拟节点技术。


Worker(节点)实现要点(伪代码/关键片段)

Worker 在启动时:

  1. 建立与 RM 的 HeartbeatStream(双向流)。

  2. 每隔 heartbeatInterval 发送 Heartbeat

  3. 在收到 HeartbeatAck 时,保存 totalShardsshardId(并记录 shardVersion)。

  4. 周期性或触发式地执行分片作业:加载任务列表并根据 shardId 筛选要处理的数据(taskId % totalShards == shardId)。

  5. 当检测到 shardVersion 变更时暂停正在运行的分片作业,等待稳定期(例如连续 M 次无变更 或 等待 T 秒)再重新计算并恢复。

示例(关键点):

@GrpcClient("rm")
private ShardServiceGrpc.ShardServiceStub stub;public void startHeartbeat() {StreamObserver<HeartbeatAck> ackObserver = new StreamObserver<HeartbeatAck>() {@Override public void onNext(HeartbeatAck ack) {// 更新本地分片信息与版本this.totalShards = ack.getTotalShards();this.shardId = ack.getShardId();this.shardVersion = ack.getShardVersion();}...};StreamObserver<Heartbeat> hbObserver = stub.heartbeatStream(ackObserver);// schedule sending heartbeats periodicallyscheduler.scheduleAtFixedRate(() -> {Heartbeat hb = Heartbeat.newBuilder().setNodeId(nodeId).setTs(now).build();hbObserver.onNext(hb);}, 0, heartbeatInterval, TimeUnit.SECONDS);
}

当 Worker 开始执行任务时的伪逻辑:

List<Task> allTasks = loadAllTasks(); // 从 DB 或文件系统
for (Task t : allTasks) {if (t.getId() % totalShards == shardId) {process(t);}
}

注意:避免在分片变更瞬间同时开始与停止任务导致重复或遗漏。通常做法是:若 shardVersion 发生变化,先停止当前分片任务,等待 stabilityWindow(例如 10s)并在确认 shardVersion 连续稳定 K 次后再开始新一轮处理。


数据获取策略(FetchShardData)

FetchShardData 可实现成 server-streaming:Worker 请求一个 shardId 的数据,RM(或 RM 代表的数据服务)按 batch 向 Worker 推送 DataRecord,Worker 处理一批后 request 下一个 batch,典型伪码如下:

FetchShardDataRequest req = FetchShardDataRequest.newBuilder().setShardId(shardId).setBatchSize(100).build();
Iterator<DataRecord> it = blockingStub.fetchShardData(req);
while (it.hasNext()) {DataRecord rec = it.next();processRecord(rec);
}

如果数据源是外部 DB/HDFS,建议 Worker 直接从数据源读取,RM 只下发元信息(例如 shardId、起止范围、访问凭证),避免 RM 成为数据传输瓶颈。


分片稳定性与重平衡策略(关键点)

分片重分配通常会发生在节点上下线或 RM 变更时。为降低抖动带来的问题:

  1. 分片版本号:每次 recompute 时增加 shardVersion(long 型),Worker 接收到新的版本后进入“pending”状态。

  2. 稳定窗口:只有当 shardVersionstabilityWindow(例如 5s)内未变化多次时,才允许 Worker 开始/继续任务。

  3. 渐进切换

    • 在允许的情况下,采用“慢迁移”:先让旧持有分片的节点完成当前正在处理的记录,然后在空闲时停止,不再拉新数据;新节点从头开始处理未完成的项(需要 idempotent 处理或幂等保证)。

  4. 幂等 & Exactly-once:若业务要求不重复处理,需在数据层做幂等控制(例如在 DB 加锁/记录处理状态,或使用事务/唯一标识)。

http://www.dtcms.com/a/597253.html

相关文章:

  • 陕西建设厅执业注册中心网站重庆网站建设沛宣
  • 济南 外贸网站建设大连鼎信网站建设公司地址
  • 自动化渗透工具分类及主流工具详解(2025年最新)
  • Qt QPushButton 样式完全指南:从基础到高级实现
  • 在Unity3d中使用Netly开启TCP服务
  • 男男床做视频网站上海家装设计网站
  • 如何清空网站空间上海工程建设招投标网站
  • Docker-玩转 Docker 镜像:从拉取、构建到发布
  • 技师院校人工智能技术应用专业实训室建设方案
  • HarmonyOS Tabs标签页组件深度解析:超越基础的高级技巧与实践
  • 无锡网站建设推荐wordpress 的分类目录
  • elasticSearch之java客户端详细使用:文档搜索API
  • 网页美工设计网站运维工程师可以自学吗
  • 手机网站制作注意事项卖产品的网站怎么做
  • Vue3 + Pinia 移动端Web应用:页面缓存策略解决方案
  • 可视化智能动作测评系统:用数据重塑每一个动作的科学评估时代
  • 算法32.0
  • 基于SpringBoot的锦州红色旅游资源信息管理系统的设计与实现
  • 静态网站建设教程wordpress采集接口
  • 网上购物有哪些网站?如何加入广告联盟赚钱
  • 解决 elementui el-cascader组件懒加载时存在选中状态丢失的问题?
  • vue3封装alert 提示组件 仿element-plus
  • Day33-动态规划
  • 域名访问过程会不会影响网站访问商务网站设计
  • 模仿elementUI 中Carousel 走马灯卡片模式 type=“card“ 的自定义轮播组件 图片之间有宽度
  • 公司网站建设哪家正规wordpress 按别名
  • 网站建设安全架构网络购物平台哪个最好
  • 2048——逻辑思维与矩阵合并算法
  • Qt:判断一个sql语句是否是select语句
  • 【题解】洛谷 P2470 [SCOI2007] 压缩