当前位置: 首页 > news >正文

RocketMq中RouteInfoManger组件的源码分析

1.前言

RouteInfoManager 是 RocketMQ 中 NameServer 的核心组件之一,主要负责管理和维护整个 RocketMQ 集群的路由元数据信息。里面包含一些非常核心的功能:存储和管理 Broker 信息(broker的注册,broker心跳的维护);维护 Topic 的路由信息(topic的创建和更新,topic路由信息的查询);管理队列信息,管理集群信息等。

2.内部数据结构

public class RouteInfoManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    // broker长连接过期时间 长连接的空闲时间是2分钟
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    //读写锁
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // 创建topic 以后 topic是逻辑上的概念 一个topic会有多个Queue Queue会分散到不同的broker上
    private final HashMap<String/* topic */, Map<String /* brokerName */ , QueueData>> topicQueueTable;
    //  代表的broker组的信息 BrokerData包含了一组Broker的信息
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    // 一个NameServer可以管理多个broker组 通常来说一个Cluster就可以了
    // 有可能会有很多复杂的业务场景 多个Cluster
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    //管理Broker的长连接心跳 是否还有心跳
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    // Filter Server 是rocketMQ的一个高级功能,用来过滤消息
    //一般情况下 我们是可以基于tag进行数据筛选的操作,比较简单,没有办法进行更加细化的过滤
    //这个Filter Server是在每台Broker机器上启动一个(或者多个)Filter Server
    //我们可以把一个自定义的消息筛选的class 上传到Filter server上,在进行数据消费的时候让Broker把数据先传输到Filter Server
    // Filter Server会根据你自定义的class来进行细粒度的数据筛选,把筛选后的数据回传给你的消费端
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}

3.核心方法

    3.1 getAllClusterInfo

  /**
     * 返回的是 broker的cluster信息
     * 里面包含的是HashMap<String //brokerName//  BrokerData> brokerAddrTable
     * HashMap<String  //clusterName// , Set<String //brokerName// >> clusterAddrTable
     * @return
     */
    public ClusterInfo getAllClusterInfo() {
        ClusterInfo clusterInfoSerializeWrapper = new ClusterInfo();
        clusterInfoSerializeWrapper.setBrokerAddrTable(this.brokerAddrTable);
        clusterInfoSerializeWrapper.setClusterAddrTable(this.clusterAddrTable);
        return clusterInfoSerializeWrapper;
    }

