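Nacos Source Code: 2. Nacos Service Registration and Discovery Analysis, Part 3
Outline
5. Service discovery: analysis of the call request chain between services
6. How the server maintains unhealthy microservice instances
7. Processing involved when a service goes offline
8. Summary of service registration and discovery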
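5. Service discovery: analysis of the call request chain between services
(1) The request flow when microservices call each other through Nacos
(2) Source code of service discovery in the Nacos client
(3) Source code of service instance queries in the Nacos server
(4) Summary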
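(1) The request flow when microservices call each other through Nacos
Following the example in the Nacos usage introduction: once the order service and the stock service have registered with Nacos, they call each other through Feign. The flow has three steps:
Step 1: Every client keeps a local cache of the microservice list, and periodically pulls the latest list from the registry to refresh that cache.
Step 2: When order-service needs to call stock-service, it first looks up the matching instances in the local cache by service name. A service name may map to several instances, so load balancing is needed to pick one of them.
Step 3: Finally, the service name is replaced with IP + Port, and Feign issues the HTTP call and obtains the result.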
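The three steps can be sketched roughly as follows. This is a minimal illustration, not Nacos or Feign code: the class LocalServiceCaller, its methods and the sample addresses are hypothetical, and a plain round-robin counter stands in for Ribbon's load-balancing rule.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the client-side flow: cache lookup, pick one instance, rewrite the URL.
class LocalServiceCaller {
    // Step 1: local cache, service name -> "ip:port" addresses (refreshed elsewhere by a timer).
    private final Map<String, List<String>> localCache = new ConcurrentHashMap<>();
    private final AtomicInteger counter = new AtomicInteger();

    void refresh(String serviceName, List<String> addresses) {
        localCache.put(serviceName, List.copyOf(addresses));
    }

    // Step 2: look up by service name, then round-robin over the candidates.
    String choose(String serviceName) {
        List<String> addresses = localCache.get(serviceName);
        if (addresses == null || addresses.isEmpty()) {
            throw new IllegalStateException("no instance cached for " + serviceName);
        }
        int index = Math.floorMod(counter.getAndIncrement(), addresses.size());
        return addresses.get(index);
    }

    // Step 3: replace the service name with the chosen IP + Port.
    String resolve(String serviceName, String path) {
        return "http://" + choose(serviceName) + path;
    }

    public static void main(String[] args) {
        LocalServiceCaller caller = new LocalServiceCaller();
        caller.refresh("stock-service", List.of("10.0.0.1:8081", "10.0.0.2:8081"));
        System.out.println(caller.resolve("stock-service", "/stock/reduce"));
        System.out.println(caller.resolve("stock-service", "/stock/reduce"));
    }
}
```

Two consecutive calls land on different instances, which is the effect the load balancer provides in Step 2.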
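(2) Source code of service discovery in the Nacos client
I. nacos-discovery introduces Ribbon to load-balance service calls
II. How nacos-discovery integrates Ribbon to load-balance service calls
III. How nacos-client performs service discovery
I. nacos-discovery introduces Ribbon to load-balance service calls
A Nacos client is a project that depends on nacos-discovery + nacos-client. Because nacos-discovery integrates Ribbon, Ribbon can call the Nacos server's interface for querying the service instance list. The Nacos client thus relies on Ribbon for load balancing during service calls: Ribbon picks one instance from the service instance list for the client to call.
The pom.xml of nacos-discovery shows that it pulls in the Ribbon dependency.
II. How nacos-discovery integrates Ribbon to load-balance service calls
Ribbon defines a ServerList interface, shown below. It is an extension point whose purpose is to obtain the list of Servers, and nacos-discovery integrates with Ribbon by implementing it. As the package name shows, loadbalancer belongs to the Ribbon source tree, and the LoadBalancer is Ribbon's load balancer. The load balancer applies an IRule load-balancing strategy to pick one instance from the service instance list.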
package com.netflix.loadbalancer;

import java.util.List;

//Interface that defines the methods used to obtain the List of Servers
public interface ServerList<T extends Server> {

    public List<T> getInitialListOfServers();

    //Return updated list of servers. This is called say every 30 secs
    public List<T> getUpdatedListOfServers();
}
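When a Nacos client makes a microservice call, Ribbon picks the target instance. More precisely, Ribbon calls the getUpdatedListOfServers() method of NacosServerList to obtain the candidate instances, and its load balancer then chooses one of them.
The NacosServerList class in nacos-discovery extends AbstractServerList and implements the two methods of Ribbon's ServerList interface, as shown below: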
public abstract class AbstractServerList<T extends Server> implements ServerList<T>, IClientConfigAware {
    ......
}

public class NacosServerList extends AbstractServerList<NacosServer> {
    private NacosDiscoveryProperties discoveryProperties;
    private String serviceId;

    public NacosServerList(NacosDiscoveryProperties discoveryProperties) {
        this.discoveryProperties = discoveryProperties;
    }

    @Override
    public List<NacosServer> getInitialListOfServers() {
        return getServers();
    }

    @Override
    public List<NacosServer> getUpdatedListOfServers() {
        return getServers();
    }

    private List<NacosServer> getServers() {
        try {
            //Read the group
            String group = discoveryProperties.getGroup();
            //Pass the service name, the group and true (healthy instances only)
            //to NacosNamingService.selectInstances() to query the service instance list
            List<Instance> instances = discoveryProperties.namingServiceInstance().selectInstances(serviceId, group, true);
            //Convert each Instance into a NacosServer
            return instancesToServerList(instances);
        } catch (Exception e) {
            throw new IllegalStateException("Can not get service instances from nacos, serviceId=" + serviceId, e);
        }
    }

    private List<NacosServer> instancesToServerList(List<Instance> instances) {
        List<NacosServer> result = new ArrayList<>();
        if (CollectionUtils.isEmpty(instances)) {
            return result;
        }
        for (Instance instance : instances) {
            result.add(new NacosServer(instance));
        }
        return result;
    }

    public String getServiceId() {
        return serviceId;
    }

    @Override
    public void initWithNiwsConfig(IClientConfig iClientConfig) {
        this.serviceId = iClientConfig.getClientName();
    }
}
The core of NacosServerList is its getServers() method, because both ServerList methods that nacos-discovery implements for Ribbon delegate to it.
Inside getServers(), NacosServerList calls the selectInstances() method of NacosNamingService in nacos-client to obtain the service instance list.
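The adapter idea behind NacosServerList can be shown with a reduced sketch. Everything in it is hypothetical (SimpleServerList, SimpleInstance, RegistryBackedServerList and the lookup function are illustrative names, not Ribbon or Nacos types); it only shows two interface methods delegating to one private lookup that converts registry instances into the load balancer's server type. Unlike the real code, where nacos-client filters by health, the sketch filters inside the adapter to stay self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified types: they only mirror the shape of Ribbon's ServerList.
class ServerListSketch {

    interface SimpleServerList<T> {
        List<T> getInitialListOfServers();
        List<T> getUpdatedListOfServers();
    }

    // Stand-in for a registry instance.
    static final class SimpleInstance {
        final String ip;
        final int port;
        final boolean healthy;

        SimpleInstance(String ip, int port, boolean healthy) {
            this.ip = ip;
            this.port = port;
            this.healthy = healthy;
        }
    }

    // Adapter: both interface methods delegate to getServers().
    static final class RegistryBackedServerList implements SimpleServerList<String> {
        private final String serviceId;
        // Stand-in for the naming client: service name -> all known instances.
        private final Function<String, List<SimpleInstance>> namingLookup;

        RegistryBackedServerList(String serviceId, Function<String, List<SimpleInstance>> namingLookup) {
            this.serviceId = serviceId;
            this.namingLookup = namingLookup;
        }

        @Override
        public List<String> getInitialListOfServers() {
            return getServers();
        }

        @Override
        public List<String> getUpdatedListOfServers() {
            return getServers();
        }

        // Query the registry and keep only healthy instances, as "ip:port" strings.
        private List<String> getServers() {
            List<String> servers = new ArrayList<>();
            for (SimpleInstance instance : namingLookup.apply(serviceId)) {
                if (instance.healthy) {
                    servers.add(instance.ip + ":" + instance.port);
                }
            }
            return servers;
        }
    }

    public static void main(String[] args) {
        List<SimpleInstance> registry = List.of(
                new SimpleInstance("10.0.0.1", 8081, true),
                new SimpleInstance("10.0.0.2", 8081, false));
        SimpleServerList<String> serverList = new RegistryBackedServerList("stock-service", name -> registry);
        System.out.println(serverList.getUpdatedListOfServers());
    }
}
```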
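III. How nacos-client performs service discovery
The selectInstances() method of NacosNamingService in nacos-client calls the getServiceInfo() method of HostReactor to obtain the service instance list. getServiceInfo() first calls getServiceInfo0() to try the local cache. On a cache miss it calls updateServiceNow(), which delegates to updateService() to query the server and refresh the cache: updateService() calls the queryList() method of NamingProxy to fetch the instance list from the server, then calls processServiceJson() to update the local cache. Finally, getServiceInfo() calls scheduleUpdateIfAbsent() to submit a task that keeps the cache in sync.
So HostReactor.getServiceInfo() in nacos-client is the core of service discovery. It first looks for the service instance list in the local cache, and only queries the server when the cache has no data for the service. Once the list has been obtained, it submits a delayed task to the scheduling thread pool; when the delay expires, the run() method of UpdateTask executes.
UpdateTask.run() calls updateService() to query the service instance list and update the local cache. When the task finishes, it submits itself to the scheduling thread pool again as a delayed task, so the cached instance list is refreshed over and over.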
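The "read the cache, fetch on a miss, then keep refreshing" pattern described above can be sketched in a few lines. This is an illustration under stated assumptions, not HostReactor itself: RefreshingCache, remoteQuery and refreshMillis are hypothetical names, the remote query is assumed to return a non-null list, and failover, UDP push and failure backoff are left out.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical sketch of a local cache that is filled on a miss and refreshed by a self-rescheduling task.
class RefreshingCache implements AutoCloseable {
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    private final Map<String, Boolean> scheduled = new ConcurrentHashMap<>();
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "cache-refresher");
        t.setDaemon(true);
        return t;
    });
    private final Function<String, List<String>> remoteQuery; // stand-in for the server call
    private final long refreshMillis;

    RefreshingCache(Function<String, List<String>> remoteQuery, long refreshMillis) {
        this.remoteQuery = remoteQuery;
        this.refreshMillis = refreshMillis;
    }

    List<String> get(String serviceName) {
        // Cache miss: query the server right away and store the result.
        cache.computeIfAbsent(serviceName, remoteQuery);
        // Start the refresh loop for this service only once.
        if (scheduled.putIfAbsent(serviceName, Boolean.TRUE) == null) {
            scheduleRefresh(serviceName);
        }
        return cache.get(serviceName);
    }

    private void scheduleRefresh(String serviceName) {
        executor.schedule(() -> {
            try {
                cache.put(serviceName, remoteQuery.apply(serviceName));
            } finally {
                // Re-submit the task so the refresh repeats, as UpdateTask does in its finally block.
                if (!executor.isShutdown()) {
                    scheduleRefresh(serviceName);
                }
            }
        }, refreshMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }

    public static void main(String[] args) {
        AtomicInteger remoteCalls = new AtomicInteger();
        RefreshingCache cache = new RefreshingCache(name -> {
            remoteCalls.incrementAndGet();
            return List.of("10.0.0.1:8081");
        }, 60_000L);
        cache.get("stock-service");
        cache.get("stock-service");
        System.out.println("remote calls so far: " + remoteCalls.get());
        cache.close();
    }
}
```

The second get() is served from the cache, so only the first call reaches the (simulated) server until the refresh task fires.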
public class NacosNamingService implements NamingService {
    private HostReactor hostReactor;
    ...

    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException {
        return selectInstances(serviceName, groupName, healthy, true);
    }

    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy, boolean subscribe) throws NacosException {
        return selectInstances(serviceName, groupName, new ArrayList<String>(), healthy, subscribe);
    }

    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {
        ServiceInfo serviceInfo;
        //subscribe is true by default on this call path
        if (subscribe) {
            serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        } else {
            serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        }
        return selectInstances(serviceInfo, healthy);
    }
    ...
}

public class HostReactor implements Closeable {
    //Local cache of service instance lists
    private final Map<String, ServiceInfo> serviceInfoMap;
    private final Map<String, Object> updatingMap;
    private final NamingProxy serverProxy;
    private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<String, ScheduledFuture<?>>();
    private final ScheduledExecutorService executor;
    ...

    public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
        String key = ServiceInfo.getKey(serviceName, clusters);
        if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
        }
        //First look up the service instance list in the local cache
        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
        //The local cache has no entry for this service
        if (null == serviceObj) {
            serviceObj = new ServiceInfo(serviceName, clusters);
            serviceInfoMap.put(serviceObj.getKey(), serviceObj);
            updatingMap.put(serviceName, new Object());
            //Call the server's instance list query interface and update the Service data immediately
            updateServiceNow(serviceName, clusters);
            updatingMap.remove(serviceName);
        } else if (updatingMap.containsKey(serviceName)) {
            if (UPDATE_HOLD_INTERVAL > 0) {
                //hold a moment waiting for update finish
                synchronized (serviceObj) {
                    try {
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
        }
        //Start the scheduled task that maintains the local cache
        scheduleUpdateIfAbsent(serviceName, clusters);
        //Finally read the service instance list from the local cache
        return serviceInfoMap.get(serviceObj.getKey());
    }

    private ServiceInfo getServiceInfo0(String serviceName, String clusters) {
        String key = ServiceInfo.getKey(serviceName, clusters);
        //Read the service instance list from the local cache
        return serviceInfoMap.get(key);
    }

    private void updateServiceNow(String serviceName, String clusters) {
        try {
            updateService(serviceName, clusters);
        } catch (NacosException e) {
            NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
        }
    }

    //Update service now.
    public void updateService(String serviceName, String clusters) throws NacosException {
        ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
        try {
            //Call the server's service instance query interface
            String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
            //If the result is not empty, update the local cache
            if (StringUtils.isNotEmpty(result)) {
                //Update the local cache
                processServiceJson(result);
            }
        } finally {
            if (oldService != null) {
                synchronized (oldService) {
                    oldService.notifyAll();
                }
            }
        }
    }
    ...

    public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }
        synchronized (futureMap) {
            if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
                return;
            }
            //Submit a delayed task to the scheduling thread pool
            ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
            futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
        }
    }

    public synchronized ScheduledFuture<?> addTask(UpdateTask task) {
        //Submit a delayed task to the scheduling thread pool
        return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
    }

    public class UpdateTask implements Runnable {
        long lastRefTime = Long.MAX_VALUE;
        private final String clusters;
        private final String serviceName;
        private int failCount = 0;

        public UpdateTask(String serviceName, String clusters) {
            this.serviceName = serviceName;
            this.clusters = clusters;
        }

        private void incFailCount() {
            int limit = 6;
            if (failCount == limit) {
                return;
            }
            failCount++;
        }

        private void resetFailCount() {
            failCount = 0;
        }

        @Override
        public void run() {
            long delayTime = DEFAULT_DELAY;
            try {
                ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                //The local cache is empty
                if (serviceObj == null) {
                    updateService(serviceName, clusters);
                    return;
                }
                //lastRefTime starts at Long.MAX_VALUE, so the first run always takes this branch
                if (serviceObj.getLastRefTime() <= lastRefTime) {
                    updateService(serviceName, clusters);
                    serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                } else {
                    refreshOnly(serviceName, clusters);
                }
                lastRefTime = serviceObj.getLastRefTime();
                if (!notifier.isSubscribed(serviceName, clusters) && !futureMap.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
                    NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
                    return;
                }
                if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                    incFailCount();
                    return;
                }
                delayTime = serviceObj.getCacheMillis();
                resetFailCount();
            } catch (Throwable e) {
                incFailCount();
                NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
            } finally {
                //Submit this task again as a delayed task so the local cache keeps being synchronized
                executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
            }
        }
    }
}

public class NamingProxy implements Closeable {
    ...

    //Send an HTTP request to the Nacos server to query the service instance list
    public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException {
        final Map<String, String> params = new HashMap<String, String>(8);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort));
        params.put("clientIP", NetUtils.localIP());
        params.put("healthyOnly", String.valueOf(healthyOnly));
        //Request the "/nacos/v1/ns/instance/list" interface over HTTP
        return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
    }
    ...
}
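The delay expression in UpdateTask's finally block, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), is easier to read with concrete numbers. The worked example below assumes DEFAULT_DELAY is 1000 ms; that value is not shown in the excerpt above, so treat it as an assumption. The class name UpdateTaskBackoff is hypothetical.

```java
// Worked example of the retry delay computed in UpdateTask's finally block.
class UpdateTaskBackoff {
    // Assumption: DEFAULT_DELAY is 1000 ms; the excerpt does not show the constant's value.
    static final long DEFAULT_DELAY = 1000L;

    // Same expression as in UpdateTask: double the delay per failure, capped at DEFAULT_DELAY * 60.
    static long nextDelay(long delayTime, int failCount) {
        return Math.min(delayTime << failCount, DEFAULT_DELAY * 60);
    }

    public static void main(String[] args) {
        // failCount stops growing at 6 (see incFailCount), so these are all the cases after a failure.
        for (int failCount = 0; failCount <= 6; failCount++) {
            System.out.println("failCount=" + failCount + " -> " + nextDelay(DEFAULT_DELAY, failCount) + " ms");
        }
    }
}
```

Under that assumption the delay doubles from 1s up to 32s and is then capped at 60s, while a successful run resets failCount to 0 and uses the cacheMillis value carried by the ServiceInfo.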
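(3) Source code of service instance queries in the Nacos server
When the Nacos client asks the server for a service instance list, it calls the HTTP interface "/nacos/v1/ns/instance/list", so the entry point on the Nacos server is the list() method of InstanceController.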
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
    ...

    //Get all instance of input service.
    @GetMapping("/list")
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
    public ObjectNode list(HttpServletRequest request) throws Exception {
        //Read the request parameters
        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        String agent = WebUtils.getUserAgent(request);
        String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
        String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
        int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
        String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
        boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
        String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
        String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
        boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
        //Query the instances
        return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant, healthyOnly);
    }

    //Get service full information with instances.
    public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
            int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
        ...
        //Get the Service object from serviceManager's in-memory registry
        Service service = serviceManager.getService(namespaceId, serviceName);
        ...
        //Get the service instance list from the Service object
        srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
        ...
    }
    ...
}

//Service of Nacos server side
//We introduce a 'service --> cluster --> instance' model,
//in which service stores a list of clusters, which contain a list of instances.
//This class inherits from Service in API module and stores some fields that do not have to expose to client.
@JsonInclude(Include.NON_NULL)
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {
    private Map<String, Cluster> clusterMap = new HashMap<>();
    ...

    //Get all instance from input clusters.
    public List<Instance> srvIPs(List<String> clusters) {
        if (CollectionUtils.isEmpty(clusters)) {
            clusters = new ArrayList<>();
            clusters.addAll(clusterMap.keySet());
        }
        //Collect the instances of the clusters to query
        return allIPs(clusters);
    }

    //Get all instance from input clusters.
    public List<Instance> allIPs(List<String> clusters) {
        List<Instance> result = new ArrayList<>();
        //Iterate over the clusters
        for (String cluster : clusters) {
            Cluster clusterObj = clusterMap.get(cluster);
            if (clusterObj == null) {
                continue;
            }
            //Add every Instance held by the cluster object
            result.addAll(clusterObj.allIPs());
        }
        return result;
    }
    ...
}

public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {
    @JsonIgnore
    private Set<Instance> persistentInstances = new HashSet<>();
    @JsonIgnore
    private Set<Instance> ephemeralInstances = new HashSet<>();
    ...

    public List<Instance> allIPs() {
        //Return both persistent and ephemeral instances
        List<Instance> allInstances = new ArrayList<>();
        allInstances.addAll(persistentInstances);
        allInstances.addAll(ephemeralInstances);
        return allInstances;
    }
    ...
}
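The 'service --> cluster --> instance' lookup can be reduced to a small model to make the traversal explicit. The sketch below is hypothetical (ServiceModelSketch and its nested Service and Cluster classes are illustrative, and instances are plain "ip:port" strings); it mirrors only the rule that an empty cluster list means "all clusters" and that unknown cluster names are skipped.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, reduced version of the 'service --> cluster --> instance' model.
class ServiceModelSketch {

    static final class Cluster {
        final Set<String> persistentInstances = new HashSet<>();
        final Set<String> ephemeralInstances = new HashSet<>();

        // Persistent and ephemeral instances are returned together.
        List<String> allIPs() {
            List<String> all = new ArrayList<>(persistentInstances);
            all.addAll(ephemeralInstances);
            return all;
        }
    }

    static final class Service {
        final Map<String, Cluster> clusterMap = new HashMap<>();

        // An empty cluster list means "every cluster", mirroring Service.srvIPs().
        List<String> srvIPs(List<String> clusters) {
            List<String> wanted = clusters.isEmpty() ? new ArrayList<>(clusterMap.keySet()) : clusters;
            List<String> result = new ArrayList<>();
            for (String name : wanted) {
                Cluster cluster = clusterMap.get(name);
                if (cluster != null) { // unknown cluster names are skipped
                    result.addAll(cluster.allIPs());
                }
            }
            return result;
        }
    }

    public static void main(String[] args) {
        Service service = new Service();
        Cluster a = new Cluster();
        a.ephemeralInstances.add("10.0.0.1:8081");
        Cluster b = new Cluster();
        b.persistentInstances.add("10.0.0.2:8081");
        service.clusterMap.put("A", a);
        service.clusterMap.put("B", b);
        System.out.println(service.srvIPs(List.of("A")));
        System.out.println(service.srvIPs(List.of()).size());
    }
}
```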
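(4) Summary
I. How the microservice list is obtained when microservices call each other
Every client caches the microservice list locally. Before sending a request, the client looks up the instance list by microservice name, selects one instance to call, and issues the HTTP call. A scheduled task keeps the locally cached list up to date.
II. Source code of service discovery in the Nacos client
A Nacos client is a project that depends on nacos-discovery + nacos-client, where nacos-discovery integrates Ribbon.
Before a microservice call, the Nacos client checks its local cache; on a miss it sends a service list query to the Nacos server and caches the result locally. It also keeps submitting delayed tasks to maintain the local cache. On the Nacos server, the interface that queries the service instance list reads its data from the in-memory registry.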
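6. How the server maintains unhealthy microservice instances
(1) The heartbeat mechanism in Nacos service management
(2) Source code of heartbeat handling on the server
(3) Source code of the server's periodic heartbeat health check
(4) Summary of how Nacos maintains instance health status
(1) The heartbeat mechanism in Nacos service management
When a Nacos client registers a service instance, it starts a heartbeat task. By default the task calls the server's instance heartbeat interface every 5s to tell the server the instance is still alive. On receiving a heartbeat request, the server finds the matching Instance by IP + Port, sets the Instance's lastBeat property to the current time, and returns the response.
When a registration request causes the server to create a new Service, the server also starts a health check task for it. The task decides whether each Instance is available by comparing its lastBeat property with the current time. By default, if more than 15s have passed since lastBeat, the instance is marked unhealthy; if more than 30s have passed since lastBeat, Nacos removes the instance automatically.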
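The two thresholds can be expressed as a small decision function. This is only a sketch of the rule as stated above: BeatHealthSketch, its State enum and the constant names are hypothetical, and the 15s and 30s values are the ones quoted in the text.

```java
// Sketch of the health decision described above, using the 15s / 30s thresholds from the text.
class BeatHealthSketch {
    static final long UNHEALTHY_AFTER_MILLIS = 15_000L;
    static final long DELETE_AFTER_MILLIS = 30_000L;

    enum State { HEALTHY, UNHEALTHY, REMOVE }

    // Compare "now" with the time of the last heartbeat and classify the instance.
    static State classify(long nowMillis, long lastBeatMillis) {
        long silence = nowMillis - lastBeatMillis;
        if (silence > DELETE_AFTER_MILLIS) {
            return State.REMOVE;
        }
        if (silence > UNHEALTHY_AFTER_MILLIS) {
            return State.UNHEALTHY;
        }
        return State.HEALTHY;
    }

    public static void main(String[] args) {
        long lastBeat = 100_000L;
        System.out.println(classify(lastBeat + 5_000L, lastBeat));
        System.out.println(classify(lastBeat + 16_000L, lastBeat));
        System.out.println(classify(lastBeat + 31_000L, lastBeat));
    }
}
```

With a 5s heartbeat period, an instance has to miss roughly three consecutive beats before it is marked unhealthy, and roughly six before it is removed.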
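(2) Source code of heartbeat handling on the server
I. Source code of the client sending heartbeat requests
II. Source code of the server handling heartbeat requests
III. Summary of heartbeat handling on the server
I. Source code of the client sending heartbeat requests
When NacosNamingService.registerInstance() registers a service instance, and before it calls NamingProxy.registerService() to perform the registration, it checks whether the instance is ephemeral. If so, it builds the heartbeat information and adds it to beatReactor by calling the buildBeatInfo() and addBeatInfo() methods of BeatReactor.
BeatReactor.buildBeatInfo() sets the heartbeat interval through BeatInfo.setPeriod(); the default is 5 seconds.
The second-to-last line of BeatReactor.addBeatInfo() starts a delayed task. The task is a BeatTask wrapping the BeatInfo; it is handed to BeatReactor's ScheduledExecutorService, and its delay, read through BeatInfo.getPeriod(), is 5 seconds.
BeatTask.run() calls NamingProxy.sendBeat() to send the heartbeat request to the Nacos server, which in turn calls NamingProxy.reqApi(). If the heartbeat response says that the service instance does not exist, the client sends a new instance registration request. Whatever the response, the client wraps the BeatInfo in a new BeatTask and hands it to the ScheduledExecutorService thread pool to run after the next interval: 5 seconds by default, or the clientBeatInterval returned by the server when that value is positive.
So while registering a service, the client starts a delayed heartbeat task that runs every 5s by default. Its job is to call the service instance heartbeat interface provided by Nacos over HTTP.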
public class NacosServiceRegistry implements ServiceRegistry<Registration> {
    private final NacosDiscoveryProperties nacosDiscoveryProperties;

    public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
        this.nacosDiscoveryProperties = nacosDiscoveryProperties;
    }

    @Override
    public void register(Registration registration) {
        if (StringUtils.isEmpty(registration.getServiceId())) {
            log.warn("No service to register for nacos client...");
            return;
        }
        NamingService namingService = namingService();
        //Service name
        String serviceId = registration.getServiceId();
        //Service group
        String group = nacosDiscoveryProperties.getGroup();
        //Service instance, carrying the IP, Port and other information
        Instance instance = getNacosInstanceFromRegistration(registration);
        try {
            //Call NacosNamingService.registerInstance() to register this instance with Nacos
            namingService.registerInstance(serviceId, group, instance);
            log.info("nacos registry, {} {} {}:{} register finished", group, serviceId, instance.getIp(), instance.getPort());
        } catch (Exception e) {
            log.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e);
            rethrowRuntimeException(e);
        }
    }

    private NamingService namingService() {
        return nacosServiceManager.getNamingService(nacosDiscoveryProperties.getNacosProperties());
    }

    private Instance getNacosInstanceFromRegistration(Registration registration) {
        Instance instance = new Instance();
        instance.setIp(registration.getHost());
        instance.setPort(registration.getPort());
        instance.setWeight(nacosDiscoveryProperties.getWeight());
        instance.setClusterName(nacosDiscoveryProperties.getClusterName());
        instance.setEnabled(nacosDiscoveryProperties.isInstanceEnabled());
        instance.setMetadata(registration.getMetadata());
        instance.setEphemeral(nacosDiscoveryProperties.isEphemeral());
        return instance;
    }
    ...
}

public class NacosNamingService implements NamingService {
    private BeatReactor beatReactor;
    private NamingProxy serverProxy;
    ...

    @Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        NamingUtils.checkInstanceIsLegal(instance);
        //Build the grouped service name
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        //Check whether the instance to register is ephemeral
        if (instance.isEphemeral()) {
            //For an ephemeral instance, build the heartbeat information
            BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
            //Add the heartbeat information
            beatReactor.addBeatInfo(groupedServiceName, beatInfo);
        }
        //Then call NamingProxy.registerService() to register the service instance
        serverProxy.registerService(groupedServiceName, groupName, instance);
    }
    ...
}

public class BeatReactor implements Closeable {
    ...

    //Build new beat information.
    public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(groupedServiceName);
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        //getInstanceHeartBeatInterval() returns 5000 by default
        beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
        return beatInfo;
    }
    ...
}

@JsonInclude(Include.NON_NULL)
public class Instance implements Serializable {
    ...

    public long getInstanceHeartBeatInterval() {
        //Constants.DEFAULT_HEART_BEAT_INTERVAL is 5000 by default
        return getMetaDataByKeyWithDefault(PreservedMetadataKeys.HEART_BEAT_INTERVAL, Constants.DEFAULT_HEART_BEAT_INTERVAL);
    }
    ...
}

public class BeatReactor implements Closeable {
    private final ScheduledExecutorService executorService;
    private final NamingProxy serverProxy;
    public final Map<String, BeatInfo> dom2Beat = new ConcurrentHashMap<String, BeatInfo>();

    public BeatReactor(NamingProxy serverProxy) {
        this(serverProxy, UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT);
    }

    public BeatReactor(NamingProxy serverProxy, int threadCount) {
        this.serverProxy = serverProxy;
        this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.beat.sender");
                return thread;
            }
        });
    }
    ...

    //Add beat information.
    public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
        NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
        String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
        BeatInfo existBeat = null;
        if ((existBeat = dom2Beat.remove(key)) != null) {
            existBeat.setStopped(true);
        }
        dom2Beat.put(key, beatInfo);
        //Start a delayed task; the task is a BeatTask
        executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
        MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
    }
    ...

    class BeatTask implements Runnable {
        BeatInfo beatInfo;

        public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }

        @Override
        public void run() {
            //Check whether the task should stop
            if (beatInfo.isStopped()) {
                return;
            }
            //Delay before the next run, again 5s by default
            long nextTime = beatInfo.getPeriod();
            try {
                //Call NamingProxy.sendBeat() to send the heartbeat request to the Nacos server
                JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
                long interval = result.get("clientBeatInterval").asLong();
                boolean lightBeatEnabled = false;
                if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
                    lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
                }
                BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
                if (interval > 0) {
                    nextTime = interval;
                }
                //Read the status code returned by the Nacos server
                int code = NamingResponseCode.OK;
                if (result.has(CommonParams.CODE)) {
                    code = result.get(CommonParams.CODE).asInt();
                }
                //code == RESOURCE_NOT_FOUND means the earlier registration has been removed by the Nacos server
                if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                    //Rebuild the parameters and send the registration request again
                    Instance instance = new Instance();
                    instance.setPort(beatInfo.getPort());
                    instance.setIp(beatInfo.getIp());
                    instance.setWeight(beatInfo.getWeight());
                    instance.setMetadata(beatInfo.getMetadata());
                    instance.setClusterName(beatInfo.getCluster());
                    instance.setServiceName(beatInfo.getServiceName());
                    instance.setInstanceId(instance.getInstanceId());
                    instance.setEphemeral(true);
                    try {
                        //Call NamingProxy.registerService() to send the instance registration request to the Nacos server
                        serverProxy.registerService(beatInfo.getServiceName(), NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                    } catch (Exception ignore) {
                    }
                }
            } catch (NacosException ex) {
                NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}", JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
            }
            //Put beatInfo back into a delayed task with the same interval, so the heartbeat keeps looping
            executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
        }
    }
}

public class NamingProxy implements Closeable {
    ...

    //Send beat.
    public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
        if (NAMING_LOGGER.isDebugEnabled()) {
            NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
        }
        Map<String, String> params = new HashMap<String, String>(8);
        Map<String, String> bodyMap = new HashMap<String, String>(2);
        if (!lightBeatEnabled) {
            bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
        }
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
        params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
        params.put("ip", beatInfo.getIp());
        params.put("port", String.valueOf(beatInfo.getPort()));
        String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
        return JacksonUtils.toObj(result);
    }
    ...
}
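The self-rescheduling shape of BeatTask can be isolated from the HTTP details. The sketch below is a simplified illustration, not BeatReactor itself: BeatLoopSketch, start(), stop() and the LongSupplier standing in for the heartbeat call are hypothetical, and re-registration on RESOURCE_NOT_FOUND is omitted. It keeps the two behaviours visible in the code above: the server-suggested interval replaces the client period when it is positive, and a failed beat does not stop the loop.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical reduction of the BeatTask loop: send a beat, then schedule the next one.
class BeatLoopSketch {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "beat-sender-sketch");
        t.setDaemon(true);
        return t;
    });
    private volatile boolean stopped;

    // The server-provided interval wins when it is positive; otherwise keep the client's own period.
    static long nextDelayMillis(long periodMillis, long serverIntervalMillis) {
        return serverIntervalMillis > 0 ? serverIntervalMillis : periodMillis;
    }

    // sendBeat stands in for the HTTP call and returns the interval suggested by the server.
    void start(long periodMillis, LongSupplier sendBeat) {
        executor.schedule(() -> beatOnce(periodMillis, sendBeat), periodMillis, TimeUnit.MILLISECONDS);
    }

    private void beatOnce(long periodMillis, LongSupplier sendBeat) {
        if (stopped) {
            return;
        }
        long nextTime = periodMillis;
        try {
            nextTime = nextDelayMillis(periodMillis, sendBeat.getAsLong());
        } catch (RuntimeException e) {
            // A failed beat is only reported; the loop keeps going with the client period.
            System.err.println("beat failed: " + e.getMessage());
        }
        if (!executor.isShutdown()) {
            executor.schedule(() -> beatOnce(periodMillis, sendBeat), nextTime, TimeUnit.MILLISECONDS);
        }
    }

    void stop() {
        stopped = true;
        executor.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch threeBeats = new CountDownLatch(3);
        BeatLoopSketch loop = new BeatLoopSketch();
        loop.start(10L, () -> {
            threeBeats.countDown();
            return 0L; // 0 means "no interval suggested by the server"
        });
        boolean done = threeBeats.await(5, TimeUnit.SECONDS);
        loop.stop();
        System.out.println("three beats sent: " + done);
    }
}
```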
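II. Source code of the server handling heartbeat requests
On the server, the beat() method of InstanceController handles heartbeat requests from clients. It first tries to fetch the matching Instance from ServiceManager's registry. If the in-memory registry has no such Instance, it calls ServiceManager.registerInstance() to register the instance again. The one exception is a heartbeat that carries no beat body: the server then returns RESOURCE_NOT_FOUND and leaves the re-registration to the client.
Next, beat() fetches the matching Service from ServiceManager's registry, so that later changes to an Instance inside the Service's Cluster are changes to the registry data itself. It then calls Service.processClientBeat(), which submits an asynchronous ClientBeatProcessor task to a thread pool whose size is half the number of available processors (at least 1).
In ClientBeatProcessor.run(), the task first gets all ephemeral instances of the cluster by cluster name. It then loops over them and compares IP + Port to find the Instance that sent the heartbeat. It sets that Instance's lastBeat property to the current time, and if the Instance is currently unhealthy it marks it healthy again.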
//Instance operation controller.
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
    ...

    //Create a beat for instance.
    @CanDistro
    @PutMapping("/beat")
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public ObjectNode beat(HttpServletRequest request) throws Exception {
        ObjectNode result = JacksonUtils.createEmptyJsonNode();
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
        String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
        //Read the request parameters, namespaceId and serviceName
        RsInfo clientBeat = null;
        if (StringUtils.isNotBlank(beat)) {
            clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
        }
        String clusterName = WebUtils.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
        String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
        int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
        if (clientBeat != null) {
            if (StringUtils.isNotBlank(clientBeat.getCluster())) {
                clusterName = clientBeat.getCluster();
            } else {
                clientBeat.setCluster(clusterName);
            }
            ip = clientBeat.getIp();
            port = clientBeat.getPort();
        }
        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
        //Use the namespace, service name, etc. to fetch the Instance from ServiceManager's in-memory registry
        Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
        //If no instance is found, call the registration method ServiceManager.registerInstance() again
        if (instance == null) {
            if (clientBeat == null) {
                result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
                return result;
            }
            Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, " + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
            instance = new Instance();
            instance.setPort(clientBeat.getPort());
            instance.setIp(clientBeat.getIp());
            instance.setWeight(clientBeat.getWeight());
            instance.setMetadata(clientBeat.getMetadata());
            instance.setClusterName(clusterName);
            instance.setServiceName(serviceName);
            instance.setInstanceId(instance.getInstanceId());
            instance.setEphemeral(clientBeat.isEphemeral());
            //Register the service instance again
            serviceManager.registerInstance(namespaceId, serviceName, instance);
        }
        //Fetch the Service from ServiceManager's in-memory registry; later changes to an Instance
        //in the Service's Cluster therefore change the registry itself
        Service service = serviceManager.getService(namespaceId, serviceName);
        if (service == null) {
            throw new NacosException(NacosException.SERVER_ERROR, "service not found: " + serviceName + "@" + namespaceId);
        }
        if (clientBeat == null) {
            clientBeat = new RsInfo();
            clientBeat.setIp(ip);
            clientBeat.setPort(port);
            clientBeat.setCluster(clusterName);
        }
        //Submit the heartbeat processing task for this client instance, which updates lastBeat
        service.processClientBeat(clientBeat);
        result.put(CommonParams.CODE, NamingResponseCode.OK);
        if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
            result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
        }
        result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
        return result;
    }
    ...
}

@JsonInclude(Include.NON_NULL)
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {
    ...

    public void processClientBeat(final RsInfo rsInfo) {
        ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
        clientBeatProcessor.setService(this);
        clientBeatProcessor.setRsInfo(rsInfo);
        //Run immediately
        HealthCheckReactor.scheduleNow(clientBeatProcessor);
    }
    ...
}

//Health check reactor.
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public class HealthCheckReactor {
    ...

    //Schedule client beat check task without a delay.
    public static ScheduledFuture<?> scheduleNow(Runnable task) {
        //Submit the task to the thread pool for immediate execution
        return GlobalExecutor.scheduleNamingHealth(task, 0, TimeUnit.MILLISECONDS);
    }
    ...
}

public class GlobalExecutor {
    public static final int DEFAULT_THREAD_COUNT = Runtime.getRuntime().availableProcessors() <= 1 ? 1 : Runtime.getRuntime().availableProcessors() / 2;

    //The pool size is half the number of available processors (at least 1)
    private static final ScheduledExecutorService NAMING_HEALTH_EXECUTOR = ExecutorFactory.Managed.newScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class), DEFAULT_THREAD_COUNT, new NameThreadFactory("com.alibaba.nacos.naming.health"));
    ...

    public static ScheduledFuture<?> scheduleNamingHealth(Runnable command, long delay, TimeUnit unit) {
        return NAMING_HEALTH_EXECUTOR.schedule(command, delay, unit);
    }
    ...
}

//Thread to update ephemeral instance triggered by client beat.
public class ClientBeatProcessor implements Runnable {
    private RsInfo rsInfo;
    private Service service;
    ...

    @Override
    public void run() {
        Service service = this.service;
        if (Loggers.EVT_LOG.isDebugEnabled()) {
            Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
        }
        //Read the ip, clusterName and port
        String ip = rsInfo.getIp();
        String clusterName = rsInfo.getCluster();
        int port = rsInfo.getPort();
        Cluster cluster = service.getClusterMap().get(clusterName);
        //Get all ephemeral instances of the current cluster
        List<Instance> instances = cluster.allIPs(true);
        //Iterate over the ephemeral instances
        for (Instance instance : instances) {
            //Match on ip and port so that only the instance that sent this heartbeat is touched
            if (instance.getIp().equals(ip) && instance.getPort() == port) {
                if (Loggers.EVT_LOG.isDebugEnabled()) {
                    Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
                }
                //Set the instance's last heartbeat time to the current time
                instance.setLastBeat(System.currentTimeMillis());
                if (!instance.isMarked()) {
                    //The instance was unhealthy before this heartbeat
                    if (!instance.isHealthy()) {
                        instance.setHealthy(true);
                        Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok", cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE);
                        getPushService().serviceChanged(service);
                    }
                }
            }
        }
    }
    ...
}
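What ClientBeatProcessor does to a single instance can be condensed into one loop. The following is a rough, hypothetical reduction (BeatProcessorSketch and its nested Instance class are illustrative, the "marked" flag is left out, and a boolean return value stands in for the service-changed push); it is meant to show the IP + Port match, the lastBeat update and the unhealthy-to-healthy flip.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reduction of the server-side heartbeat processing.
class BeatProcessorSketch {

    static final class Instance {
        final String ip;
        final int port;
        volatile long lastBeat;
        volatile boolean healthy;

        Instance(String ip, int port, long lastBeat, boolean healthy) {
            this.ip = ip;
            this.port = port;
            this.lastBeat = lastBeat;
            this.healthy = healthy;
        }
    }

    // Returns true when the beat flipped an instance from unhealthy back to healthy.
    static boolean processBeat(List<Instance> ephemeralInstances, String ip, int port, long nowMillis) {
        boolean recovered = false;
        for (Instance instance : ephemeralInstances) {
            // Only the instance that sent this heartbeat is touched.
            if (instance.ip.equals(ip) && instance.port == port) {
                instance.lastBeat = nowMillis;
                if (!instance.healthy) {
                    instance.healthy = true;
                    recovered = true; // the original pushes a service-changed event at this point
                }
            }
        }
        return recovered;
    }

    public static void main(String[] args) {
        List<Instance> instances = new ArrayList<>();
        instances.add(new Instance("10.0.0.1", 8081, 0L, false));
        instances.add(new Instance("10.0.0.2", 8081, 0L, true));
        System.out.println(processBeat(instances, "10.0.0.1", 8081, 20_000L));
        System.out.println(processBeat(instances, "10.0.0.1", 8081, 25_000L));
    }
}
```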
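III. Summary of heartbeat handling on the server
The server first uses the request parameters to look up the Instance in ServiceManager's in-memory registry. If the Instance is not found, the instance is registered again. The server then fetches the matching Service from the in-memory registry and submits an asynchronous ClientBeatProcessor task through it.
The asynchronous task collects all ephemeral instances in the same cluster, then loops over them and matches on IP + Port to find the Instance. It sets the Instance's lastBeat property to the current time, and if the Instance is unhealthy it marks it healthy again.
A healthy client instance sends a heartbeat request every 5s; an unhealthy one does not. So how does Nacos detect and handle unhealthy service instances?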
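(3) Source code of the server's periodic heartbeat health check
I. What happens when a Service is created
II. Core logic of the run() method of the asynchronous ClientBeatCheckTask
I. What happens when a Service is created
When ServiceManager.registerInstance() handles a registration request, it calls ServiceManager.createEmptyService() to check whether a Service needs to be created.
In createEmptyService() (through createServiceIfAbsent()), if a new Service is needed, the method creates a Service object and then calls ServiceManager.putServiceAndInit().
putServiceAndInit() puts the new Service into the registry, then calls Service.init(), which submits an asynchronous ClientBeatCheckTask to a thread pool whose size is half the number of available processors (at least 1).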
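The pool sizing rule and the fixed-delay scheduling used for the check task can be tried out in isolation. The sketch below is illustrative only: HealthPoolSketch and threadCount() are hypothetical names, the 50 ms period is chosen just to keep the demo short, and the printed message stands in for the real health check.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Sketch of "half the processors, at least one thread" plus a fixed-delay periodic task.
class HealthPoolSketch {

    // Same rule as the DEFAULT_THREAD_COUNT expression: half the processors, never less than one.
    static int threadCount(int availableProcessors) {
        return availableProcessors <= 1 ? 1 : availableProcessors / 2;
    }

    public static void main(String[] args) throws InterruptedException {
        int threads = threadCount(Runtime.getRuntime().availableProcessors());
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(threads);
        // Fixed delay: the next run starts 50 ms after the previous run has finished,
        // so a slow check can never overlap with the following one.
        ScheduledFuture<?> future = pool.scheduleWithFixedDelay(
                () -> System.out.println("checking heartbeats"), 50, 50, TimeUnit.MILLISECONDS);
        Thread.sleep(200);
        future.cancel(false);
        pool.shutdown();
    }
}
```

Note that integer division rounds down, so a 3-core machine gets a single thread under this rule.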
//Service manager: owns the full list of services and handles their registration, destruction, modification, etc.
@Component
public class ServiceManager implements RecordListener<Service> {
    //Registry: Map(namespace, Map(group::serviceName, Service)).
    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
    ...

    //Register an instance to a service in AP mode.
    //This method creates service or cluster silently if they don't exist.
    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        //1. Create an empty service
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
        //2. Get the service by namespace ID and service name; throw an exception if the result is null
        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }
        //3. Add the service instance
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }
    ...

    //1. Create an empty service
    public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
        createServiceIfAbsent(namespaceId, serviceName, local, null);
    }

    //Create service if not exist.
    public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException {
        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
            service = new Service();
            service.setName(serviceName);
            service.setNamespaceId(namespaceId);
            service.setGroupName(NamingUtils.getGroupName(serviceName));
            //now validate the service. if failed, exception will be thrown
            service.setLastModifiedMillis(System.currentTimeMillis());
            service.recalculateChecksum();
            if (cluster != null) {
                cluster.setService(service);
                service.getClusterMap().put(cluster.getName(), cluster);
            }
            service.validate();
            putServiceAndInit(service);
            if (!local) {
                addOrReplaceService(service);
            }
        }
    }

    private void putServiceAndInit(Service service) throws NacosException {
        //Put the Service into the registry serviceMap
        putService(service);
        service.init();
        //Add the Service as a listener to consistencyService's listeners
        consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
        consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
        Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
    }

    //Put service into manager.
    public void putService(Service service) {
        if (!serviceMap.containsKey(service.getNamespaceId())) {
            synchronized (putServiceLock) {
                if (!serviceMap.containsKey(service.getNamespaceId())) {
                    serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
                }
            }
        }
        serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
    }
    ...
}

@JsonInclude(Include.NON_NULL)
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {
    @JsonIgnore
    private ClientBeatCheckTask clientBeatCheckTask = new ClientBeatCheckTask(this);
    ...

    public void init() {
        //Submit clientBeatCheckTask as a delayed task that runs every 5s
        HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
        for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
            entry.getValue().setService(this);
            entry.getValue().init();
        }
    }
    ...
}

public class HealthCheckReactor {
    private static Map<String, ScheduledFuture> futureMap = new ConcurrentHashMap<>();
    ...

    //Schedule client beat check task with a delay.
    public static void scheduleCheck(ClientBeatCheckTask task) {
        futureMap.putIfAbsent(task.taskKey(), GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
    }
    ...
}

public class GlobalExecutor {
    public static final int DEFAULT_THREAD_COUNT = Runtime.getRuntime().availableProcessors() <= 1 ? 1 : Runtime.getRuntime().availableProcessors() / 2;

    //The pool size is half the number of available processors (at least 1)
    private static final ScheduledExecutorService NAMING_HEALTH_EXECUTOR = ExecutorFactory.Managed.newScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class), DEFAULT_THREAD_COUNT,
            new NameThreadFactory("com.alibaba.nacos.naming.health"));
    ...

    public static ScheduledFuture<?> scheduleNamingHealth(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        //Fixed delay: the next run starts only after the previous run has finished
        //(however long it took) and the delay has then elapsed
        return NAMING_HEALTH_EXECUTOR.scheduleWithFixedDelay(command, initialDelay, delay, unit);
    }
    ...
}

public final class ExecutorFactory {
    ...

    public static final class Managed {
        private static final ThreadPoolManager THREAD_POOL_MANAGER = ThreadPoolManager.getInstance();
        ...

        //Create a new scheduled executor service with input thread factory and register to manager.
        public static ScheduledExecutorService newScheduledExecutorService(final String group, final int nThreads, final ThreadFactory threadFactory) {
            ScheduledExecutorService executorService = Executors.newScheduledThreadPool(nThreads, threadFactory);
            THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService);
            return executorService;
        }
    }
    ...
}
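The two-level registry used by putService() and createServiceIfAbsent() can be illustrated with a much smaller model. The sketch below is not Nacos code: RegistrySketch, its String payloads, and the service names are hypothetical stand-ins, and it uses computeIfAbsent in place of the double-checked locking shown above, which gives the same "create the inner map once" behaviour on a ConcurrentHashMap.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical, simplified stand-in for the ServiceManager registry (not Nacos code).
final class RegistrySketch {
    // namespace -> (service name -> service payload); a String stands in for Service
    private final Map<String, Map<String, String>> serviceMap = new ConcurrentHashMap<>();

    // Creates the per-namespace map atomically on first use, then stores the service.
    void putService(String namespaceId, String serviceName, String service) {
        serviceMap.computeIfAbsent(namespaceId, ns -> new ConcurrentSkipListMap<>())
                  .put(serviceName, service);
    }

    // Returns null when either the namespace or the service is unknown.
    String getService(String namespaceId, String serviceName) {
        Map<String, String> services = serviceMap.get(namespaceId);
        return services == null ? null : services.get(serviceName);
    }

    // Mirrors the create-if-absent step: returns true only when a new entry was created.
    boolean createServiceIfAbsent(String namespaceId, String serviceName) {
        return serviceMap.computeIfAbsent(namespaceId, ns -> new ConcurrentSkipListMap<>())
                         .putIfAbsent(serviceName, "service:" + serviceName) == null;
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        System.out.println(registry.createServiceIfAbsent("public", "stock-service"));
        System.out.println(registry.getService("public", "stock-service"));
    }
}
```

In this model a second createServiceIfAbsent() call for the same namespace and name leaves the existing entry untouched, which corresponds to the null check at the top of the method in the listing.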
II. Core logic of the run() method of the asynchronous task ClientBeatCheckTask
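The run() method of ClientBeatCheckTask performs the health check on service instances: it finds the client service instances that are unhealthy and handles them.
The first loop finds the unhealthy Instance objects and sets their healthy property to false. The criterion is the Instance's lastBeat property. A healthy client sends a heartbeat request every 5s, and each heartbeat refreshes lastBeat. If the client is no longer healthy, the heartbeats stop and lastBeat stops changing. Once lastBeat has gone unchanged for more than 15s, the scheduled task marks the Instance as unhealthy.
The second loop finds the Instances that can be deleted. The criterion is again lastBeat: once lastBeat has not been updated for more than 30s, the scheduled task deletes the Instance.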
//Check and update statuses of ephemeral instances, remove them if they have been expired.
public class ClientBeatCheckTask implements Runnable {
    private Service service;

    public ClientBeatCheckTask(Service service) {
        this.service = service;
    }
    ...

    @Override
    public void run() {
        try {
            //Get all ephemeral instances
            List<Instance> instances = service.allIPs(true);
            //First pass over every ephemeral instance
            for (Instance instance : instances) {
                //Check: current time - last heartbeat time > heartbeat timeout
                //instance.getInstanceHeartBeatTimeOut() is 15s by default
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                    //marked is false by default, so this condition holds
                    if (!instance.isMarked()) {
                        //If the instance is still in the healthy state
                        if (instance.isHealthy()) {
                            //Switch it to the unhealthy state
                            instance.setHealthy(false);
                            //Publish the change; subscribers are notified over UDP
                            getPushService().serviceChanged(service);
                            ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                        }
                    }
                }
            }
            //Second pass over every ephemeral instance
            for (Instance instance : instances) {
                if (instance.isMarked()) {
                    continue;
                }
                //Check: current time - last heartbeat time > delete timeout
                //instance.getIpDeleteTimeout() is 30s by default
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    //Remove the instance from the registry
                    deleteIp(instance);
                }
            }
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }
    }
    ...
}
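The two passes in run() can be traced with a small, self-contained model that takes the current time as a parameter. This is only a sketch: BeatCheckSketch, Inst, and the two constants are hypothetical names, the 15s and 30s values are the thresholds described above, and the marked flag and event publishing are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one ClientBeatCheckTask round (not Nacos code).
final class BeatCheckSketch {
    static final long HEARTBEAT_TIMEOUT_MS = 15_000; // unhealthy threshold described above
    static final long DELETE_TIMEOUT_MS = 30_000;    // removal threshold described above

    static final class Inst {
        final String ip;
        long lastBeat;          // timestamp of the last heartbeat, in milliseconds
        boolean healthy = true;

        Inst(String ip, long lastBeat) {
            this.ip = ip;
            this.lastBeat = lastBeat;
        }
    }

    // Runs one check round at time nowMs; returns the instances removed in this round.
    static List<Inst> check(List<Inst> instances, long nowMs) {
        // Pass 1: mark instances whose heartbeat is older than the timeout as unhealthy.
        for (Inst inst : instances) {
            if (nowMs - inst.lastBeat > HEARTBEAT_TIMEOUT_MS && inst.healthy) {
                inst.healthy = false;
            }
        }
        // Pass 2: collect instances whose heartbeat is older than the delete timeout.
        List<Inst> removed = new ArrayList<>();
        for (Inst inst : instances) {
            if (nowMs - inst.lastBeat > DELETE_TIMEOUT_MS) {
                removed.add(inst);
            }
        }
        instances.removeAll(removed);
        return removed;
    }

    public static void main(String[] args) {
        List<Inst> instances = new ArrayList<>();
        instances.add(new Inst("10.0.0.1", 0L));
        check(instances, 16_000L);
        System.out.println("healthy after 16s: " + instances.get(0).healthy);
    }
}
```

Both comparisons are strict, as in the listing, so an instance whose last heartbeat is exactly 15s old is still treated as healthy in that round.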
(4) Summary of how Nacos maintains the health status of microservice instances
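The Nacos client runs a heartbeat task that sends a heartbeat to the Nacos server every 5s. On each heartbeat, the server updates the lastBeat property of the corresponding Instance.
In addition, when the Nacos server registers a service instance and has to create the Service, it submits one heartbeat health-check task per Service to a thread pool for periodic execution. That task marks any Instance with no heartbeat for more than 15s as unhealthy, and removes any Instance with no heartbeat for more than 30s from the registry.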
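The "one periodic check per Service" idea can be sketched with the JDK scheduler alone. The class below is an illustration under assumptions, not the Nacos implementation: HealthCheckSchedulerSketch and its methods are hypothetical names, the pool has a single thread, and computeIfAbsent is used so that a task is scheduled only when its key is new.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of per-service periodic health checks (not Nacos code).
final class HealthCheckSchedulerSketch {
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    private final Map<String, ScheduledFuture<?>> futures = new ConcurrentHashMap<>();

    // Schedules at most one repeating check per task key. With a fixed delay, the next
    // run starts delayMs after the previous run has finished.
    void scheduleCheck(String taskKey, Runnable check, long delayMs) {
        futures.computeIfAbsent(taskKey,
                key -> executor.scheduleWithFixedDelay(check, delayMs, delayMs, TimeUnit.MILLISECONDS));
    }

    // Number of distinct task keys that currently have a scheduled check.
    int scheduledCount() {
        return futures.size();
    }

    // Stops the pool so that the JVM can exit.
    void shutdown() {
        executor.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        HealthCheckSchedulerSketch scheduler = new HealthCheckSchedulerSketch();
        scheduler.scheduleCheck("stock-service", () -> System.out.println("checking stock-service"), 100);
        Thread.sleep(350);
        scheduler.shutdown();
    }
}
```

Keying the futures by a per-service task key is what keeps repeated registrations of instances for the same service from piling up duplicate check tasks in this model.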