RocketMQ 底层原理
一、RoekctMQ存储设计
1、消费并发度
通过给主题划分更多的queue,将不同的queue分散到更多服务器上,从而提高并发度。
2、顺序消费问题
- 乱序的应用实际上大量存在。
- 队列无序不代表消息无序。
3、重复消费问题
- RocketMQ不保证消费不重复。
- 消费端处理消息的业务逻辑保持幂等性。
- 确保每一条消息都有唯一的编号且保证消息处理成功与去重表的日志同时出现。
二、RocketMQ消息存储结构
- commitlog:消息存储目录
- config:运行期间的配置信息
- consumequeue:消息消费队列存储目录
- index:消息索引文件存储目录
- abort:如果存在修改文件则Broker非正常关闭
- checkpoint:文件检查点,存储commitlog文件最后一次刷盘时间戳、consumequeue最后一次刷盘时间、index索引文件最后一次刷盘时间戳。
RocketMQ消息的存储是由consumequeue和commitlog配合完成的。commitlog文件是物理存储消息的。consumequeue是消息的逻辑队列,相当于索引文件,存储的是指向物理存储的地址。每个topic下的queue都有一个对应的consumequeue文件。
1、commitLog
commitlog是物理文件,存储着本broker的全部消息数据。在commitlog文件中,一个消息的存储长度不是固定的,RocketMQ向commitlog文件顺序写入数据,随机读取。
commitLog文件的默认大小是1G,写满一个文件后再写第二个,可以通过broker配置文件中的mappedFileSizeCommitLog属性修改默认大小。
每条消息的前面4个字节存储该条消息的总长度。
indexFile和consumeQueue文件中都存储消息对应的物理偏移量,通过物理偏移量就可以计算出该消息位于哪个commitLog文件上。
2、consumeQueue
consumeQueue是消息的逻辑队列,相当于索引文件,存储的是指向物理存储的地址。每个topic下的queue都有一个对应的consumequeue文件。
consumeQueue中存储的是消息条目,为了加速consumeQueue消息条目的检索速度与节省磁盘空间,每一个consumeQueue条目不会存储消息的全部信息。总长度20字节。
consumeQueue的构建机制是当消息到达commitLog文件后,由专门的线程产生消息转发任务,从而构建消息消费队列文件consumeQueue与indexFile。
好处:
- commitLog文件顺序写,可以利用磁盘顺序写效率高的特性。
- 虽然是随机读取,但是利用操作系统的pagecache机制可以批量地从磁盘读取数据放到内存中,加速后续的读取速度。
- 为了保证完全的顺序写,就需要consumerQueue存储commitLog文件的物理偏移量,同时consumeQueue可以被完全加载到内存中,保证了commitLog文件的写效率。
- 为了保证commitLog文件和consumeQueue文件的一致性,commitLog文件存储了consumeQueues的所有信息,可以通过commitLog文件恢复consumeQueue文件。
3、index
RocketMQ支持通过MessageID和MessageKey查询消息。
- MessageID:
- 由broker+offset生成的。
- 可以很容易找到对应的commitLog文件读取消息。
- MessageKey:
- 通过构建indexFile提高读取效率。
index存储的是索引文件,在消息数据存入commitLog文件后,给indexFile插入一条内容是【header - commitLog offset - key】的数据,使用key查询消息时,可以直接获取对应的commotLog offset。
使用hash索引机制,具体是Hash槽与Hash冲突的链表结构。
4、config
存储topic和consumer相关的信息。
- topic.json:topic的配置属性
- subscriptionGroup.json:消费者群组配置属性
- delayOffset.json:延时消息队列拉取进度
- consumerOffset.json:集群消费模式消息消费进度
- consumerFilter.json:消息过滤信息
三、过期文件删除
- 为什么做删除?
- RocketMQ启动时会加载commitLog文件和consumeQueue文件映射到内存中。
- 为了避免内存、磁盘空间浪费。
- 删除哪些文件?
- commitLog和consumeQueue文件。
- 非当前写文件:不是最新的commitLog文件就叫“非当前写文件”,这些文件都写满1G了,不会再往里写数据了。
- 如何删除?
- rocketMQ不会关注某个commitLog文件中的数据是否被全部消费。
- 非当前写文件在一定时间内没有被写过,就认为该文件是非当前写文件。默认过期时间是42小时,可以通过broker.conf文件中的fileReservedTime属性修改。
- 通过定时任务10s一次的轮询操作来查找过期文件。
- 过期判断
- fileReservedTime:文件保留时间。
- deletePhysicFilesInterval:删除物理文件的时间间隔。
- destroyMapedFileIntervalForcibly:当删除文件时发现文件正在被引用,就会组织删除操作,同时将文件标记为不可用且记录当前时间戳。
- 删除条件
- deleteWhen:设置删除文件时间点,固定的一天删除一次,默认凌晨4点。
- diskSpaceCleanForciblyRatio:默认85,如果磁盘使用率超过85就会触发文件删除机制。
四、零拷贝与MMAP
1、零拷贝技术
CPU不需要先将数据从磁盘、套接字缓冲区拷贝到应用程序内存。
可以减少数据拷贝和共享线程操作的次数,消除不必要的拷贝次数。
减少【用户进程地址空间】和【内核地址空间】之间的上下文切换。
传统数据传输机制,例如读取文件再用socket发送出去:
- DMA拷贝。将磁盘文件读取到【操作系统内核缓冲区】
- CPU拷贝。将内核缓冲区的数据copy到【应用程序内存】。
- CPU拷贝。将应用程序内存的数据copy到【套接字缓冲区】。
- DMA拷贝。将套接字缓冲区数据copy到【网卡】,由网卡发送数据。
2、MMAP内存映射
mmap是内存映射机制,建立磁盘文件和应用程序内存缓冲区的映射关系,减少了拷贝到【内核缓冲区】的操作。
在【第三步】操作时,直接将应用程序的内存数据拷贝到磁盘。
五、分布式事务
RocketMQ的生产者有【半事务】和【事务回查】机制确保分布式事务。
- 提交半事务。半事务仅会写入commitLog文件,而不会写入consumerQueue文件。
- 执行本地事务。系统A提交完半事务就执行本地事务。
- 回查事务。事务回查机制是RocketMQ回查A系统的事务是否成功,如果成功就将【半事务】写入consumeQueue文件,投递给消费者。
消费者要确保正确消费消息,重复消费时要确保幂等性,消费失败时也要做死信消息的补偿。
1、分布式事务演示代码
消费者演示代码,没有特殊操作,简单消费即可。
package org.apache.rocketmq.example.quickstart.transaction;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* 事务消费者
*/
public class TransactionConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group-1");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("topic-10", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext context) {
try {
for (MessageExt messageExt : messageExtList) {
System.out.println("执行本地事务成功!transactionId = " + messageExt.getTransactionId());
}
} catch (Exception e) {
// 消费失败,放入重试队列
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
// 消费成功,发送ACK确认
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("消费者已启动");
}
}
事务生产者,需要特殊处理:
package org.apache.rocketmq.example.quickstart.transaction;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.*;
/**
* 事务生产者
*/
public class TransactionProducer {
public static void main(String[] args) throws Exception {
// 创建生产者回查线程池
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
// 创建事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("producer-group-1");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setExecutorService(executorService); // 设置生产者回查线程池
producer.setTransactionListener(new TransactionListenerImpl()); // 设置生产者监听器
producer.start();
// 发送半事务
try {
Message message = new Message("topic-10", null, "事务数据".getBytes());
TransactionSendResult sendResult = producer.sendMessageInTransaction(message, null);
// 解析发送结果
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(sendResult.getSendStatus() + "-" + format.format(new Date()));
} catch (Exception e) {
// 发送失败,回滚事务
} finally {
// producer.shutdown();
}
}
}
事务回查监听器:
package org.apache.rocketmq.example.quickstart.transaction;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* 事务监听器实现
*/
public class TransactionListenerImpl implements TransactionListener {
private static final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
/**
* 本地事务执行器
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println("开始执行本地事务...");
// 情况1:事务执行成功了,直接提交半事务消息。
// return LocalTransactionState.COMMIT_MESSAGE;
// 情况2:事务执行失败了,直接回滚半事务消息
// return LocalTransactionState.ROLLBACK_MESSAGE;
// 情况3:使用异步操作或者等待很久,返回中间状态 - 交给事务回查来确认结果
return LocalTransactionState.UNKNOW;
}
/**
* 事务回查执行器 - 当本地事务返回中间状态触发
* 每隔60s左右重试一次
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("checkTransaction:" + msg.getTransactionId());
System.out.println("事务回查..." + format.format(new Date()));
// 通过查询执行结果或其他方式确认事务执行结果
// 提交事务
return LocalTransactionState.COMMIT_MESSAGE;
// 回滚事务
// return LocalTransactionState.ROLLBACK_MESSAGE;
// 还没处理完...
// return LocalTransactionState.UNKNOW;
}
}
六、RocketMQ的高可用机制
1、集群部署模式
- 单master模式
- 多master模式
- 集群方式部署,有多个broker,此时一个topic可以覆盖多个broker(一部分队列在brokerA,一部分在brokerB)。
- 多master + 多slave模式(同步)
- 只有消息发送到主节点并且同步到从节点中时,才会返回消息发送成功。
- 多master + 多slave模式(异步)
- 消息发送到主节点后,切一个子线程继续执行同步消息到从节点的操作。
2、刷盘(持久化)形式
- 同步刷盘
- 发送消息的主线程在将消息发送到主节点后,再执行写入磁盘操作,期间发送端一直在等待。
- 异步刷盘
- 消息到达主节点后,切一个子线程做持久化操作。
3、刷盘与主从同步
一般而言,使用多主多从架构 + 异步刷盘可以既满足高可用,又能有不错的执行效率。
4、消息生产默认队列选择策略
消息发送分三步走:【选择队列】、指定broker发送消息、检查是否发送成功。
如果发送失败就【重试】,在重试的时候默认启用【规避策略】,选择其他的broker进行发送。
5、消息生产的故障延迟机制策略
也是一种规避策略。
不会直接更换broker发送消息,而是通过失败响应的延迟时间确定等待的时长,在等待时间过了之后继续给原来的broker发送消息。
可以应对由于网络波动导致的可达性问题,更合理地分配消息。
6、生产与消费的负载均衡机制

producer:发送消息使用轮询策略 - broker、queue。

consumer:
1、平均分配 - 每一个consumer分配一部分queue,不在乎broker。
2、在broker内轮询消费queue,每一个consumer尽可能对接更多的broker。
七、面试八股文
1、为什么使用消息队列
答:结合场景。有个业务场景有什么技术挑战,如果不用MQ会很麻烦,使用MQ带来了很多好处,核心就是异步、解耦、削峰。
异步:提高A系统的响应速度,将数据丢给MQ之后B系统就自动处理了,不会阻塞A系统的线程。
解耦:系统之间的数据同步很麻烦,说不定哪天就需要加入一个新的系统也参数数据同步,这时候系统A就需要考虑同步失败了怎么办、加入新的系统还得改代码,耦合性太高。加入MQ之后将数据发送和管理的工作交给MQ,来了新的系统也只需要创建一个消费者即可。
削峰:短时间内涌入大量数据,但是服务器处理能力不够,这时候就需要MQ缓存一部分数据,消费端慢慢消费,避免服务器宕机。
2、消息队列的缺点
- 系统复杂性高
- 需要确保消息不丢失
- 需要确保幂等
- 需要确保消息顺序
- 系统可用性降低
- 功能越多故障点就越多,MQ本身也有故障风险。
- 一致性问题
- 异步处理带来的问题就是没法确认数据一致,需要额外的手段保持数据一致。
3、为什么选择RocketMQ
- 性能:高。
- 功能:功能完善,事务消息、消息重试、延时消息、死信消息...
- 易用:跨平台,支持多协议(MQTT等)。
- 生态发展健全:可以对接阿里云客服指导问题。