当前位置: 首页 > news >正文

【RocketMQ 生产者和消费者】- 消费者的订阅关系一致性

文章目录

  • 1. 前言
  • 2. 提问
  • 3. 什么是订阅关系一致性
    • 3.1 消费者订阅多个 topic
  • 4. 什么是消费订阅不一致
  • 5. topic 相同但是 tag 不相同
    • 5.1 订阅信息上报源码分析
    • 5.2 示例
    • 5.3 影响
  • 6. topic 不同但是 tag 相同
    • 6.1 示例
    • 6.2 源码分析
  • 7. 小结


本文章基于 RocketMQ 4.9.3

1. 前言

  • 【RocketMQ】- 源码系列目录
  • 【RocketMQ 生产者消费者】- 同步、异步、单向发送消费消息
  • 【RocketMQ 生产者和消费者】- 消费者启动源码
  • 【RocketMQ 生产者和消费者】- 消费者重平衡(1)
  • 【RocketMQ 生产者和消费者】- 消费者重平衡(2)- 分配策略
  • 【RocketMQ 生产者和消费者】- 消费者重平衡(3)- 消费者 ID 对负载均衡的影响

上面的文章我们讲解了消费者重平衡的源码以及消费者 ID 对负载均衡的影响,这篇文章我们来看下消费者的订阅关系。


2. 提问

首先提出一个问题,为什么需要让一个消费者组里面的消费者保持订阅一致性。


3. 什么是订阅关系一致性

首先确定下,什么是订阅关系一致性,订阅关系我们知道,就是消费者的订阅信息,比如:

  • 消费者 A 订阅了 topicA 下面的 TagA
  • 消费者 B 订阅了 topicB 下面的 TagB

订阅了什么 topic 下面的什么 tag,就是订阅关系,那么订阅关系一致性就是消费者订阅的信息是不是相同的,这里我们直接用官网给的例子来看,图片来自:订阅关系(Subscription),感兴趣的朋友可以去看下官网对于订阅关系的介绍。
在这里插入图片描述
在上面图中可以看到,消费者组 ConsumerGroupA 下面有三个消费者 ConsumerA1、ConsumerA2、ConsumerA3,这三个消费者都是订阅的 TopicA 下面的 Tag a,而消费者组 ConsumerGroupB 下面的三个消费者 ConsumerB1、ConsumerB2、ConsumerB3 订阅的是 TopicA 下面的 Tag b。这种情况下我们就说这两个消费者组内的消费者订阅关系都是一致的,注意下不同消费者组是可以订阅同一个 topic 的,订阅的 tag 不同就可以区分不同的消息

在这里插入图片描述
第二种就是一个消费者组 Consumer Group A 下面的消费者都订阅了两个 topic,分别是 topicA 的 Tag a和 topic B 的 tag b,这种情况下每一个消费者里面有两个订阅关系,互相独立,互不影响。这里消费者其实也是可以订阅多个 topic 的,我们可以看下例子。


3.1 消费者订阅多个 topic

首先设置下生产者,让生产者往 TopicTestA 下面的 TagA 和 TopicTestB 下面的 TagB 发送消息。

public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("testGroup");producer.setNamesrvAddr("http://127.0.0.1:9876");producer.start();// 发送 10 条 TopicTestA.TagA 的消息for (int i = 0; i < 10; i++) {try {Message msg = new Message("TopicTestA","TagA",("Hello RocketMQ " + i + "TopicTestA.TagA").getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}// 发送 10 条 TopicTestB.TagB 的消息for (int i = 0; i < 10; i++) {try {Message msg = new Message("TopicTestB" /* Topic */,"TagB" /* Tag */,("Hello RocketMQ " + i + "TopicTestB.TagB").getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}/** Shut down once the producer instance is not longer in use.*/producer.shutdown();}
}

在这里插入图片描述

然后启动消费者,订阅 TopicTestA 下面的 TagA 和 TopicTestB 下面的 TagB。

