当前位置: 首页 > news >正文

【RocketMQ 生产者和消费者】- 生产者启动源码-上报生产者和消费者心跳信息到 broker(3)

文章目录

  • 1. 前言
  • 2. sendHeartbeatToAllBrokerWithLock 上报心跳信息
  • 3. prepareHeartbeatData 准备心跳数据
  • 4. sendHearbeat 发送心跳上报请求
  • 5. broker 处理心跳请求
    • 5.1 heartBeat 处理心跳包
    • 5.2 createTopicInSendMessageBackMethod 创建重传 topic
    • 5.3 registerConsumer 注册消费者
      • 5.3.1 updateChannel 更新消费者连接通道
      • 5.3.2 updateSubscription 更新订阅消息
      • 5.3.3 通知消费者重平衡
      • 5.3.4 处理消费者注册事件
        • 5.3.4.1 register 注册消费者过滤信息
        • 5.3.4.2 register 创建 FilterDataMapByTopic 对象
        • 5.3.4.3 register 注册过滤信息
  • 6. 小结


本文章基于 RocketMQ 4.9.3

1. 前言

  • 【RocketMQ】- 源码系列目录
  • 【RocketMQ 生产者消费者】- 同步、异步、单向发送消费消息
  • 【RocketMQ 生产者和消费者】- 生产者启动源码-启动流程(1)
  • 【RocketMQ 生产者和消费者】- 生产者启动源码-创建 MQClientInstance(2)

上两篇文章中我们探讨了生产者的启动流程,这篇文章就来看下生产者启动之后如何发送心跳信息到 broker。


2. sendHeartbeatToAllBrokerWithLock 上报心跳信息

这个方法就是用于上报生产者的心跳信息到所有的 broker,不过可以注意到这个方法是在 MQClientInstance 里面的,上一篇文章我们就说了 MQClientInstance 是生产者和消费者的公共类,所以这个方法上报心跳的时候会把消费者和生产者的信息都一起上报到 broker 中,当然这里的生产者和消费者是指同一个进程的。

/*** 发送心跳信息到所有 broker*/
private void sendHeartbeatToAllBroker() {// 心跳数据,这里是先准备一个心跳包final HeartbeatData heartbeatData = this.prepareHeartbeatData();final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();// 如果没有任何生产者和消费者的数据,就不需要发送心跳包if (producerEmpty && consumerEmpty) {log.warn("sending heartbeat, but no consumer and no producer. [{}]", this.clientId);return;}// brokerAddrTable 不为空if (!this.brokerAddrTable.isEmpty()) {// 发送心跳的次数 + 1long times = this.sendHeartbeatTimesTotal.getAndIncrement();// 遍历所有 brokerIterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();while (it.hasNext()) {Entry<String, HashMap<Long, String>> entry = it.next();// 获取 broker 名字String brokerName = entry.getKey();// 这里的 oneTable 是指 id -> address, 每一个 broker 都需要记录下来这个 broker 所在的集群的其他 broker 地址// oneTable 的 key 就是 id,0 表示主节点,其他数字表示从节点,value 就是节点的地址HashMap<Long, String> oneTable = entry.getValue();if (oneTable != null) {for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {Long id = entry1.getKey();String addr = entry1.getValue();if (addr != null) {// 这里就是如果消费者配置为空,并且当前的 broker 节点不是 Master 节点,就没必要注册到从节点上面了if (consumerEmpty) {if (id != MixAll.MASTER_ID)// producer 生产者只需要和从节点发送心跳即可, 但是如果也有消费者,那么也可以往从节点continue;}try {// 这里就是把心跳包发送给 brokerint version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());if (!this.brokerVersionTable.containsKey(brokerName)) {this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));}// 将版本设置到 brokerVersionTable 中// brokerName -> (address, version) 的映射关系this.brokerVersionTable.get(brokerName).put(addr, version);// 每发送 20 次心跳就打印一次日志if (times % 20 == 0) {log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);log.info(heartbeatData.toString());}} catch (Exception e) {if (this.isBrokerInNameServer(addr)) {log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);} else {log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,id, addr, e);}}}}}}}
}

上面就是这个方法的源码,首先就通过 prepareHeartbeatData 准备一个心跳包,如果说心跳包里面的生产者和消费者都为空,就说明不需要发送心跳信息,直接返回。

如果不为空,需要发送心跳信息,需要遍历所有 broker,由于 broker 集群是以 brokerName 为标记,所以会遍历 brokerAddrTable 集合中的所有 key(brokerName),然后处理 value,value 是 HashMap<Long, String> 类型,Long 是 brokerId,String 是这个 broker 的地址,意思就是 brokerName 集群下面的主从节点。

