当前位置: 首页 > news >正文

nacos 服务端与客户端通讯流程实现

服务端处理客户端连接及心跳请求不管是OpenApi Http方式还是RPC方式所实现的功能都是一样的。

这里将会以RPC方式来看下服务端和客户端交互的基本流程。

RPC服务启动

在nacos-core包中定义有GrpcSdkServer其继承自BaseRpcServer,是一个@Service修饰的bean,并且有@PostConstruct注解修饰其start()方法,在创建该bean的时候会调用其start()方法来启动RPC服务。同样服务端口的绑定和http端口有一个1000的偏移量。然后设定RPC的请求处理会交由GrpcRequestAcceptor来处理。

客户端建立连接

客户端与服务端建立连接后会通过ConnectionManager.register()方法存放到ConnectionManager中。

请求处理

GrpcRequestAcceptor 负责处理客户端发来的 RPC 请求,它是 Nacos gRPC 服务器的请求接受和处理核心类。GrpcRequestAcceptor 的 request() 方法是处理请求的关键入口。

客户端请求信息被封装到一个Payload对象中,首先从请求元数据中获取请求的类型,通过ApplicationUtils.isStarted()判断当前RPC服务是否已经启动,如果未启动返回一个错误响应,提示客户端稍后再试。

如果请求类型是 服务器健康检查请求ServerCheckRequest,则响应一个健康检查的结果,告知客户端服务器正在运行。不需要特殊处理。

如果不是健康检测请求,根据请求类型从请求处理器注册表requestHandlerRegistry获取请求处理器

RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);

如果没有找到对应的处理器,则记录警告日志并返回错误。

在处理请求之前,还会检查请求的连接 ID 是否有效。如果连接 ID 无效,则返回连接未注册的错误信息。

boolean requestValid = connectionManager.checkValid(connectionId);

connectionManager存有一个Map<String, Connection> connections 保存所有的客户端连接。

再往下会进行请求内容解析,如果解析失败,捕获异常并返回错误响应。请求内容封装到Request中。

parseObj = GrpcUtils.parse(grpcRequest);

最后使用对应的requestHandler来出去请求,生成响应返回到客户端。

常见的请求类型处理

能处理的请求类型RequestHandler描述
ServerCheckRequest处理服务器健康检查请求,返回服务器是否正常运行的信息。
ConfigQueryRequestConfigQueryRequestHandler配置信息查询请求
HealthCheckRequestHealthCheckRequestHandler心跳检测请求
InstanceRequestInstanceRequestHandler服务注册请求
SubscribeServiceRequestSubscribeServiceRequestHandler订阅某个服务请求,当某个服务的实例信息发生变化时,服务端会主动推送给订阅的客户端
ServiceQueryRequestServiceQueryRequestHandler查询某个服务的实例信息
ServiceListRequestServiceListRequestHandler列出一个服务的所有实例
ConfigBatchListenRequestConfigChangeBatchListenRequestHandler

所有的处理类型在RequestHandlerRegistry中注册登记,RequestHandlerRegistry实现了ApplicationListener<ContextRefreshedEvent>接口,在容器初始化发布refresh事件时,会触发其onApplicationEvent()方法,然后从容器中查找所有的RequestHandler.

Map<String, RequestHandler> beansOfType = event.getApplicationContext().getBeansOfType(RequestHandler.class);

事件处理

在 Nacos 的服务端,NotifyCenter 是一个核心的事件机制组件,它负责处理 Nacos 中的各种异步事件通知。NotifyCenter基于 观察者模式实现,能够让不同的模块之间解耦,通过发布-订阅模式实现事件的传递和处理。NotifyCenter 在 Nacos 里是一个 内部事件总线(event bus)。比如 服务注册变更、实例上下线、配置变更都会通过它来通知订阅方。NotifyCenter 提供一个统一的事件发布/订阅机制。各模块只管发布事件(Event),不用关心谁在监听。订阅者只管订阅自己关心的事件。

Event :是消息的载体,他只管发生了什么。常见事件类型InstancesChangeEvent(服务实例变更)、ConfigDataChangeEvent(配置变更)

Publisher:事件发布器,维护一个消息队列,管理订阅者。默认事件发布器DefaultPublisher。

Subscriber:消费者,负责处理事件。接口方法onEvent(T event)用来处理事件,

事件处理流程

1.NotifyCenter初始化

DefaultPublisher的实例化在NotifyCenter的静态块中来完成。

