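RocketMQ Delayed Message Mechanism
Two Kinds of Delayed Messages
RocketMQ provides two delayed-message mechanisms:
- Fixed delay level
Set the MessageDelayLevel parameter on the Message; it selects one of 18 preset delay levels.
- Delivery at a specified point in time
Set DeliverTimeMS on the Message, a long value that gives the exact point in time. When that time arrives, RocketMQ delivers the message automatically.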
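To make the idea of 18 preset levels concrete, here is a minimal sketch of how a level string could be turned into a level-to-delay table. DelayLevels and its methods are hypothetical names made up for this illustration, and the level string is an assumption (it is what the broker's default configuration is commonly reported to be), so check your own broker settings before relying on it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: DelayLevels is a hypothetical helper, not a RocketMQ class.
final class DelayLevels {
    // Assumed level string: 18 entries, level 1 = 1s ... level 18 = 2h.
    static final String ASSUMED_DEFAULT =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Parses "1s 5s ..." into level -> delay in milliseconds (levels start at 1).
    static Map<Integer, Long> parse(String levels) {
        Map<Integer, Long> table = new LinkedHashMap<>();
        String[] parts = levels.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String p = parts[i];
            long value = Long.parseLong(p.substring(0, p.length() - 1));
            char unit = p.charAt(p.length() - 1);
            long unitMs;
            switch (unit) {
                case 's': unitMs = 1000L; break;
                case 'm': unitMs = 60_000L; break;
                case 'h': unitMs = 3_600_000L; break;
                case 'd': unitMs = 86_400_000L; break;
                default: throw new IllegalArgumentException("unknown unit in " + p);
            }
            table.put(i + 1, value * unitMs);
        }
        return table;
    }

    public static void main(String[] args) {
        Map<Integer, Long> table = parse(ASSUMED_DEFAULT);
        System.out.println(table.size() + " levels, level 3 = " + table.get(3) + " ms");
    }
}
```

With a fixed level the sender picks a row of this table; with a specified point in time the sender supplies the timestamp directly and no table is involved.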
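Handling for both kinds of delayed message is set up while the BrokerController initializes:
public boolean recoverAndInitService() throws CloneNotSupportedException {
    ...
    if (messageStore != null) {
        // Register a series of hooks, including the one that handles fixed-delay-level messages
        registerMessageStoreHook();
        result = this.messageStore.load();
    }
    if (messageStoreConfig.isTimerWheelEnable()) {
        // Load the store that handles messages delayed to a specified point in time
        result = result && this.timerMessageStore.load();
    }
    ...
}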
Delayed Messages with a Fixed Delay Level
This kind of delayed message is managed by an important background service, scheduleMessageService, which is loaded when the broker starts.
public void start() {
    if (started.compareAndSet(false, true)) { // CAS guards against starting twice
        this.load();
        this.deliverExecutorService = ThreadUtils.newScheduledThreadPool(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
        if (this.enableAsyncDeliver) {
            this.handleExecutorService = ThreadUtils.newScheduledThreadPool(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));
        }
        // Loop over every delay level and start one task per level
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }
            if (timeDelay != null) {
                if (this.enableAsyncDeliver) { // asynchronous delivery, false by default
                    this.handleExecutorService.schedule(new HandlePutResultTask(level), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
                }
                // Process the messages of this fixed delay level
                this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
            }
        }
        // Periodically persist the delayed-message progress
        scheduledPersistService.scheduleAtFixedRate(() -> {
            try {
                ScheduleMessageService.this.persist();
            } catch (Throwable e) {
                log.error("scheduleAtFixedRate flush exception", e);
            }
        }, 10000, this.brokerController.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);
    }
}
Before the Broker processes a message, it registers a series of hooks that act like filters and pre-process the message. One of them handles delayed messages.
HookUtils contains the method that applies this special handling before the Broker processes the message.
public static PutMessageResult handleScheduleMessage(BrokerController brokerController,
    final MessageExtBrokerInner msg) {
    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        if (!isRolledTimerMessage(msg)) {
            if (checkIfTimerMessage(msg)) {
                if (!brokerController.getMessageStoreConfig().isTimerWheelEnable()) {
                    // wheel timer is not enabled, reject the message
                    return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_NOT_ENABLE, null);
                }
                // Redirect a message that is delayed to a specified point in time
                PutMessageResult transformRes = transformTimerMessage(brokerController, msg);
                if (null != transformRes) {
                    return transformRes;
                }
            }
        }
        // Delay Delivery
        if (msg.getDelayTimeLevel() > 0) { // a level greater than zero means a delay level was set
            transformDelayLevelMessage(brokerController, msg);
        }
    }
    return null;
}
Messages with a fixed delay level are moved to a built-in system topic, SCHEDULE_TOPIC_XXXX, in which each queue corresponds to one delay level.
public static void transformDelayLevelMessage(BrokerController brokerController, MessageExtBrokerInner msg) {
    if (msg.getDelayTimeLevel() > brokerController.getScheduleMessageService().getMaxDelayLevel()) {
        msg.setDelayTimeLevel(brokerController.getScheduleMessageService().getMaxDelayLevel());
    }
    // Keep the message's original topic and queue by storing them as properties
    // Backup real topic, queueId
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
    // Switch to the built-in system topic and the queue for this delay level
    msg.setTopic(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC);
    msg.setQueueId(ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()));
}
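The redirect-and-restore idea behind this method can be sketched without any broker classes. Everything here is a stand-in: Msg, the property keys "REAL_TOPIC" and "REAL_QID", and restore() are hypothetical names chosen for the example, and the level-minus-one queue mapping is an assumption about how delayLevel2QueueId behaves.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for the broker-side rewrite; not the RocketMQ classes.
final class DelayRewriteSketch {
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";

    static final class Msg {
        String topic;
        int queueId;
        int delayLevel;
        final Map<String, String> properties = new HashMap<>();
    }

    // Assumption: queue ids are zero-based while delay levels start at 1.
    static int delayLevel2QueueId(int delayLevel) {
        return delayLevel - 1;
    }

    // Clamp the level, back up the real destination, then point at the schedule topic.
    static void toScheduleTopic(Msg msg, int maxDelayLevel) {
        if (msg.delayLevel > maxDelayLevel) {
            msg.delayLevel = maxDelayLevel;
        }
        msg.properties.put("REAL_TOPIC", msg.topic);                 // hypothetical key
        msg.properties.put("REAL_QID", String.valueOf(msg.queueId)); // hypothetical key
        msg.topic = SCHEDULE_TOPIC;
        msg.queueId = delayLevel2QueueId(msg.delayLevel);
    }

    // Reverse step, applied when the delay has elapsed.
    static void restore(Msg msg) {
        msg.topic = msg.properties.remove("REAL_TOPIC");
        msg.queueId = Integer.parseInt(msg.properties.remove("REAL_QID"));
    }
}
```

Because the original destination travels with the message as properties, the delivery task needs no extra lookup table to put the message back where the producer intended.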
scheduleMessageService.start() schedules each DeliverDelayedMessageTimerTask after an initial delay; every run of the task then schedules its own next run.
public void executeOnTimeUp() {
    // Get the consume queue of RMQ_SYS_SCHEDULE_TOPIC that belongs to this delay level
    ConsumeQueueInterface cq =
        ScheduleMessageService.this.brokerController.getMessageStore().getConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
            delayLevel2QueueId(delayLevel));
    if (cq == null) {
        this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
        return;
    }
    ReferredIterator<CqUnit> bufferCQ = cq.iterateFrom(this.offset);
    if (bufferCQ == null) {
        long resetOffset;
        if ((resetOffset = cq.getMinOffsetInQueue()) > this.offset) {
            log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}",
                this.offset, resetOffset, cq.getQueueId());
        } else if ((resetOffset = cq.getMaxOffsetInQueue()) < this.offset) {
            log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}",
                this.offset, resetOffset, cq.getQueueId());
        } else {
            resetOffset = this.offset;
        }
        this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
        return;
    }
    long nextOffset = this.offset;
    try {
        while (bufferCQ.hasNext() && isStarted()) {
            CqUnit cqUnit = bufferCQ.next();
            long offsetPy = cqUnit.getPos();
            int sizePy = cqUnit.getSize();
            long tagsCode = cqUnit.getTagsCode();
            if (!cqUnit.isTagsCodeValid()) {
                //can't find ext content.So re compute tags code.
                log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                    tagsCode, offsetPy, sizePy);
                long msgStoreTime = ScheduleMessageService.this.brokerController.getMessageStore().getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
            }
            long now = System.currentTimeMillis();
            // For delayed messages the tagsCode field is reused to carry the delivery timestamp
            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
            long currOffset = cqUnit.getQueueOffset();
            assert cqUnit.getBatchNum() == 1;
            nextOffset = currOffset + cqUnit.getBatchNum();
            long countdown = deliverTimestamp - now;
            if (countdown > 0) {
                // Not due yet: schedule the next run from this offset and stop
                this.scheduleNextTimerTask(currOffset, DELAY_FOR_A_WHILE);
                ScheduleMessageService.this.updateOffset(this.delayLevel, currOffset);
                return;
            }
            MessageExt msgExt = ScheduleMessageService.this.brokerController.getMessageStore().lookMessageByOffset(offsetPy, sizePy);
            if (msgExt == null) {
                continue;
            }
            MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeUp(msgExt);
            if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                // Transactional (half) messages do not support delay; skip this message
                log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
                    msgInner.getTopic(), msgInner);
                continue;
            }
            boolean deliverSuc;
            if (ScheduleMessageService.this.enableAsyncDeliver) {
                // Asynchronous delivery, false by default
                deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), currOffset, offsetPy, sizePy);
            } else {
                // Synchronous delivery: checks whether this broker is the master and, if so,
                // puts the message into the queue of its real topic on this node
                deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), currOffset, offsetPy, sizePy);
            }
            if (!deliverSuc) {
                this.scheduleNextTimerTask(currOffset, DELAY_FOR_A_WHILE);
                return;
            }
        }
    } catch (Exception e) {
        log.error("ScheduleMessageService, messageTimeUp execute error, offset = {}", nextOffset, e);
    } finally {
        bufferCQ.release();
    }
    this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
}
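The core of this loop is easier to see in isolation. Within one delay level every message waits the same amount of time, so the queue is ordered by delivery time and the scan can stop at the first entry that is not yet due. The sketch below models only that part; LevelQueueScanSketch and its members are hypothetical names, and the list of timestamps stands in for the tagsCode values of the consume queue entries.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of scanning one delay level's queue; not RocketMQ code.
final class LevelQueueScanSketch {
    // deliverAt.get(i) plays the role of the tagsCode (delivery timestamp) of entry i.
    private final List<Long> deliverAt = new ArrayList<>();
    // Offsets of entries that have been handed on for delivery.
    final List<Long> delivered = new ArrayList<>();

    void append(long deliverTimestamp) {
        deliverAt.add(deliverTimestamp);
    }

    // Scans from offset and returns the offset the next timer run should start from.
    long scan(long offset, long now) {
        long next = offset;
        while (next < deliverAt.size()) {
            long countdown = deliverAt.get((int) next) - now;
            if (countdown > 0) {
                return next; // head of the queue is not due: retry from here later
            }
            delivered.add(next);
            next++;
        }
        return next;
    }

    public static void main(String[] args) {
        LevelQueueScanSketch q = new LevelQueueScanSketch();
        q.append(100L);
        q.append(200L);
        q.append(300L);
        System.out.println("next offset = " + q.scan(0L, 250L));
    }
}
```

The returned offset corresponds to what the real task passes to scheduleNextTimerTask, which is why a run that finds nothing due simply re-arms itself at the same position.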
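Delayed Messages for a Specified Point in Time
This kind of delayed message is timed with a time wheel algorithm. The core is the set of six kinds of service thread that the TimerMessageStore object creates: five move messages through the time wheel and a sixth, TimerFlushService, flushes state to disk.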
public void initService() {
    // Core service threads
    // Periodically scans the TIMER_TOPIC consume queue, wraps each delayed message in a
    // TimerRequest and puts it on enqueuePutQueue (the enqueue method)
    enqueueGetService = new TimerEnqueueGetService();
    // Takes TimerRequests from enqueuePutQueue and checks whether each has already expired:
    // unexpired messages are written to TimerLog and the time wheel, expired ones are
    // handed straight on for delivery
    enqueuePutService = new TimerEnqueuePutService();
    // Calls dequeue() to scan the current time slot (currReadTimeMs) and put the results on
    // dequeueGetQueue, then advances the time pointer through moveReadTime()
    dequeueGetService = new TimerDequeueGetService();
    // Periodically flushes TimerLog and TimerWheel to disk
    timerFlushService = new TimerFlushService();
    int getThreadNum = Math.max(storeConfig.getTimerGetMessageThreadNum(), 1);
    dequeueGetMessageServices = new TimerDequeueGetMessageService[getThreadNum];
    for (int i = 0; i < dequeueGetMessageServices.length; i++) {
        // Fetches messages from the delayed-message queue and triggers their delivery flow
        dequeueGetMessageServices[i] = new TimerDequeueGetMessageService();
    }
    int putThreadNum = Math.max(storeConfig.getTimerPutMessageThreadNum(), 1);
    dequeuePutMessageServices = new TimerDequeuePutMessageService[putThreadNum];
    for (int i = 0; i < dequeuePutMessageServices.length; i++) {
        // Takes due delayed messages out of the timer queue and puts them into the real
        // message queue, where consumers can consume them
        dequeuePutMessageServices[i] = new TimerDequeuePutMessageService();
    }
}
The five message-moving services (all except TimerFlushService) work together with several core queues and fields in TimerMessageStore.
protected final BlockingQueue<TimerRequest> enqueuePutQueue;
protected final BlockingQueue<List<TimerRequest>> dequeueGetQueue;
protected final BlockingQueue<TimerRequest> dequeuePutQueue;
private final TimerWheel timerWheel; // the time wheel
private final TimerLog timerLog; // persisted to disk, used for crash recovery
// Read pointer: TimerDequeueGetService finds the delayed messages that are due and advances it
protected volatile long currReadTimeMs;
// Write pointer: updated when TimerEnqueuePutService puts a delayed message into the time wheel
protected volatile long currWriteTimeMs;
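TimerFlushService, an inner class of TimerMessageStore, flushes the checkpoint and the time wheel to disk every 1 s.
public void run() {
    ...
    timerLog.getMappedFileQueue().flush(0); // flush the data file that records the delayed messages
    timerWheel.flush(); // flush the time wheel
    timerCheckpoint.flush(); // flush the checkpoint
    ...
    waitForRunning(storeConfig.getTimerFlushIntervalMs()); // sleep for the flush interval (1000 ms)
    ...
}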
How these components relate to each other
The TimerWheel algorithm rests on two core ideas:
- Each piece of data is placed, according to its preset expiry time, into the corresponding slot (like a second mark on a clock face).
public MessageExtBrokerInner convert(MessageExt messageExt, long enqueueTime, boolean needRoll) {
    if (enqueueTime != -1) {
        MessageAccessor.putProperty(messageExt, TIMER_ENQUEUE_MS, enqueueTime + "");
    }
    if (needRoll) {
        if (messageExt.getProperty(TIMER_ROLL_TIMES) != null) {
            // Record how many times the message has been rolled
            MessageAccessor.putProperty(messageExt, TIMER_ROLL_TIMES, Integer.parseInt(messageExt.getProperty(TIMER_ROLL_TIMES)) + 1 + "");
        } else {
            MessageAccessor.putProperty(messageExt, TIMER_ROLL_TIMES, 1 + "");
        }
    }
    MessageAccessor.putProperty(messageExt, TIMER_DEQUEUE_MS, System.currentTimeMillis() + "");
    MessageExtBrokerInner message = convertMessage(messageExt, needRoll);
    return message;
}
- The wheel has a pointer variable (like the second hand of a clock) that advances at a fixed interval. The slot the pointer currently points to (the mark under the hand) holds the data that is due now.
Messages delayed to a specified point in time are also moved to a built-in system topic: TIMER_TOPIC (rmq_sys_wheel_timer), with the queue ID fixed at 0.
private static PutMessageResult transformTimerMessage(BrokerController brokerController,
    MessageExtBrokerInner msg) {
    //do transform
    int delayLevel = msg.getDelayTimeLevel();
    long deliverMs;
    try {
        if (msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC) != null) {
            deliverMs = System.currentTimeMillis() + Long.parseLong(msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC)) * 1000;
        } else if (msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_MS) != null) {
            deliverMs = System.currentTimeMillis() + Long.parseLong(msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_MS));
        } else {
            deliverMs = Long.parseLong(msg.getProperty(MessageConst.PROPERTY_TIMER_DELIVER_MS));
        }
    } catch (Exception e) {
        return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_MSG_ILLEGAL, null);
    }
    if (deliverMs > System.currentTimeMillis()) {
        if (delayLevel <= 0 && deliverMs - System.currentTimeMillis() > brokerController.getMessageStoreConfig().getTimerMaxDelaySec() * 1000L) {
            return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_MSG_ILLEGAL, null);
        }
        int timerPrecisionMs = brokerController.getMessageStoreConfig().getTimerPrecisionMs();
        if (deliverMs % timerPrecisionMs == 0) {
            deliverMs -= timerPrecisionMs;
        } else {
            deliverMs = deliverMs / timerPrecisionMs * timerPrecisionMs;
        }
        if (brokerController.getTimerMessageStore().isReject(deliverMs)) {
            return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_FLOW_CONTROL, null);
        }
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TIMER_OUT_MS, deliverMs + "");
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
        msg.setTopic(TimerMessageStore.TIMER_TOPIC);
        msg.setQueueId(0); // always queue 0
    } else if (null != msg.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY)) {
        return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_MSG_ILLEGAL, null);
    }
    return null;
}
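The rounding branch in the middle of this method is easy to misread, so here it is on its own. The sketch copies only the arithmetic; TimerPrecisionSketch and alignDeliverMs are hypothetical names, and the precision of 1000 ms used in the usage note is just an illustrative value.

```java
// Isolates the deliverMs rounding shown in transformTimerMessage; names are hypothetical.
final class TimerPrecisionSketch {
    // Returns the largest multiple of timerPrecisionMs that is strictly below deliverMs.
    static long alignDeliverMs(long deliverMs, int timerPrecisionMs) {
        if (deliverMs % timerPrecisionMs == 0) {
            // Already on a tick boundary: step back one whole tick.
            return deliverMs - timerPrecisionMs;
        }
        // Otherwise round down to the previous tick boundary.
        return deliverMs / timerPrecisionMs * timerPrecisionMs;
    }

    public static void main(String[] args) {
        System.out.println(alignDeliverMs(5300L, 1000));
    }
}
```

With a precision of 1000 ms, a requested time of 5300 becomes 5000 and a requested time of exactly 5000 becomes 4000, so the stored value is always a tick boundary earlier than the requested time.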
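Data structures:
- TimerWheel is an array as a whole, and it works like a clock face. Each mark on the face is a slot, and each slot records an index to data rather than the data itself. The concrete message data all lives in a LocalBuffer cache array, and each slot describes one or more messages stored there. A slot holds firstPos (the start address in timerLog of its first message) and lastPos (the start address in timerLog of its last message).
public class Slot {
    public static final short SIZE = 32; // total size of a slot in bytes, fixed at 32
    public final long timeMs; // delivery time of this slot (millisecond timestamp): messages in it are due at this time
    public final long firstPos; // physical offset in TimerLog of the first message in this slot (start position)
    public final long lastPos; // physical offset in TimerLog of the last message in this slot (end position)
    public final int num; // number of messages currently in this slot
    public final int magic; //no use now, just keep it. Flag field (unused, reserved); could mark special messages such as ones that need to roll
    ...
}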
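- TimerLog stores the metadata of every message delayed to a specified point in time. Each metadata record holds the start address of the previous record, so the records form a linked list, and the metadata is enough to fetch the message body from the commitLog.
public class TimerLog {
    ...
    public final static int UNIT_SIZE = 4 // size
        + 8 // prev pos: start address of the previous metadata record
        + 4 // magic value: message-type mark (roll flag and delete flag)
        + 8 // curr write time, for trace
        + 4 // delayed time, for check (the delay as a time difference)
        + 8 // offsetPy: physical offset
        + 4 // sizePy: message size
        + 4 // hash code of real topic
        + 8; // reserved value, just in case
    ...
}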
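To show how fixed-size records chained by a prev pos field can be walked from a slot's lastPos, here is a small in-memory sketch. It follows the 52-byte field order listed above, but TimerLogSketch, its methods, and the use of -1 as the end-of-chain marker are assumptions made for this example rather than the real TimerLog API.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// In-memory illustration of backward-linked fixed-size records; not the RocketMQ TimerLog.
final class TimerLogSketch {
    static final int UNIT_SIZE = 4 + 8 + 4 + 8 + 4 + 8 + 4 + 4 + 8; // 52 bytes
    private static final int PREV_POS_OFFSET = 4;               // after size
    private static final int OFFSET_PY_OFFSET = 4 + 8 + 4 + 8 + 4; // 28

    private final ByteBuffer log = ByteBuffer.allocate(UNIT_SIZE * 16);

    // Appends one record and returns its start position in the log.
    long append(long prevPos, long offsetPy, int sizePy) {
        long pos = log.position();
        log.putInt(UNIT_SIZE); // size
        log.putLong(prevPos);  // prev pos (assumption: -1 marks the end of the chain)
        log.putInt(0);         // magic
        log.putLong(0L);       // curr write time (unused in this sketch)
        log.putInt(0);         // delayed time (unused in this sketch)
        log.putLong(offsetPy); // offsetPy
        log.putInt(sizePy);    // sizePy
        log.putInt(0);         // hash code of real topic (unused in this sketch)
        log.putLong(0L);       // reserved
        return pos;
    }

    // Walks from lastPos back through prev pos and collects each record's offsetPy.
    List<Long> commitLogOffsetsFrom(long lastPos) {
        List<Long> offsets = new ArrayList<>();
        long pos = lastPos;
        while (pos >= 0) {
            offsets.add(log.getLong((int) pos + OFFSET_PY_OFFSET));
            pos = log.getLong((int) pos + PREV_POS_OFFSET);
        }
        return offsets;
    }
}
```

Appending three records that each point at the previous one and then walking from the last position yields their offsetPy values newest first, which mirrors how a slot's lastPos gives access to every message queued for that tick.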
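- TimerMessageStore has two variables, currReadTimeMs and currWriteTimeMs, that act like the hands of a clock. currWriteTimeMs points to the slot currently being written, and currReadTimeMs points to the slot currently being read. Because both keep moving forward, the slots are read one after another, second by second, the way a clock hand sweeps the dial. A slot read at a given moment can hold data for the current second as well as data that only becomes due after the wheel has turned through further rounds.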
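The slot-and-pointer behaviour described in this section, including entries that share a slot but belong to a later round, can be sketched in a few lines. This is a simplified model under stated assumptions: TimeWheelSketch and its members are hypothetical, entries are kept in memory lists instead of TimerLog positions, and entries for later rounds are simply left in the slot rather than rolled the way the real store does.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified in-memory time wheel; illustrative only, not the RocketMQ TimerWheel.
final class TimeWheelSketch {
    static final class Entry {
        final String id;
        final long deliverMs;
        Entry(String id, long deliverMs) {
            this.id = id;
            this.deliverMs = deliverMs;
        }
    }

    private final List<List<Entry>> slots = new ArrayList<>();
    private final long precisionMs;
    private long currReadTimeMs; // read pointer, always a multiple of precisionMs

    TimeWheelSketch(int slotCount, long precisionMs, long startMs) {
        for (int i = 0; i < slotCount; i++) {
            slots.add(new ArrayList<>());
        }
        this.precisionMs = precisionMs;
        this.currReadTimeMs = startMs / precisionMs * precisionMs;
    }

    // Maps a timestamp to its slot; timestamps one full round apart share a slot.
    private int slotIndex(long timeMs) {
        return (int) (timeMs / precisionMs % slots.size());
    }

    void put(String id, long deliverMs) {
        slots.get(slotIndex(deliverMs)).add(new Entry(id, deliverMs));
    }

    // Reads the slot under the pointer, returns the due ids, then advances one tick.
    List<String> tick() {
        List<Entry> slot = slots.get(slotIndex(currReadTimeMs));
        List<String> due = new ArrayList<>();
        List<Entry> laterRounds = new ArrayList<>();
        for (Entry e : slot) {
            if (e.deliverMs < currReadTimeMs + precisionMs) {
                due.add(e.id);
            } else {
                laterRounds.add(e); // same slot, but due in a later round
            }
        }
        slot.clear();
        slot.addAll(laterRounds);
        currReadTimeMs += precisionMs;
        return due;
    }
}
```

With 4 slots and a 1000 ms tick, entries due at 1000 and 5000 land in the same slot; the first is returned when the pointer reaches 1000 and the second only after the pointer has gone once more around the wheel.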