当前位置: 首页 > news >正文

【项目篇之统一内存操作】仿照RabbitMQ模拟实现消息队列

在这里插入图片描述

我们的操作分为两种,一种是在内存上进行统一的操作,一种是在硬盘上面操作,今天我写的文章是编写了一个MemoryDataCenter类来实现了 在内存上面的统一操作:

实现统一内存操作

    • 如何使用内存来组织数据
  • 创建一个类来统一管理内存上的所有数据
    • 针对交换机的操作
    • 针对队列进行操作
    • 针对绑定进行操作
      • 新增绑定
      • 获取绑定
      • 删除绑定
    • 针对消息进行操作
      • 添加消息
      • 查询信息
      • 删除消息
      • 发送消息到指定队列
      • 从队列中取出到消息
      • 获取指定队列中消息的个数
    • 针对未确认消息进行操作
      • 添加未确认消息
      • 删除未确认的消息
      • 获取指定的未确认消息
    • 恢复数据到内存中去
    • 总代码如下所示:
    • 内存管理数据的总结

对于MQ来说,是以在内存上存储数据为主的,
在硬盘上存储数据为辅的
在硬盘上存储数据主要是为了进行持久化保存,重启之后,数据不会丢失

但是我们在真正去进行消息转发的的过程中,各种核心的逻辑都还是以内存上存储的数据为主的

因为访问内存比访问硬盘要快得多,所以我们还是要使用内存来组织数据

如何使用内存来组织数据

在交换机上使用一个数据结构:HashMap来组织数据:key是交换机的name,value是交换机对象

队列: 直接使用HashMap来组织数据: key是队列的name,value是队列对象


绑定: 使用一个嵌套的HashMap来组织数据:key是exchangeName,value是一个HashMap2,这个HashMap2中也有一个key1:key1是队列的name,HashMap2中的value2是绑定对象。

去查询这个绑定的时候,先根据交换机名字去查询,查询到的结果也还是一个HashMap2,表示的是该交换机都绑定了哪些队列,然后再进一步根据队列名字去HashMap2中查询绑定对象


消息: 也使用一个HashMap去进行管理数据:key是messageId,value是Message对象

消息是存放在队列中的,消息和队列之间还有一个归属关系,
通过MessageId可以查询到是哪一个Message对象,同时我们还得知道当前这个消息对象是放到哪一个队列中的


映射:

所以再做出一个映射:表示队列和消息之间的关联:
表示出每个队列中都有哪些消息:

如何表示:
使用嵌套的HashMap来组织,key是queueName,value是一个LinkedList,这个LinkedList里面的每一个元素都是一个Message对象


表示“未被确认”的消息:

首先,为什么会有这个未被确认的消息,是和我们的MQ的特性有关:
在这里插入图片描述

表示“未被确认”的消息:这里面就存储了当前队列中哪些消息被消费者取走了,但是还没有应答

我们使用嵌套的HashMap来组织这个"未被确认的消息"数据:key是queueName,value是一个HashMap
这个value的HashMap中的key是messageId,value是Message对象
后续实现消息确认的逻辑是
需要根据ACK相应的内容,这里会提供一个确认的messageId,根据这个messageId来把上述结构中的Message对象找到并进行移除

所以这里使用HashMap去查找更好一些

创建一个类来统一管理内存上的所有数据

创建一个类来统一管理内存上的所有数据

