当前位置: 首页 > wzjs >正文

新手做站必看 手把手教你做网站seo工具在线访问

新手做站必看 手把手教你做网站,seo工具在线访问,淘宝官网首页入口电脑版网址,做网站花了2万多一、引言 在分布式消息队列系统中,消息存储的可靠性和高吞吐能力是衡量系统优劣的核心指标。Apache RocketMQ 作为一款高性能、高可用的分布式消息中间件,其独特的 CommitLog 存储机制在消息持久化过程中扮演了关键角色。本文将深入剖析 CommitLog 的设…

一、引言

在分布式消息队列系统中,消息存储的可靠性高吞吐能力是衡量系统优劣的核心指标。Apache RocketMQ 作为一款高性能、高可用的分布式消息中间件,其独特的 CommitLog 存储机制在消息持久化过程中扮演了关键角色。本文将深入剖析 CommitLog 的设计思想、实现原理及优化策略,揭示其如何支撑 RocketMQ 实现每秒数十万级消息处理能力。

二、CommitLog 的核心定位

1. 什么是 CommitLog?

CommitLog 是 RocketMQ 消息存储的核心物理文件,所有 Topic 的消息均以顺序追加(Append-Only)的方式写入同一个 CommitLog 文件。这种设计颠覆了传统“每个 Topic 独立存储”的模式,通过统一的顺序写机制最大化磁盘 I/O 效率。

2. 设计哲学
  • 全局顺序写:所有消息顺序写入,规避磁盘随机 I/O 性能瓶颈。

  • 物理连续存储:消息按到达顺序连续存储,消除碎片化。

  • 逻辑与物理分离:消息的消费队列(ConsumeQueue)和索引文件(IndexFile)作为逻辑视图,与物理存储解耦。

