当前位置: 首页 > news >正文

【RocketMQ 生产者和消费者】- 生产者启动源码-启动流程(1)

文章目录

  • 1. 前言
  • 2. DefaultMQProducer 构造器
  • 3. DefaultMQProducerImpl 构造器
  • 3. DefaultMQProducer#start 启动生产者
    • 3.1 withNamespace 包装生产者组
    • 3.2 getNamespace 获取命名空间
    • 3.3 从 NameServer 中解析出命名空间
    • 3.4 wrapNamespace 使用命名空间包装生产者组
      • 3.4.1 isSystemResource 判断是否是系统内置的 topic
      • 3.4.2 withOutRetryAndDLQ 获取原始 topic
    • 3.5 DefaultMQProducerImpl#start 启动生产者
      • 3.5.1 服务启动状态
      • 3.5.2 checkConfig 检查生产者配置
      • 3.5.3 changeInstanceNameToPID 修改实例名称
      • 3.5.4 getOrCreateMQClientInstance 创建 MQClientInstance
      • 3.5.5 registerProducer 注册生产者到本地缓存
      • 3.5.6 存储默认 topic 的配置到本地 topic 缓存 topicPublishInfoTable 中
  • 4. 小结


本文章基于 RocketMQ 4.9.3

1. 前言

  • 【RocketMQ】- 源码系列目录
  • 【RocketMQ 生产者消费者】- 同步、异步、单向发送消费消息

在文章 【RocketMQ 生产者消费者】- 同步、异步、单向发送消费消息 中介绍了 RocketMQ 的几种消息发送方式和消息的消费方法。
在这里插入图片描述
这个架构图中 NameServer 的启动和 Broker 的启动源码在前几篇文章已经探讨过了,那么这篇文章开始就从生产者的启动开始,一步步深入生产者和消费者的源码。


2. DefaultMQProducer 构造器

首先从生产者的构造器入手,也就是 DefaultMQProducer。

