当前位置: 首页 > news >正文

【RelayMQ】基于 Java 实现轻量级消息队列(六)

目录

一. 数据存储方式

二. 交换机管理

三. 队列管理

四. 绑定管理

五. 消息管理 ! ! !

六.数据加载


文件负责数据的存储, 内存负责数据的管理, 为了保证MQ的传输效率, 所以注定无法使用硬盘管理数据

一. 数据存储方式

这里主要使用哈希表, 链表, 嵌套结构存储和管理数据

交换机管理

第一个key为交换名字(exchangeName), value为交换机对象,

    private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();

队列管理

第一个key为队列名(queueName), value为队列对象

private ConcurrentHashMap<String, MSGQueue> msgQueueMap = new ConcurrentHashMap<>();

绑定关系管理

第一个key为交换机名(exchangeName), value为HashMap,HashMap中存储的key为队列名(queueName),value为Binding对象

private ConcurrentHashMap<String,ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();

消息管理

第一个Key为消息Id(messageId),  value为消息对象 

  private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();

可以使用消息Id快速的查找对象

队列中的所有消息

第一个Key为队列名(queueName),  value为链表(链表中的每一个对象都是message对象)

private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();

便于对队列中的对象进行管理,适用于队列这种先来先服务模式

队列中未应答消息

第一个key为队列名(queueName),value为hashMap, 其中key为MessageId, value为消息对象

 private ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> queueMessageWaitMap = new ConcurrentHashMap<>();

便于维护队列中已发送给消费者但还未被确认(ACK)的消息


总体上采用HashMap的方式, 可以快速的进行索引查询

二. 交换机管理

这里的添加操作和之前的不同, 这里是添加进入内存中的容器HashMap中, 并非数据库中的容器

//增加交换机public void insertExchange(String exchangeName,Exchange exchange){exchangeMap.put(exchangeName, exchange);System.out.println("[MemoryDataCenter] 交换机添加成功");}
//删除交换机public void deleteExchange(String exchangeName){exchangeMap.remove(exchangeName);System.out.println("[MemoryDataCenter] 交换机删除成功");}//查找交换机public Exchange getExchange(String exchangeName){return exchangeMap.get(exchangeName);}

三. 队列管理

这里也是在内存中进行增加, 删除, 查找操作

  public void insertQueue(String queueName,MSGQueue queue){msgQueueMap.put(queueName, queue);System.out.println("[MemoryDataCenter] 队列添加成功");}public void deleteQueue(String queueName){msgQueueMap.remove(queueName);System.out.println("[MemoryDataCenter] 队列删除成功");}public MSGQueue getQueue(String queueName){return msgQueueMap.get(queueName);}

四. 绑定管理

这里采用由内到外的方式

