当前位置: 首页 > wzjs >正文

网站建设目录结构doc天猫seo搜索优化

网站建设目录结构doc,天猫seo搜索优化,jquery+html5 网站后台管理页面模板,江苏省建设集团是国企吗九. 内存数据结构设计 硬盘上存储数据, 只是为了实现 “持久化” 这样的效果. 但是实际的消息存储/转发, 还是主要靠内存的结构. 对于 MQ 来说, 内存部分是更关键的, 内存速度更快, 可以达成更⾼的并发. 创建 MemoryDataCenter 用来管理内存里的数据 使⽤四个哈希表, 分别…

九. 内存数据结构设计

硬盘上存储数据, 只是为了实现 “持久化” 这样的效果. 但是实际的消息存储/转发, 还是主要靠内存的结构.
对于 MQ 来说, 内存部分是更关键的, 内存速度更快, 可以达成更⾼的并发.

创建 MemoryDataCenter

用来管理内存里的数据
在这里插入图片描述

  • 使⽤四个哈希表, 分别管理 Exchange, Queue, Binding, Message.
  • 再使⽤⼀个哈希表 + 链表管理 队列 -> 消息 之间的关系.
  • 再使⽤⼀个哈希表 + 哈希表管理所有的未被确认的消息.

/*** 关于内存中的数据结构:刚接触这个数据结构的设计可能有点不太理解,忍一忍,多往下看看** • 使⽤四个哈希表, 管理 Exchange, Queue, Binding(嵌套的hash表), Message.* • 使⽤⼀个哈希表 + 链表管理 队列 -> 消息 之间的关系.** • 使⽤⼀个哈希表 + 哈希表管理所有的未被确认的消息.*  此处的最后一个数据结构创建的原因:*  咱们的MQ,支持两种应答模式,*  1,自动应答:消费者取了这个元素,这个消息就算是被应答了,此时这个消息就能被删掉了*  2,手动应答:消费者取了这个元素,这个消息还不算被应答,需要消费者主动再调用一个basicAck方法,此时才认为是真正应答了这个消息,才能删除这个消息*  而此处我们设计的这个数据结构就是为了第二种应答方式,保存未被应答的消息。*/
public class MemoryDataCenter {//由于会涉及到线程安全问题,所以我们用的是ConcurrentHashMap来new hash表//储存Exchange  key:exchangeName value:Exchange对象private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();//储存Queue     key:queueName   value:queue对象private ConcurrentHashMap<String, MESGQueue> queueMap = new ConcurrentHashMap<>();//储存Message  key: messageId    value:Message对象:private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();//储存绑定 使用嵌套的hash表    key:exchangeName  value:hash表  key2:queueName  value2:Binding对象private ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>> bindingMap = new ConcurrentHashMap<>();//储存队列->消息之间的关系,使用hash表嵌套链表   key:queueName value:LinkedList  链表:保存Message对象private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();//储存未被确认的消息,使用hash表嵌套hash表:key :queueName value:hash表 key2:messageId  value2:Message对象private ConcurrentHashMap<String, ConcurrentHashMap<String, Message>> queueMessageWaitAck = new ConcurrentHashMap<>();}

封装 Exchange ⽅法

//封装Echange方法://删除交换机:public void insertExchange(Exchange exchange) {exchangeMap.put(exchange.getName(), exchange);}//获取交换机:public Exchange getExchange(String exchangeName) {return exchangeMap.get(exchangeName);}//删除交换机:public void deleteExchange(String exchangeName) {exchangeMap.remove(exchangeName);}

封装 Queue ⽅法

/封装queue方法:public void insertQueue(MESGQueue queue) {queueMap.put(queue.getName(), queue);}public MESGQueue getQueue(String queueName) {return queueMap.get(queueName);}public void deleteQueue(String queueName) {queueMap.remove(queueName);}

封装binding方法

