RocketMQ 中 RouteInfoManager 组件的源码分析
1.前言
RouteInfoManager 是 RocketMQ 中 NameServer 的核心组件之一,主要负责管理和维护整个 RocketMQ 集群的路由元数据信息。它包含一些非常核心的功能:存储和管理 Broker 信息(Broker 的注册、Broker 心跳的维护);维护 Topic 的路由信息(Topic 的创建和更新、Topic 路由信息的查询);管理队列信息、管理集群信息等。
2.内部数据结构
/**
 * Excerpt of the class showing only its fields: a logger, the broker-channel expiry
 * constant, a read-write lock, and five tables keyed by topic, broker name, cluster name
 * and broker address.
 *
 * NOTE(review): the final table fields have no initializer in this excerpt; presumably a
 * constructor outside this view assigns them - confirm against the full class.
 */
public class RouteInfoManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
// Expiry time of a broker's long-lived connection: the connection may stay idle for
// 2 minutes (the value is 1000 * 60 * 2).
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
// Read-write lock.
private final ReadWriteLock lock = new ReentrantReadWriteLock();
// A topic is a logical concept: once created, one topic has several queues, and those
// queues are spread across different brokers.
private final HashMap<String/* topic */, Map<String /* brokerName */ , QueueData>> topicQueueTable;
// Broker-group information; one BrokerData holds the information of one group of brokers.
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// One NameServer can manage several broker groups; usually a single cluster is enough,
// but more complex business scenarios may use several clusters.
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// Tracks the heartbeat of each broker's long-lived connection, i.e. whether the broker
// is still sending heartbeats.
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// Filter Server is an advanced RocketMQ feature used to filter messages (description
// below is the original author's explanation).
// Normally messages are filtered by tag, which is simple but does not allow any
// finer-grained filtering.
// One (or more) Filter Server is started on each broker machine.
// A custom message-filtering class can be uploaded to the Filter Server; during
// consumption the broker first passes the data to the Filter Server,
// which applies the custom class for fine-grained filtering and sends the filtered
// data back to the consumer.
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
3.核心方法
3.1 getAllClusterInfo
/**
 * Builds a {@link ClusterInfo} describing the brokers and clusters known to this manager.
 *
 * <p>The manager's own {@code brokerAddrTable} (brokerName -> BrokerData) and
 * {@code clusterAddrTable} (clusterName -> set of brokerNames) references are passed to
 * the setters as they are; this method makes no copy and takes no lock.
 *
 * @return a new {@code ClusterInfo} populated with both tables
 */
public ClusterInfo getAllClusterInfo() {
    final ClusterInfo clusterInfo = new ClusterInfo();
    clusterInfo.setBrokerAddrTable(this.brokerAddrTable);
    clusterInfo.setClusterAddrTable(this.clusterAddrTable);
    return clusterInfo;
}
3.2 deleteTopic
/**
 * Deletes a topic by removing its entry from {@code topicQueueTable} under the write lock.
 *
 * <p>Failures are logged and never propagated to the caller. If the thread is interrupted
 * while waiting for the lock, nothing is removed and the interrupt status is restored.
 *
 * @param topic name of the topic to delete
 */