/*  
使用这个类来统一管理内存上的所有数据  */
public class MemoryDataCenter {  //key是exchangeName,value是exchange对象  private HashMap<String, Exchange> exchangeMap = new HashMap<>();     }

注意:这个类之后会提供一些增删改查的操作,让上一层代码去进行调用,也就是我们的brokerServer要处理很多的请求,此时有些请求涉及到去创建交换机,创建绑定,创建队列,此时服务器处理多个请求,

这个类提供的一些方法,会在多线程环境下使用,所以就需要去处理线程不安全的问题,
此时就会涉及到多线程的问题,也就会涉及到线程安全的问题,但是HashMap是一个线程不安全的数据结构,所以就需要换一个数据结构来操作,使用另一个线程安全的数据结构:ConcurrentHashMap
所以代码修改如下:

//key是exchangeName,value是exchange对象  
private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();
//存储交换机  
//key是exchangeName,value是exchange对象  
private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();  //存储队列  
//key是queueName,value是MSGQueue对象  
private ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>();  //存储绑定  
//第一个key是exchangeName,第二个key是queueName  
private ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();  //存储消息  
//key是messageId,value是Message对象  
private ConcurrentHashMap<String, Message>  messageMap = new ConcurrentHashMap<>();  //存储: 消息和队列之间的从属关系  
//key是队列的名字queueName,value是一个Message的链表  
private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();  //待确认的消息  
//第一个key是queueName,第二个key是messageId  
private ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> queueMessageWaitMap = new ConcurrentHashMap<>();

针对交换机的操作

//插入交换机  
public void insertExchange(Exchange exchange){  //把交换机插入到HashMap表中即可  exchangeMap.put(exchange.getName(),exchange);  System.out.println("[MemoryDatCenter] 交换机创建成功了 exchangeName  = "+exchange.getName());  
}  //查找交换机: 根据交换机名字查找交换机  
public Exchange getExchange(String exchangeName){  //在HashMap表中查找交换机  return exchangeMap.get(exchangeName);  
}  //删除交换机:  
public void deleteExchange(String exchangeName){  exchangeMap.remove(exchangeName);  System.out.println("[MemoryDatCenter] 交换机被删除成功了 exchangeName =  "+exchangeName);  
}

针对队列进行操作

//插入队列  
public void insertQueue(MSGQueue queue){  queueMap.put(queue.getName(), queue);  System.out.println("[MemoryDatCenter] 队列被成功了 queueName= "+queue.getName());  
}  //查找队列  
public  MSGQueue getQueue(String queueName){  return queueMap.get(queueName);  
}  //删除队列  
public void deleteQueue(String queueName){  queueMap.remove(queueName);  System.out.println("[MemoryDatCenter] 队列被删除成功了 queueName= "+queueName);  
}

针对绑定进行操作

新增绑定

  1. 查询:根据exchangeName查询哈希表是否存在
  2. 查询:根据queueName查询绑定是否存在
  3. 插入:在内置的哈希表中插入queueName和绑定

多个线程去进行插入绑定的方法时候,要保证是线程安全的

第一步的查询操作中是从哈希表中查询出数据,本身是线程安全的

但是第二步和第三步的操作有线程安全的风险:
会出现下面的情况:

虽然哈希表本身是线程安全的,同时哈希表的查询和插入单独拎出来一个也是线程安全的,但是这两步查询操作和插入操作放在一起进行联动,那就需要将这两步合在一起,成为一个原子性的操作,进行加锁

//新增绑定  public void insertBinding(Binding binding) throws MqException {  //1.先使用exchangeName查询一下,看看对应的哈希表是否存在,不存在就创建一个  ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(binding.getExchangeName());  if(bindingMap == null){  //不存在就创建出来  bindingMap =new ConcurrentHashMap<>();  //创建完毕之后就放入bindingsMap中去  bindingsMap.put(binding.getExchangeName(), bindingMap);  }  //上面这段代码有点多,我们也可以使用下面这段代码来代替  //ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),  // k -> new ConcurrentHashMap<>());  synchronized (bindingMap){  //2.接着根据queueName查询一下绑定是否存在,若存在,抛异常,不存在才可以插入绑定  if(bindingMap.get(binding.getQueueName()) != null){  throw new MqException("[MemoryDataCenter] 绑定已经存在,exchangeName="+binding.getExchangeName() +  ", queueName=" + binding.getQueueName());  }  //3.查询到绑定不存在,就可以插入绑定了  bindingMap.put(binding.getQueueName(), binding);  }  System.out.println("[MemoryDatCenter] 绑定添加成功了 exchangeName = " + binding.getExchangeName()  +", queueName = " +binding.getQueueName());  }  

获取绑定

//获取绑定:写两个版本  
// 1. 根据exchangeName和queueName确定唯一一个Binding  
// 2. 根据exchangeName获取到所有的Binding  // 1. 根据exchangeName和queueName确定唯一一个Binding  
public Binding getBinding(String exchangeName, String queueName){  ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);  if(bindingMap == null){  return null;  }  return bindingMap.get(queueName);  
}  // 2. 根据exchangeName获取到所有的Binding  
public ConcurrentHashMap<String,Binding> getBindings(String exchangeName){  return bindingsMap.get(exchangeName);  
}

