当前位置: 首页 > news >正文

Nacos源码—4.Nacos集群高可用分析四

大纲

6.CAP原则与Raft协议

7.Nacos实现的Raft协议是如何写入数据的

8.Nacos实现的Raft协议是如何选举Leader节点的

9.Nacos实现的Raft协议是如何同步数据的

10.Nacos如何实现Raft协议的简版总结

8.Nacos实现的Raft协议是如何选举Leader节点的

(1)初始化RaftCore实例时会开启两个异步任务

(2)选举Leader节点的MasterElection异步任务

(1)初始化RaftCore实例时会开启两个异步任务

在RaftCore的init()方法中,会开启两个异步任务。第一个异步任务的作用是选举Leader节点,第二个异步任务的作用是发送心跳同步数据。

@Deprecated
@DependsOn("ProtocolManager")
@Component
public class RaftCore implements Closeable {...//Init raft core.@PostConstructpublic void init() throws Exception {Loggers.RAFT.info("initializing Raft sub-system");final long start = System.currentTimeMillis();//从本地文件中加载数据raftStore.loadDatums(notifier, datums);setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());initialized = true;Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));//开启一个异步任务选举Leader节点masterTask = GlobalExecutor.registerMasterElection(new MasterElection());//开启一个异步任务通过心跳同步数据heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat());versionJudgement.registerObserver(isAllNewVersion -> {stopWork = isAllNewVersion;if (stopWork) {try {shutdown();raftListener.removeOldRaftMetadata();} catch (NacosException e) {throw new NacosRuntimeException(NacosException.SERVER_ERROR, e);}}}, 100);//给NotifyCenter注册一个监听PersistentNotifierNotifyCenter.registerSubscriber(notifier);Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}", GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);}...
}public class GlobalExecutor {//线程池的线程数是可用线程的一半private static final ScheduledExecutorService NAMING_TIMER_EXECUTOR =ExecutorFactory.Managed.newScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class),Runtime.getRuntime().availableProcessors() * 2,new NameThreadFactory("com.alibaba.nacos.naming.timer"));...public static ScheduledFuture registerMasterElection(Runnable runnable) {//以固定的频率来执行某项任务,它不受任务执行时间的影响,到时间就会执行任务return NAMING_TIMER_EXECUTOR.scheduleAtFixedRate(runnable, 0, TICK_PERIOD_MS, TimeUnit.MILLISECONDS);}public static ScheduledFuture registerHeartbeat(Runnable runnable) {//以相对固定的频率来执行某项任务,即只有等这一次任务执行完了(不管执行了多长时间),才能执行下一次任务return NAMING_TIMER_EXECUTOR.scheduleWithFixedDelay(runnable, 0, TICK_PERIOD_MS, TimeUnit.MILLISECONDS);}...
}

(2)选举Leader节点的MasterElection异步任务

MasterElection的run()方法就体现了Raft协议进行Leader选举的第一步。即每个节点会进行休眠,如果时间没到则返回然后重新执行异步任务。等休眠时间到了才会调用MasterElection的sendVote()方法发起投票。

一旦执行MasterElection的sendVote()方法发起投票:会先把选举周期+1,然后投票给自己,接着修改节点状态为Candidate。做完这些准备工作后,才会以HTTP形式向其他节点发送投票请求。

其他节点返回投票信息时,会调用RaftPeerSet的decideLeader()方法处理。这个方法会处理其他节点返回的投票信息,具体逻辑如下:

首先用一个Map记录每个节点返回的投票信息,然后遍历这个Map去统计投票数量,最后比较当前节点的累计票数,是否已超过集群节点半数。如果超过,则把当前节点的状态修改为Leader。