遍历 broker 时会判断如果心跳包里面的消费者配置为空,并且当前的 broker 节点不是 Master 节点,就没必要注册到从节点上面了,生产者只需要负责和主节点建立心跳,因为生产者生产消息都是直接存储到主节点的,从节点负责同步。

接下来调用 sendHearbeat 把心跳包发送给 broker,返回 broker 记录的心跳信息的版本,然后存储到本地缓存 brokerVersionTable 中。

/*** broker 信息版本, brokerName 集群 -> (address -> 版本)*/
private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =new ConcurrentHashMap<String, HashMap<String, Integer>>();

3. prepareHeartbeatData 准备心跳数据

/*** 准备心跳数据包* @return*/
private HeartbeatData prepareHeartbeatData() {// 心跳包HeartbeatData heartbeatData = new HeartbeatData();// 客户端 IDheartbeatData.setClientID(this.clientId);// 遍历所有消费者for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {MQConsumerInner impl = entry.getValue();if (impl != null) {// 构建 ConsumerData 数据ConsumerData consumerData = new ConsumerData();// 消费者组consumerData.setGroupName(impl.groupName());// 消费类型: PULL 和 PUSHconsumerData.setConsumeType(impl.consumeType());// 消费模式: CLUSTERING 和 BROADCASTING, 也就是集群和广播consumerData.setMessageModel(impl.messageModel());// 消费者启动的时候从哪里开始消费, 在 ConsumeFromWhere 这个类中consumerData.setConsumeFromWhere(impl.consumeFromWhere());// 订阅信息,包括过滤消息相关标签、SQL规则consumerData.getSubscriptionDataSet().addAll(impl.subscriptions());consumerData.setUnitMode(impl.isUnitMode());// 加入消费者的心跳数据集合中heartbeatData.getConsumerDataSet().add(consumerData);}}// 生产者的心跳for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {MQProducerInner impl = entry.getValue();if (impl != null) {// 生产者组名ProducerData producerData = new ProducerData();producerData.setGroupName(entry.getKey());// 添加到生产者集合里面heartbeatData.getProducerDataSet().add(producerData);}}return heartbeatData;
}

对于消费者要设置的心跳信息为:

  • groupName:消费者组
  • consumeType:消费类型,PULL 和 PUSH
  • messageModel:消费模式,CLUSTERING 和 BROADCASTING, 也就是集群和广播
  • consumeFromWhere:消费者启动的时候从哪里开始消费, 在 ConsumeFromWhere 这个类中
  • subscriptionDataSet:消费者组订阅信息,包括过滤消息相关标签、SQL规则
  • unitMode

对于生产者要设置的心跳信息为:

  • groupName:生产者组

4. sendHearbeat 发送心跳上报请求

