当前位置: 首页 > news >正文

Nacos源码—7.Nacos升级gRPC分析三

大纲

5.服务变动时如何通知订阅的客户端

6.微服务实例信息如何同步集群节点

5.服务变动时如何通知订阅的客户端

(1)服务注册和服务订阅时发布的客户端注册和订阅事件的处理

(2)延迟任务的执行引擎源码

(3)处理客户端注册和订阅事件时发布的服务变动和服务订阅事件的处理

(1)服务注册和服务订阅时发布的客户端注册和订阅事件的处理

一.服务注册

Nacos客户端注册服务实例时,Nacos服务端会发布ClientRegisterServiceEvent客户端注册服务实例事件。Nacos服务端在处理客户端注册服务实例事件时,会把clientId写入到注册表,然后接着发布ServiceChangedEvent服务改变事件。

//Instance request handler.
@Component
public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {private final EphemeralClientOperationServiceImpl clientOperationService;public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {this.clientOperationService = clientOperationService;}@Override@Secured(action = ActionTypes.WRITE)public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {//根据请求信息创建一个Service对象,里面包含了:命名空间、分组名、服务名Service service = Service.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);switch (request.getType()) {case NamingRemoteConstants.REGISTER_INSTANCE://注册实例return registerInstance(service, request, meta);case NamingRemoteConstants.DE_REGISTER_INSTANCE://注销实例return deregisterInstance(service, request, meta);default:throw new NacosException(NacosException.INVALID_PARAM, String.format("Unsupported request type %s", request.getType()));}}private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {//调用EphemeralClientOperationServiceImpl的注册方法registerInstance(),这里需要注意如下参数;//参数service:根据请求信息创建的一个Service对象,里面有命名空间、分组名、服务名//参数request.getInstance():这个参数就对应了客户端的实例对象,里面包含IP、端口等信息//参数meta.getConnectionId():这个参数很关键,它是连接IDclientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);}private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) {//调用EphemeralClientOperationServiceImpl的注销方法deregisterInstance()clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE);}
}//Operation service for ephemeral clients and services.
@Component("ephemeralClientOperationService")
public class EphemeralClientOperationServiceImpl implements ClientOperationService {private final ClientManager clientManager;public EphemeralClientOperationServiceImpl(ClientManagerDelegate clientManager) {this.clientManager = clientManager;}@Overridepublic void registerInstance(Service service, Instance instance, String clientId) {//从ServiceManager中根据由请求信息创建的Service对象获取一个已注册的Service对象Service singleton = ServiceManager.getInstance().getSingleton(service);if (!singleton.isEphemeral()) {throw new NacosRuntimeException(NacosException.INVALID_PARAM, String.format("Current service %s is persistent service, can't register ephemeral instance.", singleton.getGroupedServiceName()));}//从ClientManagerDelegate中根据请求参数中的connectionId获取一个Client对象,即IpPortBasedClient对象Client client = clientManager.getClient(clientId);if (!clientIsLegal(client, clientId)) {return;}//将请求中的instance实例信息封装为InstancePublishInfo对象InstancePublishInfo instanceInfo = getPublishInfo(instance);//往Client对象里添加已注册的服务对象Service,调用的是IpPortBasedClient对象的父类AbstractClient的addServiceInstance()方法client.addServiceInstance(singleton, instanceInfo);//设置IpPortBasedClient对象的lastUpdatedTime属性为最新时间client.setLastUpdatedTime();//发布客户端注册服务实例的事件NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));//发布服务实例元数据的事件NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));}...
}//Client and service index manager.
@Component
public class ClientServiceIndexesManager extends SmartSubscriber {//注册表(服务提供者),一个Service服务对象,对应多个服务实例的clientIdprivate final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();...//处理客户端注册事件ClientRegisterServiceEvent@Overridepublic void onEvent(Event event) {if (event instanceof ClientEvent.ClientDisconnectEvent) {handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);} else if (event instanceof ClientOperationEvent) {handleClientOperation((ClientOperationEvent) event);}}private void handleClientOperation(ClientOperationEvent event) {Service service = event.getService();String clientId = event.getClientId();if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {//处理客户端注册事件ClientRegisterServiceEventaddPublisherIndexes(service, clientId);} else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {//处理客户端注销事件ClientDeregisterServiceEventremovePublisherIndexes(service, clientId);} else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {//处理客户端订阅服务事件ClientSubscribeServiceEventaddSubscriberIndexes(service, clientId);} else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {//处理客户端取消订阅事件ClientUnsubscribeServiceEventremoveSubscriberIndexes(service, clientId);}}private void addPublisherIndexes(Service service, String clientId) {//判断注册表是否存在该Service,不存在则创建一个空的ConcurrentHashSetpublisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());//把clientId放入到对应的Service中publisherIndexes.get(service).add(clientId);//发布服务改变事件NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));}...
}

二.服务订阅

客户端查询微服务实例列表进行服务发现时,调用的是订阅接口。服务端处理客户端的订阅请求时会发布ClientSubscribeServiceEvent事件,这个事件的处理逻辑是先向订阅表添加clientId到所订阅服务对应的集合中,如果第一次添加clientId则发布一个ServiceSubscribedEvent服务订阅事件。

//Handler to handle subscribe service.
@Component
public class SubscribeServiceRequestHandler extends RequestHandler<SubscribeServiceRequest, SubscribeServiceResponse> {private final ServiceStorage serviceStorage;private final EphemeralClientOperationServiceImpl clientOperationService;...//假设order-service需要调用stock-service的接口,那么order-service(Nacos客户端)就要向服务端订阅stock-service服务//也就是order-service需要从服务端获取到(查询出)stock-service的所有服务实例@Override@Secured(action = ActionTypes.READ)public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {String namespaceId = request.getNamespace();String serviceName = request.getServiceName();String groupName = request.getGroupName();String app = request.getHeader("app", "unknown");String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);//构建要查询的Service服务对象,对应的是stock-serivceService service = Service.newService(namespaceId, groupName, serviceName, true);//构建要订阅Service服务的订阅者,对应的是order-serviceSubscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(), namespaceId, groupedServiceName, 0, request.getClusters());//1.调用ServiceStorage.getData()方法读取缓存ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service), metadataManager.getServiceMetadata(service).orElse(null), subscriber);if (request.isSubscribe()) {//2.添加订阅者,如果订阅的服务有变动,则需要通知订阅者clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());} else {clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());}return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);}
}//Operation service for ephemeral clients and services.
@Component("ephemeralClientOperationService")
public class EphemeralClientOperationServiceImpl implements ClientOperationService {...//添加订阅者//@param service    service:要查询的Service对象,比如stock-service//@param subscriber subscribe:订阅者,比如对应order-service//@param clientId   id of client:对应order-service与Nacos服务端的连接ID@Overridepublic void subscribeService(Service service, Subscriber subscriber, String clientId) {//传入的service是要查询的Service对象,比如stock-serviceService singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);//传入的clientId是代表着order-service的Client对象,调用EphemeralIpPortClientManager.getClient()方法Client client = clientManager.getClient(clientId);if (!clientIsLegal(client, clientId)) {return;}//往代表着order-service的Client对象中,添加订阅者client.addServiceSubscriber(singleton, subscriber);client.setLastUpdatedTime();//发布客户端订阅服务事件ClientSubscribeServiceEvent,也就是order-service客户端订阅了service服务NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));}...
}@Component
public class ClientServiceIndexesManager extends SmartSubscriber {//注册表(服务提供者),一个Service服务对象,对应多个服务实例的clientIdprivate final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();//订阅者列表(服务消费者),一个Service服务对象,对应多个订阅者的clientIdprivate final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();...//可以处理客户端注册事件ClientRegisterServiceEvent@Overridepublic void onEvent(Event event) {if (event instanceof ClientEvent.ClientDisconnectEvent) {handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);} else if (event instanceof ClientOperationEvent) {handleClientOperation((ClientOperationEvent) event);}}private void handleClientOperation(ClientOperationEvent event) {Service service = event.getService();String clientId = event.getClientId();if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {//处理客户端注册事件ClientRegisterServiceEventaddPublisherIndexes(service, clientId);} else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {//处理客户端注销事件ClientDeregisterServiceEventremovePublisherIndexes(service, clientId);} else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {//处理客户端订阅服务事件ClientSubscribeServiceEventaddSubscriberIndexes(service, clientId);} else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {//处理客户端取消订阅事件ClientUnsubscribeServiceEventremoveSubscriberIndexes(service, clientId);}}private void addSubscriberIndexes(Service service, String clientId) {//传入的service是要查询的Service对象stock-service,clientId是订阅者order-service对应的客户端连接对象IDsubscriberIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());// Fix #5404, Only first time add need notify event. 只有第一次添加时需要发布通知事件if (subscriberIndexes.get(service).add(clientId)) {//发布服务订阅事件ServiceSubscribedEventNotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));}}...
}

(2)延迟任务的执行引擎源码

一.什么是延迟任务执行引擎

延迟任务执行引擎就是可以往执行引擎中添加任务,该任务会被延时执行。Nacos的延迟任务执行引擎就是NacosDelayTaskExecuteEngine类。

Nacos会通过延迟任务执行引擎来处理服务改变事件和服务订阅事件,即ServiceChangedEvent和ServiceSubscribedEvent。

二.延迟任务执行引擎的执行原理

首先,Nacos会定义一个名为NacosTaskProcessor的任务处理器接口。NacosTaskProcessor是一个Interface ,它有很多个实现类。

然后,执行引擎会记录相关的任务处理器实现类。NacosDelayTaskExecuteEngine继承自AbstractNacosTaskExecuteEngine,AbstractNacosTaskExecuteEngine相当于任务执行引擎中心。AbstractNacosTaskExecuteEngine有两个属性来记录这些处理器实现类,并提供了两个方法可以向任务执行引擎中心添加处理器,这两个方法分别是addProcessor()方法和setDefaultTaskProcessor()方法。

接着,创建NacosDelayTaskExecuteEngine时会开启一个定时执行的任务,该定时执行的任务会定时执行ProcessRunnable的run()方法。

延时任务执行引擎有一个Map类型的tasks属性存放所有延迟执行的任务,而在ProcessRunnable的run()方法中,会触发调用其processTasks()方法。processTasks()方法会从tasks属性中获取全部的延迟任务,然后遍历处理。即先通过任务key获取具体的任务,再通过任务key获取对应的处理器,接着调用NacosTaskProcessor的process()方法,来完成延迟任务的执行。

最后,NacosDelayTaskExecuteEngine会提供一个addTask()方法,这个方法可以将延迟执行的任务添加到延时任务执行引擎的tasks属性中。

//Abstract nacos task execute engine. 任务执行引擎中心
public abstract class AbstractNacosTaskExecuteEngine<T extends NacosTask> implements NacosTaskExecuteEngine<T> {private final ConcurrentHashMap<Object, NacosTaskProcessor> taskProcessors = new ConcurrentHashMap<>();private NacosTaskProcessor defaultTaskProcessor;...@Overridepublic void addProcessor(Object key, NacosTaskProcessor taskProcessor) {taskProcessors.putIfAbsent(key, taskProcessor);}@Overridepublic void removeProcessor(Object key) {taskProcessors.remove(key);}@Overridepublic NacosTaskProcessor getProcessor(Object key) {return taskProcessors.containsKey(key) ? taskProcessors.get(key) : defaultTaskProcessor;}@Overridepublic Collection<Object> getAllProcessorKey() {return taskProcessors.keySet();}@Overridepublic void setDefaultTaskProcessor(NacosTaskProcessor defaultTaskProcessor) {this.defaultTaskProcessor = defaultTaskProcessor;}...
}//Nacos delay task execute engine. 延迟任务执行引擎
public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {private final ScheduledExecutorService processingExecutor;//任务池protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;protected final ReentrantLock lock = new ReentrantLock();...public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {super(logger);tasks = new ConcurrentHashMap<>(initCapacity);processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));//开启延时任务,即启动ProcessRunnable线程任务processingExecutor.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);}...   @Overridepublic AbstractDelayTask removeTask(Object key) {lock.lock();try {AbstractDelayTask task = tasks.get(key);if (null != task && task.shouldProcess()) {return tasks.remove(key);} else {return null;}} finally {lock.unlock();}}@Overridepublic Collection<Object> getAllTaskKeys() {Collection<Object> keys = new HashSet<Object>();lock.lock();try {keys.addAll(tasks.keySet());} finally {lock.unlock();}return keys;}@Overridepublic void shutdown() throws NacosException {tasks.clear();processingExecutor.shutdown();}@Overridepublic void addTask(Object key, AbstractDelayTask newTask) {lock.lock();try {AbstractDelayTask existTask = tasks.get(key);if (null != existTask) {newTask.merge(existTask);}//最后放入到任务池中tasks.put(key, newTask);} finally {lock.unlock();}}//process tasks in execute engine.protected void processTasks() {//获取tasks中所有的任务,然后进行遍历Collection<Object> keys = getAllTaskKeys();for (Object taskKey : keys) {//通过任务key,获取具体的任务,并且从任务池中移除掉AbstractDelayTask task = removeTask(taskKey);if (null == task) {continue;}//通过任务key获取对应的NacosTaskProcessor延迟任务处理器NacosTaskProcessor processor = getProcessor(taskKey);if (null == processor) {getEngineLog().error("processor not found for task, so discarded. " + task);continue;}try {// ReAdd task if process failed//调用获取到的NacosTaskProcessor延迟任务处理器的process()方法if (!processor.process(task)) {//如果失败了,会重试添加task回tasks这个map中retryFailedTask(taskKey, task);}} catch (Throwable e) {getEngineLog().error("Nacos task execute error ", e);retryFailedTask(taskKey, task);}}}private void retryFailedTask(Object key, AbstractDelayTask task) {task.setLastProcessTime(System.currentTimeMillis());addTask(key, task);}private class ProcessRunnable implements Runnable {@Overridepublic void run() {try {processTasks();} catch (Throwable e) {getEngineLog().error(e.toString(), e);}}}
}

(3)处理客户端注册和订阅事件时发布的服务变动和服务订阅事件的处理

一.服务端处理服务变动和服务订阅事件的入口

二.执行推送的任务PushExecuteTask说明

三.客户端收到服务端发送的Service服务实例数据推送的处理

一.服务端处理服务变动和服务订阅事件的入口

处理入口是:NamingSubscriberServiceV2Impl的onEvent()方法。其中,对事件的处理使用了双层内存队列(存储延迟任务 + 同步任务)的异步处理方式。

onEvent()方法主要会往延迟任务执行引擎中添加任务,也就是首先会根据不同的事件类型构建不同的PushDelayTask任务,然后调用延迟任务执行引擎NacosDelayTaskExecuteEngine的addTask()方法,把PushDelayTask延迟任务添加到PushDelayTaskExecuteEngine的任务池。

创建继承自NacosDelayTaskExecuteEngine的PushDelayTaskExecuteEngine延迟任务执行引擎时会创建一个定时任务,定时从任务池中取出任务,然后调用对应的任务处理器的process()方法。

PushDelayTask任务对应的任务处理器是PushDelayTaskProcessor,所以最终会触发执行PushDelayTaskProcessor的process()方法。

在执行PushDelayTaskProcessor的process()方法时,会调用NamingExecuteTaskDispatcher的dispatchAndExecuteTask()方法,提交由PushDelayTask任务封装的PushExecuteTask任务给NacosExecuteTaskExecuteEngine进行处理,此时会调用NacosExecuteTaskExecuteEngine的addTask()方法添加任务。

其中,PushExecuteTask任务会被分发到NacosExecuteTaskExecuteEngine执行引擎中的一个TaskExecuteWorker处理,TaskExecuteWorker的process()方法会把PushExecuteTask任务放入队列。由于TaskExecuteWorker初始化时会启动一个线程不断从队列中获取任务并执行,所以最终便会执行到PushExecuteTask的run()方法。

//Naming subscriber service for v2.x.
@org.springframework.stereotype.Service
public class NamingSubscriberServiceV2Impl extends SmartSubscriber implements NamingSubscriberService {private final PushDelayTaskExecuteEngine delayTaskEngine;...@Overridepublic void onEvent(Event event) {if (!upgradeJudgement.isUseGrpcFeatures()) {return;}if (event instanceof ServiceEvent.ServiceChangedEvent) {//If service changed, push to all subscribers.//如果服务变动,会向Service服务的所有订阅者推送Service服务的实例信息,让订阅者(客户端)更新本地缓存ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;Service service = serviceChangedEvent.getService();//调用NacosDelayTaskExecuteEngine.addTask()方法,往延迟任务执行引擎添加任务delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));} else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {//If service is subscribed by one client, only push this client.//如果Service服务被一个客户端订阅,则只推送Service服务的实例信息给该客户端ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;Service service = subscribedEvent.getService();//调用NacosDelayTaskExecuteEngine.addTask()方法,往延迟任务执行引擎添加任务delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(), subscribedEvent.getClientId()));}}...
}public class PushDelayTaskExecuteEngine extends NacosDelayTaskExecuteEngine {...private static class PushDelayTaskProcessor implements NacosTaskProcessor {    private final PushDelayTaskExecuteEngine executeEngine;public PushDelayTaskProcessor(PushDelayTaskExecuteEngine executeEngine) {this.executeEngine = executeEngine;}@Overridepublic boolean process(NacosTask task) {//任务类型转换PushDelayTask pushDelayTask = (PushDelayTask) task;//获取要推送的服务;比如某服务发生改变时,需要推送该服务的实例给订阅的客户端;比如某服务被订阅时,需要推送该服务的实例给对应的客户端;Service service = pushDelayTask.getService();//调用NamingExecuteTaskDispatcher.dispatchAndExecuteTask()方法//提交PushExecuteTask线程任务给NacosExecuteTaskExecuteEngine来处理NamingExecuteTaskDispatcher.getInstance().dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask));return true;}}
}public class NamingExecuteTaskDispatcher {private static final NamingExecuteTaskDispatcher INSTANCE = new NamingExecuteTaskDispatcher();private final NacosExecuteTaskExecuteEngine executeEngine;private NamingExecuteTaskDispatcher() {executeEngine = new NacosExecuteTaskExecuteEngine(EnvUtil.FUNCTION_MODE_NAMING, Loggers.SRV_LOG);}public static NamingExecuteTaskDispatcher getInstance() {return INSTANCE;}public void dispatchAndExecuteTask(Object dispatchTag, AbstractExecuteTask task) {executeEngine.addTask(dispatchTag, task);}public String workersStatus() {return executeEngine.workersStatus();}
}public class NacosExecuteTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractExecuteTask> {private final TaskExecuteWorker[] executeWorkers;public NacosExecuteTaskExecuteEngine(String name, Logger logger, int dispatchWorkerCount) {super(logger);executeWorkers = new TaskExecuteWorker[dispatchWorkerCount];for (int mod = 0; mod < dispatchWorkerCount; ++mod) {executeWorkers[mod] = new TaskExecuteWorker(name, mod, dispatchWorkerCount, getEngineLog());}}...@Overridepublic void addTask(Object tag, AbstractExecuteTask task) {//根据tag获取到TaskExecuteWorkerNacosTaskProcessor processor = getProcessor(tag);if (null != processor) {processor.process(task);return;}TaskExecuteWorker worker = getWorker(tag);//调用TaskExecuteWorker.process()方法把AbstractExecuteTask任务放入到队列当中去worker.process(task);}...
}public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable {//任务存储容器private final BlockingQueue<Runnable> queue;public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {...this.queue = new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY);new InnerWorker(name).start();}...@Overridepublic boolean process(NacosTask task) {if (task instanceof AbstractExecuteTask) {//把NacosTask任务放入到阻塞队列中putTask((Runnable) task);}return true;}private void putTask(Runnable task) {try {//把NacosTask任务放入到阻塞队列中queue.put(task);} catch (InterruptedException ire) {log.error(ire.toString(), ire);}}...private class InnerWorker extends Thread {InnerWorker(String name) {setDaemon(false);setName(name);}@Overridepublic void run() {while (!closed.get()) {try {//一直取阻塞队列中的任务Runnable task = queue.take();long begin = System.currentTimeMillis();//调用NacosTask中的run方法task.run();long duration = System.currentTimeMillis() - begin;if (duration > 1000L) {log.warn("task {} takes {}ms", task, duration);}} catch (Throwable e) {log.error("[TASK-FAILED] " + e.toString(), e);}}}}
}

二.执行推送的任务PushExecuteTask说明

在PushExecuteTask的run()方法中,首先会从ServiceStorage获取要推送的服务Service最新的实例数据包装,然后调用PushExecuteTask的getTargetClientIds()方法获取要推送的clientId,接着根据clientId获取订阅了Service服务的的客户端订阅者对象,最后调用PushExecutorDelegate的doPushWithCallback()方法,也就是调用PushExecutorRpcImpl的doPushWithCallback()方法回调客户端,即调用RpcPushService的pushWithCallback()方法回调客户端,即调用GrpcConnection的asyncRequest()方法向客户端发送RPC请求。

执行PushExecuteTask的getTargetClientIds()方法获取要推送的clientId时,会根据PushDelayTask的pushToAll属性来获取对应的clientId。因为在NamingSubscriberServiceV2Impl的onEvent()方法中,如果处理的是服务改变事件,则构造的PushDelayTask是面向所有客户端。如果处理的是服务订阅事件,则构造的PushDelayTask是面向一个客户端。

所以如果PushDelayTask要面向所有客户端推送Service服务实例数据,那么就调用ClientServiceIndexesManager的getAllClientsSubscribeService()方法,从订阅者列表中获取订阅了Service服务的所有clientId。如果PushDelayTask要面向单个客户端推送Service服务实例数据,则通过PushDelayTask的getTargetClients()方法获取对应的clientId即可。

总结:服务变动需要通知全部订阅了该Service服务的客户端对象,服务订阅只需要通知当前订阅者客户端对象即可。

//Nacos naming push execute task.
public class PushExecuteTask extends AbstractExecuteTask {//要推送的Service服务//比如某服务发生改变时,需要推送该服务的实例给订阅的客户端;比如某服务被订阅时,需要推送该服务的实例给对应的客户端;private final Service service;private final PushDelayTaskExecuteEngine delayTaskEngine;private final PushDelayTask delayTask;public PushExecuteTask(Service service, PushDelayTaskExecuteEngine delayTaskEngine, PushDelayTask delayTask) {this.service = service;this.delayTaskEngine = delayTaskEngine;this.delayTask = delayTask;}@Overridepublic void run() {try {//从ServiceStorage获取要推送的服务Service最新的实例数据包装PushDataWrapper wrapper = generatePushData();ClientManager clientManager = delayTaskEngine.getClientManager();//遍历订阅了Service服务的、要推送Service服务实例数据的所有clientIdfor (String each : getTargetClientIds()) {//根据clientId获取客户端Client对象Client client = clientManager.getClient(each);if (null == client) {continue;}//调用AbstractClient.getSubscriber()方法//因为AbstractClient对象中存放着它订阅的服务与订阅者对象映射//所以可以根据要推送的Service服务,获取对应的客户端订阅者对象Subscriber subscriber = clientManager.getClient(each).getSubscriber(service);//传入订阅者subscriber,调用PushExecutorDelegate.doPushWithCallback()方法回调客户端delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,new NamingPushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));}} catch (Exception e) {Loggers.PUSH.error("Push task for service" + service.getGroupedServiceName() + " execute failed ", e);delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L));}}private PushDataWrapper generatePushData() {//调用ServiceStorage.getPushData()方法根据要推送的Service对象,获取包含所有实例信息的ServiceInfo对象ServiceInfo serviceInfo = delayTaskEngine.getServiceStorage().getPushData(service);ServiceMetadata serviceMetadata = delayTaskEngine.getMetadataManager().getServiceMetadata(service).orElse(null);return new PushDataWrapper(serviceMetadata, serviceInfo);}private Collection<String> getTargetClientIds() {//通过PushDelayTask的pushToAll属性控制是否对全部订阅了Service服务的客户端Client,进行推送//处理服务改变事件时,delayTask.isPushToAll()就是true//处理服务订阅事件时,delayTask.getTargetClients()就是指定的客户端Client//其中getAllClientsSubscribeService()会从订阅者列表中获取订阅了Service服务的所有clientIdreturn delayTask.isPushToAll() ? delayTaskEngine.getIndexesManager().getAllClientsSubscribeService(service): delayTask.getTargetClients();}...
}public class PushDelayTaskExecuteEngine extends NacosDelayTaskExecuteEngine {private final ClientManager clientManager;private final ClientServiceIndexesManager indexesManager;private final ServiceStorage serviceStorage;private final NamingMetadataManager metadataManager;private final PushExecutor pushExecutor;...
}public class PushDelayTask extends AbstractDelayTask {    private final Service service;private boolean pushToAll;private Set<String> targetClients;//处理服务变动事件,创建PushDelayTask任务时所使用的构造方法public PushDelayTask(Service service, long delay) {this.service = service;pushToAll = true;targetClients = null;setTaskInterval(delay);setLastProcessTime(System.currentTimeMillis());}//处理服务订阅事件,创建PushDelayTask任务时所使用的构造方法public PushDelayTask(Service service, long delay, String targetClient) {this.service = service;this.pushToAll = false;this.targetClients = new HashSet<>(1);//把clientId添加到targetClients中,这个clientId就是发起服务订阅的客户端与服务端建立长连接后的客户端连接IDthis.targetClients.add(targetClient);setTaskInterval(delay);setLastProcessTime(System.currentTimeMillis());}...
}@Component
public class ServiceStorage {//缓存要查询的Service服务对象对应的已注册的服务详情private final ConcurrentMap<Service, ServiceInfo> serviceDataIndexes;...public ServiceInfo getPushData(Service service) {//调用ServiceStorage.emptyServiceInfo()方法创建空的ServiceInfo对象ServiceInfo result = emptyServiceInfo(service);if (!ServiceManager.getInstance().containSingleton(service)) {return result;}//调用ServiceStorage.getAllInstancesFromIndex()方法获服务取实例列表//ServiceInfo的hosts属性就包含了该服务的所有Instance实例数据result.setHosts(getAllInstancesFromIndex(service));//将获取到的ServiceInfo对象放入到缓存中serviceDataIndexes.put(service, result);return result;}...
}//Client and service index manager.
@Component
public class ClientServiceIndexesManager extends SmartSubscriber {//注册表(服务提供者),一个Service服务对象,对应多个服务实例的clientIdprivate final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();//订阅者列表(服务消费者),一个Service服务对象,对应多个订阅者的clientIdprivate final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();...public Collection<String> getAllClientsSubscribeService(Service service) {//从订阅者列表中获取订阅了Service服务的所有clientIdreturn subscriberIndexes.containsKey(service) ? subscriberIndexes.get(service) : new ConcurrentHashSet<>();}...
}public abstract class AbstractClient implements Client {//subscribers存放着:订阅者Subscriber(其实可理解为当前客户端)订阅了的Service服务//subscribers的key=stock-service(要订阅的某个服务)、value=order-service(订阅者,某个具体的包含IP的服务实例)protected final ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>(16, 0.75f, 1);...@Overridepublic Subscriber getSubscriber(Service service) {return subscribers.get(service);}@Overridepublic boolean addServiceSubscriber(Service service, Subscriber subscriber) {//服务订阅时,添加订阅者//subscribers的key=stock-service(要订阅的某个服务)、value=order-service(订阅者,某个具体的包含IP的服务实例)if (null == subscribers.put(service, subscriber)) {MetricsMonitor.incrementSubscribeCount();}return true;}...
}@Component
public class PushExecutorDelegate implements PushExecutor {private final PushExecutorRpcImpl rpcPushExecuteService;private final PushExecutorUdpImpl udpPushExecuteService;...@Overridepublic void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data, PushCallBack callBack) {getPushExecuteService(clientId, subscriber).doPushWithCallback(clientId, subscriber, data, callBack);}private PushExecutor getPushExecuteService(String clientId, Subscriber subscriber) {Optional<SpiPushExecutor> result = SpiImplPushExecutorHolder.getInstance().findPushExecutorSpiImpl(clientId, subscriber);if (result.isPresent()) {return result.get();}return clientId.contains(IpPortBasedClient.ID_DELIMITER) ? udpPushExecuteService : rpcPushExecuteService;}...
}@Component
public class PushExecutorRpcImpl implements PushExecutor {private final RpcPushService pushService;...@Overridepublic void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data, PushCallBack callBack) {pushService.pushWithCallback(clientId, NotifySubscriberRequest.buildNotifySubscriberRequest(getServiceInfo(data, subscriber)), callBack, GlobalExecutor.getCallbackExecutor());}...
}//push response  to clients.
@Service
public class RpcPushService {@Autowiredprivate ConnectionManager connectionManager;...//push response with no ack.public void pushWithCallback(String connectionId, ServerRequest request, PushCallBack requestCallBack, Executor executor) {Connection connection = connectionManager.getConnection(connectionId);if (connection != null) {try {//调用GrpcConnection.asyncRequest()方法向客户端发送推送请求connection.asyncRequest(request, new AbstractRequestCallBack(requestCallBack.getTimeout()) {@Overridepublic Executor getExecutor() {return executor;}@Overridepublic void onResponse(Response response) {if (response.isSuccess()) {requestCallBack.onSuccess();} else {requestCallBack.onFail(new NacosException(response.getErrorCode(), response.getMessage()));}} @Overridepublic void onException(Throwable e) {requestCallBack.onFail(e);}});} catch (ConnectionAlreadyClosedException e) {connectionManager.unregister(connectionId);requestCallBack.onSuccess();} catch (Exception e) {Loggers.REMOTE_DIGEST.error("error to send push response to connectionId ={},push response={}", connectionId, request, e);requestCallBack.onFail(e);}} else {requestCallBack.onSuccess();}}...
}

三.客户端收到服务端发送的Service服务实例数据推送的处理

NamingPushRequestHandler的requestReply()方法会处理服务端的推送,即调用ServiceInfoHolder的processServiceInfo()方法更新本地缓存。

public class NamingPushRequestHandler implements ServerRequestHandler {private final ServiceInfoHolder serviceInfoHolder;public NamingPushRequestHandler(ServiceInfoHolder serviceInfoHolder) {this.serviceInfoHolder = serviceInfoHolder;}@Overridepublic Response requestReply(Request request) {if (request instanceof NotifySubscriberRequest) {//进行类型转换NotifySubscriberRequest notifyResponse = (NotifySubscriberRequest) request;//更新客户端本地缓存数据serviceInfoHolder.processServiceInfo(notifyResponse.getServiceInfo());return new NotifySubscriberResponse();}return null;}
}public class ServiceInfoHolder implements Closeable {private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;...public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {String serviceKey = serviceInfo.getKey();if (serviceKey == null) {return null;}//获取本地缓存中的服务实例ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());if (isEmptyOrErrorPush(serviceInfo)) {return oldService;}//更新本地缓存中的服务实例serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);//判断服务实例是否有改变boolean changed = isChangedServiceInfo(oldService, serviceInfo);if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));}MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());if (changed) {NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(), JacksonUtils.toJson(serviceInfo.getHosts()));//发布服务实例改变事件NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts()));//将服务实例信息写入本地磁盘DiskCache.write(serviceInfo, cacheDir);}return serviceInfo;}...
}

(4)总结

相关文章:

  • Python程序打包为EXE文件的全面指南
  • 从AI到新能源:猎板PCB的HDI技术如何定义高端制造新标准?
  • RGB矩阵照明系统详解及WS2812配置指南
  • Vue Router 3 使用详解:从零构建嵌套路由页面
  • 多账号管理与自动化中的浏览器指纹对抗方案
  • LSTM的简单模型
  • 22、城堡防御工事——React 19 错误边界与监控
  • Docker Compose 部署 MeiliSearch 指南
  • 【C】初阶数据结构14 -- 归并排序
  • 基于设备指纹识别的反爬虫技术:给设备办 “身份证”
  • vue3 全局注册自定义指令,input聚焦失焦展示对应值
  • NXP iMX8MP ARM 平台多屏幕克隆显示测试
  • kuka, fanuc, abb机器人和移动相机的标定
  • 对golang中CSP的理解
  • 学习记录:DAY28
  • 7.3.隐私合规
  • [春秋云镜] Brute4Road 仿真场景
  • 使用JMETER中的JSON提取器实现接口关联
  • ASP.NET中Tailspin Travel的UI层奥秘分析
  • 电机密集型工厂环境下的无线通信技术选型与优化策略
  • “苏河超级管”调研:桥下公园“留白”很好,指引差点
  • 工程院院士葛世荣获聘任为江西理工大学校长
  • 秦洪看盘|交易型资金收缩,释放短线压力
  • 湖南省职业病防治院通报3岁女童确诊“铊中毒”:去年病例,编辑误写为“近日”
  • 央行:当前我国债券市场定价效率、机构债券投资交易和风险管理能力仍有待提升
  • 美众议院通过法案将“墨西哥湾”更名为“美国湾”