使用gRPC实现分片服务的心跳机制
一、什么是 RPC(Remote Procedure Call)?
概念解析
RPC 是 Remote Procedure Call 的缩写,即远程过程调用。本质上,RPC 允许你在一台机器上调用另一台机器上的函数或方法,就像调用本地函数一样。
它包括两大核心要素:
-
传输协议:如 TCP、HTTP 等,决定了数据如何在网络上传输。
-
编码协议(序列化):如 JSON、Protobuf 等,决定了数据如何在网络上传递前被转化为二进制流。
二、为什么用 gRPC 做心跳?
gRPC 是 Google 推出的一个高性能、开源、通用的远程调用框架。它可以看作是“现代版的 RPC”。
它的三大技术特性:
-
基于 HTTP/2 协议:支持多路复用、头部压缩、双向流等高级特性。
-
使用 Protocol Buffers(Protobuf)作为默认序列化协议:更小更快。
-
支持多语言:目前支持 C++, Java, Go, Python, C#, Node.js 等主流语言。
下图展示了 gRPC 的基本调用流程:

心跳(heartbeat)是分布式系统中常见的机制,用来检测远端服务/节点是否仍然存活、测量延迟、保持连接活性并触发故障恢复。常见实现方式包括 HTTP poll、TCP 心跳、消息队列心跳等。选择 gRPC 的好处:
-
高性能:gRPC 基于 HTTP/2,支持长连接、多路复用,序列化通常使用 Protobuf(二进制),支持二进制通信,相比 HTTP+JSON 更轻量
-
双向流:gRPC 原生支持双向流(bidirectional streaming),非常适合连续心跳的场景(客户端持续发送,服务端可回应或单向监听)。
-
跨语言/IDL:使用 Protobuf 定义接口,支持多种语言的客户端/服务器代码。
-
丰富特性:拦截器、Deadline、Keepalive、TLS、元数据(metadata)等,方便做安全与可观测性。
-
生态成熟:与监控/追踪/负载均衡等集成良好(Prometheus、OpenTelemetry、Envoy 等)
gRPC 的详细通信过程
为了更好地理解 gRPC 是如何工作的,请参考下图的详细调用过程:

图中展示了从客户端发起请求,到服务器处理响应,再返回结果的全过程。可以看到:
- 客户端通过 gRPC Stub 发起调用;
- 消息经过 Protobuf 序列化;
- 基于 HTTP/2 发送到服务端;
- 服务端反序列化后处理并响应。
经验提示:如果你的系统中存在大量内网调用,且通信频繁,gRPC 能极大提升吞吐量和响应速度。
三、实现目标
构建一个**ResourceManager(RM) + 多个 Worker(节点)**的分布式系统,支持:
-
节点注册与双向心跳(gRPC 双向流)
-
RM 维护活跃节点列表并下发分片信息(总分片数、节点编号)
-
Worker 根据分片规则(
taskId % totalShard == shardId)处理自己负责的任务(数据分片执行) -
节点上下线触发稳定性检测与安全重平衡(避免分片抖动导致重复处理)
-
数据获取接口:Worker 向 RM 请求某个分片的数据块进行处理(gRPC unary 或 server streaming)
本文给出设计 + 关键代码片段(proto、Server、Client、分片逻辑、重平衡策略、Spring Boot 配置)。
四、设计要点(简要)
-
通信协议:gRPC,使用 Protobuf 描述服务。采用双向流(bidirectional streaming)用于心跳(低延迟、长连接、双向 ACK),并用 unary / server streaming 做数据获取(按业务需求)。
-
分片策略:静态模运算
shard = id % totalShards(简单、无状态);可扩展为一致性哈希或范围分片。 -
分片稳定性:节点上下线会变更
totalShards或节点编号,需暂停 affected jobs 并等待分片稳定(例如:等 N 秒或连续 K 次无变更)。 -
容错:RM 定期扫描
lastSeen,若节点超过超时时间则标记下线并触发重平衡。 -
扩展性:当节点很多或业务复杂时,把分片信息持久化到轻量 KV(Redis/etcd),并对 RM 做分片(leader)选举或 HA。
五、proto(heartbeat + control + data fetch)
文件 shard.proto(关键字段、双向流心跳 + 分片获取 + 数据流):
syntax = "proto3";
package shard;option java_package = "com.example.shard";
option java_outer_classname = "ShardProto";// 心跳消息
message Heartbeat {string nodeId = 1;int64 ts = 2;int32 load = 3;
}// 心跳 ACK (RM->Node)
message HeartbeatAck {string nodeId = 1;int64 ts = 2;bool accepted = 3;string message = 4;// 可携带最新分片信息(可选)int32 totalShards = 10;int32 shardId = 11;
}// 请求分片信息(unary)
message GetShardRequest {string nodeId = 1;
}
message GetShardResponse {int32 totalShards = 1;int32 shardId = 2;int64 version = 3; // 分片版本号用于稳定性判断
}// 请求某分片的数据(stream)
message FetchShardDataRequest {int32 shardId = 1;int32 batchSize = 2;
}
message DataRecord {string id = 1;bytes payload = 2;
}service ShardService {// 双向心跳流:双方可以在同一流中互相发送心跳/ackrpc HeartbeatStream(stream Heartbeat) returns (stream HeartbeatAck);// 获取分片信息rpc GetShardInfo(GetShardRequest) returns (GetShardResponse);// 拉取分片数据(server streaming)rpc FetchShardData(FetchShardDataRequest) returns (stream DataRecord);
}
说明:HeartbeatStream 用于持续心跳(Node->RM)并让 RM 在同一流上下发 ACK 或分片变更通知(极适合实时下发)。GetShardInfo 用于节点启动或重平衡期间明确拉取当前分片信息。FetchShardData 用于 Worker 在处理其分片时,按需从 RM 或数据服务拉数据(也可以直接访问数据库/对象存储)。
六、Spring Boot + gRPC 集成建议
-
推荐使用
yidongnan/grpc-spring-boot-starter或lognet/grpc-spring-boot-starter以简化 server/client 注入与配置。GitHub -
Spring Boot 项目结构:
-
shard-proto模块:放.proto+ Maven protobuf 插件生成代码。 -
rm-service模块:实现ShardService的服务端(@GrpcService)。 -
worker-service模块:实现 gRPC 客户端(@GrpcClient)并定时发送心跳、拉取数据。
-
RM(ResourceManager)核心实现要点(伪代码/关键片段)
RM 内存结构
class ResourceManager {ConcurrentMap<String, Long> lastSeen = new ConcurrentHashMap<>();List<String> nodes = new CopyOnWriteArrayList<>();volatile int totalShards = 0;volatile long shardVersion = 0;// node -> shardId mapping can be computed by sorting nodes list
}
心跳流实现(使用 grpc-spring-boot-starter)
@GrpcService
public class ShardServiceImpl extends ShardServiceGrpc.ShardServiceImplBase {@Overridepublic StreamObserver<Heartbeat> heartbeatStream(StreamObserver<HeartbeatAck> responseObserver) {return new StreamObserver<Heartbeat>() {@Overridepublic void onNext(Heartbeat hb) {lastSeen.put(hb.getNodeId(), hb.getTs());// ensure node in listif (!nodes.contains(hb.getNodeId())) {nodes.add(hb.getNodeId());recomputeShards(); // 更新 totalShards & shardVersion}int shardId = computeShardId(hb.getNodeId());HeartbeatAck ack = HeartbeatAck.newBuilder().setNodeId(hb.getNodeId()).setTs(System.currentTimeMillis()).setAccepted(true).setTotalShards(totalShards).setShardId(shardId).build();responseObserver.onNext(ack);}@Override public void onError(Throwable t) { /* cleanup */ }@Override public void onCompleted() { responseObserver.onCompleted(); }};}
}
分片计算(简单 deterministic 排序)
private void recomputeShards() {Collections.sort(nodes);totalShards = nodes.size();shardVersion++;
}
// 节点的 shardId = index of nodeId in sorted nodes
private int computeShardId(String nodeId) {return nodes.indexOf(nodeId);
}
说明:这里采用简单策略 —— 将 nodes 按 ID 排序并用索引作为 shardId,配合 totalShards = nodes.size(),Worker 使用 taskId % totalShards == shardId 进行筛选。优点:实现简单、可重复计算;缺点:节点加入/离开会导致大量分片重映射。若需更平滑的重平衡,请考虑 consistent hashing 或虚拟节点技术。
Worker(节点)实现要点(伪代码/关键片段)
Worker 在启动时:
-
建立与 RM 的
HeartbeatStream(双向流)。 -
每隔
heartbeatInterval发送Heartbeat。 -
在收到
HeartbeatAck时,保存totalShards、shardId(并记录shardVersion)。 -
周期性或触发式地执行分片作业:加载任务列表并根据
shardId筛选要处理的数据(taskId % totalShards == shardId)。 -
当检测到
shardVersion变更时暂停正在运行的分片作业,等待稳定期(例如连续 M 次无变更 或 等待 T 秒)再重新计算并恢复。
示例(关键点):
@GrpcClient("rm")
private ShardServiceGrpc.ShardServiceStub stub;public void startHeartbeat() {StreamObserver<HeartbeatAck> ackObserver = new StreamObserver<HeartbeatAck>() {@Override public void onNext(HeartbeatAck ack) {// 更新本地分片信息与版本this.totalShards = ack.getTotalShards();this.shardId = ack.getShardId();this.shardVersion = ack.getShardVersion();}...};StreamObserver<Heartbeat> hbObserver = stub.heartbeatStream(ackObserver);// schedule sending heartbeats periodicallyscheduler.scheduleAtFixedRate(() -> {Heartbeat hb = Heartbeat.newBuilder().setNodeId(nodeId).setTs(now).build();hbObserver.onNext(hb);}, 0, heartbeatInterval, TimeUnit.SECONDS);
}
当 Worker 开始执行任务时的伪逻辑:
List<Task> allTasks = loadAllTasks(); // 从 DB 或文件系统
for (Task t : allTasks) {if (t.getId() % totalShards == shardId) {process(t);}
}
注意:避免在分片变更瞬间同时开始与停止任务导致重复或遗漏。通常做法是:若 shardVersion 发生变化,先停止当前分片任务,等待 stabilityWindow(例如 10s)并在确认 shardVersion 连续稳定 K 次后再开始新一轮处理。
数据获取策略(FetchShardData)
FetchShardData 可实现成 server-streaming:Worker 请求一个 shardId 的数据,RM(或 RM 代表的数据服务)按 batch 向 Worker 推送 DataRecord,Worker 处理一批后 request 下一个 batch,典型伪码如下:
FetchShardDataRequest req = FetchShardDataRequest.newBuilder().setShardId(shardId).setBatchSize(100).build();
Iterator<DataRecord> it = blockingStub.fetchShardData(req);
while (it.hasNext()) {DataRecord rec = it.next();processRecord(rec);
}
如果数据源是外部 DB/HDFS,建议 Worker 直接从数据源读取,RM 只下发元信息(例如 shardId、起止范围、访问凭证),避免 RM 成为数据传输瓶颈。
分片稳定性与重平衡策略(关键点)
分片重分配通常会发生在节点上下线或 RM 变更时。为降低抖动带来的问题:
-
分片版本号:每次 recompute 时增加
shardVersion(long 型),Worker 接收到新的版本后进入“pending”状态。 -
稳定窗口:只有当
shardVersion在stabilityWindow(例如 5s)内未变化多次时,才允许 Worker 开始/继续任务。 -
渐进切换:
-
在允许的情况下,采用“慢迁移”:先让旧持有分片的节点完成当前正在处理的记录,然后在空闲时停止,不再拉新数据;新节点从头开始处理未完成的项(需要 idempotent 处理或幂等保证)。
-
-
幂等 & Exactly-once:若业务要求不重复处理,需在数据层做幂等控制(例如在 DB 加锁/记录处理状态,或使用事务/唯一标识)。