/**
* 生产者的构造器
* @param producerGroup
*/
public DefaultMQProducer(final String producerGroup) {this(null, producerGroup, null);
}/*** DefaultMQProducer 构造器** @param namespace 生产者的命名空间* @param producerGroup 生产者组* @param rpcHook rpc 远程调用的钩子*/
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {// 命名空间this.namespace = namespace;// 生产者组this.producerGroup = producerGroup;// 生产者defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}

DefaultMQProducer 的构造器就是设置命名空间、生产者组,然后创建生产者实现类 DefaultMQProducerImpl。


3. DefaultMQProducerImpl 构造器

/*** 生产者实例,一个生产者对应一个 DefaultMQProducerImpl* @param defaultMQProducer* @param rpcHook*/
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {// DefaultMQProducer 实例this.defaultMQProducer = defaultMQProducer;// RPC 钩子,发送 RPC 请求的时候可以对发送的前后进行增强this.rpcHook = rpcHook;// 异步发送的阻塞队列this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);// 异步发送的线程池,默认就是当前的 CPU 核数this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),Runtime.getRuntime().availableProcessors(),1000 * 60,TimeUnit.MILLISECONDS,this.asyncSenderThreadPoolQueue,new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());}});
}

DefaultMQProducerImpl 的构造器设置了四个属性:

  • DefaultMQProducer 实例
  • RPC 钩子,发送 RPC 请求的时候可以对发送的前后进行增强
  • 异步发送的阻塞队列
  • 异步发送的线程池,默认核心线程和最大线程是当前系统的 CPU 核数

3. DefaultMQProducer#start 启动生产者

/*** 启动生产者* @throws MQClientException*/
@Override
public void start() throws MQClientException {// 设置生产者组,生产者组是同一类 Producer 的集合,这类 Producer 发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生// 产者在发送之后崩溃,则 Broker 服务器会联系同一生产者组的其他生产者实例以提交或回溯消费this.setProducerGroup(withNamespace(this.producerGroup));// 默认生产者实现启动this.defaultMQProducerImpl.start();// 消息轨迹追踪服务, 默认是空if (null != traceDispatcher) {try {// 记录启动信息traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());} catch (MQClientException e) {log.warn("trace dispatcher start failed ", e);}}
}

生产者的启动逻辑比较简单,可以看到核心还是调用 defaultMQProducerImpl.start,但是在启动默认生产者之前,如果设置了命名空间,还会通过命名空间包装生产者组。


3.1 withNamespace 包装生产者组

/*** 生产者组的生成方法* @param resource* @return*/
public String withNamespace(String resource) {// 使用命名空间包装传入的 resourcereturn NamespaceUtil.wrapNamespace(this.getNamespace(), resource);
}

实际上这个方法也不单单是对生产者组包装了,如果设置了命名空间,这个方法也会用于对 topic 包装,如下图。
在这里插入图片描述


3.2 getNamespace 获取命名空间

public String getNamespace() {// 检查命名空间是否已经初始化if (namespaceInitialized) {// 如果已经初始化,直接返回命名空间return namespace;}// 这里就是初始化不为空的命名空间了if (StringUtils.isNotEmpty(namespace)) {// 直接返回return namespace;}// namesrv 的地址如果设置了,一般来说我们创建生产者都会设置 namesrv 的地址// producer.setNamesrvAddr("localhost:9876");if (StringUtils.isNotEmpty(this.namesrvAddr)) {// 如果符合规定的格式if (NameServerAddressUtils.validateInstanceEndpoint(namesrvAddr)) {// 从 namesrv 中解析出命名空间,不过这里的 namesrvAddr 必须包含 MQ_INST_,可以看里面的正则表达式namespace = NameServerAddressUtils.parseInstanceIdFromEndpoint(namesrvAddr);}}// 初始化成功namespaceInitialized = true;// 这里我们一般就是返回 namespace,因为 namesrvAddr 也很少会设置成上面这种正则表达式规定的格式return namespace;
}

首先就是获取命名空间,获取命名空间的方法中会判断命名空间是否已经初始化,这里的初始化标记 namespaceInitialized 会在 setNamespace 中设置为 true,也就是说如果通过生产者设置了 namespace 属性,就会初始化。
在这里插入图片描述
因此如果用户自己设置了 namespace 属性,这个方法就会返回设置的 namespace 属性,如果没有初始化呢,就从 namesrvAddr 连接地址中解析出命名空间,然后将 namespaceInitialized 设置为 true,所以 namespace 的来源有两个:

  • 通过 setNamespace 方法设置
  • 从 NameServer 连接地址中解析出来

下面就看下如何从 NameServer 中解析出命名空间。


3.3 从 NameServer 中解析出命名空间

getNamespace 方法中的 namesrvAddr 也是在创建出生产者的时候就设置的。如果需要从 NameServer 连接地址中解析出 namespace,就需要判断下连接地址是否符合特定的正则表达式。

/*** 正则表达式匹配* @param endpoint* @return*/
public static boolean validateInstanceEndpoint(String endpoint) {return INST_ENDPOINT_PATTERN.matcher(endpoint).matches();
}

可以看到这个正则表达式如下所示:

public static final Pattern INST_ENDPOINT_PATTERN = Pattern.compile("^" + ENDPOINT_PREFIX + INSTANCE_REGEX + "\\..*");

其中 ENDPOINT_PREFIX 和 INSTANCE_REGEX 如下所示:

public static final String INSTANCE_PREFIX = "MQ_INST_";// 以固定前缀 MQ_INST_ 开头,同时格式是 MQ_INST_ + + _ +
public static final String INSTANCE_REGEX = INSTANCE_PREFIX + "\\w+_\\w+";// endpoint 的匹配规则
// \\w+://:匹配一个或多个单词字符(字母、数字或下划线),后面跟着 ://,例如 http://、https://、ftp:// 等
// |:表示或者空串
// 允许匹配的字符串可以有协议前缀,也可以没有
public static final String ENDPOINT_PREFIX = "(\\w+://|)";

通过上面的正则表达式大概可以看出符合条件的格式是:前缀是 ENDPOINT_PREFIX,中间是 INSTANCE_REGEX 格式,结尾以 .* 结尾,* 是任意字符串,比如:

  • http://MQ_INST_abc_def.example.com
  • MQ_INST_123_xyz.test.org
  • MQ_INST_123_xyz.
  • MQ_INST_123_xyz.test

当 NameServer 符合正则表达式的校验,就会调用 parseInstanceIdFromEndpoint 方法获取出 namespace。

/*** 从 endpoint 中解析出实例 ID* @param endpoint* @return*/
public static String parseInstanceIdFromEndpoint(String endpoint) {if (StringUtils.isEmpty(endpoint)) {return null;}// 假设传入的是 http://MQ_INST_abc_def.example.com,那么结果就是 MQ_INST_abc_defreturn endpoint.substring(endpoint.lastIndexOf("/") + 1, endpoint.indexOf('.'));
}

可以看出来,这里获取的逻辑就是从第一个 / 开始截取,直到 . 结尾,比如 http://MQ_INST_abc_def.example.com,截取出来的结果就是 MQ_INST_abc_def。


3.4 wrapNamespace 使用命名空间包装生产者组

/*** 包装命名空间* @param namespace 命名空间,一般都是空* @param resourceWithOutNamespace producer 初始化的时候传入的 producerGroup* @return*/
public static String wrapNamespace(String namespace, String resourceWithOutNamespace) {// 这里如果 namespace 和 resourceWithOutNamespace 有一个为空会直接返回 resourceWithOutNamespaceif (StringUtils.isEmpty(namespace) || StringUtils.isEmpty(resourceWithOutNamespace)) {return resourceWithOutNamespace;}// 这里就是两个参数都不为空// 1.判断是不是系统内置的一些 topic 或者 group// 2.判断是不是已经包括了 namespaceif (isSystemResource(resourceWithOutNamespace) || isAlreadyWithNamespace(resourceWithOutNamespace, namespace)) {// 满足也直接返回return resourceWithOutNamespace;}// 将 resourceWithOutNamespace 前置的 %RETRY% 或者 %DLQ% 给去掉,获取原始 topicString resourceWithoutRetryAndDLQ = withOutRetryAndDLQ(resourceWithOutNamespace);StringBuilder stringBuilder = new StringBuilder();// 重传 topic 拼接上 %RETRY% 前缀if (isRetryTopic(resourceWithOutNamespace)) {stringBuilder.append(MixAll.RETRY_GROUP_TOPIC_PREFIX);}// 死信 topic 拼接上 %DLQ% 前缀if (isDLQTopic(resourceWithOutNamespace)) {stringBuilder.append(MixAll.DLQ_GROUP_TOPIC_PREFIX);}// 这里相当于在 resourceWithoutRetryAndDLQ 前面拼接上一个 namespace// 比如 %RETRY%namespace%resourceWithoutRetryAndDLQ// 比如 %DLQ%namespace%resourceWithoutRetryAndDLQreturn stringBuilder.append(namespace).append(NAMESPACE_SEPARATOR).append(resourceWithoutRetryAndDLQ).toString();}

3.4.1 isSystemResource 判断是否是系统内置的 topic

这个方法就是用于判断 resourceWithOutNamespace 是否是系统内置的 topic,系统内置 topic 就是内部使用的一些 topic,比如延时消息 topic-SCHEDULE_TOPIC_XXXX,系统默认 topic-TBW102 …

/*** 是否是系统相关的资源* @param resource* @return*/
private static boolean isSystemResource(String resource) {// 不能为空if (StringUtils.isEmpty(resource)) {return false;}// 1. 判断传进来的 source 是不是系统相关的 topic//    public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102"; // Will be created at broker when isAutoCreateTopicEnable//    public static final String RMQ_SYS_SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";//    public static final String RMQ_SYS_BENCHMARK_TOPIC = "BenchmarkTest";//    public static final String RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";//    public static final String RMQ_SYS_TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC";//    public static final String RMQ_SYS_TRANS_OP_HALF_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC";//    public static final String RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC = "TRANS_CHECK_MAX_TIME_TOPIC";//    public static final String RMQ_SYS_SELF_TEST_TOPIC = "SELF_TEST_TOPIC";//    public static final String RMQ_SYS_OFFSET_MOVED_EVENT = "OFFSET_MOVED_EVENT";//// 2. 或者说这个 resource 是系统消费者组,以 CID_RMQ_SYS_ 开头if (TopicValidator.isSystemTopic(resource) || MixAll.isSysConsumerGroup(resource)) {return true;}return false;
}

3.4.2 withOutRetryAndDLQ 获取原始 topic

对于死信队列 topic 和重试 topic,要获取原始 同批次,就需要将前置的 %RETRY% 或者 %DLQ% 给去掉。

/*** 获取原始 topic* @param originalResource* @return*/
private static String withOutRetryAndDLQ(String originalResource) {if (StringUtils.isEmpty(originalResource)) {// 传入的参数为空return STRING_BLANK;}if (isRetryTopic(originalResource)) {// 重传 topic, 截取 %RETRY% 后面的内容return originalResource.substring(RETRY_PREFIX_LENGTH);}if (isDLQTopic(originalResource)) {// 死信 topic, 截取 %DLQ% 后面的内容return originalResource.substring(DLQ_PREFIX_LENGTH);}return originalResource;
}

3.5 DefaultMQProducerImpl#start 启动生产者

经过上面使用 namespace 对生产者组的处理, 下一步就是启动生产者。

/*** 启动生产者* @param startFactory 是否启动 MQClientInstance* @throws MQClientException*/
public void start(final boolean startFactory) throws MQClientException {// 生产者的服务状态,默认是 CREATE_JUST,也就是刚创建出来还没有启动switch (this.serviceState) {// 1.服务启动状态case CREATE_JUST:// 先设置成 START_FAILEDthis.serviceState = ServiceState.START_FAILED;// 检查配置信息this.checkConfig();// CLIENT_INNER_PRODUCER 是 MQClientInstance#DefaultMQProducer 的生产者组,算是一个内部的 group,这个生产// 者会用来做一些内部的操作,比如当消息消费失败的时候发送消息到重试队列if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {// 这里就是修改当前 Producer 里面的 instanceName 改成进程 PIDthis.defaultMQProducer.changeInstanceNameToPID();}// 获取 MQClientInstance,在里面会根据 clientId 获取或者创建 MQClientInstance 实例,注意这里如果生产者和消费者// 运行在同一个进程中,比如启动一个 jar 包,这个 jar 包里面生产者消费者会共用一个 MQClientInstance,因为对于生产者// 和消费者都会有一些公共的操作,比如向 broker 发送心跳,拉取最新 NameServer 等等,同时在里面也封装了一些发送和消费// 的 API,所以总的来说就是生产者和消费者的很多逻辑都会通过这个 MQClientInstance 来完成this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);// 将当前生产者注册到 MQClientInstance 的 producerTable 中,key 是生产者组,这里也透露出一件事,一个进程里面的生// 产者必须都是不同生产者组的,如果说一个进程里面有多个相同生产者组的生产者,那么就会报错boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);if (!registerOK) {// 这里就是发现了有多个 DefaultMQProducer 往一个生产者组里面配置this.serviceState = ServiceState.CREATE_JUST;// 排除异常throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}// 存储 TBW102 -> TopicPublishInfo 存储到 topicPublishInfoTable 中,TopicPublishInfo 存储了 topic 默认的// 路由消息,这里面存储了 topic 的相关信息,比如 topic 下面的队列集合,topic 存储的 broker 信息,broker 的集群信息 ...this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());// 启动 MQClientInstance 客户端实例if (startFactory) {mQClientFactory.start();}log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),this.defaultMQProducer.isSendMessageWithVIPChannel());// 标记服务状态是 RUNNINGthis.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The producer service state not OK, maybe started once, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break;}// 立即发送心跳到所有 brokerthis.mQClientFactory.sendHeartbeatToAllBrokerWithLock();// 启动定时任务,移除超时的请求并且执行异步回调RequestFutureHolder.getInstance().startScheduledTask(this);}

3.5.1 服务启动状态

上面的源码可以看到首先会判断下生产者的服务状态,生产者或者是消费者都有四个服务状态,代码如下。

public enum ServiceState {/*** Service just created,not start*/CREATE_JUST,/*** Service Running*/RUNNING,/*** Service shutdown*/SHUTDOWN_ALREADY,/*** Service Start failure*/START_FAILED;
}
  • CREATE_JUST:就是生产者或者消费者刚创建出来,并没有启动,也就是初始化就是这个状态。
  • RUNNING:当生产者或者消费者调用 start 之后就会将状态修改成 RUNNING,表示服务正在运行。
  • SHUTDOWN_ALREADY:当生产者或者消费者调用 shutdown 关闭服务之后状态就会被修改成 SHUTDOWN_ALREADY,表示服务关闭了。
  • START_FAILED:服务启动失败。

3.5.2 checkConfig 检查生产者配置

/*** 检查生产者的配置* @throws MQClientException*/
private void checkConfig() throws MQClientException {// 校验生产者组属性是否合法(长度合法、字符合法)Validators.checkGroup(this.defaultMQProducer.getProducerGroup());// 生产者组不能为空if (null == this.defaultMQProducer.getProducerGroup()) {throw new MQClientException("producerGroup is null", null);}// 同时生产者组也不能是 DEFAULT_PRODUCERif (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) {throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",null);}
}/*** 校验组信息*/
public static void checkGroup(String group) throws MQClientException {// 如果组信息为空,那么抛出异常if (UtilAll.isBlank(group)) {throw new MQClientException("the specified group is blank", null);}// 长度不超过 255if (group.length() > CHARACTER_MAX_LENGTH) {throw new MQClientException("the specified group is longer than group max length 255.", null);}// topic 里面是否包含不合法的字符,合法的字符只有下面中括号里面这些 [%|a-zA-Z0-9_-]if (isTopicOrGroupIllegal(group)) {throw new MQClientException(String.format("the specified group[%s] contains illegal characters, allowing only %s", group,"^[%|a-zA-Z0-9_-]+$"), null);}
}

这个方法就是检查生产者的配置,主要就是检测以下几方面:

  • 如果组信息为空,那么抛出异常
  • 生产者组长度不难超过 255,也不能是 DEFAULT_PRODUCER
  • topic 里面是否包含不合法的字符,合法的字符只有下面中括号里面这些 [%|a-zA-Z0-9_-]

3.5.3 changeInstanceNameToPID 修改实例名称

public void changeInstanceNameToPID() {if (this.instanceName.equals("DEFAULT")) {this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();}
}

这里就是修改实例名称为当前进程 + 当前时间,意味者每个生产者和消费者的实例名称都不相同。


3.5.4 getOrCreateMQClientInstance 创建 MQClientInstance

这个方法比较重要,就是创建 MQClientInstance,生产者和消费者内部都会有一个 MQClientInstance,因为有很多底层逻辑是类似的,所以同一用 MQClientInstance 封装,当然这个创建方法涉及到 clientID 的设置,所以下一篇文章会重点说下这个方法,这里先不多说。


3.5.5 registerProducer 注册生产者到本地缓存

这个方法是将当前生产者注册到 MQClientInstance 的 producerTable 中,key 是生产者组,这里也透露出一件事,一个进程里面的生产者必须都是不同生产者组的,如果说一个进程里面有多个相同生产者组的生产者,那么就会报错。

public synchronized boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {if (null == group || null == producer) {return false;}MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);if (prev != null) {log.warn("the producer group[{}] exist already.", group);return false;}return true;
}

3.5.6 存储默认 topic 的配置到本地 topic 缓存 topicPublishInfoTable 中

这里就是存储 TBW102 -> TopicPublishInfo 存储到 topicPublishInfoTable 中,TopicPublishInfo 存储了 topic 默认的路由消息,这里面存储了 topic 的相关信息,比如 topic 下面的队列集合,topic 存储的 broker 信息,broker 的集群信息等等。

而这个 TBW102 就是用来当需要创建 topic 的时候可以以这个默认的 topic 为模板来创建 topic,比如设置 topic 的读写队列数。

// 存储 TBW102 -> TopicPublishInfo 存储到 topicPublishInfoTable 中,TopicPublishInfo 存储了 topic 默认的
// 路由消息,这里面存储了 topic 的相关信息,比如 topic 下面的队列集合,topic 存储的 broker 信息,broker 的集群信息 ...
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

4. 小结

好了,这篇文章就先到这里,下一篇文章再探讨启动流程中的其他的重要方法。





如有错误,欢迎指出!!!

相关文章:

  • 选新手机的参考:CPU型号、内存、外存、屏幕、摄像头以及电池等。
  • 如何测试JWT的安全性:全面防御JSON Web Token的安全漏洞
  • AI Agent开发第73课-预训练qwen3-如何加入自己的语料
  • ElasticSearch操作
  • 边缘计算是什么?逻辑视域下的边缘计算:分布式计算范式的理论基础与逻辑结构分析
  • 二分算法的补充说明
  • TMP1827认证流程
  • 从法律视角看湖北理元理律师事务所的债务优化实践
  • 数据结构与算法-线性表-双向链表(Double Linked List)
  • C++ 中的 **常变量** 与 **宏变量** 比较
  • 25.5.22学习总结
  • window 显示驱动开发-指定 GDI 硬件加速渲染操作
  • Python-标准库
  • 浅谈测试驱动开发TDD
  • 微服务架构的演变过程
  • 关于大语言模型的问答?
  • spring boot启动报错:2002 - Can‘t connect to server on ‘192.168.10.212‘ (10061)
  • 咬合配准算法文献推荐
  • 电子电路:为什么会产生电流超前或者滞后于电压的情况?
  • CUDA 加速的稀疏矩阵计算库cuSPARSE
  • 天津百度网站排名优化/广州seo招聘
  • 网站建设备案优化之看/网页设计案例
  • 写作网站vir/北京网站优化托管
  • 还有哪些网站做产品众筹/吉林百度查关键词排名
  • 外国网站加速器/软件外包公司有哪些
  • 有什么网站可以做设计赚钱吗/百度学术论文官网入口