RocketMQ 中的 ProducerManager 组件剖析
一、引言
在分布式系统的消息传递领域,RocketMQ 以其高性能、高可用性和强大的扩展性脱颖而出。ProducerManager 作为 RocketMQ 中的一个关键组件,在消息生产环节发挥着至关重要的作用。它负责管理消息生产者(Producer)的生命周期、配置和操作,为系统的稳定运行和高效消息传递提供了坚实的基础。
二、ProducerManager 的核心功能
2.1 核心属性
//网络连接过期超时时间
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
//获取可用网络连接重试次数 默认3次
private static final int GET_AVAILABLE_CHANNEL_RETRY_COUNT = 3;
//生产组-》网络连接-》客户端信息
private final ConcurrentHashMap<String /* group name */, ConcurrentHashMap<Channel, ClientChannelInfo>> groupChannelTable =
new ConcurrentHashMap<>();
//每个生产者网络客户端id到网络连接的映射关系
private final ConcurrentHashMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>();
//正数计数器
private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();
主要数据结构为groupChannelTable,里面进行存放的是key为生产者组的名字,value为ConcurrentHashMap<Channel, ClientChannelInfo>,这个Map中Channel为与客户端通信的channel,value为ClientChannelInfo 是客户端的信息,主要属性为:
// 消费者客户端网络连接信息
public class ClientChannelInfo {
private final Channel channel;
//消费者客户端网络连接id
private final String clientId;
//编程语言
private final LanguageCode language;
//版本语言
private final int version;
//最后更新时间戳
private volatile long lastUpdateTimestamp = System.currentTimeMillis();
//.....省略代码
}
2.2 核心方法
- 自动扫描方法,会每隔一段时间进行针对groupChannelTable中的数据进行扫描,将Map中Channel最后更新时间,超过2分钟没有进行更新的连接从groupChannelTable中进行移除。
代码如下:
/**
* 扫描生产者过期的网络连接
*/
public void scanNotActiveChannel() {
for (final Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
.entrySet()) {
final String group = entry.getKey();
final ConcurrentHashMap<Channel, ClientChannelInfo> chlMap = entry.getValue();
Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Channel, ClientChannelInfo> item = it.next();
// final Integer id = item.getKey();
final ClientChannelInfo info = item.getValue();
long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();
if (diff > CHANNEL_EXPIRED_TIMEOUT) {
it.remove();
clientChannelTable.remove(info.getClientId());
log.warn(
"SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
RemotingUtil.closeChannel(info.getChannel());
}
}
}
}
2.处理生产者连接关闭的事件
//处理生产者连接关闭的事件
public synchronized void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
if (channel != null) {
for (final Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
.entrySet()) {
final String group = entry.getKey();
final ConcurrentHashMap<Channel, ClientChannelInfo> clientChannelInfoTable =
entry.getValue();
final ClientChannelInfo clientChannelInfo =
clientChannelInfoTable.remove(channel);
if (clientChannelInfo != null) {
clientChannelTable.remove(clientChannelInfo.getClientId());
log.info(
"NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",
clientChannelInfo.toString(), remoteAddr, group);
}
}
}
}
3.生产者的注册与下线
注册方法:
//注册生产者
public synchronized void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
ClientChannelInfo clientChannelInfoFound = null;
ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
if (null == channelTable) {
channelTable = new ConcurrentHashMap<>();
this.groupChannelTable.put(group, channelTable);
}
clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
if (null == clientChannelInfoFound) {
channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel());
log.info("new producer connected, group: {} channel: {}", group,
clientChannelInfo.toString());
}
if (clientChannelInfoFound != null) {
clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
}
}
下线方法
//生产者的下线
public synchronized void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {
ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
if (null != channelTable && !channelTable.isEmpty()) {
ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
clientChannelTable.remove(clientChannelInfo.getClientId());
if (old != null) {
log.info("unregister a producer[{}] from groupChannelTable {}", group,
clientChannelInfo.toString());
}
if (channelTable.isEmpty()) {
this.groupChannelTable.remove(group);
log.info("unregister a producer group[{}] from groupChannelTable", group);
}
}
}
4. 根据生产者的groupId获取可用的连接
//获取当前生产者组可用的连接
public Channel getAvailableChannel(String groupId) {
if (groupId == null) {
return null;
}
List<Channel> channelList;
ConcurrentHashMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap = groupChannelTable.get(groupId);
if (channelClientChannelInfoHashMap != null) {
channelList = new ArrayList<>(channelClientChannelInfoHashMap.keySet());
} else {
log.warn("Check transaction failed, channel table is empty. groupId={}", groupId);
return null;
}
int size = channelList.size();
if (0 == size) {
log.warn("Channel list is empty. groupId={}", groupId);
return null;
}
Channel lastActiveChannel = null;
//轮询算法,依次获取生产者组的每一个生产者的连接地址
int index = positiveAtomicCounter.incrementAndGet() % size;
Channel channel = channelList.get(index);
int count = 0;
boolean isOk = channel.isActive() && channel.isWritable();
while (count++ < GET_AVAILABLE_CHANNEL_RETRY_COUNT) {
if (isOk) {
return channel;
}
if (channel.isActive()) {
lastActiveChannel = channel;
}
index = (++index) % size;
channel = channelList.get(index);
isOk = channel.isActive() && channel.isWritable();
}
return lastActiveChannel;
}
三、总结
总的来看,ProducerManager主要是通过类中的方法对groupChannelTable集合中的属性进行操作