3.2 deleteTopic

  /**
     * 删除某个topic 直接操作topicQueueTable的hashMap
     * @param topic
     */
    public void deleteTopic(final String topic) {
        try {
            try {
                this.lock.writeLock().lockInterruptibly();
                this.topicQueueTable.remove(topic);
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("deleteTopic Exception", e);
        }
    }

    public void deleteTopic(final String topic, final String clusterName) {
        try {
            try {
                this.lock.writeLock().lockInterruptibly();
                Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
                if (brokerNames != null
                    && !brokerNames.isEmpty()) {
                    Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topic);
                    if (queueDataMap != null) {
                        for (String brokerName : brokerNames) {
                            final QueueData removedQD = queueDataMap.remove(brokerName);
                            if (removedQD != null) {
                                log.info("deleteTopic, remove one broker's topic {} {} {}", brokerName, topic,
                                    removedQD);
                            }
                        }
                        if (queueDataMap.isEmpty()) {
                            log.info("deleteTopic, remove the topic all queue {} {}", clusterName, topic);
                            this.topicQueueTable.remove(topic);
                        }
                    }

                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("deleteTopic Exception", e);
        }
    }

3.3 getAllTopicList

   /**
     * 查询所有的topic的列表信息
     * @return
     */
    public TopicList getAllTopicList() {
        TopicList topicList = new TopicList();
        try {
            try {
                this.lock.readLock().lockInterruptibly();
                topicList.getTopicList().addAll(this.topicQueueTable.keySet());
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("getAllTopicList Exception", e);
        }

        return topicList;
    }

3.4  registerBroker

   详细的注册流程 可以看我以前的博客:RocketMQ中的NameServer主要数据结构-CSDN博客

 /**
     * broker的注册方法
     * @param clusterName broker的集群名称
     * @param brokerAddr broker的地址
     * @param brokerName broker所属组的名称
     * @param brokerId   broker机器的id
     * @param haServerAddr broker的ha地址
     * @param topicConfigWrapper 当前broker机器上包含的topic队列上的数据
     * @param filterServerList broker上部署的filterServer的列表
     * @param channel netty的网络长连接
     * @return broker注册的结果
     */
    public RegisterBrokerResult registerBroker(
            final String clusterName,
            final String brokerAddr,
            final String brokerName,
            final long brokerId,
            final String haServerAddr,
            final TopicConfigSerializeWrapper topicConfigWrapper,
            final List<String> filterServerList,
            final Channel channel) {
        //省略大部分代码
    }

3.5 unregisterBroker

  /**
     * broker的下线逻辑处理
     * @param clusterName 集群名
     * @param brokerAddr 地址
     * @param brokerName broker组的名字
     * @param brokerId broker对应的id
     */
    public void unregisterBroker(
            final String clusterName,
            final String brokerAddr,
            final String brokerName,
            final long brokerId) {
        try {
            try {
                //加锁
                this.lock.writeLock().lockInterruptibly();
                //获取brokerLiveInfo对象 获取保活信息
                BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);
                log.info("unregisterBroker, remove from brokerLiveTable {}, {}",
                        brokerLiveInfo != null ? "OK" : "Failed",
                        brokerAddr
                );

                //filterServerTable中删除broker的信息
                this.filterServerTable.remove(brokerAddr);

                boolean removeBrokerName = false;
                //获取broker组中获取到brokerData信息
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (null != brokerData) {
                    //根据brokerId 从brokerData中移除掉BrokerId对应的地址
                    String addr = brokerData.getBrokerAddrs().remove(brokerId);
                    log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}",
                            addr != null ? "OK" : "Failed",
                            brokerAddr
                    );
                    //broker组中的机器数量如果为空的话 就移除掉这个broker组的信息
                    if (brokerData.getBrokerAddrs().isEmpty()) {
                        this.brokerAddrTable.remove(brokerName);
                        log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",
                                brokerName
                        );

                        removeBrokerName = true;
                    }
                }
                //如果已经移除掉Broker组的信息的话
                if (removeBrokerName) {
                    //从集群中移除掉这个broker组
                    Set<String> nameSet = this.clusterAddrTable.get(clusterName);
                    if (nameSet != null) {
                        boolean removed = nameSet.remove(brokerName);
                        log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",
                                removed ? "OK" : "Failed",
                                brokerName);
                        //集群中的broker组的数量如果也为空的话 就移除掉这个集群的信息
                        if (nameSet.isEmpty()) {
                            this.clusterAddrTable.remove(clusterName);
                            log.info("unregisterBroker, remove cluster from clusterAddrTable {}",
                                    clusterName
                            );
                        }
                    }
                    //根据broker的名字移除掉topic的信息
                    this.removeTopicByBrokerName(brokerName);
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("unregisterBroker Exception", e);
        }
    }

    /**
     *  根据broker的名字移除掉topic的信息
     * @param brokerName
     */
    private void removeTopicByBrokerName(final String brokerName) {
        Set<String> noBrokerRegisterTopic = new HashSet<>();

        this.topicQueueTable.forEach((topic, queueDataMap) -> {
            QueueData old = queueDataMap.remove(brokerName);
            if (old != null) {
                log.info("removeTopicByBrokerName, remove one broker's topic {} {}", topic, old);
            }

            if (queueDataMap.size() == 0) {
                noBrokerRegisterTopic.add(topic);
                log.info("removeTopicByBrokerName, remove the topic all queue {}", topic);
            }
        });

        noBrokerRegisterTopic.forEach(topicQueueTable::remove);
    }

    //获取topic的路由信息(broker的地址信息,以及在broker上的filterServer的列表)  针对一个topic里有多个queues来进行路由

    public TopicRouteData pickupTopicRouteData(final String topic) {
        TopicRouteData topicRouteData = new TopicRouteData();
        boolean foundQueueData = false;
        boolean foundBrokerData = false;
        Set<String> brokerNameSet = new HashSet<>();
        List<BrokerData> brokerDataList = new LinkedList<>();
        topicRouteData.setBrokerDatas(brokerDataList);

        HashMap<String, List<String>> filterServerMap = new HashMap<>();
        topicRouteData.setFilterServerTable(filterServerMap);

        try {
            try {
                //加一把读锁
                this.lock.readLock().lockInterruptibly();
                //从topicQueueTable中获取到topic对应的 QueueData
                Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topic);
                if (queueDataMap != null) {
                    topicRouteData.setQueueDatas(new ArrayList<>(queueDataMap.values()));
                    foundQueueData = true;
                    //从queueData中获取到broker名字的set集合
                    brokerNameSet.addAll(queueDataMap.keySet());

                    for (String brokerName : brokerNameSet) {
                        BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                        if (null != brokerData) {
                            BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData
                                    .getBrokerAddrs().clone());
                            brokerDataList.add(brokerDataClone);
                            foundBrokerData = true;

                            // skip if filter server table is empty
                            if (!filterServerTable.isEmpty()) {
                                for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                                    List<String> filterServerList = this.filterServerTable.get(brokerAddr);

                                    // only add filter server list when not null
                                    if (filterServerList != null) {
                                        filterServerMap.put(brokerAddr, filterServerList);
                                    }
                                }
                            }
                        }
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("pickupTopicRouteData Exception", e);
        }

        log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);

        if (foundBrokerData && foundQueueData) {
            return topicRouteData;
        }

        return null;
    }

