当前位置: 首页 > news >正文

RabbitMinQ(模拟实现消息队列项目)02

目录

十.整合数据库和文件数据

创建DiskDataManager类

十一.内存结构设计

创建MeneryDataCenter类:

实现集合操作:

对MemoryDataCenter类功能测试:

十二.整合内存和磁盘数据

创建VirtualHost类:

Exchange:

MSGQueue:

Binding:

创建Router类

对Router类的TOPIC匹配进行测试:

发送消息:

创建ConsumerManager类:

订阅消息:

创建ConsumerEnv类:

创建Consumer接口:

为MSGQueue类新增加二个属性和方法,用于管理订阅队列的消费者集合:

在ConsumerManager类中实现添加消费者方法:

十三.网络通信协议设计

设计应用层协议:

创建request类:

创建response类:

创建参数父类:

创建响应父类:

创建设备功能的参数类:

十四.实现BrokerServer

十五.实现客户端

创建ConnectionFactory.

创建Connection类:

创建Channel类:

客户端代码测试:

十六.完成

成果测试:

启动消息队列服务器:

创建生产者 发送消息:

创建消费者消费消息:


十.整合数据库和文件数据

上面的代码中,使用数据库存储了Exchange,Queue,Binding,使用文件存储了Message,

下面对数据库和文件中的数据进行整合.进行统一管理.

创建DiskDataManager类

/*** 对数据库中的Exchange,Queue,Binding和文件中的Message数据进行整合* 统一管理,后续上层代码直接调用该类中的方法即可,无需再向下层数据结构调用*/
public class DiskDataManager {private DataBaseManager dataBaseManager = new DataBaseManager();private MessageFileManager messageFileManager = new MessageFileManager();public void init() throws JsonProcessingException {dataBaseManager.init();messageFileManager.init();}//交换机://添加交换机public void insertExchange(Exchange exchange){dataBaseManager.insertExchange(exchange);}//删除交换机public void deleteExchange(String exchangeName){dataBaseManager.deleteExchange(exchangeName);}//查找交换机public List<Exchange> selectAllExchanges(){return dataBaseManager.selectAllExchanges();}//队列//添加队列public void insertQueue(MSGQueue queue) throws IOException, MqException {dataBaseManager.insertQueue(queue);//创建队列后,不仅要将队列写入到数据库中,还要创建出对应的目录和文件messageFileManager.createQueueFile(queue.getName());}//删除队列public void deleteQueue(String queueName) throws IOException {dataBaseManager.deleteQueue(queueName);//删除队列后,还要讲对应的目录和文件删除messageFileManager.destoryQueueFile(queueName);}//查找队列public List<MSGQueue> selectAllQueues(){return dataBaseManager.selectAllQueues();}//绑定关系//添加绑定关系public void insertBinding(Binding binding){dataBaseManager.insertBinding(binding);}//删除绑定关系public void deleteBinding(String bingingKey){dataBaseManager.deleteBindings(bingingKey);}//查找绑定public List<Binding> selectAllBindings(){return dataBaseManager.selectAllBindings();}//消息//发送消息public void sendMessage(MSGQueue queue, Message message) throws IOException, MqException {messageFileManager.sendMessage(queue,message);}//删除消息public void deleteMessageFromQueue(MSGQueue queue,Message message) throws IOException, ClassNotFoundException, MqException {messageFileManager.deleteMessageFromFile(queue,message);//删除消息后,查看是否需要进行GCif(messageFileManager.checkGC(queue.getName())){messageFileManager.GC(queue);}}//加载所有的消息到内存中public List<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {return messageFileManager.loadAllMessage(queueName);}
}

十一.内存结构设计

将数据存储到数据库和文件,是为了实现其持久性,但数据还是要存储在内存上的,这样才能更快的访问到数据.

创建MeneryDataCenter类:

这里通过设计不同的数据集合来存储数据在内存中.

/*** 将数据存储在内存中,创建不同的数据集合来管理* 要管理的数据有:* 交换机* 队列* 绑定关系* 消息* 队列中的消息集合* 待确认消息队列中的消息集合*/
public class MemoryDataCenter {//key:exchangeNameprivate ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();//key:queueNameprivate ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>();
//   key1:exchangeName key2:queueNameprivate ConcurrentHashMap<String,ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();//key: messageIdprivate ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();
//  key:queueName  List:messageprivate ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();//存储在手动确认模式下,管理待确认的消息和队列,在未收到确认消息时,要先将数据存储到这个数据集合中,
//  key1:queueName   key2:messageIdprivate ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> WaitAckQueueMessageMap = new ConcurrentHashMap<>();
}

实现集合操作:

 //交换机://插入public void insertExchange(Exchange exchange){exchangeMap.put(exchange.getName(),exchange);System.out.println("[MemoryDataCenter] 新增交换机成功 exchangeName:"+exchange.getName());}//删除public void deleteExchange(String exchangeName){exchangeMap.remove(exchangeName);System.out.println("[MemoryDataCenter] 删除交换机成功 exchangeName:"+exchangeName);}//查找public Exchange getExchange(String exchangeName){Exchange exchange = exchangeMap.get(exchangeName);return exchange;}//队列//插入public void insertQueue(MSGQueue queue){queueMap.put(queue.getName(),queue);System.out.println("[MemoryDataCenter] 新增队列成功! queueName: "+queue.getName());}//删除public void deleteQueue(String queueName){queueMap.remove(queueName);System.out.println("[MemoryDataCenter] 队列删除成功! queueName: "+queueName);}//查找public MSGQueue getQueue(String queueName){return queueMap.get(queueName);}//绑定关系://新增public void insertBinding(Binding binding) throws MqException {
//        //绑定关系不存在时,创建一个,存在时,进行覆盖
//        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getQueueName());
//        if(bindingMap==null){
//            bindingMap = new ConcurrentHashMap<>();
//        }
//        bindingMap.put(binding.getQueueName(),binding);
//        bindingsMap.put(binding.getExchangeName(),bindingMap);//这个方法是ConcurrentMap方法用来判断对应的哈希表是否存在,不存在就执行第二个参数,存在就直接赋值,和上面的逻辑是一样的//且该方法是原子的,不存在线程安全问题ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),f -> new ConcurrentHashMap<>());//此处可能会存在线程安全问题,以绑定关系为基准进行上锁synchronized (binding){Binding binding1 = bindingMap.get(binding.getQueueName());//当绑定关系已经存在时,抛出异常,只有新的绑定插入时,才会成功if(binding1!=null){throw new MqException("[MemoryDataCenter] 绑定已存在 exchangeName: "+binding.getExchangeName()+" ,queueName: "+binding.getQueueName());}bindingMap.put(binding.getQueueName(),binding);}System.out.println("[MemoryDataCenter] 新的绑定创建成功! +exchangeName:"+binding.getExchangeName()+" ,queueName: "+binding.getQueueName()+" ,bindingKey: "+binding.getBindingKey());}//删除public void deleteBinding(Binding binding) throws MqException {//先判断交换机是否存在绑定,不存在时,无法删除.抛出异常ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());if(bindingMap==null){throw new MqException("[MemoryDataCenter] 无绑定关系,删除失败 exchangeName: "+binding.getExchangeName()+" ,queueName: "+binding.getQueueName());}bindingMap.remove(binding.getQueueName());System.out.println("[MemoryDataCenter] 绑定删除成功 exchangeName: "+binding.getExchangeName()+" ,queueName: "+binding.getQueueName()+" ,bindingKey:"+binding.getBindingKey());}//查找public Binding getBinding(String exchangeName,String queueName) throws MqException {ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);if(bindingMap==null){return null;}Binding binding = bindingMap.get(queueName);return binding;}//消息//插入public void insertMessage(Message message){messageMap.put(message.getMessageId(),message);System.out.println("[MemoryDataCenter] 新增消息成功 messageID:"+message.getMessageId());}//删除public void deleteMessage(String messageId){messageMap.remove(messageId);System.out.println("[MemoryDataCenter] 消息删除陈功 messageId: "+messageId);}//查找public Message getMessage(String messageId){return messageMap.get(messageId);}//队列消息集合//发送消息到指定队列public void sendMessage(MSGQueue queue,Message message){//1.先查找队列对应的集合是否存在,不存在时创建消息集合LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(), f -> new LinkedList<>());//这里当多个线程同时执行插入操作时,可能会覆盖消息,要以集合为维度进行上锁synchronized(messages){messages.add(message);}//将消息也存入到消息集合中messageMap.put(message.getMessageId(),message);System.out.println("[MemoryDataCenter] 发送消息到队列成功 queueName:"+queue.getName()+" ,messageId:"+message.getMessageId());}//从队列中取消息public Message pollMessage(String queueName){LinkedList<Message> messages = queueMessageMap.get(queueName);if(messages==null || messages.isEmpty()){return null;}Message message = messages.remove(0);System.out.println("[MemoryDataCenter] 从队列中取消息成功 queueName:"+queueName+" ,messageId:"+message.getMessageId());return message;}//获取队列中的消息个数public int getMessageCountFromQueue(String queueName){LinkedList<Message> messages = queueMessageMap.get(queueName);if(messages==null) return 0;//此处获取集合中元素个数可能存在线程安全问题,对集合进行上锁synchronized(messages){return messages.size();}}//待确认消息集合//发送消息到待确认消息集合public void sendWaitMessage(String queueName,Message message){ConcurrentHashMap<String, Message> waitMessagesMap = queueMessageWaitAckMap.computeIfAbsent(queueName, f -> new ConcurrentHashMap<>());//此处向待确认消息集合中插入数据时,也可能存在线程安全问题,以集合为维度加锁synchronized (waitMessagesMap){waitMessagesMap.put(message.getMessageId(),message);}System.out.println("[MemoryDataCenter] 发送待确认消息到队列成功 queueName:"+queueName+" ,messageId:"+message.getMessageId());}//从队列中取待确认消息public Message pollWaitMessage(String queueName,String messageId){ConcurrentHashMap<String, Message> waitMessagesMap = queueMessageWaitAckMap.get(queueName);if(waitMessagesMap==null){return null;}Message message = waitMessagesMap.get(messageId);if(message==null){return null;}System.out.println("[MemoryDataCenter] 从队列中取代确认消息成功 messageId:"+messageId+" ,queueName:"+queueName);return message;}//从队列中删除待确认消息public void deleteWaitMessage(String queueName,String messageId){ConcurrentHashMap<String, Message> waitMessagesMap = queueMessageWaitAckMap.get(queueName);if(waitMessagesMap==null){System.out.println("[MemoryDataCenter] 待确认消息队列不存在,消息删除失败 messageId:"+messageId+" ,queueName:"+queueName);}waitMessagesMap.remove(messageId);System.out.println("[MemoryDataCenter] 待确认消息删除成功 messageId:"+messageId+" ,queueName:"+queueName);}//恢复所有硬盘中的数据//当服务器重启后,内存中的数据都不存在了,要从磁盘中获取数据public void recovery(DiskDataManager diskDataManager) throws IOException, MqException, ClassNotFoundException {//先将内存中的集合都清空,防止存在残留数据exchangeMap.clear();queueMap.clear();bindingsMap.clear();messageMap.clear();queueMessageMap.clear();queueMessageWaitAckMap.clear();//恢复交换机数据List<Exchange> exchanges = diskDataManager.selectAllExchanges();for(Exchange e:exchanges){String exchangeName = e.getName();exchangeMap.put(exchangeName,e);}//恢复队列数据List<MSGQueue> queues = diskDataManager.selectAllQueues();for(MSGQueue q:queues){String queueName = q.getName();queueMap.put(queueName,q);}//恢复绑定关系List<Binding> bindings = diskDataManager.selectAllBindings();for(Binding b:bindings){ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(b.getExchangeName(), f -> new ConcurrentHashMap<>());bindingMap.put(b.getQueueName(),b);}//恢复消息for(MSGQueue q:queueMap.values()){List<Message> messages = diskDataManager.loadAllMessageFromQueue(q.getName());for(Message m:messages){messageMap.put(m.getMessageId(),m);}}//对于未确认消息,当服务器重启后,服务器中所有的消息都要重新发送,未被确认的消息就都成了未被取走的消息了,//对于未确认的消息, 就不需要回复这些数据了}