public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroupConsumer");consumer.setNamesrvAddr("localhost:9876");consumer.setMessageModel(MessageModel.CLUSTERING);consumer.subscribe("TopicTestA", "TagA");consumer.subscribe("TopicTestB", "TagB");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody(), StandardCharsets.UTF_8));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}
}

在这里插入图片描述
可以看到消费者确实消费了两个订阅关系下面的消息,其实深入到源码也可以看到下面设置订阅关系就是将订阅表达式和 topic 设置到 rebalanceImpl 下面的 subscriptionInner 集合,所以一个消费者是可以订阅多个 topic 表达式的。

/*** 消息订阅* @param topic             消费者消费的 topic* @param subExpression     消费者的订阅表达式, 用于订阅 TAG, 所以这里是默认的 TAG 表达式, 如果是 SQL92 就不是走这个方法了*                          例子: "TAGA || TAGB || TAGC"* @throws MQClientException*/
public void subscribe(String topic, String subExpression) throws MQClientException {try {// 解析订阅表达式,构建 SubscriptionData, 也就是消费者的订阅信息SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subExpression);// 将 topic 和消费者订阅信息存储到 subscriptionInner 中this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);if (this.mQClientFactory != null) {// 向所有 broker 发送心跳信息this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();}} catch (Exception e) {throw new MQClientException("subscription exception", e);}
}

4. 什么是消费订阅不一致

那么有了上面的例子,我们就可以知道什么是消费订阅不一致了,比如一个消费者组下面:

  • 消费者 A 订阅 TopicTestA 下面的 TagA
  • 消费者 B 订阅 TopicTestA 下面的 TagB

这种情况下这个消费者组下面的消费者订阅不一致,也就是 topic 相同但是 tag 不相同,同样的还有下面的案例:

  • 消费者 A 订阅 TopicTestA 下面的 TagA
  • 消费者 B 订阅 TopicTestB 下面的 TagA

这种情况下就是 topic 不同但是 tag 相同,也是订阅关系不一致,那么订阅关系不一致会有什么问题呢?


5. topic 相同但是 tag 不相同

5.1 订阅信息上报源码分析

首先是 topic 相同但是 tag 不同,由于是两个消费者订阅相同 topic 下面的不同 tag,所以要想知道这种情况会发生什么,就得看消费者上报心跳的源码,在这篇文章:【RocketMQ 生产者和消费者】- 生产者启动源码-上报生产者和消费者心跳信息到 broker(3) 中有消费者心跳上报的详细逻辑。
在这里插入图片描述
我们还是从源码角度出发看看这种情况下会发生什么,上面的心跳文章并没有介绍这一类特殊情况,这里我们补一下,我们先来看下订阅信息的属性。

/*** 消费者的订阅情况* DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");* consumer.subscribe("Topic1", "*");* consumer.subscribe("Topic2", "*");** 一个消费者是可以订阅多个 topic 下面的一个或者多个 tag* 一个消费者也可以订阅一个 topic 下面的一个或者多个 tag*/
public class SubscriptionData implements Comparable<SubscriptionData> {// 默认订阅所有的 tagpublic final static String SUB_ALL = "*";// Consumer 端类过滤模式,用户自定义过滤类时才会用private boolean classFilterMode = false;// 消息主题private String topic;// 订阅的表达式,比如说使用 TAG 过滤的时候可以设置为 "tag1 || tag2 || tag3",意思是过滤这几个 TAGprivate String subString;// 订阅的 TAG 集合private Set<String> tagsSet = new HashSet<String>();// 订阅的 TAG 集合的 hashCode 值private Set<Integer> codeSet = new HashSet<Integer>();// 订阅信息的版本,就是设置成当前时间,根据这个来判断订阅信息有没有发生变化private long subVersion = System.currentTimeMillis();// 表达式类型,是 TAG 还是 SQL92private String expressionType = ExpressionType.TAG;...
}

再来看下 broker 处理消费者订阅信息的逻辑,也就是消费者的 registerConsumer 方法,在这个方法中通过 updateSubscription 去更新消费者的订阅信息。

public boolean updateSubscription(final Set<SubscriptionData> subList) {boolean updated = false;// 1. 遍历所有订阅信息, 将 subList 设置到 subscriptionTable 中for (SubscriptionData sub : subList) {// 获取原来的订阅信息SubscriptionData old = this.subscriptionTable.get(sub.getTopic());if (old == null) {// 如果获取不到就新建SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);if (null == prev) {updated = true;log.info("subscription changed, add new topic, group: {} {}",this.groupName,sub.toString());}// 如果当前新增的订阅信息版本比原来的要大且消费者类型是 PUSH} else if (sub.getSubVersion() > old.getSubVersion()) {if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {log.info("subscription changed, group: {} OLD: {} NEW: {}",this.groupName,old.toString(),sub.toString());}// 更新订阅信息集合, 这种情况下关系到消费者拉取消息就要更新, 如果是 PULL 类型由于是用户控制就不需要更新this.subscriptionTable.put(sub.getTopic(), sub);}}// 2. 删除不存在的订阅信息Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();while (it.hasNext()) {// 遍历所有订阅信息Entry<String, SubscriptionData> next = it.next();String oldTopic = next.getKey();boolean exist = false;for (SubscriptionData sub : subList) {if (sub.getTopic().equals(oldTopic)) {// 如果 subscriptionTable 里面存储的 topic 订阅信息不在 subList 集合中, 说明消费者没有上报过来exist = true;break;}}if (!exist) {// 如果不存在, 说明这个订阅信息被修改了log.warn("subscription changed, group: {} remove topic {} {}",this.groupName,oldTopic,next.getValue().toString());// 删掉这个订阅信息it.remove();updated = true;}}// 设置上报心跳的时间this.lastUpdateTimestamp = System.currentTimeMillis();return updated;
}

上面更新消费者订阅信息的逻辑第一个就是维护原有订阅信息的同时处理那些没有再上报的订阅信息,这里注意下就是 subscriptionTable 这个集合是在 ConsumerGroupInfo 这个类下面的,且以 topic 为 key,所以如果同一个消费者组下的两个消费者订阅了相同 topic 下面的不同 tag,这种情况下第二个 Consumer 上报心跳的时候就会走到 else if (sub.getSubVersion() > old.getSubVersion()) 这个分支,由于订阅信息里面的 subVersion 创建出来的时候设置的就是当前时间,所以这个 else if 肯定会为 true,于是就导致订阅信息覆盖了。
在这里插入图片描述


5.2 示例

上面我们说了订阅信息上报的过程,下面就来演示下,同样的,我们启动两个消费者,一个订阅 TopicTestA 下面的 TagA,一个订阅 TopicTestA 下面的 TagB。

public class ConsumerA {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroupConsumer");consumer.setNamesrvAddr("localhost:9876");consumer.setMessageModel(MessageModel.CLUSTERING);consumer.subscribe("TopicTestA", "TagA");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody(), StandardCharsets.UTF_8));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}
}public class ConsumerB {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroupConsumer");consumer.setNamesrvAddr("localhost:9876");consumer.setMessageModel(MessageModel.CLUSTERING);consumer.subscribe("TopicTestA", "TagB");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody(), StandardCharsets.UTF_8));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}
}

然后我们启动 broker,同时在刚刚这个 updateSubscription 方法中输出一些日志。
在这里插入图片描述
启动 NameServer、Broker、ConsumerA、ConsumerB,注意看 broker 控制台的输出。

在这里插入图片描述

首先启动 ConsumerA 的时候可以看到新增了 TopicTestA 下面的订阅信息为 TagA,所以此时 subscriptionTable 里面会保存 TopicTestA -> TagA 的记录,下面重传 topic 的不用管,然后我们启动 ConsumerB。
在这里插入图片描述
可以发现,当启动 TagB 的时候 TopicTestA 下面的订阅信息改成了 TagB,就把 TagA 的覆盖了,相当于 ConsumerA 的订阅关系被覆盖了。
在这里插入图片描述

不过大家也发现了,最后当 ConsumerA 和 ConsumerB 继续上报订阅信息的时候,broker 的订阅信息不再更新了,这是因为 ConsumerB 最后上报心跳修改了订阅信息之后,broker 的版本就已经是最新的了,这时候就算 ConsumerA 上报了 TagA,但是版本没有 broker 存储的新,所以最后 broker 就会一致保持 TagB 的订阅关系。


5.3 影响

上面我们也演示了这个问题,下面来看下这种订阅关系有什么影响,其实最直接的就是消息消费影响,比如 Consumer1 本来是订阅了 TagA 的值,但是最后从消息队列里面没办法拉取到 TagA 的数据。

在文章开头前言的部分我们已经讲述了消费者负载均衡的原理以及各种策略,由于这两个消费者都是同一个 Group 下面的,同时又是订阅的一个 topic,所以这种情况下这两个消费者能正常分配消息队列,且分配的结果应该是(我这里说的是默认的情况):
在这里插入图片描述
但是,分配到了队列不代表消息一定能消费了,由于现在还没有看到消费者消费的源码分析,但是可以简单说一下,消费者如果设置了 Tag,会做两层过滤:

  • broker 层面的 Tag HashCode 过滤
  • Consumer 层面的 equals 过滤

broker 在处理消费者的拉取请求的时候可以看到在 getMessage 里面会去通过 hashCode 校验去先筛选出符合条件的消息。
在这里插入图片描述
在消费者拉取到了消息之后,再通过 equals 精确判断一条消息的 tag 是不是这个消费者订阅的 tag。
在这里插入图片描述
由于消费者层面的 tag 信息是没有变化的,也就是 ConsumerA 在消费者本地存储的还是 TagA 的订阅信息,ConsumerB 存储的是 TagB 的,所以消费者层面的过滤是没有问题的,但是 broker 层面的过滤就有问题了。

在这里插入图片描述
broker 在处理消息拉取请求的时候会从 ConsumerGroupInfo 中获取这个 topic 下面的订阅信息,然后用这个订阅信息里面的 TagCode 来过滤,由于订阅信息里面是 TagB,会导致消费者 A 获取到的都是 TagB 的消息,自然在本地判断的时候不通过,也就没办法消费了。
在这里插入图片描述
上面就是大致流程图了,下面我们也可以模拟下,同样还是 5.2 的示例代码,只是我们加多一个生产者,生产 10 条 TagA 和 TagB 的代码,看看两个消费者的消费情况。

public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("testGroup");producer.setNamesrvAddr("http://127.0.0.1:9876");producer.start();// 发送 10 条 TopicTestA.TagA 的消息for (int i = 0; i < 10; i++) {try {Message msg = new Message("TopicTestA","TagA",("Hello RocketMQ " + i + "TopicTestA.TagA").getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}// 发送 10 条 TopicTestA.TagB 的消息for (int i = 0; i < 10; i++) {try {Message msg = new Message("TopicTestA" /* Topic */,"TagB" /* Tag */,("Hello RocketMQ " + i + "TopicTestA.TagB").getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}/** Shut down once the producer instance is not longer in use.*/producer.shutdown();}
}