删除绑定

//删除绑定:
public void deleteBinding(Binding binding) throws MqException {  ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());  if(bindingMap == null){  //该交换机没有绑定任何队列,抛出异常  throw  new MqException("[MemoryDataCenter] 绑定不存在,exchangeName=" + binding.getExchangeName()  + ", queueName=" + binding.getQueueName());  }  bindingMap.remove(binding.getQueueName());  System.out.println("[MemoryDataCenter] 绑定删除成功了 exchangeName = " + binding.getExchangeName()  + ", queueName = " + binding.getQueueName());  
}

针对消息进行操作

添加消息

//添加消息  
public void addMessage(Message message){  messageMap.put(message.getMessageId(), message);  System.out.println("[MemoryDataCenter] 新消息添加成功!messageId = " + message.getMessageId());  
}

查询信息

我们是根据MessagId来查询消息的:

public Message getMessage(String messageId){  return  messageMap.get(messageId);  
}

删除消息

我们是根据MessageId来删除消息的


public void removeMessage(String messageId){  messageMap.remove(messageId);  System.out.println("[MemoryDataCenter] 消息被删除成功了 messageId= "+messageId);  
}

发送消息到指定队列

//发送消息到指定队列
public void sendMessage(MSGQueue queue, Message message){  //把消息放到对应的数据结构中  //先根据队列的名字找到该队列对应的消息链表,然后把消息放到消息链表中  LinkedList<Message> messages =queueMessageMap.get(queue.getName());  if(messages == null){  //如果没有这个消息链表,就创建出这个链表  messages = new LinkedList<>();  //创建出链表之后,就把这个链表放到队列中去  queueMessageMap.put(queue.getName(), messages);  }  //再把消息数据加到messages这个链表里面  //由于链表本身是线程不安全的,所以要加锁  synchronized (messages){  //把消息数据加到messages这个链表里面  messages.add(message);  }  //接着把该消息也插入到总的哈希表中,也就是消息数据中心中插入  //如果这个消息已经在消息中心中存在了,这里再次插入消息也没事,  // 主要是相同messageId对应的message内容是一样的  addMessage(message);  System.out.println("[MemeoryDataCenter] 消息被投递到队列中了! messageId = " + message.getMessageId());  }

从队列中取出到消息

//从队列中获取到消息  
public Message pollMessage(String queueName){  //先根据队列名去查找一下。对应的队列的消息链表  //如果没找到,说明队列中没有消息  LinkedList<Message> messages =  queueMessageMap.get(queueName);  if(messages == null){  return null;  }  synchronized (messages){  if(messages.size() == 0){  return null;  }  //如果链表中有元素,那就进行头删操作//消息取出来之后,就要在messages中删除,因为消息已经被取走了  Message currentMessage = messages.remove(0);  System.out.println("[MemoryDataCenter] 消息从队列中取出! messageId= " +currentMessage.getMessageId());  return currentMessage;  }  
}

获取指定队列中消息的个数

//获取指定队列中消息的个数  
public int getMessageCount(String queueName){  LinkedList<Message> messages = queueMessageMap.get(queueName);  if(messages == null){  //队列中没有消息  return 0;  }  synchronized (messages){  return messages.size();  }  
}

针对未确认消息进行操作

添加未确认消息

public void addMessageWaitAck(String queueName, Message message){  ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitMap.computeIfAbsent(queueName,  k -> new ConcurrentHashMap<>());  messageHashMap.put(message.getMessageId(),message);  System.out.println("[MemoryDataCenter] 消息进入了待确认队列! messageId = " +message.getMessageId());  }

删除未确认的消息

//删除未确认的消息(消息已经确认了)  
public void removeMessageWaitAck(String queueName, String messageId){  ConcurrentHashMap<String, Message> messageHashMap  = queueMessageWaitMap.get(queueName);  if(messageHashMap == null){  return;  }  messageHashMap.remove(messageId);  System.out.println("[MemoryDataCenter] 消息从待确认队列中删除 messageId = " +messageId);  }

获取指定的未确认消息

//删除未确认的消息(消息已经确认了)  
public void removeMessageWaitAck(String queueName, String messageId){  ConcurrentHashMap<String, Message> messageHashMap  = queueMessageWaitMap.get(queueName);  if(messageHashMap == null){  return;  }  messageHashMap.remove(messageId);  System.out.println("[MemoryDataCenter] 消息从待确认队列中删除 messageId = " +messageId);  }

恢复数据到内存中去

//这个方法就是从硬盘上读取数据,把硬盘中之前持久化存储的各个维度的数据都恢复到内存中  
//通过DiskDataCenter来获取到硬盘上面的数据  
public  void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException {  //0. 在恢复之间,先把里面的所有数据全部清空  exchangeMap.clear();  queueMap.clear();  bindingsMap.clear();  messageMap.clear();  queueMessageMap.clear();  //1.恢复所有的交换机数据  List<Exchange> exchanges = diskDataCenter.selectAllExchanges();  for(Exchange  exchange  : exchanges){  exchangeMap.put(exchange.getName(), exchange);  }  //2.恢复所有的队列数据  // 2. 恢复所有的队列数据  List<MSGQueue> queues = diskDataCenter.selectAllQueues();  for (MSGQueue queue : queues) {  queueMap.put(queue.getName(), queue);  }  //3.恢复所有的绑定数据  List<Binding> bindings = diskDataCenter.selectAllBindings();  for(Binding binding : bindings){  ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),  k -> new ConcurrentHashMap<>());  }  //4.恢复所有的消息数据  //遍历所有的队列,然后根据每个队列的名字获取到所有的消息  for(MSGQueue queue : queues){  LinkedList<Message> messages = diskDataCenter.loadAllMessageFromQueue(queue.getName());  queueMessageMap.put(queue.getName(),messages);  for(Message message : messages){  messageMap.put(message.getMessageId(), message);  }  }  //注意,针对“未确认消息”这部分内存中的数据,不需要从硬盘恢复,之前考虑硬盘存储的时候,也没有设定这一块  //一旦在等待ack的过程中,服务器重启了,此时这些“未被确认的消息”就恢复成“未被取走的消息”  //这个消息在硬盘上存储的时候,就是当做了“未被取走”  }

总代码如下所示:

package org.example.mqtexxt.mqserver.datacenter;  import org.example.mqtexxt.common.MqException;  
import org.example.mqtexxt.mqserver.core.Binding;  
import org.example.mqtexxt.mqserver.core.Exchange;  
import org.example.mqtexxt.mqserver.core.MSGQueue;  
import org.example.mqtexxt.mqserver.core.Message;  import java.io.IOException;  
import java.util.*;  
import java.util.concurrent.ConcurrentHashMap;  /*  
使用这个类来统一管理内存上的所有数据  */public class MemoryDataCenter {  //存储交换机  //key是exchangeName,value是exchange对象  private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();  //存储队列  //key是queueName,value是MSGQueue对象  private ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>();  //存储绑定  //第一个key是exchangeName,第二个key是queueName  private ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();  //存储消息  //key是messageId,value是Message对象  private ConcurrentHashMap<String, Message>  messageMap = new ConcurrentHashMap<>();  //存储: 消息和队列之间的从属关系  //key是队列的名字queueName,value是一个Message的链表  private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();  //待确认的消息  //第一个key是queueName,第二个key是messageId  private ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> queueMessageWaitMap = new ConcurrentHashMap<>();  //针对交换机进行操作:  //插入交换机  public void insertExchange(Exchange exchange){  //把交换机插入到HashMap表中即可  exchangeMap.put(exchange.getName(),exchange);  System.out.println("[MemoryDatCenter] 交换机创建成功了 exchangeName  = "+exchange.getName());  }  //查找交换机: 根据交换机名字查找交换机  public Exchange getExchange(String exchangeName){  //在HashMap表中查找交换机  return exchangeMap.get(exchangeName);  }  //删除交换机:  public void deleteExchange(String exchangeName){  exchangeMap.remove(exchangeName);  System.out.println("[MemoryDatCenter] 交换机被删除成功了 exchangeName =  "+exchangeName);  }  //针对队列进行操作  //插入队列  public void insertQueue(MSGQueue queue){  queueMap.put(queue.getName(), queue);  System.out.println("[MemoryDatCenter] 队列被成功了 queueName= "+queue.getName());  }  //查找队列  public  MSGQueue getQueue(String queueName){  return queueMap.get(queueName);  }  //删除队列  public void deleteQueue(String queueName){  queueMap.remove(queueName);  System.out.println("[MemoryDatCenter] 队列被删除成功了 queueName= "+queueName);  }  //针对绑定的操作  //新增绑定  public void insertBinding(Binding binding) throws MqException {  //1.先使用exchangeName查询一下,看看对应的哈希表是否存在,不存在就创建一个  ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(binding.getExchangeName());  if(bindingMap == null){  //不存在就创建出来  bindingMap =new ConcurrentHashMap<>();  //创建完毕之后就放入bindingsMap中去  bindingsMap.put(binding.getExchangeName(), bindingMap);  }  //上面这段代码有点多,我们也可以使用下面这段代码来代替  //ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),  // k -> new ConcurrentHashMap<>());  synchronized (bindingMap){  //2.接着根据queueName查询一下绑定是否存在,若存在,抛异常,不存在才可以插入绑定  if(bindingMap.get(binding.getQueueName()) != null){  throw new MqException("[MemoryDataCenter] 绑定已经存在,exchangeName="+binding.getExchangeName() +  ", queueName=" + binding.getQueueName());  }  //3.查询到绑定不存在,就可以插入绑定了  bindingMap.put(binding.getQueueName(), binding);  }  System.out.println("[MemoryDatCenter] 绑定添加成功了 exchangeName = " + binding.getExchangeName()  +", queueName = " +binding.getQueueName());  }  //获取绑定:写两个版本  // 1. 根据exchangeName和queueName确定唯一一个Binding  // 2. 根据exchangeName获取到所有的Binding  // 1. 根据exchangeName和queueName确定唯一一个Binding  public Binding getBinding(String exchangeName, String queueName){  ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);  if(bindingMap == null){  return null;  }  return bindingMap.get(queueName);  }  // 2. 根据exchangeName获取到所有的Binding  public ConcurrentHashMap<String,Binding> getBindings(String exchangeName){  return bindingsMap.get(exchangeName);  }  //删除绑定  public void deleteBinding(Binding binding) throws MqException {  ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());  if(bindingMap == null){  //该交换机没有绑定任何队列,抛出异常  throw  new MqException("[MemoryDataCenter] 绑定不存在,exchangeName=" + binding.getExchangeName()  + ", queueName=" + binding.getQueueName());  }  bindingMap.remove(binding.getQueueName());  System.out.println("[MemoryDataCenter] 绑定删除成功了 exchangeName = " + binding.getExchangeName()  + ", queueName = " + binding.getQueueName());  }  //添加消息  public void addMessage(Message message){  messageMap.put(message.getMessageId(), message);  System.out.println("[MemoryDataCenter] 新消息添加成功!messageId = " + message.getMessageId());  }  //根据MessageId查询信息  public Message getMessage(String messageId){  return  messageMap.get(messageId);  }  //根据MessageId删除消息  public void removeMessage(String messageId){  messageMap.remove(messageId);  System.out.println("[MemoryDatCenter] 消息被删除成功了 messageId= "+messageId);  }  //发送消息到指定队列  public void sendMessage(MSGQueue queue, Message message){  //把消息放到对应的数据结构中  //先根据队列的名字找到该队列对应的消息链表,然后把消息放到消息链表中  LinkedList<Message> messages =queueMessageMap.get(queue.getName());  if(messages == null){  //如果没有这个消息链表,就创建出这个链表  messages = new LinkedList<>();  //创建出链表之后,就把这个链表放到队列中去  queueMessageMap.put(queue.getName(), messages);  }  //再把消息数据加到messages这个链表里面  //由于链表本身是线程不安全的,所以要加锁  synchronized (messages){  //把消息数据加到messages这个链表里面  messages.add(message);  }  //接着把该消息也插入到总的哈希表中,也就是消息数据中心中插入  //如果这个消息已经在消息中心中存在了在,这里再次插入消息也没事,  // 主要是相同messageId对应的message内容是一样的  addMessage(message);  System.out.println("[MemeoryDataCenter] 消息被投递到队列中了! messageId = " + message.getMessageId());  }  //从队列中获取到消息  public Message pollMessage(String queueName){  //先根据队列名去查找一下。对应的队列的消息链表  //如果没找到,说明队列中没有消息  LinkedList<Message> messages =  queueMessageMap.get(queueName);  if(messages == null){  return null;  }  synchronized (messages){  if(messages.size() == 0){  return null;  }  //如果链表中有元素,那就进行头删操作  Message currentMessage = messages.remove(0);  System.out.println("[MemoryDataCenter] 消息从队列中取出! messageId= " +currentMessage.getMessageId());  return currentMessage;  }  }  //获取指定队列中消息的个数  public int getMessageCount(String queueName){  LinkedList<Message> messages = queueMessageMap.get(queueName);  if(messages == null){  //队列中没有消息  return 0;  }  synchronized (messages){  return messages.size();  }  }  //添加未确认消息  public void addMessageWaitAck(String queueName, Message message){  ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitMap.computeIfAbsent(queueName,  k -> new ConcurrentHashMap<>());  messageHashMap.put(message.getMessageId(),message);  System.out.println("[MemoryDataCenter] 消息进入了待确认队列! messageId = " +message.getMessageId());  }  //删除未确认的消息(消息已经确认了)  public void removeMessageWaitAck(String queueName, String messageId){  ConcurrentHashMap<String, Message> messageHashMap  = queueMessageWaitMap.get(queueName);  if(messageHashMap == null){  return;  }  messageHashMap.remove(messageId);  System.out.println("[MemoryDataCenter] 消息从待确认队列中删除 messageId = " +messageId);  }  //获取指定的未确认消息  public Message getMessageWaitAck(String queueName, String messageId){  ConcurrentHashMap<String,Message> messageConcurrentHashMap = queueMessageWaitMap.get(queueName);  if(messageId ==  null){  return null;  }  return messageConcurrentHashMap.get(messageId);  }  //这个方法就是从硬盘上读取数据,把硬盘中之前持久化存储的各个维度的数据都恢复到内存中  //通过DiskDataCenter来获取到硬盘上面的数据  public  void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException {  //0. 在恢复之间,先把里面的所有数据全部清空  exchangeMap.clear();  queueMap.clear();  bindingsMap.clear();  messageMap.clear();  queueMessageMap.clear();  //1.恢复所有的交换机数据  List<Exchange> exchanges = diskDataCenter.selectAllExchanges();  for(Exchange  exchange  : exchanges){  exchangeMap.put(exchange.getName(), exchange);  }  //2.恢复所有的队列数据  // 2. 恢复所有的队列数据  List<MSGQueue> queues = diskDataCenter.selectAllQueues();  for (MSGQueue queue : queues) {  queueMap.put(queue.getName(), queue);  }  //3.恢复所有的绑定数据  List<Binding> bindings = diskDataCenter.selectAllBindings();  for(Binding binding : bindings){  ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),  k -> new ConcurrentHashMap<>());  }  //4.恢复所有的消息数据  //遍历所有的队列,然后根据每个队列的名字获取到所有的消息  for(MSGQueue queue : queues){  LinkedList<Message> messages = diskDataCenter.loadAllMessageFromQueue(queue.getName());  queueMessageMap.put(queue.getName(),messages);  for(Message message : messages){  messageMap.put(message.getMessageId(), message);  }  }  //注意,针对“未确认消息”这部分内存中的数据,不需要从硬盘恢复,之前考虑硬盘存储的时候,也没有设定这一块  //一旦在等待ack的过程中,服务器重启了,此时这些“未被确认的消息”就恢复成“未被取走的消息”  //这个消息在硬盘上存储的时候,就是当做了“未被取走”  }  }

内存管理数据的逻辑已经编写得差不多了,下面进行简单的总结

内存管理数据的总结

内存管理这一块主要是使用到了一系列的数据结构,保存和管理,交换机,队列,绑定,消息

我们广泛地使用到了哈希表,链表,嵌套的结构

线程安全问题

涉及到加锁操作

要不要加锁?
锁加到哪里?
使用哪个对象作为锁对象?

是没有统一的加锁规则的,只能要具体问题具体分析

总的加锁原则是:
如果代码不加锁,会造成什么样的后果和问题
后果是否很严重?
你能不能接受后果?

相关文章:

  • R语言traj包进行潜轨迹分析
  • 电气设备器件选型参数---断路器
  • 学习黑客 TCP/IP
  • 民法学学习笔记(个人向) Part.3
  • [方法论]软件工程中的软件架构设计:从理论到实践的深度解析
  • 碰撞检测学习笔记
  • 平衡二叉搜索树模拟实现1-------AVL树(插入,删除,查找)
  • C++入门小馆:继承
  • Java 集合线程安全
  • 爬虫的应用
  • P5937 [CEOI 1999] Parity Game 题解
  • Linux54 源码包的安装、修改环境变量解决 axel命令找不到;getfacl;测试
  • 力扣-字符串-468 检查ip
  • XGBoost算法原理及Python实现
  • 使用 Azure DevSecOps 和 AIOps 构建可扩展且安全的多区域金融科技 SaaS 平台
  • 网狐系列三网通新钻石娱乐源码全评:结构拆解、三端实测与本地部署问题记录
  • 软考-软件设计师中级备考 11、计算机网络
  • 数据结构与算法:回溯
  • Redis 数据类型详解(一):String 类型全解析
  • GateWay使用
  • 陈燮阳从艺60周年:指挥棒不停,心跳就不会老去
  • 习近平给谢依特小学戍边支教西部计划志愿者服务队队员回信
  • 澳大利亚大选今日投票:聚焦生活成本与“特朗普问题”
  • 前行中的“模速空间”:要攻克核心技术,也要成为年轻人创业首选地
  • 上海国际咖啡文化节开幕,北外滩集结了超350个展位
  • 软硬件企业集中发布未成年人模式使用手册