当前位置: 首页 > news >正文

RocketMQ 中 DefaultMessageStore 的 AllocateMappedFileService 属性详解

一、引言

在 RocketMQ 这个高性能分布式消息队列系统里,DefaultMessageStore 是消息存储的核心实现类,承担着消息的持久化存储、检索等关键任务。而 AllocateMappedFileService 作为 DefaultMessageStore 中的一个重要属性,在文件分配和管理方面发挥着至关重要的作用。本文将深入剖析 AllocateMappedFileService 的属性和方法,带你了解它在 RocketMQ 消息存储机制中的工作原理和重要意义。

二、AllocateMappedFileService 概述

AllocateMappedFileService 是一个后台服务,其主要职责是预先分配物理文件,也就是 MappedFile 实例。在 RocketMQ 中,消息会被存储在一系列的物理文件中,为了避免在消息写入时临时创建文件带来的性能开销,AllocateMappedFileService 会提前将这些文件分配好,并进行初始化,以确保消息能够快速、高效地写入。

三、核心属性

    //日志组件
   private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    //等待超时时间 默认5秒
    private static int waitTimeOut = 1000 * 5;

    //分配请求映射表
    private ConcurrentMap<String, AllocateRequest> requestTable =
        new ConcurrentHashMap<String, AllocateRequest>();

    //分配请求的队列
    private PriorityBlockingQueue<AllocateRequest> requestQueue =
        new PriorityBlockingQueue<AllocateRequest>();

    //是否有异常的标识
    private volatile boolean hasException = false;

    //外层消息存储的组件
    private DefaultMessageStore messageStore;

四、核心方法

1.putRequestAndReturnMappedFile方法

方法概述

putRequestAndReturnMappedFile 方法的主要作用是把文件分配请求添加到请求队列中,并且等待文件分配完成,最终返回分配好的 MappedFile 实例。该方法通过线程同步机制,确保调用者在请求文件分配后,能够等待分配操作完成并获取到可用的文件。

方法源码分析

 /**
     * 把一个分配mappedfile请求放入进来 以及返回一个分配出来的mappedfile
     * @param nextFilePath 下一个文件的路径
     * @param nextNextFilePath 下下一个文件的路径
     * @param fileSize 文件的大小
     * @return
     */
    public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
        //能够请求分配线程的数量 默认是2
        //一次分配请求,是针对两个磁盘文件申请的MappedFile,nextFilePath,nextNextFilePath
        int canSubmitRequests = 2;
        //是否启用了瞬时存储的池化技术 默认是不开启的 必须手动开启 必须是异步刷盘 必须是master节点
        if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            //如果存储池中没有可用的buffer 是否启用了fast fail 而且broker角色不能使slave
            if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
                && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
                //获取到瞬时存储池中可用的buffer数量,减去请求队列中数量,获取到可以进行提交的请求数量
                canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size();
            }
        }

        //构建一个分配请求
        AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
        //把请求放入到请求映射表
        boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;

        //写入映射表成功
        if (nextPutOK) {
            //如果可以提交请求的数量小于等于0
            if (canSubmitRequests <= 0) {
                //此时队列中剩余请求数量过多,此时就会移除请求
                //移除请求映射表
                log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
                    "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
                this.requestTable.remove(nextFilePath);
                return null;
            }
            //如果可以提交请求,那么请求入队
            boolean offerOK = this.requestQueue.offer(nextReq);
            if (!offerOK) {
                log.warn("never expected here, add a request to preallocate queue failed");
            }
            //提交请求的数量减1
            canSubmitRequests--;
        }

        //针对下下个文件进行创建分配请求
        AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
        //把下下个文件给添加到请求映射表 并进行入队
        boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
        if (nextNextPutOK) {
            if (canSubmitRequests <= 0) {
                log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
                    "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
                this.requestTable.remove(nextNextFilePath);
            } else {
                boolean offerOK = this.requestQueue.offer(nextNextReq);
                if (!offerOK) {
                    log.warn("never expected here, add a request to preallocate queue failed");
                }
            }
        }

        if (hasException) {
            log.warn(this.getServiceName() + " service has exception. so return null");
            return null;
        }
        //从请求映射表里获取第一个文件的分配请求,如果此时不为null,那么说明第一个文件分配请求都没有完成
        AllocateRequest result = this.requestTable.get(nextFilePath);
        try {
            if (result != null) {
                //通过countDownLatch 等待5秒钟 如果5秒内还没分配完成 直接返回null
                //但是呢,上面代码刚把第一个请求文件分配请求写入到队列里去,立马就被当前服务线程拿出来分配好了,映射表里不会移除这个请求
                //此时拿到分配请求的结果,此时的countDownLatch应该已经写入了1 此时你才可以countdown一下
                boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
                if (!waitOK) {
                    log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
                    return null;
                } else {
                    //说明第一个文件已经处理完毕了 从映射表里移除这个请求 给返回对应的MappedFile
                    this.requestTable.remove(nextFilePath);
                    return result.getMappedFile();
                }
            } else {
                log.error("find preallocate mmap failed, this never happen");
            }
        } catch (InterruptedException e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }

        return null;
    }

2.mmapOperation

方法功能概述