两个消费者的输出结果如下:
在这里插入图片描述
在这里插入图片描述

可以看到 Consumer 一条消息都没有消费,而 ConsumerB 则是消费了 6 条消息,TagB 明明发送了 10 条消息,怎么会消费了 6 条呢?这就要看下生产者的输出了。
在这里插入图片描述
红框的 10 条就是发送给 TagB 的 10 条消息,而消费者消费的 [0,1,4,5,8,9] 这几个编号消费的是队列 [2,3] 的消息,根据上一篇文章负载均衡讲述的默认策略 AllocateMessageQueueAveragely 。CosumerA 分配到的是 [0,1] 两个队列,ConsumerB 分配到的是 [2,3] 两个队列,所以剩下的四条队列 0 和 1 的就没有消费了,因为 ConsumerA 本地过滤丢掉了,我们关闭 ConsumerA,然后只留下一个 ConsumerB,可以看到就算负载均衡将所有队列都分配给 ConsumerB,剩下的四条消息也不会再被消费。
在这里插入图片描述
所以这也能证明,确实消息是分配到了 ConsumerA 之后被过滤掉了,这样队列 [0,1] 的消费偏移量 offset 会被更新成下一次要拉取的位置,ConsumerB 也不会再拉取到这几条没有消费的 TagB 消息。


