【RelayMQ】基于 Java 实现轻量级消息队列(七)
目录
一. Virtual
1.1 VirtualName
1.2 虚拟机中的属性
二. Virtual中的方法
2.1 虚拟机初始化
2.2 交换机操作
2.2.1 创建交换机
2.2.2 删除交换机
2.3 队列操作
2.3.1 增加队列
2.3.2 删除队列
2.4 绑定操作
2.4.1 添加绑定
2.4.2 删除绑定
三. Router类
3.1 判断BindingKey是否合法
3.2 判断RouteringKey是否合法
3.3 匹配规则
本篇文章介绍Broker中虚拟机对数据的管理, 封装硬件数据和内存数据, 实现解耦
一. Virtual
虚拟主机 类似于Mysql中的DataBase, 对交换机, 队列, 绑定进行逻辑上的隔离
我们在设计的时候, 不仅要考虑虚拟主机对数据的管理, 还需要提供一些API, 供上层调用
1.1 VirtualName
面临问题一: 虚拟主机是逻辑上的隔离, 那么如何管理虚拟主机和交换机的从属关系?
- 可以参考数据库的方式: 采用"1对多"的方案, 引入一个表, 表述虚拟主机名和交换机名,存储之间的对应关系
- 约定交换机的命名, 通过交换机的名字可以清楚知道属于哪个虚拟机 (比如Virtual1Exchange1, Virtual2Exchange1)
- 给每一个交换机分配一组数据库专门用来存储交换机
1.2 虚拟机中的属性
private String virtualName;private DiskDataCenter diskDataCenter = new DiskDataCenter();private MemoryDataCenter memoryDataCenter = new MemoryDataCenter();private Router router = new Router();private final Object exchangeLocker = new Object();private final Object queueLocker = new Object();private ConsumerManager consumerManager = new ConsumerManager(this);
在这个类中会封装内存和硬盘数据管理操作, 在部分核心操作中加入锁, 保障多线程安全
二. Virtual中的方法
2.1 虚拟机初始化
- 先开始硬盘数据的初始化 (如果不存在则创建, 存在则进行后续的数据恢复)
- 将硬盘中的数据加载到内存中
public VirtualHost(String virtualName) {this.virtualName = virtualName;
// 1.进行初始化//硬件数据的初始化diskDataCenter.init();
// 2.将硬件中的数据加载到内存中(这里只有存在数据恢复才有意义)try {memoryDataCenter.recovery(diskDataCenter);} catch (IOException | MqException | ClassNotFoundException e) {e.printStackTrace();System.out.println("[VirtualHost] 恢复内存数据失败");}}
2.2 交换机操作
2.2.1 创建交换机
- 重新定义交换机名字
- 判断交换机是否存在 (避免重复)
- 如果存在则返回true, 如果不存在,则进行创建操作
- 将数据写入硬盘中
- 将数据写入内存中
// 创建交换机(如果存在就返回,不存在就创建)public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType,boolean durable,boolean autoDelete,Map<String,Object> arguments){// 1.重新定义交换机名exchangeName = virtualName+exchangeName;try{synchronized (exchangeLocker) {
// 2.判断交换机是否存在Exchange existExchange = memoryDataCenter.getExchange(exchangeName);if (existExchange != null) {System.out.println("交换机已存在");return true;}
// 3.交换机不存在,则需要创建Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(exchangeType);exchange.setDurable(durable);exchange.setAutoDelete(autoDelete);exchange.setArguments(arguments);
// 4.数据写入硬盘(持久化存储)if (durable == true) {diskDataCenter.insertExchange(exchange);}
// 5.数据写入内存memoryDataCenter.insertExchange(exchangeName, exchange);System.out.println("交换机创建成功");}return true;}catch (Exception e){System.out.println("交换机创建失败");return false;}}
2.2.2 删除交换机
- 重新定义交换机名字
- 判断交换机是否存在
- 如果存在则进行删除操作, 如果不存在,则抛出异常
- 删除硬盘数据
- 删除内存数据
public boolean exchangeDelete(String exchangeName){exchangeName = virtualName+exchangeName;try{synchronized (exchangeLocker) {Exchange existExchange = memoryDataCenter.getExchange(exchangeName);if (existExchange == null) {throw new MqException("交换机不存在,无法删除");}if (existExchange.isDurable()) {diskDataCenter.deleteExchange(exchangeName);}memoryDataCenter.deleteExchange(exchangeName);System.out.println("交换机删除成功");}return true;} catch (MqException e) {System.out.println("交换机删除失败");return false;}}
2.3 队列操作
2.3.1 增加队列
- 重新定义队列名字
- 判断队列是否存在 (避免重复)
- 如果存在则返回true, 如果不存在,则进行创建操作
- 将数据写入硬盘中
- 将数据写入内存中
public boolean queueDeclare(String queueName,boolean durable,boolean exclusive,boolean autoDelete,Map<String,Object> arguments){queueName = virtualName+queueName;try{synchronized (queueLocker) {MSGQueue existQueue = memoryDataCenter.getQueue(queueName);if (existQueue != null) {System.out.println("队列已存在");return true;}MSGQueue newQueue = new MSGQueue();newQueue.setName(queueName);newQueue.setDurable(durable);newQueue.setExclusive(exclusive);newQueue.setAutoDelete(autoDelete);newQueue.setArguments(arguments);if (durable == true) {diskDataCenter.insertMSGQueue(newQueue);}memoryDataCenter.insertQueue(newQueue.getName(), newQueue);System.out.println("队列创建成功");}return true;} catch (IOException e) {System.out.println("队列创建失败");return false;}}
2.3.2 删除队列
- 重新定义队列名字
- 判断队列是否存在
- 如果存在则进行删除操作, 如果不存在,则抛出异常
- 删除硬盘数据
- 删除内存数据
public boolean queueDelete(String queueName){queueName = virtualName+queueName;try{synchronized (queueLocker) {MSGQueue existQueue = memoryDataCenter.getQueue(queueName);if (existQueue == null) {throw new MqException("队列不存在,无法删除");}if (existQueue.isDurable()) {diskDataCenter.deleteMSGQueue(queueName);}memoryDataCenter.deleteQueue(queueName);System.out.println("队列删除成功");}return true;} catch (MqException | IOException e) {System.out.println("删除失败");e.printStackTrace();return false;}}
2.4 绑定操作
2.4.1 添加绑定
- 重新定义交换机和队列名字
- 判断这个绑定是否存在 (避免重复)
- 如果存在则抛出异常, 如果不存在,则进行创建操作
- 判断内存中是否存在该交换机和队列
- 如果存在则继续创建, 不存在则抛出异常
- 将数据写入硬盘中
- 将数据写入内存中
public boolean queueBind(String queueName,String exchangeName,String bindingKey){
// 1.重命名queueName = virtualName+queueName;exchangeName = virtualName+exchangeName;try{synchronized (exchangeLocker){synchronized (queueLocker){
// 2.判断绑定是否存在Binding exitBinding = memoryDataCenter.getBinding(queueName,exchangeName);if(exitBinding != null){throw new MqException("binding 已存在,无法添加");}
// 3.添加绑定关系//验证binding是否合法if(!router.checkBindingKey(bindingKey)){throw new MqException("binding 非法,无法添加");}Binding binding = new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(queueName);binding.setBindingKey(bindingKey);
// 4.验证一下内存中交换机和队列是否存在Exchange exchange = memoryDataCenter.getExchange(exchangeName);if(exchange ==null){throw new MqException("绑定过程中内存交换机不存在");}MSGQueue queue = memoryDataCenter.getQueue(queueName);if(queue==null){throw new MqException("绑定过程中内存队列不存在");}
// 5.写入硬盘(交换机和队列都为持久化)if(exchange.isDurable()&&queue.isDurable()){diskDataCenter.insertBinding(binding);}
// 6.写入内存memoryDataCenter.insertBinding(binding);System.out.println("绑定关系添加成功");}}return true;} catch (MqException e) {System.out.println("绑定关系添加失败");e.printStackTrace();return false;}}
2.4.2 删除绑定
- 重新定义交换机和队列名字
- 判断这个绑定是否存在
- 如果绑定存在, 则进行删除操作, 如果不存在, 则抛出异常
- 删除硬盘数据
- 删除内存数据
public boolean queueUnbind(String queueName,String exchangeName){exchangeName = virtualName+exchangeName;queueName = virtualName+queueName;try{synchronized (exchangeLocker) {synchronized (queueLocker) {
// 1.检查绑定是否存在Binding binding = memoryDataCenter.getBinding(exchangeName, queueName);if (binding == null) {throw new MqException("对应的队列不存在绑定,无法删除");}
/*// 2.检查交换机和队列是否存在MSGQueue queue = memoryDataCenter.getQueue(queueName);if(queue==null){throw new MqException("对应的绑定中队列不存在,无法删除");}Exchange exchange = memoryDataCenter.getExchange(exchangeName);if(exchange==null){throw new MqException("对应的绑定中交换机不存在,无法删除");}
// 3.删除硬件中的绑定(交换机和队列都是持久化存储,那么绑定也是持久化存储)if(exchange.isDurable()&&queue.isDurable()){diskDataCenter.deleteBinding(binding);}*/// 3.省去校验,直接删除(就算删除失败,也没有副作用,好处就是方便快捷,避免出现了交换机和队列已经被删除了,但是绑定还在,删除失败)diskDataCenter.deleteBinding(binding);
// 4.删除内存中的绑定memoryDataCenter.deleteBinding(binding);System.out.println("删除绑定成功");}}return true;} catch (MqException e) {System.out.println("删除绑定失败");return false;}}
三. Router类
3.1 判断BindingKey是否合法
在MQ系统中,TOPIC交换机支持两种通配符:
- * (星号):匹配一个单词
- # (井号):匹配零个或多个单词
比如:
- aaa . * 匹配 aaa.bbb 、 aaa.ccc 等
- aaa . # 匹配 aaa 、 aaa.bbb 、 aaa.bbb.ccc 等
- aaa . # . ccc 匹配 aaa . ccc 、 aaa . bbb . ccc 、 aaa . bbb . ddd . ccc 等
代码步骤:
- 字符合法性校验
- 格式校验
- 相邻关系校验
public boolean checkBindingKey(String bindingKey){if(bindingKey.length() == 0){return true;}for (int i = 0; i < bindingKey.length(); i++) {char ch = bindingKey.charAt(i);if(ch>='A'&& ch<='Z'){continue;}if(ch>='a'&& ch<='z'){continue;}if(ch>='0'&& ch<='9'){continue;}if(ch=='_'|| ch=='.'){continue;}if(ch=='*'|| ch=='#'){continue;}return false;}//检查格式是否合法(不合法aaa.bbb*.ccc|aaa.#b.ccc)String[] words = bindingKey.split("\\.");for (String word:words){if(word.length()>1&& (word.contains("*")||word.contains("#"))){return false;}}
// 判断通配符之间的相邻关系是否合法(下面的格式难实现,作用也不大)for (int i = 0; i < words.length - 1; i++) {if(words[i].equals("#") && words[i+1].equals("*")){return false;}if(words[i].equals("*") && words[i+1].equals("#")){return false;}if(words[i].equals("#") && words[i+1].equals("#")){return false;}}return true;}
BindingKey类似于出题方, *和# 具备特殊含义
3.2 判断RouteringKey是否合法
代码步骤:
- 字符长度校验
- 字符合法性校验
public boolean checkRoutingKey(String routingKey){if(routingKey.length()==0){return true;}for (int i = 0; i < routingKey.length(); i++) {char ch = routingKey.charAt(i);if(ch>='A'&& ch<='Z'){continue;}if(ch>='a'&& ch<='z'){continue;}if(ch>='0'&& ch<='9'){continue;}if(ch=='_'|| ch=='.'){continue;}if(ch=='*'|| ch=='#'){continue;}return false;}return true;}
RouteringKey类似于答题方, *和# 不具备特殊含义
3.3 匹配规则
如果是主题交换机则直接true
如果是扇形交换机则单独判断
public boolean route(ExchangeType type, Binding binding, Message message) throws MqException {if(type == ExchangeType.FANOUT){return true;}else if (type==ExchangeType.TOPIC) {return routeTopic(binding,message);}else {throw new MqException("交换机类型异常");}}
在主题交换机判断的时候, 需要将每一个都进行判断, 其中存在五种情况
- bindingKey为 * 的时候, 直接判断下一个区间
- bindingKey为 # 的时候, 如果后面没有内容了, 直接true
- bindingKey为 # 的时候, 如果后面存在内容, 遍历RoutingKey是否可以找到后续的部分, 找不到返回-1, 找到返回位置下表
- bindingKey为 普通字符 的时候, 挨个判断即可
- 要保证双方都是一起走完的情况
private boolean routeTopic(Binding binding, Message message) {String[] bindingTokens = binding.getBindingKey().split("\\.");String[] routingTokens = message.getRoutingKey().split("\\.");int bindingIndex = 0;int routingIndex = 0;while(bindingIndex<bindingTokens.length && routingIndex<routingTokens.length){
// 情况1:binding为 * 对应routing 匹配任意一个都可以(直接抬走下一个)if(bindingTokens[bindingIndex].equals("*")){bindingIndex++;routingIndex++;continue;
// 情况2:binding为 #}else if(bindingTokens[bindingIndex].equals("#")){
// 1.# 号 后面没有内容bindingIndex++;if(bindingIndex==bindingTokens.length){return true;}
// 2.# 号 后面有内容,在routingKey后面一直找,找到和bindingKey对应的内容(如果存在,则正常进行下一个部分的判断,不存在直接报错)//如果找到,返回对应的下表,如果没有找到返回-1routingIndex = findNextMatch(routingTokens,routingIndex,bindingTokens[bindingIndex]);if(routingIndex==-1){return false;}//如果找到,将返回的下标进行+1,进行下一位的正常判断bindingIndex++;routingIndex++;}else {
// 情况3:普通的字符串,进行正常的判断即可if(!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])){return false;}bindingIndex++;routingIndex++;}}
// 情况4:有一方走完了,导致跳出循环,这种情况不能返回true,需要在加一步判断,是不是双方都走完了if(bindingIndex == bindingTokens.length && routingIndex == routingTokens.length){return true;}return false;}
遍历查找操作
private int findNextMatch(String[] routingTokens, int routingIndex, String bindingToken) {for (int i = routingIndex; i < routingTokens.length; i++) {String n = routingTokens[i];if(n.equals(bindingToken)){//注意:这里返回i,返回routingKey的数值一直没有变化return i;}}return -1;}