public void deleteTopic(final String topic) {
    try {
        // Acquire the lock BEFORE entering the try/finally: if acquisition is interrupted
        // the lock is not held, and unlocking it in finally would throw
        // IllegalMonitorStateException and hide the InterruptedException.
        this.lock.writeLock().lockInterruptibly();
        try {
            this.topicQueueTable.remove(topic);
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        // Preserve the interrupt for whoever owns this thread.
        Thread.currentThread().interrupt();
        log.error("deleteTopic Exception", e);
    } catch (Exception e) {
        log.error("deleteTopic Exception", e);
    }
}
/**
 * Deletes a topic from every broker group that belongs to the given cluster.
 *
 * <p>Under the write lock, the queue data of each broker name registered for
 * {@code clusterName} is removed from the topic's entry in {@code topicQueueTable}; if the
 * topic then has no queue data left, the topic entry itself is removed. Nothing happens
 * when the cluster is unknown or empty, or when the topic is unknown.
 *
 * <p>Failures are logged and never propagated to the caller. If the thread is interrupted
 * while waiting for the lock, nothing is removed and the interrupt status is restored.
 *
 * @param topic       name of the topic to delete
 * @param clusterName cluster whose broker groups should drop the topic
 */
public void deleteTopic(final String topic, final String clusterName) {
    try {
        // Acquire the lock BEFORE entering the try/finally: if acquisition is interrupted
        // the lock is not held, and unlocking it in finally would throw
        // IllegalMonitorStateException and hide the InterruptedException.
        this.lock.writeLock().lockInterruptibly();
        try {
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (brokerNames == null || brokerNames.isEmpty()) {
                return;
            }
            Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topic);
            if (queueDataMap == null) {
                return;
            }
            for (String brokerName : brokerNames) {
                final QueueData removedQD = queueDataMap.remove(brokerName);
                if (removedQD != null) {
                    log.info("deleteTopic, remove one broker's topic {} {} {}", brokerName, topic,
                        removedQD);
                }
            }
            // No broker group serves the topic any more: drop the topic itself.
            if (queueDataMap.isEmpty()) {
                log.info("deleteTopic, remove the topic all queue {} {}", clusterName, topic);
                this.topicQueueTable.remove(topic);
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        // Preserve the interrupt for whoever owns this thread.
        Thread.currentThread().interrupt();
        log.error("deleteTopic Exception", e);
    } catch (Exception e) {
        log.error("deleteTopic Exception", e);
    }
}
3.3 getAllTopicList
/**
 * Returns the names of all topics currently present in {@code topicQueueTable}.
 *
 * <p>The key set is copied into the result while holding the read lock. Failures are
 * logged and never propagated; in that case the returned list contains whatever was added
 * before the failure (nothing, if the lock could not be acquired). If the thread is
 * interrupted while waiting for the lock, its interrupt status is restored.
 *
 * @return a new {@code TopicList} holding the topic names
 */
public TopicList getAllTopicList() {
    TopicList topicList = new TopicList();
    try {
        // Acquire the lock BEFORE entering the try/finally: if acquisition is interrupted
        // the lock is not held, and unlocking it in finally would throw
        // IllegalMonitorStateException and hide the InterruptedException.
        this.lock.readLock().lockInterruptibly();
        try {
            topicList.getTopicList().addAll(this.topicQueueTable.keySet());
        } finally {
            this.lock.readLock().unlock();
        }
    } catch (InterruptedException e) {
        // Preserve the interrupt for whoever owns this thread.
        Thread.currentThread().interrupt();
        log.error("getAllTopicList Exception", e);
    } catch (Exception e) {
        log.error("getAllTopicList Exception", e);
    }
    return topicList;
}
3.4 registerBroker
详细的注册流程可以参考我之前的博客:《RocketMQ中的NameServer主要数据结构》(CSDN博客)。
/**
 * Registers a broker.
 *
 * @param clusterName name of the cluster the broker belongs to
 * @param brokerAddr address of the broker
 * @param brokerName name of the broker group the broker belongs to
 * @param brokerId id of the broker machine
 * @param haServerAddr HA address of the broker
 * @param topicConfigWrapper data about the topic queues hosted on this broker machine
 * @param filterServerList list of the filter servers deployed on the broker
 * @param channel the Netty long-lived network connection
 * @return the result of the broker registration
 */
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
// Most of the method body is omitted in this excerpt.
}
3.5 unregisterBroker
/**
 * Handles a broker going offline by removing it from the routing tables.
 *
 * <p>Under the write lock this method:
 * <ol>
 *   <li>removes {@code brokerAddr} from {@code brokerLiveTable} and {@code filterServerTable};</li>
 *   <li>removes the {@code brokerId} entry from the address map of the broker group
 *       {@code brokerName};</li>
 *   <li>if the group has no address left, removes the group from {@code brokerAddrTable}
 *       and from the name set of {@code clusterName} (removing the cluster entry as well
 *       when that set becomes empty), and finally removes the group from every topic via
 *       {@code removeTopicByBrokerName}.</li>
 * </ol>
 *
 * <p>Failures are logged and never propagated to the caller. If the thread is interrupted
 * while waiting for the lock, nothing is changed and the interrupt status is restored.
 *
 * @param clusterName name of the cluster
 * @param brokerAddr  address of the broker
 * @param brokerName  name of the broker group
 * @param brokerId    id of the broker inside its group
 */
public void unregisterBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId) {
    try {
        // Acquire the lock BEFORE entering the try/finally: if acquisition is interrupted
        // the lock is not held, and unlocking it in finally would throw
        // IllegalMonitorStateException and hide the InterruptedException.
        this.lock.writeLock().lockInterruptibly();
        try {
            // Drop the keep-alive record of this broker address.
            BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);
            log.info("unregisterBroker, remove from brokerLiveTable {}, {}",
                brokerLiveInfo != null ? "OK" : "Failed",
                brokerAddr
            );

            // Drop the filter servers recorded for this broker address.
            this.filterServerTable.remove(brokerAddr);

            boolean removeBrokerName = false;
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null != brokerData) {
                // Remove the address registered under this brokerId from the group.
                String addr = brokerData.getBrokerAddrs().remove(brokerId);
                log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}",
                    addr != null ? "OK" : "Failed",
                    brokerAddr
                );

                // The group has no machine left: remove the whole group.
                if (brokerData.getBrokerAddrs().isEmpty()) {
                    this.brokerAddrTable.remove(brokerName);
                    log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",
                        brokerName
                    );
                    removeBrokerName = true;
                }
            }

            if (removeBrokerName) {
                // The group is gone, so detach it from its cluster as well.
                Set<String> nameSet = this.clusterAddrTable.get(clusterName);
                if (nameSet != null) {
                    boolean removed = nameSet.remove(brokerName);
                    log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",
                        removed ? "OK" : "Failed",
                        brokerName);

                    // The cluster has no group left: remove the cluster entry.
                    if (nameSet.isEmpty()) {
                        this.clusterAddrTable.remove(clusterName);
                        log.info("unregisterBroker, remove cluster from clusterAddrTable {}",
                            clusterName
                        );
                    }
                }
                // Remove the group's queue data from every topic.
                this.removeTopicByBrokerName(brokerName);
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        // Preserve the interrupt for whoever owns this thread.
        Thread.currentThread().interrupt();
        log.error("unregisterBroker Exception", e);
    } catch (Exception e) {
        log.error("unregisterBroker Exception", e);
    }
}
/**
 * Removes the queue data of the given broker group from every topic and then drops the
 * topics that are left without any queue data.
 *
 * <p>This method takes no lock itself.
 *
 * @param brokerName name of the broker group to remove from all topics
 */