6. topic 不同但是 tag 相同

上面我们看了 topic 相同但是 tag 不同的问题,这里来看下 topic 不同但是 tag 相同的问题,还是用下官网的图:订阅关系一致
在这里插入图片描述
上面就是 topic 不同但是 tag 相同的订阅关系,消费者组中 C1,C2,C3 都订阅了不同 topic 下面的所有 tag,那么这种情况下又会有什么问题呢?


6.1 示例

从前面的负载均衡文章中我们可以得知消费者负载均衡是通过消费者组来负载的,因此在 broker 负载均衡阶段是不会有什么问题的,比如还是上面 5.2 的例子,两个消费者 ConsumerA、ConsumerB,一个订阅 TopicTestA 下面的 tagA,一个订阅 TopicTestB 下面的 tagA,接着我们分别从控制台来看下这两个 topic 的队列分配。

public class ConsumerA {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroupConsumer");consumer.setNamesrvAddr("localhost:9876");consumer.setMessageModel(MessageModel.CLUSTERING);consumer.subscribe("TopicTestA", "TagA");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody(), StandardCharsets.UTF_8));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}
}public class ConsumerB {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroupConsumer");consumer.setNamesrvAddr("localhost:9876");consumer.setMessageModel(MessageModel.CLUSTERING);consumer.subscribe("TopicTestB", "TagA");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody(), StandardCharsets.UTF_8));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}
}