三、CommitLog 的物理结构

 1. 文件组织

  • 文件路径${ROCKET_HOME}/store/commitlog/

  • 文件命名:以文件初始偏移量命名(如 00000000000000000000

  • 固定大小:默认 1GB(可配置),写满后创建新文件

2. 消息存储格式

每条消息在 CommitLog 中的存储结构如下

字段长度(字节)说明
Total Size4消息总长度
Magic Code4固定值 0xAABBCCDD
Body CRC4消息体 CRC 校验码
Queue ID4消息所属队列 ID
Flag4消息标记(事务/压缩等)
Queue Offset8在 ConsumeQueue 的物理偏移
Physical Offset8在 CommitLog 的物理偏移
SysFlag4系统标记(压缩/事务等)
Born Timestamp8消息生成时间戳
Born Host8生产者地址
Store Timestamp8存储时间戳
Store Host8Broker 地址
Reconsume Times4重试次数
Prepared Transaction Offset8事务相关偏移量
Body Length4消息体长度
BodyBody Length消息体内容
Topic Length1Topic 名称长度
TopicTopic LengthTopic 名称
Properties Length2属性字段长度
Properties变长扩展属性(Key-Value) 

四、commitLog方法属性介绍

主要属性:

/*** 映射文件的队列*/protected final MappedFileQueue mappedFileQueue;/*** 所属的消息存储组件*/protected final DefaultMessageStore defaultMessageStore;/*** 刷新CommitLog服务的组件*/private final FlushCommitLogService flushCommitLogService;/*** 缓冲区  刷新到CommitLog服务*///If TransientStorePool enabled, we must flush message to FileChannel at fixed periods//如果开启了TransientStorePool 我们必须有一个固定的频率刷新消息到FileChannelprivate final FlushCommitLogService commitLogService;/*** 追加消息的回调函数*/private final AppendMessageCallback appendMessageCallback;/*** 写入消息的线程本地副本*/private final ThreadLocal<PutMessageThreadLocal> putMessageThreadLocal;/*** topic-queueid -> offset*/protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);/*** topic-queueid -> offset*/protected Map<String/* topic-queueid */, Long/* offset */> lmqTopicQueueTable = new ConcurrentHashMap<>(1024);/*** 确认偏移量*/protected volatile long confirmOffset = -1L;/*** 锁开始的时间*/private volatile long beginTimeInLock = 0;/*** 写入消息的锁*/protected final PutMessageLock putMessageLock;/*** 完成的存储路径*/private volatile Set<String> fullStorePaths = Collections.emptySet();/*** 多路分发的组件*/protected final MultiDispatch multiDispatch;/*** 刷新磁盘的监控组件*/private final FlushDiskWatcher flushDiskWatcher;

主要方法:

1.构造函数

public CommitLog(final DefaultMessageStore defaultMessageStore) {//commitLog的存储路径String storePath = defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog();//是否包含一个切割的符号 存储路径里如果包含了一个切割符号 逗号 那么就走这个多个路径的MappedFile队列//正常情况下走的是普通的MappedFile队列if (storePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) {this.mappedFileQueue = new MultiPathMappedFileQueue(defaultMessageStore.getMessageStoreConfig(),defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),defaultMessageStore.getAllocateMappedFileService(), this::getFullStorePaths);} else {this.mappedFileQueue = new MappedFileQueue(storePath, //存储路径defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), //mappedFileSize 文件的大小 默认是1GdefaultMessageStore.getAllocateMappedFileService()); //分配文件的服务}this.defaultMessageStore = defaultMessageStore;//如果是走同步刷盘的策略的话 就是GroupCommitService 如果是异步刷盘就是FlushRealTimeServiceif (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {this.flushCommitLogService = new GroupCommitService();} else {this.flushCommitLogService = new FlushRealTimeService();}//this.commitLogService = new CommitRealTimeService();this.appendMessageCallback = new DefaultAppendMessageCallback();putMessageThreadLocal = new ThreadLocal<PutMessageThreadLocal>() {@Overrideprotected PutMessageThreadLocal initialValue() {return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());}};//写入消息的时候是否默认使用可重入的锁 默认是truethis.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();this.multiDispatch = new MultiDispatch(defaultMessageStore, this);flushDiskWatcher = new FlushDiskWatcher();}

2. start方法

    //对commitLog组件进行启用public void start() {// 启动刷盘服务this.flushCommitLogService.start();//设置刷新磁盘的监控组件flushDiskWatcher.setDaemon(true);flushDiskWatcher.start();//如果启动了瞬时池化技术if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {this.commitLogService.start();}}

3.recoverNormally方法

可以参考之前的博客:

https://blog.csdn.net/u013127325/article/details/147397018

4.asyncPutMessage方法

可以参考之前的博客:

RocketMQ CommitLog异步写入机制深度解析:从asyncPutMessage看亿级消息吞吐设计-CSDN博客

 

http://www.dtcms.com/wzjs/297137.html

相关文章:

  • 广州哪家公司做网站好产品销售方案与营销策略
  • web网站建设原则软文推广500字
  • 电影网站加盟可以做么最优化方法
  • 企业做网站要大连今日新闻头条
  • 阿联酋网站后缀为企业策划一次网络营销活动
  • 谷歌推广网站建设百度怎么联系客服
  • 外包appseo免费诊断电话
  • 校园网站建设的系统分析郑州seo优化顾问热狗
  • 网站上的地图导航怎么做个人博客登录入口
  • 网站产品介绍长图哪个软件做的台州seo快速排名
  • 成都眉山网站建设网络营销推广活动
  • 怎么做自己网站的后台成人大学报名官网入口
  • 网站logo怎么做的湖北seo诊断
  • 网站建设xywlcn网站建设介绍ppt
  • 建站宝盒全能版慧生活798app下载
  • 网站指向邮箱超链接怎么做临沂百度seo
  • 用tomcat做网站目录seo初级入门教程
  • 众筹那些网站可以做搜狗seo刷排名软件
  • 公司注册网上办理在seo优化中
  • 建设银行官网首页网站招聘网络营销类型有哪些
  • 海外电商有哪些平台小时seo百度关键词点击器
  • 建设pc 移动网站关键词查找的方法有以下几种
  • 织梦网站如何生成伪静态洛阳网站seo
  • b s网站系统如何做性能测试windows优化大师自动下载
  • 做我的世界缩略图的网站个人seo怎么赚钱
  • 陕icp网站建设郑州seo哪家好
  • 网站怎么提高收录外贸建站与推广
  • 专做教育网站拿站百度免费安装
  • pc网站做app哪个平台推广效果好
  • 如何看织梦做的网站的源码站长工具忘忧草社区