【RocketMQ NameServer】- NettyEventExecutor 处理 Netty 事件
文章目录
- 1. 前言
- 2. NettyEventExecutor 线程
- 3. NettyEvent 是怎么来的
- 4. NettyEventExecutor 线程处理不同事件的逻辑
- 4.1 IDLE\CLOSE\EXCEPTION - onChannelIdle
- 4.2 CONNECT - onChannelConnect
- 5. 小结
本文章基于 RocketMQ 4.9.3
1. 前言
- 【RocketMQ】- 源码系列目录
上一篇文章《【RocketMQ NameServer】- NameServer 启动源码》我们介绍了 NameServer 启动的核心源码,最后也留下了一个问题,就是 NettyEventExecutor 是如何处理 Netty 事件的,这篇文章就来看下。
2. NettyEventExecutor 线程
NettyEventExecutor 本质上是一个线程,上一篇文章中我们说过了就是如果 BrokerHousekeepingService
不为空,就启动 NettyEventExecutor 线程去处理里面的 Netty 事件。由于这个类代码量比较少,所以直接给出全部的代码。
class NettyEventExecutor extends ServiceThread {private final LinkedBlockingQueue<NettyEvent> eventQueue = new LinkedBlockingQueue<NettyEvent>();private final int maxSize = 10000;/*** 将 Netty 事件添加到 eventQueue 中* @param event*/public void putNettyEvent(final NettyEvent event) {int currentSize = this.eventQueue.size();if (currentSize <= maxSize) {this.eventQueue.add(event);} else {log.warn("event queue size [{}] over the limit [{}], so drop this event {}", currentSize, maxSize, event.toString());}}@Overridepublic void run() {log.info(this.getServiceName() + " service started");// 获取事件监听器, 这个监听器就是 BrokerHousekeepingServicefinal ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener();while (!this.isStopped()) {try {// 从 eventQueue 阻塞队列中拉取 Netty 事件NettyEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS);if (event != null && listener != null) {// 判断事件类型switch (event.getType()) {case IDLE:listener.onChannelIdle(event.getRemoteAddr(), event.getChannel());break;case CLOSE:listener.onChannelClose(event.getRemoteAddr(), event.getChannel());break;case CONNECT:listener.onChannelConnect(event.getRemoteAddr(), event.getChannel());break;case EXCEPTION:listener.onChannelException(event.getRemoteAddr(), event.getChannel());break;default:break;}}} catch (Exception e) {log.warn(this.getServiceName() + " service has exception. ", e);}}log.info(this.getServiceName() + " service end");}@Overridepublic String getServiceName() {return NettyEventExecutor.class.getSimpleName();}
}
其实这个方法比较简单,就是线程不断轮循 eventQueue
,从里面取出 NettyEvent
,然后根据不同的状态走不通的处理逻辑。不够要注意一下,listener
是在创建 NettyRemotingServer 的时候设置的。
而创建 NettyRemotingServer 则是在初始化 NamesrvController 的时候设置进去的,可以看到设置进去的就是 BrokerHousekeepingService
。
3. NettyEvent 是怎么来的
NettyEvent 是在 NettyConnectManageHandler
处理入站出站消息的时候产生的,NettyConnectManageHandler 这个类在前一篇文章我们也说过在启动 NameServer 的时候(调用 start() 方法)会调用 this.remotingServer.start()
去启动 Netty 服务端,同时也把 NettyConnectManageHandler 设置到了 NioSocketChannel 的 Pipeline 中。
当 Netty 服务端接受到消息或者发送消息的时候都会经过这个处理器,并且针对不同事件会创建不同的 NettyEvent 加入到集合 eventQueue
中。
@ChannelHandler.Sharable
class NettyConnectManageHandler extends ChannelDuplexHandler {/*** NioSocketChannel 注册到 NioEventLoop* @param ctx* @throws Exception*/@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());log.info("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddress);super.channelRegistered(ctx);}/*** NioSocketChannel 从 NioEventLoop 中销毁* @param ctx* @throws Exception*/@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());log.info("NETTY SERVER PIPELINE: channelUnregistered, the channel[{}]", remoteAddress);super.channelUnregistered(ctx);}/*** NioSocketChannel 连接成功连接到远程地址或者已经接收了一个新的连接* @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);super.channelActive(ctx);if (NettyRemotingServer.this.channelEventListener != null) {NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));}}/*** 连接已经关闭或者断开* @param ctx* @throws Exception*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress);super.channelInactive(ctx);if (NettyRemotingServer.this.channelEventListener != null) {NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));}}/*** 用户自定义事件* @param ctx* @throws Exception*/@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;if (event.state().equals(IdleState.ALL_IDLE)) {final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);RemotingUtil.closeChannel(ctx.channel());if (NettyRemotingServer.this.channelEventListener != null) {NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));}}}ctx.fireUserEventTriggered(evt);}/*** 连接事件处理过程中发生异常* @param ctx* @param cause* @throws Exception*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress);log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);if (NettyRemotingServer.this.channelEventListener != null) {NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));}RemotingUtil.closeChannel(ctx.channel());}
}
可以看到在 NettyConnectManageHandler 中会针对不同的事件来生成不同的 NettyEvent,比如当有连接建立的时候会生成一个 NettyEventType.CONNECT
类型的事件,连接已经关闭或者断开会生成一个 NettyEvent(NettyEventType.CLOSE
类型的事件,用户自定义事件被触发就会生成一个 NettyEventType.IDLE
事件,当 Channel 在处理过程中发生异常会触发 NettyEventType.EXCEPTION
事件,这里大家就先看下这些概念,因为已经很久没看过 Netty 这块的源码了,到时候看完了再回来详细补充下这些事件的触发机制。
4. NettyEventExecutor 线程处理不同事件的逻辑
既然知道 NetyEvent 从哪来,那么下面就继续回到第二节的内容,看看 NettyEventExecutor 线程里面是怎么处理这些事件的。
4.1 IDLE\CLOSE\EXCEPTION - onChannelIdle
IDLE 事件类型的通过 listener.onChannelIdle(event.getRemoteAddr(), event.getChannel())
去处理,这个类型的事件是触发了用户自定义的事件,下面来看下里面的逻辑。
@Override
public void onChannelIdle(String remoteAddr, Channel channel) {// 连接销毁事件this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
可以看到这个方法走的就是连接销毁的方法,这个方法在 RouteInfoManager 里面,意思是如果触发了用户自定义的事件就是空闲连接需要销毁?
/*** 要销毁的连接的地址* @param remoteAddr* @param channel*/
public void onChannelDestroy(String remoteAddr, Channel channel) {String brokerAddrFound = null;if (channel != null) {try {try {// 加读锁this.lock.readLock().lockInterruptibly();Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =this.brokerLiveTable.entrySet().iterator();while (itBrokerLiveTable.hasNext()) {Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();if (entry.getValue().getChannel() == channel) {// 是否找到对应的 broker 连接通道brokerAddrFound = entry.getKey();break;}}} finally {// 解除读锁this.lock.readLock().unlock();}} catch (Exception e) {log.error("onChannelDestroy Exception", e);}}// 如果没找到就赋值为 remoteAddrif (null == brokerAddrFound) {brokerAddrFound = remoteAddr;} else {log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);}if (brokerAddrFound != null && brokerAddrFound.length() > 0) {try {try {// 加写锁this.lock.writeLock().lockInterruptibly();// 从 brokerLiveTable 集合删掉这个 brokerthis.brokerLiveTable.remove(brokerAddrFound);this.filterServerTable.remove(brokerAddrFound);String brokerNameFound = null;boolean removeBrokerName = false;// 然后再遍历 brokerAddrTable, 也要维护这个 broker 集群集合的地址消息Iterator<Entry<String, BrokerData>> itBrokerAddrTable =this.brokerAddrTable.entrySet().iterator();/*** 1. 维护 brokerAddrTable*/while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {// 获取这个集群下面的 broker 信息BrokerData brokerData = itBrokerAddrTable.next().getValue();// 遍历这个集群下面的主从节点Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();while (it.hasNext()) {Entry<Long, String> entry = it.next();Long brokerId = entry.getKey();String brokerAddr = entry.getValue();// 如果找到了要删除的if (brokerAddr.equals(brokerAddrFound)) {// 记录下要删除的 brokerNamebrokerNameFound = brokerData.getBrokerName();// 删掉it.remove();log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",brokerId, brokerAddr);break;}}if (brokerData.getBrokerAddrs().isEmpty()) {// 如果说删除了这个 broker 信息之后这个集群下面的 broker 地址为空了 removeBrokerName = true;// 那么就将这个 brokerName -> brokerData 的映射也删掉, 表明此时这个 broker 集群已经没有节点了itBrokerAddrTable.remove();log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",brokerData.getBrokerName());}}/*** 2. 维护 clusterAddrTable*/if (brokerNameFound != null && removeBrokerName) {// 然后如果说删除了一个 brokerName, 也要维护 clusterAddrTable 集合, 这个集合记录了 clusterName -> Set(brokerName) 的集合, // 这里要说下 clusterAddrTable 这个集合, broker 配置文件可以指定 brokerClusterName 为集群名称, 同时这个集群里面可以包含多个 broker// 主从集群, 这些主从 broker 集群的 brokerName 是一样的, 依靠 brokerId 区分主节点还是从节点, 所以这个 table 的 value 是 Set 集合Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();while (it.hasNext()) {Entry<String, Set<String>> entry = it.next();// 集群名称String clusterName = entry.getKey();// 集群下面的 broker 主从集群的 brokerName 集合Set<String> brokerNames = entry.getValue();// 从集合中删掉boolean removed = brokerNames.remove(brokerNameFound);if (removed) {// 如果删掉了就打印日志log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",brokerNameFound, clusterName);// 如果删掉这个 brokerName 之后, 集群下面没有 broker 集群了, 就删掉这个 clusterif (brokerNames.isEmpty()) {log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",clusterName);it.remove();}break;}}}/*** 3. 维护 topicQueueTable*/if (removeBrokerName) {// 遍历 topicQueueTable 下面的 topic 信息Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =this.topicQueueTable.entrySet().iterator();while (itTopicQueueTable.hasNext()) {Entry<String, List<QueueData>> entry = itTopicQueueTable.next();// 遍历所有 topicString topic = entry.getKey();// 遍历 topic 的队列配置信息List<QueueData> queueDataList = entry.getValue();Iterator<QueueData> itQueueData = queueDataList.iterator();while (itQueueData.hasNext()) {// 因为 topic 下面的队列可以分配到不同 broker 上面, 所以需要把分配到这个 brokerName 集群上面的队列也// 移除掉QueueData queueData = itQueueData.next();if (queueData.getBrokerName().equals(brokerNameFound)) {// 从集合中移除itQueueData.remove();log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",topic, queueData);}}if (queueDataList.isEmpty()) {// 如果移除之后已经没有队列了, 那么这个 topic 队列配置信息也要删掉了itTopicQueueTable.remove();log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",topic);}}}} finally {// 解除写锁this.lock.writeLock().unlock();}} catch (Exception e) {log.error("onChannelDestroy Exception", e);}}
}
上面这个方法就是 IDLE 事件的销毁逻辑,broker 向 NameServer 注册信息的时候包括创建 topic 信息的时候都会将一些配置信息存到不同的集合中,上面的方法就是处理这些集合:
- brokerLiveTable: broker 地址 -> broker 活跃信息,里面维持了已建立连接的 broker 的连接通道。
- filterServerTable: broker 地址 -> 这个 broker 的过滤服务器地址,过滤服务器在当前这个版本已经看不到了,所以可以忽略。
- brokerAddrTable: brokerName -> broker 信息的集合(注意主从集群的 brokerName 是一样的,这样才能够存储一个集群下面的所有 broker 地址)。
- topicQueueTable: topic -> 队列信息,因为一个 topic 可以存储到多个 broker 下面,所以是一个 List。
- clusterAddrTable: 集群名称 -> 集群下面的所有 broker 主从集群的 broker 名称(主从 broker 的 brokerName 一般是一样的)
比如说将这个通道对应的地址删掉之后,需要判断下这个地址是在哪个 brokerName 下面的,然后判断如果删掉之后这个 brokerName 集群下面还有没有其他的地址,如果没有说明这个 broker 集群可以从 brokerAddrTable
中删掉了,接着再去处理剩余的几个队列。
下面就到了 clusterAddrTable
,处理逻辑是一样的,找出哪个 cluster 集群下面存在这个 broker 地址,然后删掉,接着判断删掉之后这个 cluster 集群下面还有没有 broker 地址,如果没有了,就直接把这个集群也从集合中删掉。
然后维护 topicQueueTable
,一个 topic 的队列可以分配到不同的 broker 上,所以如果 brokerName 被删掉了,就需要将分配在这个 brokerName 上面的队列信息也给删掉,然后同理判断下这个 topic 是否还存在队列信息,如果不存在,那么这个 topic 也可以从这个集合中删掉了。
但是大家要注意一点就是,维护 topicQueueTable
的时候是需要判断 brokerName 是否被删掉的,也就是说当这个连接通道对应的 broker 地址被删掉之后,brokerName 这个集群下面可能还有其他 broker 地址,可以是从节点,这种情况下 brokerName 就不会从 brokerAddrTable 中删掉,也就不会维护 topicQueueTable 的地址了。
当然这些 broker 信息、topic 信息是什么时候存到这些集合里面的,这些等到后面介绍 broker 启动或者创建 topic 的时候再详细说。
4.2 CONNECT - onChannelConnect
@Override
public void onChannelConnect(String remoteAddr, Channel channel) {
}
这个方法就是一个空方法,实际上也是,当连接建立的时候 broker 会通过定时任务上报信息给 NameServer,所以理论上是不需要在这里实现的。
5. 小结
好了,最后来总结一下,NettyEventExecutor 就是专门处理 NettyConnectManageHandler 这个处理器在处理入站出站数据的时候添加的 NettyEvent 事件,对于 IDLE\CLOSE\EXCEPTION 这三个类型的 NettyEvent,最终会调用 onChannelIdle 去删除 broker 连接,同时处理 brokerLiveTable、filterServerTable、brokerAddrTable、topicQueueTable、clusterAddrTable 这几个集合,而对于连接事件则是通过 onChannelConnect 去处理,但是这个方法是一个空方法。
如有错误,欢迎指出!!!!