【RocketMQ 生产者和消费者】- 生产者启动源码-创建 MQClientInstance(2)
文章目录
- 1. 前言
- 2. MQClientInstance 介绍
- 3. getOrCreateMQClientInstance 创建 MQClientInstance
- 4. clientId
- 5. MQClientInstance 构造器
- 6. 小结
本文章基于 RocketMQ 4.9.3
1. 前言
- 【RocketMQ】- 源码系列目录
- 【RocketMQ 生产者消费者】- 同步、异步、单向发送消费消息
- 【RocketMQ 生产者和消费者】- 生产者启动源码-启动流程(1)
上一篇文章我们探讨了生产者启动流程的大纲,当然还遗留了一些方法没有深入,这篇文章就来讲一下 MQClientInstance。
2. MQClientInstance 介绍
在生产者启动的时候会去创建一个 MQClientInstance,而这个方法是生产者和消费者都会去调用的。
可以说生产者和消费者都存在一个 MQClientInstance 对象,这个对象用于执行一些公共的方法,比如上报心跳到 Broker,从 NameServer 中获取 topic 路由,可以理解是是把生产者和消费者的一些公共方法提取出来到这个对象中了,相当于一个工具类。
3. getOrCreateMQClientInstance 创建 MQClientInstance
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {String clientId = clientConfig.buildMQClientId();MQClientInstance instance = this.factoryTable.get(clientId);if (null == instance) {instance =new MQClientInstance(clientConfig.cloneClientConfig(),this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);if (prev != null) {instance = prev;log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);} else {log.info("Created new MQClientInstance for clientId:[{}]", clientId);}}return instance;
}
这个方法就是用于创建 MQClientInstance 的,可以看到这里面其实就是在 factoryTable
集合中获取这个实例是否已创建了,如果没有创建才去创建,如果已经创建了就返回已经创建的,但是在这个版本应该是不会存在并发问题的,因为可以确保 clientId 的唯一性,所以创建出 instance 之后应该可以直接 put 到 factoryTable 中。
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {// 构建出实例 IDString clientId = clientConfig.buildMQClientId();// 获取实例MQClientInstance instance = this.factoryTable.get(clientId);if (null == instance) {// 如果不存在就创建实例instance =new MQClientInstance(clientConfig.cloneClientConfig(),this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);// 再次设置到 factoryTable 中, 如果已存在就返回之前的MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);if (prev != null) {instance = prev;log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);} else {log.info("Created new MQClientInstance for clientId:[{}]", clientId);}}return instance;
}
4. clientId
说到 clientId,这个属性就是用于表示一个 MQClientInstance,需要确保每一个实例的 ID 不同,相当于一个唯一标识,创建 clientId 的逻辑如下。
public String buildMQClientId() {// 获取客户端地址StringBuilder sb = new StringBuilder();sb.append(this.getClientIP());// 拼接上实例名称sb.append("@");sb.append(this.getInstanceName());// 拼接上单元名称if (!UtilAll.isBlank(this.unitName)) {sb.append("@");sb.append(this.unitName);}return sb.toString();
}
unitName 这个属性我也没搞懂是干什么用的,不过看源码基本都没怎么设置,就直接当这个属性是空的就行了,不过如果需要设置也可以在创建出 DefaultMQProducer 之后通过 set 方法手动设置。
回到 buildMQClientId 方法,我们继续看创建 clientId 的时候 instanceName 是如何获取的,这个属性其实在上一篇文章的生产者启动方法中已经有过设置了。
可以看到这个属性可以从系统变量的 rocketmq.client.name 中获取到,如果获取不到就是 DEFAULT,也就是说如果用户不设置这个属性就是 DEFAULT,但是在启动生产者时会判断如果当前的生产者组不是一个默认系统内部使用的生产者组,就会使用 changeInstanceNameToPID 方法修改 instanceName 属性。
从下面的方法也可以看到,这里面就是获取了进程的 PID 拼接上当前系统时间,为的就是确保实例名称的唯一性。
public void changeInstanceNameToPID() {if (this.instanceName.equals("DEFAULT")) {this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();}
}
确保 instanceName 是唯一的,自然而然 clientId 也就是唯一的了,但是在旧版本是没有这个 System.nanoTime()
的,比如我们可以看下 4.7.1 版本的这个方法。
可以看到 4.7.1 版本的只是使用进程 ID 作为实例名称,这样就会有问题,比如 docker 启动的不同消费者在特定场景下就有可能生成的实例名称相同进而导致 clientId 相同,假设一个消费者组里面的两个消费者都相同就有问题了,在负载均衡的时候有可能导致分区分配不成功的情况。当然为什么会出现这种情况就留到后面探讨消费者负载均衡的时候再去看了。
5. MQClientInstance 构造器
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {// 客户端配置this.clientConfig = clientConfig;// 实例下标this.instanceIndex = instanceIndex;// 回调线程池的线程数, 默认是当前系统的 CPU 核数this.nettyClientConfig = new NettyClientConfig();this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());// 客户端远程通信处理器, 比如事务回查请求就会通过这个处理器来处理this.clientRemotingProcessor = new ClientRemotingProcessor(this);// 客户端 API 处理类this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);// 如果配置里面设置了 NameServer 地址, 就更新本地的 namesrvAddrListif (this.clientConfig.getNamesrvAddr() != null) {this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());}// 客户端 ID, 唯一标识 IDthis.clientId = clientId;// RocketMQ 控制台实现类, 用于控制台命令方法处理, 比如控制台新增 topicthis.mQAdminImpl = new MQAdminImpl(this);// push 模式下消费者拉取消息的服务this.pullMessageService = new PullMessageService(this);// 消费者负载均衡服务this.rebalanceService = new RebalanceService(this);// 实例化一个内部生产者, 当消息发送失败时会通过这个生产者将消费失败的消息发送到 broker 中的重传队列再次重试this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);this.defaultMQProducer.resetClientConfig(clientConfig);// 消费者状态管理器edthis.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",this.instanceIndex,this.clientId,this.clientConfig,MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());}
构造器其实就是初始化了一堆配置,包括生产者和消费者的,这也能看到 MQClientInstance 就是生产者和消费者公用的。可以看到对于生产者,初始化了一个内部生产者,这个生产者的生产者组是 CLIENT_INNER_PRODUCER,可以用于发送消费失败的消息用于消息重试。
6. 小结
本文到这里就结束了,这篇文章中我们简单介绍了 MQClientInstance 的创建以及生产者和消费者的 clientId 的创建,可以看到的是对于生产者和消费者都会使用进程 PID 和当前时间来构建出 clientId,确保客户端唯一。
如有错误,欢迎指出!!!!