     //封装binding方法:public void insertBinding(Binding binding) throws MqException {
//        ConcurrentHashMap<String,Binding> hasBindingMap = bindingMap.get(binding.getExchangeName());//这里代码其实有一个错误,就是锁对象不能为空,判空要在加锁外面
//        synchronized (hasBindingMap){
//            //如果不存在才能插入:
//            if(hasBindingMap == null){
//                hasBindingMap = new ConcurrentHashMap<>();
//                hasBindingMap.put(binding.getQueueName(),binding);
//                bindingMap.put(binding.getExchangeName(),hasBindingMap);
//            }else{
//                //存在则抛出异常:
//                throw new MqException("[MemoryDataCenter] 绑定已经存在!!不可再插入绑定!!! bindingExchangeName:"+binding.getExchangeName()
//                        +"bindingQueueName:"+binding.getQueueName());
//            }
//        }//这一行是线程安全的,能代替上述判断空然后将hashBindingMap new出来的代码。//bindingMap.computeIfAbsent 作用:如果第一个参数为空,则将第二个参数插入,并返回第二个参数ConcurrentHashMap<String, Binding> bindingMapSon = bindingMap.computeIfAbsent(binding.getExchangeName(), k -> new ConcurrentHashMap<>());synchronized (bindingMapSon) {if (bindingMapSon.get(binding.getQueueName()) != null) {throw new MqException("[MemoryDataCenter] 绑定已经存在!!不可再插入绑定!!! bindingExchangeName:" + binding.getExchangeName()+ "bindingQueueName:" + binding.getQueueName());}bindingMapSon.put(binding.getQueueName(), binding);}}//写两个版本 的绑定://1,根据exchangeName 和 queueName 确定唯一一个Binding//2,根据exchangeName 获取到所有的 Binding哈希表,后面会用到public Binding getBinding(String exchangeName, String queueName) {ConcurrentHashMap<String, Binding> bindingMapIn = bindingMap.get(exchangeName);if (bindingMapIn == null) return null;synchronized (bindingMapIn) {return bindingMapIn.get(queueName);}}public ConcurrentHashMap<String, Binding> getBindings(String exchangeName) {return bindingMap.get(exchangeName);}//删除绑定;public void deleteBinding(Binding binding) throws MqException {ConcurrentHashMap<String, Binding> bindingMapIn = bindingMap.get(binding.getExchangeName());if (bindingMapIn == null) {throw new MqException("[MemoryDataCenter] 绑定不存在!!!exchangeName:" + binding.getExchangeName()+ "queueName:" + binding.getQueueName());}synchronized (bindingMapIn) {Binding toDelete = bindingMapIn.get(binding.getQueueName());if (toDelete == null) {throw new MqException("[MemoryDataCenter] 绑定不存在!!!exchangeName:" + binding.getExchangeName()+ "queueName:" + binding.getQueueName());}bindingMapIn.remove(binding.getQueueName());}}

封装 Message ⽅法

//对消息的操作://查询消息:public Message getMessage(String messageId) {return messageMap.get(messageId);}//添加消息:public void addMessage(Message message) {messageMap.put(message.getMessageId(), message);System.out.println("[MemoryDataCenter] message插入成功!!! messageId:" + message.getMessageId());}//删除消息:public void deleteMessage(String messageId) {messageMap.remove(messageId);System.out.println("[MemoryDataCenter] message删除成功!!! messageId:" + messageId);}// 发送消息到指定队列中public void sendMessage(MESGQueue queue, Message message) {//这些代码可以用一行代码代替:这两行应该都行:经过验证:第一行是错的。
//        LinkedList<Message> messages = queueMessageMap.putIfAbsent(queue.getName(), new LinkedList<>());LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(),k->new LinkedList<>());
//        //先根据队列的名字,找到对应的消息链表:
//        LinkedList<Message> messages = queueMessageMap.get(queue.getName());
//        //如果指定的消息链表不存在,则new出来:
//        if(messages == null){
//            messages = new LinkedList<>();
//        }synchronized (messages) {//将消息添加到链表里:messages.add(message);}//别忘了将消息添加到消息中心,假设message已经在消息中心存在,重复插入也没有关系,毕竟是hash表嘛//另外,这一句也不用考虑线程安全问题,因为,就算多线程多插了几次,可他是hash表啊,插入的对象都一样,不就相当于覆盖了吗messageMap.put(message.getMessageId(), message);System.out.println("[MemoryDataCenter]  消息send到队列中成功!!!messageId:" + message.getMessageId());}//从队列中取消息(需要删除的那一种):public Message pollMessage(String queueName) throws MqException {//先根据队列的名字获取对应的消息链表:LinkedList<Message> messages = queueMessageMap.get(queueName);//必须将messages == null写在外边,因为加锁操作的锁对象不能为空。if (messages == null) {throw new MqException("[MemoryDataCenter] 消息不存在! 取消息失败!!! queueName=" +queueName);}synchronized (messages) {//如果非空,则将表头的元素返回:if (messages.size() == 0) return null;Message currentMessage = messages.remove(0);System.out.println("[MemoryDataCenter] 消息从队列中取出成功!!!messageId:" + currentMessage.getMessageId());return currentMessage;}}//获取消息的数量:public int countMessages(String queueName) {LinkedList<Message> messages = queueMessageMap.get(queueName);if (messages == null) {// 如果队列不存在, 则直接返回⻓度 0, 说明该 queueName 下还没有消息return 0;}synchronized (messages) {return messages.size();}}

针对未确认的消息处理

