当前位置: 首页 > news >正文

【RocketMQ 生产者和消费者】- 生产者发送同步、异步、单向消息源码分析(1)

文章目录

  • 1. 前言
  • 2. send 方法发送同步消息
  • 3. sendDefaultImpl 发送消息
  • 4. sendKernelImpl 发送同步、异步、单向消息
  • 5. sendMessage 发送消息
  • 6. 同步发送 sendMessageSync
    • 6.1 invokeSyncImpl 同步调用
  • 7. 异步发送 sendMessageAsync
    • 7.1 invokeAsyncImpl 异步调用
  • 8. 单向发送 invokeOneway
    • 8.1 invokeOnewayImpl 单向调用
  • 9. 小结


本文章基于 RocketMQ 4.9.3

1. 前言

  • 【RocketMQ】- 源码系列目录
  • 【RocketMQ 生产者消费者】- 同步、异步、单向发送消费消息

上面几篇文章我们介绍了生产的启动流程,这篇文章就来介绍下生产者发送消息的流程,首先就是最常见的同步消息了。


2. send 方法发送同步消息

同步消息会通过 send 方法来发送,需要传入的参数就是 Message,下面就是源码的入口。

@Override
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// 设置消息要发送的 topicmsg.setTopic(withNamespace(msg.getTopic()));// 发送消息return this.defaultMQProducerImpl.send(msg);
}/*** 默认的同步发送消息的方法*/
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// 同步发送的超时时间默认是 3sreturn send(msg, this.defaultMQProducer.getSendMsgTimeout());
}/*** 同步发送消息的方法* @param msg* @param timeout 发送超时时间,默认 3s* @return* @throws MQClientException* @throws RemotingException* @throws MQBrokerException* @throws InterruptedException*/
public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

可以看到里面层层调用之后,最终调用到 sendDefaultImpl 方法,且默认的同步方法超时时间是 3s。


3. sendDefaultImpl 发送消息

sendDefaultImpl 是消息发送的逻辑,消息发送包括同步、异步、单向,都是走的这个方法,由于内容比较多,所以直接慢慢看。

在发送之前要做一些预校验工作,首先就是确保确保生产者服务状态是正常的,也就是 RUNNING 状态,否则不能发送消息,也就是 makeSureStateOK

