当前位置: 首页 > wzjs >正文

洛阳网站建设哪家权威杭州网站推广优化公司

洛阳网站建设哪家权威,杭州网站推广优化公司,甘肃城乡建设部网站首页,小说网站的里面的搜索是怎么做的1、Broker消息分发服务以及构建ConsumeQueue和IndexFile与消息清除 前面分析如何进行刷盘,本章分析Broker的消息分发以及构建ConsumerQueue和IndexFile,两者构建是为了能够提高效率,减少消息查找时间以及减少网络带宽与存储空间。 ConsumeQ…

1、Broker消息分发服务以及构建ConsumeQueue和IndexFile与消息清除

前面分析如何进行刷盘,本章分析Broker的消息分发以及构建ConsumerQueue和IndexFile,两者构建是为了能够提高效率,减少消息查找时间以及减少网络带宽与存储空间。

ConsumeQueue 可以快速定位消息,IndexFile 可以快速定位 ConsumeQueue 中的位置,并随时实时更新,以保证消息的消费速度。

ConsumeQueue 和 IndexFile 采用了压缩存储和分片存储的方式,消费者可以仅仅消费需要的消息,从而节省存储空间。

ConsumeQueue 可以快速定位消息,减少消息传递所消耗的网络带宽,提高消息传递效率。

消息分发

在执行Broker的启动方法会去启动消息分发服务,简单理解就是为消息建立一种索引更加快速有效的消费查询消息。
broker处理消息分发的类是ReputMessageService,启动一个线程不断地将commitLong分到到对应的consumerQueue与indexFile,消息分发流程处理完毕;


public void run() {DefaultMessageStore.log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {// 休眠1s,进行分发ConsumeQueue和IndexFileThread.sleep(1);this.doReput();} catch (Exception e) {DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);}}DefaultMessageStore.log.info(this.getServiceName() + " service end");
}

消息每隔1s进行消息的分发,尝试去构建ConsumerQueue索引与

IndexFile索引。
private void doReput() {if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();}// 如果分发偏移量小于commitlog的最大物理偏移量,那么循环分发for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {break;}// 从commitLog中获取需要分发的消息SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);if (result != null) {try {this.reputFromOffset = result.getStartOffset();for (int readSize = 0; readSize < result.getSize() && doNext; ) {DispatchRequest dispatchRequest =DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();if (dispatchRequest.isSuccess()) {if (size > 0) {// CommitLog日志分发到ConsumeQueue和IndexFileDefaultMessageStore.this.doDispatch(dispatchRequest);// 长轮询:如果有消息到了主节点,并且开启了长轮询。(消息拉取逻辑后面单独讲)if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()&& DefaultMessageStore.this.messageArrivingListener != null) {//messageArrivingListener实例是NotifyMessageArrivingListener//通知被hold住的消费者拉取消息的请求DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());notifyMessageArrive4MultiQueue(dispatchRequest);}this.reputFromOffset += size;readSize += size;if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()).add(dispatchRequest.getMsgSize());}} else if (size == 0) {// 如果等于0,表示读取到MappedFile文件尾// 获取下一个文件的起始索引this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);readSize = result.getSize();}} else if (!dispatchRequest.isSuccess()) {if (size > 0) {log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);this.reputFromOffset += size;} else {doNext = false;// If user open the dledger pattern or the broker is master node,// it will not ignore the exception and fix the reputFromOffset variableif (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",this.reputFromOffset);this.reputFromOffset += result.getSize() - readSize;}}}}} finally {result.release();}} else {doNext = false;}}
}

通过getData()方法来获取需要进行分发的消息,根据reputFromOffset的物理偏移量找到mappedFileQueue中对应的CommitLog文件的MappedFile,然后从该MappedFile中截取一段自reputFromOffset偏移量开始的ByteBuffer。
依次进行消息的分发调用doDispatch方法,把消息写入ConsumerQueue与IndexFile中。
如果当前节点是主节点且开启长轮询,则调用messageArrivingListener.arriving方法进行消息的投递,具体长轮询的方案后期介绍。
构建ConsumerQueue与IndexFile


public void doDispatch(DispatchRequest req) {//dispatchList中包含CommitLogDispatcherBuildConsumeQueue和CommitLogDispatcherBuildIndex,// 分别用来构建ConsumeQueue文件和IndexFile文件。for (CommitLogDispatcher dispatcher : this.dispatcherList) {dispatcher.dispatch(req);}
}

调用dispatch进行消息的分发,其中包括三个实现类。