 //对于未被确认的消息://添加未被确认的消息:public void addMessageWaitAck(String queueName, Message message) {//先依据queueName 获取对应的消息hash表;由于不存在还是需要new出来一个,这里就直接使用cmputeIfAbsent了ConcurrentHashMap<String, Message> messageWaitMap = queueMessageWaitAck.computeIfAbsent(queueName, k -> new ConcurrentHashMap<>());//由于ConcurrentHashMap是线程安全的,所以这里就不加锁了。messageWaitMap.put(message.getMessageId(), message);System.out.println("[MemoryDataCenter] 添加 未被确认的消息成功!!!messageId:" + message.getMessageId());}//删除未被确认的消息:public void deleteMessageWaitAck(String queueName, String messageId) {ConcurrentHashMap<String, Message> messageWaitMap = queueMessageWaitAck.get(queueName);if (messageWaitMap == null) return;messageWaitMap.remove(messageId);System.out.println("[MemoryDataCenter] 删除 未被确认的消息成功!!!messageId:" + messageId);}//查询未被确认的消息:public Message getMessageWaitAck(String queueName, String messageId) {ConcurrentHashMap<String, Message> messageWaitMap = queueMessageWaitAck.get(queueName);if (messageWaitMap == null) return null;return messageWaitMap.get(messageId);}

实现重启后恢复内存

// 实现重启后恢复内存,从硬盘上恢复数据public void recovery(DiskDataCenter diskDataCenter) throws IOException, ClassNotFoundException, MqException {//在恢复数据之前,先将内存中的数据结构清空:exchangeMap.clear();queueMap.clear();bindingMap.clear();messageMap.clear();queueMessageMap.clear();//恢复交换机:List<Exchange> exchanges= diskDataCenter.selectExchanges();for(Exchange x :exchanges){exchangeMap.put(x.getName(),x);}//恢复消息队列:List<MESGQueue> queues = diskDataCenter.selectMESGQueues();for (MESGQueue x:queues) {queueMap.put(x.getName(),x);}//恢复绑定(注意绑定是嵌套的数据结构)List<Binding> bindings = diskDataCenter.selectBindings();for(Binding x :bindings){
//            ConcurrentHashMap<String,Binding> bindingMap2 = bindingMap.computeIfAbsent(x.getExchangeName(),k->new ConcurrentHashMap<>());
//            bindingMap2.put(x.getQueueName(),x);String exchangeName = x.getExchangeName();String queueName = x.getQueueName();ConcurrentHashMap<String,Binding> hash = new ConcurrentHashMap<>();bindingMap.put(exchangeName,hash);hash.put(queueName,x);}//恢复消息://这一步就要将所有的队列都枚举出来,挨个恢复消息(说白了也就是loadAllMessagesFromQueue缺少queueName参数,所以才一个个枚举队列,拿到queueName)//上面已经将所有的队列拿出来过了,可以使用现成的方法:for(MESGQueue queue:queues){LinkedList<Message> messages = diskDataCenter.loadAllMessagesFromQueue(queue.getName());//这个恢复消息表面上只有一个恢复消息的操作,可是他却需要将两个数据结构都填上://messageMap和queueMessageMapqueueMessageMap.put(queue.getName(),messages);//然后再将消息一一填入消息中心:for(Message message:messages){messageMap.put(message.getMessageId(),message);}}//    queueMessageWaitAck 则不必恢复. 未被确认的消息只是在内存存储. 如果这个时候//       broker 宕机了, 则消息视为没有被消费过.//另一个原因也就是我们根本没有准备任何关于queueMessageWaitAck的代码。}

测试 MemoryDataCenter

在这里插入图片描述


@SpringBootTest
public class MemoryDataCenterTests {private MemoryDataCenter memoryDataCenter = null;@BeforeEachpublic void setUp(){memoryDataCenter = new MemoryDataCenter();memoryDataCenter.init();}@AfterEachpublic void tearDown(){memoryDataCenter = null;}//一个单元测试不一定只能测试一个方法,也可以测试多个方法,视情况而定://交换机测试思路:先插入一个交换机,检测一下拿到的交换机是否是同一个;再将这个交换机删掉,检测一下是否查不到了@Testpublic void testExchange(){Exchange exchange = new Exchange();exchange.setName("111");exchange.setAutoDelete(false);exchange.setDurable(false);exchange.setType(ExchangeType.DIRECT);HashMap<String,Object> hash = new HashMap<>();hash.put("111",111);hash.put("222",222);hash.put("333",333);exchange.setArguments(hash);memoryDataCenter.insertExchange(exchange);Exchange realExchange = memoryDataCenter.getExchange("111");Assertions.assertEquals("111",realExchange.getName());Assertions.assertEquals(ExchangeType.DIRECT,realExchange.getType());Assertions.assertEquals(111,realExchange.getArguments("111"));Assertions.assertEquals(222,realExchange.getArguments("222"));memoryDataCenter.deleteExchange("111");Exchange realExchange2 = memoryDataCenter.getExchange("111");Assertions.assertEquals(null,realExchange2);System.out.println("[testExchange] 测试成功!!!");}//测试队列,和测试交换机思想差不多:@Testpublic void testQueue(){//先创建队列:MESGQueue queue = new MESGQueue();queue.setName("testQueue");queue.setExclusive(false);queue.setDurable(false);HashMap<String,Object> hash = new HashMap<>();hash.put("111",111);hash.put("222",222);hash.put("333",333);queue.setArguments(hash);//测试:memoryDataCenter.insertQueue(queue);MESGQueue realQueue = memoryDataCenter.getQueue("testQueue");Assertions.assertEquals("testQueue",realQueue.getName());Assertions.assertEquals(111,realQueue.getArguments("111"));Assertions.assertEquals(222,realQueue.getArguments("222"));memoryDataCenter.deleteQueue("testQueue");Assertions.assertEquals(null,memoryDataCenter.getQueue("testQueue"));System.out.println("[testQueue] 测试成功!!!");}//测试绑定:一样的思路,只不过因为绑定是嵌套的,所以略有不同:@Testpublic void testBinding() throws MqException {Binding binding = new Binding();binding.setExchangeName("testExchange");binding.setQueueName("testQueue");binding.setBindingKey("bindingKey");//先插入绑定:memoryDataCenter.insertBinding(binding);Binding realBinding  = memoryDataCenter.getBinding("testExchange","testQueue");Assertions.assertEquals(binding,realBinding);ConcurrentHashMap<String,Binding> hash = memoryDataCenter.getBindings("testExchange");Assertions.assertEquals(binding,hash.get("testQueue"));memoryDataCenter.deleteBinding(binding);Assertions.assertNull(memoryDataCenter.getBinding("testExchange","testQueue"));System.out.println("[testBinding] 测试成功!!!");}//测试消息:@Testpublic void testMessage(){Message message = new Message();message.setMessageId("123");message.setBody("abcd".getBytes());memoryDataCenter.addMessage(message);Message realMessage = memoryDataCenter.getMessage("123");Assertions.assertEquals(message,realMessage);memoryDataCenter.deleteMessage("123");Assertions.assertNull(memoryDataCenter.getMessage("123"));System.out.println("[testMessage] 测试成功!!!");}//为了测试这个方法,我们先将创建队列和消息的方法写出来:private MESGQueue createMESGQueue(String queueName){MESGQueue queue = new MESGQueue();queue.setName(queueName);queue.setExclusive(false);queue.setDurable(false);HashMap<String,Object> hash = new HashMap<>();hash.put("111",111);hash.put("222",222);return queue;}private Message createMessage(String messageId){Message message = new Message();message.setMessageId(messageId);message.setBody("abcd".getBytes());return message;}//由于测试这个方法需要 queue和message,先将他们创建出来//这个send方法的测试思路:先将10消息发送到指定队列,再将10条消息拿出来,逐个对比,@Testpublic void testSendMessage() throws MqException {MESGQueue queue = createMESGQueue("testQueue1234");LinkedList<Message> expectedMessages = new LinkedList<>();//发送消息:for(int i =0;i<10;i++){Message message = createMessage("testMessage"+i);memoryDataCenter.sendMessage(queue,message);expectedMessages.add(message);}Assertions.assertEquals(10,memoryDataCenter.countMessages("testQueue1234"));//拿出消息验证个数:LinkedList<Message> realMessages = new LinkedList<>();while(true){Message message = memoryDataCenter.pollMessage(queue.getName());if(message == null) break;realMessages.add(message);}Assertions.assertEquals(10,realMessages.size());for(int i =0;i<10;i++){Assertions.assertEquals(expectedMessages.get(i),realMessages.get(i));}System.out.println("[testSendMessage] 测试成功!!!");}@Testpublic void testMessageWaitAck(){//这里我们只是测试,由于MemoryDataCenter子给自足,直接不用new队列传入队列名字也是一样的,反正内部代码不存在终究会new出来:Message expectedMessage = createMessage("messageId123");memoryDataCenter.addMessageWaitAck("queueName123",expectedMessage);Message realMessage = memoryDataCenter.getMessageWaitAck("queueName123","messageId123");Assertions.assertEquals(expectedMessage,realMessage);memoryDataCenter.deleteMessageWaitAck("queueName123","messageId123");Assertions.assertNull(memoryDataCenter.getMessageWaitAck("queueName123","messageId123"));System.out.println("[testMessageWaitAck] 测试成功!!!");}@Testpublic void testRecovery() throws IOException, MqException, ClassNotFoundException {//由于后续要进行数据库操作,依赖MyBatis,就需要启动SpringApplication,这样才能进行后续的数据库操作:MqApplication.context = SpringApplication.run(MqApplication.class);//构造硬盘数据:DiskDataCenter diskDataCenter = new DiskDataCenter();diskDataCenter.init();//交互换机:Exchange expectedExchange = new Exchange();HashMap<String,Object> hash = new HashMap<>();hash.put("111",111);expectedExchange.setName("testExchange");expectedExchange.setArguments(hash);diskDataCenter.insertExchange(expectedExchange);//队列:MESGQueue expectedQueue = createMESGQueue("testQueue");diskDataCenter.insertMESGQueue(expectedQueue);//绑定:Binding expectedBinding = new Binding();expectedBinding.setExchangeName("testExchange");expectedBinding.setQueueName("testQueue");expectedBinding.setBindingKey("testBindingKey");diskDataCenter.insertBinding(expectedBinding);//消息:Message expectedMessage = createMessage("testMessage");diskDataCenter.sendMessage(expectedQueue,expectedMessage);//执行恢复操作:memoryDataCenter.recovery(diskDataCenter);//对比结果://交换机:Exchange realExchange = memoryDataCenter.getExchange("testExchange");//此处不能直接判断expectedExchange和realExchange,因为从文件中拿出来的不是同一个对象,和在内存中拿不一样。
//        Assertions.assertEquals(expectedExchange,realExchange);Assertions.assertEquals(expectedExchange.getName(),realExchange.getName());Assertions.assertEquals(expectedExchange.getType(),realExchange.getType());Assertions.assertEquals(expectedExchange.getArguments("111"),realExchange.getArguments("111"));//队列:MESGQueue realQueue = memoryDataCenter.getQueue("testQueue");Assertions.assertEquals(expectedQueue.getName(),realQueue.getName());Assertions.assertEquals(expectedQueue.getArguments("222"),realQueue.getArguments("222"));//绑定:Binding realBinding = memoryDataCenter.getBinding("testExchange","testQueue");Assertions.assertEquals(expectedBinding.getExchangeName(),realBinding.getExchangeName());Assertions.assertEquals(expectedBinding.getQueueName(),realBinding.getQueueName());Assertions.assertEquals(expectedBinding.getBindingKey(),realBinding.getBindingKey());//消息:Message realMessage = memoryDataCenter.getMessage("testMessage");Assertions.assertEquals(expectedMessage.getMessageId(),realMessage.getMessageId());Assertions.assertArrayEquals(expectedMessage.getBody(),realMessage.getBody());Message realMessage2 = memoryDataCenter.pollMessage("testQueue");Assertions.assertEquals(expectedMessage.getMessageId(),realMessage2.getMessageId());Assertions.assertArrayEquals(expectedMessage.getBody(),realMessage2.getBody());//清理硬盘的数据,把整个data目录里的内容都删了(包含了meta.db)//FileUtils.deleteDirectory方法,即使目录里面有文件,会递归的删除所有。//记得将SpringApplicatioin关掉,运行着会删除文件失败。MqApplication.context.close();File dataDir = new File("./data");FileUtils.deleteDirectory(dataDir);System.out.println("[testRecovery] 测试成功!!!");}}

测试结果:通过

在这里插入图片描述

http://www.dtcms.com/wzjs/152508.html

相关文章:

  • 平台搭建不武汉seo网站管理
  • 茂名做网站dyiee磁力搜索器
  • 网站模板交易网络seo是什么意思
  • 网站建设精英引流最好的推广方法
  • 为外国人做非法网站聚名网
  • 顺义区专业网站制作网站建设免费sem工具
  • 宁国网站建设外链代发软件
  • o2o网站建设如何竞价托管代运营公司
  • 手机网站怎么切图关键词推广方法
  • 哪些网站可以做招商广告语百度广告上的商家可靠吗
  • 网站跳出率一般是多少广告推广平台代理
  • 黑河网站seo软文写作公司
  • 揭阳制作公司网站seo网站优化工具
  • 潍坊哪里能找到做网站的宣传软文是什么
  • 哪个网站做供求信息电商培训课程
  • 百度做的网站最近几天的新闻
  • 网站建设的规划和设计潮州seo建站
  • 做名片上什么网站百度怎么做网站
  • 网站怎么做万词杭州网站定制
  • 阿里云做私服网站企业网页设计制作
  • 如何自己做软件网站深圳高端seo公司助力企业
  • 跨境电商网站开发公司网络营销成功案例ppt
  • 长沙有哪些做的好一点的网站推广的几种方式
  • 游戏推广网站怎么做公司产品推广文案
  • 怎么在电脑上自己做网站吗手机上怎么制作网页
  • 国内外创意网站欣赏威海seo优化公司
  • 中国建设银行保函查询网站软文代写
  • 十堰做网站百度文库网页版登录入口
  • 外贸公司的网站建设模板下载重庆seo按天收费
  • 苏州外贸营销网站建设深圳关键词排名推广