当前位置: 首页 > news >正文

深入剖析 RocketMQ 的 ConsumerOffsetManager

在消息队列系统中,准确记录和管理消息的消费进度是保障系统可靠性和数据一致性的关键。RocketMQ 作为一款高性能、高可用的分布式消息队列,其 ConsumerOffsetManager 组件在消费进度管理方面发挥着至关重要的作用。本文将详细介绍 ConsumerOffsetManager 的主要功能、重要属性和核心方法。

1. 概述

ConsumerOffsetManager 是 RocketMQ Broker 端的一个重要组件,主要负责管理消费者的消费偏移量(offset)。消费偏移量是指消费者在消息队列中已经消费到的位置,通过管理这个偏移量,ConsumerOffsetManager 可以确保消费者在重启或故障恢复后能够从正确的位置继续消费消息,避免消息的重复消费或遗漏。

2. 主要属性

这是一个核心的属性,其类型为 ConcurrentMap<String, ConcurrentMap<Integer, Long>>。它是一个双层的并发映射结构,外层的键是 topic@consumerGroup 的组合字符串,用于唯一标识一个主题和消费组的组合;内层的键是队列 ID(Integer 类型),值是该队列对应的消费偏移量(Long 类型)。这个属性存储了所有消费者在各个队列上的消费进度信息

3. 主要方法

ConsumerOffsetManager继承于ConfigManager,里面会应用到ConfigManager中的一一些方法

//随着consumer不断的从broker这里消费topic中的queue数据,此时需要进行记录consumer对topic中的每个queue的消费到了哪个位置 offset
//对消费偏移量的管理就是这个ConsumerOffsetManager组件
public class ConsumerOffsetManager extends ConfigManager {
}

3.1 load()

该方法用于从磁盘文件中加载偏移量信息到内存中的 offsetTable 中。在 Broker 启动时,会调用这个方法,将之前持久化的偏移量信息恢复到内存,以便继续管理消费者的消费进度。如果加载成功,方法返回 true;否则返回 false

 public boolean load() {
        String fileName = null;
        try {
            //获取到配置文件的地址 是从子类中进行获取配置文件的地址
            fileName = this.configFilePath();
            //读取文件内容位一个大的json字符串
            String jsonString = MixAll.file2String(fileName);

            if (null == jsonString || jsonString.length() == 0) {
                return this.loadBak();
            } else {
                //解码
                this.decode(jsonString);
                log.info("load " + fileName + " OK");
                return true;
            }
        } catch (Exception e) {
            log.error("load " + fileName + " failed, and try to load backup file", e);
            return this.loadBak();
        }
    }

3.2persist()

persist 方法用于将内存中的偏移量信息持久化到磁盘文件中。ConsumerOffsetManager 会定期调用这个方法,将 offsetTable 中的数据写入到 configFilePath 指向的文件中,以确保数据的持久化和可靠性。

   /**
     * 子类文件中的核心数据进行持久化操作
     * 每次进行持久化的时候,老文件的内容进行写入到.bak文件中做备份
     * 新文件的内容写入到.tmp文件中,老文件删除,tmp文件改名为新文件
     */
    public synchronized void persist() {
        String jsonString = this.encode(true);
        if (jsonString != null) {
            String fileName = this.configFilePath();
            try {
                //把json文件写入到磁盘中
                MixAll.string2File(jsonString, fileName);
            } catch (IOException e) {
                log.error("persist file " + fileName + " exception", e);
            }
        }
    }

 3.3 commitOffset()

此方法用于提交消费者的消费偏移量。当消费者成功消费一批消息后,会向 Broker 发送偏移量提交请求,Broker 会调用 commitOffset 方法将新的偏移量更新到 offsetTable 中。参数 group 是消费组名称,topic 是主题名称,queueId 是队列 ID,offset 是新的消费偏移量

 /**
     * 提交offset(消费的偏移量)
     * @param clientHost 客户端地址 机器ip
     * @param group 消费者组
     * @param topic topic的名字
     * @param queueId topic下的队列id
     * @param offset  消费的偏移量
     */
    public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,
        final long offset) {
        // topic@group
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        this.commitOffset(clientHost, key, queueId, offset);
    }

    /**
     * 提交offset(消费的偏移量)
     * @param clientHost 客户端地址 机器ip
     * @param key  offsetTable这个Map中key
     * @param queueId topic下的队列id
     * @param offset 消费的偏移量
     */
    private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
        ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
        if (null == map) {
            //如果是空 证明是第一次来进行提交offset 第一次的话就初始化一个Map 把消费的offset给放入到初始化的map中
            map = new ConcurrentHashMap<Integer, Long>(32);
            map.put(queueId, offset);
            this.offsetTable.put(key, map);
        } else {
            //不为空 更新一下消费者 对传入的queueId消费的偏移量
            Long storeOffset = map.put(queueId, offset);
            if (storeOffset != null && offset < storeOffset) {
                log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
            }
        }
    }