// 1.确保生产者服务状态是正常的,也就是 RUNNING 状态
this.makeSureStateOK();private void makeSureStateOK() throws MQClientException {if (this.serviceState != ServiceState.RUNNING) {throw new MQClientException("The producer service state not OK, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);}
}

然后校验要发送的消息的合法性,消息的校验主要分为下面的校验逻辑:

  • 首先要发送的消息不能为空,如果为空就抛出 MESSAGE_ILLEGAL 异常,异常提示是 the message is null

  • 确保 topic 是否合法。

    • topic 不能为空,如果为空就抛出异常 MQClientException。
    • 如果 topic 长度超过 127,就抛出异常 MQClientException。
    • topic 不能包含除了 “[%|a-zA-Z0-9_-]+” 这些字符除外的其他字符,否则抛出异常 MQClientException。
  • 看下发送的消息的 topic 是不是系统内部使用的 topic,如果是也不允许发送,目前只判断了是不是 SCHEDULE_TOPIC_XXXX。

  • 消息体内容也不能为空,为空抛出 MQClientException 异常。

  • 消息大小不能超过 4M,如果超过就抛出 MQClientException 异常。

// 2.校验要发送的消息的合法性
Validators.checkMessage(msg, this.defaultMQProducer);/*** 检查要发送的消息的合法性* @param msg* @param defaultMQProducer* @throws MQClientException*/
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {// 首先是消息不能为空if (null == msg) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");}// 校验 topic 合法性Validators.checkTopic(msg.getTopic());// 看下发送的消息的 topic 是不是系统内部使用的 topic, 如果是也不允许发送, 目前只判断了 SCHEDULE_TOPIC_XXXXValidators.isNotAllowedSendTopic(msg.getTopic());// 消息体不能为空if (null == msg.getBody()) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");}// 消息体内容也不能为空if (0 == msg.getBody().length) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");}// 发送的消息大小不能超过 4Mif (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,"the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());}
}

接下来开始发送,发送需要获取到要发送的 topic 配置信息,所以首先需要通过 tryToFindTopicPublishInfo 来获取。

// 3.获取消息要发送的 topic 的配置消息,用来发送消息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

如果能查询到 topic 配置信息,并且这个 topic 下面的 MessageQueue 队列是不为空的,也就是有队列可以发送消息,这种情况下就可以接着发送。

if (topicPublishInfo != null && topicPublishInfo.ok()) {...
}

发送前,首先计算出发送消息的总最大发送次数,也就是失败重试次数,同步发送是 1+2 = 3 次,其中默认重试 2 次,异步发送是 1 次,也就是异步发送重试次数是 1。

// 4.计算发送消息的总最大发送次数
//   - 同步发送是 1+2 = 3 次,其中默认重试 2 次
//   - 异步发送是 1 次
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
// 每一次发送的时候发送到哪个 broker
String[] brokersSent = new String[timesTotal];

然后循环遍历发送次数来发送,什么情况下会重新发送呢?当抛出异常的时候。

// 5.循环发送,最大次数是 timesTotal
for (; times < timesTotal; times++) {// 上一次发送的 brokerName,如果是空就是第一次选择String lastBrokerName = null == mq ? null : mq.getBrokerName();// 6.选择一个消息队列发送消息,这个消息将会存入这个消息队列的 MappedFileQueue 中MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);if (mqSelected != null) {// 设置本次要发送的消息队列所属的 brokerNamemq = mqSelected;brokersSent[times] = mq.getBrokerName();try {... } catch (RemotingException e) {// RemotingException 异常会重试endTimestamp = System.currentTimeMillis();// 更新延迟故障容错集合this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;} catch (MQClientException e) {// MQClientException 异常会重试endTimestamp = System.currentTimeMillis();// 更新延迟故障容错集合this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;} catch (MQBrokerException e) {// MQBrokerException 异常endTimestamp = System.currentTimeMillis();// 更新延迟故障容错集合this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;// 如果返回的状态码属于一下几种,则支持重试:// ResponseCode.TOPIC_NOT_EXIST,// ResponseCode.SERVICE_NOT_AVAILABLE,// ResponseCode.SYSTEM_ERROR,// ResponseCode.NO_PERMISSION,// ResponseCode.NO_BUYER_ID,// ResponseCode.NOT_IN_CURRENT_UNITif (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {continue;} else {if (sendResult != null) {return sendResult;}throw e;}} catch (InterruptedException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());log.warn("sendKernelImpl exception", e);log.warn(msg.toString());throw e;}} else {break;}
}

首先来看下下面的大体发送流程,首先 lastBrokerName 代表了上一次发送的 brokerName,当然选择了这个 brokerName 也不代表一定要给这个 broker 集群发送,只是说获取了 brokerName 之后会有一个兜底的策略,RocketMQ 提供了发送的故障延时策略,这个 brokerName 会作为一个兜底的配置。selectOneMessageQueue 的逻辑可以看这篇文章:【RocketMQ 生产者和消费者】- 生产者发送故障延时策略。

然后将发送的 broker 的 brokerName 设置到 brokersSent 队列中,brokersSent 代表第 times 发送的 broker 的 brokerName 是什么。如果发送失败有异常,那么使用 updateFaultItem 更新故障容错集合,注意这里设置的 isolation 是 true,在文章 《【RocketMQ 生产者和消费者】- 生产者发送故障延时策略》 中我们也说过,如果 isolation = true,那么这个 broker 固定 30s 内不可用。

在处理异常的时候有一个异常比较特殊,就是 MQBrokerException,如果是出现了下面几种异常:

  • ResponseCode.TOPIC_NOT_EXIST:找不到 topic 信息
  • ResponseCode.SERVICE_NOT_AVAILABLE:服务不可用
  • ResponseCode.SYSTEM_ERROR:系统错误,比较泛
  • ResponseCode.NO_PERMISSION:没有权限,比如这个 topic 下面的队列不允许对队列读写
  • ResponseCode.NO_BUYER_ID:现在还没用到
  • ResponseCode.NOT_IN_CURRENT_UNIT:也没找到用的地方

出现这几种异常,就可以继续重传,否则如果是 sendResult 返回结果不为空了同时又出现异常就直接结束了,而且这里的重传上面也可以看到同步发送情况下消息发送次数是 3 次,异步是 1 次,而 for 循环重传就意味着重新调用 selectOneMessageQueue,因此重传会重新选择一个 broker,因此重传就代表有可能发送到不同的 broker 上。

回到源码,继续往下看,在调用 sendKernelImpl 发送前需要校验下是否超时了,如果超时了就直接退出 for 循环,并设置标记 callTimeout = true,最后抛出异常 RemotingTooMuchRequestException。

// 发送的起始时间
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) {// 如果还有发送次数,使用命名空间重置要发送的 topicmsg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
// 判断下在发起调用之前是不是超时了,如果超时了那么就算还剩下重试次数也不发送
long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) {callTimeout = true;break;
}

然后调用 sendKernelImpl 发送同步、异步、单向消息,并且根据不同发送模式走不同的处理结果,如果是异步和单向直接返回 null,不需要等待返回结果,如果是同步模式则需要等待返回结果并处理是否发送成功。

// 7.核心的发送逻辑,同步、异步、单向发送
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
// 发送结束的时间
endTimestamp = System.currentTimeMillis();
// 8.更新本地的延迟故障容错列表
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
// 异步和单向模式直接返回空
case ASYNC:return null;
case ONEWAY:return null;
case SYNC:// 同步模式,retryAnotherBrokerWhenNotStoreOK 意思是如果发送失败了但是支持重试发送消息到其他 brokerif (sendResult.getSendStatus() != SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {// 继续发送continue;}}// 发送成功就返回return sendResult;
default:break;

4. sendKernelImpl 发送同步、异步、单向消息

sendKernelImpl 是同步、异步、单向消息发送的公共方法,下面是方法的参数。

/*** 同步、异步、单向发送消息* @param msg               发送的消息* @param mq                发送到哪个队列* @param communicationMode 发送模式(同步、异步、单向)* @param sendCallback      发送的回调* @param topicPublishInfo  topic 信息* @param timeout           超时时间* @return 发送结果*/
private SendResult sendKernelImpl(final Message msg,final MessageQueue mq,final CommunicationMode communicationMode,final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {...
}

首先就是获取要发送的 broker 地址,既然选择了队列,那么这个队列所在的 broker 就是要发送的 broker。

// 起始时间
long beginStartTime = System.currentTimeMillis();
// 1. 先根据 brokerName 获取这个集群下面的主节点
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {// 如果获取不到地址,从 NameServer 中查找指定的 topic 下面的路由信息tryToFindTopicPublishInfo(mq.getTopic());// 然后再次根据 brokerName 获取这个集群下面的主节点brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}

接着判断下发送的时候是否需要用 VIP 通道来发送,如果需要就生成 VIP 端口地址,如果当前 broker 地址和监听的端口是 127.0.0.1:10911,那么 broker VIP 通道就是 127.0.0.1:10909,也就是端口 - 2,其他地址不变。

// 2.生成 VIP 通道,VIP 通道的端口就是 broker 监听的端口地址 -2
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

接下来就处理要发送的消息了,首先获取下没有压缩过的消息。

// 消息体,这里是没有压缩过的消息
byte[] prevBody = msg.getBody();

接着给消息生成唯一 ID,也就是消息的 UNIQ_KEY 属性,属于客户端生产者生成的唯一 ID 了,从逻辑上代表一条唯一消息。

// 3.如果不是批量消息,就根据消息生成唯一 ID,也就是消息的 UNIQ_KEY 属性,属于客户端生成的唯一 ID 了,从逻辑上代表一条唯一消息
if (!(msg instanceof MessageBatch)) {MessageClientIDSetter.setUniqID(msg);
}

然后尝试去压缩消息,如果压缩成功就设置下标记 COMPRESSED_FLAG 标识消息已被压缩。

// 消息的标识符,用于标识这条消息的属性
int sysFlag = 0;
// 消息是否压缩了
boolean msgBodyCompressed = false;
// 4.尝试压缩消息
if (this.tryToCompressMessage(msg)) {// 如果压缩成功了,设置下标记 COMPRESSED_FLAG 标识这条消息被压缩过了sysFlag |= MessageSysFlag.COMPRESSED_FLAG;msgBodyCompressed = true;
}

压缩的逻辑并不复杂,但是如果需要消息压缩,就需要消息长度超过 4k,才会用 JDK 的 DeflaterOutputStream 去压缩,批量消息现在不支持压缩。

/*** 尝试去压缩消息* @param msg* @return*/
private boolean tryToCompressMessage(final Message msg) {if (msg instanceof MessageBatch) {//batch dose not support compressing right nowreturn false;}// 消息体byte[] body = msg.getBody();if (body != null) {// 如果消息长度超过了 4Kif (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {try {// 使用 JDK 的 DeflaterOutputStream 去压缩byte[] data = UtilAll.compress(body, zipCompressLevel);if (data != null) {// 设置压缩后的数据msg.setBody(data);return true;}} catch (IOException e) {log.error("tryToCompressMessage exception", e);log.warn(msg.toString());}}}return false;
}

接下来处理事务消息,如果是发的事务消息,就给消息加上 TRANSACTION_PREPARED_TYPE 的标记,标记这条消息是一条 PREPARE 状态的消息。

// 5. 获取事务消息的标记,这里就是获取消息中的 TRAN_MSG 属性
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (Boolean.parseBoolean(tranMsg)) {// 如果设置了事务标记,那么这里就设置下消息的标识,加上 TRANSACTION_PREPARED_TYPE,表示该消息是一条 PREPARE// 状态的事务消息sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}

接下来判断下是否存在 CheckForbiddenHook 钩子,如果存在就执行里面的 checkForbidden 方法,在消息的发送、拉取等关键操作执行前,通过 CheckForbiddenHook 可以对客户端的操作权限进行验证,比如,判断某个客户端是否有向指定主题发送消息的权限,或者是否有从指定队列拉取消息的权限。除了权限检查,还可以根据自定义的业务规则对操作进行拦截。例如,限制某些时间段内某个主题的消息发送量,或者禁止特定客户端在特定场景下的消息操作。

// 6. 判断下是否存在 CheckForbiddenHook 钩子,如果存在就执行里面的 checkForbidden 方法
//    6.1 在消息的发送、拉取等关键操作执行前,通过 CheckForbiddenHook 可以对客户端的操作权限进行验证,比如,判断某个客
//        户端是否有向指定主题发送消息的权限,或者是否有从指定队列拉取消息的权限。
//    6.2 除了权限检查,还可以根据自定义的业务规则对操作进行拦截。例如,限制某些时间段内某个主题的消息发送量,或者禁止特定
//        客户端在特定场景下的消息操作。
if (hasCheckForbiddenHook()) {CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();// 设置 NameServer 地址checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());// 设置生产者组checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());// 设置发送模式checkForbiddenContext.setCommunicationMode(communicationMode);// 设置要发送的 broker 地址checkForbiddenContext.setBrokerAddr(brokerAddr);// 设置发送的消息checkForbiddenContext.setMessage(msg);// 设置消息发送到哪个队列checkForbiddenContext.setMq(mq);checkForbiddenContext.setUnitMode(this.isUnitMode());// 调用这些钩子的 checkForbidden 方法this.executeCheckForbiddenHook(checkForbiddenContext);
}

接下来再判断如果存在 SendMessageHook,就调用 SendMessageHook#sendMessageBefore 方法,算是发送前的钩子方法。

// 7. 如果存在 SendMessageHook,就调用 sendMessageBefore 方法,算是发送前的钩子方法
if (this.hasSendMessageHook()) {context = new SendMessageContext();// 设置生产者context.setProducer(this);// 设置生产者组context.setProducerGroup(this.defaultMQProducer.getProducerGroup());// 设置发送模式(同步、异步、单向)context.setCommunicationMode(communicationMode);// 设置消息产生的地址,就是生产者地址context.setBornHost(this.defaultMQProducer.getClientIP());// 设置 broker 地址context.setBrokerAddr(brokerAddr);// 设置发送的消息context.setMessage(msg);// 设置要发送的队列context.setMq(mq);// 设置命名空间context.setNamespace(this.defaultMQProducer.getNamespace());// 获取 TRAN_MSG 属性,判断是不是一个 PREPARE 消息String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (isTrans != null && isTrans.equals("true")) {// 当前消息类型是半消息context.setMsgType(MessageType.Trans_Msg_Half);}// 下面是延时消息的设置if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {context.setMsgType(MessageType.Delay_Msg);}// 调用 SendMessageHook#sendMessageBefore 方法this.executeSendMessageHookBefore(context);
}

前置工作都做完之后,开始设置消息发送的请求头。

// 8.设置发送消息的请求头
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
// 生产者组
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
// topic
requestHeader.setTopic(msg.getTopic());
// 默认 topic(TBW102)
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
// topic 默认的消息队列数量,默认是 4
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
// 设置要发送的队列 ID
requestHeader.setQueueId(mq.getQueueId());
// 设置消息标记
requestHeader.setSysFlag(sysFlag);
// 设置消息生成的时间
requestHeader.setBornTimestamp(System.currentTimeMillis());
// 设置消息 flag
requestHeader.setFlag(msg.getFlag());
// 设置消息的一系列属性
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
// 消息重新消费的次数
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
// 是否是批量消息
requestHeader.setBatch(msg instanceof MessageBatch);
// 如果是重传消息
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {// 获取消息重新消费次数属性值String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);if (reconsumeTimes != null) {// 重新设置消息重新消费的次数requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));// 清除这个属性MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);}// 获取消息的最大重试次数String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);if (maxReconsumeTimes != null) {// 重新设置消息最大重试次数requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));// 清除该属性MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);}
}

接下来发送阶段,根据不同模式走不同发送逻辑,分别是同步、异步、单向。

// 9.开始发送,根据不同的发送模式
SendResult sendResult = null;
switch (communicationMode) {/*** 9.1 异步发送*/case ASYNC:.../*** 10.单向发送、同步发送*/case ONEWAY:case SYNC:...}

可以看到上面其实 ONEWAY 和 SYNC 用的是同一个逻辑,而 ASYNC 异步发送时单独的逻辑,那么就先来看下同步和单向的。

// 10.1 发送消息前进行超时检查,如果超时就抛出异常
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
// 10.2 发送单向、同步消息
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);
break;

同步和单向都是检查下是否超时了,如果没有超时才调用 sendMessage 发送消息。下面再来看下异步发送的逻辑。

Message tmpMessage = msg;
boolean messageCloned = false;
if (msgBodyCompressed) {// If msg body was compressed, msgbody should be reset using prevBody.// Clone new message using commpressed message body and recover origin massage.// Fix bug:https://github.com/apache/rocketmq-externals/issues/66// 下面是克隆一份消息出来进行发送,所以 tmpMessage 就是后面发送用到的消息,克隆出来之后就可以把原来的消// 息给还原了,也就是将 msg 里面的消息还原成没有压缩之前的。// 那么在这个方法的 finally 里面也还原了,就是 msg.setBody(prevBody),为什么这里还要还原呢?因为异步// 发送的重试是在下面 sendMessage 内部的 onExceptionImpl 来重试的,但是我们知道异步发送是不会等待返回// 结果的,这就有一个问题了,如果在第一次发送之后调用了这个方法的 finally 还原了消息,那么重传的时候// tmpMessage = MessageAccessor.cloneMessage(msg) 克隆出来的消息是没有压缩过的,发送的时候就会发送// 没有压缩过的消息了,所以这里要在克隆之后立刻通过 `msg.setBody(prevBody)` 还原。tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;msg.setBody(prevBody);
}// 如果 topic 使用了命名空间
if (topicWithNamespace) {// 下面就是使用命名空间//<br/>////#  2. send 方法发送同步消息重新处理 topicif (!messageCloned) {tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;}msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}// 发送消息前进行超时检查,如果超时就抛出异常
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeAsync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
// 9.2 异步发送的核心逻辑
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),tmpMessage,requestHeader,timeout - costTimeAsync,communicationMode,sendCallback,topicPublishInfo,this.mQClientFactory,// 异步发送失败重试次数,异步重试不会选择其他broker,仅在同一个broker上做重试,不保证消息不丢this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),context,this);
break;

异步发送比较特殊,在发送之前需要克隆一份消息出来进行发送,就是 tmpMessage,克隆出来之后重新设置 msg.setBody(prevBody),这一步是还原原始消息的消息体,因为 prevBody 是没有压缩过的。为什么要还原呢?

前面我们说过异步可以重试,重试意思是往一个 broker 里面继续发送,但是像异步发送这种直接发送完之后 sendKernelImpl 就会执行 finally 方法,finally 里面就会还原消息。
在这里插入图片描述
而异步消息的重试不是在 sendKernelImpl 里面完成的,是在后面更里面的发送方法去重试的,这就导致了如果异步发送直接用的 msg,之后在 finally 里面消息被还原,这种情况下出现了异常重试,用的 msg 就是没有压缩的消息,这就有问题了,比如第一次发送的消息是压缩的,后面重试发送的消息就是没有压缩过的。因此需要克隆一个 tmpMessage 来发送,这样就算 msg 里面的消息被还原了,发送失败重传用的也是 tmpMessage 来重试。


5. sendMessage 发送消息

上面同步异步发送消息,最终都会走到 sendMessage,这个是三种发送模式最终都会调用到的,所以来看下这个方法,注意下消息发送的请求 Code 是 SEND_REPLY_MESSAGE。

/*** 同步、异步、单向发送消息* @param addr* @param brokerName* @param msg* @param requestHeader* @param timeoutMillis* @param communicationMode* @param sendCallback* @param topicPublishInfo* @param instance* @param retryTimesWhenSendFailed* @param context* @param producer* @return* @throws RemotingException* @throws MQBrokerException* @throws InterruptedException*/
public SendResult sendMessage(final String addr,final String brokerName,final Message msg,final SendMessageRequestHeader requestHeader,final long timeoutMillis,final CommunicationMode communicationMode,final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,final MQClientInstance instance,final int retryTimesWhenSendFailed,final SendMessageContext context,final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {// 起始时间long beginStartTime = System.currentTimeMillis();RemotingCommand request = null;// 1.首先处理下回复类型的消息,在 RocketMQ 的请求 - 应答模式里,生产者发送一条请求消息,消费者在接收到该请求消息并处理完相应//   业务逻辑后,会发送一条 reply 消息作为响应反馈给生产者,以此完成一次完整的请求 - 应答交互过程String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);if (isReply) {if (sendSmartMsg) {SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);} else {request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);}} else {if (sendSmartMsg || msg instanceof MessageBatch) {SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);} else {// 普通单条消息会走这里,消息 Code 是 SEND_MESSAGErequest = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);}}request.setBody(msg.getBody());/*** 2.根据不同类型发送消息*/switch (communicationMode) {case ONEWAY:// - 客户端发送请求到服务器// - 服务器处理请求// - 服务器向客户端返回应答// OneWay 一次消息发送的耗时时间是上述三个步骤的总和,而某些场景要求耗时非常短,但是对可靠性要求并不高,例如日志收集类应// 用,此类应用可以采用 oneway 形式调用,oneway 形式只发送请求不等待应答,而发送请求在客户端实现层面仅仅是一个操作系统// 系统调用的开销,即将数据写入客户端的 socket 缓冲区,此过程耗时通常在微秒级this.remotingClient.invokeOneway(addr, request, timeoutMillis);return null;case ASYNC:// 异步发送异常的次数final AtomicInteger times = new AtomicInteger();// 如果在发送前已经超时了,就抛出异常long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeoutMillis < costTimeAsync) {throw new RemotingTooMuchRequestException("sendMessage call timeout");}// 异步发送的核心逻辑this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,retryTimesWhenSendFailed, times, context, producer);return null;case SYNC:// 同步发送,先检查超时long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeoutMillis < costTimeSync) {throw new RemotingTooMuchRequestException("sendMessage call timeout");}// 同步发送return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);default:assert false;break;}return null;
}

6. 同步发送 sendMessageSync

/*** 同步发送核心逻辑* @param addr* @param brokerName* @param msg* @param timeoutMillis* @param request* @return* @throws RemotingException* @throws MQBrokerException* @throws InterruptedException*/
private SendResult sendMessageSync(final String addr,final String brokerName,final Message msg,final long timeoutMillis,final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);assert response != null;return this.processSendResponse(brokerName, msg, response, addr);
}

可以看到同步发送的逻辑就是最终调用 invokeSync,下面来看下这个方法。

/*** 执行发送消息给远端地址 addr 的流程* @param addr 远程调用地址* @param request 发送的请求* @param timeoutMillis 超时时间* @return* @throws InterruptedException* @throws RemotingConnectException* @throws RemotingSendRequestException* @throws RemotingTimeoutException*/
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {// 起始时间long beginStartTime = System.currentTimeMillis();// 获取和 broker 的连接通道,如果不存在就建立一个final Channel channel = this.getAndCreateChannel(addr);// 如果通道还是活跃的if (channel != null && channel.isActive()) {try {// 调用 RPC 钩子的 doBeforeRpcHooks 方法doBeforeRpcHooks(addr, request);// 消耗时间long costTime = System.currentTimeMillis() - beginStartTime;if (timeoutMillis < costTime) {// 如果已经超过超时时间,抛出异常throw new RemotingTimeoutException("invokeSync call the addr[" + addr + "] timeout");}// 执行同步的远程调用方法,获取调用结果,同步调用的超时时间是 3sRemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);// 调用 RPC 钩子的 doAfterRpcHooks 方法doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);return response;} catch (RemotingSendRequestException e) {log.warn("invokeSync: send request exception, so close the channel[{}]", addr);this.closeChannel(addr, channel);throw e;} catch (RemotingTimeoutException e) {if (nettyClientConfig.isClientCloseSocketIfTimeout()) {this.closeChannel(addr, channel);log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);}log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);throw e;}} else {// 如果连接通道不存在或者说已经关闭了,就调用 closeChannel 方法,并抛出异常this.closeChannel(addr, channel);throw new RemotingConnectException(addr);}
}

首先获取下要发送的 broker 的连接通道,然后如果连接通道没有关闭并且还是连接着的状态,然后再调用 RPC 钩子的 doBeforeRpcHooks 方法,判断消耗时间是否已经超时了,如果超时就抛出异常,接着再调用同步的远程调用方法,获取调用结果,同步调用的超时时间是 3s。


6.1 invokeSyncImpl 同步调用

/*** 同步执行远程请求** @param channel* @param request* @param timeoutMillis* @return* @throws InterruptedException* @throws RemotingSendRequestException* @throws RemotingTimeoutException*/
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,final long timeoutMillis)throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {// 请求 IDfinal int opaque = request.getOpaque();try {// 将请求 ID 和 ResponseFuture 存储到 responseTable 集合中final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);this.responseTable.put(opaque, responseFuture);final SocketAddress addr = channel.remoteAddress();// 发送请求,同时设置监听器channel.writeAndFlush(request).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {if (f.isSuccess()) {// 如果发送成功了,那么设置发送结果为 trueresponseFuture.setSendRequestOK(true);return;} else {// 如果发送失败了,那么设置发送结果为 falseresponseFuture.setSendRequestOK(false);}// 发送失败需要将这个请求从 responseTable 删掉responseTable.remove(opaque);responseFuture.setCause(f.cause());// 设置返回结果是空responseFuture.putResponse(null);log.warn("send a request command to channel <" + addr + "> failed.");}});// 这里就是阻塞等待请求的响应了RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);if (null == responseCommand) {if (responseFuture.isSendRequestOK()) {// 超时了throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,responseFuture.getCause());} else {// 没有超时,发送失败了throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());}}// 请求的返回结果return responseCommand;} finally {// 最后将请求从 responseTable 中删掉this.responseTable.remove(opaque);}
}

首先将请求 ID 和 ResponseFuture 存储到 responseTable 集合中,用于标识一个请求,然后发送同步请求,发送完成之后调用里面的 operationComplete 设置 sendRequestOK = true,表示发送成功,当然发送失败了就会发送请求删掉。之所以有这么一个集合存储发送请求,是因为生产者启动的时候也会定时去扫描这个集合,处理里面过期的请求。最后就是阻塞等待请求的响应了,如果是发送成功但是没有按时响应,那么就抛出 RemotingTimeoutException 异常表示超时,如果是发送就失败了,这种情况下就会抛出异常 RemotingSendRequestException,不过这里如果是发送失败了,应该可以直接抛出异常,不需要再阻塞 timeoutMillis,如果这么写就是不管发送成功没有都需要阻塞 timeoutMillis 这么长的时间才能返回。


7. 异步发送 sendMessageAsync

第 6 小节介绍了同步调用,这里我们就再来看下异步调用 sendMessageAsync 的逻辑。

/*** 异步发送的核心逻辑* @param addr                          broker 地址* @param brokerName                    broker 名称* @param msg                           要发送的消息* @param timeoutMillis                 超时时间* @param request                       发送的消息请求* @param sendCallback                  发送回调* @param topicPublishInfo              发送的 topic 信息* @param instance                      客户端实例* @param retryTimesWhenSendFailed      发送失败之后可以重新发送多少次* @param times                         发送失败的次数* @param context                       消息发送上下文,可以用来回调一些钩子方法* @param producer                      生产者* @throws InterruptedException* @throws RemotingException*/
private void sendMessageAsync(final String addr,final String brokerName,final Message msg,final long timeoutMillis,final RemotingCommand request,final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,final MQClientInstance instance,final int retryTimesWhenSendFailed,final AtomicInteger times,final SendMessageContext context,final DefaultMQProducerImpl producer
) throws InterruptedException, RemotingException {// 起始时间final long beginStartTime = System.currentTimeMillis();try {// 通过远程客户端来发送异步方法this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {@Overridepublic void operationComplete(ResponseFuture responseFuture) {// 消耗的时间long cost = System.currentTimeMillis() - beginStartTime;// 响应结果RemotingCommand response = responseFuture.getResponseCommand();// 如果 sendCallback 为空并且 response 不为空,如果是异步发送那么就必须要设置 sendCallback,所以这里就不会是异步的回调if (null == sendCallback && response != null) {try {// 这里就是根据 response 构建发送结果SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);if (context != null && sendResult != null) {// 将发送结果设置到上下文context.setSendResult(sendResult);// 同时回调 SendMessageHook 的 sendMessageAfter 方法context.getProducer().executeSendMessageHookAfter(context);}} catch (Throwable e) {}// 更新延迟故障容错集合producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);return;}if (response != null) {try {// 异步回调会走这里,同样和上面差不多,根据 response 构建发送结果SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);assert sendResult != null;if (context != null) {// 给上下文设置发送结果context.setSendResult(sendResult);// 同时回调 SendMessageHook 的 sendMessageAfter 方法context.getProducer().executeSendMessageHookAfter(context);}try {// 回调 sendCallback#onSuccess 方法sendCallback.onSuccess(sendResult);} catch (Throwable e) {}// 更新延迟故障容错集合producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);} catch (Exception e) {// 异常的情况下也会更新延迟故障容错集合producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);// 再次重试onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,retryTimesWhenSendFailed, times, e, context, false, producer);}} else {// 这里就是出问题了,返回结果为空,更新延迟故障容错集合producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);if (!responseFuture.isSendRequestOK()) {// 走异常重试逻辑MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,retryTimesWhenSendFailed, times, ex, context, true, producer);} else if (responseFuture.isTimeout()) {// 走异常重试逻辑MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",responseFuture.getCause());onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,retryTimesWhenSendFailed, times, ex, context, true, producer);} else {// 走异常重试逻辑MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,retryTimesWhenSendFailed, times, ex, context, true, producer);}}}});} catch (Exception ex) {// 出现异常,走异常逻辑,记录下发送的花费时间long cost = System.currentTimeMillis() - beginStartTime;// 这里 updateFaultItem 的时候设置了 isolation 标记为 true 表示当前 broker 的不可用时间是固定为 30sproducer.updateFaultItem(brokerName, cost, true);// 异常处理逻辑onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,retryTimesWhenSendFailed, times, ex, context, true, producer);}
}

既然是异步发送,肯定有一个方法能让生产者消费者收到 broker 的响应结果时就进行回调,所以这里的 InvokeCallback 就是用于回调异步方法的,当然这篇文章就简单看下这里的处理逻辑,篇幅有点长,留到下一篇文章再介绍异步如何处理回调的。


7.1 invokeAsyncImpl 异步调用

/*** 异步发送请求** @param channel        连接通道* @param request        发送的请求* @param timeoutMillis  发送的超时时间* @param invokeCallback 执行回调* @throws InterruptedException* @throws RemotingTooMuchRequestException* @throws RemotingTimeoutException* @throws RemotingSendRequestException*/
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,final InvokeCallback invokeCallback)throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {// 起始时间long beginStartTime = System.currentTimeMillis();// 请求 IDfinal int opaque = request.getOpaque();// 根据信号量来获取信号,判断当前还能不能发送异步消息,获取的超时时间是 timeoutMillis,默认只能同时发送 65536 个异步请求boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);if (acquired) {// 封装 semaphoreAsyncfinal SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);long costTime = System.currentTimeMillis() - beginStartTime;// 如果到这里已经超时了,那么释放当前信号,抛出异常if (timeoutMillis < costTime) {once.release();throw new RemotingTimeoutException("invokeAsyncImpl call timeout");}// 构建响应结果,添加到 responseTable 集合中final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);this.responseTable.put(opaque, responseFuture);try {// 通过通道将请求发送过去channel.writeAndFlush(request).addListener(new ChannelFutureListener() {// 发送完成回调@Overridepublic void operationComplete(ChannelFuture f) throws Exception {if (f.isSuccess()) {// 这里就是发送请求成功了,设置下 sendRequestOK 标记responseFuture.setSendRequestOK(true);return;}// 发送请求失败了requestFail(opaque);log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));}});} catch (Exception e) {// 如果发生移除,那么也释放一个信号responseFuture.release();log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);}} else {// 这里是时间太少了同时有没有获取到信号可以发送请求if (timeoutMillis <= 0) {throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");} else {// 这里就是在超时时间内都没有获取到信号,抛出移除String info =String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",timeoutMillis,this.semaphoreAsync.getQueueLength(),this.semaphoreAsync.availablePermits());log.warn(info);throw new RemotingTimeoutException(info);}}
}

因为异步发送需要考虑到发送消息数量,所以需要通过 Semaphore 获取信号,RocketMQ 限制了同时发送异步请求量最大 65536。

然后判断是否超时,如果超时了就抛出 RemotingTimeoutException 异常,接着构建响应结果,添加到 responseTable 集合中,然后设置发送回调,如果发送成功了,就设置下 sendRequestOK 标记,跟同步发送不同,异步如果发送失败不单单会把请求从 responseTable 中删掉,还会回调 invokeCallback 函数。

/*** 发送请求失败的时候会调用这个方法,逻辑就是下面两个:* 1. 从 responseTable 中移除对应的请求* 2. 设置发送结果为 false、响应结果为空* 3. 回调 invokeCallback#operationComplete 方法** @param opaque*/
private void requestFail(final int opaque) {// 从 responseTable 中删掉对应请求的响应回调ResponseFuture responseFuture = responseTable.remove(opaque);if (responseFuture != null) {// 设置响应回调的状态responseFuture.setSendRequestOK(false);responseFuture.putResponse(null);try {// 回调 invokeCallback#operationComplete 方法executeInvokeCallback(responseFuture);} catch (Throwable e) {log.warn("execute callback in requestFail, and callback throw", e);} finally {// 最后释放一个信号responseFuture.release();}}
}

8. 单向发送 invokeOneway

从第 5 小结也可以看到,单向发送只是调用了 invokeOneway 方法,单向发送一次耗时是下面三个步骤的总和,但是单向步骤不接受服务端应答。

  • 客户端发送请求到服务器
  • 服务器处理请求
  • 服务器向客户端返回应答

OneWay 一次消息发送的耗时时间是上述三个步骤的总和,而某些场景要求耗时非常短,但是对可靠性要求并不高,例如日志收集类应用,此类应用可以采用 oneway 形式调用,oneway 形式只发送请求不等待应答,而发送请求在客户端实现层面仅仅是一个操作系统系统调用的开销,即将数据写入客户端的 socket 缓冲区,此过程耗时通常在微秒级。

/*** OneWay 调用* @param addr* @param request* @param timeoutMillis* @throws InterruptedException* @throws RemotingConnectException* @throws RemotingTooMuchRequestException* @throws RemotingTimeoutException* @throws RemotingSendRequestException*/
@Override
public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {// 获取连接通道final Channel channel = this.getAndCreateChannel(addr);if (channel != null && channel.isActive()) {try {// 发送请求前调用 RPCHook#doBeforeRequest 方法doBeforeRpcHooks(addr, request);// 调用 OneWay 发送逻辑this.invokeOnewayImpl(channel, request, timeoutMillis);} catch (RemotingSendRequestException e) {log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);this.closeChannel(addr, channel);throw e;}} else {// 关闭不活跃的连接通道this.closeChannel(addr, channel);// 抛出异常throw new RemotingConnectException(addr);}
}

8.1 invokeOnewayImpl 单向调用

/*** OneWay 发送消息** @param channel* @param request* @param timeoutMillis* @throws InterruptedException* @throws RemotingTooMuchRequestException* @throws RemotingTimeoutException* @throws RemotingSendRequestException*/
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {// 标记 OneWay 发送request.markOnewayRPC();// 获取发送许可boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);if (acquired) {final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);try {// 发送消息channel.writeAndFlush(request).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {// 发送完成,释放型号once.release();if (!f.isSuccess()) {log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");}}});} catch (Exception e) {once.release();log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);}} else {// 这里就是没获取到许可if (timeoutMillis <= 0) {// 1.时间太短了throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");} else {// 2.正常超时String info = String.format("invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreOnewayValue: %d",timeoutMillis,this.semaphoreOneway.getQueueLength(),this.semaphoreOneway.availablePermits());log.warn(info);throw new RemotingTimeoutException(info);}}
}

OneWay 调用也是需要通过 Semaphore 获取信号,RocketMQ 限制了同时发送单向请求量最大 65536。可以看到获取到发送许可之后直接就发送了,发送不管成功失败,只要请求写出了就直接释放信号,因为不关心响应结果,所以请求也不用设置到 responseTable 集合中。


9. 小结

好了,这篇文章就先到这,大概讲述了下生产者的同步、异步、单向发送的大体流程,下一篇文章接着这篇文章开始讲解剩下的发送流程。





如有错误,欢迎指出!!!!

相关文章:

  • 2025——》NumPy中的np.random.randn使用/在什么场景下适合使用np.random.randn?NumPy标准正态分布生成全解析
  • 平移坐标轴 +奇偶性 简化二重积分
  • ​​技术深度解析:《鸿蒙5.0+:AI驱动的全场景功耗革命》​
  • 微软常用运行库合集(VisualC++)2025.04.22
  • Json详解
  • MyBatis-Plus高级用法:最优化持久层开发
  • 6.1 数学复习笔记 23
  • 工作流引擎-09-XState 是一个 JavaScript 和 TypeScript 的状态管理库,它使用状态机和状态图来建模逻辑。
  • QT中子线程触发主线程弹窗并阻塞等待用户响应
  • Spring是如何实现属性占位符解析
  • 《汇编语言》第13章 int指令
  • 6个月Python学习计划 Day 11 - 列表推导式、内置函数进阶、模块封装实战
  • vscode 连接远程服务器
  • leetcode0404. 左叶子之和-easy
  • ROS仓库GPG签名密钥过期问题
  • DAY 36 超大力王爱学Python
  • 车辆检测算法在爆炸事故应急响应中的优化路径
  • 邮件验证码存储推荐方式
  • 安卓jetpack compose学习笔记-UI基础学习
  • 【Redis】笔记|第4节|Redis数据安全性分析
  • 金华网站建设/网站快速收录
  • tornado 网站开发/苹果被曝开发搜索引擎对标谷歌
  • 做测算的网站/镇江推广公司
  • 门户型网站/网络营销公司简介
  • 外贸b2b免费发布平台/兰州seo技术优化排名公司
  • 网站开发语言学习/体验营销是什么