@Deprecated
@DependsOn("ProtocolManager")
@Component
public class RaftCore implements Closeable {private RaftPeerSet peers;...public class MasterElection implements Runnable {@Overridepublic void run() {try {if (stopWork) {return;}if (!peers.isReady()) {return;}//随机休眠RaftPeer local = peers.local();local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;//休眠时间没到就直接返回if (local.leaderDueMs > 0) {return;}//reset timeoutlocal.resetLeaderDue();local.resetHeartbeatDue();//发起投票sendVote();} catch (Exception e) {Loggers.RAFT.warn("[RAFT] error while master election {}", e);}}private void sendVote() {RaftPeer local = peers.get(NetUtils.localServer());Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JacksonUtils.toJson(getLeader()), local.term);peers.reset();//选举周期+1local.term.incrementAndGet();//投票给自己local.voteFor = local.ip;//自己成为候选者,设置当前节点状态为Candidate状态local.state = RaftPeer.State.CANDIDATE;Map<String, String> params = new HashMap<>(1);params.put("vote", JacksonUtils.toJson(local));//遍历其他集群节点for (final String server : peers.allServersWithoutMySelf()) {final String url = buildUrl(server, API_VOTE);try {//发送HTTP的投票请求HttpClient.asyncHttpPost(url, null, params, new Callback<String>() {@Overridepublic void onReceive(RestResult<String> result) {if (!result.ok()) {Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", result.getCode(), url);return;}RaftPeer peer = JacksonUtils.toObj(result.getData(), RaftPeer.class);Loggers.RAFT.info("received approve from peer: {}", JacksonUtils.toJson(peer));//处理返回的投票结果peers.decideLeader(peer);}@Overridepublic void onError(Throwable throwable) {Loggers.RAFT.error("error while sending vote to server: {}", server, throwable);}@Overridepublic void onCancel() {}});} catch (Exception e) {Loggers.RAFT.warn("error while sending vote to server: {}", server);}}}}...
}@Deprecated
@Component
@DependsOn("ProtocolManager")
public class RaftPeerSet extends MemberChangeListener implements Closeable {private volatile Map<String, RaftPeer> peers = new HashMap<>(8);...//Calculate and decide which peer is leader. If has new peer has more than half vote, change leader to new peer.public RaftPeer decideLeader(RaftPeer candidate) {//记录本次投票结果peers.put(candidate.ip, candidate);SortedBag ips = new TreeBag();int maxApproveCount = 0;String maxApprovePeer = null;//统计累计票数for (RaftPeer peer : peers.values()) {if (StringUtils.isEmpty(peer.voteFor)) {continue;}ips.add(peer.voteFor);if (ips.getCount(peer.voteFor) > maxApproveCount) {maxApproveCount = ips.getCount(peer.voteFor);maxApprovePeer = peer.voteFor;}}//判断投票数量是否超过半数,如果已超半数则把自己节点的状态改为Leaderif (maxApproveCount >= majorityCount()) {RaftPeer peer = peers.get(maxApprovePeer);//设置当前节点的状态为Leader状态peer.state = RaftPeer.State.LEADER;if (!Objects.equals(leader, peer)) {leader = peer;ApplicationUtils.publishEvent(new LeaderElectFinishedEvent(this, leader, local()));Loggers.RAFT.info("{} has become the LEADER", leader.ip);}}return leader;}...
}

9.Nacos实现的Raft协议是如何同步数据的

(1)Leader节点如何发送心跳来同步数据

(2)Follower节点如何处理心跳来同步数据

(1)Leader节点如何发送心跳来同步数据

RaftCore的init()方法会开启另外一个异步任务HeartBeat。HeartBeat的run()方法会调用HeartBeat的sendBeat()方法来发送心跳请求。

其中只有Leader节点才会发送心跳请求。Leader在调用HeartBeat的sendBeat()方法发送心跳同步数据请求时,会将Instance的key作为心跳的参数发送给其他Follower节点。Follower节点接收到Leader的心跳请求后,会比较请求中的数据与自身数据的差异,如果存在差异则向Leader同步。

HeartBeat的sendBeat()方法主要包括三部分:

第一部分:判断当前节点是不是Leader,如果不是Leader则不能发送心跳。

第二部分:组装发送心跳包的参数。只会把datum.key放入进去,并不会把整个Instance信息传输过去。Follower节点拿到心跳包中的key之后,发现部分key在自身节点是不存在的,那么这时Follower节点就会根据这些key向Leader节点获取Instance的详细信息进行同步。

第三部分:向其他Follower节点发送心跳数据,是通过HTTP的方式来发起心跳请求的,请求地址为:/v1/ns/raft/beat。

@Deprecated
@DependsOn("ProtocolManager")
@Component
public class RaftCore implements Closeable {...public class HeartBeat implements Runnable {@Overridepublic void run() {try {if (stopWork) {return;}if (!peers.isReady()) {return;}RaftPeer local = peers.local();local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;if (local.heartbeatDueMs > 0) {return;}local.resetHeartbeatDue();//发送心跳同步数据sendBeat();} catch (Exception e) {Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);}}private void sendBeat() throws IOException, InterruptedException {RaftPeer local = peers.local();//第一部分:判断当前节点是不是Leader,如果不是Leader则不能发送心跳if (EnvUtil.getStandaloneMode() || local.state != RaftPeer.State.LEADER) {return;}if (Loggers.RAFT.isDebugEnabled()) {Loggers.RAFT.debug("[RAFT] send beat with {} keys.", datums.size());}local.resetLeaderDue();//build dataObjectNode packet = JacksonUtils.createEmptyJsonNode();packet.replace("peer", JacksonUtils.transferToJsonNode(local));ArrayNode array = JacksonUtils.createEmptyArrayNode();if (switchDomain.isSendBeatOnly()) {Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", switchDomain.isSendBeatOnly());}if (!switchDomain.isSendBeatOnly()) {//第二部分:组装发送心跳包的参数//组装数据,只会把datum.key放入进去,并不会把整个Instance信息传输过去for (Datum datum : datums.values()) {ObjectNode element = JacksonUtils.createEmptyJsonNode();if (KeyBuilder.matchServiceMetaKey(datum.key)) {//只放入key的信息element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));} else if (KeyBuilder.matchInstanceListKey(datum.key)) {element.put("key", KeyBuilder.briefInstanceListkey(datum.key));}element.put("timestamp", datum.timestamp.get());array.add(element);}}packet.replace("datums", array);//broadcastMap<String, String> params = new HashMap<String, String>(1);params.put("beat", JacksonUtils.toJson(packet));String content = JacksonUtils.toJson(params);        ByteArrayOutputStream out = new ByteArrayOutputStream();GZIPOutputStream gzip = new GZIPOutputStream(out);gzip.write(content.getBytes(StandardCharsets.UTF_8));gzip.close();byte[] compressedBytes = out.toByteArray();String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);if (Loggers.RAFT.isDebugEnabled()) {Loggers.RAFT.debug("raw beat data size: {}, size of compressed data: {}", content.length(), compressedContent.length());}//第三部分:向其他Follower节点发送心跳数据,通过HTTP的方式来发起心跳,请求地址为:/v1/ns/raft/beat//发送心跳 + 同步数据for (final String server : peers.allServersWithoutMySelf()) {try {final String url = buildUrl(server, API_BEAT);if (Loggers.RAFT.isDebugEnabled()) {Loggers.RAFT.debug("send beat to server " + server);}//通过HTTP发送心跳HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new Callback<String>() {@Overridepublic void onReceive(RestResult<String> result) {if (!result.ok()) {Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}", result.getCode(), server);MetricsMonitor.getLeaderSendBeatFailedException().increment();return;}peers.update(JacksonUtils.toObj(result.getData(), RaftPeer.class));if (Loggers.RAFT.isDebugEnabled()) {Loggers.RAFT.debug("receive beat response from: {}", url);}}@Overridepublic void onError(Throwable throwable) {Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server, throwable);MetricsMonitor.getLeaderSendBeatFailedException().increment();}@Overridepublic void onCancel() {}});} catch (Exception e) {Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);MetricsMonitor.getLeaderSendBeatFailedException().increment();}}}}...
}

(2)Follower节点如何处理心跳来同步数据

Follower节点收到Leader节点发送过来的HTTP请求"/v1/ns/raft/beat"时,会执行RaftController类中的beat()方法,接着会执行RaftCore的receivedBeat()方法来进行具体的心跳处理。

RaftCore.receivedBeat()方法的具体逻辑如下:

一.首先会进行一些判断

第一个判断:Follower节点接收到的心跳请求如果不是Leader节点发出的会直接抛出异常。

第二个判断:Follower节点的term只会小于等于Leader节点的term,如果大于,则直接抛出异常。

第三个判断:如果自身节点的状态不是Follower,需要把状态改为Follower。因为有可能自身节点之前是Leader,但因为网络原因出现了脑裂问题。等网络恢复后,自身节点收到新Leader发来的心跳,新Leader的term比自身节点要大,那么它就需要切换成Follower节点。

二.然后对自身节点的datums中的key和心跳请求中的key进行比对

如果发现自身节点数据缺少了,那么就会记录到batch中,然后把batch中的key进行拆分包装成请求参数,最后通过HTTP方式向Leader节点查询这些key对应的Instance详细信息。

Follower节点拿到Leader节点返回的Instance服务实例信息后,会继续调用RaftStore.write()、PersistentNotifier.notify()这两个方法,一个将数据持久化到本地文件、一个将数据同步到内存注册表,从而最终完成以Leader节点为准的心跳请求同步数据的流程。

@Deprecated
@RestController
@RequestMapping({UtilsAndCommons.NACOS_NAMING_CONTEXT + "/raft", UtilsAndCommons.NACOS_SERVER_CONTEXT + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/raft"})
public class RaftController {private final RaftCore raftCore;...@PostMapping("/beat")public JsonNode beat(HttpServletRequest request, HttpServletResponse response) throws Exception {if (versionJudgement.allMemberIsNewVersion()) {throw new IllegalStateException("old raft protocol already stop");}String entity = new String(IoUtils.tryDecompress(request.getInputStream()), StandardCharsets.UTF_8);String value = URLDecoder.decode(entity, "UTF-8");value = URLDecoder.decode(value, "UTF-8");JsonNode json = JacksonUtils.toObj(value);//处理心跳RaftPeer peer = raftCore.receivedBeat(JacksonUtils.toObj(json.get("beat").asText()));return JacksonUtils.transferToJsonNode(peer);}...
}@Deprecated
@DependsOn("ProtocolManager")
@Component
public class RaftCore implements Closeable {public final PersistentNotifier notifier;...//Received beat from leader. // TODO split method to multiple smaller method.public RaftPeer receivedBeat(JsonNode beat) throws Exception {...//第一个判断:如果发送心跳不是Leader,则直接抛出异常if (remote.state != RaftPeer.State.LEADER) {Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}", remote.state, JacksonUtils.toJson(remote));throw new IllegalArgumentException("invalid state from master, state: " + remote.state);}//第二个判断:如果本身节点的term还大于Leader的term,也直接抛出异常if (local.term.get() > remote.term.get()) {Loggers.RAFT.info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}", remote.term.get(), local.term.get(), JacksonUtils.toJson(remote), local.leaderDueMs);throw new IllegalArgumentException("out of date beat, beat-from-term: " + remote.term.get() + ", beat-to-term: " + local.term.get());}//第三个判断:自己的节点状态是不是FOLLOWER,如果不是则需要更改为FOLLOWERif (local.state != RaftPeer.State.FOLLOWER) {Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JacksonUtils.toJson(remote));//mk followerlocal.state = RaftPeer.State.FOLLOWER;local.voteFor = remote.ip;}...//遍历Leader传输过来的Instance key,和本地的Instance进行对比for (Object object : beatDatums) {...try {if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp && processedCount < beatDatums.size()) {continue;}if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {//记录需要同步的datumKeybatch.add(datumKey);}//到达一定数量才进行批量数据同步if (batch.size() < 50 && processedCount < beatDatums.size()) {continue;}...//使用batch组装参数String url = buildUrl(remote.ip, API_GET);Map<String, String> queryParam = new HashMap<>(1);queryParam.put("keys", URLEncoder.encode(keys, "UTF-8"));//发送HTTP请求给Leader,根据keys参数获取Instance详细信息HttpClient.asyncHttpGet(url, null, queryParam, new Callback<String>() {@Overridepublic void onReceive(RestResult<String> result) {if (!result.ok()) {return;}//序列化result结果List<JsonNode> datumList = JacksonUtils.toObj(result.getData(), new TypeReference<List<JsonNode>>() { });//遍历Leader返回的Instance详细信息for (JsonNode datumJson : datumList) {Datum newDatum = null;OPERATE_LOCK.lock();try {...//Raft写本地数据raftStore.write(newDatum);//同步内存数据datums.put(newDatum.key, newDatum);//和服务实例注册时的逻辑一样,最终会调用listener.onChange()方法notifier.notify(newDatum.key, DataOperation.CHANGE, newDatum.value);} catch (Throwable e) {Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader, datum: {}", newDatum, e);} finally {OPERATE_LOCK.unlock();}}...return;}...});batch.clear();} catch (Exception e) {Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey);}}...}    ...
}

10.Nacos如何实现Raft协议的简版总结

Nacos实现的Raft协议主要包括三部分内容:

一.Nacos集群如何使用Raft协议写入数据

二.Nacos集群如何选举Leader节点

三.Nacos集群如何让Leader实现心跳请求同步数据

Nacos早期版本实现的只是Raft协议的简化版本,并没有两阶段提交的处理。而是Leader节点处理数据完成后,直接就去同步给其他集群节点。哪怕集群节点同步失败或没有过半节点成功,Leader的数据也不会回滚而只抛出异常。所以,Nacos早期版本的Raft实现,后期也会废弃使用。

如下是Nacos实现的Raft协议在注册服务实例时集群处理数据的流程:

如下是Nacos实现的Raft协议处理Leader选举和通过心跳同步数据的流程:

相关文章:

  • Python爬虫+代理IP+Header伪装:高效采集亚马逊数据
  • 【AI知识库云研发部署】RAGFlow + DeepSeek
  • python打卡day17
  • 解决android studio 中gradle 出现task list not built
  • 使用Unsloth微调DeepSeek-R1蒸馏模型:低显存高效训练实践
  • 效率提升利器:解锁图片处理新姿势
  • x-cmd install | Tuistash - Logstash 实时监控,告别图形界面,高效便捷!
  • 餐饮部绩效考核管理制度与综合评估方法
  • STL之stackqueue
  • Linux主机时间设置操作指南及时间异常影响
  • 开个帖子记录一下自己学spring源码的过程
  • LLM评估指标:WSC和WebNLG 是什么
  • mysql协议详解
  • Waymo公司正在加快其位于亚利桑那州新工厂的无人驾驶出租车(robotaxi)生产进度
  • 使用 AddressSanitizer 检测堆越界错误
  • 小刚说C语言刷题—1044 -找出最经济型的包装箱型号
  • 资产管理系统选型避坑:2025年核心技术趋势洞察
  • 凌晨三点的数据库崩溃现场
  • Dependency Track使用
  • 疗愈服务预约小程序源码介绍
  • 现场|万米云端,遇见上博
  • 上海乐高乐园度假区将于7月5日开园
  • 非洲中青年军官代表团访华,赴北京、长沙、韶山等地参访交流
  • 景点变回监狱,特朗普下令重新启用“恶魔岛”
  • 五一车市消费观察:政策赋能、企业发力,汽车消费火热
  • 港股5月迎开门红,恒生科指涨3%,欧股开盘全线上扬