CommitLogDispatcherBuildConsumeQueue类:写ConsumeQueue文件,构建ConsumeQueue索引。

CommitLogDispatcherBuildIndex类:写IndexFile文件,构建IndexFile索引。

CommitLogDispatcherCalcBitMap类:构建布隆过滤器,加速SQL92过滤效率,避免每次都解析sql。

消息清理

当消息到达一定程度就会被RocketMQ丢弃,而在Broker启动会去添加定时删除文件的任务,60s之后执行第一个后面每隔10s执行一次。

// 过期文件删除
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {// 清除方法DefaultMessageStore.this.cleanFilesPeriodically();}
}private void cleanFilesPeriodically() {// 删除CommitLog文件this.cleanCommitLogService.run();// 清除ConsumerQueue文件和IndexFile文件this.cleanConsumeQueueService.run();
}

清除的主要逻辑:首先清除CommitLog文件,然后在清除ConsumerQueue文件和IndexFile文件。

CommitLog文件的清除
// 定期删除CommitLog文件的地方
public void run() {try {// 删除文件this.deleteExpiredFiles();// 删除挂起文件this.redeleteHangedFile();} catch (Throwable e) {DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);}
}private void deleteExpiredFiles() {int deleteCount = 0;long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();//K2 Broker触发文件删除的三个条件boolean timeup = this.isTimeToDelete(); // deleteWhen是否达到,配置项boolean spacefull = this.isSpaceToDelete(); // 判断磁盘是否达到删除,diskmax配置项boolean manualDelete = this.manualDeleteFileSeveralTimes > 0; // 手动删除次数// 并未判断消息是否已经被消费,所以当长时间积压的消息,触发上面三个条件之一就会导致消息丢失未消费的情况,所以我们得避免消息积压。if (timeup || spacefull || manualDelete) {if (manualDelete)this.manualDeleteFileSeveralTimes--;boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",fileReservedTime,timeup,spacefull,manualDeleteFileSeveralTimes,cleanAtOnce);fileReservedTime *= 60 * 60 * 1000;deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,destroyMapedFileIntervalForcibly, cleanAtOnce);if (deleteCount > 0) {} else if (spacefull) {log.warn("disk space will be full soon, but delete file failed.");}}
}

首先删除文件需要满足的要求是其一deleteWhen是否达到默认是04(可配置),其二判断磁盘是否达到删除默认75(可配置),其三手动删除。

触发过期⽂件删除时,有两个检查的纬度,⼀个是,是否到了触发删除的时间,就是broker.conf⾥配置的deleteWhen属性。另外还会检查磁盘利⽤率,达到阈值也会触发过期⽂件删除。这个阈值默认是75%,可以在broker.conf⽂件当中定制。但是最⼤值为95,最⼩值为10。

注意:观察源码可得 并未判断消息是否已经被消费,所以当长时间积压的消息,触发上面三个条件之一就会导致消息丢失未消费的情况,所以我们得避免消息积压。

如果删除的时候我们的文件正在被使用这就导致删除不会生效此时就需要redeleteHangedFile方法区删除挂起文件。

ConsumerQueueIndexFile清理
public void run() {try {// 删除过期的filethis.deleteExpiredFiles();} catch (Throwable e) {DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);}
}private void deleteExpiredFiles() {// 间隔100int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();if (minOffset > this.lastPhysicalMinOffset) {this.lastPhysicalMinOffset = minOffset;ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {for (ConsumeQueue logic : maps.values()) {// 进行consumerQueue删除int deleteCount = logic.deleteExpiredFile(minOffset);if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {try {Thread.sleep(deleteLogicsFilesInterval);} catch (InterruptedException ignored) {}}}}// 进行IndexFile删除DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);}
}

在删除ConsumeQueue和IndexFile⽂件时,会去检查CommitLog当前的最⼩Offset,然后在删除时进⾏对⻬。

2、Broker端整体流程以及文件索引结构

至此我们的Producer发送消息以及Broker如何处理存储消息以及介绍完了,首先我们需要去整理一下整个流程,然后后面再来分析Consumer端的消费,以及各种消息类型的案例源码等。

发送端流程总结

生产者发送消息写入内存推送给Broker进行刷盘入CommitLog文件,同时处理分发将消息索引存放到ConsumerQueue与IndexFile文件当中。

如果是主从架构就会去进行主从之间的同步。

接下来当消息达到一定条件就会触发删除机制,程序注册了一个定时任务首次60s执行后面每隔10s去执行一次清理工作。注意:删除文件并没有判断是否被消费过了只要达到要求就会删除继而导致消息的丢失,所以我们得去避免消息的积压。
在这里插入图片描述
索引结构

CommitLog⽂件的⼤⼩是固定的,但是其中每个存储的每个消息单元长度是不固定的,在源码CommitLog类的calMsgLength方法有计算消息长度的方法。

protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;final int msgLen = 4 //TOTALSIZE+ 4 //MAGICCODE+ 4 //BODYCRC+ 4 //QUEUEID+ 4 //FLAG+ 8 //QUEUEOFFSET+ 8 //PHYSICALOFFSET+ 4 //SYSFLAG+ 8 //BORNTIMESTAMP+ bornhostLength //BORNHOST+ 8 //STORETIMESTAMP+ storehostAddressLength //STOREHOSTADDRESS+ 4 //RECONSUMETIMES+ 8 //Prepared Transaction Offset+ 4 + (bodyLength > 0 ? bodyLength : 0) //BODY+ 1 + topicLength //TOPIC+ 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength+ 0;return msgLen;
}

正因为消息的记录⼤⼩不固定,所以RocketMQ在每次存CommitLog⽂件时,都会去检查当前CommitLog⽂件空间是否⾜够,如果不够的话,就重新创建⼀个CommitLog⽂件,⽂件名为当前消息的偏移量。

ConsumeQueue⽂件主要是加速消费者的消息索引。他的每个⽂件夹对应RocketMQ中的⼀个MessageQueue,⽂件夹下的⽂件记录了每个MessageQueue中的消息在CommitLog⽂件当中的偏移量。这样,消费者通过ComsumeQueue⽂件,就可以快速找到CommitLog⽂件中感兴趣的消息记录。⽽消费者在ConsumeQueue⽂件当中的消费进度会保存在config/consumerOffset.json⽂件当中。

ConsumerQueue=30w固定大小20byte的数据块组成(8字节msgPhyOffset表示起始位置+4字节msgSize文件占用长度+8字节magTagCode消息的tag的Hash值)。

IndexFile⽂件主要是辅助消息检索。他的作⽤主要是⽤来⽀持根据key和timestamp检索消息。它的⽂件名⽐较特殊不以消息偏移量命名⽽是⽤的时间命名,但是其实它也是⼀个固定⼤⼩的⽂件。

IndexFile=固定40字节indexHeader+固定20字节的个数500w的slot+固定20字节最多个数500W*4的index。

indexHeader表示文件头,slots表示一系列的槽位,index表示真正的索引,槽位里面存放index,槽位紧挨着indexHeader而每个槽位里面最多存放4个index。

在这里插入图片描述
在这里插入图片描述

http://www.dtcms.com/wzjs/1283.html

相关文章:

  • 学校网站建设费用百度推广找谁做
  • 网站建设团队扬州新网站排名优化怎么做
  • 电子化业务管理与网站建设自己怎么做网页
  • 做画册找什么网站怎么推广引流客户
  • 网站定制开发与模版域名被墙检测
  • 百度给公司做网站效果咋样如何推广引流
  • 网站侧面菜单展开怎么做百度百家号
  • 小程序开发成本广州seo外包多少钱
  • 网站建设增值服务怎么查询搜索关键词
  • wordpress 支持 插件内江seo
  • 怎么做网站上翻译泰剧seo最强
  • 网站功能测试方法什么是搜索引擎优化seo
  • 世界网站排名网店营销策划方案
  • 潍坊建设网站的公司网络服务平台
  • 深圳动画设计制作哪些类型快速网站排名优化
  • 广州网站建设 讯度网络关键词搜索引擎
  • 怎么做微信网站吗企业网站管理系统怎么操作
  • WordPress资讯网站重庆网站建设软件
  • 好网站你知道线上推广的渠道和方法
  • 没网站做cpa关键词的选取原则
  • 广州哪里有网站建设百度联盟点击广告赚钱
  • 长沙网站seo收费查询seo
  • 优秀网站制作定制sem是什么基团
  • 网站建设合并但与那个天津百度关键词推广公司
  • 做有搜索功能的网站免费视频外链生成推荐
  • 什么网站需要备案厦门人才网唯一官方网站
  • 株洲企业网站制作获客引流100种方法
  • 阿里妈妈网站怎么做月嫂免费政府培训中心
  • 重庆营销网站建设公司营销策划品牌策划
  • 硬件开发教程宁波seo优化公司排名