在这里插入图片描述
在这里插入图片描述

可以看到这个消费者组里面不管是 TopicTestA 还是 TopicTestB 都没办法分配给两个消费者,TopicTestA 下面的队列 2、3 没有分配给 ConsumerB,反过来也一样,但是注意下上面两个 Topic 不是说这个消费者组里面会存储两个 topic 的订阅信息,而是不断变化,比如一会存了 TopicTestA 的 TagA,一会又会被修改成 TopicTestB 的 TagA,比如下面的图片。
在这里插入图片描述
在这里插入图片描述


6.2 源码分析

上面就是发现的问题了,主要有两个问题:

  1. topic 下面的队列没办法完全分配给消费者
  2. 消费者组里面的订阅信息不断变化

首先来看第 2 个问题,为什么消费者组里面的订阅信息会不断变化,首先看下 broker 控制台的输出日志,我提前加了几行代码输出。
在这里插入图片描述
可以看到当 ConsumerB 上报心跳的时候 broker 订阅信息里面是 TopicTestA 下面的 TagA,而更新之后,再当 ConsumerA 上报心跳时,broker 里面的订阅消息又变成了 TopicTestB 下面的 TagA,当然上面当 ConsumerA 上报心跳的时候 “当前 broker 订阅信息” 这里显示的时 %RETRY%testGroupConsumer,那是因为 TopicTestB.TagA 在后面,截图接不下了,但是知道就行。下面来看下源码,将目光放到 updateSubscription 这个方法,上面 5.1 也是这个方法。
在这里插入图片描述
首先第一部分就是新增不存在的订阅关系,由于是以 topic 为 key,所以当 ConsumerA 或者 ConsumerB 上报订阅关系的时候自然是不存在的,所以会新增,这种情况下集合 subscriptionTable 里面就包含了 TopicTestA 和 TopicTestB 的订阅关系,我们再来看第二部分。
在这里插入图片描述
这第二部分就是删除不存在的订阅关系,在第一部分的代码中 subscriptionTable 里面已经包含了 TopicTestA 和 TopicTestB 的订阅关系,但是第二部分会把不存在的删掉,比如消费者 ConsumerB 上报的 TopicTestB,这种情况下就会将 TopicTestA 给删掉,所以最终看到的就是上面那两张控制台图片的效果了,订阅关系来回变动,来回被修改。其实从源码中也能看出了 RocketMQ 就是把一个消费者组里面的消费者按相同的订阅关系来处理的。

