当前位置: 首页 > wzjs >正文

.net网站开发的例子承接网站建设服务

.net网站开发的例子,承接网站建设服务,国外域名注册哪家比较好,百度网网站建设的目标一、引言 在分布式消息队列系统中,消息存储的可靠性和高吞吐能力是衡量系统优劣的核心指标。Apache RocketMQ 作为一款高性能、高可用的分布式消息中间件,其独特的 CommitLog 存储机制在消息持久化过程中扮演了关键角色。本文将深入剖析 CommitLog 的设…

一、引言

在分布式消息队列系统中,消息存储的可靠性高吞吐能力是衡量系统优劣的核心指标。Apache RocketMQ 作为一款高性能、高可用的分布式消息中间件,其独特的 CommitLog 存储机制在消息持久化过程中扮演了关键角色。本文将深入剖析 CommitLog 的设计思想、实现原理及优化策略,揭示其如何支撑 RocketMQ 实现每秒数十万级消息处理能力。

二、CommitLog 的核心定位

1. 什么是 CommitLog?

CommitLog 是 RocketMQ 消息存储的核心物理文件,所有 Topic 的消息均以顺序追加(Append-Only)的方式写入同一个 CommitLog 文件。这种设计颠覆了传统“每个 Topic 独立存储”的模式,通过统一的顺序写机制最大化磁盘 I/O 效率。

2. 设计哲学
  • 全局顺序写:所有消息顺序写入,规避磁盘随机 I/O 性能瓶颈。

  • 物理连续存储:消息按到达顺序连续存储,消除碎片化。

  • 逻辑与物理分离:消息的消费队列(ConsumeQueue)和索引文件(IndexFile)作为逻辑视图,与物理存储解耦。

