当前位置: 首页 > news >正文

Nacos源码—2.Nacos服务注册发现分析四

大纲

5.服务发现—服务之间的调用请求链路分析

6.服务端如何维护不健康的微服务实例

7.服务下线时涉及的处理

8.服务注册发现总结

7.服务下线时涉及的处理

(1)Nacos客户端服务下线的源码

(2)Nacos服务端处理服务下线的源码

(3)Nacos服务端发送服务变动事件给客户端的源码

(1)Nacos客户端服务下线的源码

Nacos客户端的Spring容器被销毁时,会通知Nacos服务端进行服务下线。首先会触发调用AbstractAutoServiceRegistration的destroy()方法。因为该类实现了Spring监听器,并且该方法被@PreDestroy注解修饰。@PreDestroy注解的作用是:Spring容器销毁时回调被该注解修饰的方法。

然后调用NacosServiceRegistry的deregister()方法 -> NamingService的deregisterInstance()方法 -> NamingProxy的deregisterService()方法,最后调用NamingProxy的reqApi()方法向"/nacos/v1/ns/instance"接口发起删除请求。

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class, AutoServiceRegistrationAutoConfiguration.class, NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {@Beanpublic NacosServiceRegistry nacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {return new NacosServiceRegistry(nacosDiscoveryProperties);}@Bean@ConditionalOnBean(AutoServiceRegistrationProperties.class)public NacosRegistration nacosRegistration(ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers, NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) {return new NacosRegistration(registrationCustomizers.getIfAvailable(), nacosDiscoveryProperties, context);}@Bean@ConditionalOnBean(AutoServiceRegistrationProperties.class)public NacosAutoServiceRegistration nacosAutoServiceRegistration(NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) {return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration);}
}public class NacosAutoServiceRegistration extends AbstractAutoServiceRegistration<Registration> {......
}public abstract class AbstractAutoServiceRegistration<R extends Registration> implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> {private final ServiceRegistry<R> serviceRegistry;...@PreDestroypublic void destroy() {stop();}public void stop() {if (this.getRunning().compareAndSet(true, false) && isEnabled()) {deregister();if (shouldRegisterManagement()) {deregisterManagement();}this.serviceRegistry.close();}}protected void deregister() {//调用NacosServiceRegistry.deregister()方法this.serviceRegistry.deregister(getRegistration());}...
}public class NacosServiceRegistry implements ServiceRegistry<Registration> {...@Overridepublic void deregister(Registration registration) {...NamingService namingService = namingService();String serviceId = registration.getServiceId();String group = nacosDiscoveryProperties.getGroup();try {//调用NamingService.deregisterInstance()方法namingService.deregisterInstance(serviceId, group, registration.getHost(), registration.getPort(), nacosDiscoveryProperties.getClusterName());} catch (Exception e) {log.error("ERR_NACOS_DEREGISTER, de-register failed...{},", registration.toString(), e);}log.info("De-registration finished.");}private NamingService namingService() {return nacosServiceManager.getNamingService(nacosDiscoveryProperties.getNacosProperties());}...
}//以上是nacos-discovery的,以下是nacos-client的
public class NacosNamingService implements NamingService {private BeatReactor beatReactor;private NamingProxy serverProxy;...@Overridepublic void deregisterInstance(String serviceName, String groupName, String ip, int port, String clusterName) throws NacosException {Instance instance = new Instance();instance.setIp(ip);instance.setPort(port);instance.setClusterName(clusterName);deregisterInstance(serviceName, groupName, instance);}@Overridepublic void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException {if (instance.isEphemeral()) {beatReactor.removeBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), instance.getIp(), instance.getPort());}//调用NamingProxy.deregisterService()方法serverProxy.deregisterService(NamingUtils.getGroupedName(serviceName, groupName), instance);}...
}public class NamingProxy implements Closeable {...public void deregisterService(String serviceName, Instance instance) throws NacosException {final Map<String, String> params = new HashMap<String, String>(8);params.put(CommonParams.NAMESPACE_ID, namespaceId);params.put(CommonParams.SERVICE_NAME, serviceName);params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());params.put("ip", instance.getIp());params.put("port", String.valueOf(instance.getPort()));params.put("ephemeral", String.valueOf(instance.isEphemeral()));reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.DELETE);}...
}

(2)Nacos服务端处理服务下线的源码