对MemoryDataCenter类功能测试:

@SpringBootTest
public class MemoryDataCenter {private MemoryDataCenter memoryDataCenter;@BeforeEachpublic void setUp(){memoryDataCenter = new MemoryDataCenter();System.out.println("前置工作已经准备后!");}@AfterEachpublic void tearDown(){memoryDataCenter = null;System.out.println("收尾工作以完成!");}
}

测试功能:

//测试交换机相关操作private Exchange createExchange(String exchangeName){Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(ExchangeType.DIRECT);exchange.setDurable(true);exchange.setAutoDelete(false);return exchange;}@Testvoid testExchange(){Exchange exchange = createExchange("exchangeTest");memoryDataCenter.insertExchange(exchange);Exchange act = memoryDataCenter.getExchange(exchange.getName());Assertions.assertEquals(exchange.getName(),act.getName());Assertions.assertEquals(exchange.getType(),act.getType());Assertions.assertEquals(exchange.isDurable(),act.isDurable());Assertions.assertEquals(exchange.isAutoDelete(),act.isAutoDelete());memoryDataCenter.deleteExchange(exchange.getName());act = memoryDataCenter.getExchange(exchange.getName());Assertions.assertNull(act);}//测试队列相关操作private MSGQueue createQueue(String queueName){MSGQueue queue = new MSGQueue();queue.setName(queueName);queue.setDurable(true);queue.setAutoDelete(false);return queue;}@Testvoid testQueue(){MSGQueue queue = createQueue("queueTest");memoryDataCenter.insertQueue(queue);MSGQueue act = memoryDataCenter.getQueue(queue.getName());Assertions.assertEquals(queue.getName(),act.getName());Assertions.assertEquals(queue.isDurable(),act.isDurable());Assertions.assertEquals(queue.isAutoDelete(),act.isAutoDelete());memoryDataCenter.deleteQueue(queue.getName());act  = memoryDataCenter.getQueue(queue.getName());Assertions.assertNull(act);}//测试绑定关系相关操作private Binding createBinding(String exchangeName,String queueName,String bindingKey){Binding binding = new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(queueName);binding.setBindingKey(bindingKey);return binding;}@Testvoid testBinding() throws MqException {//要先创建队列和交换机Exchange exchange = createExchange("exchangeTest");MSGQueue queue = createQueue("queueTest");Binding binding = createBinding(exchange.getName(), queue.getName(), "bindingKeyTest");memoryDataCenter.insertBinding(binding);Binding act = memoryDataCenter.getBinding(exchange.getName(), queue.getName());Assertions.assertEquals(binding.getExchangeName(),act.getExchangeName());Assertions.assertEquals(binding.getQueueName(),act.getQueueName());Assertions.assertEquals(binding.getBindingKey(),act.getBindingKey());memoryDataCenter.deleteBinding(binding);act = memoryDataCenter.getBinding(act.getExchangeName(),act.getQueueName());Assertions.assertNull(act);}//测试消息操作private Message createMessage(String body){Message message = new Message();return message.createMessageById("routingKeyTest",null,body.getBytes());}@Testpublic void testMessage(){Message expectedMessage = createMessage("testMessage");memoryDataCenter.insertMessage(expectedMessage);Message actualMessage = memoryDataCenter.getMessage(expectedMessage.getMessageId());Assertions.assertEquals(expectedMessage,actualMessage);//删除消息memoryDataCenter.deleteMessage(expectedMessage.getMessageId());actualMessage = memoryDataCenter.getMessage(expectedMessage.getMessageId());Assertions.assertNull(actualMessage);}//测试队列中的消息集合@Testvoid testQueueMessage(){MSGQueue queue = createQueue("queueTest");Message message = createMessage("hello");memoryDataCenter.sendMessage(queue,message);int n = memoryDataCenter.getMessageCountFromQueue(queue.getName());Assertions.assertEquals(1,n);Message act = memoryDataCenter.pollMessage(queue.getName());n = memoryDataCenter.getMessageCountFromQueue(queue.getName());Assertions.assertEquals(0,n);Assertions.assertEquals(message.getMessageId(),act.getMessageId());Assertions.assertArrayEquals(message.getBody(),act.getBody());Assertions.assertEquals(message.getIsVaild(),act.getIsVaild());Assertions.assertEquals(message.getDeliveryMode(),act.getDeliveryMode());}//测试待确认队列集合@Testvoid testWaitMessageQueue(){MSGQueue queue = createQueue("queueTest");Message message = createMessage("hello");memoryDataCenter.sendWaitMessage(queue.getName(), message);Message act = memoryDataCenter.pollWaitMessage(queue.getName(), message.getMessageId());Assertions.assertEquals(message.getMessageId(),act.getMessageId());Assertions.assertArrayEquals(message.getBody(),act.getBody());Assertions.assertEquals(message.getIsVaild(),act.getIsVaild());Assertions.assertEquals(message.getDeliveryMode(),act.getDeliveryMode());memoryDataCenter.deleteWaitMessage(queue.getName(), message.getMessageId());act = memoryDataCenter.pollWaitMessage(queue.getName(), message.getMessageId());Assertions.assertNull(act);}//测试加载磁盘所有数据到内存@Testvoid testRecovery() throws IOException, MqException, ClassNotFoundException {//这里需要使用到mybatis,需要进行了类加载,先启动SpringApplicationMq02Application.context = SpringApplication.run(Mq02Application.class);//在磁盘上构造好数据:DiskDataManager diskDataCenter = new DiskDataManager();diskDataCenter.init("");//创建交换机:Exchange exchange = createExchange("testExchange");diskDataCenter.insertExchange(exchange);//创建队列:MSGQueue queue = createQueue("testQueue");diskDataCenter.insertQueue(queue);//创建绑定Binding binding = new Binding();binding.setExchangeName(exchange.getName());binding.setQueueName(queue.getName());binding.setBindingKey("bindingKey");diskDataCenter.insertBinding(binding);//创建消息Message message = createMessage("testContext");diskDataCenter.sendMessage(queue,message);//执行恢复:memoryDataCenter.recovery(diskDataCenter);//结果比对://交换机:Exchange actualExchange = memoryDataCenter.getExchange(exchange.getName());Assertions.assertEquals(exchange.getName(),actualExchange.getName());Assertions.assertEquals(exchange.getType(),actualExchange.getType());Assertions.assertEquals(exchange.isDurable(),actualExchange.isDurable());Assertions.assertEquals(exchange.isAutoDelete(),actualExchange.isAutoDelete());//队列:MSGQueue actualQueue = memoryDataCenter.getQueue(queue.getName());Assertions.assertEquals(queue.getName(),actualQueue.getName());Assertions.assertEquals(queue.isDurable(),actualQueue.isDurable());Assertions.assertEquals(queue.isAutoDelete(),actualQueue.isAutoDelete());//绑定:Binding actulaBinding = memoryDataCenter.getBinding(exchange.getName(), queue.getName());Assertions.assertEquals(binding.getExchangeName(),actulaBinding.getExchangeName());Assertions.assertEquals(binding.getQueueName(),actulaBinding.getQueueName());Assertions.assertEquals(binding.getBindingKey(),actulaBinding.getBindingKey());//消息:Message actualMessage = memoryDataCenter.getMessage(message.getMessageId());Assertions.assertEquals(message.getMessageId(),actualMessage.getMessageId());Assertions.assertEquals(message.getDeliveryMode(),actualMessage.getDeliveryMode());Assertions.assertEquals(message.getRoutingKey(),actualMessage.getRoutingKey());Assertions.assertArrayEquals(message.getBody(),actualMessage.getBody());//   清除文件//清理之前要先关闭文件Mq02Application.context.close();File file = new File("./data");FileUtils.deleteDirectory(file);}