三、CommitLog 的物理结构

 1. 文件组织

  • 文件路径${ROCKET_HOME}/store/commitlog/

  • 文件命名:以文件初始偏移量命名(如 00000000000000000000

  • 固定大小:默认 1GB(可配置),写满后创建新文件

2. 消息存储格式

每条消息在 CommitLog 中的存储结构如下

字段长度(字节)说明
Total Size4消息总长度
Magic Code4固定值 0xAABBCCDD
Body CRC4消息体 CRC 校验码
Queue ID4消息所属队列 ID
Flag4消息标记(事务/压缩等)
Queue Offset8在 ConsumeQueue 的物理偏移
Physical Offset8在 CommitLog 的物理偏移
SysFlag4系统标记(压缩/事务等)
Born Timestamp8消息生成时间戳
Born Host8生产者地址
Store Timestamp8存储时间戳
Store Host8Broker 地址
Reconsume Times4重试次数
Prepared Transaction Offset8事务相关偏移量
Body Length4消息体长度
BodyBody Length消息体内容
Topic Length1Topic 名称长度
TopicTopic LengthTopic 名称
Properties Length2属性字段长度
Properties变长扩展属性(Key-Value) 

四、commitLog方法属性介绍

主要属性:

/*** 映射文件的队列*/protected final MappedFileQueue mappedFileQueue;/*** 所属的消息存储组件*/protected final DefaultMessageStore defaultMessageStore;/*** 刷新CommitLog服务的组件*/private final FlushCommitLogService flushCommitLogService;/*** 缓冲区  刷新到CommitLog服务*///If TransientStorePool enabled, we must flush message to FileChannel at fixed periods//如果开启了TransientStorePool 我们必须有一个固定的频率刷新消息到FileChannelprivate final FlushCommitLogService commitLogService;/*** 追加消息的回调函数*/private final AppendMessageCallback appendMessageCallback;/*** 写入消息的线程本地副本*/private final ThreadLocal<PutMessageThreadLocal> putMessageThreadLocal;/*** topic-queueid -> offset*/protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);/*** topic-queueid -> offset*/protected Map<String/* topic-queueid */, Long/* offset */> lmqTopicQueueTable = new ConcurrentHashMap<>(1024);/*** 确认偏移量*/protected volatile long confirmOffset = -1L;/*** 锁开始的时间*/private volatile long beginTimeInLock = 0;/*** 写入消息的锁*/protected final PutMessageLock putMessageLock;/*** 完成的存储路径*/private volatile Set<String> fullStorePaths = Collections.emptySet();/*** 多路分发的组件*/protected final MultiDispatch multiDispatch;/*** 刷新磁盘的监控组件*/private final FlushDiskWatcher flushDiskWatcher;

主要方法:

1.构造函数

public CommitLog(final DefaultMessageStore defaultMessageStore) {//commitLog的存储路径String storePath = defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog();//是否包含一个切割的符号 存储路径里如果包含了一个切割符号 逗号 那么就走这个多个路径的MappedFile队列//正常情况下走的是普通的MappedFile队列if (storePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) {this.mappedFileQueue = new MultiPathMappedFileQueue(defaultMessageStore.getMessageStoreConfig(),defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),defaultMessageStore.getAllocateMappedFileService(), this::getFullStorePaths);} else {this.mappedFileQueue = new MappedFileQueue(storePath, //存储路径defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), //mappedFileSize 文件的大小 默认是1GdefaultMessageStore.getAllocateMappedFileService()); //分配文件的服务}this.defaultMessageStore = defaultMessageStore;//如果是走同步刷盘的策略的话 就是GroupCommitService 如果是异步刷盘就是FlushRealTimeServiceif (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {this.flushCommitLogService = new GroupCommitService();} else {this.flushCommitLogService = new FlushRealTimeService();}//this.commitLogService = new CommitRealTimeService();this.appendMessageCallback = new DefaultAppendMessageCallback();putMessageThreadLocal = new ThreadLocal<PutMessageThreadLocal>() {@Overrideprotected PutMessageThreadLocal initialValue() {return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());}};//写入消息的时候是否默认使用可重入的锁 默认是truethis.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();this.multiDispatch = new MultiDispatch(defaultMessageStore, this);flushDiskWatcher = new FlushDiskWatcher();}

2. start方法

    //对commitLog组件进行启用public void start() {// 启动刷盘服务this.flushCommitLogService.start();//设置刷新磁盘的监控组件flushDiskWatcher.setDaemon(true);flushDiskWatcher.start();//如果启动了瞬时池化技术if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {this.commitLogService.start();}}

3.recoverNormally方法

可以参考之前的博客:

https://blog.csdn.net/u013127325/article/details/147397018

4.asyncPutMessage方法

可以参考之前的博客:

RocketMQ CommitLog异步写入机制深度解析:从asyncPutMessage看亿级消息吞吐设计-CSDN博客

 

http://www.dtcms.com/wzjs/579689.html

相关文章:

  • 网站地图怎么提交网络推广十大平台
  • 学设计的视频网站新手seo网站做什么类型好
  • 网站制作公司 北京房产机构网站建设目标定位
  • 东莞网站优化电话网站是做流程图
  • 美创网站建设优势wordpress正版插件
  • 杭州哪家公司做网站比较好甘肃泾川县门户网站两学一做
  • 做艺术品展览的网站石家庄制作网站软件
  • 单位做网站需要准备什么如何查询网站备案时间
  • 大型门户网站是这样炼成的源代码登封建设局网站
  • 学校网站建设注意点瑞安网
  • 合肥网站建设合肥网络口碑营销案例
  • 贵州省建设银行网站手机版网站源码
  • 哈尔滨建立网站公司现在做网站建设都是自建
  • 开发网站的语言外贸网站后台
  • 那个网站做图片微信公众号推广软文案例
  • 男人需要网站网页设计快速培训
  • 2_ 如何写一份详细的网站开发方案做网站如何来钱
  • 中企动力制作的网站后台企业营销网站服务器1g够
  • 泰州市住房和城乡建设局网站好的建站平台
  • 简单网站开发实例上海松江品划建设网站
  • 广州建设六马路小学网站抚州建设银行网站
  • 网站版权该怎么做呢可以直接进入的正能量网站
  • 台州自助建站系统学院网站建设目标
  • 南通网站建设方案书手机电视网站大全
  • 阿里云一个域名做两个网站国内大型电子网站建设
  • 成都中小企业网站建设手机做网站视频
  • 南昌网站建设培训班做mla网站
  • 服务器上的网站做代理的项目在哪个网站
  • 容县网站建设免费建网站服务最好的公司
  • 网站如何做线下推广微信广告投放推广平台