private void removeTopicByBrokerName(final String brokerName) {
    final Set<String> orphanedTopics = new HashSet<>();
    for (Map.Entry<String, Map<String, QueueData>> topicEntry : this.topicQueueTable.entrySet()) {
        final String topic = topicEntry.getKey();
        final Map<String, QueueData> queueDataMap = topicEntry.getValue();

        final QueueData old = queueDataMap.remove(brokerName);
        if (old != null) {
            log.info("removeTopicByBrokerName, remove one broker's topic {} {}", topic, old);
        }

        if (queueDataMap.size() == 0) {
            orphanedTopics.add(topic);
            log.info("removeTopicByBrokerName, remove the topic all queue {}", topic);
        }
    }

    // Topic entries are removed only after the scan, so the outer map is never
    // structurally modified while it is being iterated.
    for (String topic : orphanedTopics) {
        topicQueueTable.remove(topic);
    }
}
/**
 * Collects the routing data of a topic: its queue data, the broker groups that serve it,
 * and the filter servers recorded for those brokers' addresses.
 *
 * <p>The tables are read under the read lock. Each {@code BrokerData} placed in the result
 * is a new object with its own copy of the address map, whereas the filter-server lists
 * are the same list objects stored in {@code filterServerTable}.
 *
 * <p>Failures are logged and never propagated. If the thread is interrupted while waiting
 * for the lock, its interrupt status is restored.
 *
 * @param topic name of the topic to look up
 * @return the routing data, or {@code null} unless both queue data and at least one
 *         broker group were found for the topic
 */