十二.整合内存和磁盘数据

将内存和磁盘上的数据进行整合,用"虚拟机"这个概念将其整合起来. 不同虚拟机中的交换机 队列,绑定关系,消息都是不互通的. 此处为了简化,仅实现单台虚拟主机,但在数据结构上设置不同虚拟主句名
 为区分不同的虚拟主机上的设备,通过配置设备名区别:(以虚拟机名为前缀)
 规定:
 *  exchangeName = virtualHostName+exchangeName;
 *  queueName = virtualHostName+queueName;
并且将调用的方法抛出的异常都在这个类中进行处理,不再向上抛出

创建VirtualHost类:

@Data
public class VirtualHost {private String virtualHostName;private MemoryDataCenter memoryDataCenter = new MemoryDataCenter();private DiskDataManager diskDataManager = new DiskDataManager();public VirtualHost(String virtualHostName){this.virtualHostName = virtualHostName;//初始化磁盘数据:diskDataManager.init();//初始化内存数据try {memoryDataCenter.recovery(diskDataManager);} catch (IOException | MqException | ClassNotFoundException e) {System.out.println("[VirtualHost] 内存数据恢复失败");e.printStackTrace();}}
}

Exchange的声明和删除:

 //在对交换机在内存和磁盘上插入和删除数据时,可能存在线程安全问题,要以交换机为维度对其上锁//交换机锁对象:private final Object exchangeLocker = new Object();//交换机操作://创建交换机,//创建后,将其保存到内存和磁盘上public boolean exchangeDeclare(String exchangeName, ExchangeType type, boolean durable,boolean autoDelete, Map<String,Object> args){//   先根据约定 设置交换机名exchangeName = virtualHostName + exchangeName;synchronized(exchangeLocker){//先在内存上查找,若已存在,则直接返回Exchange exchange = memoryDataCenter.getExchange(exchangeName);if(exchange!=null){System.out.println("[VirtualHost] 交换机已经存在,不再创建 exchangeName:"+exchangeName);return true;}exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(type);exchange.setDurable(durable);exchange.setAutoDelete(autoDelete);//这里对args参数的设置.要在Exchange类中再为args关于Map参数添加set和get方法exchange.setArgs(args);//先存入数据库,再存入内存中,//这个顺序是:插入数据库操作比较容易出现异常,存内存出现异常的可能小较小//         若插入数据库失败,则不再存入内存中;//         若是转换顺序,当存数据库出现异常时,还要将内存中的数据再删了,比较麻烦if(durable){//当交换机设置为持久化时,将其存入内存:diskDataManager.insertExchange(exchange);}//存入内存memoryDataCenter.insertExchange(exchange);System.out.println("[VirtualHost] 交换机创建成功 exchangeName:"+exchangeName);return true;}}//删除交换机//在内存和磁盘上将数据删除public boolean exchangeDelete(String exchangeName){exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker){Exchange exchange = memoryDataCenter.getExchange(exchangeName);if(exchange==null){throw new MqException("[VirtualHost] 要删除的交换机不存在 exchangeName:"+exchangeName);}//删除内存数据:memoryDataCenter.deleteExchange(exchangeName);//删除磁盘数据:boolean durable = exchange.isDurable();if(durable){diskDataManager.deleteExchange(exchangeName);}System.out.println("[VirtualHost] 交换机删除成功 exchangeName:"+exchangeName);return true;}} catch (MqException e) {System.out.println("[VirtualHost] 交换机删除失败 exchangeName:"+exchangeName);e.printStackTrace();}return false;}

在Exchange类中关于args属性上,再增加关于Map参数类型的set方法:

 public void setArgs(Map<String,Object> args){this.args = args;}

MSGQueue的声明和删除:

//在对队列在内存和磁盘上插入和删除数据时,可能存在线程安全问题,要以队列为维度对其上锁//创建 队列锁对象:private final Object queueLocker = new Object();/**队列* 创建队列:创建队列并将其存入到磁盘和内存中*/public boolean queueDeclare(String queueName,boolean isDurable,boolean autoDelete, Map<String,Object> args){queueName = virtualHostName+queueName;try {synchronized(queueLocker){MSGQueue existsQueue = memoryDataCenter.getQueue(queueName);if(existsQueue!=null){System.out.println("[VirtualHost] 队列已经存在 queueName:"+queueName);return true;}MSGQueue queue = new MSGQueue();queue.setName(queueName);queue.setDurable(isDurable);queue.setAutoDelete(autoDelete);//此处在MSGQueue类中,针对args属性,要实现关于Map类型的set方法queue.setArgs(args);//存入磁盘if(isDurable) {diskDataManager.insertQueue(queue);}//存入内存memoryDataCenter.insertQueue(queue);System.out.println("[VirtualHost] 创建队列成功 !");}return true;} catch (IOException | MqException e) {System.out.println("[VirtualHost] 创建队列失败 queueName:"+queueName);e.printStackTrace();return false;}}//删除队列:从磁盘和内存中 删除队列public boolean queueDelete(String queueName){queueName = virtualHostName+queueName;try{synchronized(queueLocker){MSGQueue existsQueue = memoryDataCenter.getQueue(queueName);if(existsQueue==null){throw new MqException("[VirtualHost] 队列不存在,删除队列失败 queueName:"+queueName);}if(existsQueue.isDurable()){diskDataManager.deleteQueue(queueName);}memoryDataCenter.deleteQueue(queueName);System.out.println("[VirtualHost] 删除队列成功 queueName:"+queueName);}return true;}catch (Exception e) {System.out.println("[VirtualHost] 队列删除失败! queueName:" +queueName);e.printStackTrace();return false;}}

同样,在MSGQueue类中关于args属性上,再增加关于Map参数类型的set方法:

 public void setArgs(Map<String,Object> args){this.args = args;}

Binding的创建和删除:


//  该类实现和绑定相关的操作private Router router = new Router();//绑定的插入和删除//插入绑定public boolean bindingDeclare(String exchangeName,String queueName,String bindingKey){exchangeName = virtualHostName + exchangeName;queueName = virtualHostName + queueName;try{//1.验证绑定是否存在,不存在再创建Binding binding = memoryDataCenter.getBinding(exchangeName, queueName);if(binding!=null){throw new MqException("[VirtualHost] 绑定已存在 exchangeName:"+exchangeName+" ,queueName:"+queueName);}//这里再创建一个类router,实现关于绑定相关的操作//2.判断bindingKey格式是否正确boolean ok = router.checkBindingKey(bindingKey);if(!ok){throw new MqException("[VirtualHost] 绑定格式有误 bindingKey:"+bindingKey);}//3.创建绑binding = new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(queueName);binding.setBindingKey(bindingKey);//4.验证绑定的队列和交换机是否存在MSGQueue queue = memoryDataCenter.getQueue(queueName);if(queue==null){throw new MqException("[VirtualHost] 要绑定的队列不存在 queueName:"+queueName+" ,bindingKey:"+bindingKey);}Exchange exchange = memoryDataCenter.getExchange(exchangeName);if(exchange==null){throw new MqException("[VirtualHost] 要绑定的交换机不存在 exchangeName:"+exchangeName+" ,bindingKey:"+bindingKey);}//5,存入磁盘//当队列和交换机同时设置持久化时,将该绑定关系存入磁盘if(queue.isDurable() && exchange.isDurable()){diskDataManager.insertBinding(binding);}//6.存入内存memoryDataCenter.insertBinding(binding);System.out.println("[VirtualHost] 创建绑定成功 bindingKey: "+bindingKey);return true;}catch (MqException e) {System.out.println("[VirtualHost] 创建绑定失败 bindingKey:"+bindingKey);e.printStackTrace();}return false;}//删除绑定public boolean bindingDelete(String exchangeName,String queueName){exchangeName = virtualHostName + exchangeName;queueName = virtualHostName + queueName;try {Binding binding = memoryDataCenter.getBinding(exchangeName, queueName);if(binding==null){throw new MqException("[VirtualHost] 绑定不存在 queueName:"+queueName+" ,exchangeName:"+exchangeName);}//从内存删除memoryDataCenter.deleteBinding(binding);//从磁盘删除//此处可能绑定没有保存在磁盘上,删除失败,但没有关系,没有影响diskDataManager.deleteBinding(binding.getBindingKey());System.out.println("[VirtualHost] 删除绑定成功 exchangeName:"+exchangeName+" ,queueName:"+queueName +" , bindingKey:"+binding.getBindingKey());return true;}catch(Exception e){System.out.println("[VirtualHost] 删除绑定失败 exchangeName:"+exchangeName+" ,queueName:"+queueName);e.printStackTrace();return false;}}

创建Router类:

实现匹配判断功能:

/*** 该类实现和绑定相关的操作//路由规定://routingKey: 只能由 数字 字母(大小写) 下划线 构成,使用.作为分割//bindingKey:只能包含 数字 字母 下划线 * #,以 . 作为分割,* #只能作为独立的分段*/
public class Router {//判断消息携带的绑定格式是否正确public boolean checkRoutingKey(String routingKey){char[] ch = routingKey.toCharArray();for(char i:ch){if(i>='a' && i<='z') continue;if(i>='A' && i<='Z') continue;if(i>='0' && i<='9') continue;if(i=='.' || i=='_') continue;else return false;}return true;}//判断绑定格式是否正确public boolean checkBindingKey(String bindingKey){char[] ch = bindingKey.toCharArray();for(char c:ch){if(c>='A' && c<='Z') continue;if(c>='a' && c<='z') continue;if(c>='0' && c<='9') continue;if(c=='_' || c=='*' || c=='#' || c=='.') continue;else return false;}//规定不能让* #相连,即出现以下情况规定不成立:// *.#   #.*  #.#//以 . 对字符串进行分隔,判断String[] s = bindingKey.split("\\.");for(int i=0;i<s.length-1;i++){if (s[i].equals("*") && s[i+1].equals("#") ||s[i].equals("#") && s[i+1].equals("*") ||s[i].equals("#") && s[i+1].equals("#")) {return false;}}return true;}//判断bindingKey与routingKey是否匹配成功public boolean isRouting(ExchangeType type,String routingKey,String bindingKey) throws MqException {//判断当前交换机类型:fanout/topicif(type==ExchangeType.FANOUT){//匹配到绑定交换机的所有队列//直接返回即可return true;}else if(type==ExchangeType.TOPIC){//进行routingKey和BindingKey的匹配判断return routingTopic(routingKey,bindingKey);}else{throw new MqException("[Router] 交换机类型有误 type:"+type);}}/*** 规定:rotingKey匹配bindingKey*  *:匹配任意单个字符串*  #:匹配任意个任意字符串* @param routingKey 消息携带的匹配字符串* @param bindingKey 交换机和队列的绑定关系* @return*/private boolean routingTopic(String routingKey, String bindingKey) {String[] b = bindingKey.split("\\.");String[] r = routingKey.split("\\.");int n1 = b.length;int n2 = r.length;int i = 0;int j = 0;while(i<n1 && j<n2){if(b[i].equals("*")){//可以匹配routingKey的任意单个字符//直接向后走就行:i++;j++;}else if(b[i].equals("#")){//匹配routingKey的任意个任意字符if(i==n1-1){//当bindingKey的最后一个字符为#时,可以匹配routingKey后面的所有字符串,直接返回true即可:return true;}else{//当b的#不是最后一个字符时,就找r之后的字符串中是否有b的下一个字符串的下标,当找不到时,就返回-1:i++;j = checkNext(b[i],r,j);//当在r中找不到b的下一个字符串时,一定匹配失败,直接返回if(j==-1) return false;else{i++;j++;}}}else{//b为普通字符串时if(!b[i].equals(r[j])) return false;else {i++;j++;}}}//b / r有一个已经匹配到结尾了,只有两个都完全匹配完,才算匹配成功if(i!=n1 || j!=n2){return false;}return true;}private int checkNext(String next, String[] r, int j) {for(int k=j;k<r.length;k++){if(r[k].equals(next)) return k;}return -1;}}

对Router类的TOPIC匹配进行测试:

@SpringBootTest
public class RouterTopicTest {private Router router = new Router();// [测试用例]// binding key          routing key         result// aaa                  aaa                 true// aaa.bbb              aaa.bbb             true// aaa.bbb              aaa.bbb.ccc         false// aaa.bbb              aaa.ccc             false// aaa.bbb.ccc          aaa.bbb.ccc         true// aaa.*                aaa.bbb             true// aaa.*.bbb            aaa.bbb.ccc         false// *.aaa.bbb            aaa.bbb             false// #                    aaa.bbb.ccc         true// aaa.#                aaa.bbb             true// aaa.#                aaa.bbb.ccc         true// aaa.#.ccc            aaa.ccc             true// aaa.#.ccc            aaa.bbb.ccc         true// aaa.#.ccc            aaa.aaa.bbb.ccc     true// #.ccc                ccc                 true// #.ccc                aaa.bbb.ccc         true@Testvoid test01() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC, "aaa", "aaa");Assertions.assertTrue(ok);}@Testvoid test02() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC,"aaa.bbb", "aaa.bbb");Assertions.assertTrue(ok);}@Testvoid test03() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC,"aaa.bbb.ccc", "aaa.bbb");Assertions.assertFalse(ok);}@Testvoid test04() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC,"aaa.ccc", "aaa.bbb");Assertions.assertFalse(ok);}@Testvoid test05() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC,"aaa.bb.cc", "aaa.bb.cc");Assertions.assertTrue(ok);}@Testvoid test06() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC,"aaa.bb", "aaa.*");Assertions.assertTrue(ok);}@Testvoid test07() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC,"aaa.bb.cc", "aaa.*.bb");Assertions.assertFalse(ok);}@Testvoid test08() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC,"aaa.bb", "*.aaa.bb");Assertions.assertFalse(ok);}@Testvoid test09() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC,"aaa.bb.cc", "#");Assertions.assertTrue(ok);}@Testvoid test10() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC,"aaa.bb", "aaa.#");Assertions.assertTrue(ok);}@Testvoid test11() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC,"aaa.bb.cc", "aaa.#");Assertions.assertTrue(ok);}@Testvoid test12() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC,"aaa.cc", "aaa.#.cc");Assertions.assertTrue(ok);}@Testvoid test13() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC,"aaa.bb.cc", "aaa.#.cc");Assertions.assertTrue(ok);}@Testvoid test14() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC,"aaa.aaa.bb.cc", "aaa.#.cc");Assertions.assertTrue(ok);}@Testvoid test15() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC,"cc", "#.cc");Assertions.assertTrue(ok);}    @Testvoid test16() throws MqException {boolean ok = router.isRouting(ExchangeType.TOPIC,"aaa.bb.cc", "#.cc");Assertions.assertTrue(ok);}

发送消息:

//发送消息到队列public boolean basicPublish(String exchangeName,String routingKey,BasicProperties basicProperties,byte[] body){exchangeName = virtualHostName + exchangeName;try {//1.判断交换机是否存在Exchange exchange = memoryDataCenter.getExchange(exchangeName);if(exchange==null){throw new MqException("[VirtualHost] 交换机不存在 exchangeName:"+exchangeName);}//2.判断routingKey格式是否正确boolean ok = router.checkRoutingKey(routingKey);if(!ok) {throw new MqException("[VirtualHost] routingKey格式有误 routingKey:"+routingKey);}//3.根据交换机的类型进行路由匹配,分发消息if(exchange.getType()==ExchangeType.DIRECT){//直接交换机,routingKey就是队列名,bindingKey无用,将消息路由到指定的队列上//获取到指定队列String queueName = virtualHostName + routingKey;MSGQueue queue = memoryDataCenter.getQueue(queueName);if(queue==null){throw new MqException("[VirtualHost] 队列不存在 queueName:"+queueName);}//构造消息对象Message message = new Message();message = message.createMessageById(null,basicProperties,body);//发送消息到队列,再构造一个方法实现sendMessage(queue,message);}else{//当交换机类型为fanout/topic时://遍历交换机所有的绑定ConcurrentHashMap<String, Binding> bindings = memoryDataCenter.getBindings(exchangeName);for(Binding b:bindings.values()){MSGQueue queue = memoryDataCenter.getQueue(b.getQueueName());//判断交换机绑定的队列是否存在:if(queue==null){System.out.println("[VirtualHost] 队列不存在 queueName:"+b.getQueueName());continue;}//构造消息对象Message message = new Message().createMessageById(routingKey, basicProperties, body);//判断routingKey与binding是否成功if(!router.isRouting(exchange.getType(),message.getRoutingKey(),b.getBindingKey())){//匹配失败:System.out.println("[VirtualHost] routingKey和BindingKey不匹配 routingKey:"+routingKey+" , bindingKey:"+b.getBindingKey());continue;}//匹配成功时,就将消息转发sendMessage(queue,message);System.out.println("[VirtualHost] 消息发送成功 queueName:"+queue.getName()+" ,messageId:"+message.getMessageId());}}return true;}catch (Exception e){System.out.println("[VirtualHost]消息发送失败 ");e.printStackTrace();return false;}}//消费者管理对象:private ConsumerManager consumerManager = new ConsumerManager(this);private void sendMessage(MSGQueue queue, Message message) throws IOException, MqException, InterruptedException {//存入磁盘//是否持久化//1:持久化 0:非持久化if(message.getDeliveryMode()==1){diskDataManager.sendMessage(queue,message);}//存入内存:memoryDataCenter.sendMessage(queue,message);//消息已经到达队列,通知订阅队列的消费者消费消息consumerManager.notifyConsumer(queue.getName());System.out.println("[VirtualHost] 发送消息成功");}

创建ConsumerManager类:

对消费者进行管理:


/*** 消费者管理类*/
public class ConsumerManager {//持有上层的VirtualHost对象的引用,用来操作数据private VirtualHost virtualHost;//    使⽤⼀个线程池⽤来执⾏消息回调private ExecutorService workerPool = Executors.newFixedThreadPool(4);//存放令牌(队列名)的队列:那个队列当前有消息了,就将队列名加入到阻塞队列中//然后扫描线程通过该队列中存放的队列名找到对应的消息和订阅者,将信息打包放到线程池中进行消费private BlockingQueue<String> tokenQueue = new LinkedBlockingQueue<>();//扫描线程private Thread scannerThread = null;//通知消费者消费消息://调用时机:发送方发送消息成功后,//当队列中有消息了,就将其放到阻塞队列中,然后就要通知消费者消费消息了public void notifyConsumer(String queueName) throws InterruptedException {tokenQueue.put(queueName);}
}

订阅消息:

    /**//订阅消息//添加一个订阅者:* @param consumerTag 消费者身份标识* @param queueName 队列名* @param autoAck 是否自动确认消息* @param consumer 回调函数* @return*/public boolean basicConsume(String consumerTag, String queueName,boolean autoAck, Consumer consumer){queueName = virtualHostName + queueName;try {//通过消费者管理类实现添加消费者功能consumerManager.addConsumer(consumerTag,queueName,autoAck,consumer);System.out.println("[VirtualHost] basicConsumer 成功 queueName:"+queueName);return true;}  catch (MqException e) {System.out.println("[VirtualHost] basicConsumer 失败 queueName:"+queueName);e.printStackTrace();return false;}}

创建ConsumerEnv类:

消费者完整环境类:


/*** 表示消费者(完整的执行环境)*/
@Data
public class ConsumerEnv {//消费者唯一标识private String consumerTag;//订阅队列的队列名字private String queueName;//是否自动确认消息private boolean autoAck;//要执行的具体功能,通过一个接口,由调用者自己实现其方法体private Consumer consumer;
}

创建Consumer接口:

实现消费者的回调函数接口:通过lambda表达式,让消费者自己实现对消息的处理

/*** 函数式接口,回调函数,当消费者收到消息后,要处理消息,调用者通过这个接口实现具体的功能*/
@FunctionalInterface
public interface Consumer {//deliver:投递的意思,这个方法在每次服务器收到发送来的消息后,调用//通过这个方法把消息推送给对应的消费者void handlerDeliver(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException;
}

为MSGQueue类新增加二个属性和方法,用于管理订阅队列的消费者集合:

//此处再添加一个属性:订阅该队列的消费者集合private List<ConsumerEnv> consumerEnvList = new ArrayList<>();//当订阅队列的消费者不止一个时 , 规定以轮训的方式消费消息//再添加一个属性,记录当前轮到哪个消费者消费消息了//这里使用AtomicInteger类来实现,目的是不让手动修改,且要实现自增的功能private AtomicInteger atomicInteger = new AtomicInteger(0);//添加一个新的订阅者(消费者)public void addConsumerEnv(ConsumerEnv consumerEnv){consumerEnvList.add(consumerEnv);}//挑选一个订阅者,消费当前消息,按照轮训的方式public ConsumerEnv chooseConsumerEnv(){if(consumerEnvList.isEmpty()){//当前该队列还没有消费者订阅System.out.println("[MSGQueue] 当前该队列没有订阅者");return null;}//按照轮训的方式获取一个要消费消息的订阅者下标int index = atomicInteger.get()%consumerEnvList.size();//让轮训值 自增atomicInteger.getAndIncrement();return consumerEnvList.get(index);}

在ConsumerManager类中实现添加消费者方法:

 //添加新的消费者,并消费队列中当前存在的消息public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {//1.找到对应的队列MSGQueue queue = virtualHost.getMemoryDataCenter().getQueue(queueName);if(queue==null){throw new MqException("[ConsumerManager] 队列不存在 queueName:"+queueName);}//2.创建一个消费者ConsumerEnv consumerEnv = new ConsumerEnv();consumerEnv.setConsumerTag(consumerTag);consumerEnv.setQueueName(queueName);consumerEnv.setAutoAck(autoAck);consumerEnv.setConsumer(consumer);//3.将订阅者加入到队列的订阅者队列中queue.addConsumerEnv(consumerEnv);//4.当队列中已经有一些消息时,要将其消费掉synchronized (queue){int n = virtualHost.getMemoryDataCenter().getMessageCountFromQueue(queueName);for(int i=0;i<n;i++){//这个方法调用一次就消费一条消息consumerMessage(queue);}}}//  消费消息:调用消息的回调函数,并将消息从队列中删除//从队列中获取一个消息,并让消费者消费,// 当消费者不止一个时,按照轮训的方式让消费者依次消费消息private void consumerMessage(MSGQueue queue) throws MqException {//1.从队列的订阅者中挑选一个订阅者ConsumerEnv consumerEnv = queue.chooseConsumerEnv();if(consumerEnv==null){//当前队列号没有订阅者,无法消费消息System.out.println("[ConsumerManager] 当前队列中还没有订阅者");return;}//2.消费消息Message message = virtualHost.getMemoryDataCenter().pollMessage(queue.getName());if(message==null){//当前队列中还没有消息,不需要消费System.out.println("当前队列中还没有消息");return;}//将消息带到消费者的回调方法中,给线程池执行workerPool.submit(()->{try{//1.在执行回调之前,先将消息放到待确认队列集合中,一旦消息被消费失败了.就重新发送消息virtualHost.getMemoryDataCenter().sendWaitMessage(queue.getName(),message);//2.执行订阅者的回调方法consumerEnv.getConsumer().handlerDeliver(consumerEnv.getConsumerTag(),message.getBasicProperties(),message.getBody());//3.根据消费者的确认消息方式及消费者消费消息的情况,执行删除消息操作//  这里完成为自动确认模式下的操作,手动模式下,在basicAck方法中实现if(consumerEnv.isAutoAck()){//4.删除磁盘中的数据//  是否持久化//  1:非持久化 0:持久化if(message.getDeliveryMode()==0) {virtualHost.getDiskDataManager().deleteMessageFromQueue(queue, message);}//5.删除未确认消息队列中的消息virtualHost.getMemoryDataCenter().deleteWaitMessage(queue.getName(), message.getMessageId());//6.删除消息集合中的消息virtualHost.getMemoryDataCenter().deleteMessage(message.getMessageId());System.out.println("[ConsumerManager] 消息被成功消费 ");}}catch (Exception e){System.out.println("[ConsumerManager] 消费消息失败");e.printStackTrace();}});}

在ConsumerManager类中,添加扫描线程,不停扫描阻塞令牌队列,查看是否有新的消息到来,需要消费者及时消费:

//先获取到令牌,根据令牌找到指定的队列,从队列中获取消息进行消费public ConsumerManager(VirtualHost parent){virtualHost = parent;//为推的模式.不断的扫描令牌队列,一但有消息进入队列,就将其推送给消费者Thread t = new Thread(()->{while(true){try {//1.获取令牌String queueName = tokenQueue.take();//2.根据令牌,找到指定的队列MSGQueue queue = virtualHost.getMemoryDataCenter().getQueue(queueName);if(queue==null){throw new MqException("[ConsumerManager] 获取令牌时,发现队列不存在");}synchronized (queue){//3.从队列中获取一个消息并进行消费consumerMessage(queue);}} catch (InterruptedException | MqException e) {throw new RuntimeException(e);}}});//将线程设为后台线程//当前台线程执行结束了,后台线程也就结束了,//若设为前台线程,那么只有当前台线程执行完了,整个进程才会结束,// 这里的循环是while(true)会一直卡着执行结束不了,因此要设成后台线程t.setDaemon(true);//启动线程t.start();}

十三.网络通信协议设计

生产者和消费者都是客户端,需要通过网络和消息队列服务器进行通信.

此处我们使⽤TCP协议,来作为通信的底层协议.同时在这个基础上⾃定义应⽤层协议,完成客⼾端对服 务器这边功能的远程调⽤.

设计应用层协议:

使⽤⼆进制的⽅式设定协议.

请求数据格式:

响应数据格式:

其中 type 表⽰请求响应不同的功能. 取值如下:

• 0x1 创建 channel

• 0x2 关闭 channel

• 0x3 创建 exchange

• 0x4 销毁 exchange

• 0x5 创建 queue

• 0x6 销毁 queue

• 0x7 创建 binding

• 0x8 销毁 binding

• 0x9 发送 message

• 0xa 订阅 message

• 0xb 返回 ack

• 0xc 服务器给客⼾端推送的消息. (被订阅的消息) 响应独有的

对于请求来说,payload是各种请求方法的参数信息

对响应来说,payload是方法的返回数据信息.

创建request类:


/*** 表示一个网络通信中的请求对象*/
@Data
public class Request {/** type 表⽰请求响应不同的功能. 取值如下*  0x1  创建 channel* • 0x2  关闭 channel* • 0x3  创建 exchange* • 0x4  销毁 exchange* • 0x5  创建 queue* • 0x6  销毁 queue* • 0x7  创建 binding* • 0x8  销毁 binding* • 0x9  发送 message* • 0xa  订阅 message* • 0xb  返回 ack* • 0xc  服务器给客⼾端推送的消息. (被订阅的消息) 响应独有的*///请求类型,设定占4字节private int type;//请求的数据长度,占4字节private int length;//请求体 payload 表⽰这次⽅法调⽤的各种参数信息private byte[] payload;
}

创建response类:

/*** 表示一个响应对象*/
@Data
public class Response {//按照自己的定义,响应类型,4字节private int type;//响应的数据长度,4字节private int length;//响应体private byte[] payload;
}

创建参数父类:

//定义参数⽗类
//构造⼀个类表⽰⽅法的参数, 作为 Request 的 payload.
//不同的⽅法中, 参数形态各异, 但是有些信息是通⽤的, 使⽤⼀个⽗类表⽰出来. 具体每个⽅法的参数再
//通过继承的⽅式体现
@Data
public class BasicArgs implements Serializable {//表示一次请求的身份标识,用来和该请求 对应的返回的响应相对照protected String rid;//每一次请求需要建立连接,通过TCP建立连接,一个连接可以发送多次消息,每条消息通过信道传送//一条信道可以发送多条消息//这次通信的信道channel的身份标识protected String channelId;
}

创建响应父类:

/*** 定义payload的返回数据*/
@Data
public class BasicReturns implements Serializable {//一次请求或相应的身份标识protected String rid;//标识一个channelprotected String channelId;//表示方法的执行结果  payload 表⽰这次⽅法调⽤的返回值.protected boolean ok;
}

创建设备功能的参数类:

exchangeDeclareArgs:

/*** 这个类表示调用声明交换机方法的参数*/
@Data
public class ExchangeDeclareArgs extends BasicArgs implements Serializable {private String exchangeName;private ExchangeType type;private boolean isDurable;private boolean autoDelete;private Map<String,Object> args;
}

exchangeDeleteArgs:

@Data
public class ExchangeDeleteArgs extends BasicArgs implements Serializable {private String exchangeName;
}

queueDeclareArgs:

@Data
public class QueueDeclareArgs extends BasicArgs implements Serializable {private String queueName;private boolean isDurable;private boolean autoDelete;private Map<String,Object> args;
}

queueDeleteArgs:

@Data
public class QueueDeleteArgs extends BasicArgs implements Serializable {private String queueName;
}

bindingDeclareArgs:

@Data
public class BindingDeclareArgs extends BasicArgs implements Serializable {private String ExchangeName;private String queueName;private String bindingKey;
}

bindingDeleteArgs:

@Data
public class BindingDeleteArgs extends BasicArgs implements Serializable {private String exchangeName;private String queueName;
}

basicPublishArgs:

@Data
public class BasicPublishArgs extends BasicArgs implements Serializable {private String exchangeName;private String routingKey;private BasicProperties basicProperties;private byte[] body;
}

basicConsumerArgs:

@Data
public class BasicConsumerArgs extends BasicArgs implements Serializable {private String consumerTag;private String queueName;private boolean autoAck;//这个类对应的BasicConsumer方法还有一个参数 consumer,是一个回到参数//消费者客户端收到服务器发送的消息后,针对自己的业务,实现这个回调接口就行了,//无需再将回调参数传给服务器,因此解救不需要在这里写这个参数了//并且,这个 回调参数也无法通过网络传输给服务器
}

basicAckArgs:

/*** 手动响应数据*/
@Data
public class BasicAckArgs extends BasicArgs implements Serializable {private String queueName;private String messageId;
}

subscribeReturns:

/*** 这里类表示返回数据的具体参数* 是服务器给消费者提供的订阅消息* consumerTag其实是channelId.* basicProperties和body共同构成了Message.*/
@Data
public class SubScribeReturns extends BasicReturns implements Serializable {private String consumerTag;private BasicProperties basicProperties;private byte[] body;
}

十四.实现BrokerServer

public class BrokerServer {//调用相关数据private VirtualHost virtualHost = new VirtualHost("default");//服务器⾃⾝的 socketprivate ServerSocket serverSocket = null;//引入线程池,处理多个客户端的请求private ExecutorService executorService = null;//引入一个哈希表,存储所有的会话对象//key: channelId, val:socket对象private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<>();//引入一个布尔变量,表示当前服务器是否要停止,//要对所有线程是立即可见的,用volatile修饰private volatile boolean runnable = true;public BrokerServer(int port) throws IOException {serverSocket = new ServerSocket(9090);}//启动服务public void start() throws IOException {System.out.println("[BrokerServer] 启动服务");executorService = Executors.newCachedThreadPool();try {while (runnable) {//accept:不断接收客户端发来的请求:Socket clientSocket = serverSocket.accept();executorService.submit(() -> {processConnection(clientSocket);});}} catch (SocketException e) {//正常结束System.out.println("[BrokerServer] 服务器停止运行!");}}//停止服务器public void stop() throws IOException {runnable = false;executorService.shutdown();serverSocket.close();}//处理一个客户端的连接//一个个连接可能有多次的请求和相应//要读取数据,处理数据,然后将结果返回给客户端private void processConnection(Socket clientSocket) {try (InputStream inputStream = clientSocket.getInputStream();OutputStream outputStream = clientSocket.getOutputStream()) {//  这里需要按照特定格式进行读取和解析数据try (DataInputStream dataInputStream = new DataInputStream(inputStream);DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {while (true) {//1.读取请求Request request = readRequest(dataInputStream);//2.根据请求计算相应Response response = process(request, clientSocket);//3.将结果返回给客户端writeResponse(dataOutputStream,response);}} catch (EOFException | SocketException e) {//当出现这两种异常时,是正常的异常,是请求读取结束了,读到了空字符串抛出的异常,// 正常结束循环就可以了System.out.println("[BrokerServer] connection 连接关闭 ,客户端地址: " + clientSocket.getInetAddress().toString()+ " : " + clientSocket.getPort());} catch (ClassNotFoundException e) {throw new RuntimeException(e);} catch (MqException e) {throw new RuntimeException(e);}} catch (IOException e) {throw new RuntimeException(e);} finally {//关闭资源try {//当前连接处理完之后,需要关闭SocketclientSocket.close();//把当前socket对应的所有channel也删除了clearCloseSessions(clientSocket);} catch (IOException e) {e.printStackTrace();}//删除sessions中客户端和服务器建立的连接}}private Request readRequest(DataInputStream dataInputStream) throws IOException {Request request = new Request();request.setType(dataInputStream.readInt());request.setLength(dataInputStream.readInt());byte[] payload = new byte[request.getLength()];int n = dataInputStream.read(payload);if (n != request.getLength()) {throw new IOException("读取请求格式出错!");}request.setPayload(payload);return request;}private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {dataOutputStream.writeInt(response.getType());dataOutputStream.writeInt(response.getLength());dataOutputStream.write(response.getPayload());// 这个刷新缓冲区也是重要的操作!!dataOutputStream.flush();}private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {// 1. 把 request 中的 payload 做一个初步的解析.BasicArgs BasicArgs = (BasicArgs) BinaryTool.fromBytes(request.getPayload());System.out.println("[Request] rid=" + BasicArgs.getRid() + ", channelId=" + BasicArgs.getChannelId()+ ", type=" + request.getType() + ", length=" + request.getLength());// 2. 根据 type 的值, 来进一步区分接下来这次请求要干啥.boolean ok = true;if (request.getType() == 0x1) {// 创建 channelsessions.put(BasicArgs.getChannelId(), clientSocket);System.out.println("[BrokerServer] 创建 channel 完成! channelId=" + BasicArgs.getChannelId());} else if (request.getType() == 0x2) {// 销毁 channelsessions.remove(BasicArgs.getChannelId());System.out.println("[BrokerServer] 销毁 channel 完成! channelId=" + BasicArgs.getChannelId());} else if (request.getType() == 0x3) {// 创建交换机. 此时 payload 就是 ExchangeDeclareArgs 对象了.ExchangeDeclareArgs Args = (ExchangeDeclareArgs) BasicArgs;ok = virtualHost.exchangeDeclare(Args.getExchangeName(), Args.getType(),Args.isDurable(), Args.isAutoDelete(), Args.getArgs());} else if (request.getType() == 0x4) {ExchangeDeleteArgs Args = (ExchangeDeleteArgs) BasicArgs;ok = virtualHost.exchangeDelete(Args.getExchangeName());} else if (request.getType() == 0x5) {QueueDeclareArgs Args = (QueueDeclareArgs) BasicArgs;ok = virtualHost.queueDeclare(Args.getQueueName(), Args.isDurable(), Args.isAutoDelete(), Args.getArgs());} else if (request.getType() == 0x6) {QueueDeleteArgs Args = (QueueDeleteArgs) BasicArgs;ok = virtualHost.queueDelete((Args.getQueueName()));} else if (request.getType() == 0x7) {BindingDeclareArgs Args = (BindingDeclareArgs) BasicArgs;ok = virtualHost.bindingDeclare(Args.getQueueName(), Args.getExchangeName(), Args.getBindingKey());} else if (request.getType() == 0x8) {BindingDeleteArgs Args = (BindingDeleteArgs) BasicArgs;ok = virtualHost.bindingDelete(Args.getQueueName(), Args.getExchangeName());} else if (request.getType() == 0x9) {BasicPublishArgs Args = (BasicPublishArgs) BasicArgs;ok = virtualHost.basicPublish(Args.getExchangeName(), Args.getRoutingKey(),Args.getBasicProperties(), Args.getBody());} else if (request.getType() == 0xa) {BasicConsumerArgs Args = (BasicConsumerArgs) BasicArgs;ok = virtualHost.basicConsume(Args.getConsumerTag(), Args.getQueueName(), Args.isAutoAck(),new Consumer() {//这个回调函数要做的工作, 就是把服务器收到的消息可以直接推送回对应的消费者客户端//此处 consumerTag 其实是 channelId. 根据 channelId 去 sessions 中查询,//  就可以得到对应的socket 对象了, 从而可以往里面发送数据了@Overridepublic void handlerDeliver(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {// 先知道当前这个收到的消息, 要发给哪个客户端.// 此处 consumerTag 其实是 channelId. 根据 channelId 去 sessions 中查询, 就可以得到对应的// socket 对象了, 从而可以往里面发送数据了// 1. 根据 channelId 找到 socket 对象Socket clientSocket = sessions.get(consumerTag);if (clientSocket == null || clientSocket.isClosed()) {throw new MqException("[BrokerServer] 订阅消息的客户端已经关闭!");}// 2. 构造响应数据SubScribeReturns subScribeReturns = new SubScribeReturns();subScribeReturns.setChannelId(consumerTag);subScribeReturns.setRid(""); // 由于这里只有响应, 没有请求, 不需要去对应. rid 暂时不需要.subScribeReturns.setOk(true);subScribeReturns.setConsumerTag(consumerTag);subScribeReturns.setBasicProperties(basicProperties);subScribeReturns.setBody(body);byte[] payload = BinaryTool.toByte(subScribeReturns);Response response = new Response();// 0xc 表示服务器给消费者客户端推送的消息数据.response.setType(0xc);// response 的 payload 就是一个 SubScribeReturnsresponse.setLength(payload.length);response.setPayload(payload);// 3. 把数据写回给客户端.//    注意! 此处的 dataOutputStream 这个对象不能 close !!!//    如果 把 dataOutputStream 关闭, 就会直接把 clientSocket 里的 outputStream 也关了.//    此时就无法继续往 socket 中写入后续数据了.DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());writeResponse(dataOutputStream, response);}});} else if (request.getType() == 0xb) {// 调用 basicAck 确认消息.BasicAckArgs Args = (BasicAckArgs) BasicArgs;ok = virtualHost.basicAck(Args.getQueueName(), Args.getMessageId());} else {// 当前的 type 是非法的.throw new MqException("[BrokerServer] 未知的 type! type=" + request.getType());}// 3. 构造响应BasicReturns basicReturns = new BasicReturns();basicReturns.setChannelId(BasicArgs.getChannelId());basicReturns.setRid(BasicArgs.getRid());basicReturns.setOk(ok);byte[] payload = BinaryTool.toByte(basicReturns);Response response = new Response();response.setType(request.getType());response.setLength(payload.length);response.setPayload(payload);System.out.println("[Response] rid=" + basicReturns.getRid() + ", channelId=" + basicReturns.getChannelId()+ ", type=" + response.getType() + ", length=" + response.getLength());return response;}private void clearCloseSessions(Socket clientSocket) {// 这里要做的事情, 主要就是遍历上述 sessions hash 表, 把该被关闭的 socket 对应的键值对, 统统删掉.List<String> toDeleteChannelId = new ArrayList<>();for (Map.Entry<String, Socket> entry : sessions.entrySet()) {if (entry.getValue() == clientSocket) {// 不能在这里直接删除!!!// 这属于使用集合类的一个大忌!!! 一边遍历, 一边删除!!!// sessions.remove(entry.getKey());toDeleteChannelId.add(entry.getKey());}}for (String channelId : toDeleteChannelId) {sessions.remove(channelId);}System.out.println("[BrokerServer] 清理 session 完成! 被清理的 channelId=" + toDeleteChannelId);}}

十五.实现客户端

创建ConnectionFactory.

表示用来创建连接的工厂类:

/***连接工厂*/
@Data
public class ConnectionFactory {// broker server 的 ip 地址private String host;// broker server 的端口号private int port;public Connection newConnection() throws IOException {Connection connection = new Connection(host, port);return connection;}
}

创建Connection类:

一个Connection对应一个TCP,一个连接可以包含多个channel.

public class Connection {private Socket socket = null;private InputStream inputStream;private OutputStream outputStream;private DataInputStream dataInputStream;private DataOutputStream dataOutputStream;//创建线程池,用来处理客户端这边执行用户回调的线程池private ExecutorService callbackPool = null;//  创建一个hash.来管理多个channelConcurrentHashMap<String,Channel> channelMap = new ConcurrentHashMap<>();//这个方法在客户端构造好请求后,调用,用来发送请求到服务器:public void writeRequest(Request request) throws IOException {dataOutputStream.writeInt(request.getType());dataOutputStream.writeInt(request.getLength());dataOutputStream.write(request.getPayload());dataOutputStream.flush();System.out.println("[Connection] 发送请求! type=" + request.getType() + ", length=" + request.getLength());}
// 和服务器建立连接,接收服务器返回的响应,并处理响应public Connection(String host,int port) throws IOException {socket = new Socket(host,port);inputStream = socket.getInputStream();outputStream = socket.getOutputStream();dataInputStream = new DataInputStream(inputStream);dataOutputStream = new DataOutputStream(outputStream);callbackPool = Executors.newFixedThreadPool(4);
//      创建一个扫描线程,不断的从socket中读取响应,交给对应的channel进行处理Thread t = new Thread(()->{try{while (!socket.isClosed()){Response response = readResponse();//处理响应dispatchResponse(response);}} catch (SocketException e){//连接正常断开System.out.println("[Connection] 连接正常断开");}catch (IOException | ClassNotFoundException | MqException e) {System.out.println("[Connection] 连接异常断开");e.printStackTrace();}});t.start();}public void close(){try{//关闭Connection ,释放资源callbackPool.shutdownNow();channelMap.clear();outputStream.close();inputStream.close();socket.close();;}catch (IOException e){e.printStackTrace();}}// 读取服务器返回的响应public Response readResponse() throws IOException {Response response = new Response();response.setType(dataInputStream.readInt());response.setLength(dataInputStream.readInt());byte[] payload = new byte[response.getLength()];int n = dataInputStream.read(payload);if (n != response.getLength()) {throw new IOException("读取的响应数据不完整!");}response.setPayload(payload);System.out.println("[Connection] 收到响应! type=" + response.getType() + ", length=" + response.getLength());return response;}// 使用这个方法来分别处理响应, 当前的响应是一个针对控制请求的响应, 还是服务器推送的消息.private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {if (response.getType() == 0xc) {// 服务器推送给消费者客户端的消息数据SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());// 根据 channelId 找到对应的 channel 对象Channel channel = channelMap.get(subScribeReturns.getChannelId());if (channel == null) {throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在! channelId=" + channel.getChannelId());}// 执行该 channel 对象内部的回调.callbackPool.submit(() -> {try {channel.getConsumer().handlerDeliver(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(),subScribeReturns.getBody());} catch (MqException | IOException e) {e.printStackTrace();}});} else {// 当前响应是针对刚才的控制请求的响应BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());// 把这个结果放到对应的 channel 的 hash 表中.Channel channel = channelMap.get(basicReturns.getChannelId());if (channel == null) {throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在! channelId=" + channel.getChannelId());}//获取到响应后,将其放到响应的集合中,让客户端从集合中取走对应的响应.channel.putReturns(basicReturns);}}
// 通过这个方法, 在 Connection 中能够创建出一个 Channelpublic Channel createChannel() throws IOException {String channelId = "C-" + UUID.randomUUID().toString();Channel channel = new Channel(channelId, this);// 把这个 channel 对象放到 Connection 管理 channel 的 哈希表 中.channelMap.put(channelId, channel);// 同时也需要把 "创建 channel" 的这个消息也告诉服务器.boolean ok = channel.createChannel();if (!ok) {// 服务器这里创建失败了!! 整个这次创建 channel 操作不顺利!!// 把刚才已经加入 hash 表的键值对, 再删了.channelMap.remove(channelId);return null;}return channel;}
}

创建Channel类:

用于客户端发送请求调用的相关的API:

@Data
public class Channel {private String channelId;// 当前这个 channel 属于哪个连接.private Connection connection;// 用来存储后续客户端收到的服务器的响应.private ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();// 如果当前 Channel 订阅了某个队列, 就需要在此处记录下对应回调是啥. 当该队列的消息返回回来的时候, 调用回调.// 此处约定一个 Channel 中只能有一个回调.private Consumer consumer = null;public Channel(String channelId, Connection connection) {this.channelId = channelId;this.connection = connection;}/**   type 表⽰请求响应不同的功能. 取值如下*  0x1  创建 channel* • 0x2  关闭 channel* • 0x3  创建 exchange* • 0x4  销毁 exchange* • 0x5  创建 queue* • 0x6  销毁 queue* • 0x7  创建 binding* • 0x8  销毁 binding* • 0x9  发送 message* • 0xa  订阅 message* • 0xb  返回 ack* • 0xc  服务器给客⼾端推送的消息. (被订阅的消息) 响应独有的*/// 在这个方法中, 和服务器进行交互, 告知服务器, 此处客户端创建了新的 channel 了.public boolean createChannel() throws IOException {// 对于创建 Channel 操作来说, payload 就是一个 basicArgs 对象BasicArgs basicArgs = new BasicArgs();basicArgs.setChannelId(channelId);basicArgs.setRid(generateRid());byte[] payload = BinaryTool.toByte(basicArgs);Request request = new Request();request.setType(0x1);request.setLength(payload.length);request.setPayload(payload);// 构造出完整请求之后, 就可以发送这个请求了.connection.writeRequest(request);// 等待服务器的响应//服务器对根据请求处理并返回响应,对请求的处理时间不确定,// 该步骤可能会发生阻塞BasicReturns basicReturns = waitResult(basicArgs.getRid());return basicReturns.isOk();}// 通过UUID,生成唯一ridprivate String generateRid() {return "R-" + UUID.randomUUID().toString();}private BasicReturns waitResult(String rid) {BasicReturns basicReturns = null;while ((basicReturns = basicReturnsMap.get(rid)) == null) {// 如果查询结果为 null, 说明包裹还没回来.// 此时就需要阻塞等待.synchronized (this) {try {wait();} catch (InterruptedException e) {e.printStackTrace();}}}// 读取成功之后, 还需要把这个消息从哈希表中删除掉.basicReturnsMap.remove(rid);return basicReturns;}public void putReturns(BasicReturns basicReturns) {basicReturnsMap.put(basicReturns.getRid(), basicReturns);synchronized (this) {// 当前也不知道有多少个线程在等待上述的这个响应.// 把所有的等待的线程都唤醒.notifyAll();}}// 关闭 channel, 给服务器发送一个 type = 0x2 的请求public boolean close() throws IOException {BasicArgs basicArgs = new BasicArgs();basicArgs.setRid(generateRid());basicArgs.setChannelId(channelId);byte[] payload = BinaryTool.toByte(basicArgs);Request request = new Request();request.setType(0x2);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(basicArgs.getRid());return basicReturns.isOk();}// 创建交换机public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete, Map<String, Object> Args) throws IOException {ExchangeDeclareArgs exchangeDeclareArgs = new ExchangeDeclareArgs();exchangeDeclareArgs.setRid(generateRid());exchangeDeclareArgs.setChannelId(channelId);exchangeDeclareArgs.setExchangeName(exchangeName);exchangeDeclareArgs.setType(exchangeType);exchangeDeclareArgs.setDurable(durable);exchangeDeclareArgs.setAutoDelete(autoDelete);exchangeDeclareArgs.setArgs(Args);byte[] payload = BinaryTool.toByte(exchangeDeclareArgs);Request request = new Request();request.setType(0x3);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(exchangeDeclareArgs.getRid());return basicReturns.isOk();}// 删除交换机public boolean exchangeDelete(String exchangeName) throws IOException {ExchangeDeleteArgs Args = new ExchangeDeleteArgs();Args.setRid(generateRid());Args.setChannelId(channelId);Args.setExchangeName(exchangeName);byte[] payload = BinaryTool.toByte(Args);Request request = new Request();request.setType(0x4);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(Args.getRid());return basicReturns.isOk();}// 创建队列public boolean queueDeclare(String queueName, boolean durable, boolean autoDelete,Map<String, Object> Args) throws IOException {QueueDeclareArgs queueDeclareArgs = new QueueDeclareArgs();queueDeclareArgs.setRid(generateRid());queueDeclareArgs.setChannelId(channelId);queueDeclareArgs.setQueueName(queueName);queueDeclareArgs.setDurable(durable);queueDeclareArgs.setAutoDelete(autoDelete);queueDeclareArgs.setArgs(Args);byte[] payload = BinaryTool.toByte(queueDeclareArgs);Request request = new Request();request.setType(0x5);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(queueDeclareArgs.getRid());return basicReturns.isOk();}// 删除队列public boolean queueDelete(String queueName) throws IOException {QueueDeleteArgs Args = new QueueDeleteArgs();Args.setRid(generateRid());Args.setChannelId(channelId);Args.setQueueName(queueName);byte[] payload = BinaryTool.toByte(Args);Request request = new Request();request.setType(0x6);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(Args.getRid());return basicReturns.isOk();}// 创建绑定public boolean queueBind(String queueName, String exchangeName, String bindingKey) throws IOException {BindingDeclareArgs Args = new BindingDeclareArgs();Args.setRid(generateRid());Args.setChannelId(channelId);Args.setQueueName(queueName);Args.setExchangeName(exchangeName);Args.setBindingKey(bindingKey);byte[] payload = BinaryTool.toByte(Args);Request request = new Request();request.setType(0x7);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(Args.getRid());return basicReturns.isOk();}// 解除绑定public boolean queueUnbind(String queueName, String exchangeName) throws IOException {BindingDeleteArgs Args = new BindingDeleteArgs();Args.setRid(generateRid());Args.setChannelId(channelId);Args.setQueueName(queueName);Args.setExchangeName(exchangeName);byte[] payload = BinaryTool.toByte(Args);Request request = new Request();request.setType(0x8);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(Args.getRid());return basicReturns.isOk();}// 发送消息public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) throws IOException {BasicPublishArgs Args = new BasicPublishArgs();Args.setRid(generateRid());Args.setChannelId(channelId);Args.setExchangeName(exchangeName);Args.setRoutingKey(routingKey);Args.setBasicProperties(basicProperties);Args.setBody(body);byte[] payload = BinaryTool.toByte(Args);Request request = new Request();request.setType(0x9);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(Args.getRid());return basicReturns.isOk();}// 订阅消息public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws MqException, IOException {// 先设置回调.if (this.consumer != null) {throw new MqException("该 channel 已经设置过消费消息的回调了, 不能重复设置!");}this.consumer = consumer;BasicConsumerArgs Args = new BasicConsumerArgs();Args.setRid(generateRid());Args.setChannelId(channelId);Args.setConsumerTag(channelId);  // 此处 consumerTag 也使用 channelId 来表示了.Args.setQueueName(queueName);Args.setAutoAck(autoAck);byte[] payload = BinaryTool.toByte(Args);Request request = new Request();request.setType(0xa);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(Args.getRid());return basicReturns.isOk();}// 确认消息public boolean basicAck(String queueName, String messageId) throws IOException {BasicAckArgs Args = new BasicAckArgs();Args.setRid(generateRid());Args.setChannelId(channelId);Args.setQueueName(queueName);Args.setMessageId(messageId);byte[] payload = BinaryTool.toByte(Args);Request request = new Request();request.setType(0xb);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(Args.getRid());return basicReturns.isOk();}
}

客户端代码测试:


@SpringBootTest
public class MqClientTest {private BrokerServer brokerServer = null;private ConnectionFactory factory = null;private Thread t = null;@BeforeEachpublic void setUp() throws IOException {// 1. 先启动服务器Mq02Application.context = SpringApplication.run(Mq02Application.class);brokerServer = new BrokerServer(9090);t = new Thread(() -> {// 这个 start 方法会进入一个死循环. 使用一个新的线程来运行 start 即可!try {brokerServer.start();} catch (IOException e) {e.printStackTrace();}});t.start();// 2. 配置 ConnectionFactoryfactory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);}@AfterEachpublic void tearDown() throws IOException {// 停止服务器brokerServer.stop();// t.join();Mq02Application.context.close();// 删除必要的文件File file = new File("./data");FileUtils.deleteDirectory(file);factory = null;}@Testpublic void testConnection() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);}@Testpublic void testChannel() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);}@Testpublic void testExchange() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);Assertions.assertTrue(ok);ok = channel.exchangeDelete("testExchange");Assertions.assertTrue(ok);// 此处稳妥起见, 把改关闭的要进行关闭.channel.close();connection.close();}@Testpublic void testQueue() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.queueDeclare("testQueue", true, false,  null);Assertions.assertTrue(ok);ok = channel.queueDelete("testQueue");Assertions.assertTrue(ok);channel.close();connection.close();}@Testpublic void testBinding() throws IOException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);Assertions.assertTrue(ok);ok = channel.queueDeclare("testQueue", true,  false, null);Assertions.assertTrue(ok);ok = channel.queueBind("testQueue", "testExchange", "testBindingKey");Assertions.assertTrue(ok);ok = channel.queueUnbind("testQueue", "testExchange");Assertions.assertTrue(ok);channel.close();connection.close();}@Testpublic void testMessage() throws IOException, MqException, InterruptedException {Connection connection = factory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);Assertions.assertTrue(ok);ok = channel.queueDeclare("testQueue", true, false, null);Assertions.assertTrue(ok);byte[] requestBody = "hello".getBytes();ok = channel.basicPublish("testExchange", "testQueue", null, requestBody);Assertions.assertTrue(ok);ok = channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handlerDeliver(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消费数据] 开始!");System.out.println("consumerTag=" + consumerTag);System.out.println("basicProperties=" + basicProperties);Assertions.assertArrayEquals(requestBody, body);System.out.println("[消费数据] 结束!");}});Assertions.assertTrue(ok);Thread.sleep(500);channel.close();connection.close();}
}

完成

成果测试:

启动消息队列服务器:

//启动服务器:BrokerServer brokerServer = new BrokerServer(9090);brokerServer.start();

创建生产者 发送消息:

/*** 模拟生产者*/
public class producer {public static void main(String[] args) throws IOException, InterruptedException {ConnectionFactory factory = new ConnectionFactory();System.out.println("启动生产者");factory.setHost("127.0.0.1");factory.setPort(9090);//创建连接Connection connection = factory.newConnection();//创建channelChannel channel = connection.createChannel();//创建交换机 队列 绑定channel.exchangeDeclare("exchange", ExchangeType.DIRECT,true,false,null);channel.queueDeclare("queue",true,false,null);//发送消息boolean ok = channel.basicPublish("exchange", "queue",null,"hello".getBytes());System.out.println("消息发送成功: ok:"+ok);Thread.sleep(1000);//关闭资源channel.close();connection.createChannel();}
}

创建消费者消费消息:

/*** 模拟消费者*/
public class consumer {public static void main(String[] args) throws IOException, MqException, InterruptedException {ConnectionFactory factory = new ConnectionFactory();System.out.println("消费者启动");factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("exchange", ExchangeType.DIRECT,true,false,null);channel.queueDeclare("queue",true,false,null);//接收消息boolean ok = channel.basicConsume("queue", true, new org.rabbitmq.mq02.common.Consumer() {@Overridepublic void handlerDeliver(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("处理消息开始");System.out.println("consumerTag:"+consumerTag);System.out.println("basicProperties:"+basicProperties);System.out.println("body:"+body.toString());System.out.println("处理消息结束");}});System.out.println("消费一条消息成功 ok:"+ok);// 由于消费者也不知道生产者要生产多少, 就在这里通过这个循环模拟一直等待消费.while (true) {Thread.sleep(500);}}
}

完结.

项目源码:

Admin/模拟实现消息队列 - Gitee.com

http://www.dtcms.com/a/360925.html

相关文章:

  • Axure科技感可视化原型案例:赋能设计与研发的宝藏资源
  • 二、感知机
  • 你的Redis是不是在家能用,到了学校就连不上?
  • CPTS-Vintage 票据,基于资源的约束委派 (RBCD),DPAPI密钥
  • 搭建APP应用程序如何选择服务器
  • ‌NAT穿透技术原理:P2P通信中的打洞机制解析‌
  • 【机器学习入门】4.4 聚类的应用——从西瓜分类到防控,看无监督学习如何落地
  • Windows11安装WSL教程
  • HBase实战(一)
  • golang json v1 和 v2对比差异
  • 【重学MySQL】九十六、MySQL SQL Mode高效配置全攻略
  • Beego: Go Web Framework 详细指南
  • ⚡ Linux xargs 命令参数详解
  • 【数据可视化-103】蜜雪冰城门店分布大揭秘:2025年8月数据分析及可视化
  • Ubuntu 25.10 Snapshot4 发布。
  • 小迪Web自用笔记23
  • Linux 定时任务 crontab 完全指南 —— 让服务器自动干活,解放双手
  • 【XR技术概念科普】详解6DoF:为什么它是沉浸感的关键?
  • 【开题答辩全过程】以 健身爱好者饮食管理小程序为例,包含答辩的问题和答案
  • 餐饮门店的小程序怎么做?如何开发餐饮店下单小程序?
  • Rinetd解决服务器IP端口的转发
  • Adobe Illustrator 2025最新破解教程下载安装教程,Illustrator2025最新版下载
  • Adobe Photoshop 2025 最新下载安装教程,附PS2025下载
  • 自由学习记录(91)
  • 从零开始的python学习——函数(1)
  • stdexcept介绍与使用指南
  • 13 选 list 还是 vector?C++ STL list 扩容 / 迭代器失效问题 + 模拟实现,对比后再做选择
  • 基于 HTML、CSS 和 JavaScript 的智能图像边缘检测系统
  • 【数据分享】上市公司-信息透明度综合指数数据(2003-2023)
  • Neurokit———开源多模态电生理数据处理Python包