3.6 scanNotActiveBroker

  扫描出心跳超时的broker,并针对超时的broker进行下线的操作

  public int scanNotActiveBroker() {
        // 这块的方法主要是brokerLiveTable的集合中的所有元素
        //拿到broker最新一次的心跳时间
        //broker的最新一次心跳时间+120s 小于 当前时间戳
        //就把这个broker进行移除掉
        int removeCount = 0;
        Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, BrokerLiveInfo> next = it.next();
            long last = next.getValue().getLastUpdateTimestamp();
            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                //关闭连接的channel通道信息
                RemotingUtil.closeChannel(next.getValue().getChannel());
                it.remove();
                log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
                //从内存中进行删除缓存的channel连接信息
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());

                removeCount++;
            }
        }

        return removeCount;
    }

    //从brokerLiveTable中删除掉broker的保活信息并进行清理掉内存中的保活信息
    public void onChannelDestroy(String remoteAddr, Channel channel) {
        String brokerAddrFound = null;
        //找到要进行删除的broker信息
        if (channel != null) {
            try {
                try {
                    this.lock.readLock().lockInterruptibly();
                    Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
                            this.brokerLiveTable.entrySet().iterator();
                    while (itBrokerLiveTable.hasNext()) {
                        Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
                        if (entry.getValue().getChannel() == channel) {
                            brokerAddrFound = entry.getKey();
                            break;
                        }
                    }
                } finally {
                    this.lock.readLock().unlock();
                }
            } catch (Exception e) {
                log.error("onChannelDestroy Exception", e);
            }
        }

        if (null == brokerAddrFound) {
            brokerAddrFound = remoteAddr;
        } else {
            log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
        }

        //下面的代码开始进行删除broker的信息
        if (brokerAddrFound != null && brokerAddrFound.length() > 0) {

            try {
                try {
                    this.lock.writeLock().lockInterruptibly();
                    //删除 brokerLiveTable中的broker信息
                    this.brokerLiveTable.remove(brokerAddrFound);
                    //删除 filterServerTable中的broker信息
                    this.filterServerTable.remove(brokerAddrFound);
                    String brokerNameFound = null;
                    boolean removeBrokerName = false;
                    //删除broker组中的broker信息
                    Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                            this.brokerAddrTable.entrySet().iterator();
                    while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
                        BrokerData brokerData = itBrokerAddrTable.next().getValue();

                        Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
                        while (it.hasNext()) {
                            Entry<Long, String> entry = it.next();
                            Long brokerId = entry.getKey();
                            String brokerAddr = entry.getValue();
                            if (brokerAddr.equals(brokerAddrFound)) {
                                brokerNameFound = brokerData.getBrokerName();
                                it.remove();
                                log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                                        brokerId, brokerAddr);
                                break;
                            }
                        }
                        //如果删除broker完成之后 发现broker组的信息也为空 那就把broker组进行删除操作
                        if (brokerData.getBrokerAddrs().isEmpty()) {
                            removeBrokerName = true;
                            itBrokerAddrTable.remove();
                            log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                                    brokerData.getBrokerName());
                        }
                    }

                    //删除cluster集群的中的broker组信息
                    if (brokerNameFound != null && removeBrokerName) {
                        Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
                        while (it.hasNext()) {
                            Entry<String, Set<String>> entry = it.next();
                            String clusterName = entry.getKey();
                            Set<String> brokerNames = entry.getValue();
                            boolean removed = brokerNames.remove(brokerNameFound);
                            if (removed) {
                                log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                                        brokerNameFound, clusterName);

                                if (brokerNames.isEmpty()) {
                                    log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                                            clusterName);
                                    it.remove();
                                }

                                break;
                            }
                        }
                    }
                    //删除topic组在这个删除broker组中对应的信息也进行删除的操作
                    if (removeBrokerName) {
                        String finalBrokerNameFound = brokerNameFound;
                        Set<String> needRemoveTopic = new HashSet<>();

                        topicQueueTable.forEach((topic, queueDataMap) -> {
                            QueueData old = queueDataMap.remove(finalBrokerNameFound);
                            log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                                    topic, old);

                            if (queueDataMap.size() == 0) {
                                log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                                        topic);
                                needRemoveTopic.add(topic);
                            }
                        });

                        needRemoveTopic.forEach(topicQueueTable::remove);
                    }
                } finally {
                    this.lock.writeLock().unlock();
                }
            } catch (Exception e) {
                log.error("onChannelDestroy Exception", e);
            }
        }
    }

