当前位置: 首页 > news >正文

Nacos源码—3.Nacos集群高可用分析一

大纲

1.Nacos集群的几个问题

2.单节点对服务进行心跳健康检查和同步检查结果

3.集群新增服务实例时如何同步给其他节点

4.集群节点的健康状态变动时的数据同步

5.集群新增节点时如何同步已有服务实例数据

1.Nacos集群的几个问题

问题一:在单机模式下,Nacos服务端会开启心跳健康检查的定时任务。那么在集群模式下,是否有必要让全部集群节点都执行这个定时任务?

问题二:Nacos服务端通过心跳健康检查的定时任务感知服务实例健康状态改变时,如何把服务实例的健康状态同步给其他Nacos集群节点?

问题三:一个新服务实例发起注册请求,只会有一个Nacos集群节点处理对应请求,那么处理完注册请求后,集群节点间应该如何同步服务实例数据?

问题四:假设Nacos集群有三个节点,现在需要新增了一个节点,那么新增的节点应该如何从集群中同步已存在的服务实例数据?

问题五:Nacos集群节点相互之间,是否有心跳机制来检测集群节点是否可用?

2.单节点对服务进行心跳健康检查和同步检查结果

(1)集群对服务进行心跳健康检查的设计

(2)选择一个节点对服务进行心跳健康检查的源码

(3)集群之间同步服务的健康状态的源码

(4)总结

(1)集群对服务进行心跳健康检查的架构设计

假设Nacos集群有三个节点:现已知单机模式下的Nacos服务端是会开启心跳健康检查的定时任务的。既然集群节点有三个,是否每个节点都要执行心跳健康检查的定时任务?

方案一:三个节点全都去执行心跳健康检查任务。如果每个节点执行的结果都不同,那么以哪个为准?

方案二:只有一个节点去执行心跳健康检查任务,然后把检查结果同步给其他节点。

明显方案二逻辑简洁清晰,而Nacos集群也选择了方案二。在Nacos集群模式下,三个节点都会开启一个心跳健康检查的定时任务,但只有一个节点会真正地执行心跳健康检查的逻辑。然后在检查完成后,会开启一个定时任务将检查结果同步给其他节点。

(2)选择一个节点对服务进行心跳健康检查的源码

对服务进行心跳健康检查的任务,其实就是ClientBeatCheckTask任务。Nacos服务端在处理服务实例注册接口请求时,就会开启这个任务。如下所示:

ClientBeatCheckTask这个类是一个线程任务。在ClientBeatCheckTask的run()方法中,一开始就有两个if判断。第一个if判断:判断当前节点在集群模式下是否需要对该Service执行心跳健康检查任务。第二个if判断:是否开启了健康检查任务,默认是开启的。注意:ClientBeatProcessor用于处理服务实例的心跳,服务实例和服务都需要心跳健康检查。

在集群模式下,为了保证只有一个节点对该Service执行心跳健康检查,就需要第一个if判断中的DistroMapper的responsible()方法来实现了。通过DistroMapper的responsible()方法可知:只会有一个集群节点能够对该Service执行心跳健康检查。而其他的集群节点,并不会去执行对该Service的心跳健康检查。

