当前位置: 首页 > news >正文

湘乡网站建设wordpress 虚拟主机 推荐

湘乡网站建设,wordpress 虚拟主机 推荐,网站色调设计方案,惠州外贸网站建设推广一、引言 在 RocketMQ 这个高性能分布式消息队列系统里,DefaultMessageStore 是消息存储的核心实现类,承担着消息的持久化存储、检索等关键任务。而 AllocateMappedFileService 作为 DefaultMessageStore 中的一个重要属性,在文件分配和管理…

一、引言

在 RocketMQ 这个高性能分布式消息队列系统里,DefaultMessageStore 是消息存储的核心实现类,承担着消息的持久化存储、检索等关键任务。而 AllocateMappedFileService 作为 DefaultMessageStore 中的一个重要属性,在文件分配和管理方面发挥着至关重要的作用。本文将深入剖析 AllocateMappedFileService 的属性和方法,带你了解它在 RocketMQ 消息存储机制中的工作原理和重要意义。

二、AllocateMappedFileService 概述

AllocateMappedFileService 是一个后台服务,其主要职责是预先分配物理文件,也就是 MappedFile 实例。在 RocketMQ 中,消息会被存储在一系列的物理文件中,为了避免在消息写入时临时创建文件带来的性能开销,AllocateMappedFileService 会提前将这些文件分配好,并进行初始化,以确保消息能够快速、高效地写入。

三、核心属性

    //日志组件private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);//等待超时时间 默认5秒private static int waitTimeOut = 1000 * 5;//分配请求映射表private ConcurrentMap<String, AllocateRequest> requestTable =new ConcurrentHashMap<String, AllocateRequest>();//分配请求的队列private PriorityBlockingQueue<AllocateRequest> requestQueue =new PriorityBlockingQueue<AllocateRequest>();//是否有异常的标识private volatile boolean hasException = false;//外层消息存储的组件private DefaultMessageStore messageStore;

四、核心方法

1.putRequestAndReturnMappedFile方法

方法概述

putRequestAndReturnMappedFile 方法的主要作用是把文件分配请求添加到请求队列中,并且等待文件分配完成,最终返回分配好的 MappedFile 实例。该方法通过线程同步机制,确保调用者在请求文件分配后,能够等待分配操作完成并获取到可用的文件。

