当前位置: 首页 > wzjs >正文

随州网站建设多少钱排名软件

随州网站建设多少钱,排名软件,网站图片翻页怎么做,制作网站付款方式1、Broker消息分发服务以及构建ConsumeQueue和IndexFile与消息清除 前面分析如何进行刷盘,本章分析Broker的消息分发以及构建ConsumerQueue和IndexFile,两者构建是为了能够提高效率,减少消息查找时间以及减少网络带宽与存储空间。 ConsumeQ…

1、Broker消息分发服务以及构建ConsumeQueue和IndexFile与消息清除

前面分析如何进行刷盘,本章分析Broker的消息分发以及构建ConsumerQueue和IndexFile,两者构建是为了能够提高效率,减少消息查找时间以及减少网络带宽与存储空间。

ConsumeQueue 可以快速定位消息,IndexFile 可以快速定位 ConsumeQueue 中的位置,并随时实时更新,以保证消息的消费速度。

ConsumeQueue 和 IndexFile 采用了压缩存储和分片存储的方式,消费者可以仅仅消费需要的消息,从而节省存储空间。

ConsumeQueue 可以快速定位消息,减少消息传递所消耗的网络带宽,提高消息传递效率。

消息分发

在执行Broker的启动方法会去启动消息分发服务,简单理解就是为消息建立一种索引更加快速有效的消费查询消息。
broker处理消息分发的类是ReputMessageService,启动一个线程不断地将commitLong分到到对应的consumerQueue与indexFile,消息分发流程处理完毕;


public void run() {DefaultMessageStore.log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {// 休眠1s,进行分发ConsumeQueue和IndexFileThread.sleep(1);this.doReput();} catch (Exception e) {DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);}}DefaultMessageStore.log.info(this.getServiceName() + " service end");
}

消息每隔1s进行消息的分发,尝试去构建ConsumerQueue索引与

IndexFile索引。
private void doReput() {if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();}// 如果分发偏移量小于commitlog的最大物理偏移量,那么循环分发for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {break;}// 从commitLog中获取需要分发的消息SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);if (result != null) {try {this.reputFromOffset = result.getStartOffset();for (int readSize = 0; readSize < result.getSize() && doNext; ) {DispatchRequest dispatchRequest =DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();if (dispatchRequest.isSuccess()) {if (size > 0) {// CommitLog日志分发到ConsumeQueue和IndexFileDefaultMessageStore.this.doDispatch(dispatchRequest);// 长轮询:如果有消息到了主节点,并且开启了长轮询。(消息拉取逻辑后面单独讲)if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()&& DefaultMessageStore.this.messageArrivingListener != null) {//messageArrivingListener实例是NotifyMessageArrivingListener//通知被hold住的消费者拉取消息的请求DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());notifyMessageArrive4MultiQueue(dispatchRequest);}this.reputFromOffset += size;readSize += size;if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()).add(dispatchRequest.getMsgSize());}} else if (size == 0) {// 如果等于0,表示读取到MappedFile文件尾// 获取下一个文件的起始索引this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);readSize = result.getSize();}} else if (!dispatchRequest.isSuccess()) {if (size > 0) {log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);this.reputFromOffset += size;} else {doNext = false;// If user open the dledger pattern or the broker is master node,// it will not ignore the exception and fix the reputFromOffset variableif (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",this.reputFromOffset);this.reputFromOffset += result.getSize() - readSize;}}}}} finally {result.release();}} else {doNext = false;}}
}

通过getData()方法来获取需要进行分发的消息,根据reputFromOffset的物理偏移量找到mappedFileQueue中对应的CommitLog文件的MappedFile,然后从该MappedFile中截取一段自reputFromOffset偏移量开始的ByteBuffer。
依次进行消息的分发调用doDispatch方法,把消息写入ConsumerQueue与IndexFile中。
如果当前节点是主节点且开启长轮询,则调用messageArrivingListener.arriving方法进行消息的投递,具体长轮询的方案后期介绍。
构建ConsumerQueue与IndexFile


public void doDispatch(DispatchRequest req) {//dispatchList中包含CommitLogDispatcherBuildConsumeQueue和CommitLogDispatcherBuildIndex,// 分别用来构建ConsumeQueue文件和IndexFile文件。for (CommitLogDispatcher dispatcher : this.dispatcherList) {dispatcher.dispatch(req);}
}

调用dispatch进行消息的分发,其中包括三个实现类。

