当前位置: 首页 > news >正文

RocketMQ 中的 ProducerManager 组件剖析

一、引言

在分布式系统的消息传递领域,RocketMQ 以其高性能、高可用性和强大的扩展性脱颖而出。ProducerManager 作为 RocketMQ 中的一个关键组件,在消息生产环节发挥着至关重要的作用。它负责管理消息生产者(Producer)的生命周期、配置和操作,为系统的稳定运行和高效消息传递提供了坚实的基础。

二、ProducerManager 的核心功能

2.1 核心属性


    //网络连接过期超时时间
    private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
    //获取可用网络连接重试次数 默认3次
    private static final int GET_AVAILABLE_CHANNEL_RETRY_COUNT = 3;
    //生产组-》网络连接-》客户端信息
    private final ConcurrentHashMap<String /* group name */, ConcurrentHashMap<Channel, ClientChannelInfo>> groupChannelTable =
        new ConcurrentHashMap<>();
    //每个生产者网络客户端id到网络连接的映射关系
    private final ConcurrentHashMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>();
    //正数计数器
    private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();

主要数据结构为groupChannelTable,里面进行存放的是key为生产者组的名字,value为ConcurrentHashMap<Channel, ClientChannelInfo>,这个Map中Channel为与客户端通信的channel,value为ClientChannelInfo 是客户端的信息,主要属性为:

// 消费者客户端网络连接信息
public class ClientChannelInfo {
    private final Channel channel;
    //消费者客户端网络连接id
    private final String clientId;
    //编程语言
    private final LanguageCode language;
    //版本语言
    private final int version;
    //最后更新时间戳
    private volatile long lastUpdateTimestamp = System.currentTimeMillis();
    
   //.....省略代码
}

2.2 核心方法

  1. 自动扫描方法,会每隔一段时间进行针对groupChannelTable中的数据进行扫描,将Map中Channel最后更新时间,超过2分钟没有进行更新的连接从groupChannelTable中进行移除。

代码如下:

    /**
     * 扫描生产者过期的网络连接
     */
    public void scanNotActiveChannel() {
        for (final Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
                .entrySet()) {
            final String group = entry.getKey();
            final ConcurrentHashMap<Channel, ClientChannelInfo> chlMap = entry.getValue();

            Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator();
            while (it.hasNext()) {
                Entry<Channel, ClientChannelInfo> item = it.next();
                // final Integer id = item.getKey();
                final ClientChannelInfo info = item.getValue();

                long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();
                if (diff > CHANNEL_EXPIRED_TIMEOUT) {
                    it.remove();
                    clientChannelTable.remove(info.getClientId());
                    log.warn(
                            "SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
                            RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
                    RemotingUtil.closeChannel(info.getChannel());
                }
            }
        }
    }

 2.处理生产者连接关闭的事件

  //处理生产者连接关闭的事件
    public synchronized void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
        if (channel != null) {
            for (final Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
                    .entrySet()) {
                final String group = entry.getKey();
                final ConcurrentHashMap<Channel, ClientChannelInfo> clientChannelInfoTable =
                        entry.getValue();
                final ClientChannelInfo clientChannelInfo =
                        clientChannelInfoTable.remove(channel);
                if (clientChannelInfo != null) {
                    clientChannelTable.remove(clientChannelInfo.getClientId());
                    log.info(
                            "NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",
                            clientChannelInfo.toString(), remoteAddr, group);
                }

            }
        }
    }

3.生产者的注册与下线

注册方法:

    //注册生产者
    public synchronized void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
        ClientChannelInfo clientChannelInfoFound = null;

        ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
        if (null == channelTable) {
            channelTable = new ConcurrentHashMap<>();
            this.groupChannelTable.put(group, channelTable);
        }

        clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
        if (null == clientChannelInfoFound) {
            channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
            clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel());
            log.info("new producer connected, group: {} channel: {}", group,
                    clientChannelInfo.toString());
        }


        if (clientChannelInfoFound != null) {
            clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
        }
    }