/*** 发送心跳包给 broker 节点* @param addr* @param heartbeatData* @param timeoutMillis* @return* @throws RemotingException* @throws MQBrokerException* @throws InterruptedException*/
public int sendHearbeat(final String addr,final HeartbeatData heartbeatData,final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {// 构建心跳请求,请求编码是 HEART_BEATRemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);// 请求语言,默认是 JAVArequest.setLanguage(clientConfig.getLanguage());// 请求体,就是心跳包request.setBody(heartbeatData.encode());// 这里就是通过 Netty 发送心跳包了RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);assert response != null;switch (response.getCode()) {case ResponseCode.SUCCESS: {// 返回结果,返回版本号return response.getVersion();}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}

这个方法就是向 broker 发送心跳信息,可以看到发送之前在请求里面设置了一些属性:

  • 请求语言,默认是 JAVA
  • 心跳包,也就是上面的生产者和消费者
  • 请求 Code 是 HEART_BEAT

最后请求是使用同步发送的,如果发送成功就返回 broker 返回的版本。


5. broker 处理心跳请求

broker 通过 ClientManageProcessor 处理器来处理心跳请求,处理的方法就是 heartBeat
在这里插入图片描述


5.1 heartBeat 处理心跳包

/*** 处理客户端心跳请求* @param ctx* @param request* @return*/
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {// 构建响应命令对象RemotingCommand response = RemotingCommand.createResponseCommand(null);// 解码,获取心跳包数据HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);// 构建客户端连接信息对象ClientChannelInfo clientChannelInfo = new ClientChannelInfo(ctx.channel(),heartbeatData.getClientID(),request.getLanguage(),request.getVersion());// 处理心跳包中的消费者信息for (ConsumerData data : heartbeatData.getConsumerDataSet()) {// 从 broker 的缓存中找出当前消费者组的订阅组配置SubscriptionGroupConfig subscriptionGroupConfig =this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(data.getGroupName());// 当 Consumer 发生变化的时候是否需要通知组内其他的 Consumerboolean isNotifyConsumerIdsChangedEnable = true;if (null != subscriptionGroupConfig) {// 从配置中获取 isNotifyConsumerIdsChangedEnable,默认就是 trueisNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();int topicSysFlag = 0;if (data.isUnitMode()) {topicSysFlag = TopicSysFlag.buildSysFlag(false, true);}// 消息消费的重试队列,%RETRY%groupNameString newTopic = MixAll.getRetryTopic(data.getGroupName());// 创建消息消费重试队列this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,subscriptionGroupConfig.getRetryQueueNums(),PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);}// 注册消费者boolean changed = this.brokerController.getConsumerManager().registerConsumer(data.getGroupName(),clientChannelInfo,data.getConsumeType(),data.getMessageModel(),data.getConsumeFromWhere(),data.getSubscriptionDataSet(),isNotifyConsumerIdsChangedEnable);if (changed) {log.info("registerConsumer info changed {} {}",data.toString(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()));}}// 注册生产者for (ProducerData data : heartbeatData.getProducerDataSet()) {this.brokerController.getProducerManager().registerProducer(data.getGroupName(),clientChannelInfo);}// 设置返回结果response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;
}

这个方法会分别处理消费者和生产者的心跳信息,对于生产者比较简单,就是调用 registerProducer 注册生产者即可,但是对于消费者就需要给消费者组建立一个重传队列,然后再注册消费者。

重试队列就是消费者消费消息失败的时候会把消息发送到这个队列进行重试,重试队列是以消费者组为维度的,也就是说消费者重传是以消费者组为维度的,一个消费者组里面的消费者共享一个重传队列。


5.2 createTopicInSendMessageBackMethod 创建重传 topic

/*** 创建重传 topic,持久化到配置文件中,文件地址 ${user.home}/store/config/topics.json* @param topic // 待创建的 topic* @param clientDefaultTopicQueueNums // topic 下面的默认队列数, 默认 1* @param perm // 队列权限, 默认读写都有* @param topicSysFlag // topic 标识* @return*/
public TopicConfig createTopicInSendMessageBackMethod(final String topic,final int clientDefaultTopicQueueNums,final int perm,final int topicSysFlag) {// 从 topic 配置中获取下这个 topicTopicConfig topicConfig = this.topicConfigTable.get(topic);// 如果已经存在配置了,那么直接返回if (topicConfig != null)return topicConfig;boolean createNew = false;try {// 新建的时候需要加锁,防止创建相同的 topic 互相覆盖if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {// 再次获取 topic 信息,双重检查锁topicConfig = this.topicConfigTable.get(topic);if (topicConfig != null)return topicConfig;// 这里就是新建配置topicConfig = new TopicConfig(topic);// 重传队列的读写队列数都是 1topicConfig.setReadQueueNums(clientDefaultTopicQueueNums);topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums);// 权限默认是读写topicConfig.setPerm(perm);topicConfig.setTopicSysFlag(topicSysFlag);log.info("create new topic {}", topicConfig);// 添加到 topicConfigTable 集合里面this.topicConfigTable.put(topic, topicConfig);createNew = true;// 获取下一个版本,这个版本用于标识当前 topicConfigTable 有没有发生变化,比如在从节点同步 topicConfigTable 的// 时候就可以使用版本和本地存储的版本进行队列,如果发生了变化再重新写入文件中this.dataVersion.nextVersion();// 持久化到文件中,文件地址: ${user.home}/store/config/topics.jsonthis.persist();} finally {// 解锁this.topicConfigTableLock.unlock();}}} catch (InterruptedException e) {log.error("createTopicInSendMessageBackMethod exception", e);}// 如果创建了新的 topicif (createNew) {// 注册 broker 信息this.brokerController.registerBrokerAll(false, true, true);}return topicConfig;
}

创建重传 topic 时首先从 broker 的本地缓存 topicConfigTable 中获取 topic 的配置,然后如果已经存在 topic 配置了,就直接返回,否则才去创建。

在新建 topic 的时候需要加锁,防止并发创建 topic,可以看到重传 topic 设置的属性如下:

  • 读写队列数都是 1
  • 权限默认是可读写
  • topic 系统标识

创建出来之后设置到本地缓存 topicConfigTable 中,就代表创建成功了,然后更新版本 dataVersion,接着持久化到文件中,文件地址: ${user.home}/store/config/topics.json,更新版本就是因为 broker 需要向 NameServer 注册 topic 配置信息,把版本也传过去标识 broker 心跳版本,可以说 broker 的心跳就是 topic 配置信息。

最后如果创建了新的 topic,就会像 NameServer 注册 topic 信息,registerBrokerAll 的逻辑可以看这篇文章:【RocketMQ Broker 相关源码】-注册 broker 信息到所有的 NameServer。


5.3 registerConsumer 注册消费者

/*** 注册消费者* @param group                             消费者组* @param clientChannelInfo                 客户端连接通道* @param consumeType                       消费类型(PULL 或者 PUSH)* @param messageModel                      消费模式(集群或者广播)* @param consumeFromWhere                  消费点位* @param subList                           消费者组订阅信息, 一个消费者组里面的消费者可以订阅多个 topic 下面的消息* @param isNotifyConsumerIdsChangedEnable  消费者组里面的消费者发生变化时是否需要通知其他消费者重平衡* @return*/
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {// 获取消费者组信息ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);if (null == consumerGroupInfo) {// 创建一个新的 ConsumerGroupInfoConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);// 如果不存在才新增ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);consumerGroupInfo = prev != null ? prev : tmp;}// 是否在这个消费者组里面新增了消费者连接boolean r1 =consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,consumeFromWhere);// 是否更新了消费者组订阅信息boolean r2 = consumerGroupInfo.updateSubscription(subList);// 如果发生了变更if (r1 || r2) {if (isNotifyConsumerIdsChangedEnable) {// 通知消费者重平衡this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());}}// 处理消费者注册事件this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);// 是否发生了变更return r1 || r2;
}

注册消费者需要处理两件事,首先就是注册消费者连接到 channelInfoTable 集合中,channelInfoTable 集合用于管理消费者组下面的连接信息,key 是连接,value 是 ClientChannelInfo,是连接信息。

// 消费者组下面的连接信息
private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =new ConcurrentHashMap<Channel, ClientChannelInfo>(16);public class ClientChannelInfo {// 连接通道private final Channel channel;// 客户端 IDprivate final String clientId;// 一般是 JAVAprivate final LanguageCode language;// 版本private final int version;// 上一次上报心跳的事件private volatile long lastUpdateTimestamp = System.currentTimeMillis();...
}

然后就是更新消费者组的订阅信息,就是本地 subscriptionTable 集合。


5.3.1 updateChannel 更新消费者连接通道

/*** 更新消费者连接* @param infoNew               客户端连接通道* @param consumeType           消费类型(PULL 或者 PUSH)* @param messageModel          消费模式(集群或者广播)* @param consumeFromWhere      消费点位* @return*/
public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {boolean updated = false;this.consumeType = consumeType;this.messageModel = messageModel;this.consumeFromWhere = consumeFromWhere;// 获取原来的连接ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel());if (null == infoOld) {// 如果不存在就新增ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);if (null == prev) {log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType,messageModel, infoNew.toString());// 标记设置为 trueupdated = true;}infoOld = infoNew;} else {// 如果已存在就判断需不需要更新if (!infoOld.getClientId().equals(infoNew.getClientId())) {// 这里就是出现 BUG 了, 因为正常来说一个连接通道和一个 clientId 对应, 记录下日志log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ",this.groupName,infoOld.toString(),infoNew.toString());// 重新修正 channelInfoTable 里面的映射关系this.channelInfoTable.put(infoNew.getChannel(), infoNew);}}// 更新 lastUpdateTimestamp 属性, 表示当前消费者组上一次上报心跳的时间this.lastUpdateTimestamp = System.currentTimeMillis();infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp);return updated;
}

更新连接通道的逻辑就是更新 channelInfoTable 集合,然后更新下 lastUpdateTimestamp,因为这个方法是在 ConsumerGroupInfo 中的,所以更新的 lastUpdateTimestamp 就代表这个消费者组上一次上报心跳时间。更新这个属性是因为 broker 有一个定时任务


5.3.2 updateSubscription 更新订阅消息

public boolean updateSubscription(final Set<SubscriptionData> subList) {boolean updated = false;// 1. 遍历所有订阅信息, 将 subList 设置到 subscriptionTable 中for (SubscriptionData sub : subList) {// 获取原来的订阅信息SubscriptionData old = this.subscriptionTable.get(sub.getTopic());if (old == null) {// 如果获取不到就新建SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);if (null == prev) {updated = true;log.info("subscription changed, add new topic, group: {} {}",this.groupName,sub.toString());}// 如果当前新增的订阅信息版本比原来的要大且消费者类型是 PUSH} else if (sub.getSubVersion() > old.getSubVersion()) {if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {log.info("subscription changed, group: {} OLD: {} NEW: {}",this.groupName,old.toString(),sub.toString());}// 更新订阅信息集合, 这种情况下关系到消费者拉取消息就要更新, 如果是 PULL 类型由于是用户控制就不需要更新this.subscriptionTable.put(sub.getTopic(), sub);}}// 2. 删除不存在的订阅信息Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();while (it.hasNext()) {// 遍历所有订阅信息Entry<String, SubscriptionData> next = it.next();String oldTopic = next.getKey();boolean exist = false;for (SubscriptionData sub : subList) {if (sub.getTopic().equals(oldTopic)) {// 如果 subscriptionTable 里面存储的 topic 订阅信息不在 subList 集合中, 说明消费者没有上报过来exist = true;break;}}if (!exist) {// 如果不存在, 说明这个订阅信息被修改了log.warn("subscription changed, group: {} remove topic {} {}",this.groupName,oldTopic,next.getValue().toString());// 删掉这个订阅信息it.remove();updated = true;}}// 设置上报心跳的时间this.lastUpdateTimestamp = System.currentTimeMillis();return updated;
}

更新订阅消息流程分为两大步,首先是遍历所有订阅信息, 将消费者上报的 subList 设置到 subscriptionTable 中,如果 broker 中没有存储这个订阅消息,又或者存储的这个订阅信息版本过期了,就会更新到 subscriptionTable 中。

然后就是删除不存在的订阅信息,比如原来 subscriptionTable 存在 topicA -> subAtopicB -> subB 订阅信息,而消费者上报过来的消费者组订阅信息是 topicA -> subA,那么 topicB -> subB 就会被删掉,这也透露一件事,就是消费者组里面的消费者订阅关系需要一直,具体可以看官方的这篇文章:RocketMQ 订阅关系。


5.3.3 通知消费者重平衡

当消费者组里面新增了消费者又或者消费者的订阅关系发生了变化,比如又订阅了多一个 topic,这种情况下会通过 this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()) 通知消费者进行重平衡。当然这个方法在 【RocketMQ Broker 相关源码】- NettyRemotingClient 和 NettyRemotingServer 这篇文章的 3.3.1 有讲解,所以这里不再多说。


5.3.4 处理消费者注册事件

同样的注册事件也是通过 this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()) 来处理,而这里的注册主要指的是消费者过滤信息的注册,就是 SQL92 过滤信息的注册。
在这里插入图片描述
可以看到就是通过 register 方法去注册过滤信息,下面就从这个方法入手。