下面我们再来看下第 1 个问题,为什么 topic 下面的队列没办法完全分配给消费者,在看源码之前先把 broker 的输出给删掉,不然有干扰。

我们再来回顾下 rebalanceByTopic 这个方法,看看是如何分配队列的。
在这里插入图片描述
可以看到,集群模式下,一开始就获取当前消费者要负载均衡的 topic 下面的队列,然后再获取消费者组的所有消费者的 clientId 集合,所以这里我们大概就能猜到了,队列肯定是有四个队列的,但问题是这个消费者组下面有两个消费者,所以当前消费者只能获取到两个队列,我们可以打印下日志。
在这里插入图片描述
在这里插入图片描述

分别在一开始、分配到队列以及处理完分配队列之后来输出下日志到控制台,我们来看看队列的分配情况,首先启动 ConsumerA,这时候不启动 ConsumerB。
在这里插入图片描述
可以看到 ConsumerA 的分配是没有问题的,那么这时候再启动 ConsumerB。
在这里插入图片描述
可以看到这时候 ConsumerA 就只分配到了 [0,1] 两个消息队列,对应的 ConsumerB 也一样,只能分配到 [2,3] 两个队列。
在这里插入图片描述
所以到这里我们就清楚为什么 TopicTestA 和 TopicTestB 下面都只有两个队列能分配到,剩下两个队列没办法被消费了,就是因为两个不同 topic 的消费者都参与负载均衡了。这种情况就会导致消息队列里面的消息一直堆积,没办法被消费。


7. 小结

上面 5、6 小节我们就演示并从源码层面去理解订阅关系不一致会发生什么情况,RocketMQ 官方也有专门的文章来说明这一点:订阅关系一致,有兴趣可以看下这篇文章。





如有错误,欢迎指出!!!!

相关文章:

  • 【分布式技术】Bearer Token以及MAC Token深入理解
  • 《HTTP权威指南》 第7章 缓存
  • 算法入门——排序算法详解(C++实现)
  • ANN、CNN、RNN 深度解析
  • Java面试复习:Java基础、OOP与并发编程精要
  • Coilcraft电感上的横线是什么意思?电感有方向么?
  • 每日算法刷题Day35 6.22:leetcode枚举技巧枚举中间2道题,用时1h
  • 用可观测工具高效定位和查找设计中深度隐藏的bug
  • 跨平台高稳定低延迟的RTSP转RTMP推送方案实践
  • 抖音小程序开发:ttml和传统html的区别
  • 选择大于努力,是学习FPGA硬件设计还是学习软件设计?
  • aws(学习笔记第四十六课) codepipeline-build-deploy
  • 【代码解析】opencv 安卓 SDK sample - 1 - HDR image
  • 基于51单片机的智能药物盒proteus仿真
  • KES数据库部署工具使用
  • Google DeepMind 的 “心智进化”(Mind Evolution)
  • LabVIEW机器视觉零件检测
  • react day.js使用及经典场景
  • 针对数据仓库方向的大数据算法工程师面试经验总结
  • Elasticsearch Kibana (一)
  • 新广告法 做网站的/域名交易平台
  • 湛江低价网站建设/郑州厉害的seo优化顾问
  • 企业营销推广/爱站网seo
  • 大神自己做的下载音乐的网站/站内推广的方法和工具
  • 衡水哪个公司做网站好/合肥网站推广优化公司
  • wordpress添加中文语言/搜索引擎关键词优化有哪些技巧