public TopicRouteData pickupTopicRouteData(final String topic) {
    TopicRouteData topicRouteData = new TopicRouteData();
    boolean foundQueueData = false;
    boolean foundBrokerData = false;
    Set<String> brokerNameSet = new HashSet<>();
    List<BrokerData> brokerDataList = new LinkedList<>();
    topicRouteData.setBrokerDatas(brokerDataList);

    HashMap<String, List<String>> filterServerMap = new HashMap<>();
    topicRouteData.setFilterServerTable(filterServerMap);

    try {
        // Acquire the lock BEFORE entering the try/finally: if acquisition is interrupted
        // the lock is not held, and unlocking it in finally would throw
        // IllegalMonitorStateException and hide the InterruptedException.
        this.lock.readLock().lockInterruptibly();
        try {
            Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topic);
            if (queueDataMap != null) {
                topicRouteData.setQueueDatas(new ArrayList<>(queueDataMap.values()));
                foundQueueData = true;

                // The keys of the queue data map are the names of the broker groups
                // serving this topic.
                brokerNameSet.addAll(queueDataMap.keySet());

                for (String brokerName : brokerNameSet) {
                    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                    if (null == brokerData) {
                        continue;
                    }
                    // Copy the address map so the result does not share it with the table;
                    // the copy constructor avoids clone() and its unchecked cast.
                    BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(),
                        brokerData.getBrokerName(),
                        new HashMap<Long, String>(brokerData.getBrokerAddrs()));
                    brokerDataList.add(brokerDataClone);
                    foundBrokerData = true;

                    // skip if filter server table is empty
                    if (!filterServerTable.isEmpty()) {
                        for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                            List<String> filterServerList = this.filterServerTable.get(brokerAddr);
                            // only add filter server list when not null
                            if (filterServerList != null) {
                                filterServerMap.put(brokerAddr, filterServerList);
                            }
                        }
                    }
                }
            }
        } finally {
            this.lock.readLock().unlock();
        }
    } catch (InterruptedException e) {
        // Preserve the interrupt for whoever owns this thread.
        Thread.currentThread().interrupt();
        log.error("pickupTopicRouteData Exception", e);
    } catch (Exception e) {
        log.error("pickupTopicRouteData Exception", e);
    }

    log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);

    if (foundBrokerData && foundQueueData) {
        return topicRouteData;
    }

    return null;
}
3.6 scanNotActiveBroker
扫描出心跳超时的broker,并针对超时的broker进行下线的操作
/**
 * Finds the brokers whose last heartbeat is older than {@code BROKER_CHANNEL_EXPIRED_TIME}
 * and takes them offline.
 *
 * <p>A broker is expired when {@code lastUpdateTimestamp + BROKER_CHANNEL_EXPIRED_TIME} is
 * smaller than the current time. Expired entries are removed from {@code brokerLiveTable}
 * while holding the write lock, because that table is a plain {@code HashMap} that other
 * methods only touch under {@code lock}. The channels are closed and
 * {@code onChannelDestroy} is invoked after the lock has been released, so no external
 * call runs while the lock is held.
 *
 * <p>If the thread is interrupted while waiting for the lock, nothing is scanned, the
 * interrupt status is restored and {@code 0} is returned.
 *
 * @return the number of expired brokers that were removed
 */
public int scanNotActiveBroker() {
    try {
        this.lock.writeLock().lockInterruptibly();
    } catch (InterruptedException e) {
        // Preserve the interrupt for whoever owns this thread.
        Thread.currentThread().interrupt();
        log.error("scanNotActiveBroker Exception", e);
        return 0;
    }

    // Phase 1 (locked): detach every expired entry from brokerLiveTable.
    final Map<String, BrokerLiveInfo> expiredBrokers = new HashMap<>();
    try {
        final long now = System.currentTimeMillis();
        Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, BrokerLiveInfo> next = it.next();
            long last = next.getValue().getLastUpdateTimestamp();
            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < now) {
                expiredBrokers.put(next.getKey(), next.getValue());
                it.remove();
            }
        }
    } finally {
        this.lock.writeLock().unlock();
    }

    // Phase 2 (unlocked): close the channels and clean the remaining tables.
    int removeCount = 0;
    for (Entry<String, BrokerLiveInfo> expired : expiredBrokers.entrySet()) {
        final String brokerAddr = expired.getKey();
        final Channel channel = expired.getValue().getChannel();
        RemotingUtil.closeChannel(channel);
        log.warn("The broker channel expired, {} {}ms", brokerAddr, BROKER_CHANNEL_EXPIRED_TIME);
        // Removes the broker from the other routing tables.
        this.onChannelDestroy(brokerAddr, channel);
        removeCount++;
    }
    return removeCount;
}
/**
 * Cleans up every routing table after a broker's channel has been destroyed.
 *
 * <p>Phase 1 (read lock): when {@code channel} is not null, {@code brokerLiveTable} is
 * searched for the broker address registered with that same channel object. If none is
 * found, {@code remoteAddr} is used as the broker address instead.
 *
 * <p>Phase 2 (write lock): the address is removed from {@code brokerLiveTable} and
 * {@code filterServerTable} and from its broker group in {@code brokerAddrTable}. A group
 * left without addresses is removed, then detached from its cluster in
 * {@code clusterAddrTable} (an empty cluster is removed too) and from every topic in
 * {@code topicQueueTable} (a topic left without queue data is removed too).
 *
 * <p>Failures are logged and never propagated. An interrupt received while waiting for
 * either lock is restored only at the end of the method, so an interrupt during phase 1
 * does not prevent phase 2 from running.
 *
 * @param remoteAddr fallback broker address used when the channel is not found
 * @param channel    the destroyed channel; may be {@code null}
 */