下线方法

   //生产者的下线
    public synchronized void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {
        ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
        if (null != channelTable && !channelTable.isEmpty()) {
            ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
            clientChannelTable.remove(clientChannelInfo.getClientId());
            if (old != null) {
                log.info("unregister a producer[{}] from groupChannelTable {}", group,
                        clientChannelInfo.toString());
            }

            if (channelTable.isEmpty()) {
                this.groupChannelTable.remove(group);
                log.info("unregister a producer group[{}] from groupChannelTable", group);
            }
        }
    }

4. 根据生产者的groupId获取可用的连接


    //获取当前生产者组可用的连接
    public Channel getAvailableChannel(String groupId) {
        if (groupId == null) {
            return null;
        }
        List<Channel> channelList;
        ConcurrentHashMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap = groupChannelTable.get(groupId);
        if (channelClientChannelInfoHashMap != null) {
            channelList = new ArrayList<>(channelClientChannelInfoHashMap.keySet());
        } else {
            log.warn("Check transaction failed, channel table is empty. groupId={}", groupId);
            return null;
        }

        int size = channelList.size();
        if (0 == size) {
            log.warn("Channel list is empty. groupId={}", groupId);
            return null;
        }

        Channel lastActiveChannel = null;
        //轮询算法,依次获取生产者组的每一个生产者的连接地址
        int index = positiveAtomicCounter.incrementAndGet() % size;
        Channel channel = channelList.get(index);
        int count = 0;
        boolean isOk = channel.isActive() && channel.isWritable();
        while (count++ < GET_AVAILABLE_CHANNEL_RETRY_COUNT) {
            if (isOk) {
                return channel;
            }
            if (channel.isActive()) {
                lastActiveChannel = channel;
            }
            index = (++index) % size;
            channel = channelList.get(index);
            isOk = channel.isActive() && channel.isWritable();
        }

        return lastActiveChannel;
    }

三、总结

总的来看,ProducerManager主要是通过类中的方法对groupChannelTable集合中的属性进行操作

http://www.dtcms.com/a/111192.html

相关文章:

  • 【Java Stream详解】
  • 提高:图论:强连通分量 图的遍历
  • Nginx功能及应用全解:从负载均衡到反向代理的全面剖析
  • OpenAI:人工智能领域的探索者与变革者
  • 黑马点评redis改 part 1
  • T-SQL语言的链表查找
  • eventEmitter实现
  • 网络建设与运维神州数码DCN MAC地址表操作
  • TypedDict和dataclass的优缺点对比
  • 前馈控制与反馈控制融合算法详解及python案例分析
  • JavaWeb学习--MyBatis-Plus整合SpringBoot的ServiceImpl方法(增加,修改与删除部分)
  • 深入解析:使用Python爬取Bilibili视频
  • 如何用DeepSeek进行SWOT分析?以CSDN的“C知道”为例
  • k8s的StorageClass存储类和pv、pvc、provisioner、物理存储的链路
  • 做一个Andriod系统应用的方法
  • 软件设计师之设计模式
  • 第七章 Python基础进阶-异常、模块与包(其五)
  • 手撕AVL树
  • 模运算核心性质与算法应用:从数学原理到编程实践
  • Julia语言的测试覆盖率
  • 卷积神经网络CNN 经典模型 — GoogleLeNet、ResNet、DenseNet算法原理与模型构造
  • Visual Basic语言的网络协议栈
  • AIGC时代Kubernetes企业级云原生运维实战:智能重构与深度实践指南
  • SpringAI整合Ollama集成DeepSeek
  • 搜索树——AVL、红黑树、B树、B+树
  • WinForm真入门(5)——控件的基类Control
  • 使用 Swift 实现 LRU 缓存淘汰策略
  • React编程模型:Project Reactor深度解析
  • Java的基本语法
  • 006贪心——算法备赛