3.4cloneOffset()

cloneOffset 方法的主要功能是将一个消费组在某个主题下的偏移量数据克隆到另一个消费组。在某些场景下,例如需要对消费进度进行备份或者将某个消费组的消费进度复制给新的消费组时,这个方法就非常有用。

 /**
     * 克隆offset
     * @param srcGroup 源的消费者组
     * @param destGroup 目标的消费者组
     * @param topic topic的名字
     */
    public void cloneOffset(final String srcGroup, final String destGroup, final String topic) {
        ConcurrentMap<Integer, Long> offsets = this.offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + srcGroup);
        if (offsets != null) {
            this.offsetTable.put(topic + TOPIC_GROUP_SEPARATOR + destGroup, new ConcurrentHashMap<Integer, Long>(offsets));
        }
    }

3.5removeOffset()

removeOffset 方法用于移除某个消费组在指定主题下的偏移量信息。当某个消费组不再需要消费某个主题的消息,或者需要清理过期的偏移量数据时,可以使用该方法。

    public void removeOffset(final String group) {
        Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
            String topicAtGroup = next.getKey();
            if (topicAtGroup.contains(group)) {
                String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
                if (arrays.length == 2 && group.equals(arrays[1])) {
                    it.remove();
                    log.warn("clean group offset {}", topicAtGroup);
                }
            }
        }

    }

3.6 queryOffset()

此方法用于查询特定消费组在指定主题和队列上的消费偏移量。在 RocketMQ 集群消费模式下,它能让消费者清楚当前在某个队列消费到了什么位置,从而保证消息消费的连续性与准确性
  public Map<Integer, Long> queryOffset(final String group, final String topic) {
        // topic@group
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        return this.offsetTable.get(key);
    }

3.7queryMinOffsetInAllGroup()

这个方法用于查询某个主题下所有消费组在指定队列上的最小消费偏移量。在某些场景下,比如清理消息时,需要知道所有消费组的最小消费位置,以确保不会删除还未被消费的消息。

  /**
     * 在所有的消费者组中查询最小的offset
     *  意思是 topic中每个queue被消费的最小的offset是什么
     * @param topic  topic的名字
     * @param filterGroups 要排除的一写消费者组的信息
     * @return
     */
    public Map<Integer, Long> queryMinOffsetInAllGroup(final String topic, final String filterGroups) {
        Map<Integer, Long> queueMinOffset = new HashMap<Integer, Long>();
        // 先从offsetTable中过滤掉一些不需要进行查询的数据
        Set<String> topicGroups = this.offsetTable.keySet();
        if (!UtilAll.isBlank(filterGroups)) {
            for (String group : filterGroups.split(",")) {
                Iterator<String> it = topicGroups.iterator();
                while (it.hasNext()) {
                    if (group.equals(it.next().split(TOPIC_GROUP_SEPARATOR)[1])) {
                        it.remove();
                    }
                }
            }
        }

        // 遍历offsetTable
        for (Map.Entry<String, ConcurrentMap<Integer, Long>> offSetEntry : this.offsetTable.entrySet()) {
            String topicGroup = offSetEntry.getKey();
            String[] topicGroupArr = topicGroup.split(TOPIC_GROUP_SEPARATOR);
            // 判断topic跟指定的查询的topic是否相等
            if (topic.equals(topicGroupArr[0])) {
                for (Entry<Integer, Long> entry : offSetEntry.getValue().entrySet()) {
                    //查询当前topic的队列的最小offset
                    long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, entry.getKey());
                    // 如果消费者组里的offset大于等于最小的offset
                    if (entry.getValue() >= minOffset) {
                        Long offset = queueMinOffset.get(entry.getKey());
                        if (offset == null) {
                            queueMinOffset.put(entry.getKey(), Math.min(Long.MAX_VALUE, entry.getValue()));
                        } else {
                            queueMinOffset.put(entry.getKey(), Math.min(entry.getValue(), offset));
                        }
                    }
                }
            }

        }
        return queueMinOffset;
    }

3.8scanUnsubscribedTopic()