//Check and update statues of ephemeral instances, remove them if they have been expired.
public class ClientBeatCheckTask implements Runnable {private Service service;//每个ClientBeatCheckTask都会对应一个Service...@JsonIgnorepublic DistroMapper getDistroMapper() {return ApplicationUtils.getBean(DistroMapper.class);}@Overridepublic void run() {try {//第一个if判断:DistroMapper.responsible()方法//判断当前节点在集群模式下是否需要对该Service执行心跳健康检查任务if (!getDistroMapper().responsible(service.getName())) {return;}//第二个if判断://是否开启了健康检查任务,默认是开启的if (!getSwitchDomain().isHealthCheckEnabled()) {return;}List<Instance> instances = service.allIPs(true);//first set health status of instances:for (Instance instance : instances) {if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {if (!instance.isMarked()) {if (instance.isHealthy()) {instance.setHealthy(false);getPushService().serviceChanged(service);ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));}}}}if (!getGlobalConfig().isExpireInstance()) {return;}//then remove obsolete instances:for (Instance instance : instances) {if (instance.isMarked()) {continue;}if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {//delete instancedeleteIp(instance);}}} catch (Exception e) {Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);}}...
}//Distro mapper, judge which server response input service.
@Component("distroMapper")
public class DistroMapper extends MemberChangeListener {//List of service nodes, you must ensure that the order of healthyList is the same for all nodes.private volatile List<String> healthyList = new ArrayList<>();//init server list.@PostConstructpublic void init() {NotifyCenter.registerSubscriber(this);//注册订阅者this.healthyList = MemberUtil.simpleMembers(memberManager.allMembers());}...//Judge whether current server is responsible for input service.public boolean responsible(String serviceName) {//获取集群节点数量,这里假设的是三个集群节点final List<String> servers = healthyList;//如果采用单机模式启动,直接返回trueif (!switchDomain.isDistroEnabled() || EnvUtil.getStandaloneMode()) {return true;}//如果没有可用的健康集群节点,直接返回falseif (CollectionUtils.isEmpty(servers)) {//means distro config is not ready yetreturn false;}int index = servers.indexOf(EnvUtil.getLocalAddress());int lastIndex = servers.lastIndexOf(EnvUtil.getLocalAddress());if (lastIndex < 0 || index < 0) {return true;}//对serviceName进行Hash操作,然后对servers.size()取模,得到负责执行心跳健康检查任务的那个节点索引int target = distroHash(serviceName) % servers.size();return target >= index && target <= lastIndex;}private int distroHash(String serviceName) {return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);}...
}

(3)集群之间同步服务的健康状态的源码

一.集群间同步服务的健康状态的实现逻辑

二.集群间同步服务的健康状态的实现源码

三.第一个异步任务ServiceReporter

四.第二个异步任务UpdatedServiceProcessor

既然集群中只有一个节点能够对某Service执行心跳健康检查,那么心跳健康检查的结果应该如何同步给集群的其他节点。

一.集群间同步服务的健康状态的实现逻辑

每个节点都会有一个定时任务,用来同步心跳健康检查的结果给其他节点。该异步任务会通过HTTP方式,调用其他集群节点的接口来实现数据同步。

二.集群间同步服务的健康状态的实现源码

在ServiceManager类中,有一个init()方法。该方法被@PostConstruct注解修饰了。在创建ServiceManager这个Bean时,便会调用这个init()方法。而在这个方法中,就会开启同步心跳健康检查结果的定时任务。

其中与同步服务实例健康状态相关的有两个异步任务:第一个是用来发起同步心跳健康检查结果请求的异步任务,第二个是用来处理同步心跳健康检查结果请求的异步任务。处理请求的思路是:内存队列削峰 + 异步任务提速。