CommitLogDispatcherBuildConsumeQueue类:写ConsumeQueue文件,构建ConsumeQueue索引。

CommitLogDispatcherBuildIndex类:写IndexFile文件,构建IndexFile索引。

CommitLogDispatcherCalcBitMap类:构建布隆过滤器,加速SQL92过滤效率,避免每次都解析sql。

消息清理

当消息到达一定程度就会被RocketMQ丢弃,而在Broker启动会去添加定时删除文件的任务,60s之后执行第一个后面每隔10s执行一次。

// 过期文件删除
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {// 清除方法DefaultMessageStore.this.cleanFilesPeriodically();}
}private void cleanFilesPeriodically() {// 删除CommitLog文件this.cleanCommitLogService.run();// 清除ConsumerQueue文件和IndexFile文件this.cleanConsumeQueueService.run();
}

清除的主要逻辑:首先清除CommitLog文件,然后在清除ConsumerQueue文件和IndexFile文件。

CommitLog文件的清除
// 定期删除CommitLog文件的地方
public void run() {try {// 删除文件this.deleteExpiredFiles();// 删除挂起文件this.redeleteHangedFile();} catch (Throwable e) {DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);}
}private void deleteExpiredFiles() {int deleteCount = 0;long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();//K2 Broker触发文件删除的三个条件boolean timeup = this.isTimeToDelete(); // deleteWhen是否达到,配置项boolean spacefull = this.isSpaceToDelete(); // 判断磁盘是否达到删除,diskmax配置项boolean manualDelete = this.manualDeleteFileSeveralTimes > 0; // 手动删除次数// 并未判断消息是否已经被消费,所以当长时间积压的消息,触发上面三个条件之一就会导致消息丢失未消费的情况,所以我们得避免消息积压。if (timeup || spacefull || manualDelete) {if (manualDelete)this.manualDeleteFileSeveralTimes--;boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",fileReservedTime,timeup,spacefull,manualDeleteFileSeveralTimes,cleanAtOnce);fileReservedTime *= 60 * 60 * 1000;deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,destroyMapedFileIntervalForcibly, cleanAtOnce);if (deleteCount > 0) {} else if (spacefull) {log.warn("disk space will be full soon, but delete file failed.");}}
}

首先删除文件需要满足的要求是其一deleteWhen是否达到默认是04(可配置),其二判断磁盘是否达到删除默认75(可配置),其三手动删除。

触发过期⽂件删除时,有两个检查的纬度,⼀个是,是否到了触发删除的时间,就是broker.conf⾥配置的deleteWhen属性。另外还会检查磁盘利⽤率,达到阈值也会触发过期⽂件删除。这个阈值默认是75%,可以在broker.conf⽂件当中定制。但是最⼤值为95,最⼩值为10。

注意:观察源码可得 并未判断消息是否已经被消费,所以当长时间积压的消息,触发上面三个条件之一就会导致消息丢失未消费的情况,所以我们得避免消息积压。

如果删除的时候我们的文件正在被使用这就导致删除不会生效此时就需要redeleteHangedFile方法区删除挂起文件。

ConsumerQueueIndexFile清理
public void run() {try {// 删除过期的filethis.deleteExpiredFiles();} catch (Throwable e) {DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);}
}private void deleteExpiredFiles() {// 间隔100int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();if (minOffset > this.lastPhysicalMinOffset) {this.lastPhysicalMinOffset = minOffset;ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {for (ConsumeQueue logic : maps.values()) {// 进行consumerQueue删除int deleteCount = logic.deleteExpiredFile(minOffset);if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {try {Thread.sleep(deleteLogicsFilesInterval);} catch (InterruptedException ignored) {}}}}// 进行IndexFile删除DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);}
}

在删除ConsumeQueue和IndexFile⽂件时,会去检查CommitLog当前的最⼩Offset,然后在删除时进⾏对⻬。

2、Broker端整体流程以及文件索引结构

至此我们的Producer发送消息以及Broker如何处理存储消息以及介绍完了,首先我们需要去整理一下整个流程,然后后面再来分析Consumer端的消费,以及各种消息类型的案例源码等。

发送端流程总结

生产者发送消息写入内存推送给Broker进行刷盘入CommitLog文件,同时处理分发将消息索引存放到ConsumerQueue与IndexFile文件当中。

如果是主从架构就会去进行主从之间的同步。

接下来当消息达到一定条件就会触发删除机制,程序注册了一个定时任务首次60s执行后面每隔10s去执行一次清理工作。注意:删除文件并没有判断是否被消费过了只要达到要求就会删除继而导致消息的丢失,所以我们得去避免消息的积压。
在这里插入图片描述
索引结构

CommitLog⽂件的⼤⼩是固定的,但是其中每个存储的每个消息单元长度是不固定的,在源码CommitLog类的calMsgLength方法有计算消息长度的方法。

protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;final int msgLen = 4 //TOTALSIZE+ 4 //MAGICCODE+ 4 //BODYCRC+ 4 //QUEUEID+ 4 //FLAG+ 8 //QUEUEOFFSET+ 8 //PHYSICALOFFSET+ 4 //SYSFLAG+ 8 //BORNTIMESTAMP+ bornhostLength //BORNHOST+ 8 //STORETIMESTAMP+ storehostAddressLength //STOREHOSTADDRESS+ 4 //RECONSUMETIMES+ 8 //Prepared Transaction Offset+ 4 + (bodyLength > 0 ? bodyLength : 0) //BODY+ 1 + topicLength //TOPIC+ 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength+ 0;return msgLen;
}

正因为消息的记录⼤⼩不固定,所以RocketMQ在每次存CommitLog⽂件时,都会去检查当前CommitLog⽂件空间是否⾜够,如果不够的话,就重新创建⼀个CommitLog⽂件,⽂件名为当前消息的偏移量。

ConsumeQueue⽂件主要是加速消费者的消息索引。他的每个⽂件夹对应RocketMQ中的⼀个MessageQueue,⽂件夹下的⽂件记录了每个MessageQueue中的消息在CommitLog⽂件当中的偏移量。这样,消费者通过ComsumeQueue⽂件,就可以快速找到CommitLog⽂件中感兴趣的消息记录。⽽消费者在ConsumeQueue⽂件当中的消费进度会保存在config/consumerOffset.json⽂件当中。

ConsumerQueue=30w固定大小20byte的数据块组成(8字节msgPhyOffset表示起始位置+4字节msgSize文件占用长度+8字节magTagCode消息的tag的Hash值)。

IndexFile⽂件主要是辅助消息检索。他的作⽤主要是⽤来⽀持根据key和timestamp检索消息。它的⽂件名⽐较特殊不以消息偏移量命名⽽是⽤的时间命名,但是其实它也是⼀个固定⼤⼩的⽂件。

IndexFile=固定40字节indexHeader+固定20字节的个数500w的slot+固定20字节最多个数500W*4的index。

indexHeader表示文件头,slots表示一系列的槽位,index表示真正的索引,槽位里面存放index,槽位紧挨着indexHeader而每个槽位里面最多存放4个index。

在这里插入图片描述
在这里插入图片描述

http://www.dtcms.com/wzjs/214472.html

相关文章:

  • 建设网站的总结网络推广好做吗?
  • 网站内容页面怎么做外链一键开发小程序
  • 温岭建设阳光网站企业查询免费
  • 香港企业网站设计公司项目推广方案怎么写
  • 网站开发H5热搜排行榜今日排名
  • 广州企业建设网站重庆网站搜索引擎seo
  • win7做系统网站哪个好推广普通话的重要意义
  • 企业展示网站赚钱软件
  • wordpress gravatar屏蔽烟台seo快速排名
  • 做c 题的网站网络搜索关键词
  • 深圳b2c电子商务网站最近新闻摘抄50字
  • 注册成立公司的基本流程西安seo代理
  • 想自己搞一个视频网站怎么做外链系统
  • 四川省住房和城乡建设厅官网查证南通seo网站优化软件
  • 学java学费大概是多少长春关键词优化公司
  • 响应式自助建站平台网络销售的工作内容
  • 济南网站优化技术厂家网络优化seo薪酬
  • 大同网站建设哪里好百度电脑版下载安装
  • 微网站和微信公共平台的区别宣传软文模板
  • 网站开发与维护难学吗seo整站优化外包
  • 企业网站备案要多久近几天的新闻摘抄
  • 周口网站建设公司搜狗网站提交入口
  • 做网站范本百度平台联系方式
  • 单页面 网站 模板网络运营是做什么的工作
  • 做淘宝代销哪个网站好网络营销研究现状文献综述
  • 网站推广技术某企业网站的分析优化与推广
  • 晋江网站建设公司深圳网络推广系统
  • 网站建设属那种营业2021最近最火的关键词
  • 网站建设资讯版块如何做用户运营网络推广是诈骗吗
  • 网站月流量怎么免费给自己建网站