在 RocketMQ 的实际运用中,消费者可能会动态地订阅或取消订阅某些主题。当一个主题不再被任何消费者订阅时,其对应的消费偏移量数据就不再有存在的必要。scanUnsubscribedTopic方法会周期性地检查并清理这些不再使用的数据,避免数据冗余。

 /**
     * 去扫描没有被消费者订阅的topic
     */
    public void scanUnsubscribedTopic() {
        Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
            String topicAtGroup = next.getKey();
            String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
            if (arrays.length == 2) {
                // topic@group 第一个元素为topic 第二个元素为group(消费组)
                String topic = arrays[0];
                String group = arrays[1];
                // ConsumerManager consumer管理的组件 去看看当前消费组里是不是还有这个消费者连接过来对这个topic进行拉取和订阅
                if (null == brokerController.getConsumerManager().findSubscriptionData(group, topic)
                        //这个消费组之前还有消费者拉取数据的时候,消费的offset已经落后最新的offset已经很多了
                    && this.offsetBehindMuchThanData(topic, next.getValue())) {
                    // 上面两个条件都满足的场景 这个消费组以前有消费者进行消费数据,但是现在没有消费者了,并且以前消费的offset已经落后了很多
                    // 把这个topic@group的消费偏移量信息从内存中移除
                    it.remove();
                    log.warn("remove topic offset, {}", topicAtGroup);
                }
            }
        }
    }

    /**
     * 判断在当前的topic的消费偏移量信息是否已经落后了很多
     * @param topic topic的名字
     * @param table map结果,map的key为queueId,map的value为消费的offset
     * @return
     */
    private boolean offsetBehindMuchThanData(final String topic, ConcurrentMap<Integer, Long> table) {
        Iterator<Entry<Integer, Long>> it = table.entrySet().iterator();
        boolean result = !table.isEmpty();

        while (it.hasNext() && result) {
            Entry<Integer, Long> next = it.next();
            //获取当前queueId在store(消息存储组件里)中的最小offset
            long minOffsetInStore = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, next.getKey());
            //某一个消费组对一个queue 消费的offset(偏移量)
            long offsetInPersist = next.getValue();
            result = offsetInPersist <= minOffsetInStore;
        }

        return result;
    }

4. 总结

ConsumerOffsetManager 是 RocketMQ 中一个非常重要的组件,它通过管理消费者的消费偏移量,确保了消息的准确消费和系统的可靠性。了解 ConsumerOffsetManager 的主要属性和方法,有助于开发者更好地理解 RocketMQ 的工作原理,优化消息消费的性能,以及处理各种异常情况。在实际应用中,合理配置 flushDelayOffsetInterval 等参数,可以在性能和数据可靠性之间找到最佳平衡点。

相关文章:

  • RK3568开发笔记-egtouch触摸屏ubuntu系统屏幕校准
  • vue3中,通过获取路由上的token直接进入首页,跳过登录页面
  • 【前端 vue 或者麦克风,智能语音识别和播放功能】
  • python八股(—) --FBV,CBV
  • Python元组
  • LeetCode面试经典150题
  • 《网络安全等级测评报告模版(2025版)》
  • 点云分割方法
  • vue3:十一、主页面布局(实现基本左侧菜单+右侧内容效果)
  • 万亿级数据量的OceanBase应用从JVM到协议栈立体化改造实现性能调优
  • 对比学习(Contrastive Learning)初接触
  • 通过仿真确定抗积分饱和策略的最佳系数
  • 《TCP/IP网络编程》学习笔记 | Chapter 20:Windows 中的线程同步
  • JVM垃圾回收笔记01-垃圾回收算法
  • ffmpeg介绍(一)——解封装
  • 如何让低于1B参数的小型语言模型实现 100% 的准确率
  • SQLMesh SCD-2 时间维度实战:餐饮菜单价格演化追踪
  • JAVA 之「优先队列」:大顶堆与小顶堆的实现与应用
  • aws(学习笔记第三十四课) dockerized-app with asg-alb
  • claude-3-7-sonnet-20250219 支持深度思考,流式输出
  • 上海工匠学院首届学历班56人毕业,新一届拟招生200人
  • 农林生物安全全国重点实验室启动建设,聚焦重大有害生物防控等
  • 王毅同印度国家安全顾问多瓦尔通电话
  • 价格周报|供需回归僵局,本周生猪均价与上周基本持平
  • 巴基斯坦空袭印度多地空军基地,巴战机进入印领空
  • 巴西总统卢拉将访华