5.3.4.1 register 注册消费者过滤信息
/*** 注册消费者组过滤信息* @param consumerGroup* @param subList*/
public void register(final String consumerGroup, final Collection<SubscriptionData> subList) {for (SubscriptionData subscriptionData : subList) {// 一个一个注册register(subscriptionData.getTopic(),consumerGroup,subscriptionData.getSubString(),subscriptionData.getExpressionType(),subscriptionData.getSubVersion());}// 获取消费者组过滤信息Collection<ConsumerFilterData> groupFilterData = getByGroup(consumerGroup);Iterator<ConsumerFilterData> iterator = groupFilterData.iterator();// 遍历旧的过滤信息while (iterator.hasNext()) {ConsumerFilterData filterData = iterator.next();boolean exist = false;for (SubscriptionData subscriptionData : subList) {// 现在新上报的订阅信息是已经存在的if (subscriptionData.getTopic().equals(filterData.getTopic())) {// 信息存在, 不需要删除exist = true;break;}}// 如果这个过滤信息已经不再上报了, 将原来的过滤信息过期事件设置为当前时间, 相当于懒删除了if (!exist && !filterData.isDead()) {filterData.setDeadTime(System.currentTimeMillis());// 日志输出log.info("Consumer filter changed: {}, make illegal topic dead:{}", consumerGroup, filterData);}}
}/*** 获取消费者组的过滤信息* @param consumerGroup* @return*/
public Collection<ConsumerFilterData> getByGroup(final String consumerGroup) {Collection<ConsumerFilterData> ret = new HashSet<ConsumerFilterData>();// 遍历 filterDataByTopicIterator<FilterDataMapByTopic> topicIterator = this.filterDataByTopic.values().iterator();while (topicIterator.hasNext()) {FilterDataMapByTopic filterDataMapByTopic = topicIterator.next();// 获取 topic 下面的消费者过滤信息Iterator<ConsumerFilterData> filterDataIterator = filterDataMapByTopic.getGroupFilterData().values().iterator();// 遍历这些过滤信息while (filterDataIterator.hasNext()) {ConsumerFilterData filterData = filterDataIterator.next();// 如果跟参数消费者组一样, 说明这个 topic 是这个消费者组需要消费的if (filterData.getConsumerGroup().equals(consumerGroup)) {// 添加到返回集合中ret.add(filterData);}}}return ret;
}

客户端上报过来的心跳信息里面的消费者订阅信息包括了消费者的一些过滤信息,之前也说过了消费者组里面的消费者订阅关系需要保持一致,所以消费者上报过来的订阅信息就可以认为是这个消费者组的订阅信息,因此可以看到遍历 subList 一个一个注册之后,需要将旧的过滤信息里面没有上报过来的删掉,不过这里的删掉是懒删除,只是设置下 ConsumerFilterData#deadTime 为当前时间,就表示这个过滤信息已经过期了。

getByGroup 就是获取这个消费者组的消费者过滤信息,获取的逻辑是遍历 filterDataByTopic 集合来获取。filterDataByTopic 以 topic 为 key,因为一个 topic 可以被多个消费者组下面的消费者去消费,所以 value 是 FilterDataMapByTopic 对象,这个对象里面的属性是一个 ConcurrentMap 集合和 topic,集合的 key 是消费者组,value 是消费者组的过滤信息。

/*** 消费者过滤信息, 一个 topic 可以被多个消费者组下面的消费者去消费, 所以这里是以 topic 为 key*/
private ConcurrentMap<String/*Topic*/, FilterDataMapByTopic>filterDataByTopic = new ConcurrentHashMap<String/*Topic*/, FilterDataMapByTopic>(256);public static class FilterDataMapByTopic {/*** 消费者组的过滤信息*/private ConcurrentMap<String/*consumer group*/, ConsumerFilterData>groupFilterData = new ConcurrentHashMap<String, ConsumerFilterData>();private String topic;...
}

getByGroup 的遍历逻辑就是遍历所有 topic 下面的 FilterDataMapByTopic,然后继续遍历 FilterDataMapByTopic 里面的 groupFilterData 属性,判断这个 topic 是否被这个消费者组消费,逻辑并不复杂,就是里面的集合有点绕。


5.3.4.2 register 创建 FilterDataMapByTopic 对象

继续回到 register 方法,这个方法就是负责创建出 FilterDataMapByTopic,上面我们也说了 filterDataByTopic 以 FilterDataMapByTopic 为 value,所以这个方法主要还是创建出 FilterDataMapByTopic 对象。

/*** 注册 SQL92 信息到消费者组集合 filterDataByTopic 中* @param topic* @param consumerGroup* @param expression* @param type* @param clientVersion* @return*/
public boolean register(final String topic, final String consumerGroup, final String expression,final String type, final long clientVersion) {// 如果是 TAG 类型的过滤信息, 直接返回if (ExpressionType.isTagType(type)) {return false;}// 如果 SQL92 过滤表达式为空, 直接返回if (expression == null || expression.length() == 0) {return false;}// 获取 topic 下面的过滤信息集合, 注意这里就是消费者组下面一个 topic 一个过滤信息集合FilterDataMapByTopic filterDataMapByTopic = this.filterDataByTopic.get(topic);if (filterDataMapByTopic == null) {// 不存在就新建一个FilterDataMapByTopic temp = new FilterDataMapByTopic(topic);FilterDataMapByTopic prev = this.filterDataByTopic.putIfAbsent(topic, temp);filterDataMapByTopic = prev != null ? prev : temp;}// 计算出 consumerGroup#topic 的布隆过滤器信息// 1.经过 k 次 hash 求出来的 k 位// 2.布隆过滤器总共多少位BloomFilterData bloomFilterData = bloomFilter.generate(consumerGroup + "#" + topic);// 注册过滤信息到 filterDataMapByTopic 中return filterDataMapByTopic.register(consumerGroup, expression, type, bloomFilterData, clientVersion);
}

register 方法会在一开始就判断如果过滤类型是 TAG 就不需要注册,那就是说明 filterDataByTopic 这个集合 只存储 SQL92 过滤类型的过滤数据

这个方法逻辑不多,值得关注的是 bloomFilter.generate,可以看到对传入的 consumerGroup#topic 进行 hash 之后生成布隆过滤器信息,下面就看下这个布隆过滤器信息包括什么。

在讲解 generate 方法前,我们来想一下布隆过滤器有什么重要信息:

  1. hash 函数个数 k
  2. bit 数组长度 m
  3. 误差率 fpp

对于布隆过滤器,如果要往里面设置一个字符串,就需要先通过 k 个哈希函数求出 k 个位,然后将布隆过滤器的这 k 个位设置为1,如果感兴趣可以看这篇文章:详细说说布隆过滤器 BloomFilter。

对于消费者过滤信息,创建的 BloomFilterData 包括两个重要信息:这 k 个 hash 函数求出来的 bit 数组(长度为 k)bit 数组长度 m
在这里插入图片描述


5.3.4.3 register 注册过滤信息

上面创建好布隆过滤器信息之后,最终调用 register 注册过滤信息,而注册过滤信息主要就是往 groupFilterData 集合设置,上面也说了,groupFilterData 集合存储的是 consumerGroup -> ConsumerFilterData 的映射关系。

/*** 注册过滤信息* @param consumerGroup     消费者组* @param expression        SQL92 过滤表达式* @param type              过滤类型, SQL92* @param bloomFilterData   布隆过滤器数据* @param clientVersion     订阅信息版本* @return*/
public boolean register(String consumerGroup, String expression, String type, BloomFilterData bloomFilterData,long clientVersion) {// 获取旧的过滤信息ConsumerFilterData old = this.groupFilterData.get(consumerGroup);if (old == null) {// 创建一个新的 ConsumerFilterDataConsumerFilterData consumerFilterData = build(topic, consumerGroup, expression, type, clientVersion);if (consumerFilterData == null) {return false;}// 设置布隆过滤器信息consumerFilterData.setBloomFilterData(bloomFilterData);// 添加到 groupFilterData 中old = this.groupFilterData.putIfAbsent(consumerGroup, consumerFilterData);if (old == null) {// 这里就是新增成功, 直接返回log.info("New consumer filter registered: {}", consumerFilterData);return true;} else {// 并发添加了, 注意 FilterDataMapByTopic 是以 topic 为维度的, 一个 topic 可以被多个消费者组消费if (clientVersion <= old.getClientVersion()) {if (!type.equals(old.getExpressionType()) || !expression.equals(old.getExpression())) {// 打印下日志, 意思就是并发添加的这两过滤信息还不一样log.warn("Ignore consumer({} : {}) filter(concurrent), because of version {} <= {}, but maybe info changed!old={}:{}, ignored={}:{}",consumerGroup, topic,clientVersion, old.getClientVersion(),old.getExpressionType(), old.getExpression(),type, expression);}// 版本一样, 另一个线程添加的过滤信息过期了, 就直接设置下过期时间为 0, 重新启用if (clientVersion == old.getClientVersion() && old.isDead()) {reAlive(old);return true;}// 版本一样同时另一个线程添加的没有过期或者当前线程版本比较低, 当前线程就注册失败了return false;} else {// 这里就是新增的版本比原来的要高, 直接覆盖原来的this.groupFilterData.put(consumerGroup, consumerFilterData);log.info("New consumer filter registered(concurrent): {}, old: {}", consumerFilterData, old);return true;}}} else {// 原来过滤信息已经存在了且当前添加的版本 <= 原来的if (clientVersion <= old.getClientVersion()) {if (!type.equals(old.getExpressionType()) || !expression.equals(old.getExpression())) {// 打印下日志, 意思就是过滤表达式不一样log.info("Ignore consumer({}:{}) filter, because of version {} <= {}, but maybe info changed!old={}:{}, ignored={}:{}",consumerGroup, topic,clientVersion, old.getClientVersion(),old.getExpressionType(), old.getExpression(),type, expression);}// 如果跟原来的版本一样就直接重新启用, 逻辑和上面的一样if (clientVersion == old.getClientVersion() && old.isDead()) {reAlive(old);return true;}// 添加的版本比原来的版本还要低或者相同但是原来的没有过期return false;}// 这里就是添加的版本比原来的高, 但是还是得看下过滤信息是否有变化, 首先是过滤表达式和类型boolean change = !old.getExpression().equals(expression) || !old.getExpressionType().equals(type);// 然后是布隆过滤器信息是否发生了变化if (old.getBloomFilterData() == null && bloomFilterData != null) {change = true;}if (old.getBloomFilterData() != null && !old.getBloomFilterData().equals(bloomFilterData)) {change = true;}// 过滤表达式发生了变化if (change) {// 创建新的过滤表达式ConsumerFilterData consumerFilterData = build(topic, consumerGroup, expression, type, clientVersion);if (consumerFilterData == null) {// new expression compile error, remove old, let client report error.// 这里就是过滤表达式有问题, 将旧的删掉, 然后直接返回 false 表示注册失败, 所以说如果表达式有问题也会把旧的删掉this.groupFilterData.remove(consumerGroup);return false;}// 设置布隆过滤器数据consumerFilterData.setBloomFilterData(bloomFilterData);// 创建成功, 添加到 groupFilterData 中this.groupFilterData.put(consumerGroup, consumerFilterData);log.info("Consumer filter info change, old: {}, new: {}, change: {}",old, consumerFilterData, change);return true;} else {// 过滤表达式没有发生变化, 直接设置版本号old.setClientVersion(clientVersion);// 如果旧的已经过期了, 重新启用if (old.isDead()) {reAlive(old);}return true;}}
}protected void reAlive(ConsumerFilterData filterData) {// 重新设置过期时间为 0long oldDeadTime = filterData.getDeadTime();filterData.setDeadTime(0);log.info("Re alive consumer filter: {}, oldDeadTime: {}", filterData, oldDeadTime);
}

里面的注释都写得比较清楚,也可以看到这里面维护 groupFilterData 集合是根据两种情况来维护,一种是原来没有,一种是原来有的,如果是原来集合里面就有 group -> ConsumerFilterData 的映射,就需要判断当前上报的过滤信息版本和原来的对比,如果版本不是最新的就会被覆盖,一般来说如果过滤信息不变版本都不会变。

然后在这个方法中也可以看到当 old == null 的时候应该是为了防止并发,使用 putIfAbsent 添加成功之后返回 old,还要判断这个 old 是不是已经存在了,如果存在还需要对比添加的版本来决定留下哪个。

最后注意下里面的 reAlive 方法,这个方法就是将一个过期的过滤信息变可用,过滤信息 ConsumerFilterData 里面有一个属性 deadTime 表示过期时间,如果设置成不为 0 就表示过期了,所以这个 reAlive 方法就是重新将这个属性设置为 0,表示复用里面的过滤信息。


6. 小结

好了,这篇文章中我们探讨了上报心跳信息到 broker 的源码,可以看到上传的心跳信息里面不单单包括生产者,也包括消费者的,所以这个方法是生产者和消费者的共用方法。

而 broker 处理心跳的时候对于消费者不单单要处理消费者连接信息,同时也要处理消费者组订阅消息以及 SQL92 过滤信息,也就是上面的 5.3 小节。





如有错误,欢迎指出!!!!

相关文章:

  • 复盘20250522
  • LeetCode 76题「最小覆盖子串」
  • 从零基础到最佳实践:Vue.js 系列(8/10):《性能优化与最佳实践》
  • Spring AI 之提示词
  • 论文解读 | 《桑黄提取物对小鼠宫颈癌皮下移植瘤的抑制及机制研究》
  • 红黑树插入的旋转变色
  • 使用C语言实现字符串拷贝与程序编译全解析 ——从strcopy实现到程序内存布局,一文掌握核心知识
  • FPGA通信之VGA
  • 【结构体宏定义】C语言结构体与宏定义:传感器配置的巧妙结合
  • transformer网络
  • 全栈开发中主流 AI 编程辅助工具的实践与对比分析20250522
  • thinkpad x220降频到0.7Ghz解决办法
  • 小白的进阶之路系列之三----人工智能从初步到精通pytorch计算机视觉详解下
  • Python 训练 day31
  • Python打卡训练营day32
  • 改写文章打造原创内容,ai智能改写工具在线高效完成!
  • 点云技术原理概要
  • Oracle 的V$ACTIVE_SESSION_HISTORY 视图
  • 大语言模型 18 - MCP Model Context Protocol 基本项目 测试案例
  • 技术篇-2.3.Golang应用场景及开发工具安装
  • 龙文国土局漳滨村新农村建设网站/百度资源搜索资源平台
  • 外贸单页网站案例/新闻危机公关
  • 网站建设方案书 下载/东莞关键词优化推广
  • 长白山网站学做管理平台/谷歌seo技巧
  • 免费下载微信/seo人人网
  • wordpress hosts/百度seo推广方案