static {//...//通过SPI方式加载配置的EventPublisher实现final Collection<EventPublisher> publishers = NacosServiceLoader.load(EventPublisher.class);Iterator<EventPublisher> iterator = publishers.iterator();if (iterator.hasNext()) {clazz = iterator.next().getClass();} else {//没有配置,使用默认的DefaultPublisherclazz = DefaultPublisher.class;}DEFAULT_PUBLISHER_FACTORY = (cls, buffer) -> {try {EventPublisher publisher = clazz.newInstance();//调用init()方法初始化publisherpublisher.init(cls, buffer);return publisher;} catch (Throwable ex) {LOGGER.error("Service class newInstance has error : ", ex);throw new NacosRuntimeException(SERVER_ERROR, ex);}};//...}

DEFAULT_PUBLISHER_FACTORY类型是EventPublisherFactory,是一个函数式接口 (Functional Interface),函数方法apply()方法,这个方法返回EventPublisher实例。

EventPublisher apply(Class<? extends Event> eventType, Integer maxQueueSize)

eventType用来指定当前Publisher可以处理的Event类型,maxQueueSize设置消息队列最大队列深度。

2、事件绑定

NotifyCenter 提供有registerToPublisher(final Class<? extends Event> eventType, final EventPublisher publisher) 静态方法,用来将Event和Publisher进行绑定。这一步意思就是当发布什么事件时候需要用哪个Publisher来处理。NotifyCenter里维护了一个简单的映射关系Map<String, EventPublisher> publisherMap

这里map的key是Event类型的className

public static EventPublisher registerToPublisher(final Class<? extends Event> eventType, final int queueMaxSize) {//使用默认的DEFAULT_PUBLISHER_FACTORY,第一步静态块初始化的DefaultPublisherreturn registerToPublisher(eventType, DEFAULT_PUBLISHER_FACTORY, queueMaxSize);
}
public static EventPublisher registerToPublisher(final Class<? extends Event> eventType,final EventPublisherFactory factory, final int queueMaxSize) {if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {return INSTANCE.sharePublisher;}//获取Event实例类型的全路径名final String topic = ClassUtils.getCanonicalName(eventType);synchronized (NotifyCenter.class) {/**这里的MapUtil.computeIfAbsent()是nacos的一个工具类,需要5个参数参数1:map对象参数2: map中key值参数3:获取value的函数接口,这里facotry是EventPublisherFactory,函数方法是apply()参数4:函数接口参数1参数6:函数接口参数2这里实际最后就是publisherMap.computeIfAbsent(topic,factory.apply(eventType,queueMaxSize))*/MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, eventType, queueMaxSize);}return INSTANCE.publisherMap.get(topic);
}

3.DefaultPublisher初始化

上面放入publisherMap时会调用apply()方法获取Publisher实例, 在第一步NotifyCenter 静态块里,apply()方法实现最后会调用DefaultPublisher#init()来完成Publisher的初始化。DefaultPublisher是一个线程类并且实现了EventPublisher接口。

DefaultPublisher#init()方法

public void init(Class<? extends Event> type, int bufferSize) {setDaemon(true);//设置为守护线程//设置线程名称setName("nacos.publisher-" + type.getName());//可以处理的Event类型this.eventType = type;//消息队列最大深度this.queueMaxSize = bufferSize;if (this.queueMaxSize == -1) {this.queueMaxSize = ringBufferSize;}//创建消息队列this.queue = new ArrayBlockingQueue<>(this.queueMaxSize);//启动线程,开始干活start();
}

这里看到消息队列类型又是BlockingQueue阻塞队列,接着看线程的run方法

public void run() {openEventHandler();
}void openEventHandler() {try {//如果还未有消息订阅者,先等60秒int waitTimes = 60;while (!shutdown && !hasSubscriber() && waitTimes > 0) {ThreadUtils.sleep(1000L);waitTimes--;}while (!shutdown) {//从队列中获取Eventfinal Event event = queue.take();//处理EventreceiveEvent(event);//更新lastEventSequence值UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));}} catch (Throwable ex) {LOGGER.error("Event listener exception : ", ex);}
}

这里看到线程启动时候就是一直while循环从消息队列中尝试取消息,由于队列是阻塞队列,如果队列中没有消息会take方法处一直阻塞等待消息。拿到消息后交由receiveEvent()方法进行处理。

4.事件发布

NotifyCenter为事件的发布和订阅提供了一个统一的接口.publishEvent(final Event event),允许不同模块之间通过事件驱动的方式进行通信。这是一个静态方法,可以在需要发布事件地方直接调用。

NotifyCenter#publishEvent()

public static boolean publishEvent(final Event event) {try {return publishEvent(event.getClass(), event);} catch (Throwable ex) {LOGGER.error("There was an exception to the message publishing : ", ex);return false;}
}private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {return INSTANCE.sharePublisher.publish(event);}final String topic = ClassUtils.getCanonicalName(eventType);EventPublisher publisher = INSTANCE.publisherMap.get(topic);if (publisher != null) {return publisher.publish(event);}if (event.isPluginEvent()) {return true;}LOGGER.warn("There are no [{}] publishers for this event, please register", topic);return false;
}

事件来到是,首先从映射表publisherMap<消息类型名称,EventPublisher>中获取对应的Publisher方法,然后通过publisher.publish(event)方法发布事件。

DefaultPublisher#publish()

public boolean publish(Event event) {checkIsStart();boolean success = this.queue.offer(event);if (!success) {LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);receiveEvent(event);return true;}return true;
}

