启动你的RocketMQ之旅(四)-Producer启动和发送流程(下)
前言:
👏作者简介:我是笑霸final。
📝个人主页: 笑霸final的主页2
📕系列专栏:java专栏
📧如果文章知识点有错误的地方,请指正!和大家一起学习,一起进步👀
🔥如果感觉博主的文章还不错的话,👍点赞👍 + 👀关注👀 + 🤏收藏🤏
上一章节:启动你的RocketMQ之旅(三)-Producer启动和发送流程(上)
目录
- 基础接口
- MQAdmin接口字段详解
- MQProducer接口详解
- producer发送底层消息分析
- 同步消息
- 异步消息
- 延时消息(实现在broker)
- 事务消息发送流程
- 总结
- 消息request具体传递的信息
- 发送消息流程总结
- 选择消息队列的规避策略
基础接口
在上文可知我们都是用的DefaultMQProducer来创建的生产者,现在我们来看看DefaultMQProducer的继承关系
可见 MQAdmin接口是基础接口,下面就来看看 MQAdmin有哪些字段。
MQAdmin接口字段详解
/**
* 创建主题
* key、newTopic、queueNum
*/
void createTopic(final String key, final String newTopic, final int queueNum) throws MQClientException;
/**
* 创建主题
* key、newTopic、queueNum、topicSysFlag
*/
void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException;
/**
* 根据时间戳从队列中查找消息偏移量
*/
long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;
/**
* 查找消息队列中最大偏移量
*/
long maxOffset(final MessageQueue mq) throws MQClientException;
/**
* 查找消息队列中最小的偏移量
*/
long minOffset(final MessageQueue mq) throws MQClientException;
/**
* 查找消息队列中最早的存储消息时间
*/
long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException;
/**
* 根据消息id查找消息
*/
MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
/**
* 根据条件查找消息
*/
QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin, final long end) throws MQClientException, InterruptedException;
/**
* 根据主题和消息id查找消息
*/
MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
这些方法主要提供了对 RocketMQ 消息队列的管理能力,包括主题创建、消息偏移量查询、消息查找等功能。以下是这些方法的主要用途分类:
- 主题管理:
createTopic:创建新的主题。 - 消息队列管理:
searchOffset:根据时间戳查找偏移量。
maxOffset 和 minOffset:获取队列的最大和最小偏移量。
earliestMsgStoreTime:获取最早存储消息的时间。 - 消息查询:
viewMessage:根据消息 ID 查找消息。
queryMessage:根据条件批量查询消息。
MQProducer接口详解
public interface MQProducer extends MQAdmin {
/**
* 启动生产者
*/
void start() throws MQClientException;
/**
* 关闭生产者
*/
void shutdown();
/**
* 查找指定主题下的所有消息
*/
List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;
/**
* 发送同步消息
*/
SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
SendResult send(final Message msg, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
SendResult send(final Message msg, final MessageQueue mq) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
SendResult send(final Message msg, final MessageQueue mq, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
/**
* 发送异步消息
*/
void send(final Message msg, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;
void send(final Message msg, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException;
void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;
void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, InterruptedException;
void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;
void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException;
/**
* 发送单向消息
*/
void sendOneway(final Message msg) throws MQClientException, RemotingException, InterruptedException;
void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException, RemotingException, InterruptedException;
void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg) throws MQClientException, RemotingException, InterruptedException;
/**
* 发送事务消息
*/
TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;
TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException;
/**
* 批量发送消息
*/
SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
SendResult send(final Collection<Message> msgs, final long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
SendResult send(final Collection<Message> msgs, final MessageQueue mq) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
SendResult send(final Collection<Message> msgs, final MessageQueue mq, final long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
void send(final Collection<Message> msgs, final SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
void send(final Collection<Message> msgs, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException;
void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException;
void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback, final long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
//for rpc
Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException,
RemotingException, MQBrokerException, InterruptedException;
void request(final Message msg, final RequestCallback requestCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException, MQBrokerException;
Message request(final Message msg, final MessageQueueSelector selector, final Object arg,
final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException,
InterruptedException;
void request(final Message msg, final MessageQueueSelector selector, final Object arg,
final RequestCallback requestCallback,
final long timeout) throws MQClientException, RemotingException,
InterruptedException, MQBrokerException;
Message request(final Message msg, final MessageQueue mq, final long timeout)
throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException;
void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
}
MQProducer 是 RocketMQ 中生产者的核心接口,继承自 MQAdmin,提供了消息生产和管理的能力。总的来说有一下特点:
-
生产者生命周期管理
- 启动生产者: void start() throws MQClientException; 启动生产者实例,初始化与 Broker
的连接。 - 关闭生产者:oid shutdown();关闭生产者实例,释放资源。
- 启动生产者: void start() throws MQClientException; 启动生产者实例,初始化与 Broker
-
主题相关操作
- 查找指定主题下的所有消息队列:List fetchPublishMessageQueues(final String topic) throws MQClientException;获取指定主题下的所有消息队列列表。
-
消息发送方式
-
同步消息发送
- 发送消息后会等待服务器返回确认结果
- 提供多种参数组合,支持指定消息队列、选择器、超时时间等。
-
异步消息发送
- 发送消息后不会阻塞线程,通过回调函数处理发送结果。
- 提供多种参数组合,支持指定消息队列、选择器、超时时间等。
-
单向消息发送
- 不关心发送结果,只负责将消息发送到 Broker。
- 适用于对可靠性要求不高的场景。
-
事务消息发送
- 支持分布式事务,确保消息发送与本地事务的一致性。
-
批量消息发送(支持一次发送多条消息,减少网络开销,提高吞吐量。)
- 同步批量发送:
- 异步批量发送:
-
RPC 请求(请求-响应模式)[提供基于消息的请求-响应模式,适用于需要即时反馈的场景]
- 同步请求:
- 异步请求
-
producer发送底层消息分析
上面我们已经对 1 消息的的检查、2 查找路由 3、选择队列 4、消息发送流程做了大概了解,现在我们看看消息如何发送的。
1、sendKernelImpl 方法:这是实际执行消息发送的核心逻辑所在。它会根据消息类型(普通消息、事务消息等)和同步/异步模式来决定如何处理消息。
private SendResult sendDefaultImpl(
Message msg,//消息
final CommunicationMode communicationMode,//通讯方式
final SendCallback sendCallback, //回调函数
final long timeout//超时时间
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException
2、在 sendKernelImpl 方法内部,会进一步调用 MQClientAPIImpl.sendMessage() ,这方法负责与Broker建立网络连接,并将消息数据序列化后通过Netty或其他网络通信框架发送给Broker。
SendResult sendMessage(
String addr,
Message msg,
boolean block,
SendCallback sendCallback,
long timeout,
CommunicationMode communicationMode,
SendMessageContext context,
SocketAddress storeHost,
Boolean checkImmunity)
------------------------具体的方法-----------------
public SendResult sendMessage(
final String addr,
final String brokerName,
final Message msg,
final SendMessageRequestHeader requestHeader,//请求头
final long timeoutMillis,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final MQClientInstance instance,
final int retryTimesWhenSendFailed,
final SendMessageContext context,
final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
RemotingCommand request = null;
String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
// 是否是回应消息
//在RocketMQ的请求-响应模式中,生产者发送一个带有特定标记的消息到Broker,
// 然后等待Broker返回一个带有相同标记的回复消息。这个 isReply 变量就是用来在消息处理逻辑中识别回复消息的关键标志。
boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
if (isReply) {
if (sendSmartMsg) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
}
} else {
//非回复消息,普通消息
if (sendSmartMsg || msg instanceof MessageBatch) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
} else {
// 并将Producer要发送的消息信息填充进去,如Topic、消息Key、消息Tag等。
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
}
}
request.setBody(msg.getBody());
//根据通信方式选择
switch (communicationMode) {
case ONEWAY: //单向消息
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
return null;//单向消息不用返回
case ASYNC: //异步消息
final AtomicInteger times = new AtomicInteger();
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, context, producer);
return null;//异步消息直接反悔null
case SYNC: //同步消息
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeSync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
//同步消息会返回
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
default:
assert false;
break;
}
return null;
}
根据消息是回复类型还是 普通消息,然后看启用智能发送功能没有,最后根据这些不同来构造不同的请求头,然后把消息放进请求体。最终,根据消息是单向,异步,还是同步消息来进行发送。
这里请求头作为参数传进来的: requestHeader请求头封装,查看DefaultMQproducerImpl的sendKernelIlmpl方法可知requestHeader有如下参数
1.设置生产者组名,2.主题名称,3.目标消息队列的ID,4.系统标志位,5.是否批量消息
6.消息重试次数,7.消息最大重试次数 8.Properties(比如延迟消息)
9.标志位(事务消息=4)
然后将上面的信息写入 请求标头中 封装成request
同步消息
最终调用的如下代码
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
/**
* 每个发送到Broker的请求都会携带一个唯一的 opaque 值,Broker在处理请求后,
* 会将该值原封不动地返回到Producer或Consumer端,这样客户端就可以通过 opaque 值
* 匹配到对应的请求和响应,进而进行后续的处理工作
*/
// opaque 请求id
final int opaque = request.getOpaque();
try {
//创建一个ResponseFuture对象,用于保存本次请求的响应信息,包括响应的Channel、
// 请求的不透明标识符opaque、超时时间、以及将来填充的响应结果和异常原因
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
//将ResponseFuture对象放入responseTable中,以opaque作为键,方便后续根据opaque找到对应的响应
this.responseTable.put(opaque, responseFuture);
//获取远程服务器地址
final SocketAddress addr = channel.remoteAddress();
/**
* 使用Netty的writeAndFlush方法异步发送请求,并添加一个ChannelFutureListener监听器。
* 监听器的operationComplete方法会在写操作完成后被调用。
*/
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
//如果写操作成功,设置responseFuture的sendRequestOK为true,表示请求已成功发送至远程服务器。
responseFuture.setSendRequestOK(true);
return;
} else {
//如果写操作失败,从responseTable中移除对应opaque的ResponseFuture,并设置错误原因和响应结果为空,同时输出日志警告。
responseFuture.setSendRequestOK(false);
}
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed.");
}
});
// ··等待服务器的响应··,如果在指定超时时间内没有收到响应,则返回null
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
return responseCommand;
} finally {
this.responseTable.remove(opaque);
}
}
最后rpc远程调用的时候,发送的还是request,请求体里面只有msg.body
总的来说,这段代码实现了客户端向服务器发送请求,并等待服务器响应的过程,其中包括了异步发送、超时处理、异常处理等功能。
异步消息
进入带有回调对象作为参数的 …client.impl.MQclientAPIImpl.sendMessage(…)
可以发现 在netty异步调用完成时会把结果写入注册的回调方法。
延时消息(实现在broker)
在消息里面设置延时等级
可见延迟消息的字段在property中
然后会把这个property(map结构转换为String)然后传给Broker
主要实现在Broker端的处理消息存储的过程
中
事务消息发送流程
旧是调用的 defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);方法。详细看下面代码
事务消息中不支持延迟发送。即使设置了也会把延迟等级的属性给删除
//检查是否有事务监听器
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
// ignore DelayTimeLevel parameter
// 忽略延迟等级 删除msg中Property的延时key
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}
// 检查消息
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
//添加msg中Property的 TRAN_MSG 为true (半消息为 true)
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
// 添加生产者组
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
//发送消息
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
//发送完毕 设置localTransactionState的状态为 UNKNOW
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
// 其他代码...
//todo 执行本地事务
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
// 其他代码...
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
//只要不是SEND_OK 就 设置 localTransactionState=ROLLBACK_MESSAGE回滚消息
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
try {
//todo 方法的主要作用是根据本地事务的执行结果向RocketMQ Broker报告事务的最终状态
this.endTransaction(msg, sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
上面代码总结:
● 检查是否有本地事务执行器和事务监听器
● 忽略延迟消息
● 把消息设置为半事务消息 (properties的k=TRAN_MSG , V=true)和生产者组
● 发送消息
● 根据消息的结果 选择执行本地事务操作或者本地事务的回滚操作。
● 根据本地事务执行的结果向RocketMQ Broker报告事务的最终状态,以便Broker根据事务结果决定消息的最终处理
然后发送消息时调用如下代码
事务消息 是同步消息
标志位设置为4
方法的主要作用是根据本地事务的执行结果向RocketMQ Broker报告事务的最终状态
总结
消息request具体传递的信息
● 请求头:
- 1.设置生产者组名,
- 2.主题名称,
- 3.目标消息队列的ID,
- 4.系统标志位,
- 5.是否批量消息
- 6.消息重试次数,
- 7.消息最大重试次数
- 8.Properties(比如延迟消息)
- 9.标志位(事务消息=4)
● 请求体:msg.body消息的字节码
发送消息流程总结
-
1、检查消息
● topic的名称和长度(不大于127)是否符合规范
● 消息是否不为空和大小有没有超过4m
● 当前topic是否允许生产者发送消息 -
2、查找路由
● 现在本地缓存中查找,如果没有再从namesrv查找。
● 如果找打的路由信息不可用则强制再从namesrv查找
● 返回路由 -
3、选择队列
● 如果没有开启发送延迟容忍(默认没开启)则走轮询,但是会规避上次发送失败的broker
● 如果开启了,就会根据当前broker的可用时间发送,如果都不可用,则选择一个适合的broker。如果没有适合的broker则走默认的轮询机制。 -
4、发送消息
使用netty实现网络通信,发送消息,它的通信协议是 自定义的协议。消息传递的基本协议单元包含以下重要信息
● 命令标识code:用于标识具体的命令
● 语言标记languang:表示使用的语言
● 版本信息version:协议版本
● 唯一标识opaque:请求id
● 序列化类型 serializeTypeCurrentRPC
● 消息正文Body
● 自定义标头customHeader:生产者组、主题、消息队列、properties、标志位
选择消息队列的规避策略
- 消息发送成功后会记录发送时间,然后会将当前broker加了故障表故障表记录了broker名称,延迟时间、broker可用时间
- broker可用时间,会根据当前延时时间来选择 固定的间隔时间(mq内部有一个数组,根据延迟时间返回对应的间隔时间),然后broker可用时间=存入故障表的时间+间隔时间
- 如果消息发送失败,会用延迟时间为30000ms来计算可用时间。
默认轮询有参数传入的是 上次不可用的broker名
- 在选择消息队列时,使用ThreadLocal来针对当前线程inde自增,然后用inde当前路由信息的总队列数取模运算,然后得到消息队列,在返回队列之前,会规避掉上次故障的broker(故障broker只能记录最后一次异常的roker)【循环找】
开启发送延迟故障容忍 :
- 先是使用ThreadLocal来针对当前线程inde自增,得到inde
- 然后也是循环用inde对总队列数取模运算,得到一个mq
- 然后根据当前mq所在的broker是否在可用时间(当前时间>=broker可用时间),如果可用就返回消息队列。
- 不可用,就循环找。都找不到就选择一个适合的Broker【在故障表中找到所有的Broker,然后根据一定的条件排序,组后返回中间附近的broker返回】。如果Broker有队列数,就随机找一个mq,如果没有队列数,走无参的轮询。