2023年每日少儿新闻暴风seo论坛
前言:
👏作者简介:我是笑霸final。
📝个人主页: 笑霸final的主页2
📕系列专栏:java专栏
📧如果文章知识点有错误的地方,请指正!和大家一起学习,一起进步👀
🔥如果感觉博主的文章还不错的话,👍点赞👍 + 👀关注👀 + 🤏收藏🤏
上一章节:启动你的RocketMQ之旅(三)-Producer启动和发送流程(上)
目录
- 基础接口
- MQAdmin接口字段详解
- MQProducer接口详解
- producer发送底层消息分析
- 同步消息
- 异步消息
- 延时消息(实现在broker)
- 事务消息发送流程
- 总结
- 消息request具体传递的信息
- 发送消息流程总结
- 选择消息队列的规避策略
基础接口
在上文可知我们都是用的DefaultMQProducer来创建的生产者,现在我们来看看DefaultMQProducer的继承关系
可见 MQAdmin接口是基础接口,下面就来看看 MQAdmin有哪些字段。
MQAdmin接口字段详解
/*** 创建主题* key、newTopic、queueNum*/void createTopic(final String key, final String newTopic, final int queueNum) throws MQClientException;/*** 创建主题* key、newTopic、queueNum、topicSysFlag*/void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException;/*** 根据时间戳从队列中查找消息偏移量*/long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;/*** 查找消息队列中最大偏移量*/long maxOffset(final MessageQueue mq) throws MQClientException;/*** 查找消息队列中最小的偏移量*/long minOffset(final MessageQueue mq) throws MQClientException;/*** 查找消息队列中最早的存储消息时间*/long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException;/*** 根据消息id查找消息*/MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;/*** 根据条件查找消息*/QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin, final long end) throws MQClientException, InterruptedException;/*** 根据主题和消息id查找消息*/MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
这些方法主要提供了对 RocketMQ 消息队列的管理能力,包括主题创建、消息偏移量查询、消息查找等功能。以下是这些方法的主要用途分类:
- 主题管理:
createTopic:创建新的主题。 - 消息队列管理:
searchOffset:根据时间戳查找偏移量。
maxOffset 和 minOffset:获取队列的最大和最小偏移量。
earliestMsgStoreTime:获取最早存储消息的时间。 - 消息查询:
viewMessage:根据消息 ID 查找消息。
queryMessage:根据条件批量查询消息。
MQProducer接口详解
public interface MQProducer extends MQAdmin {/*** 启动生产者*/void start() throws MQClientException;/*** 关闭生产者*/void shutdown();/*** 查找指定主题下的所有消息*/List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;/*** 发送同步消息*/SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;SendResult send(final Message msg, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;SendResult send(final Message msg, final MessageQueue mq) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;SendResult send(final Message msg, final MessageQueue mq, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;/*** 发送异步消息*/void send(final Message msg, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;void send(final Message msg, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException;void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, InterruptedException;void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException;/*** 发送单向消息*/void sendOneway(final Message msg) throws MQClientException, RemotingException, InterruptedException;void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException, RemotingException, InterruptedException;void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg) throws MQClientException, RemotingException, InterruptedException;/*** 发送事务消息*/TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException;/*** 批量发送消息*/SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,InterruptedException;SendResult send(final Collection<Message> msgs, final long timeout) throws MQClientException,RemotingException, MQBrokerException, InterruptedException;SendResult send(final Collection<Message> msgs, final MessageQueue mq) throws MQClientException,RemotingException, MQBrokerException, InterruptedException;SendResult send(final Collection<Message> msgs, final MessageQueue mq, final long timeout)throws MQClientException, RemotingException, MQBrokerException, InterruptedException;void send(final Collection<Message> msgs, final SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException,InterruptedException;void send(final Collection<Message> msgs, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,MQBrokerException, InterruptedException;void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback) throws MQClientException, RemotingException,MQBrokerException, InterruptedException;void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback, final long timeout) throws MQClientException,RemotingException, MQBrokerException, InterruptedException;//for rpcMessage request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException,RemotingException, MQBrokerException, InterruptedException;void request(final Message msg, final RequestCallback requestCallback, final long timeout)throws MQClientException, RemotingException, InterruptedException, MQBrokerException;Message request(final Message msg, final MessageQueueSelector selector, final Object arg,final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException,InterruptedException;void request(final Message msg, final MessageQueueSelector selector, final Object arg,final RequestCallback requestCallback,final long timeout) throws MQClientException, RemotingException,InterruptedException, MQBrokerException;Message request(final Message msg, final MessageQueue mq, final long timeout)throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException;void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
}
MQProducer 是 RocketMQ 中生产者的核心接口,继承自 MQAdmin,提供了消息生产和管理的能力。总的来说有一下特点:
-
生产者生命周期管理
- 启动生产者: void start() throws MQClientException; 启动生产者实例,初始化与 Broker
的连接。 - 关闭生产者:oid shutdown();关闭生产者实例,释放资源。
- 启动生产者: void start() throws MQClientException; 启动生产者实例,初始化与 Broker
-
主题相关操作
- 查找指定主题下的所有消息队列:List fetchPublishMessageQueues(final String topic) throws MQClientException;获取指定主题下的所有消息队列列表。
-
消息发送方式
-
同步消息发送
- 发送消息后会等待服务器返回确认结果
- 提供多种参数组合,支持指定消息队列、选择器、超时时间等。
-
异步消息发送
- 发送消息后不会阻塞线程,通过回调函数处理发送结果。
- 提供多种参数组合,支持指定消息队列、选择器、超时时间等。
-
单向消息发送
- 不关心发送结果,只负责将消息发送到 Broker。
- 适用于对可靠性要求不高的场景。
-
事务消息发送
- 支持分布式事务,确保消息发送与本地事务的一致性。
-
批量消息发送(支持一次发送多条消息,减少网络开销,提高吞吐量。)
- 同步批量发送:
- 异步批量发送:
-
RPC 请求(请求-响应模式)[提供基于消息的请求-响应模式,适用于需要即时反馈的场景]
- 同步请求:
- 异步请求
-
producer发送底层消息分析
上面我们已经对 1 消息的的检查、2 查找路由 3、选择队列 4、消息发送流程做了大概了解,现在我们看看消息如何发送的。
1、sendKernelImpl 方法:这是实际执行消息发送的核心逻辑所在。它会根据消息类型(普通消息、事务消息等)和同步/异步模式来决定如何处理消息。
private SendResult sendDefaultImpl(Message msg,//消息final CommunicationMode communicationMode,//通讯方式final SendCallback sendCallback, //回调函数final long timeout//超时时间) throws MQClientException, RemotingException, MQBrokerException, InterruptedException
2、在 sendKernelImpl 方法内部,会进一步调用 MQClientAPIImpl.sendMessage() ,这方法负责与Broker建立网络连接,并将消息数据序列化后通过Netty或其他网络通信框架发送给Broker。
SendResult sendMessage(String addr, Message msg,boolean block,SendCallback sendCallback,long timeout,CommunicationMode communicationMode,SendMessageContext context,SocketAddress storeHost,Boolean checkImmunity)------------------------具体的方法-----------------public SendResult sendMessage(final String addr,final String brokerName,final Message msg,final SendMessageRequestHeader requestHeader,//请求头final long timeoutMillis,final CommunicationMode communicationMode,final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,final MQClientInstance instance,final int retryTimesWhenSendFailed,final SendMessageContext context,final DefaultMQProducerImpl producer) throws RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();RemotingCommand request = null;String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);// 是否是回应消息//在RocketMQ的请求-响应模式中,生产者发送一个带有特定标记的消息到Broker,// 然后等待Broker返回一个带有相同标记的回复消息。这个 isReply 变量就是用来在消息处理逻辑中识别回复消息的关键标志。boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);if (isReply) {if (sendSmartMsg) {SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);} else {request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);}} else {//非回复消息,普通消息if (sendSmartMsg || msg instanceof MessageBatch) {SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);} else {// 并将Producer要发送的消息信息填充进去,如Topic、消息Key、消息Tag等。request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);}}request.setBody(msg.getBody());//根据通信方式选择switch (communicationMode) { case ONEWAY: //单向消息this.remotingClient.invokeOneway(addr, request, timeoutMillis);return null;//单向消息不用返回case ASYNC: //异步消息final AtomicInteger times = new AtomicInteger();long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeoutMillis < costTimeAsync) {throw new RemotingTooMuchRequestException("sendMessage call timeout");}this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,retryTimesWhenSendFailed, times, context, producer);return null;//异步消息直接反悔null case SYNC: //同步消息long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeoutMillis < costTimeSync) {throw new RemotingTooMuchRequestException("sendMessage call timeout");}//同步消息会返回return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);default:assert false;break;}return null;}
根据消息是回复类型还是 普通消息,然后看启用智能发送功能没有,最后根据这些不同来构造不同的请求头,然后把消息放进请求体。最终,根据消息是单向,异步,还是同步消息来进行发送。
这里请求头作为参数传进来的: requestHeader请求头封装,查看DefaultMQproducerImpl的sendKernelIlmpl方法可知requestHeader有如下参数
1.设置生产者组名,2.主题名称,3.目标消息队列的ID,4.系统标志位,5.是否批量消息
6.消息重试次数,7.消息最大重试次数 8.Properties(比如延迟消息)
9.标志位(事务消息=4)
然后将上面的信息写入 请求标头中 封装成request
同步消息
最终调用的如下代码
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,final long timeoutMillis)throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {/*** 每个发送到Broker的请求都会携带一个唯一的 opaque 值,Broker在处理请求后,* 会将该值原封不动地返回到Producer或Consumer端,这样客户端就可以通过 opaque 值* 匹配到对应的请求和响应,进而进行后续的处理工作*/// opaque 请求idfinal int opaque = request.getOpaque();try {//创建一个ResponseFuture对象,用于保存本次请求的响应信息,包括响应的Channel、// 请求的不透明标识符opaque、超时时间、以及将来填充的响应结果和异常原因final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);//将ResponseFuture对象放入responseTable中,以opaque作为键,方便后续根据opaque找到对应的响应this.responseTable.put(opaque, responseFuture);//获取远程服务器地址final SocketAddress addr = channel.remoteAddress();/*** 使用Netty的writeAndFlush方法异步发送请求,并添加一个ChannelFutureListener监听器。* 监听器的operationComplete方法会在写操作完成后被调用。*/channel.writeAndFlush(request).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {if (f.isSuccess()) {//如果写操作成功,设置responseFuture的sendRequestOK为true,表示请求已成功发送至远程服务器。responseFuture.setSendRequestOK(true);return;} else {//如果写操作失败,从responseTable中移除对应opaque的ResponseFuture,并设置错误原因和响应结果为空,同时输出日志警告。responseFuture.setSendRequestOK(false);}responseTable.remove(opaque);responseFuture.setCause(f.cause());responseFuture.putResponse(null);log.warn("send a request command to channel <" + addr + "> failed.");}});// ··等待服务器的响应··,如果在指定超时时间内没有收到响应,则返回nullRemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);if (null == responseCommand) {if (responseFuture.isSendRequestOK()) {throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,responseFuture.getCause());} else {throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());}}return responseCommand;} finally {this.responseTable.remove(opaque);}}
最后rpc远程调用的时候,发送的还是request,请求体里面只有msg.body
总的来说,这段代码实现了客户端向服务器发送请求,并等待服务器响应的过程,其中包括了异步发送、超时处理、异常处理等功能。
异步消息
进入带有回调对象作为参数的 …client.impl.MQclientAPIImpl.sendMessage(…)
可以发现 在netty异步调用完成时会把结果写入注册的回调方法。
延时消息(实现在broker)
在消息里面设置延时等级
可见延迟消息的字段在property中
然后会把这个property(map结构转换为String)然后传给Broker
主要实现在Broker端的处理消息存储的过程
中
事务消息发送流程
旧是调用的 defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);方法。详细看下面代码
事务消息中不支持延迟发送。即使设置了也会把延迟等级的属性给删除
//检查是否有事务监听器TransactionListener transactionListener = getCheckListener();if (null == localTransactionExecuter && null == transactionListener) {throw new MQClientException("tranExecutor is null", null);}// ignore DelayTimeLevel parameter// 忽略延迟等级 删除msg中Property的延时keyif (msg.getDelayTimeLevel() != 0) {MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);}// 检查消息Validators.checkMessage(msg, this.defaultMQProducer);SendResult sendResult = null;//添加msg中Property的 TRAN_MSG 为true (半消息为 true)MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");// 添加生产者组MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());try {//发送消息sendResult = this.send(msg);} catch (Exception e) {throw new MQClientException("send message Exception", e);}//发送完毕 设置localTransactionState的状态为 UNKNOWLocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;Throwable localException = null;switch (sendResult.getSendStatus()) {case SEND_OK: {// 其他代码...//todo 执行本地事务localTransactionState = transactionListener.executeLocalTransaction(msg, arg);// 其他代码...}break;case FLUSH_DISK_TIMEOUT:case FLUSH_SLAVE_TIMEOUT:case SLAVE_NOT_AVAILABLE://只要不是SEND_OK 就 设置 localTransactionState=ROLLBACK_MESSAGE回滚消息localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;break;default:break;}try {//todo 方法的主要作用是根据本地事务的执行结果向RocketMQ Broker报告事务的最终状态this.endTransaction(msg, sendResult, localTransactionState, localException);} catch (Exception e) {log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);}TransactionSendResult transactionSendResult = new TransactionSendResult();transactionSendResult.setSendStatus(sendResult.getSendStatus());transactionSendResult.setMessageQueue(sendResult.getMessageQueue());transactionSendResult.setMsgId(sendResult.getMsgId());transactionSendResult.setQueueOffset(sendResult.getQueueOffset());transactionSendResult.setTransactionId(sendResult.getTransactionId());transactionSendResult.setLocalTransactionState(localTransactionState);return transactionSendResult;
上面代码总结:
● 检查是否有本地事务执行器和事务监听器
● 忽略延迟消息
● 把消息设置为半事务消息 (properties的k=TRAN_MSG , V=true)和生产者组
● 发送消息
● 根据消息的结果 选择执行本地事务操作或者本地事务的回滚操作。
● 根据本地事务执行的结果向RocketMQ Broker报告事务的最终状态,以便Broker根据事务结果决定消息的最终处理
然后发送消息时调用如下代码
事务消息 是同步消息
标志位设置为4
方法的主要作用是根据本地事务的执行结果向RocketMQ Broker报告事务的最终状态
总结
消息request具体传递的信息
● 请求头:
- 1.设置生产者组名,
- 2.主题名称,
- 3.目标消息队列的ID,
- 4.系统标志位,
- 5.是否批量消息
- 6.消息重试次数,
- 7.消息最大重试次数
- 8.Properties(比如延迟消息)
- 9.标志位(事务消息=4)
● 请求体:msg.body消息的字节码
发送消息流程总结
-
1、检查消息
● topic的名称和长度(不大于127)是否符合规范
● 消息是否不为空和大小有没有超过4m
● 当前topic是否允许生产者发送消息 -
2、查找路由
● 现在本地缓存中查找,如果没有再从namesrv查找。
● 如果找打的路由信息不可用则强制再从namesrv查找
● 返回路由 -
3、选择队列
● 如果没有开启发送延迟容忍(默认没开启)则走轮询,但是会规避上次发送失败的broker
● 如果开启了,就会根据当前broker的可用时间发送,如果都不可用,则选择一个适合的broker。如果没有适合的broker则走默认的轮询机制。 -
4、发送消息
使用netty实现网络通信,发送消息,它的通信协议是 自定义的协议。消息传递的基本协议单元包含以下重要信息
● 命令标识code:用于标识具体的命令
● 语言标记languang:表示使用的语言
● 版本信息version:协议版本
● 唯一标识opaque:请求id
● 序列化类型 serializeTypeCurrentRPC
● 消息正文Body
● 自定义标头customHeader:生产者组、主题、消息队列、properties、标志位
选择消息队列的规避策略
- 消息发送成功后会记录发送时间,然后会将当前broker加了故障表故障表记录了broker名称,延迟时间、broker可用时间
- broker可用时间,会根据当前延时时间来选择 固定的间隔时间(mq内部有一个数组,根据延迟时间返回对应的间隔时间),然后broker可用时间=存入故障表的时间+间隔时间
- 如果消息发送失败,会用延迟时间为30000ms来计算可用时间。
默认轮询有参数传入的是 上次不可用的broker名
- 在选择消息队列时,使用ThreadLocal来针对当前线程inde自增,然后用inde当前路由信息的总队列数取模运算,然后得到消息队列,在返回队列之前,会规避掉上次故障的broker(故障broker只能记录最后一次异常的roker)【循环找】
开启发送延迟故障容忍 :
- 先是使用ThreadLocal来针对当前线程inde自增,得到inde
- 然后也是循环用inde对总队列数取模运算,得到一个mq
- 然后根据当前mq所在的broker是否在可用时间(当前时间>=broker可用时间),如果可用就返回消息队列。
- 不可用,就循环找。都找不到就选择一个适合的Broker【在故障表中找到所有的Broker,然后根据一定的条件排序,组后返回中间附近的broker返回】。如果Broker有队列数,就随机找一个mq,如果没有队列数,走无参的轮询。