方法源码分析

 /*** 把一个分配mappedfile请求放入进来 以及返回一个分配出来的mappedfile* @param nextFilePath 下一个文件的路径* @param nextNextFilePath 下下一个文件的路径* @param fileSize 文件的大小* @return*/public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {//能够请求分配线程的数量 默认是2//一次分配请求,是针对两个磁盘文件申请的MappedFile,nextFilePath,nextNextFilePathint canSubmitRequests = 2;//是否启用了瞬时存储的池化技术 默认是不开启的 必须手动开启 必须是异步刷盘 必须是master节点if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {//如果存储池中没有可用的buffer 是否启用了fast fail 而且broker角色不能使slaveif (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()&& BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool//获取到瞬时存储池中可用的buffer数量,减去请求队列中数量,获取到可以进行提交的请求数量canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size();}}//构建一个分配请求AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);//把请求放入到请求映射表boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;//写入映射表成功if (nextPutOK) {//如果可以提交请求的数量小于等于0if (canSubmitRequests <= 0) {//此时队列中剩余请求数量过多,此时就会移除请求//移除请求映射表log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());this.requestTable.remove(nextFilePath);return null;}//如果可以提交请求,那么请求入队boolean offerOK = this.requestQueue.offer(nextReq);if (!offerOK) {log.warn("never expected here, add a request to preallocate queue failed");}//提交请求的数量减1canSubmitRequests--;}//针对下下个文件进行创建分配请求AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);//把下下个文件给添加到请求映射表 并进行入队boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;if (nextNextPutOK) {if (canSubmitRequests <= 0) {log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());this.requestTable.remove(nextNextFilePath);} else {boolean offerOK = this.requestQueue.offer(nextNextReq);if (!offerOK) {log.warn("never expected here, add a request to preallocate queue failed");}}}if (hasException) {log.warn(this.getServiceName() + " service has exception. so return null");return null;}//从请求映射表里获取第一个文件的分配请求,如果此时不为null,那么说明第一个文件分配请求都没有完成AllocateRequest result = this.requestTable.get(nextFilePath);try {if (result != null) {//通过countDownLatch 等待5秒钟 如果5秒内还没分配完成 直接返回null//但是呢,上面代码刚把第一个请求文件分配请求写入到队列里去,立马就被当前服务线程拿出来分配好了,映射表里不会移除这个请求//此时拿到分配请求的结果,此时的countDownLatch应该已经写入了1 此时你才可以countdown一下boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);if (!waitOK) {log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());return null;} else {//说明第一个文件已经处理完毕了 从映射表里移除这个请求 给返回对应的MappedFilethis.requestTable.remove(nextFilePath);return result.getMappedFile();}} else {log.error("find preallocate mmap failed, this never happen");}} catch (InterruptedException e) {log.warn(this.getServiceName() + " service has exception. ", e);}return null;}

2.mmapOperation

方法功能概述

mmapOperation方法的主要功能是根据传入的文件分配请求AllocateRequest,创建一个新的内存映射文件,并对其进行必要的初始化操作,最后返回创建好的MappedFile对象。

方法代码分析

 private boolean mmapOperation() {boolean isSuccess = false;AllocateRequest req = null;try {//从队列中获取分配到请求req = this.requestQueue.take();//从请求映射表里获取到分配请求AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());if (null == expectedRequest) {log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "+ req.getFileSize());return true;}if (expectedRequest != req) {log.warn("never expected here,  maybe cause timeout " + req.getFilePath() + " "+ req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);return true;}//还没分配if (req.getMappedFile() == null) {long beginTime = System.currentTimeMillis();MappedFile mappedFile;//是否开启了瞬时存储池化技术if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {try {//通过jdk提供的ServiceLoader加载一些MappedFilemappedFile = ServiceLoader.load(MappedFile.class).iterator().next();//还需要对拿到的MappedFile进行初始化mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());} catch (RuntimeException e) {log.warn("Use default implementation.");mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());}} else {//直接创建 MappedFile 常规化的操作mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());}long elapsedTime = UtilAll.computeElapsedTimeMilliseconds(beginTime);if (elapsedTime > 10) {int queueSize = this.requestQueue.size();log.warn("create mappedFile spent time(ms) " + elapsedTime + " queue size " + queueSize+ " " + req.getFilePath() + " " + req.getFileSize());}// pre write mappedFile//如果满足了一定的条件之后 会对MappedFile进行预热 提前将磁盘数据加载到内存区域里if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog()&&this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());}req.setMappedFile(mappedFile);this.hasException = false;isSuccess = true;}} catch (InterruptedException e) {log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");this.hasException = true;return false;} catch (IOException e) {log.warn(this.getServiceName() + " service has exception. ", e);this.hasException = true;if (null != req) {requestQueue.offer(req);try {Thread.sleep(1);} catch (InterruptedException ignored) {}}} finally {if (req != null && isSuccess)req.getCountDownLatch().countDown();}return true;}

五、工作流程

  1. 请求生成:当 DefaultMessageStore 检测到需要新的文件来存储消息时,会创建一个 AllocateRequest 对象,并将其放入 AllocateMappedFileService 的 requestQueue 中。

  2. 文件分配AllocateMappedFileService 的线程会不断从 requestQueue 中取出请求,调用 mmapOperation 方法进行文件分配。

  3. 同步返回:调用 putRequestAndReturnMappedFile 方法的线程会等待文件分配完成,分配成功后会返回 MappedFile 实例供消息写入使用。

  4. 服务停止:当 DefaultMessageStore 关闭时,会调用 AllocateMappedFileService 的 destroy 方法,停止服务并清理资源。

六、总结

AllocateMappedFileService 作为 DefaultMessageStore 中的重要属性,通过预先分配物理文件的方式,大大提高了 RocketMQ 消息存储的性能和效率。其核心属性和方法协同工作,确保了文件分配操作的有序进行和资源的合理利用。深入理解 AllocateMappedFileService 的工作原理,有助于我们更好地优化 RocketMQ 的性能和解决相关的存储问题。在实际应用中,我们可以根据具体的业务场景和系统需求,对 AllocateMappedFileService 的参数和配置进行调整,以达到最佳的性能表现。

http://www.dtcms.com/a/489207.html

相关文章:

  • 北京网站推广排名公司大学生网站的设计风格
  • 珠海建设工程备案网站网站301在哪做
  • 小程序怎么赚钱的快推达seo
  • 易语言做网站简单教程wordpress 自动分享
  • 做淘宝客怎么做官方网站手机版网站模板下载
  • 做网站外包好吗微信分销网站建设多少钱
  • 北京建网站公司哪家便宜网站做好了怎么和域名
  • 怎么制作网站?无锡市住房和城乡建设局网站
  • 南昌网站建设联系方式html 音乐网站
  • 响应式网站建站工具河北网站设计推荐柚米科技
  • iis7 默认网站目录帮企业做网站赚钱
  • 木木科技 网站艰涩阿里wordpress怎么安装教程
  • dw网站根目录怎么做dede门户网站模版
  • 做英文网站赚钱重庆住房和城乡建设部网站的打印准考证
  • 网站地图制作方法潍坊企业网站设计
  • 全球建站免费网站建设推广服务
  • 中文html网站模板下载建设厅网站上报名
  • c 网站开发实战好大夫在线网站官网做提眉的医生
  • 网站seo的关键词排名怎么做的手机可以做软件开发吗
  • 做搜狗手机网站优化软软件开发包括哪些阶段
  • 优化百度seo云南网站优化排名
  • 电子商城网站建设参考文献淘客网站做弹窗广告
  • 西宁网站托管WordPress书籍插件
  • node.js做网站织梦dedecms网站更换域名后文章图片路径批量修改
  • 遵义网站建设oadmin石狮网站建设哪家好
  • 成都网站制作公司怎样用盒子做汽车视频网站
  • asp.net网站转php建筑网络图片
  • 无备案网站 阿里联盟庆阳网红
  • 自适应网站做多大尺寸平面设计的大专学校
  • 珠海高端网站制作微商软件