Nacos服务端处理服务下线的入口是InstanceController的deregister()方法,然后会调用ServiceManager的removeInstance()方法移除注册表里的实例,也就是调用ServiceManager的substractIpAddresses()方法。其中会传入remove参数执行ServiceManager的updateIpAddresses()方法,该方法的返回结果不会包含要删除的实例。

在ServiceManager的updateIpAddresses()方法中,判断入参action如果是remove,那么会把对应的Instance移除掉。但此时并不操作内存注册表,只是在返回的结果中删除对应的Instance实例。然后和注册逻辑一样,也是通过异步任务 + 内存队列的方式,去修改注册表。

//Instance operation controller.
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {@Autowiredprivate ServiceManager serviceManager;...//Deregister instances.@CanDistro@DeleteMapping@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)public String deregister(HttpServletRequest request) throws Exception {Instance instance = getIpAddress(request);String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);NamingUtils.checkServiceNameFormat(serviceName);Service service = serviceManager.getService(namespaceId, serviceName);if (service == null) {Loggers.SRV_LOG.warn("remove instance from non-exist service: {}", serviceName);return "ok";}//移除ServiceManager的注册表里的Instance实例serviceManager.removeInstance(namespaceId, serviceName, instance.isEphemeral(), instance);return "ok";}...
}//Core manager storing all services in Nacos.
@Component
public class ServiceManager implements RecordListener<Service> {//注册表,Map(namespace, Map(group::serviceName, Service)).private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();@Resource(name = "consistencyDelegate")private ConsistencyService consistencyService;...//Remove instance from service.public void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {Service service = getService(namespaceId, serviceName);synchronized (service) {//移除InstanceremoveInstance(namespaceId, serviceName, ephemeral, service, ips);}}private void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Service service, Instance... ips) throws NacosException {//和注册一样,也是先构建keyString key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);//在这个instanceList中,不会包含需要删除的Instance实例了List<Instance> instanceList = substractIpAddresses(service, ephemeral, ips);//包装成Instances对象Instances instances = new Instances();instances.setInstanceList(instanceList);//调用和注册一样的逻辑,把instanceList中的Instance,通过写时复制的机制,修改内存注册表consistencyService.put(key, instances);}private List<Instance> substractIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {//UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE传的Removereturn updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE, ephemeral, ips);}//Compare and get new instance list.public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {//先获取已经注册到Nacos的、当前要注册的服务实例对应的服务的、所有服务实例Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));List<Instance> currentIPs = service.allIPs(ephemeral);Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());Set<String> currentInstanceIds = Sets.newHashSet();for (Instance instance : currentIPs) {//把instance实例的IP当作key,instance实例当作value,放入currentInstancescurrentInstances.put(instance.toIpAddr(), instance);//把实例唯一编码添加到currentInstanceIds中currentInstanceIds.add(instance.getInstanceId());}//用来存放当前要注册的服务实例对应的服务的、所有服务实例Map<String, Instance> instanceMap;if (datum != null && null != datum.value) {instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);} else {instanceMap = new HashMap<>(ips.length);}for (Instance instance : ips) {if (!service.getClusterMap().containsKey(instance.getClusterName())) {Cluster cluster = new Cluster(instance.getClusterName(), service);cluster.init();service.getClusterMap().put(instance.getClusterName(), cluster);Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson());}if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {//移除Instance实例instanceMap.remove(instance.getDatumKey());} else {Instance oldInstance = instanceMap.get(instance.getDatumKey());if (oldInstance != null) {instance.setInstanceId(oldInstance.getInstanceId());} else {instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));}//instanceMap的key与IP和端口有关instanceMap.put(instance.getDatumKey(), instance);}}if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {throw new IllegalArgumentException("ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils.toJson(instanceMap.values()));}//最后instanceMap里肯定会包含新注册的Instance实例//并且如果不是第一次注册,里面还会包含之前注册的Instance实例信息return new ArrayList<>(instanceMap.values());}...
}

(3)Nacos服务端发送服务变动事件给客户端的源码

一.处理服务注册或服务下线时让客户端感知的方案

二.处理服务注册或服务下线时发布服务变动事件

三.监听服务变动事件并通过UDP发送推送给客户端

一.处理服务注册或服务下线时让客户端感知的方案

Nacos客户端进行服务注册或服务下线时,其他Nacos客户端如何感知。

方案一:其他Nacos客户端在服务发现时,会通过定时任务去更新客户端本地缓存,但是这样做会有几秒钟的延迟。