添加绑定

  1. 先检查内部的HashMap中是否存在数据
  2. 不存在则创建, 存在则检查最外层的是否存在
  3. 不存在则创建, 存在则抛出异常,无法添加
  public void insertBinding(Binding binding) throws MqException {
//        1.检查这个交换机的value是否存在,如果不存在则需要创建(根据key检查value是否存在)
/*        ConcurrentHashMap<String, Binding> bindMap = bindingsMap.get(binding.getExchangeName());if(bindMap == null){bindMap = new ConcurrentHashMap<>();bindingsMap.put(binding.getExchangeName(),bindMap);}*///另一种写法ConcurrentHashMap<String, Binding> bindMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),k -> new ConcurrentHashMap<>());synchronized (bindMap) {
//        2.如果根据交换机名和队列名,查询出来的关系存在,则抛出异常,因为已经两者之间已经存在关系,所以不能插入if (bindMap.get(binding.getQueueName()) != null) {throw new MqException("[MemoryDataCenter] 绑定关系已经存在,不能继续绑定");}bindMap.put(binding.getQueueName(), binding);System.out.println("[MemoryDataCenter] 绑定添加成功");}}

删除绑定

  1. 先检查内部绑定是否存在
  2. 不存在则无法删除,存在则删除里面的HashMap
  3. 然后删除外面的hashMap
  public void deleteBinding(Binding binding) throws MqException {ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(binding.getExchangeName());if(bindingMap == null){throw new MqException("[MemoryDataCenter]  绑定关系不存在,无法删除");}bindingMap.remove(binding.getQueueName());System.out.println("[MemoryDataCenter] 绑定删除成功");}

获取绑定(唯一)

这里采用从外到内的方式查询

  1. 先检查交换机是否存在
  2. 不存在则返回,存在则继续查看交换机
  3. 存在返回数据,不存在返回 null
  public Binding getBinding(String exchangeName ,String queueName){ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);//判断最外层value是否为空if(bindingMap==null){return null;}return bindingMap.get(queueName);}

获取所有的绑定

  1. 直接根据交换机进行查找并返回
  public ConcurrentHashMap<String,Binding> getBindings(String exchangeName){return bindingsMap.get(exchangeName);}

五. 消息管理 ! ! !

添加消息

    public void insertMessage(Message message){messageMap.put(message.getMessageId(),message);System.out.println("[MemoryDataCenter] 添加消息成功");}

查询消息(根据MessageID查询消息)

    //根据messageID查询消息public Message getMessage(String messageId){return messageMap.get(messageId);}

删除消息(根据MessageId删除消息)

  1. 先检查消息是否存在
  2. 如果存在才能进行删除操作
    //根据messageId删除消息public void deleteMessage(String messageId) throws MqException {Message message = messageMap.get(messageId);if(message==null){throw new MqException("[MemoryDataCenter] 消息不存在,无法删除");}messageMap.remove(messageId);System.out.println("[MemoryDataCenter] 删除消息成功");}

队列中的消息管理

发送消息到队列

  1. 检查队列中是否存在数据列表
  2. 存在则向其中添加数据, 不存在则创建一个链表
  3. 将消息直接添加到队列中
  4. 在总消息统计中也需要添加
    public void sendMessage(MSGQueue queue,Message message){LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(),k->new LinkedList<>());synchronized (messages){messages.add(message);}//在队列中添加,在总的消息管理中也需要添加//因为messageID相同,所有也不用担心内容不一样insertMessage(message);System.out.println("[MemoryDataCenter] 消息成功放入队列中");}

从队列中取出消息

  1. 检查是否存在列表
  2. 如果不存在, 则直接返回 null, 如果存在判断里面是否存在数据
  3. 如果存在数据,则返回第一个,如果不存在则返回 null
    public Message pollMessage(String queueName){LinkedList<Message> messages = queueMessageMap.get(queueName);//判断队列中有没有消息if(messages==null){return null;}synchronized (messages){if(messages.size()==0){return null;}//采用从头取出(头插法)Message curmessage = messages.remove(0);System.out.println("[MemoryDataCenter] 消息成功从队列中取出");return curmessage;}}

获取队列中消息的个数

  • 判断链表是否存在
  • 如果不存在,直接返回0,
  • 如果存在则返回消息个数
    public int getQueueMessageCount(String queueName){LinkedList<Message> messages = queueMessageMap.get(queueName);//判断队列中有没有消息if(messages == null){return 0;}synchronized (messages){return messages.size();}}

队列中未确认的消息管理

添加未确认消息

  1. 检查HashMap是否存在,
  2. 如果不存在,则创建一个,如果存在,则向里面添加未处理的消息
    public void insertMessageWaitAck(MSGQueue queue,Message message) {ConcurrentHashMap<String, Message> messageWaitAckMap = queueMessageWaitMap.computeIfAbsent(queue.getName(), k -> new ConcurrentHashMap<>());messageWaitAckMap.put(message.getMessageId(), message);System.out.println("[MemoryDataCenter] 未处理消息添加成功");}

删除未确认的消息

  1. 检查HashMap是否存在
  2. 如果不存在, 则返回null, 如果存在,则进行删除操作
    public void deleteMessageWaitAck(String queueName,String messageId){ConcurrentHashMap<String, Message> messageWaitAckMap = queueMessageWaitMap.get(queueName);if(messageWaitAckMap==null){return;}messageWaitAckMap.remove(messageId);System.out.println("[MemoryDataCenter] 未处理消息删除成功");}

获取指定的未确认消息

  1. 检查HashMap是否存在
  2. 如果不存在, 则返回null, 如果存在, 则返回其中具体消息(根据MessageId返回)
    public Message getMessageWaitAck(String queueName,String messageId){ConcurrentHashMap<String, Message> messageWaitAckMap = queueMessageWaitMap.get(queueName);if(messageWaitAckMap==null){return null;}return messageWaitAckMap.get(messageId);}

六.数据加载

将文件数据加载到内存中

  1. 保证内存存储数据为空
  2. 加载交换机数据
  3. 加载队列数据
  4. 加载绑定关系
  5. 加载消息数据 (1. 所有的消息  2. 队列下的消息)
    public void recovery(DiskDataCenter dataCenter) throws IOException, MqException, ClassNotFoundException {
//        1.清空之前的数据exchangeMap.clear();msgQueueMap.clear();bindingsMap.clear();messageMap.clear();queueMessageWaitMap.clear();
//        2.加载交换机数据List<Exchange> exchanges = dataCenter.selectAllExchange();for (Exchange exchange :exchanges){exchangeMap.put(exchange.getName(),exchange);}
//        3.加载队列数据List<MSGQueue> msgQueues = dataCenter.selectAllMSGQueue();for (MSGQueue queue:msgQueues){msgQueueMap.put(queue.getName(),queue);}
//        4.加载绑定关系(将一个个绑定添加进里面)//先检查交互机是否存在,不存在则创建对应的hashMap,然后添加其中的内容List<Binding> bindings = dataCenter.selectAllBinding();for (Binding binding:bindings){ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(), k -> new ConcurrentHashMap<>());bindingMap.put(binding.getQueueName(), binding);}
//        5.恢复所有的消息数据(添加每一个队列中的消息)(这里有两个,1.管理所有的消息,2.管理队列下的消息)for(MSGQueue queue: msgQueues){LinkedList<Message> messages = dataCenter.selectAllMessage(queue);//队列下的消息queueMessageMap.put(queue.getName(), messages);for (Message message:messages){messageMap.put(message.getMessageId(),message);}}
//        6.未被确认的消息//这里未被确认的消息,并不会被加载进入内存,考虑情况:服务器如果重启,那么未被确认的消息,再一次被加载,实际上被当做没有取出的消息//这时候只需要重新取出未被取出的消息,那么和方法5(加载数据)是一样的}

未被确认的消息怎么处理?

在本项目中, 数据加载的过程, 多数出现在重启服务器的场景中, 未被确认的消息并不会直接存储进入内存, 而是被当做普通消息加载进入内存,  已经被确认的数据会被直接删除掉, 未被确认的数据会被当做新数据加载进入内存

缺点: 造成消息的冗余发送  优点: 实现方便

优化建议: 

1. 数据的存储持久化

引入一个标志位, 表示消息是否处于发送且未确认状态,  在数据加载的时候, 根据标志位将数据加载放入内存存储

2. 超时重投机制

如果消息发送给消费者, 消费者消费数据时, 未发送确认就崩溃, 那么消息会一直处于发送未确认状态, 会造成资源浪费

检测超时未确认的消息, 超过一定时间间隔会触发重新发送消息

3. 重投次数限制和死信队列

避免由于消费者的持续异常状态(依赖不可用等 )造成的无限重投, 可以规定最多重投次数, 超过该次数, 则丢到死信队列中, 方便人为去分析处理这些未被确认的消息


文章转载自:

http://OS1ja1sn.hsjfs.cn
http://SPF30A4C.hsjfs.cn
http://VBlk14sQ.hsjfs.cn
http://UuGSG1jO.hsjfs.cn
http://dxz8KsvX.hsjfs.cn
http://75js9SQd.hsjfs.cn
http://v7eWYEYm.hsjfs.cn
http://8jcaOc1K.hsjfs.cn
http://4AIOcef3.hsjfs.cn
http://VvD10dST.hsjfs.cn
http://wgXTgx8d.hsjfs.cn
http://LuburEYa.hsjfs.cn
http://1fdAlzPh.hsjfs.cn
http://rQ1xKUgs.hsjfs.cn
http://M8aNHl7V.hsjfs.cn
http://PmaYkiug.hsjfs.cn
http://vKEklWYF.hsjfs.cn
http://F6cDZZBo.hsjfs.cn
http://CBWGBQE3.hsjfs.cn
http://U1kQFyhV.hsjfs.cn
http://YVNUDU9y.hsjfs.cn
http://sqB1r10N.hsjfs.cn
http://8Dq4RzfA.hsjfs.cn
http://Cn9ZAfUg.hsjfs.cn
http://wnFRRdc3.hsjfs.cn
http://AviQWWLc.hsjfs.cn
http://01tWXt5i.hsjfs.cn
http://wzClzoJa.hsjfs.cn
http://YfFlk0nQ.hsjfs.cn
http://GOfKbBoL.hsjfs.cn
http://www.dtcms.com/a/368613.html

相关文章:

  • 解锁 Claude Code 终极工作流:从基础到进阶的全流程指南
  • 深入浅出 全面剖析消息队列(Kafka,RabbitMQ,RocketMQ 等)
  • 工业HMI:人机交互的核心与智能制造的桥梁
  • 解决rt_pin_get返回错误码的问题
  • 基于单片机汽车防撞系统设计
  • Java 提取 PDF 文件内容:告别手动复制粘贴,拥抱自动化解析!
  • 【AI总结】Python BERT 向量化入门指南
  • 《sklearn机器学习——回归指标2》
  • 投资储能项目能赚多少钱?小程序帮你测算
  • 基于开源AI智能名片链动2+1模式S2B2C商城小程序的公益课引流策略研究
  • 医疗问诊陪诊小程序:以人性化设计构建健康服务新生态
  • modbus_tcp和modbus_rtu对比移植AT-socket,modbus_tcp杂记
  • 云手机的空间会占用本地内存吗
  • HTML 各种事件的使用说明书
  • docker 部署RustDesk服务
  • 【Python基础】 20 Rust 与 Python 循环语句完整对比笔记
  • 为什么后端接口不能直接返回数据库实体?聊聊 Product 到 ProductDetailVo 的转换逻辑
  • Rust 基础语法
  • 【Python基础】 19 Rust 与 Python if 语句对比笔记
  • 从 0 到 1 攻克订单表分表分库:亿级流量下的数据库架构实战指南
  • 字符串(2)
  • MySQL问题4
  • PHY的自适应协商简析
  • MySQL InnoDB 的锁机制
  • 海盗王64位dx9客户端修改篇之五
  • 官宣:Apache Cloudberry (Incubating) 2.0.0 发布!
  • SpringBoot 中 ThreadLocal 的妙用:原理、实战与避坑指南
  • Unity Hub 创建支持 Android iOS 的项目教程
  • LangGraph节点完整组成与要求详解
  • 【Qt开发】按钮类控件(三)-> QCheckBox