Nacos源码—6.Nacos升级gRPC分析一
大纲
1.Nacos 2.x版本的一些变化
2.客户端升级gRPC发起服务注册
3.服务端进行服务注册时的处理
4.客户端服务发现和服务端处理服务订阅的源码分析
1.Nacos 2.x版本的一些变化
变化一:客户端和服务端的交互方式由HTTP升级为gRPC
Nacos 1.x服务端会提供一系列的HTTP接口供客户端请求调用,Nacos 2.x服务端会定义一些列Handler处理类来处理客户端的gRPC请求。
Nacos 1.x进行服务注册时使用的是HTTP短连接,Nacos 2.x进行服务注册时使用的是RPC长连接。
变化二:注册中心的注册表由双重Map结构变为轻量的一个Map
Nacos 1.x版本中的注册中心是使用双重Map结构来存储注册表的,这样使得注册表还是比较重量级的,在并发高时也需要考虑并发冲突。
Nacos 2.x版本则把注册表轻量化了,服务端在处理服务注册时,只是简单地往一个Map记写入客户端连接的ID。
变化三:大量使用了事件驱动
Nacos 2.x版本的Nacos里,使用了非常多的事件驱动。比如服务注册、服务销毁、服务变更等都先通过通知中心来发布一个事件,然后通过处理事件来来处理后续的逻辑流程。
2.客户端升级gRPC发起服务注册
(1)客户端和服务端的版本选择
(2)Nacos客户端项目启动时自动触发服务实例注册
(3)Nacos客户端通过gRPC方式发起服务实例注册
(4)总结
(1)客户端和服务端的版本选择
<!-- 使用的是Nacos 1.4.1 -->
<!-- <properties><java.version>1.8</java.version><spring-cloud.version>Hoxton.SR8</spring-cloud.version><spring-cloud-alibaba.version>2.2.5.RELEASE</spring-cloud-alibaba.version>
</properties> --><!-- 使用的是Nacos 2.1.0 -->
<properties><java.version>1.8</java.version><spring-cloud.version>Hoxton.SR12</spring-cloud.version><spring-cloud-alibaba.version>2.2.8.RELEASE</spring-cloud-alibaba.version>
</properties>
(2)Nacos客户端项目启动时自动触发服务实例注册
spring-cloud-starter-alibaba-nacos-discovery依赖包会自动注册服务。查看这个依赖包中的spring.factories文件,会指定一些Configuration类。Spring Boot启动时会扫描spring.factories文件,然后创建里面的配置类。
在spring.pactories文件中,与注册相关的类就是:NacosServiceRegistryAutoConfiguration这个Nacos服务注册自动配置类。
Nacos服务注册自动配置类NacosServiceRegistryAutoConfiguration如下,该配置类创建了三个Bean:
第一个Bean:NacosServiceRegistry
这个Bean在创建时,会传入加载了yml配置文件内容的类NacosDiscoveryProperties。
第二个Bean:NacosRegistration
这个Bean在创建时,会传入加载了yml配置文件内容的类NacosDiscoveryProperties。
第三个Bean:NacosAutoServiceRegistration
这个Bean在创建时,会传入NacosServiceRegistry和NacosRegistration两个Bean,然后该Bean继承了AbstractAutoServiceRegistration抽象类。该抽象类实现了ApplicationListener接口,所以项目启动时便是利用了Spring的监听事件来实现自动注册服务的。因为在Spring容器启动的最后会执行finishRefresh()方法,然后会发布一个事件,该事件会触发调用onApplicationEvent()方法。
调用AbstractAutoServiceRegistration的onApplicationEvent()方法时,首先会调用AbstractAutoServiceRegistration的bind()方法,然后调用AbstractAutoServiceRegistration的start()方法,接着调用AbstractAutoServiceRegistration的register()方法发起注册,也就是调用this.serviceRegistry的register()方法完成服务注册的具体工作。
而AbstractAutoServiceRegistration的serviceRegistry属性,是在服务注册自动配置类NacosServiceRegistryAutoConfiguration,创建第三个Bean—NacosAutoServiceRegistration时,通过传入其创建的第一个Bean—NacosServiceRegistry进行赋值的。
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class, AutoServiceRegistrationAutoConfiguration.class, NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {@Beanpublic NacosServiceRegistry nacosServiceRegistry(NacosServiceManager nacosServiceManager, NacosDiscoveryProperties nacosDiscoveryProperties) {return new NacosServiceRegistry(nacosServiceManager, nacosDiscoveryProperties);}@Bean@ConditionalOnBean(AutoServiceRegistrationProperties.class)public NacosRegistration nacosRegistration(ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers, NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) {return new NacosRegistration(registrationCustomizers.getIfAvailable(), nacosDiscoveryProperties, context);}@Bean@ConditionalOnBean(AutoServiceRegistrationProperties.class)public NacosAutoServiceRegistration nacosAutoServiceRegistration(NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) {return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration);}
}public class NacosAutoServiceRegistration extends AbstractAutoServiceRegistration<Registration> {...private NacosRegistration registration;public NacosAutoServiceRegistration(ServiceRegistry<Registration> serviceRegistry,AutoServiceRegistrationProperties autoServiceRegistrationProperties,NacosRegistration registration) {super(serviceRegistry, autoServiceRegistrationProperties);this.registration = registration;}...
}public abstract class AbstractAutoServiceRegistration<R extends Registration>implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> {...private final ServiceRegistry<R> serviceRegistry;private AutoServiceRegistrationProperties properties;protected AbstractAutoServiceRegistration(ServiceRegistry<R> serviceRegistry, AutoServiceRegistrationProperties properties) {this.serviceRegistry = serviceRegistry;this.properties = properties;}...@Override@SuppressWarnings("deprecation")public void onApplicationEvent(WebServerInitializedEvent event) {bind(event);}public void bind(WebServerInitializedEvent event) {ApplicationContext context = event.getApplicationContext();if (context instanceof ConfigurableWebServerApplicationContext) {if ("management".equals(((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {return;}}this.port.compareAndSet(0, event.getWebServer().getPort());this.start();}public void start() {if (!isEnabled()) {if (logger.isDebugEnabled()) {logger.debug("Discovery Lifecycle disabled. Not starting");}return;}if (!this.running.get()) {this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration()));//发起注册register();if (shouldRegisterManagement()) {registerManagement();}this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration()));this.running.compareAndSet(false, true);}}protected void register() {//调用创建NacosAutoServiceRegistration时传入的NacosServiceRegistry实例的register()方法this.serviceRegistry.register(getRegistration());}...
}public class NacosServiceRegistry implements ServiceRegistry<Registration> {private final NacosDiscoveryProperties nacosDiscoveryProperties;public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {this.nacosDiscoveryProperties = nacosDiscoveryProperties;}@Overridepublic void register(Registration registration) {if (StringUtils.isEmpty(registration.getServiceId())) {log.warn("No service to register for nacos client...");return;}NamingService namingService = namingService();//获取服务ID、分组String serviceId = registration.getServiceId();String group = nacosDiscoveryProperties.getGroup();//创建Instance对象Instance instance = getNacosInstanceFromRegistration(registration);try {//发起服务实例注册namingService.registerInstance(serviceId, group, instance);log.info("nacos registry, {} {} {}:{} register finished", group, serviceId, instance.getIp(), instance.getPort());} catch (Exception e) {if (nacosDiscoveryProperties.isFailFast()) {log.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e);rethrowRuntimeException(e);} else {log.warn("Failfast is false. {} register failed...{},", serviceId, registration.toString(), e);}}}...
}
总结:
Nacos客户端项目启动时自动触发注册服务实例的流程:Spring监听器调用onApplicationEvent() -> bind() -> start() -> register(),最后register()方法会调用serviceRegistry属性的register()方法进行注册。
整个流程具体来说就是:首先通过spring.factories文件,找到一个注册相关的Configuration配置类,这个配置类里面定义了三个Bean对象。创建第三个Bean对象时,需要第一个、第二个Bean对象作为参数传进去。第一个Bean对象里面就有真正进行服务注册的register()方法,并且第一个Bean对象会赋值给第三个Bean对象中的serviceRegistry属性,在第三个Bean对象的父类会实现Spring的监听器方法。所以在Spring容器启动时会发布监听事件,从而触发执行Nacos注册逻辑。
(3)Nacos客户端通过gRPC方式发起服务实例注册
nacos-client会提供接口给Nacos客户端调用来进行服务实例注册。
在NacosNamingService提供的服务注册接口registerInstance()中,会调用NamingClientProxyDelegate的registerService()方法来注册服务。此时会先调用NamingClientProxyDelegate的getExecuteClientProxy()方法,来判断要注册的服务实例是否为临时实例来获取gRPC代理还是HTTP代理。如果注册的是临时实例,则使用gRPC方式注册,否则用HTTP方式注册,然后再调用NamingGrpcClientProxy的registerService()方法注册服务。
在NamingGrpcClientProxy的registerService()方法中,则会调用NamingGrpcClientProxy的doRegisterService()方法执行注册。此时先根据要注册的服务信息创建一个InstanceRequest请求参数对象,然后调用NamingGrpcClientProxy的requestToServer()方法发出请求,也就是通过调用RpcClient的request()方法向Nacos服务端发出gRPC请求。
至于RpcClient的request()方法的底层实现,则是通过一个本地存根代理类grpcFutureServiceStub调用gRPC的接口,来实现向Nacos服务端发起RPC调用的。
//Nacos Naming Service.
@SuppressWarnings("PMD.ServiceOrDaoClassShouldEndWithImplRule")
public class NacosNamingService implements NamingService {...private NamingClientProxy clientProxy;private void init(Properties properties) throws NacosException {...//初始化clientProxy属性为NamingClientProxyDelegate对象实例this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);}//register a instance to service with specified instance properties.@Overridepublic void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {NamingUtils.checkInstanceIsLegal(instance);//调用NamingClientProxy的注册方法registerService(),其实就是NamingClientProxyDelegate.registerService()方法clientProxy.registerService(serviceName, groupName, instance);}...
}//Delegate of naming client proxy.
public class NamingClientProxyDelegate implements NamingClientProxy {private final NamingHttpClientProxy httpClientProxy;private final NamingGrpcClientProxy grpcClientProxy;public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, Properties properties, InstancesChangeNotifier changeNotifier) throws NacosException {...this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties, serviceInfoHolder);this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties, serviceInfoHolder);}...@Overridepublic void registerService(String serviceName, String groupName, Instance instance) throws NacosException {getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);}private NamingClientProxy getExecuteClientProxy(Instance instance) {return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;}...
}//Naming grpc client proxy.
public class NamingGrpcClientProxy extends AbstractNamingClientProxy {private final String namespaceId;private final String uuid;private final Long requestTimeout;private final RpcClient rpcClient;private final NamingGrpcRedoService redoService;public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory, Properties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {super(securityProxy);this.namespaceId = namespaceId;this.uuid = UUID.randomUUID().toString();this.requestTimeout = Long.parseLong(properties.getProperty(CommonParams.NAMING_REQUEST_TIMEOUT, "-1"));Map<String, String> labels = new HashMap<String, String>();labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING);//通过RpcClientFactory.createClient()方法创建一个GrpcSdkClient对象实例,然后赋值给rpcClient属性this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);this.redoService = new NamingGrpcRedoService(this);start(serverListFactory, serviceInfoHolder);}private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {rpcClient.serverListFactory(serverListFactory);rpcClient.registerConnectionListener(redoService);rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));//调用RpcClient.start()方法建立与服务端的连接rpcClient.start();NotifyCenter.registerSubscriber(this);}...//Register a instance to service with specified instance properties.@Overridepublic void registerService(String serviceName, String groupName, Instance instance) throws NacosException {NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName, instance);redoService.cacheInstanceForRedo(serviceName, groupName, instance);//执行服务实例的注册doRegisterService(serviceName, groupName, instance);}//Execute register operation.public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {//创建请求参数对象InstanceRequestInstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName, NamingRemoteConstants.REGISTER_INSTANCE, instance);//向服务端发起请求requestToServer(request, Response.class);redoService.instanceRegistered(serviceName, groupName);}private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass) throws NacosException {try {request.putAllHeader(getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));//实际会调用RpcClient.request()方法发起gRPC请求Response response = requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {throw new NacosException(response.getErrorCode(), response.getMessage());}if (responseClass.isAssignableFrom(response.getClass())) {return (T) response;}NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'", response.getClass().getName(), responseClass.getName());} catch (Exception e) {throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);}throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");}...
}//abstract remote client to connect to server.
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class RpcClient implements Closeable {protected volatile Connection currentConnection;...public final void start() throws NacosException {...// connect to server, try to connect to server sync RETRY_TIMES times, async starting if failed.Connection connectToServer = null;rpcClientStatus.set(RpcClientStatus.STARTING);int startUpRetryTimes = RETRY_TIMES;while (startUpRetryTimes > 0 && connectToServer == null) {try {startUpRetryTimes--;ServerInfo serverInfo = nextRpcServer();LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Try to connect to server on start up, server: {}", name, serverInfo);//调用GrpcClient.connectToServer()方法建立和服务端的长连接connectToServer = connectToServer(serverInfo);} catch (Throwable e) {LoggerUtils.printIfWarnEnabled(LOGGER, "[{}] Fail to connect to server on start up, error message = {}, start up retry times left: {}", name, e.getMessage(), startUpRetryTimes);}}if (connectToServer != null) {LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to connect to server [{}] on start up, connectionId = {}", name, connectToServer.serverInfo.getAddress(), connectToServer.getConnectionId());//currentConnection其实就是一个用于客户端的GrpcConnection对象实例this.currentConnection = connectToServer;rpcClientStatus.set(RpcClientStatus.RUNNING);eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));} else {switchServerAsync();}...}//connect to server.public abstract Connection connectToServer(ServerInfo serverInfo) throws Exception;...//send request.public Response request(Request request) throws NacosException {return request(request, DEFAULT_TIMEOUT_MILLS);}//send request.public Response request(Request request, long timeoutMills) throws NacosException {int retryTimes = 0;Response response;Exception exceptionThrow = null;long start = System.currentTimeMillis();while (retryTimes < RETRY_TIMES && System.currentTimeMillis() < timeoutMills + start) {boolean waitReconnect = false;try {if (this.currentConnection == null || !isRunning()) {waitReconnect = true;throw new NacosException(NacosException.CLIENT_DISCONNECT, "Client not connected, current status:" + rpcClientStatus.get());}//发起gRPC请求,调用GrpcConnection.request()方法response = this.currentConnection.request(request, timeoutMills);if (response == null) {throw new NacosException(SERVER_ERROR, "Unknown Exception.");}if (response instanceof ErrorResponse) {if (response.getErrorCode() == NacosException.UN_REGISTER) {synchronized (this) {waitReconnect = true;if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {LoggerUtils.printIfErrorEnabled(LOGGER, "Connection is unregistered, switch server, connectionId = {}, request = {}", currentConnection.getConnectionId(), request.getClass().getSimpleName());switchServerAsync();}}}throw new NacosException(response.getErrorCode(), response.getMessage());}//return response.lastActiveTimeStamp = System.currentTimeMillis();return response;} catch (Exception e) {if (waitReconnect) {try {//wait client to reconnect.Thread.sleep(Math.min(100, timeoutMills / 3));} catch (Exception exception) {//Do nothing.}}LoggerUtils.printIfErrorEnabled(LOGGER, "Send request fail, request = {}, retryTimes = {}, errorMessage = {}", request, retryTimes, e.getMessage());exceptionThrow = e;}retryTimes++;}if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {switchServerAsyncOnRequestFail();}if (exceptionThrow != null) {throw (exceptionThrow instanceof NacosException) ? (NacosException) exceptionThrow : new NacosException(SERVER_ERROR, exceptionThrow);} else {throw new NacosException(SERVER_ERROR, "Request fail, unknown Error");}}...
}//gRPC connection.
public class GrpcConnection extends Connection {//stub to send request.//grpcFutureServiceStub属性会在NamingGrpcClientProxy初始化 -> 调用RpcClient.start() -> GrpcClient.connectToServer()时,//通过GrpcConnection.setGrpcFutureServiceStub()方法进行设置protected RequestGrpc.RequestFutureStub grpcFutureServiceStub;...@Overridepublic Response request(Request request, long timeouts) throws NacosException {Payload grpcRequest = GrpcUtils.convert(request);//调用gRPC提供的接口发起请求,属于io.grpc包下的内容了ListenableFuture<Payload> requestFuture = grpcFutureServiceStub.request(grpcRequest);Payload grpcResponse;try {grpcResponse = requestFuture.get(timeouts, TimeUnit.MILLISECONDS);} catch (Exception e) {throw new NacosException(NacosException.SERVER_ERROR, e);}return (Response) GrpcUtils.parse(grpcResponse);}...
}
(4)总结
3.服务端进行服务注册时的处理
(1)服务端处理客户端发起的服务注册请求
(2)服务端对客户端注册事件的处理源码
(3)总结
(1)服务端处理客户端发起的服务注册请求
客户端向服务端发起服务注册时,会先根据要注册的服务信息来创建一个InstanceRequest请求参数对象,再调用NamingGrpcClientProxy的requestToServer()方法向服务端发请求,也就是通过调用RpcClient的request()方法来向服务端发出gRPC请求。
InstanceRequestHandler的handle()方法就是用来处理服务注册请求的,该方法会继续调用InstanceRequestHandler的registerInstance()方法,根据客户端发过来的请求类型来选择是注册服务还是注销服务。
如果客户端发过来的请求类型是注册服务实例,则调用EphemeralClientOperationServiceImpl的registerInstance()方法,在该方法中:
一.首先会调用ServiceManager的getSingleton()方法
根据由请求信息创建的Service对象获取一个已注册的Service对象。
需要注意:在Nacos 1.x中,ServiceManager使用一个双层Map存放服务和命名空间。在Nacos 2.x中,ServiceManager则使用两个Map存放服务和命名空间。
在Nacos 1.x中的双层Map是:
Map<String, Map<String, Service>>;
例如Map(namespace, Map(group::serviceName, Service));在Nacos 2.x中的两个Map是:
ConcurrentHashMap<Service, Service>、
ConcurrentHashMap<String_namespace, Set<Service>>;
二.然后调用ClientManagerDelegate的getClient()方法
根据请求参数中的connectionId来获取一个IpPortBasedClient对象。在执行ClientManagerDelegate的getClient()方法时,会先根据connectionId选出具体的ClientManager实现类,接着再调用比如EphemeralIpPortClientManager的getClient()方法,从EphemeralIpPortClientManager.clients属性中获取一个Client对象。clients属性是一个ConcurrentMap,key是请求参数中的connectionId,value是继承了实现Client接口的AbstractClient的IpPortBasedClient对象。
需要注意:Nacos中的gRPC底层是基于Netty实现的。当客户端和服务端建立长连接后,服务端会生成SocketChannel连接对象,这个SocketChannel连接对象就代表了客户端。Nacos会在这个SocketChannel连接对象的基础上,封装一个Client对象,并且生成一个connectionId将SocketChannel对象与Client对象关联起来。
所以当要进行服务注册的客户端和服务端建立好长连接后,服务端就会为客户端创建一个IpPortBasedClient对象,并将该对象存放在EphemeralIpPortClientManager.clients属性里。
三.接着调用ClientOperationService的getPublishInfo()方法
将请求中的instance实例信息封装为InstancePublishInfo对象。
四.然后调用IpPortBasedClient对象的addServiceInstance()方法
往IpPortBasedClient对象里添加Service对象 -> InstancePublishInfo对象。
由于IpPortBasedClient继承自实现了Client接口的AbstractClient抽象类,所以实际是调用AbstractClient的addServiceInstance()方法添加服务实例。
在AbstractClient抽象类中,有一个名为publishers的属性。它是一个ConcurrentHashMap,用于存储客户端服务注册请求中的Instance信息。就是记录该客户端提供的服务和服务实例,一个客户端可提供多个服务,当然这些信息已经被封装为InstancePublishInfo对象了。所以AbstractClient.publishers属性的key为已注册的Service,value是根据请求中的Instance实例信息封装的InstancePublishInfo对象。
在AbstractClient的addServiceInstance()方法中,首先往publishers放入这次要注册的Service对象和客户端Instance实例,然后发布客户端改变事件ClientChangedEvent,用来同步集群间的数据。
五.最后发布客户端注册服务实例事件和服务实例元数据事件
客户端注册服务实例事件是ClientRegisterServiceEvent,服务实例元数据事件是InstanceMetadataEvent。
//Instance request handler.
@Component
public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {private final EphemeralClientOperationServiceImpl clientOperationService;public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {this.clientOperationService = clientOperationService;}@Override@Secured(action = ActionTypes.WRITE)public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {//根据请求信息创建一个Service对象,里面包含了:命名空间、分组名、服务名Service service = Service.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);switch (request.getType()) {case NamingRemoteConstants.REGISTER_INSTANCE://注册实例return registerInstance(service, request, meta);case NamingRemoteConstants.DE_REGISTER_INSTANCE://注销实例return deregisterInstance(service, request, meta);default:throw new NacosException(NacosException.INVALID_PARAM, String.format("Unsupported request type %s", request.getType()));}}private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {//调用EphemeralClientOperationServiceImpl的注册方法registerInstance(),这里需要注意如下参数;//参数service:根据请求信息创建的一个Service对象,里面有命名空间、分组名、服务名//参数request.getInstance():这个参数就对应了客户端的实例对象,里面包含IP、端口等信息//参数meta.getConnectionId():这个参数很关键,它是连接IDclientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);}private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) {//调用EphemeralClientOperationServiceImpl的注销方法deregisterInstance()clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE);}
}//Operation service for ephemeral clients and services.
@Component("ephemeralClientOperationService")
public class EphemeralClientOperationServiceImpl implements ClientOperationService {private final ClientManager clientManager;public EphemeralClientOperationServiceImpl(ClientManagerDelegate clientManager) {this.clientManager = clientManager;}@Overridepublic void registerInstance(Service service, Instance instance, String clientId) {//1.从ServiceManager中根据由请求信息创建的Service对象获取一个已注册的Service对象Service singleton = ServiceManager.getInstance().getSingleton(service);if (!singleton.isEphemeral()) {throw new NacosRuntimeException(NacosException.INVALID_PARAM, String.format("Current service %s is persistent service, can't register ephemeral instance.", singleton.getGroupedServiceName()));}//2.从ClientManagerDelegate中根据请求参数中的connectionId获取一个Client对象,即IpPortBasedClient对象Client client = clientManager.getClient(clientId);if (!clientIsLegal(client, clientId)) {return;}//3.将请求中的instance实例信息封装为InstancePublishInfo对象InstancePublishInfo instanceInfo = getPublishInfo(instance);//4.往Client对象里添加已注册的服务对象Service,调用的是IpPortBasedClient对象的父类AbstractClient的addServiceInstance()方法client.addServiceInstance(singleton, instanceInfo);//设置IpPortBasedClient对象的lastUpdatedTime属性为最新时间client.setLastUpdatedTime();//5.发布客户端注册服务实例的事件NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));//5.发布服务实例元数据的事件NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));}...
}public interface ClientOperationService {... //get publish info.default InstancePublishInfo getPublishInfo(Instance instance) {InstancePublishInfo result = new InstancePublishInfo(instance.getIp(), instance.getPort());Map<String, Object> extendDatum = result.getExtendDatum();if (null != instance.getMetadata() && !instance.getMetadata().isEmpty()) {extendDatum.putAll(instance.getMetadata());}if (StringUtils.isNotEmpty(instance.getInstanceId())) {extendDatum.put(Constants.CUSTOM_INSTANCE_ID, instance.getInstanceId());}if (Constants.DEFAULT_INSTANCE_WEIGHT != instance.getWeight()) {extendDatum.put(Constants.PUBLISH_INSTANCE_WEIGHT, instance.getWeight());}if (!instance.isEnabled()) {extendDatum.put(Constants.PUBLISH_INSTANCE_ENABLE, instance.isEnabled());}String clusterName = StringUtils.isBlank(instance.getClusterName()) ? UtilsAndCommons.DEFAULT_CLUSTER_NAME : instance.getClusterName();result.setHealthy(instance.isHealthy());result.setCluster(clusterName);return result;}
}//Nacos service manager for v2.
public class ServiceManager {private static final ServiceManager INSTANCE = new ServiceManager();//key是根据请求参数创建的Service对象,value是已经注册的Service对象private final ConcurrentHashMap<Service, Service> singletonRepository;//key是命名空间,value是相同命名空间的已注册的Service对象集合private final ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps;private ServiceManager() {singletonRepository = new ConcurrentHashMap<>(1 << 10);namespaceSingletonMaps = new ConcurrentHashMap<>(1 << 2);}public static ServiceManager getInstance() {return INSTANCE;}public Set<Service> getSingletons(String namespace) {return namespaceSingletonMaps.getOrDefault(namespace, new HashSet<>(1));}//Get singleton service. Put to manager if no singleton.//@param service new service//@return if service is exist, return exist service, otherwise return new servicepublic Service getSingleton(Service service) {//往singletonRepository这个ConcurrentHashMap中添加一个Service对象,如果存在就不添加singletonRepository.putIfAbsent(service, service);//从这个ConcurrentHashMap中把已注册的Service对象取出来Service result = singletonRepository.get(service);//将已注册的Service对象添加到namespaceSingletonMaps中//由于namespaceSingletonMaps会按命名空间对已注册的Service对象进行分类//所以namespaceSingletonMaps的key是命名空间,value是相同命名空间的已注册的Service对象集合namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), (namespace) -> new ConcurrentHashSet<>());namespaceSingletonMaps.get(result.getNamespace()).add(result);return result;}...
}//Client manager delegate.
@Component("clientManager")
public class ClientManagerDelegate implements ClientManager {private final ConnectionBasedClientManager connectionBasedClientManager;private final EphemeralIpPortClientManager ephemeralIpPortClientManager; private final PersistentIpPortClientManager persistentIpPortClientManager;public ClientManagerDelegate(ConnectionBasedClientManager connectionBasedClientManager, EphemeralIpPortClientManager ephemeralIpPortClientManager, PersistentIpPortClientManager persistentIpPortClientManager) {this.connectionBasedClientManager = connectionBasedClientManager;this.ephemeralIpPortClientManager = ephemeralIpPortClientManager;this.persistentIpPortClientManager = persistentIpPortClientManager;}...@Overridepublic Client getClient(String clientId) {//通过请求参数中的connectionId获取一个Client对象,比如调用EphemeralIpPortClientManager.getClient()方法return getClientManagerById(clientId).getClient(clientId);}private ClientManager getClientManagerById(String clientId) {if (isConnectionBasedClient(clientId)) {return connectionBasedClientManager;}return clientId.endsWith(ClientConstants.PERSISTENT_SUFFIX) ? persistentIpPortClientManager : ephemeralIpPortClientManager;}...
}//The manager of {@code IpPortBasedClient} and ephemeral.
@Component("ephemeralIpPortClientManager")
public class EphemeralIpPortClientManager implements ClientManager {//key是请求参数中的connectionId即clientId,value是一个继承了实现Client接口的AbstractClient的IpPortBasedClient对象private final ConcurrentMap<String, IpPortBasedClient> clients = new ConcurrentHashMap<>();private final DistroMapper distroMapper;private final ClientFactory<IpPortBasedClient> clientFactory;public EphemeralIpPortClientManager(DistroMapper distroMapper, SwitchDomain switchDomain) {this.distroMapper = distroMapper;GlobalExecutor.scheduleExpiredClientCleaner(new ExpiredClientCleaner(this, switchDomain), 0, Constants.DEFAULT_HEART_BEAT_INTERVAL, TimeUnit.MILLISECONDS);clientFactory = ClientFactoryHolder.getInstance().findClientFactory(ClientConstants.EPHEMERAL_IP_PORT);}@Overridepublic boolean clientConnected(String clientId, ClientAttributes attributes) {return clientConnected(clientFactory.newClient(clientId, attributes));}@Overridepublic boolean clientConnected(final Client client) {clients.computeIfAbsent(client.getClientId(), s -> {Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId());IpPortBasedClient ipPortBasedClient = (IpPortBasedClient) client;ipPortBasedClient.init();return ipPortBasedClient;});return true;}@Overridepublic boolean syncClientConnected(String clientId, ClientAttributes attributes) {return clientConnected(clientFactory.newSyncedClient(clientId, attributes));}@Overridepublic Client getClient(String clientId) {//客户端和服务端建立长连接时,就会通过EphemeralIpPortClientManager.clientConnected()方法将clientId放入到clientsreturn clients.get(clientId);}...
}public class IpPortBasedClient extends AbstractClient {......
}public abstract class AbstractClient implements Client {//publishers其实就是记录该客户端提供的服务和服务实例,一个客户端可提供多个服务;//存储客户端发送过来的请求中的Instance信息,当然这些信息已封装为InstancePublishInfo对象//key为已注册的Service,value是根据请求中的Instance实例信息封装的InstancePublishInfo对象protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);...@Overridepublic boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {//服务注册时,如果是第一次put进去Service对象,会返回nullif (null == publishers.put(service, instancePublishInfo)) {//监视器记录MetricsMonitor.incrementInstanceCount();}//发布客户端改变事件ClientChangedEvent,用于处理集群间的数据同步NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());return true;}...
}
(2)服务端对客户端注册事件的处理源码
一.服务端处理客户端的服务注册请求时的主要工作
往三个ConcurrentHashMap里放入内容 + 发布三个事件。
三个ConcurrentHashMap分别是:第一个是ServiceManager中的,key是根据请求参数创建的Service对象,value是已经注册的Service对象。第二个是ServiceManager中的,key是命名空间,value是相同命名空间的已注册的Service对象集合。第三个是AbstractClient中的,key是注册的Service服务对象,value是根据请求中的Instance实例封装的InstancePublishInfo对象。
三个事件分别是:第一个是在AbstractClient的addServiceInstance()方法中,发布的ClientChangedEvent客户端改变事件。第二个是在EphemeralClientOperationService的registerInstance()方法中,发布的ClientRegisterServiceEvent客户端注册事件。第三个是在EphemeralClientOperationService的registerInstance()方法中,发布的InstanceMetadataEvent服务实例元数据事件。
二.服务端对客户端注册事件ClientRegisterServiceEvent的处理
当执行EphemeralClientOperationService的registerInstance()方法,发布一个ClientRegisterServiceEvent客户端注册事件时,便会触发执行ClientServiceIndexesManager的onEvent()方法,然后执行ClientServiceIndexesManager的handleClientOperation()方法,最终调用ClientServiceIndexesManager的addPublisherIndexes()方法。
其中addPublisherIndexes()方法会把clientId放入到publisherIndexes中。publisherIndexes是一个ConcurrentMap,它的key是要注册的服务实例所属的服务Service对象,它的value是某服务Service对象下的所有clientId即connectionId。由于connectionId代表了一个Client对象,也就是一个客户端,所以publisherIndexes的value可理解为服务Service下的所有客户端实例。
//Client and service index manager.
@Component
public class ClientServiceIndexesManager extends SmartSubscriber {//注册表(服务提供者),一个Service服务对象,对应多个服务实例的clientIdprivate final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();...//可以处理客户端注册事件ClientRegisterServiceEvent@Overridepublic void onEvent(Event event) {if (event instanceof ClientEvent.ClientDisconnectEvent) {handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);} else if (event instanceof ClientOperationEvent) {handleClientOperation((ClientOperationEvent) event);}}private void handleClientOperation(ClientOperationEvent event) {Service service = event.getService();String clientId = event.getClientId();if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {//处理客户端注册事件ClientRegisterServiceEventaddPublisherIndexes(service, clientId);} else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {//处理客户端注销事件ClientDeregisterServiceEventremovePublisherIndexes(service, clientId);} else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {//处理客户端订阅服务事件ClientSubscribeServiceEventaddSubscriberIndexes(service, clientId);} else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {//处理客户端取消订阅事件ClientUnsubscribeServiceEventremoveSubscriberIndexes(service, clientId);}}private void addPublisherIndexes(Service service, String clientId) {//判断注册表是否存在该Service,不存在则创建一个空的ConcurrentHashSetpublisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());//把clientId放入到对应的Service中publisherIndexes.get(service).add(clientId);//发布服务改变事件NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));}...
}
(3)总结
服务端处理服务实例注册时,会使用多个Map来存储微服务实例的信息。在注册表也就是ClientServiceIndexesManager.publisherIndexes属性中,只是简单记录每个Service服务对象下包含的clientId字符串集合。通过clientId可以在clients属性中获取到IpPortBasedClient对象,IpPortBasedClient的父类AbstractClient会存储对应的Instance实例信息,所以这样的注册表是可记录每个Service服务包含的所有Instance实例的。