相关文章:

  • 【java】作业1
  • Ai知识点总结
  • 每日一题——不同路径的数目与矩阵最小路径和
  • 性格测评小程序07用户登录
  • 【第14章:神经符号集成与可解释AI—14.2 可解释AI技术:LIME、SHAP等的实现与应用案例】
  • 2025年2月16日笔记
  • NSSCTF Pwn [HUBUCTF 2022 新生赛]singout WP
  • 二〇二四年终总结
  • 搭建Deepseek推理服务
  • dify新版,chatflow对deepseek的适配情况
  • bps是什么意思
  • 网络安全:从攻击到防御的全景解析
  • AI视频创作教程:如何用AI让古画动起来。
  • 动量突破均值回归策略
  • 【PYTORCH】官方的turoria实现中英文翻译
  • 水务+AI应用探索(一)| FastGPT+DeepSeek 本地部署
  • 团体程序设计天梯赛-练习集——L1-041 寻找250
  • nlf 3d pose 部署学习笔记
  • (尚硅谷 Java 学习 B 站大学版)Day17 多态练习
  • 目标检测IoU阈值全解析:YOLO/DETR模型中的精度-召回率博弈与工程实践指南
  • 韩德洙成为韩国执政党总统大选候选人
  • 习近平会见古巴国家主席迪亚斯-卡内尔
  • 本周看啥|喜欢二次元的观众,去电影院吧
  • 专访|“甲亢哥”的操盘手,带NBA球星们玩转中国流量
  • 央视315晚会曝光“保水虾仁”后,湛江4家涉事企业被罚超800万元
  • 降准又降息!央行发布3类10项措施