public void onChannelDestroy(String remoteAddr, Channel channel) {
    String brokerAddrFound = null;
    boolean interrupted = false;

    // Phase 1: find the broker address registered with this channel.
    if (channel != null) {
        try {
            // Acquire the lock BEFORE entering the try/finally: if acquisition is
            // interrupted the lock is not held, and unlocking it in finally would throw
            // IllegalMonitorStateException and hide the InterruptedException.
            this.lock.readLock().lockInterruptibly();
            try {
                for (Entry<String, BrokerLiveInfo> entry : this.brokerLiveTable.entrySet()) {
                    if (entry.getValue().getChannel() == channel) {
                        brokerAddrFound = entry.getKey();
                        break;
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (InterruptedException e) {
            interrupted = true;
            log.error("onChannelDestroy Exception", e);
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }

    if (null == brokerAddrFound) {
        brokerAddrFound = remoteAddr;
    } else {
        log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
    }

    // Phase 2: remove the broker from every table.
    if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
        try {
            this.lock.writeLock().lockInterruptibly();
            try {
                this.brokerLiveTable.remove(brokerAddrFound);
                this.filterServerTable.remove(brokerAddrFound);

                String brokerNameFound = null;
                boolean removeBrokerName = false;

                // Remove the address from the broker group that contains it.
                Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                    this.brokerAddrTable.entrySet().iterator();
                while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
                    BrokerData brokerData = itBrokerAddrTable.next().getValue();

                    Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<Long, String> entry = it.next();
                        Long brokerId = entry.getKey();
                        String brokerAddr = entry.getValue();
                        if (brokerAddr.equals(brokerAddrFound)) {
                            brokerNameFound = brokerData.getBrokerName();
                            it.remove();
                            log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                                brokerId, brokerAddr);
                            break;
                        }
                    }

                    // A group without any address left is removed as a whole.
                    if (brokerData.getBrokerAddrs().isEmpty()) {
                        removeBrokerName = true;
                        itBrokerAddrTable.remove();
                        log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                            brokerData.getBrokerName());
                    }
                }

                // Detach the removed group from the cluster that lists it.
                if (brokerNameFound != null && removeBrokerName) {
                    Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<String, Set<String>> entry = it.next();
                        String clusterName = entry.getKey();
                        Set<String> brokerNames = entry.getValue();
                        boolean removed = brokerNames.remove(brokerNameFound);
                        if (removed) {
                            log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                                brokerNameFound, clusterName);

                            if (brokerNames.isEmpty()) {
                                log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                                    clusterName);
                                it.remove();
                            }

                            break;
                        }
                    }
                }

                // Remove the group's queue data from every topic, then drop the topics
                // that have no queue data left.
                if (removeBrokerName) {
                    String finalBrokerNameFound = brokerNameFound;
                    Set<String> needRemoveTopic = new HashSet<>();

                    topicQueueTable.forEach((topic, queueDataMap) -> {
                        QueueData old = queueDataMap.remove(finalBrokerNameFound);
                        log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                            topic, old);

                        if (queueDataMap.size() == 0) {
                            log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                                topic);
                            needRemoveTopic.add(topic);
                        }
                    });

                    needRemoveTopic.forEach(topicQueueTable::remove);
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            interrupted = true;
            log.error("onChannelDestroy Exception", e);
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }

    // Restore the interrupt only now, so that an interrupt seen in phase 1 does not make
    // the phase 2 lock acquisition fail immediately.
    if (interrupted) {
        Thread.currentThread().interrupt();
    }
}