没有生命特别的就是将事件放入队列中。

5.事件处理

第三步看当从队列中取出消息时,是通过DefaultPublisher#receiveEvent()方法来进行事件处理。

void receiveEvent(Event event) {final long currentEventSequence = event.sequence();//如果消息没有订阅者结束返回if (!hasSubscriber()) {LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.", event);return;}//逐个拿出订阅者进行消息通知for (Subscriber subscriber : subscribers) {if (!subscriber.scopeMatches(event)) {continue;}// Whether to ignore expiration eventsif (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",event.getClass());continue;}notifySubscriber(subscriber, event);}
}

这里subscribers是一个ConcurrentHashSet<Subscriber> subscribers类型的列表,通过NotifyCenter .registerSubscriber(final Subscriber consumer, final EventPublisherFactory factory)方法用来绑定订阅事件,最后是通过Publisher.addSubscriber()将订阅者放入Publisher的订阅列表里,也就是subscribers这个集合里。

DefaultPublisher#notifySubscriber()

public void notifySubscriber(final Subscriber subscriber, final Event event) {LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);final Runnable job = () -> subscriber.onEvent(event);final Executor executor = subscriber.executor();if (executor != null) {executor.execute(job);} else {try {job.run();} catch (Throwable e) {LOGGER.error("Event callback exception: ", e);}}
}

这里通过subscriber.onEvent()方法来处理消息,如果subscriber中有线程池则使用线程池执行,否则直接调用run()方法进行同步执行。

整个流程下来就是通过NotifyCenter将Event,Publisher,Subscriber串联起来。下面来个具体的事件来看下消息处理流程。

http://www.dtcms.com/a/411884.html

相关文章:

  • Docker05-Redis启动
  • 给你一个网站如何做推广wordpress 页面加载时间 查询次数_和内存
  • 【LeetCode 每日一题】120. 三角形最小路径和——(解法二)自底向上
  • 教育培训门户网站模板下载网上有什么做兼职的网站
  • AI工具使用随笔
  • 基于PyQt5的邮件客户端开发:完整实现与深度解析
  • 建手机wap网站大概多少钱页面模板够30条
  • idea可以做网站吗大数据营销网站
  • wordpress导航站源码天津做系统集成的公司网站
  • 郑州网站制作公司怎么样wordpress 视频站主题
  • 谷歌外贸网站seo怎么做免费舆情信息网站
  • 整站优化和单词深圳互联网公司集中在哪个区
  • 【crud】update
  • Qt键盘输入法的开源方案
  • 门户网站 建设 通知wordpress略缩图压缩
  • .net网站开发简介广州平面设计培训机构
  • 沭阳网站设计绿植网站怎么做
  • DVWA | XSS 跨站脚本注入
  • 建设银行网站公告在哪wordpress前台浏览量插件
  • Java的认识及环境搭载
  • 网站做著作权网络信息公司是做什么的
  • Milvus 2.6 Data-in, Data-out,简化向量搜索
  • 鄠邑区建设和住房保障局网站广东深圳龙岗区天气
  • 级a做爰片免费视网站看看如何判断网站被google k
  • 陕西建设技术学院网站网站超市安装
  • 【读书笔记】架构整洁之道 P2~3 编程范式设计原则
  • 基于 PyTorch 的 CIFAR-10 图像分类学习总结
  • (附源码)医院门诊综合管理系统
  • 做外贸经常用的网站网站中单选按钮怎么做
  • 国家合同模板网站wordpress 首页伪静态