当前位置: 首页 > wzjs >正文

网站推广朋友圈文案怎么做网站教程

网站推广朋友圈文案,怎么做网站教程,几千元的网站建设,中文网站建设英文rocketmq延迟消息的底层原理 消息实体 延时消息是指允许消息在指定延迟时间后才被消费者消费 Apache RocketMQ 中,消息的核心实体类是 org.apache.rocketmq.common.message.Message public class Message implements Serializable {private String topic; …

rocketmq延迟消息的底层原理

消息实体

延时消息是指允许消息在指定延迟时间后才被消费者消费

Apache RocketMQ 中,消息的核心实体类是 org.apache.rocketmq.common.message.Message

public class Message implements Serializable {private String topic;                 // 消息主题(必填)private int flag;                     // 消息标志(用户自定义)private Map<String, String> properties; // 消息属性(键值对,可用于过滤、追踪等)private byte[] body;                  // 消息体(消息内容)private String transactionId;         // 事务ID(用于事务消息)...
}

在消息属性中properties,有一些常见属性:

  • TAGS:标签,可用于消息过滤
  • DELAY:延迟级别(延时消息)
  • TIMER_DELAY_MS:延迟投递的毫秒数
  • TIMER_DELAY_SEC:延迟投递的秒数
  • TIMER_DELIVER_MS:精准指定投递时间戳

实现机制

延迟等级

RocketMQ 将延迟消息设计成 “延迟级别(delayLevel)” 的形式,每个级别对应一个固定的延迟时间。在4.x的版本中,RocketMQ不支持任意时间精度的延迟,而是预设了18个延迟等级。使用使用ConcurrentSkipListMap存储延迟级别与时间的映射关系

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2hprivate final ConcurrentSkipListMap<Integer /* level */, Long/* delay timeMillis */>delayLevelTable = new ConcurrentSkipListMap<>();public void setDelayTimeLevel(int level) {this.putProperty("DELAY", String.valueOf(level));
}

在5.x的版本中支持任意时间延迟

public void setDelayTimeSec(long sec) {this.putProperty("TIMER_DELAY_SEC", String.valueOf(sec));
}public void setDelayTimeMs(long timeMs) {this.putProperty("TIMER_DELAY_MS", String.valueOf(timeMs));
}public void setDeliverTimeMs(long timeMs) {this.putProperty("TIMER_DELIVER_MS", String.valueOf(timeMs));
}

消息的处理流程

消息进入延迟队列

Producer 发送延迟消息时,设置 message.setDelayTimeLevel(x)。延迟消息到达Broker不会立即进入你设置的业务 Topic;会先被投递到名为 SCHEDULE_TOPIC_XXXX 的系统内置 Topic

public class CommitLog {public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {// ...if (msg.getDelayTimeLevel() > 0) {// 如果超过了最大延迟级别if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}// 获取RMQ_SYS_SCHEDULE_TOPICtopic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;// 根据延迟级别选取对应的队列int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// 将消息原本的TOPIC和队列ID设置到消息属性中MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));// 设置SCHEDULE_TOPICmsg.setTopic(topic);msg.setQueueId(queueId);}}// ...}
}
  1. 判断消息的延迟级别是否超过了最大延迟级别,如果超过了就使用最大延时等级

  2. topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPICRMQ_SYS_SCHEDULE_TOPIC是在TopicValidator中定义的常量,值为SCHEDULE_TOPIC_XXXX

  3. int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()),根据延迟级别选取对应的队列,把相同延迟级别的消息放在同一个队列中

  4. 将消息原本的TOPIC和队列ID设置到消息属性中,MessageAccessor 是 RocketMQ 中的一个工具类,作用是以非公开的方式修改 Message 对象的内部字段或属性

    public static void putProperty(Message msg, String name, String value) {msg.putProperty(name, value);}
    

启动定时任务

Broker启动的时候会调用ScheduleMessageServicestart方法,start方法中为不同的延迟级别创建了对应的定时任务来处理延迟消息

public class ScheduleMessageService extends ConfigManager {// 首次执行延迟的时间private static final long FIRST_DELAY_TIME = 1000L;public void start() {if (started.compareAndSet(false, true)) {super.load();this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));if (this.enableAsyncDeliver) {this.handleExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));}// 遍历所有的延迟级别for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {Integer level = entry.getKey();Long timeDelay = entry.getValue();Long offset = this.offsetTable.get(level);if (null == offset) { // 如果获取的消费进度为空offset = 0L; // 默认为0,从第一条消息开始处理}if (timeDelay != null) {if (this.enableAsyncDeliver) {this.handleExecutorService.schedule(new HandlePutResultTask(level), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);}// 为每个延迟级别创建对应的定时任务this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);}}// ...}}
}

遍历所有的延迟等级,为每个延迟等级创建对应的定时任务。

每个DeliverDelayedMessageTimerTask负责:

  1. 从对应延迟级别的队列中扫描消息
  2. 检查消息的投递时间是否到达
  3. 将到期消息重新投递到目标主题

文章转载自:

http://pZti0Fsc.zbnkt.cn
http://ulTBoPHF.zbnkt.cn
http://H4teSZm5.zbnkt.cn
http://HKIH2X3a.zbnkt.cn
http://HHA0gbqV.zbnkt.cn
http://9TVcQPht.zbnkt.cn
http://xIBs4seq.zbnkt.cn
http://ZcSb3WH1.zbnkt.cn
http://MMOj6gpR.zbnkt.cn
http://l2O3BtXa.zbnkt.cn
http://ydRpnIX3.zbnkt.cn
http://YplWxFnd.zbnkt.cn
http://MfonvLgc.zbnkt.cn
http://akPNpFot.zbnkt.cn
http://nGC39GwN.zbnkt.cn
http://wMWyvK1H.zbnkt.cn
http://0WX9FHWT.zbnkt.cn
http://aIDWOqAd.zbnkt.cn
http://Q7llOYmo.zbnkt.cn
http://KihMngpK.zbnkt.cn
http://KlQPjJ9B.zbnkt.cn
http://BV3uRpzu.zbnkt.cn
http://CAhdEoGA.zbnkt.cn
http://hzHtZPsf.zbnkt.cn
http://0AJpt4zX.zbnkt.cn
http://7LwyVyHx.zbnkt.cn
http://HmY8UyUn.zbnkt.cn
http://vKKt3jsB.zbnkt.cn
http://wSoXLwdI.zbnkt.cn
http://hexfxXAi.zbnkt.cn
http://www.dtcms.com/wzjs/706373.html

相关文章:

  • 网站开发是什么职业les做ml网站
  • 浦东新区专业网站建设wordpress安装过程
  • 易尔通做网站怎么样东莞大朗最新通告
  • 网站和网址的区别个人网站空间购买
  • 做ppt接单的网站电子商务的就业方向
  • 做淘宝返利网站能挣钱建设一个功能简单的网站
  • 网站建设怎么做网站网络维护工作室 员工职务
  • nodejs 做视频网站wordpress恢复已删除目录
  • 西安建立公司网站的步骤阿里云小程序开发
  • 如何不让百度收录网站南山做网站关于枪
  • 网络工程师自学网站网站后台无法修改信息
  • 做网站难度企业宣传片走心文案
  • 网站优化的公司网站后台显示连接已重置
  • 乐山 做网站网坛最新排名
  • 网站设计的软件wordpress erp框架
  • 网站开发用户登陆的安全地产网站开发公司
  • 手机端网站开发的意义漯河优惠网站建设价格
  • 网站推广描述公众号怎么弄好看的模板
  • 广西住房与城乡建设厅网站鄂州seo
  • 青岛做网站建设价格低网页界面设计的界面结构组成
  • 单页网站怎么做竞价html模板 网站
  • 建立网站是什么建立的网站设计工作内容
  • 深圳高端网站定制能解析国外网站的dns
  • 横沥镇网站建设wordpress 理财
  • 什么网站做美食最好最专业做动漫短视频网站
  • 做微信公众号的网站吗怎样自己做企业网站
  • 衡阳网站开发培训洛可可设计公司现状
  • 上海企业微信网站制作网站怎么自己编辑模块
  • 响应式网站好还是自适应网站好成都网站工作室
  • 全屏网站 欣赏网店推广方法