
Nacos Source Code — 3. Nacos Cluster High Availability Analysis, Part 2

Outline

1. Several Questions About Nacos Clusters

2. How a Single Node Performs Heartbeat Health Checks on Services and Syncs the Results

3. How a New Service Instance Is Synced to the Other Cluster Nodes

4. Data Sync When a Cluster Node's Health State Changes

5. How Existing Service Instance Data Is Synced to a Newly Added Cluster Node

3. How a New Service Instance Is Synced to the Other Cluster Nodes

(1) Architecture for Syncing a New Service Instance to the Other Cluster Nodes

(2) Source Code for Syncing a New Service Instance to the Other Cluster Nodes

(3) Summary

(1) Architecture for Syncing a New Service Instance to the Other Cluster Nodes

The architecture Nacos uses here is a two-layer in-memory queue plus asynchronous tasks.

Layer one:

Nacos uses a ConcurrentHashMap as the container for delayed tasks. The information about the newly registered service instance is wrapped into a DistroDelayTask and put into that Map.

DistroTaskEngineHolder holds a DistroDelayTaskExecuteEngine field, whose parent-class constructor starts an asynchronous task that pulls DistroDelayTask entries out of the ConcurrentHashMap.

Layer two:

Nacos uses a BlockingQueue as the container for sync tasks. A DistroSyncChangeTask is built from the task parameters and put into the BlockingQueue.

Nacos starts an InnerWorker thread that takes DistroSyncChangeTask objects from the BlockingQueue and invokes their run() methods.

At the end of DistroSyncChangeTask's run() method, the data sync is completed by calling an HTTP API on the other cluster nodes.
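The two-layer pipeline described above can be sketched in isolation as follows. This is a minimal illustration, not Nacos code: the class and member names (TwoLayerQueueSketch, delayedTasks, syncQueue) are hypothetical, and it only mirrors the shape of the mechanism — a keyed delay map drained on a schedule into a blocking queue consumed by a dedicated worker thread.

```java
import java.util.concurrent.*;

// Minimal sketch of the two-layer pattern: a keyed delay map drained
// periodically into a blocking queue that a worker thread consumes.
public class TwoLayerQueueSketch {
    // Layer 1: keyed map, so repeated changes to the same key collapse into one task.
    private final ConcurrentHashMap<String, Runnable> delayedTasks = new ConcurrentHashMap<>();
    // Layer 2: FIFO queue consumed by a worker thread.
    private final BlockingQueue<Runnable> syncQueue = new ArrayBlockingQueue<>(1024);
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public TwoLayerQueueSketch() {
        // Drain layer 1 into layer 2 on a fixed delay, like ProcessRunnable.
        scheduler.scheduleWithFixedDelay(this::processTasks, 100, 100, TimeUnit.MILLISECONDS);
        // Worker thread blocks on layer 2 and runs each task, like InnerWorker.
        Thread worker = new Thread(() -> {
            while (true) {
                try {
                    syncQueue.take().run();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    public void addTask(String key, Runnable task) {
        // Later tasks for the same key overwrite earlier ones (the real engine merges them).
        delayedTasks.put(key, task);
    }

    private void processTasks() {
        for (String key : delayedTasks.keySet()) {
            Runnable task = delayedTasks.remove(key);
            if (task != null) {
                syncQueue.offer(task);  // hand off to layer 2
            }
        }
    }
}
```

Callers of addTask() return immediately after a cheap map put; the slow network work happens entirely on the worker thread.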

(2) Source Code for Syncing a New Service Instance to the Other Cluster Nodes

Part 1: Build the delayed task, store it in a Map, and process it asynchronously

Part 2: Build the sync task, store it in a Queue, and process it asynchronously

Part 3: The core method that syncs service instance data to cluster nodes

Part 1: Build the delayed task, store it in a Map, and process it asynchronously

When the Nacos server handles a service instance registration request, it first calls DistroConsistencyServiceImpl's onPut() method to trigger the in-memory registry update, and only then calls DistroProtocol's sync() method to sync the data across the cluster.

The for loop in DistroProtocol's sync() method iterates over all cluster nodes except the current one. The node list comes from the cluster.conf file configured when the Nacos cluster was set up, which is how the server knows the full cluster membership. The current node is skipped because it does not need to sync data to itself; it only needs to push the data to the other nodes.

At the end of each loop iteration, sync() wraps the change into a DistroDelayTask and calls NacosDelayTaskExecuteEngine's addTask() method to put it into the tasks field, which is a ConcurrentHashMap. DistroDelayTask is a NacosTask (via its parent class AbstractDelayTask).

When NacosDelayTaskExecuteEngine is initialized, it starts an asynchronous task. That task runs ProcessRunnable's run() method, which in turn calls NacosDelayTaskExecuteEngine's processTasks() method.

In processTasks(), all keys are first fetched from the tasks map and iterated. For each key, NacosDelayTaskExecuteEngine's removeTask() method removes the delayed task from the tasks map and returns it. Then the DistroDelayTaskProcessor for that taskKey is looked up, and its process() method is called to move the NacosTask returned by removeTask() into the second-layer in-memory queue.

@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
    private final DistroProtocol distroProtocol;
    ...
    @Override
    public void put(String key, Record value) throws NacosException {
        // Store the latest instance list (including the newly registered instance) in the DataStore,
        // and add an async task that applies the latest instance list to the in-memory registry
        onPut(key, value);
        // In cluster mode, DistroProtocol.sync() syncs the instance data to the other cluster nodes
        distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2);
    }
    ...
}

@Component
public class DistroProtocol {
    private final ServerMemberManager memberManager;
    private final DistroTaskEngineHolder distroTaskEngineHolder;
    ...
    // Start to sync data to all remote server.
    public void sync(DistroKey distroKey, DataOperation action, long delay) {
        // Iterate over all cluster nodes except the current one
        for (Member each : memberManager.allMembersWithoutSelf()) {
            // First wrapper: a DistroKey that carries the target server
            DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), each.getAddress());
            // Second wrapper: a DistroDelayTask
            DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
            // Actually calls NacosDelayTaskExecuteEngine.addTask() to add the task
            distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
            }
        }
    }
    ...
}

public class DistroKey {
    private String resourceKey;
    private String resourceType;
    private String targetServer;

    public DistroKey() {
    }

    public DistroKey(String resourceKey, String resourceType, String targetServer) {
        this.resourceKey = resourceKey;
        this.resourceType = resourceType;
        this.targetServer = targetServer;
    }
    ...
}

// Distro delay task.
public class DistroDelayTask extends AbstractDelayTask {
    private final DistroKey distroKey;
    private DataOperation action;
    private long createTime;

    public DistroDelayTask(DistroKey distroKey, DataOperation action, long delayTime) {
        this.distroKey = distroKey;
        this.action = action;
        this.createTime = System.currentTimeMillis();
        setLastProcessTime(createTime);
        setTaskInterval(delayTime);
    }
    ...
}

// Abstract task which can delay and merge.
public abstract class AbstractDelayTask implements NacosTask {
    // Task time interval between twice processing, unit is millisecond.
    private long taskInterval;
    // The time which was processed at last time, unit is millisecond.
    private long lastProcessTime;

    public void setTaskInterval(long interval) {
        this.taskInterval = interval;
    }

    public void setLastProcessTime(long lastProcessTime) {
        this.lastProcessTime = lastProcessTime;
    }
    ...
}

// Distro task engine holder.
@Component
public class DistroTaskEngineHolder {
    private final DistroDelayTaskExecuteEngine delayTaskExecuteEngine = new DistroDelayTaskExecuteEngine();

    public DistroDelayTaskExecuteEngine getDelayTaskExecuteEngine() {
        return delayTaskExecuteEngine;
    }
    ...
}

public class DistroDelayTaskExecuteEngine extends NacosDelayTaskExecuteEngine {
    public DistroDelayTaskExecuteEngine() {
        super(DistroDelayTaskExecuteEngine.class.getName(), Loggers.DISTRO);
    }
    ...
}

// Nacos delay task execute engine.
public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {
    private final ScheduledExecutorService processingExecutor;
    // The task pool
    protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;
    protected final ReentrantLock lock = new ReentrantLock();
    ...
    public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
        super(logger);
        tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
        processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
        // Start the scheduled processing task
        processingExecutor.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
    }

    @Override
    public void addTask(Object key, AbstractDelayTask newTask) {
        lock.lock();
        try {
            AbstractDelayTask existTask = tasks.get(key);
            if (null != existTask) {
                newTask.merge(existTask);
            }
            // Finally put the task into the ConcurrentHashMap
            tasks.put(key, newTask);
        } finally {
            lock.unlock();
        }
    }
    ...
    private class ProcessRunnable implements Runnable {
        @Override
        public void run() {
            try {
                processTasks();
            } catch (Throwable e) {
                getEngineLog().error(e.toString(), e);
            }
        }
    }
    ...
    // process tasks in execute engine.
    protected void processTasks() {
        // Get all task keys from the tasks map and iterate over them
        Collection<Object> keys = getAllTaskKeys();
        for (Object taskKey : keys) {
            // Get the task by its key and remove it from the task pool
            AbstractDelayTask task = removeTask(taskKey);
            if (null == task) {
                continue;
            }
            // Get the NacosTaskProcessor for this taskKey: DistroDelayTaskProcessor
            NacosTaskProcessor processor = getProcessor(taskKey);
            if (null == processor) {
                getEngineLog().error("processor not found for task, so discarded. " + task);
                continue;
            }
            try {
                // ReAdd task if process failed
                // Call DistroDelayTaskProcessor.process() to put the task into the second-layer queue
                if (!processor.process(task)) {
                    // On failure, retry by adding the task back into the tasks map
                    retryFailedTask(taskKey, task);
                }
            } catch (Throwable e) {
                getEngineLog().error("Nacos task execute error : " + e.toString(), e);
                retryFailedTask(taskKey, task);
            }
        }
    }

    @Override
    public AbstractDelayTask removeTask(Object key) {
        lock.lock();
        try {
            AbstractDelayTask task = tasks.get(key);
            if (null != task && task.shouldProcess()) {
                return tasks.remove(key);
            } else {
                return null;
            }
        } finally {
            lock.unlock();
        }
    }
}

Part 2: Build the sync task, store it in a Queue, and process it asynchronously

In DistroDelayTaskProcessor's process() method, the delayed NacosTask is moved into the second-layer in-memory queue: the NacosTask is first cast to a DistroDelayTask, then wrapped into a DistroSyncChangeTask, and finally NacosExecuteTaskExecuteEngine's addTask() method puts it into the queue.

Specifically, addTask() calls getWorker() in the same class to pick one of the TaskExecuteWorker instances, then calls that worker's process() method to put the DistroSyncChangeTask into the worker's queue.

When NacosExecuteTaskExecuteEngine is created, it creates multiple TaskExecuteWorker instances, and each TaskExecuteWorker starts an InnerWorker thread on initialization. The InnerWorker thread keeps taking sync tasks from the blocking queue and processing them: its run() method calls DistroSyncChangeTask's run() method, which performs the cluster sync of the service instance data.
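The worker selection in getWorker() uses the tag's hash, masked non-negative, modulo the worker count. A minimal sketch (WorkerDispatchSketch is a hypothetical name, not a Nacos class) shows the useful consequence: all tasks carrying the same DistroKey land on the same worker, so changes to one service are processed in order on one thread.

```java
// Minimal sketch of hash-based worker dispatch, mirroring the arithmetic of
// NacosExecuteTaskExecuteEngine.getWorker(): (hash & Integer.MAX_VALUE) % workers.
public class WorkerDispatchSketch {
    private final String[] workers;

    public WorkerDispatchSketch(int workerCount) {
        workers = new String[workerCount];
        for (int i = 0; i < workerCount; i++) {
            workers[i] = "worker-" + i;
        }
    }

    public String getWorker(Object tag) {
        // Mask the sign bit so negative hash codes still index the array safely
        int idx = (tag.hashCode() & Integer.MAX_VALUE) % workers.length;
        return workers[idx];
    }
}
```

The `& Integer.MAX_VALUE` mask matters: `Math.abs(hashCode())` would break for `Integer.MIN_VALUE`, whose absolute value overflows.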

// Distro delay task processor.
public class DistroDelayTaskProcessor implements NacosTaskProcessor {
    private final DistroTaskEngineHolder distroTaskEngineHolder;
    private final DistroComponentHolder distroComponentHolder;

    public DistroDelayTaskProcessor(DistroTaskEngineHolder distroTaskEngineHolder, DistroComponentHolder distroComponentHolder) {
        this.distroTaskEngineHolder = distroTaskEngineHolder;
        this.distroComponentHolder = distroComponentHolder;
    }

    @Override
    public boolean process(NacosTask task) {
        if (!(task instanceof DistroDelayTask)) {
            return true;
        }
        // Cast the NacosTask to a DistroDelayTask
        DistroDelayTask distroDelayTask = (DistroDelayTask) task;
        DistroKey distroKey = distroDelayTask.getDistroKey();
        if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) {
            // Wrap it into a DistroSyncChangeTask
            DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
            // Call NacosExecuteTaskExecuteEngine.addTask() to add it to the queue
            distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
            return true;
        }
        return false;
    }
}

// Nacos execute task execute engine.
public class NacosExecuteTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractExecuteTask> {
    private final TaskExecuteWorker[] executeWorkers;

    public NacosExecuteTaskExecuteEngine(String name, Logger logger, int dispatchWorkerCount) {
        super(logger);
        // Each TaskExecuteWorker starts a thread on initialization to process its queue
        executeWorkers = new TaskExecuteWorker[dispatchWorkerCount];
        for (int mod = 0; mod < dispatchWorkerCount; ++mod) {
            executeWorkers[mod] = new TaskExecuteWorker(name, mod, dispatchWorkerCount, getEngineLog());
        }
    }
    ...
    @Override
    public void addTask(Object tag, AbstractExecuteTask task) {
        NacosTaskProcessor processor = getProcessor(tag);
        if (null != processor) {
            processor.process(task);
            return;
        }
        // Pick a TaskExecuteWorker by the tag's hash
        TaskExecuteWorker worker = getWorker(tag);
        // Call TaskExecuteWorker.process() to put the DistroSyncChangeTask into its queue
        worker.process(task);
    }

    private TaskExecuteWorker getWorker(Object tag) {
        int idx = (tag.hashCode() & Integer.MAX_VALUE) % workersCount();
        return executeWorkers[idx];
    }
    ...
}

// Nacos execute task execute worker.
public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable {
    // The task container
    private final BlockingQueue<Runnable> queue;

    public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {
        this.name = name + "_" + mod + "%" + total;
        this.queue = new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY);
        this.closed = new AtomicBoolean(false);
        this.log = null == logger ? LoggerFactory.getLogger(TaskExecuteWorker.class) : logger;
        new InnerWorker(name).start();
    }
    ...
    @Override
    public boolean process(NacosTask task) {
        if (task instanceof AbstractExecuteTask) {
            // Put the DistroSyncChangeTask into the queue
            putTask((Runnable) task);
        }
        return true;
    }

    private void putTask(Runnable task) {
        try {
            // Put the DistroSyncChangeTask into the queue
            queue.put(task);
        } catch (InterruptedException ire) {
            log.error(ire.toString(), ire);
        }
    }
    ...
    // Inner execute worker.
    private class InnerWorker extends Thread {
        InnerWorker(String name) {
            setDaemon(false);
            setName(name);
        }

        @Override
        public void run() {
            while (!closed.get()) {
                try {
                    // Keep taking tasks from the queue; here the task type is DistroSyncChangeTask
                    Runnable task = queue.take();
                    long begin = System.currentTimeMillis();
                    // Call DistroSyncChangeTask.run()
                    task.run();
                    long duration = System.currentTimeMillis() - begin;
                    if (duration > 1000L) {
                        log.warn("distro task {} takes {}ms", task, duration);
                    }
                } catch (Throwable e) {
                    log.error("[DISTRO-FAILED] " + e.toString(), e);
                }
            }
        }
    }
}

Part 3: The core method that syncs service instance data to cluster nodes

In DistroSyncChangeTask's run() method, a DistroHttpAgent is obtained and its syncData() method is called, which pushes the new service instance data to the other cluster nodes over HTTP. The sync endpoint is /v1/ns/distro/datum, which maps to DistroController's onSyncDatum() method.

DistroController's onSyncDatum() method iterates over the received service instance objects. If ServiceManager's containService() method reports that the service does not exist, an empty service is first created via ServiceManager's createEmptyService() method. Then DistroProtocol's onReceive() method is called to register the instance, which delegates to DistroConsistencyServiceImpl's processData() method, which finally calls the same DistroConsistencyServiceImpl onPut() method that is used during instance registration.

// Distro sync change task.
public class DistroSyncChangeTask extends AbstractDistroExecuteTask {
    private final DistroComponentHolder distroComponentHolder;

    public DistroSyncChangeTask(DistroKey distroKey, DistroComponentHolder distroComponentHolder) {
        super(distroKey);
        this.distroComponentHolder = distroComponentHolder;
    }

    @Override
    public void run() {
        Loggers.DISTRO.info("[DISTRO-START] {}", toString());
        try {
            // Build the request payload
            String type = getDistroKey().getResourceType();
            DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());
            distroData.setType(DataOperation.CHANGE);
            // Call DistroHttpAgent.syncData() to sync the new instance data over HTTP
            boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
            if (!result) {
                handleFailedTask();
            }
            Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
        } catch (Exception e) {
            Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
            handleFailedTask();
        }
    }
    ...
}

// Distro http agent.
public class DistroHttpAgent implements DistroTransportAgent {
    private final ServerMemberManager memberManager;

    public DistroHttpAgent(ServerMemberManager memberManager) {
        this.memberManager = memberManager;
    }

    @Override
    public boolean syncData(DistroData data, String targetServer) {
        if (!memberManager.hasMember(targetServer)) {
            return true;
        }
        byte[] dataContent = data.getContent();
        // Sync the new instance data over HTTP
        return NamingProxy.syncData(dataContent, data.getDistroKey().getTargetServer());
    }
    ...
}

public class NamingProxy {
    ...
    // Synchronize datum to target server.
    public static boolean syncData(byte[] data, String curServer) {
        Map<String, String> headers = new HashMap<>(128);
        headers.put(HttpHeaderConsts.CLIENT_VERSION_HEADER, VersionUtils.version);
        headers.put(HttpHeaderConsts.USER_AGENT_HEADER, UtilsAndCommons.SERVER_VERSION);
        headers.put(HttpHeaderConsts.ACCEPT_ENCODING, "gzip,deflate,sdch");
        headers.put(HttpHeaderConsts.CONNECTION, "Keep-Alive");
        headers.put(HttpHeaderConsts.CONTENT_ENCODING, "gzip");
        try {
            // Sync the data over HTTP: /v1/ns/distro/datum
            RestResult<String> result = HttpClient.httpPutLarge(
                "http://" + curServer + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_ON_SYNC_URL,
                headers, data);
            if (result.ok()) {
                return true;
            }
            if (HttpURLConnection.HTTP_NOT_MODIFIED == result.getCode()) {
                return true;
            }
            throw new IOException("failed to req API:" + "http://" + curServer + EnvUtil.getContextPath()
                + UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_ON_SYNC_URL + ". code:" + result.getCode() + " msg: "
                + result.getData());
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("NamingProxy", e);
        }
        return false;
    }
    ...
}

// Restful methods for Partition protocol.
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/distro")
public class DistroController {
    @Autowired
    private DistroProtocol distroProtocol;

    @Autowired
    private ServiceManager serviceManager;
    ...
    // Synchronize datum.
    @PutMapping("/datum")
    public ResponseEntity onSyncDatum(@RequestBody Map<String, Datum<Instances>> dataMap) throws Exception {
        if (dataMap.isEmpty()) {
            Loggers.DISTRO.error("[onSync] receive empty entity!");
            throw new NacosException(NacosException.INVALID_PARAM, "receive empty entity!");
        }
        // Iterate over the new service instance objects
        for (Map.Entry<String, Datum<Instances>> entry : dataMap.entrySet()) {
            if (KeyBuilder.matchEphemeralInstanceListKey(entry.getKey())) {
                // Extract the namespace and service name
                String namespaceId = KeyBuilder.getNamespace(entry.getKey());
                String serviceName = KeyBuilder.getServiceName(entry.getKey());
                if (!serviceManager.containService(namespaceId, serviceName) && switchDomain.isDefaultInstanceEphemeral()) {
                    // Create an empty Service, just as during instance registration
                    serviceManager.createEmptyService(namespaceId, serviceName, true);
                }
                DistroHttpData distroHttpData = new DistroHttpData(createDistroKey(entry.getKey()), entry.getValue());
                // Register the new service instance
                distroProtocol.onReceive(distroHttpData);
            }
        }
        return ResponseEntity.ok("ok");
    }
    ...
}

@Component
public class DistroProtocol {
    ...
    // Receive synced distro data, find processor to process.
    public boolean onReceive(DistroData distroData) {
        String resourceType = distroData.getDistroKey().getResourceType();
        // Find the DistroConsistencyServiceImpl processor
        DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
        if (null == dataProcessor) {
            Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);
            return false;
        }
        // Call DistroConsistencyServiceImpl.processData() to handle the new instance
        return dataProcessor.processData(distroData);
    }
    ...
}

@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
    // Stores the data of all registered service instances
    private final DataStore dataStore;
    private volatile Notifier notifier = new Notifier();
    ...
    @Override
    public boolean processData(DistroData distroData) {
        DistroHttpData distroHttpData = (DistroHttpData) distroData;
        Datum<Instances> datum = (Datum<Instances>) distroHttpData.getDeserializedContent();
        // This is the same onPut() method that is called during instance registration
        onPut(datum.key, datum.value);
        return true;
    }

    public void onPut(String key, Record value) {
        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            // Create a Datum holding the service key and all of the service's instances
            Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            // Add it to the DataStore's map
            dataStore.put(key, datum);
        }
        if (!listeners.containsKey(key)) {
            return;
        }
        // Add a processing task
        notifier.addTask(key, DataOperation.CHANGE);
    }

    @Override
    public void put(String key, Record value) throws NacosException {
        // Store the latest instance list (including the newly registered instance) in the DataStore
        onPut(key, value);
        // In cluster mode, DistroProtocol.sync() syncs the instance data to the other cluster nodes
        distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2);
    }
    ...
}

(3) Summary

When DistroConsistencyServiceImpl's put() method is called during instance registration, it calls DistroProtocol's sync() method to sync the new instance to the other cluster nodes. A delayed task is built, stored in a Map, and processed asynchronously; then a sync task is built, stored in a blocking Queue, and processed asynchronously; finally the async task issues an HTTP request to sync the instance data, which on the receiving node ends up calling DistroConsistencyServiceImpl's onPut() method to update its registry. This is why every node in the cluster holds the data of all service instances.

The reason for using a two-layer in-memory queue, rather than a single queue that hands the sync tasks for new instances directly to a TaskExecuteWorker, is that the extra buffering layer improves throughput. Instance registration can be extremely bursty: if, say, a thousand machines start at the same time, the Nacos server receives a thousand concurrent registration requests. With only one queue, those thousand sync tasks would all contend for entry into the TaskExecuteWorker's blocking queue, and the Nacos clients issuing the registration requests would wait too long for the server's response.
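There is a second benefit the keyed first layer provides: repeated changes to the same key coalesce, so a burst of updates to one service produces a single downstream sync task per drain cycle. The following is a minimal sketch (DelayMapMergeSketch and its members are hypothetical names; the real engine merges tasks via newTask.merge(existTask) rather than simply overwriting, which this simplifies):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of why the first-layer map helps under bursts: repeated tasks
// for the same key replace each other, so N rapid updates to one service
// collapse into a single sync task when the map is drained.
public class DelayMapMergeSketch {
    private final ConcurrentHashMap<String, Runnable> tasks = new ConcurrentHashMap<>();

    public void addTask(String key, Runnable task) {
        // Cheap, lock-striped put: the registering caller returns immediately
        tasks.put(key, task);
    }

    public int drain() {
        // Simulates one processTasks() pass: each key yields at most one task
        int processed = 0;
        for (String key : tasks.keySet()) {
            Runnable task = tasks.remove(key);
            if (task != null) {
                task.run();
                processed++;
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        DelayMapMergeSketch engine = new DelayMapMergeSketch();
        AtomicInteger syncCalls = new AtomicInteger();
        // 1000 rapid updates to the same service key...
        for (int i = 0; i < 1000; i++) {
            engine.addTask("nacos-demo-service", syncCalls::incrementAndGet);
        }
        // ...collapse into a single sync task when drained
        System.out.println(engine.drain() + " task(s), " + syncCalls.get() + " sync call(s)");
        // prints: 1 task(s), 1 sync call(s)
    }
}
```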

4. Data Sync When a Cluster Node's Health State Changes

(1) The Cluster Management Module in the Nacos Console

(2) Source Code: Starting the Node Health Check Task at Cluster Node Startup

(3) Source Code: Data Sync After a Node Receives a Health Check Request

(4) Summary

(1) The Cluster Management Module in the Nacos Console

In the cluster management module, you can see each node's state and metadata. The node IP is the node's IP address and port; the node state indicates whether the node is currently available; and the node metadata is the node's Raft-related information.

An example of node metadata:

{
    // Last refresh time
    "lastRefreshTime": 1674093895774,
    // Raft metadata
    "raftMetaData": {
        "metaDataMap": {
            "naming_persistent_service": {
                // Leader IP address
                "leader": "10.0.16.3:7849",
                // Raft group members
                "raftGroupMember": [
                    "10.0.16.3:7850",
                    "10.0.16.3:7848",
                    "10.0.16.3:7849"
                ],
                "term": 1
            }
        }
    },
    // Raft port
    "raftPort": "7849",
    // Nacos version
    "version": "1.4.1"
}

(2) Source Code: Starting the Node Health Check Task at Cluster Node Startup

Because the ServerMemberManager bean listens for the WebServerInitializedEvent, Spring invokes ServerMemberManager's onApplicationEvent() method at startup. In cluster mode, this method starts a node health check task for the cluster, i.e. it runs MemberInfoReportTask's run() method, which is the run() method inherited from Task.

Since MemberInfoReportTask extends the abstract class Task, which uses the template method pattern, running Task's run() method first executes MemberInfoReportTask's executeBody() method and then its after() method.

In MemberInfoReportTask's executeBody() method: it first gets the list of cluster nodes other than itself, then increments the cursor variable and takes it modulo the list size to pick the target Member for this round, and finally sends an HTTP request (/v1/core/cluster/report) to that Member. If the target node responds with success, MemberUtil's onSuccess() method is executed. If it responds with failure, MemberUtil's onFail() method is executed, which marks the target Member SUSPICIOUS and, after repeated failures, sets its state attribute to DOWN.

Finally, in MemberInfoReportTask's after() method, the MemberInfoReportTask is resubmitted, so the health check repeats indefinitely.
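The round-robin target selection in executeBody() boils down to one line of modular arithmetic. A minimal sketch (RoundRobinProbeSketch is a hypothetical name, not a Nacos class):

```java
import java.util.Arrays;
import java.util.List;

// Minimal sketch of MemberInfoReportTask's round-robin peer selection: each
// run advances a cursor modulo the peer list size, so health check requests
// rotate evenly across the other cluster nodes.
public class RoundRobinProbeSketch {
    private int cursor = 0;

    public String nextTarget(List<String> peers) {
        // Same arithmetic as executeBody(): advance, then wrap around
        cursor = (cursor + 1) % peers.size();
        return peers.get(cursor);
    }

    public static void main(String[] args) {
        List<String> peers = Arrays.asList("10.0.0.1:8848", "10.0.0.2:8848", "10.0.0.3:8848");
        RoundRobinProbeSketch probe = new RoundRobinProbeSketch();
        // Rotates .2, .3, .1, .2, .3, .1, ...
        for (int i = 0; i < 6; i++) {
            System.out.println(probe.nextTarget(peers));
        }
    }
}
```

Because each executeBody() run probes exactly one peer and after() reschedules with a 2-second delay, each node spreads its probes over time instead of hammering the whole cluster at once.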

@Component(value = "serverMemberManager")
public class ServerMemberManager implements ApplicationListener<WebServerInitializedEvent> {
    private final NacosAsyncRestTemplate asyncRestTemplate = HttpClientBeanHolder.getNacosAsyncRestTemplate(Loggers.CORE);
    // Address information for the local node.
    private String localAddress;
    // Broadcast this node element information task.
    private final MemberInfoReportTask infoReportTask = new MemberInfoReportTask();
    ...
    // Listens for the WebServerInitializedEvent published at Spring startup
    @Override
    public void onApplicationEvent(WebServerInitializedEvent event) {
        // Set the current node's state to the default state
        getSelf().setState(NodeState.UP);
        // Start the node health check task only in cluster mode
        if (!EnvUtil.getStandaloneMode()) {
            // Schedule a delayed task that runs MemberInfoReportTask.run()
            GlobalExecutor.scheduleByCommon(this.infoReportTask, 5_000L);
        }
        EnvUtil.setPort(event.getWebServer().getPort());
        EnvUtil.setLocalAddress(this.localAddress);
        Loggers.CLUSTER.info("This node is ready to provide external services");
    }
    ...
    class MemberInfoReportTask extends Task {
        private final GenericType<RestResult<String>> reference = new GenericType<RestResult<String>>() { };
        private int cursor = 0;

        @Override
        protected void executeBody() {
            // Get the cluster nodes other than this node
            List<Member> members = ServerMemberManager.this.allMembersWithoutSelf();
            if (members.isEmpty()) {
                return;
            }
            // Round-robin: each executeBody() call advances the cursor by 1 and picks the corresponding Member
            this.cursor = (this.cursor + 1) % members.size();
            Member target = members.get(cursor);
            Loggers.CLUSTER.debug("report the metadata to the node : {}", target.getAddress());
            // Build the URL: /v1/core/cluster/report
            final String url = HttpUtils.buildUrl(false, target.getAddress(), EnvUtil.getContextPath(), Commons.NACOS_CORE_CONTEXT, "/cluster/report");
            try {
                // Send the health check request to the target Member over HTTP
                asyncRestTemplate.post(url, Header.newInstance().addParam(Constants.NACOS_SERVER_HEADER, VersionUtils.version),
                    Query.EMPTY, getSelf(), reference.getType(), new Callback<String>() {
                        @Override
                        public void onReceive(RestResult<String> result) {
                            if (result.getCode() == HttpStatus.NOT_IMPLEMENTED.value() || result.getCode() == HttpStatus.NOT_FOUND.value()) {
                                Loggers.CLUSTER.warn("{} version is too low, it is recommended to upgrade the version : {}", target, VersionUtils.version);
                                return;
                            }
                            if (result.ok()) {
                                // On success, set the Member's state to NodeState.UP
                                MemberUtil.onSuccess(ServerMemberManager.this, target);
                            } else {
                                // On failure, move the Member's state towards NodeState.DOWN
                                Loggers.CLUSTER.warn("failed to report new info to target node : {}, result : {}", target.getAddress(), result);
                                MemberUtil.onFail(ServerMemberManager.this, target);
                            }
                        }

                        @Override
                        public void onError(Throwable throwable) {
                            Loggers.CLUSTER.error("failed to report new info to target node : {}, error : {}", target.getAddress(), ExceptionUtil.getAllExceptionMsg(throwable));
                            // On error, move the Member's state towards NodeState.DOWN
                            MemberUtil.onFail(ServerMemberManager.this, target, throwable);
                        }

                        @Override
                        public void onCancel() {
                        }
                    });
            } catch (Throwable ex) {
                Loggers.CLUSTER.error("failed to report new info to target node : {}, error : {}", target.getAddress(), ExceptionUtil.getAllExceptionMsg(ex));
            }
        }

        @Override
        protected void after() {
            // Resubmit this health check task so it runs repeatedly
            GlobalExecutor.scheduleByCommon(this, 2_000L);
        }
    }
}

// Task uses the template method pattern
public abstract class Task implements Runnable {
    protected volatile boolean shutdown = false;

    @Override
    public void run() {
        if (shutdown) {
            return;
        }
        try {
            // Execute the core logic; this abstract method is implemented by subclasses
            executeBody();
        } catch (Throwable t) {
            Loggers.CORE.error("this task execute has error : {}", ExceptionUtil.getStackTrace(t));
        } finally {
            if (!shutdown) {
                after();
            }
        }
    }

    protected abstract void executeBody();

    protected void after() {
    }

    public void shutdown() {
        shutdown = true;
    }
}

public class MemberUtil {
    ...
    // Successful processing of the operation on the node.
    public static void onSuccess(final ServerMemberManager manager, final Member member) {
        final NodeState old = member.getState();
        manager.getMemberAddressInfos().add(member.getAddress());
        member.setState(NodeState.UP);
        member.setFailAccessCnt(0);
        if (!Objects.equals(old, member.getState())) {
            manager.notifyMemberChange();
        }
    }

    public static void onFail(final ServerMemberManager manager, final Member member) {
        onFail(manager, member, ExceptionUtil.NONE_EXCEPTION);
    }

    // Failure processing of the operation on the node.
    public static void onFail(final ServerMemberManager manager, final Member member, Throwable ex) {
        manager.getMemberAddressInfos().remove(member.getAddress());
        final NodeState old = member.getState();
        member.setState(NodeState.SUSPICIOUS);
        member.setFailAccessCnt(member.getFailAccessCnt() + 1);
        int maxFailAccessCnt = EnvUtil.getProperty("nacos.core.member.fail-access-cnt", Integer.class, 3);
        if (member.getFailAccessCnt() > maxFailAccessCnt || StringUtils.containsIgnoreCase(ex.getMessage(), TARGET_MEMBER_CONNECT_REFUSE_ERRMSG)) {
            member.setState(NodeState.DOWN);
        }
        if (!Objects.equals(old, member.getState())) {
            manager.notifyMemberChange();
        }
    }
    ...
}

public class GlobalExecutor {
    private static final ScheduledExecutorService COMMON_EXECUTOR = ExecutorFactory.Managed.newScheduledExecutorService(
        ClassUtils.getCanonicalName(GlobalExecutor.class), 4, new NameThreadFactory("com.alibaba.nacos.core.common"));
    ...
    public static void scheduleByCommon(Runnable runnable, long delayMs) {
        if (COMMON_EXECUTOR.isShutdown()) {
            return;
        }
        // Execute the task after the given delay
        COMMON_EXECUTOR.schedule(runnable, delayMs, TimeUnit.MILLISECONDS);
    }
    ...
}

public final class ExecutorFactory {
    ...
    public static final class Managed {
        private static final String DEFAULT_NAMESPACE = "nacos";
        private static final ThreadPoolManager THREAD_POOL_MANAGER = ThreadPoolManager.getInstance();
        ...
        // Create a new scheduled executor service with input thread factory and register to manager.
        public static ScheduledExecutorService newScheduledExecutorService(final String group, final int nThreads, final ThreadFactory threadFactory) {
            ScheduledExecutorService executorService = Executors.newScheduledThreadPool(nThreads, threadFactory);
            THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService);
            return executorService;
        }
        ...
    }
    ...
}

(3) Source Code: Data Sync After a Node Receives a Health Check Request

When a cluster node receives a "/v1/core/cluster/report" request from another node, it handles it in NacosClusterController's report() method. The report() method sets the requesting node's state directly to UP, then calls ServerMemberManager's update() method to update that node's attributes. update() updates the corresponding Member stored in serverList by copying the new object's attributes over the old one via MemberUtil's copy() method.

Note: every node in the cluster keeps its own copy of serverList, so when a node receives a health check request, it must update the corresponding entry in its own serverList.

@RestController
@RequestMapping(Commons.NACOS_CORE_CONTEXT + "/cluster")
public class NacosClusterController {
    private final ServerMemberManager memberManager;
    ...
    // Other nodes return their own metadata information.
    @PostMapping(value = {"/report"})
    public RestResult<String> report(@RequestBody Member node) {
        if (!node.check()) {
            return RestResultUtils.failedWithMsg(400, "Node information is illegal");
        }
        LoggerUtils.printIfDebugEnabled(Loggers.CLUSTER, "node state report, receive info : {}", node);
        // A node that can reach this endpoint must be healthy, so set its state directly to UP
        node.setState(NodeState.UP);
        node.setFailAccessCnt(0);
        // Update the cluster node
        boolean result = memberManager.update(node);
        return RestResultUtils.success(Boolean.toString(result));
    }
    ...
}

@Component(value = "serverMemberManager")
public class ServerMemberManager implements ApplicationListener<WebServerInitializedEvent> {
    // Cluster node list.
    private volatile ConcurrentSkipListMap<String, Member> serverList;
    ...
    // member information update.
    public boolean update(Member newMember) {
        Loggers.CLUSTER.debug("member information update : {}", newMember);
        String address = newMember.getAddress();
        if (!serverList.containsKey(address)) {
            return false;
        }
        // Update the entry in serverList
        serverList.computeIfPresent(address, (s, member) -> {
            // If the node is unhealthy, remove it from memberAddressInfos
            if (NodeState.DOWN.equals(newMember.getState())) {
                memberAddressInfos.remove(newMember.getAddress());
            }
            // Check whether the basic info has changed
            boolean isPublishChangeEvent = MemberUtil.isBasicInfoChanged(newMember, member);
            // Set lastRefreshTime to the current time
            newMember.setExtendVal(MemberMetaDataConstants.LAST_REFRESH_TIME, System.currentTimeMillis());
            // Copy the new attributes over the old object
            MemberUtil.copy(newMember, member);
            if (isPublishChangeEvent) {
                // member basic data changes and all listeners need to be notified
                notifyMemberChange();
            }
            return member;
        });
        return true;
    }
    ...
}

(4) Summary

This is how health state is synced between nodes in a Nacos cluster. In short, the cluster nodes periodically communicate with each other, and when communication with a node fails repeatedly, that node's state attribute is set to DOWN.
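The state machine behind this, as seen in MemberUtil's onFail()/onSuccess() methods above, can be sketched as follows (NodeStateSketch is a hypothetical name; the real code also short-circuits to DOWN on a connection-refused error, which this omits):

```java
// Minimal sketch of the failure-count state machine in MemberUtil: a probe
// failure marks the peer SUSPICIOUS, and only after more than maxFailCount
// consecutive failures is it marked DOWN; any success resets to UP.
public class NodeStateSketch {
    enum State { UP, SUSPICIOUS, DOWN }

    private State state = State.UP;
    private int failCount = 0;
    private final int maxFailCount = 3;  // mirrors the default of nacos.core.member.fail-access-cnt

    public State onFail() {
        state = State.SUSPICIOUS;
        failCount++;
        if (failCount > maxFailCount) {
            state = State.DOWN;
        }
        return state;
    }

    public State onSuccess() {
        state = State.UP;
        failCount = 0;
        return state;
    }
}
```

The intermediate SUSPICIOUS state keeps a single dropped request from immediately evicting a healthy peer; only sustained failure demotes it to DOWN.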

5. How Existing Service Instance Data Is Synced to a Newly Added Cluster Node

(1) The Async Task That Loads All Service Instance Data at Node Startup

(2) Source Code: How a Node Handles the Request for All Service Instance Data

(3) Summary

(1) The Async Task That Loads All Service Instance Data at Node Startup

The Nacos server has a DistroProtocol class. It is a Spring bean, created when the Spring application starts.

When the DistroProtocol bean is created, its constructor runs and calls DistroProtocol's startLoadTask() method to start an async data-loading task.

In DistroProtocol's startLoadTask() method, an async task is submitted together with a callback that records whether initialization succeeded. The submitted task is a DistroLoadDataTask, so its run() method executes, which calls its load() method, which calls its loadAllDataSnapshotFromRemote() method to fetch the full service instance data from the other cluster nodes and update the local registry.

In loadAllDataSnapshotFromRemote(), the cluster nodes other than the current node are iterated. For each node, DistroHttpAgent's getDatumSnapshot() method is called, which issues an HTTP request to "/v1/ns/distro/datums" to fetch all service instance data from the target node. Then DistroConsistencyServiceImpl's processSnapshot() method is called to write that data into the local registry. As soon as one node's data is fetched and applied successfully, the method returns; otherwise it moves on to the next cluster node and tries again.

When handling instance registration, the Nacos server uses an in-memory queue plus an async task; the async task calls the listener's onChange() method, which updates the local registry using copy-on-write. processSnapshot() likewise calls the listener's onChange() method to update the registry, and the listener's onChange() implementation is in fact Service's onChange() method.
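The copy-on-write idea mentioned above can be sketched minimally as follows (CopyOnWriteRegistrySketch is a hypothetical name, not Nacos code; the real registry holds richer structures than a list of strings):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Minimal sketch of a copy-on-write registry update: writers build a fresh
// list and swap the reference atomically, so readers never see a half-built
// instance list and never need a lock.
public class CopyOnWriteRegistrySketch {
    // 'volatile' makes the reference swap immediately visible to reader threads.
    private volatile List<String> instances = Collections.emptyList();

    // Writer path, analogous to the listener's onChange(): copy, then swap.
    public void onChange(List<String> newInstances) {
        List<String> copy = new ArrayList<>(newInstances);   // work on a private copy
        this.instances = Collections.unmodifiableList(copy); // single atomic reference swap
    }

    // Reader path, analogous to serving a service discovery query.
    public List<String> getInstances() {
        return instances;
    }
}
```

A reader that grabbed the old list keeps a consistent snapshot even while a writer swaps in a new one; the trade-off is an allocation per update, which is acceptable because reads vastly outnumber writes in a registry.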

@Component
public class DistroProtocol {
    ...
    public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder, DistroTaskEngineHolder distroTaskEngineHolder, DistroConfig distroConfig) {
        this.memberManager = memberManager;
        this.distroComponentHolder = distroComponentHolder;
        this.distroTaskEngineHolder = distroTaskEngineHolder;
        this.distroConfig = distroConfig;
        // Start an async task
        startDistroTask();
    }

    private void startDistroTask() {
        if (EnvUtil.getStandaloneMode()) {
            isInitialized = true;
            return;
        }
        startVerifyTask();
        // Submit an async data-loading task
        startLoadTask();
    }

    private void startLoadTask() {
        // Data-loading callback: sets the isInitialized flag that marks whether initialization succeeded
        DistroCallback loadCallback = new DistroCallback() {
            @Override
            public void onSuccess() {
                isInitialized = true;
            }

            @Override
            public void onFailed(Throwable throwable) {
                isInitialized = false;
            }
        };
        // Submit the async task
        GlobalExecutor.submitLoadDataTask(new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));
    }
    ...
}

// Distro load data task.
public class DistroLoadDataTask implements Runnable {
    ...
    @Override
    public void run() {
        try {
            load();
            if (!checkCompleted()) {
                GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
            } else {
                loadCallback.onSuccess();
                Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
            }
        } catch (Exception e) {
            loadCallback.onFailed(e);
            Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
        }
    }

    private void load() throws Exception {
        while (memberManager.allMembersWithoutSelf().isEmpty()) {
            Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
            TimeUnit.SECONDS.sleep(1);
        }
        while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
            Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
            TimeUnit.SECONDS.sleep(1);
        }
        for (String each : distroComponentHolder.getDataStorageTypes()) {
            if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
                loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
            }
        }
    }

    private boolean loadAllDataSnapshotFromRemote(String resourceType) {
        DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
        DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
        if (null == transportAgent || null == dataProcessor) {
            Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}", resourceType, transportAgent, dataProcessor);
            return false;
        }
        // Iterate over the cluster nodes other than this node
        for (Member each : memberManager.allMembersWithoutSelf()) {
            try {
                Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());
                // Call DistroHttpAgent.getDatumSnapshot() to fetch the other node's data over HTTP
                DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
                // Call DistroConsistencyServiceImpl.processSnapshot() to apply the result to this node's in-memory registry
                boolean result = dataProcessor.processSnapshot(distroData);
                Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(), result);
                // As soon as one node returns all its data and the sync succeeds, stop
                if (result) {
                    return true;
                }
            } catch (Exception e) {
                Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);
            }
        }
        return false;
    }
    ...
}

public class DistroHttpAgent implements DistroTransportAgent {
    ...
    @Override
    public DistroData getDatumSnapshot(String targetServer) {
        try {
            // Issue the HTTP request via NamingProxy
            byte[] allDatum = NamingProxy.getAllData(targetServer);
            return new DistroData(new DistroKey("snapshot", KeyBuilder.INSTANCE_LIST_KEY_PREFIX), allDatum);
        } catch (Exception e) {
            throw new DistroException(String.format("Get snapshot from %s failed.", targetServer), e);
        }
    }
    ...
}

public class NamingProxy {
    ...
    // Fetch all data from the target node
    public static byte[] getAllData(String server) throws Exception {
        Map<String, String> params = new HashMap<>(8);
        RestResult<String> result = HttpClient.httpGet(
            "http://" + server + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL,
            new ArrayList<>(), params);
        if (result.ok()) {
            return result.getData().getBytes();
        }
        throw new IOException("failed to req API: " + "http://" + server + EnvUtil.getContextPath()
            + UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL + ". code: " + result.getCode() + " msg: "
            + result.getMessage());
    }
    ...
}

@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
    ...
    @Override
    public boolean processSnapshot(DistroData distroData) {
        try {
            return processData(distroData.getContent());
        } catch (Exception e) {
            return false;
        }
    }

    private boolean processData(byte[] data) throws Exception {
        if (data.length > 0) {
            // Deserialize into objects
            Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class);
            // Create empty Services
            for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
                ...
            }
            for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
                if (!listeners.containsKey(entry.getKey())) {
                    // Should not happen:
                    Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());
                    continue;
                }
                try {
                    // Update the local registry
                    for (RecordListener listener : listeners.get(entry.getKey())) {
                        listener.onChange(entry.getKey(), entry.getValue().value);
                    }
                } catch (Exception e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);
                    continue;
                }
                // Update data store if listener executed successfully:
                dataStore.put(entry.getKey(), entry.getValue());
            }
        }
        return true;
    }
    ...
}

Summary: when a Nacos server cluster node starts, it creates a bean of type DistroProtocol, and the constructor of that bean kicks off an asynchronous task. The task's main job is to fetch service data from the other cluster nodes over HTTP and apply the fetched service instance data to the local in-memory registry, completing the data sync. As soon as the sync succeeds against any one cluster node, the whole task finishes.
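The startup logic above reduces to "try each peer in turn until one full snapshot is fetched and processed successfully". Below is a minimal, self-contained sketch of that retry pattern; the SnapshotLoader class, the peer addresses, and the fetchAndProcess callback are hypothetical stand-ins, not Nacos types:

```java
import java.util.List;
import java.util.function.Function;

public class SnapshotLoader {

    // Returns true as soon as any peer yields a snapshot that is processed successfully,
    // mirroring the early return in loadAllDataSnapshotFromRemote()
    static boolean loadFromAnyPeer(List<String> peers, Function<String, Boolean> fetchAndProcess) {
        for (String peer : peers) {
            try {
                if (fetchAndProcess.apply(peer)) {
                    return true; // one successful full sync is enough
                }
            } catch (Exception e) {
                // a failing peer is only logged; fall through and try the next one
            }
        }
        return false; // the caller re-submits the load task with a retry delay
    }

    public static void main(String[] args) {
        List<String> peers = List.of("10.0.0.1:8848", "10.0.0.2:8848");
        // Simulate the first peer failing and the second returning a usable snapshot
        boolean ok = loadFromAnyPeer(peers, peer -> peer.startsWith("10.0.0.2"));
        System.out.println(ok);
    }
}
```

If every peer fails, the method returns false and checkCompleted() stays false, which is why DistroLoadDataTask re-submits itself with getLoadDataRetryDelayMillis().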

Also note that fetching all service instance data from a cluster node is done with an HTTP request to the "/v1/ns/distro/datums" endpoint.

(2) Source code: how a node handles the request for all service instance data

When a Nacos cluster node receives the "/v1/ns/distro/datums" HTTP request, it runs DistroController's getAllDatums() method, which calls DistroProtocol's onSnapshot() method to fetch the data and returns it directly. onSnapshot() in turn calls DistroDataStorageImpl's getDatumSnapshot() method.

getDatumSnapshot() obtains its result from DataStore's getDataMap() method. When a service instance registers, a copy of its data is stored into DataStore's map; when instance data is synced in from another node, it is likewise stored into DataStore's map. DataStore therefore contains the data of all service instances. The endpoint that returns all service instance data is built on DataStore rather than on the in-memory registry.

@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/distro")
public class DistroController {
    @Autowired
    private DistroProtocol distroProtocol;
    ...
    //Get all datums.
    @GetMapping("/datums")
    public ResponseEntity getAllDatums() {
        DistroData distroData = distroProtocol.onSnapshot(KeyBuilder.INSTANCE_LIST_KEY_PREFIX);
        return ResponseEntity.ok(distroData.getContent());
    }
    ...
}

@Component
public class DistroProtocol {
    ...
    //Query all datum snapshot.
    public DistroData onSnapshot(String type) {
        DistroDataStorage distroDataStorage = distroComponentHolder.findDataStorage(type);
        if (null == distroDataStorage) {
            Loggers.DISTRO.warn("[DISTRO] Can't find data storage for received key {}", type);
            return new DistroData(new DistroKey("snapshot", type), new byte[0]);
        }
        //Call DistroDataStorageImpl.getDatumSnapshot()
        return distroDataStorage.getDatumSnapshot();
    }
    ...
}

public class DistroDataStorageImpl implements DistroDataStorage {
    private final DataStore dataStore;
    ...
    @Override
    public DistroData getDatumSnapshot() {
        Map<String, Datum> result = dataStore.getDataMap();
        //Serialize the service instance data
        byte[] dataContent = ApplicationUtils.getBean(Serializer.class).serialize(result);
        DistroKey distroKey = new DistroKey("snapshot", KeyBuilder.INSTANCE_LIST_KEY_PREFIX);
        //Wrap it in a DistroData object and return it
        return new DistroData(distroKey, dataContent);
    }
    ...
}

//Store of data. Holds the data of all registered service instances
@Component
public class DataStore {
    private Map<String, Datum> dataMap = new ConcurrentHashMap<>(1024);
    
    public void put(String key, Datum value) {
        dataMap.put(key, value);
    }
    
    public Datum remove(String key) {
        return dataMap.remove(key);
    }
    
    public Set<String> keys() {
        return dataMap.keySet();
    }
    
    public Datum get(String key) {
        return dataMap.get(key);
    }
    
    public boolean contains(String key) {
        return dataMap.containsKey(key);
    }
    
    public Map<String, Datum> batchGet(List<String> keys) {
        Map<String, Datum> map = new HashMap<>(128);
        for (String key : keys) {
            Datum datum = dataMap.get(key);
            if (datum == null) {
                continue;
            }
            map.put(key, datum);
        }
        return map;
    }
    ...
    
    public Map<String, Datum> getDataMap() {
        return dataMap;
    }
}

Note: DataStore's data is ultimately kept in memory. Using DataStore provides the following functions and benefits:

1. Data persistence

DataStore can persist node data to disk or other media to keep it durable, so node data can be recovered and retained even after a restart or failure. After all, a Datum's key is the ServiceName and its value is the list of Instance objects, and each Instance carries its ClusterName, IP and Port, so the complete in-memory registry can be reconstructed from DataStore.

Map<String, Map<String, Service>> serviceMap;
//Map(namespace, Map(group::serviceName, Service))
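To make the recovery claim concrete, here is a hedged sketch of rebuilding the nested serviceMap view from flat DataStore-style entries. The key format is simplified to "namespace##group@@serviceName" and plain address strings stand in for Datum&lt;Instances&gt;; real Nacos keys carry an additional prefix:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RegistryRebuild {
    public static void main(String[] args) {
        // Flat store: service key -> instance addresses (stand-in for Datum<Instances>)
        Map<String, List<String>> dataStore = new HashMap<>();
        dataStore.put("public##DEFAULT_GROUP@@orderService", List.of("192.168.0.10:8080"));
        dataStore.put("public##DEFAULT_GROUP@@userService", List.of("192.168.0.11:8080", "192.168.0.12:8080"));

        // Rebuild Map(namespace, Map(group::serviceName, instances)) by splitting each key
        Map<String, Map<String, List<String>>> serviceMap = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : dataStore.entrySet()) {
            String[] parts = entry.getKey().split("##", 2); // [namespace, group@@serviceName]
            serviceMap.computeIfAbsent(parts[0], k -> new HashMap<>()).put(parts[1], entry.getValue());
        }

        // Both services end up under the "public" namespace
        System.out.println(serviceMap.get("public").size());
    }
}
```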

2. Data synchronization

DataStore coordinates and synchronizes access to and updates of node data. When multiple nodes register or update data concurrently, DataStore ensures the data stays consistent and correct, avoiding conflicts and inconsistencies.

3. Data management

DataStore provides management operations on node data, including add, update and delete. With suitable data structures and algorithms it can manage a large volume of node data efficiently and support fast access and queries.

4. Data access control

DataStore can enforce access control and permission management on node data: only nodes or users with the corresponding permissions may read or modify specific node data, which improves data security and confidentiality.

DataStore acts as the central store and manager of node data in Nacos. By providing persistence, synchronization, management and access control, it keeps node data reliable, consistent and secure, and is one of the core components for storing and operating on node data.

(3) Summary

How a newly added node in a Nacos cluster synchronizes service data:

First, the constructor of the DistroProtocol bean starts an asynchronous task that requests the full data set from the other cluster nodes over HTTP.

Once the new node has fetched the full data set, it calls each Service's onChange() method and updates the local in-memory registry using a copy-on-write mechanism.

When a Nacos cluster node serves the request for all service instance data, it reads the data from DataStore rather than from the in-memory registry.
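The copy-on-write idea mentioned above can be sketched as follows. This is a minimal illustration under the assumption of a single volatile reference, not the actual Service/Cluster implementation: readers keep seeing the old instance list until the writer swaps in a fully built new list with one reference assignment, so reads never need a lock.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CopyOnWriteRegistry {
    // Readers always see a complete, immutable snapshot through this volatile reference
    private volatile List<String> instances = Collections.emptyList();

    // Writers build the new list off to the side, then publish it with a single assignment
    synchronized void onChange(List<String> latest) {
        List<String> copy = new ArrayList<>(latest);
        instances = Collections.unmodifiableList(copy);
    }

    List<String> getInstances() {
        return instances; // lock-free read
    }

    public static void main(String[] args) {
        CopyOnWriteRegistry registry = new CopyOnWriteRegistry();
        registry.onChange(List.of("192.168.0.10:8080", "192.168.0.11:8080"));
        System.out.println(registry.getInstances().size());
    }
}
```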