方案二:当Nacos服务端的注册表发生了变动,服务端主动通知客户端。其实Nacos服务端在处理服务注册或服务下线时的最后逻辑是一样的。即在通过写时复制修改完注册表后,服务端会发布一个变动事件。然后通过UDP方式通知每一个客户端,从而让客户端更快感知服务变动。

二.处理服务注册或服务下线时发布服务变动事件

服务注册或服务下线时,都会调用ConsistencyService的put()方法,将本次操作包装成Pair对象放入阻塞队列,然后由异步任务Notifier来处理阻塞队列中的Pair对象。

异步任务Notifier对阻塞队列中的Pair对象进行处理时,会调用Pair对象对应的Service服务的onChange()方法,而Service的onChange()方法又会调用Service的updateIPs()方法。

在Service的updateIPs()方法中:会先调用Cluster的updateIps()方法通过写时复制机制去修改注册表,然后调用PushService的serviceChanged()方法发布服务变动事件。

@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {private final GlobalConfig globalConfig;private final DistroProtocol distroProtocol;private final DataStore dataStore;//用于存储所有已注册的服务实例数据private Map<String, ConcurrentLinkedQueue<RecordListener>> listeners = new ConcurrentHashMap<>();private volatile Notifier notifier = new Notifier();...@PostConstructpublic void init() {//初始化完成后,会将notifier任务提交给GlobalExecutor来执行GlobalExecutor.submitDistroNotifyTask(notifier);}@Overridepublic void put(String key, Record value) throws NacosException {//把包含了当前注册的服务实例的、最新的服务实例列表,存储到DataStore对象中onPut(key, value);//在集群架构下,DistroProtocol.sync()方法会进行集群节点的服务实例数据同步distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2);}public void onPut(String key, Record value) {if (KeyBuilder.matchEphemeralInstanceListKey(key)) {//创建Datum对象,把服务key和服务的所有服务实例Instances放入Datum对象中Datum<Instances> datum = new Datum<>();datum.value = (Instances) value;datum.key = key;datum.timestamp.incrementAndGet();//添加到DataStore的Map对象里dataStore.put(key, datum);}    if (!listeners.containsKey(key)) {return;}//添加处理任务notifier.addTask(key, DataOperation.CHANGE);}...public class Notifier implements Runnable {private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);//Add new notify task to queue.public void addTask(String datumKey, DataOperation action) {if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {return;}if (action == DataOperation.CHANGE) {services.put(datumKey, StringUtils.EMPTY);}//tasks是一个阻塞队列,把key、action封装成Pair对象,放入队列中tasks.offer(Pair.with(datumKey, action));}public int getTaskSize() {return tasks.size();}@Overridepublic void run() {Loggers.DISTRO.info("distro notifier started");//无限循环for (; ;) {try {//从阻塞队列中获取任务Pair<String, DataOperation> pair = tasks.take();//处理任务handle(pair);} catch (Throwable e) {Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);}}}private void handle(Pair<String, DataOperation> pair) {try {//把在DistroConsistencyServiceImpl.onPut()方法创建的key和action取出来String datumKey = pair.getValue0();DataOperation action = pair.getValue1();services.remove(datumKey);int count = 0;if (!listeners.containsKey(datumKey)) {return;}for (RecordListener listener : listeners.get(datumKey)) {count++;try {if (action == DataOperation.CHANGE) {//把Instances信息写到注册表里去,会调用Service.onChange()方法listener.onChange(datumKey, dataStore.get(datumKey).value);continue;}if (action == DataOperation.DELETE) {listener.onDelete(datumKey);continue;}} catch (Throwable e) {Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);}}if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}", datumKey, count, action.name());}} catch (Throwable e) {Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);}}}
}//Service of Nacos server side
//We introduce a 'service --> cluster --> instance' model, 
//in which service stores a list of clusters, which contain a list of instances.
//his class inherits from Service in API module and stores some fields that do not have to expose to client.
@JsonInclude(Include.NON_NULL)
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {private Map<String, Cluster> clusterMap = new HashMap<>();...@Overridepublic void onChange(String key, Instances value) throws Exception {Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);for (Instance instance : value.getInstanceList()) {if (instance == null) {//Reject this abnormal instance list:throw new RuntimeException("got null instance " + key);}if (instance.getWeight() > 10000.0D) {instance.setWeight(10000.0D);}if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {instance.setWeight(0.01D);}}updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));recalculateChecksum();}//Update instances. 这里的instances里就包含了新注册的实例对象public void updateIPs(Collection<Instance> instances, boolean ephemeral) {//clusterMap表示的是该服务的集群Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());for (String clusterName : clusterMap.keySet()) {ipMap.put(clusterName, new ArrayList<>());}//遍历全部实例对象:包括已经注册过的实例对象 和 新注册的实例对象//这里的作用就是对相同集群下的instance进行分类for (Instance instance : instances) {try {if (instance == null) {Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");continue;}//判定客户端传过来的instance实例中,是否设置了ClusterNameif (StringUtils.isEmpty(instance.getClusterName())) {//如果否,就设置instance实例的ClusterName为DEFAULTinstance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);}//判断之前是否存在对应的CLusterName,如果没有则需要创建新的Cluster对象if (!clusterMap.containsKey(instance.getClusterName())) {Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson());//创建新的Cluster集群对象Cluster cluster = new Cluster(instance.getClusterName(), this);cluster.init();//将新创建的Cluster对象放入到集群clusterMap中getClusterMap().put(instance.getClusterName(), cluster);}//根据集群名字,从ipMap里面获取集群下的所有实例List<Instance> clusterIPs = ipMap.get(instance.getClusterName());if (clusterIPs == null) {clusterIPs = new LinkedList<>();ipMap.put(instance.getClusterName(), clusterIPs);}//将客户端传过来的新注册的instance实例,添加到clusterIPs,也就是ipMap中clusterIPs.add(instance);} catch (Exception e) {Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);}}//对所有的服务实例分好类之后,按照ClusterName来更新注册表for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {//entryIPs已经是根据ClusterName分好组的实例列表了List<Instance> entryIPs = entry.getValue();//调用Cluster.updateIps()方法,根据写时复制,对注册表中的每一个Cluster对象进行更新clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);}setLastModifiedMillis(System.currentTimeMillis());//使用UDP方式通知Nacos客户端getPushService().serviceChanged(this);StringBuilder stringBuilder = new StringBuilder();for (Instance instance : allIPs()) {stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");}Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(), stringBuilder.toString());}@JsonIgnorepublic PushService getPushService() {return ApplicationUtils.getBean(PushService.class);}...
}public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {@JsonIgnoreprivate Set<Instance> persistentInstances = new HashSet<>();@JsonIgnoreprivate Set<Instance> ephemeralInstances = new HashSet<>();@JsonIgnoreprivate Service service;...//Update instance list.public void updateIps(List<Instance> ips, boolean ephemeral) {//先判定是否是临时实例,然后把对应的实例数据取出来,放入到新创建的toUpdateInstances集合中Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;//将老的实例列表toUpdateInstances复制一份到oldIpMap中HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());for (Instance ip : toUpdateInstances) {oldIpMap.put(ip.getDatumKey(), ip);}...//最后把传入进来的实例列表,重新初始化一个HaseSet,赋值给toUpdateInstancestoUpdateInstances = new HashSet<>(ips);//判断是否是临时实例,将CLuster的persistentInstances或ephemeralInstances替换为toUpdateInstancesif (ephemeral) {//直接把之前的实例列表替换成新的ephemeralInstances = toUpdateInstances;} else {//直接把之前的实例列表替换成新的persistentInstances = toUpdateInstances;}}...
}

三.监听服务变动事件并通过UDP发送通知给客户端

PushService的serviceChanged()方法发布服务变动事件。由于PushService实现了ApplicationListener,所以PushService的onApplicationEvent()方法会收到发布的服务变动事件,然后调用PushService的udpPush()方法通过UDP协议主动通知客户端。

总结:如果Nacos服务端的注册表发生变动,会通过UDP协议主动通知客户端。UDP协议比较轻量化,它无需建立连接就可以发送封装的IP数据包。虽然UDP协议下的传输不可靠,但是不可靠也没关系。因为每个客户端本地还有一个定时任务去更新本地实例列表缓存。

@Component
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {private ApplicationContext applicationContext;private static DatagramSocket udpSocket;private static volatile ConcurrentMap<String, Long> udpSendTimeMap = new ConcurrentHashMap<>();private static volatile ConcurrentMap<String, Receiver.AckEntry> ackMap = new ConcurrentHashMap<>();...//Service changed.public void serviceChanged(Service service) {//merge some change events to reduce the push frequency:if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {return;}//发布服务变动事件this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));}@Overridepublic void onApplicationEvent(ServiceChangeEvent event) {Service service = event.getService();String serviceName = service.getName();String namespaceId = service.getNamespaceId();Future future = GlobalExecutor.scheduleUdpSender(() -> {try {Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");//获取某服务下的所有Nacos客户端ConcurrentMap<String, PushClient> clients = clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));if (MapUtils.isEmpty(clients)) {return;}Map<String, Object> cache = new HashMap<>(16);long lastRefTime = System.nanoTime();//遍历所有客户端for (PushClient client : clients.values()) {...//通过UDP进行通知udpPush(ackEntry);}} catch (Exception e) {Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);} finally {futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));}}, 1000, TimeUnit.MILLISECONDS);futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);}private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {if (ackEntry == null) {Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");return null;}if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);ackMap.remove(ackEntry.key);udpSendTimeMap.remove(ackEntry.key);failedPush += 1;return ackEntry;}try {if (!ackMap.containsKey(ackEntry.key)) {totalPush++;}ackMap.put(ackEntry.key, ackEntry);udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());Loggers.PUSH.info("send udp packet: " + ackEntry.key);//通过UDP协议发送消息udpSocket.send(ackEntry.origin);ackEntry.increaseRetryTime();GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry), TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);return ackEntry;} catch (Exception e) {Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data, ackEntry.origin.getAddress().getHostAddress(), e);ackMap.remove(ackEntry.key);udpSendTimeMap.remove(ackEntry.key);failedPush += 1;return null;}}...
}

(4)服务下线的处理总结

8.服务注册发现总结

一.客户端

nacos-discovery利用了Spring的事件监听机制,在Spring容器启动时的调用Nacos服务端提供的服务实例注册接口。在调用服务实例注册接口时,客户端会开启一个异步任务来做发送心跳。

在客户端进行微服务调用时,nacos-discovery会整合Ribbon,然后查询Nacos服务端的服务实例列表来维护本地缓存,从而通过Ribbon实现服务调用时的负载均衡。

在关闭Spring容器时,会触发Nacos客户端销毁的方法,然后调用Nacos服务端的服务下线接口,从而完成服务下线流程。

二.服务端

服务端的核心功能:服务注册、服务查询、服务下线、心跳健康。服务注册的实现要点:异步任务 + 内存阻塞队列、内存注册表、写时复制。

服务端也会开启心跳健康检查的定时任务来检查不健康的实例。如果发现Instance超过15秒没有心跳,则标记为不健康。如果发现Instance超过30秒没有心跳,则会直接删除。

进行服务查询时,是直接从内存注册表中获取Instance列表进行返回。

相关文章:

  • Openharmony4.1 Release——软总线部分错误码表
  • 数据库概论速成期中版
  • MySQL 中的最左前缀法则
  • ISO和 IEC机构的区别
  • 信雅达 AI + 悦数 Graph RAG | 大模型知识管理平台在金融行业的实践
  • Microsoft .NET Framework 3.5 离线安装包 下载
  • 【动手学大模型开发】使用 LLM API:智谱 GLM
  • Python中的defaultdict方法
  • 信息过载(Information Overload):太多的信息导致了信息处理能力的饱和
  • JVM | CMS垃圾收集器详解
  • 基于tabula对pdf中的excel进行识别并转换成word(三)
  • FlexNoC-Latency
  • 进程自动守护,监控并自动重启
  • 完整的 SSL 证书生成与 Spring Boot 配置流程
  • MySQL下载与安装
  • 无人设备遥控器之移动手持定位系统篇
  • qtfaststart使用教程(moov置前)
  • MLOps全链路能力:模型监控、版本回滚与持续训练
  • 2025年- H13-Lc120-189.轮转数组(普通数组)---java版
  • MinIO中mc工具的安装、配置、简单使用
  • 暗蓝评《性别打结》丨拆解性别之结需要几步?
  • 一位排球青训教练的20年时光:努力提高女排球员成才率
  • 一季度我国服务进出口总额19741.8亿元,同比增长8.7%
  • 财政部农业农村司司长吴奇修接受纪律审查和监察调查
  • 中公教育薪酬透视:董监高合计涨薪122万,员工精简近三成
  • 大学2025丨专访南开人工智能学院院长赵新:人工智能未来会变成通识类课程