//Core manager storing all services in Nacos.
@Component
public class ServiceManager implements RecordListener<Service> {...//Init service maneger.@PostConstructpublic void init() {//用来发起 同步心跳健康检查结果请求 的异步任务GlobalExecutor.scheduleServiceReporter(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);//用来处理 同步心跳健康检查结果请求 的异步任务:内存队列削峰 + 异步任务提速GlobalExecutor.submitServiceUpdateManager(new UpdatedServiceProcessor());if (emptyServiceAutoClean) {Loggers.SRV_LOG.info("open empty service auto clean job, initialDelay : {} ms, period : {} ms", cleanEmptyServiceDelay, cleanEmptyServicePeriod);//delay 60s, period 20s;//This task is not recommended to be performed frequently in order to avoid//the possibility that the service cache information may just be deleted//and then created due to the heartbeat mechanismGlobalExecutor.scheduleServiceAutoClean(new EmptyServiceAutoClean(), cleanEmptyServiceDelay, cleanEmptyServicePeriod);}try {Loggers.SRV_LOG.info("listen for service meta change");consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this);} catch (NacosException e) {Loggers.SRV_LOG.error("listen for service meta change failed!");}}...
}public class GlobalExecutor {private static final ScheduledExecutorService SERVICE_SYNCHRONIZATION_EXECUTOR = ExecutorFactory.Managed.newSingleScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class),new NameThreadFactory("com.alibaba.nacos.naming.service.worker"));public static final ScheduledExecutorService SERVICE_UPDATE_MANAGER_EXECUTOR = ExecutorFactory.Managed.newSingleScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class),new NameThreadFactory("com.alibaba.nacos.naming.service.update.processor"));...public static void scheduleServiceReporter(Runnable command, long delay, TimeUnit unit) {//在指定的延迟后执行某项任务SERVICE_SYNCHRONIZATION_EXECUTOR.schedule(command, delay, unit);}public static void submitServiceUpdateManager(Runnable runnable) {//向线程池提交任务,让线程池执行任务SERVICE_UPDATE_MANAGER_EXECUTOR.submit(runnable);}...
}public final class ExecutorFactory {...public static final class Managed {private static final String DEFAULT_NAMESPACE = "nacos";private static final ThreadPoolManager THREAD_POOL_MANAGER = ThreadPoolManager.getInstance();...//Create a new single scheduled executor service with input thread factory and register to manager.public static ScheduledExecutorService newSingleScheduledExecutorService(final String group, final ThreadFactory threadFactory) {ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, threadFactory);THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService);return executorService;}...}
}//线程池管理器
public final class ThreadPoolManager {private Map<String, Map<String, Set<ExecutorService>>> resourcesManager;private Map<String, Object> lockers = new ConcurrentHashMap<String, Object>(8);private static final ThreadPoolManager INSTANCE = new ThreadPoolManager();private static final AtomicBoolean CLOSED = new AtomicBoolean(false);static {INSTANCE.init();//JVM关闭时添加勾子,释放线程资源ThreadUtils.addShutdownHook(new Thread(new Runnable() {@Overridepublic void run() {LOGGER.warn("[ThreadPoolManager] Start destroying ThreadPool");//关闭线程池管理器shutdown();LOGGER.warn("[ThreadPoolManager] Destruction of the end");}}));}public static ThreadPoolManager getInstance() {return INSTANCE;}private ThreadPoolManager() {}private void init() {resourcesManager = new ConcurrentHashMap<String, Map<String, Set<ExecutorService>>>(8);}//Register the thread pool resources with the resource manager.public void register(String namespace, String group, ExecutorService executor) {if (!resourcesManager.containsKey(namespace)) {synchronized (this) {lockers.put(namespace, new Object());}}final Object monitor = lockers.get(namespace);synchronized (monitor) {Map<String, Set<ExecutorService>> map = resourcesManager.get(namespace);if (map == null) {map = new HashMap<String, Set<ExecutorService>>(8);map.put(group, new HashSet<ExecutorService>());map.get(group).add(executor);resourcesManager.put(namespace, map);return;}if (!map.containsKey(group)) {map.put(group, new HashSet<ExecutorService>());}map.get(group).add(executor);}}//Shutdown thread pool manager. 关闭线程池管理器public static void shutdown() {if (!CLOSED.compareAndSet(false, true)) {return;}Set<String> namespaces = INSTANCE.resourcesManager.keySet();for (String namespace : namespaces) {//销毁所有线程池资源INSTANCE.destroy(namespace);}}//Destroys all thread pool resources under this namespace.public void destroy(final String namespace) {final Object monitor = lockers.get(namespace);if (monitor == null) {return;}synchronized (monitor) {Map<String, Set<ExecutorService>> subResource = resourcesManager.get(namespace);if (subResource == null) {return;}for (Map.Entry<String, Set<ExecutorService>> entry : subResource.entrySet()) {for (ExecutorService executor : entry.getValue()) {//关闭线程池ThreadUtils.shutdownThreadPool(executor);}}resourcesManager.get(namespace).clear();resourcesManager.remove(namespace);}}...
}public final class ThreadUtils {...public static void addShutdownHook(Runnable runnable) {Runtime.getRuntime().addShutdownHook(new Thread(runnable));}public static void shutdownThreadPool(ExecutorService executor) {shutdownThreadPool(executor, null);}//Shutdown thread pool.public static void shutdownThreadPool(ExecutorService executor, Logger logger) {executor.shutdown();int retry = 3;while (retry > 0) {retry--;try {if (executor.awaitTermination(1, TimeUnit.SECONDS)) {return;}} catch (InterruptedException e) {executor.shutdownNow();Thread.interrupted();} catch (Throwable ex) {if (logger != null) {logger.error("ThreadPoolManager shutdown executor has error : {}", ex);}}}executor.shutdownNow();}...
}

三.第一个异步任务ServiceReporter

首先从内存注册表中,获取全部的服务名称。ServiceManager的getAllServiceNames()方法返回的是一个Map对象。其中的key是对应的命名空间ID,value是对应命名空间下的全部服务名称。然后遍历allServiceNames中的内容,此时会有两个for循环来处理。最后这个任务执行完,会继续提交一个延时执行的任务进行健康检查。

第一个for循环:遍历某命名空间ID下的全部服务名称,封装请求参数。

首先采用同样的Hash算法,判断遍历到的Service是否需要同步健康结果。如果需要执行,则把参数放到ServiceChecksum对象中。然后通过JacksonUtils转成JSON数据后,再放到Message请求参数对象。

第二个for循环:遍历集群节点,发送请求给其他节点进行数据同步。

首先判断是否是自身节点,如果是则跳过。否则调用ServiceStatusSynchronizer的send()方法。通过向其他集群节点的接口发起请求,来实现心跳健康检查结果的同步。集群节点同步的核心方法就在ServiceStatusSynchronizer的send()方法中。

通过ServiceStatusSynchronizer的send()方法中的代码可知,最终会通过HTTP方式进行数据同步,请求地址是"v1/ns/service/status"。该请求地址对应的请求处理入口是ServiceController的serviceStatus()方法。

在ServiceController的serviceStatus()方法中,如果通过对比入参和注册表的ServiceChecksum后,发现服务状态发生了改变,那么就会调用ServiceManager.addUpdatedServiceToQueue()方法。

在addUpdatedServiceToQueue()方法中,首先会把传入的参数包装成ServiceKey对象,然后放入到toBeUpdatedServicesQueue阻塞队列中。

既然最后会将ServiceKey对象放入到阻塞队列中,那必然有一个异步任务,从阻塞队列中获取ServiceKey对象进行处理。这个处理逻辑和处理服务实例注册时,将Pair对象放入阻塞队列一样,而这个异步任务便是ServiceManager的init()方法的第二个异步任务。

//Core manager storing all services in Nacos.
@Component
public class ServiceManager implements RecordListener<Service> {//Map(namespace, Map(group::serviceName, Service)).private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();private final DistroMapper distroMapper;private final Synchronizer synchronizer = new ServiceStatusSynchronizer();...public Map<String, Set<String>> getAllServiceNames() {Map<String, Set<String>> namesMap = new HashMap<>(16);for (String namespaceId : serviceMap.keySet()) {namesMap.put(namespaceId, serviceMap.get(namespaceId).keySet());}return namesMap;}private class ServiceReporter implements Runnable {@Overridepublic void run() {try {//获取内存注册表下的所有服务名称,按命名空间分类Map<String, Set<String>> allServiceNames = getAllServiceNames();if (allServiceNames.size() <= 0) {//ignorereturn;}//遍历allServiceNames中的内容//也就是遍历每一个命名空间,然后封装请求参数,接着发送请求来同步心跳健康检查结果for (String namespaceId : allServiceNames.keySet()) {ServiceChecksum checksum = new ServiceChecksum(namespaceId);//第一个循环:封装请求参数for (String serviceName : allServiceNames.get(namespaceId)) {//采用同样的算法,确保当前的集群节点,只对自己负责的那些Service,同步心跳健康检查结果if (!distroMapper.responsible(serviceName)) {continue;}Service service = getService(namespaceId, serviceName);if (service == null || service.isEmpty()) {continue;}service.recalculateChecksum();//添加请求参数checksum.addItem(serviceName, service.getChecksum());}//创建请求参数对象Message,准备进行同步Message msg = new Message();//对请求对象进行JSON序列化msg.setData(JacksonUtils.toJson(checksum));Collection<Member> sameSiteServers = memberManager.allMembers();if (sameSiteServers == null || sameSiteServers.size() <= 0) {return;}//第二个循环:遍历所有集群节点,发送请求给其他节点进行数据同步for (Member server : sameSiteServers) {//判断地址是否是本节点,如果是则直接跳过if (server.getAddress().equals(NetUtils.localServer())) {continue;}//同步其他集群节点synchronizer.send(server.getAddress(), msg);}}} catch (Exception e) {Loggers.SRV_LOG.error("[DOMAIN-STATUS] Exception while sending service status", e);} finally {//继续提交一个延时执行的任务GlobalExecutor.scheduleServiceReporter(this, switchDomain.getServiceStatusSynchronizationPeriodMillis(), TimeUnit.MILLISECONDS);}}}...
}public class ServiceStatusSynchronizer implements Synchronizer {@Overridepublic void send(final String serverIP, Message msg) {if (serverIP == null) {return;}//构建请求参数Map<String, String> params = new HashMap<String, String>(10);params.put("statuses", msg.getData());params.put("clientIP", NetUtils.localServer());//拼接url地址String url = "http://" + serverIP + ":" + EnvUtil.getPort() + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service/status";if (IPUtil.containsPort(serverIP)) {url = "http://" + serverIP + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service/status";}try {//异步发送HTTP请求,url地址就是:http://ip/v1/ns/service/status, 用来同步心跳健康检查结果HttpClient.asyncHttpPostLarge(url, null, JacksonUtils.toJson(params), new Callback<String>() {@Overridepublic void onReceive(RestResult<String> result) {if (!result.ok()) {Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: {}", serverIP);}}@Overridepublic void onError(Throwable throwable) {Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: " + serverIP, throwable);}@Overridepublic void onCancel() {}});} catch (Exception e) {Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: " + serverIP, e);}}...
}//Service operation controller.
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service")
public class ServiceController {@Autowiredprotected ServiceManager serviceManager;...//Check service status whether latest.@PostMapping("/status")public String serviceStatus(HttpServletRequest request) throws Exception {String entity = IoUtils.toString(request.getInputStream(), "UTF-8");String value = URLDecoder.decode(entity, "UTF-8");JsonNode json = JacksonUtils.toObj(value);String statuses = json.get("statuses").asText();String serverIp = json.get("clientIP").asText();if (!memberManager.hasMember(serverIp)) {throw new NacosException(NacosException.INVALID_PARAM, "ip: " + serverIp + " is not in serverlist");}try {ServiceManager.ServiceChecksum checksums = JacksonUtils.toObj(statuses, ServiceManager.ServiceChecksum.class);if (checksums == null) {Loggers.SRV_LOG.warn("[DOMAIN-STATUS] receive malformed data: null");return "fail";}for (Map.Entry<String, String> entry : checksums.serviceName2Checksum.entrySet()) {if (entry == null || StringUtils.isEmpty(entry.getKey()) || StringUtils.isEmpty(entry.getValue())) {continue;}String serviceName = entry.getKey();String checksum = entry.getValue();Service service = serviceManager.getService(checksums.namespaceId, serviceName);if (service == null) {continue;}service.recalculateChecksum();//通过对比入参和注册表的checksum,如果发现服务状态有变动if (!checksum.equals(service.getChecksum())) {if (Loggers.SRV_LOG.isDebugEnabled()) {Loggers.SRV_LOG.debug("checksum of {} is not consistent, remote: {}, checksum: {}, local: {}", serviceName, serverIp, checksum, service.getChecksum());}//添加到阻塞队列serviceManager.addUpdatedServiceToQueue(checksums.namespaceId, serviceName, serverIp, checksum);}}} catch (Exception e) {Loggers.SRV_LOG.warn("[DOMAIN-STATUS] receive malformed data: " + statuses, e);}return "ok";}...
}//Core manager storing all services in Nacos.
@Component
public class ServiceManager implements RecordListener<Service> {private final Lock lock = new ReentrantLock();//阻塞队列private final LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);...//Add a service into queue to update.public void addUpdatedServiceToQueue(String namespaceId, String serviceName, String serverIP, String checksum) {lock.lock();try {//包装成ServiceKey对象,放入到toBeUpdatedServicesQueue阻塞队列中toBeUpdatedServicesQueue.offer(new ServiceKey(namespaceId, serviceName, serverIP, checksum), 5, TimeUnit.MILLISECONDS);} catch (Exception e) {toBeUpdatedServicesQueue.poll();toBeUpdatedServicesQueue.add(new ServiceKey(namespaceId, serviceName, serverIP, checksum));Loggers.SRV_LOG.error("[DOMAIN-STATUS] Failed to add service to be updated to queue.", e);} finally {lock.unlock();}}...
}

四.第二个异步任务UpdatedServiceProcessor

UpdatedServiceProcessor的run()方法中有一个while无限循环,这个while无限循环会从toBeUpdatedServicesQueue阻塞队列中一直取任务。取得任务ServiceKey对象后,会将其封装成ServiceUpdater对象,然后继续将ServiceUpdater对象作为一个任务提交给一个线程池。

这个心跳健康检查结果的数据同步逻辑,和服务实例注册的处理逻辑类似,都使用了"阻塞队列 + 异步任务"的设计思想。放入阻塞队列是为了削峰,从阻塞队列取出任务再提交线程池是为了提速。

线程池在执行同步健康状态任务时,即执行ServiceUpdater的run()方法时,会调用ServiceManager的updatedHealthStatus()方法来更改服务的健康状态。

在ServiceManager的updatedHealthStatus()方法中,首先会解析参数,然后获取注册表中全部的Instance实例,并遍历实例。如果实例的健康状态有变动,则直接更改实例的healthy属性,并且针对healthy有变动的实例,发布服务改变事件通知客户端进行更新。

//Core manager storing all services in Nacos.
@Component
public class ServiceManager implements RecordListener<Service> {//阻塞队列private final LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);...private class UpdatedServiceProcessor implements Runnable {//get changed service from other server asynchronously@Overridepublic void run() {ServiceKey serviceKey = null;try {//无限循环while (true) {try {//从阻塞队列中获取任务serviceKey = toBeUpdatedServicesQueue.take();} catch (Exception e) {Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while taking item from LinkedBlockingDeque.");}if (serviceKey == null) {continue;}GlobalExecutor.submitServiceUpdate(new ServiceUpdater(serviceKey));}} catch (Exception e) {Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while update service: {}", serviceKey, e);}}}private class ServiceUpdater implements Runnable {String namespaceId;String serviceName;String serverIP;public ServiceUpdater(ServiceKey serviceKey) {this.namespaceId = serviceKey.getNamespaceId();this.serviceName = serviceKey.getServiceName();this.serverIP = serviceKey.getServerIP();}@Overridepublic void run() {try {//修改服务实例的健康状态updatedHealthStatus(namespaceId, serviceName, serverIP);} catch (Exception e) {Loggers.SRV_LOG.warn("[DOMAIN-UPDATER] Exception while update service: {} from {}, error: {}", serviceName, serverIP, e);}}}//Update health status of instance in service. 修改服务实例的健康状态public void updatedHealthStatus(String namespaceId, String serviceName, String serverIP) {Message msg = synchronizer.get(serverIP, UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));//解析参数JsonNode serviceJson = JacksonUtils.toObj(msg.getData());ArrayNode ipList = (ArrayNode) serviceJson.get("ips");Map<String, String> ipsMap = new HashMap<>(ipList.size());for (int i = 0; i < ipList.size(); i++) {String ip = ipList.get(i).asText();String[] strings = ip.split("_");ipsMap.put(strings[0], strings[1]);}Service service = getService(namespaceId, serviceName);if (service == null) {return;}//是否改变标识boolean changed = false;//获取全部的实例数据,进行遍历List<Instance> instances = service.allIPs();for (Instance instance : instances) {//同步健康状态结果boolean valid = Boolean.parseBoolean(ipsMap.get(instance.toIpAddr()));if (valid != instance.isHealthy()) {changed = true;//更新服务实例的健康状态instance.setHealthy(valid);Loggers.EVT_LOG.info("{} {SYNC} IP-{} : {}:{}@{}", serviceName, (instance.isHealthy() ? "ENABLED" : "DISABLED"), instance.getIp(), instance.getPort(), instance.getClusterName());}}//如果服务实例健康状态改变了,那么就发布"服务改变事件",使用UDP方式通知客户端if (changed) {pushService.serviceChanged(service);if (Loggers.EVT_LOG.isDebugEnabled()) {StringBuilder stringBuilder = new StringBuilder();List<Instance> allIps = service.allIPs();for (Instance instance : allIps) {stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");}Loggers.EVT_LOG.debug("[HEALTH-STATUS-UPDATED] namespace: {}, service: {}, ips: {}", service.getNamespaceId(), service.getName(), stringBuilder.toString());}}}...
}

(4)总结

问题一:在单机模式下,Nacos服务端会开启一个对服务进行心跳健康检查的定时任务。那么在集群模式下,是否有必要让全部节点都执行这个定时任务?

答:当Service的init()方法执行心跳健康检查任务时,首先会有一个逻辑判断。具体就是根据服务名称进行哈希运算,然后结合集群节点数量进行取模,最终选出一个节点来执行心跳健康检查任务。所以Nacos服务端对服务Service的心跳健康检查任务,在集群架构下,并不是每一台集群机器都会执行这个任务的,而是通过算法选出一台机器来执行,然后再把结果同步给其他集群节点。

问题二:Nacos服务端通过心跳健康检查的定时任务感知服务的健康状态改变时,如何把服务的健康状态同步给其他Nacos集群节点?

答:当Nacos服务端也就是Service的init()方法执行完成心跳健康检查任务后,ServiceManager的init()方法会有一个定时任务,同步检查结果到其他节点。这个定时任务会使用HTTP的方式来进行心跳健康检查结果的同步。这个定时任务执行完,会继续创建一个延迟执行的定时任务继续进行同步。

ServiceManager的init()方法还有一个定时任务用来处理检查结果的同步请求。这个定时任务的设计采用了:内存阻塞队列 + 异步任务的方式。这个定时任务会通过while无限循环一直从阻塞队列获取数据进行处理。

相关文章:

  • Nuxt3还能用吗?
  • Jetpack Compose 响应式布局实战:BoxWithConstraints 完全指南
  • Java IO流核心处理方式详解
  • 【Bootstrap V4系列】学习入门教程之 组件-卡片(Card)
  • 因为gromacs必须安装cuda(系统自带的NVIDIA驱动不行),这里介绍下如何安装cuda
  • SpringMVC——第6章:RESTFul编程风格
  • 51c大模型~合集124
  • 【从零开始学习微服务 | 第一篇】单体项目到微服务拆分实践
  • 深入理解 Bash 中的 $‘...‘ 字符串语法糖
  • DXFViewer进行中 : ->封装OpenGL -> 解析DXF直线
  • Compose 中使用 WebView
  • Unity:输入系统(Input System)与持续检测键盘按键(Input.GetKey)
  • win10开了移动热点,手机无法连接,解决办法(chatgpt版)
  • socket,http
  • 基于python的哈希查表搜索特定文件
  • 查看Ubuntu版本
  • (41)VTK C++开发示例 ---qt使用vtk最小示例
  • 科创大赛——知识点复习【c++】——第一篇
  • Flink流水线任务在线演示
  • 《类和对象(上)》
  • 北美票房|“雷霆”开画票房比“美队4”低,但各方都能接受
  • 贵州黔西市游船倾覆事故发生后,多家保险公司紧急响应
  • 贵州省黔西市发生载人游船侧翻事故
  • 外交部就习近平主席将应邀对俄罗斯进行国事访问并出席纪念苏联伟大卫国战争胜利80周年庆典答问
  • 马克思主义理论研究教学名师系列访谈|金瑶梅:教师需要了解学生的现实发展,把握其思想发展动态
  • “仿佛一场追星粉丝会”,老铺黄金完成国内头部商业中心全覆盖,品牌化后下一步怎么走?