mmapOperation方法的主要功能是根据传入的文件分配请求AllocateRequest,创建一个新的内存映射文件,并对其进行必要的初始化操作,最后返回创建好的MappedFile对象。

方法代码分析

 private boolean mmapOperation() {
        boolean isSuccess = false;
        AllocateRequest req = null;
        try {
            //从队列中获取分配到请求
            req = this.requestQueue.take();
            //从请求映射表里获取到分配请求
            AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
            if (null == expectedRequest) {
                log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
                    + req.getFileSize());
                return true;
            }
            if (expectedRequest != req) {
                log.warn("never expected here,  maybe cause timeout " + req.getFilePath() + " "
                    + req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
                return true;
            }
            //还没分配
            if (req.getMappedFile() == null) {
                long beginTime = System.currentTimeMillis();

                MappedFile mappedFile;
                //是否开启了瞬时存储池化技术
                if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                    try {
                        //通过jdk提供的ServiceLoader加载一些MappedFile
                        mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
                        //还需要对拿到的MappedFile进行初始化
                        mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                    } catch (RuntimeException e) {
                        log.warn("Use default implementation.");
                        mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                    }
                } else {
                    //直接创建 MappedFile 常规化的操作
                    mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
                }

                long elapsedTime = UtilAll.computeElapsedTimeMilliseconds(beginTime);
                if (elapsedTime > 10) {
                    int queueSize = this.requestQueue.size();
                    log.warn("create mappedFile spent time(ms) " + elapsedTime + " queue size " + queueSize
                        + " " + req.getFilePath() + " " + req.getFileSize());
                }

                // pre write mappedFile
                //如果满足了一定的条件之后 会对MappedFile进行预热 提前将磁盘数据加载到内存区域里
                if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
                    .getMappedFileSizeCommitLog()
                    &&
                    this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
                    mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
                        this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
                }

                req.setMappedFile(mappedFile);
                this.hasException = false;
                isSuccess = true;
            }
        } catch (InterruptedException e) {
            log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
            this.hasException = true;
            return false;
        } catch (IOException e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.hasException = true;
            if (null != req) {
                requestQueue.offer(req);
                try {
                    Thread.sleep(1);
                } catch (InterruptedException ignored) {
                }
            }
        } finally {
            if (req != null && isSuccess)
                req.getCountDownLatch().countDown();
        }
        return true;
    }

五、工作流程

  1. 请求生成:当 DefaultMessageStore 检测到需要新的文件来存储消息时,会创建一个 AllocateRequest 对象,并将其放入 AllocateMappedFileService 的 requestQueue 中。

  2. 文件分配AllocateMappedFileService 的线程会不断从 requestQueue 中取出请求,调用 mmapOperation 方法进行文件分配。

  3. 同步返回:调用 putRequestAndReturnMappedFile 方法的线程会等待文件分配完成,分配成功后会返回 MappedFile 实例供消息写入使用。

  4. 服务停止:当 DefaultMessageStore 关闭时,会调用 AllocateMappedFileService 的 destroy 方法,停止服务并清理资源。

六、总结

AllocateMappedFileService 作为 DefaultMessageStore 中的重要属性,通过预先分配物理文件的方式,大大提高了 RocketMQ 消息存储的性能和效率。其核心属性和方法协同工作,确保了文件分配操作的有序进行和资源的合理利用。深入理解 AllocateMappedFileService 的工作原理,有助于我们更好地优化 RocketMQ 的性能和解决相关的存储问题。在实际应用中,我们可以根据具体的业务场景和系统需求,对 AllocateMappedFileService 的参数和配置进行调整,以达到最佳的性能表现。

相关文章:

  • 【Linux】Linux 权限:数字背后的神秘 “门禁卡” 系统
  • 剖析Spring中的设计模式(一) | 工厂观察者
  • 【零基础玩转多模态AI:Gemma3 27B开源视觉模型本地部署与远程访问】
  • 全星APQP软件:为用户提供高效、合规、便捷的研发管理体验
  • HDLBIT知识点
  • 探索 Vue 3 响应式系统:原理与实践
  • 蓝桥杯电子赛_E2PROM(AT24C02)
  • Agent 2 Agent VS MCP
  • 【C++】深拷贝与浅拷贝
  • GitHub 趋势日报 (2025年04月08日)
  • C语言精讲-12
  • 【Linux】基础开发工具
  • 八大可商用桌面客户端应用开发框架深度指南-优雅草卓伊凡
  • 操作系统基础:05 系统调用实现
  • playwright 教程高级篇:掌握网页自动化与验证码处理等关键技术详解
  • [数据结构]排序 --2
  • 【C++】C++的引用
  • 在 Ubuntu 下通过 Docker 部署 Caddy 服务器
  • C++双链表介绍及实现
  • 从输入URL到页面渲染:浏览器请求的完整旅程解析
  • 巴基斯坦称成功拦截印度导弹,空军所有资产安全
  • 上海国际电影节推出三大官方推荐单元,精选十部优秀影片
  • 壹基金发布2024年度报告,公益项目惠及937万人次
  • 一周文化讲座|城市移民与数字时代的新工作
  • 圆桌丨权威专家解读中俄关系:在新形势下共同应对挑战、共创发展机遇
  • 同观·德国|默茨当总理后,能否带领德国在欧盟“说了算”?