nacos 服务端与客户端通讯流程实现
服务端处理客户端连接及心跳请求不管是OpenApi Http方式还是RPC方式所实现的功能都是一样的。
这里将会以RPC方式来看下服务端和客户端交互的基本流程。
RPC服务启动
在nacos-core包中定义有GrpcSdkServer其继承自BaseRpcServer,是一个@Service修饰的bean,并且有@PostConstruct注解修饰其start()方法,在创建该bean的时候会调用其start()方法来启动RPC服务。同样服务端口的绑定和http端口有一个1000的偏移量。然后设定RPC的请求处理会交由GrpcRequestAcceptor来处理。
客户端建立连接
客户端与服务端建立连接后会通过ConnectionManager.register()方法存放到ConnectionManager中。
请求处理
GrpcRequestAcceptor 负责处理客户端发来的 RPC 请求,它是 Nacos gRPC 服务器的请求接受和处理核心类。GrpcRequestAcceptor 的 request() 方法是处理请求的关键入口。
客户端请求信息被封装到一个Payload对象中,首先从请求元数据中获取请求的类型,通过ApplicationUtils.isStarted()判断当前RPC服务是否已经启动,如果未启动返回一个错误响应,提示客户端稍后再试。
如果请求类型是 服务器健康检查请求ServerCheckRequest,则响应一个健康检查的结果,告知客户端服务器正在运行。不需要特殊处理。
如果不是健康检测请求,根据请求类型从请求处理器注册表requestHandlerRegistry获取请求处理器
RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
如果没有找到对应的处理器,则记录警告日志并返回错误。
在处理请求之前,还会检查请求的连接 ID 是否有效。如果连接 ID 无效,则返回连接未注册的错误信息。
boolean requestValid = connectionManager.checkValid(connectionId);
connectionManager存有一个Map<String, Connection> connections
保存所有的客户端连接。
再往下会进行请求内容解析,如果解析失败,捕获异常并返回错误响应。请求内容封装到Request中。
parseObj = GrpcUtils.parse(grpcRequest);
最后使用对应的requestHandler来出去请求,生成响应返回到客户端。
常见的请求类型处理
能处理的请求类型 | RequestHandler | 描述 |
---|---|---|
ServerCheckRequest | 处理服务器健康检查请求,返回服务器是否正常运行的信息。 | |
ConfigQueryRequest | ConfigQueryRequestHandler | 配置信息查询请求 |
HealthCheckRequest | HealthCheckRequestHandler | 心跳检测请求 |
InstanceRequest | InstanceRequestHandler | 服务注册请求 |
SubscribeServiceRequest | SubscribeServiceRequestHandler | 订阅某个服务请求,当某个服务的实例信息发生变化时,服务端会主动推送给订阅的客户端 |
ServiceQueryRequest | ServiceQueryRequestHandler | 查询某个服务的实例信息 |
ServiceListRequest | ServiceListRequestHandler | 列出一个服务的所有实例 |
ConfigBatchListenRequest | ConfigChangeBatchListenRequestHandler |
所有的处理类型在RequestHandlerRegistry中注册登记,RequestHandlerRegistry实现了ApplicationListener<ContextRefreshedEvent>
接口,在容器初始化发布refresh事件时,会触发其onApplicationEvent()方法,然后从容器中查找所有的RequestHandler.
Map<String, RequestHandler> beansOfType = event.getApplicationContext().getBeansOfType(RequestHandler.class);
事件处理
在 Nacos 的服务端,NotifyCenter 是一个核心的事件机制组件,它负责处理 Nacos 中的各种异步事件通知。NotifyCenter基于 观察者模式实现,能够让不同的模块之间解耦,通过发布-订阅模式实现事件的传递和处理。NotifyCenter 在 Nacos 里是一个 内部事件总线(event bus)。比如 服务注册变更、实例上下线、配置变更都会通过它来通知订阅方。NotifyCenter 提供一个统一的事件发布/订阅机制。各模块只管发布事件(Event),不用关心谁在监听。订阅者只管订阅自己关心的事件。
Event :是消息的载体,他只管发生了什么。常见事件类型InstancesChangeEvent
(服务实例变更)、ConfigDataChangeEvent
(配置变更)
Publisher:事件发布器,维护一个消息队列,管理订阅者。默认事件发布器DefaultPublisher。
Subscriber:消费者,负责处理事件。接口方法onEvent(T event)用来处理事件,
事件处理流程
1.NotifyCenter初始化
DefaultPublisher的实例化在NotifyCenter的静态块中来完成。
static {//...//通过SPI方式加载配置的EventPublisher实现final Collection<EventPublisher> publishers = NacosServiceLoader.load(EventPublisher.class);Iterator<EventPublisher> iterator = publishers.iterator();if (iterator.hasNext()) {clazz = iterator.next().getClass();} else {//没有配置,使用默认的DefaultPublisherclazz = DefaultPublisher.class;}DEFAULT_PUBLISHER_FACTORY = (cls, buffer) -> {try {EventPublisher publisher = clazz.newInstance();//调用init()方法初始化publisherpublisher.init(cls, buffer);return publisher;} catch (Throwable ex) {LOGGER.error("Service class newInstance has error : ", ex);throw new NacosRuntimeException(SERVER_ERROR, ex);}};//...}
DEFAULT_PUBLISHER_FACTORY类型是EventPublisherFactory,是一个函数式接口 (Functional Interface),函数方法apply()方法,这个方法返回EventPublisher实例。
EventPublisher apply(Class<? extends Event> eventType, Integer maxQueueSize)
eventType用来指定当前Publisher可以处理的Event类型,maxQueueSize设置消息队列最大队列深度。
2、事件绑定
NotifyCenter 提供有registerToPublisher(final Class<? extends Event> eventType, final EventPublisher publisher) 静态方法,用来将Event和Publisher进行绑定。这一步意思就是当发布什么事件时候需要用哪个Publisher来处理。NotifyCenter里维护了一个简单的映射关系Map<String, EventPublisher> publisherMap
,
这里map的key是Event类型的className
public static EventPublisher registerToPublisher(final Class<? extends Event> eventType, final int queueMaxSize) {//使用默认的DEFAULT_PUBLISHER_FACTORY,第一步静态块初始化的DefaultPublisherreturn registerToPublisher(eventType, DEFAULT_PUBLISHER_FACTORY, queueMaxSize);
}
public static EventPublisher registerToPublisher(final Class<? extends Event> eventType,final EventPublisherFactory factory, final int queueMaxSize) {if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {return INSTANCE.sharePublisher;}//获取Event实例类型的全路径名final String topic = ClassUtils.getCanonicalName(eventType);synchronized (NotifyCenter.class) {/**这里的MapUtil.computeIfAbsent()是nacos的一个工具类,需要5个参数参数1:map对象参数2: map中key值参数3:获取value的函数接口,这里facotry是EventPublisherFactory,函数方法是apply()参数4:函数接口参数1参数6:函数接口参数2这里实际最后就是publisherMap.computeIfAbsent(topic,factory.apply(eventType,queueMaxSize))*/MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, eventType, queueMaxSize);}return INSTANCE.publisherMap.get(topic);
}
3.DefaultPublisher初始化
上面放入publisherMap时会调用apply()方法获取Publisher实例, 在第一步NotifyCenter 静态块里,apply()方法实现最后会调用DefaultPublisher#init()来完成Publisher的初始化。DefaultPublisher是一个线程类并且实现了EventPublisher接口。
DefaultPublisher#init()方法
public void init(Class<? extends Event> type, int bufferSize) {setDaemon(true);//设置为守护线程//设置线程名称setName("nacos.publisher-" + type.getName());//可以处理的Event类型this.eventType = type;//消息队列最大深度this.queueMaxSize = bufferSize;if (this.queueMaxSize == -1) {this.queueMaxSize = ringBufferSize;}//创建消息队列this.queue = new ArrayBlockingQueue<>(this.queueMaxSize);//启动线程,开始干活start();
}
这里看到消息队列类型又是BlockingQueue阻塞队列,接着看线程的run方法
public void run() {openEventHandler();
}void openEventHandler() {try {//如果还未有消息订阅者,先等60秒int waitTimes = 60;while (!shutdown && !hasSubscriber() && waitTimes > 0) {ThreadUtils.sleep(1000L);waitTimes--;}while (!shutdown) {//从队列中获取Eventfinal Event event = queue.take();//处理EventreceiveEvent(event);//更新lastEventSequence值UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));}} catch (Throwable ex) {LOGGER.error("Event listener exception : ", ex);}
}
这里看到线程启动时候就是一直while循环从消息队列中尝试取消息,由于队列是阻塞队列,如果队列中没有消息会take方法处一直阻塞等待消息。拿到消息后交由receiveEvent()方法进行处理。
4.事件发布
NotifyCenter为事件的发布和订阅提供了一个统一的接口.publishEvent(final Event event),允许不同模块之间通过事件驱动的方式进行通信。这是一个静态方法,可以在需要发布事件地方直接调用。
NotifyCenter#publishEvent()
public static boolean publishEvent(final Event event) {try {return publishEvent(event.getClass(), event);} catch (Throwable ex) {LOGGER.error("There was an exception to the message publishing : ", ex);return false;}
}private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {return INSTANCE.sharePublisher.publish(event);}final String topic = ClassUtils.getCanonicalName(eventType);EventPublisher publisher = INSTANCE.publisherMap.get(topic);if (publisher != null) {return publisher.publish(event);}if (event.isPluginEvent()) {return true;}LOGGER.warn("There are no [{}] publishers for this event, please register", topic);return false;
}
事件来到是,首先从映射表publisherMap<消息类型名称,EventPublisher>
中获取对应的Publisher方法,然后通过publisher.publish(event)方法发布事件。
DefaultPublisher#publish()
public boolean publish(Event event) {checkIsStart();boolean success = this.queue.offer(event);if (!success) {LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);receiveEvent(event);return true;}return true;
}
没有生命特别的就是将事件放入队列中。
5.事件处理
第三步看当从队列中取出消息时,是通过DefaultPublisher#receiveEvent()方法来进行事件处理。
void receiveEvent(Event event) {final long currentEventSequence = event.sequence();//如果消息没有订阅者结束返回if (!hasSubscriber()) {LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.", event);return;}//逐个拿出订阅者进行消息通知for (Subscriber subscriber : subscribers) {if (!subscriber.scopeMatches(event)) {continue;}// Whether to ignore expiration eventsif (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",event.getClass());continue;}notifySubscriber(subscriber, event);}
}
这里subscribers是一个ConcurrentHashSet<Subscriber> subscribers
类型的列表,通过NotifyCenter .registerSubscriber(final Subscriber consumer, final EventPublisherFactory factory)方法用来绑定订阅事件,最后是通过Publisher.addSubscriber()将订阅者放入Publisher的订阅列表里,也就是subscribers这个集合里。
DefaultPublisher#notifySubscriber()
public void notifySubscriber(final Subscriber subscriber, final Event event) {LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);final Runnable job = () -> subscriber.onEvent(event);final Executor executor = subscriber.executor();if (executor != null) {executor.execute(job);} else {try {job.run();} catch (Throwable e) {LOGGER.error("Event callback exception: ", e);}}
}
这里通过subscriber.onEvent()方法来处理消息,如果subscriber中有线程池则使用线程池执行,否则直接调用run()方法进行同步执行。
整个流程下来就是通过NotifyCenter将Event,Publisher,Subscriber串联起来。下面来个具体的事件来看下消息处理流程。