MQ 项目(实习项目,初步更新)
一、什么是消息队列
在认识消息队列之前,需要先知道 阻塞队列 是什么。
阻塞队列:是一种特殊类型的队列,它在队列为空时,从队列中获取元素的操作将会被阻塞:当队列满时,往队列中添加元素的操作将会被阻塞。同时,尝试往已满的阻塞队列中添加新的元素或从空队列中获取元素的线程同样也会被阻塞,直到其他线程从队列中移除一个或者多个元素,或者清空队列后,使队列重新变得空闲起来。这种特性使得阻塞队列在多线程环境下能够有效地协调和同步线程操作
而所谓的 消息队列,就是把阻塞队列这样的数据结构,单独提取成了一个程序,独立进行部署。
他们都可以用来实现 生产者消费者 模型,不同之处在于:
通过阻塞队列实现的生产者消费者模型,只是在一个进程内部进行的;而通过消息队列实现的生产者消费者模型,可以在 进程与进程之间或者是 服务与服务之间;而不仅仅局限于一个进程内部。
生产者消费者模型的作用
- 解耦合(写代码要求高内聚,低耦合)
- 比如,本来有一个分布式系统,A服务器调用B服务器。(A 给 B 发送请求,B 给 A 返回响应),这个过程,A 和 B 之间的 耦合是比较大的
- 引入消息队列之后,A 把请求发给消息队列, B 再从消息队列中获取请求~
- 削峰填谷
比如 A 是入口服务器,,A再调用B完成一些具体的事务。
如果 A 和 B 直接通信,此时,若 A 突然收到一组用户请求的峰值,此时 B 也会随之感受到峰值~~那么 B 可能会扛不住(每个物理上的服务器,硬件资源(比如:CPU、内存、硬盘、网络宽带等) 都是有上限的)
引入消息队列之后,A 把请求发给队列,B从队列中取请求…
虽然 A 收到的请求很多,队列收到的请求也不少,但是 B 可以仍然按照原有的节奏来取请求,不至于说一下就收到太多的并发量~~
市面上一些知名的消息队列
- RabbitMQ
- Kafka
- RocketMQ
- ActiveMQ
二、需求分析
2.1 核心概念
消息队列里,主要涉及到这几个名词:
- 生产者(Producer):生产消息
- 消费者(Consumer):消费消息
- 中间人(Broker):生产者和消费者之间的桥梁,生产者和消费者通过中间人进行交流
- 发布(Push) :生产者向中间人这里投递消息的过程
- 订阅(Subscribe):哪些消费者要从中间人取数据,这个注册的过程,称为 “订阅”
- 消费 (Consume) :消费者从中间人这里取数据的动作
这里要注意区分订阅和消费这两个概念
消费:强调的是我取走的这个动作
订阅:强调的是我告诉你 我要取走的这个过程
比如:高中的时候,会有一些学习报,有一些同学会订购这个报纸,把钱交给代理人,代理人记录下来,告诉你,每月几号来他那领取,这个过程,就是订阅,相当于我们平时说的预定,而到约定的日期了,我去领的这个动作,成为消费
生产者和消费者之间的关系可以是 一对一、一对多或者多对多
比如:一个生产者,一个消费者
多个生产者,多个消费者
其中,Broker 是最核心的地方。负责消息的存储和转发
2.2 Broker
刚刚我们也提到了,Broker是最核心的地方,而Broker Server 内部也涉及到一些关键概念~~
-
虚拟主机(Virtual Host)
虚拟主机类似于 MySQL中的 database,算是一个 “逻辑” 上的数据集合。
-
一个 Broker server 上也可以组织多种不同类别数据,就可以使用 Virtual Host 做出逻辑上的区分。
-
实际开发中,一个 Broker server 也可能同时用来管理多个 业务线(一个单独的模块) 上的数据,就可以使用 Virtual Host 做出逻辑上的区分。
-
-
交换机(Exchange)
生产者把消息投递给 Broker Server,实际上是先把消息交给了 Broker Server 上的某个交换机,再由交换机 把消息转发给对应的队列。
-
队列(Queue)
队列是真正用来存储处理消息的实体 .后续消费者从对应的队列中取数据。
一个大的消息队列中,可以有很多个具体的小的队列
举个例子,比如取快递,有菜鸟驿站。我们的快递肯定是快递员先送到驿站,然后我们再去驿站拿。这个过程,交换机就相当于是快递员,而驿站就相当于是队列
-
绑定(Binding)
把交换机和队列之间,建立起关联关系~~
可以把 交换机 和 队列 视为是 类似于 数据库 中的 “多对多” 这样的的关系~~
- 一个交换机,可以对应到对个队列。也就是说一个交换机,可以把消息分发给多个队列。
- 一个队列,也可被多个交换机对应。相当于是 这个队列,可以收到来自不同交换机的数据,
在数据库中,表示多对多的关系,会使用一个中间表/关联表~~
可以想象,在 mq 中,也有一个这样的中间表的。所谓的 “绑定” 其实就是中间表的一项。
-
消息(Message)
具体来说,可以认为是服务器 A 发给服务器 B 的请求(通过 MQ 转发),就是一个消息。
服务器 B 给 A 返回的响应(通过 MQ 转发),也是一个消息~~
一个消息,可以认为是一个字符串(二进制数据)
消息中具体包含啥样的数据,都是程序员自己定义的~~
下面用一张图来表示他们之间的关系
这样就可以清晰的看到,一个 Broker Server 中可以有多个虚拟主机,一个虚拟主机中,也可以有多个交换机和队列以及绑定。
上述这些概念,都是 RabbitMQ 按照 AMQP 协议来组织的,不是凭空捏造的。
2.3 核心 API
消息队列服务器(Broker Server),要提供的核心 API 有下面几个
-
创建队列(queueDeclare)
- 此处不使用 Create 这样的术语,而是使用 Declare。原因是 Create 就只是单纯的 “创建”;而 Declare 起到的效果是,不存在则创建,存在就啥也不做了~~
-
销毁队列(queueDelete)
-
创建交换机(exchangeDeclare)
-
销毁交换机(exchageDelete)
-
创建绑定(queueBind)
-
解除绑定(queueUnbind)
-
发布消息(basicPublish)
-
订阅消息(basicConsume)
-
确认消息(basicAck)
- 这个 API 起到的效果,是可以让消费者显式的告诉 broker server,这个消息我处理完毕了,提高整个系统的可靠性,保证消息处理没有遗漏
- 另外 tcp 里也有一个ack,tcp 中的确认应答(ack),是已读,“已读未回”,虽然收到了消息,但是并不准备回复(只有已读的这个动作),mq 是 已处理,“已读已回”,对方收到消息了,并且给你回复了一个处理信息
- 对于 RabbitMQ 来说,除了提供肯定的确认,还提供了否定的确认~~这里 没有实现否认确定
【注意】
对于 MQ 和消费者之间,工作模式,有两种:
- Push(推) Broker把收到的数据,主动地发送给订阅的消费者。RabbitMQ只支持 推 的方式~~
- Pull(拉)消费者主动调用 Broker 的 api 取数据~~ kafka就能支持拉
另外,上述 API 也不是凭空捏造的,这些 API 的名称以及用法,都是参考了 RabbitMQ 的
2.4 交换机类型
消息队列的交换机在转发消息的时候,有一套转发的规则的~~
他们提供了几种不同的 交换机类型(ExchangeType)来描述不同的转发规则~~
RabbitMQ 主要实现了 四种 交换机类型(也是 AMQP 协议定义的)
- Direct 直接交换机。
- Fanout 扇出交换机
- Topic 主题交换机
- Header 消息头交换机
这里,我们只实现前三种,第四种规则复杂,应用场景比较少。
下面分别来看一下前三种交换机:
Direct 直接交换机
生产者发送消息的时候,会指定一个 “目标队列“ 的名字,交换机收到之后,就看看绑定的队列里,有没有匹配的队列;如果有,转发过去(把消息塞进对应的队列里),如果没有,消息直接丢弃~~
Fanout 扇出交换机
Fanout 交换机会把消息转发给所有与他绑定的队列。
比如:要转发的消息是 “hello”,这个交换机绑定着三个队列
Topic 主题交换机
主题交换机里有两个关键概念:
1)bindingKey,把队列和交换机绑定的时候,指定一个单词(像是个暗号一样)
2)routingKey,生产者发送消息的时候,也指定一个单词
如果当前 routingKey 和 bindingKey 能够对上暗号了,此时就可以把这个消息转发给对应的队列中了
eg:
上述三种交换机类型,就像给 qq 群发红包一样~~
-
专属红包. 我发的时候,必须指定某个人能领 =>直接交换机.
-
我发 10 块钱红包,然后同时我开始做法~~ 群里的每个群友都能领到 10 块钱 =>扇出交换机
-
画图红包~~ 我还是发 10 块红包,同时出个题: 必须要画一个桌子,画的好,画的像,才能领 =>主题交换机 (这里,每个红包的大小还是10块)
2.5 持久化
上述提到的 虚拟机、交换机、队列、绑定、消息,都需要让 BrokerServer 组织管理~~
这些概念对应的数据,需要存储和管理起来。此时内存和硬盘都会各自存储一份,内存为主,硬盘为辅。
在内存中存储的原因:
对于 MQ 来说,能够高效的转发处理数据,是非常关键的指标! 因此对于使用内存来组织数据,得到的效率,就比放硬盘要高很多!!
在硬盘中存储原因:
为了防止内存中数据随着进程重启 / 主机重启而丢失。
2.6 网络通信
其他的服务器(生产者/消费者)通过网络,是要和咱们的 Broker Server 进行交互。
此处设定,使用 TCP + 自定义的应用层协议 实现生产者/消费者 和 BrokerServer 之间的交互工作~~
这里 自定义的应用层协议,它的主要工作,就是让客户端可以通过网络,调用 broker server 提供的编程接口~~
接口(刚刚提到的核心 API)如下:
因此,在客户端这边,也需要提供对应的上述的这些方法。只不过服务器版本的上述方法,效果是真正干实事,把管理数据进行调整。客户端这边的上述方法,则只是发送请求/接收响应~~
此处, 客户端调用了一个本地的方法,结果这个方法在背后,给服务器发了一系列消息,由服务器完成了一系列工作。站在调用者的角度,看到的只是说,当前的这个功能已经完成,并不知道这背后的细节.
虽然调用的是一个本地的方法,实际上就好像调用了一个远端服务器的方法一样~~ 这被称为 远程过程调用 (RPC)
这个东西可以视为 编写 客户端服务器程序 通信过程 的一种设计思想
客户端除了提供上述这9个和服务器这边对应的方法之外,还需要在提供四个方法,支持其他工作~~
-
创建 Connection
-
关闭 Connection
-
创建 Channel
-
关闭 Channel
一个 Connection 里对象,就代表一个 TCP 连接。 Channel,直译为 管道/通道。
一个 Connection 里面可以包含多个 Channel。每个Channel 上面传输的数据都是互不相干的~~
Channel 只是一个逻辑上的概念。因为 TCP 连接的建立和断开,要消耗很多的资源,因此引入 channel,每个 channel 相当于一个传输特定数据类型的小连接,当我们暂时不用的时候,就可以断开这个 Channel,用的时候在新建一个 Channel,这样比操作 TCP 更轻量,花销更小。
举个例子:
他们之间的关系就像 吊瓶 (生病打手上的那种针)~~
比如,打吊瓶,要打三种药,不肯能扎三次针,只扎一次针,换药瓶即可。
此处:针就是 Connection,药瓶就是 Channel。看上去一直是那根管,实际上里面流的药不同,
2.7 消息应答
被消费的消息,需要进行应答
应答模式分为两种:
- 自动应答:消费者只要消费了消息,就算应答完毕了。Broker 直接删除这个消息。
- 手动应答:消费者手动调用应答接口,Broker 收到应答请求之后,才真正删除这个消息。
⼿动应答的⽬的,是为了保证消息确实被消费者处理成功了.在⼀些对于数据可靠性要求⾼的场景,⽐ 较常⻅.
2.8 总结
上面说了这么多,那我们究竟是要做什么呢,概括一下,就是下面四点:
- 需要实现 生产者,broker server,消费者 这三个部分。
- 针对生产者和消费者来说,主要编写的是客户端和服务器的网络通信部分。给客户端提供一组 api,让客户端的业务代码来调用,从而通过网络通信的方式,远程调用 broker server 上的方法。
- 【重点】 实现 broker server 以及 broekr server 内部的一些基本概念和核心 API
-
持久化
上述的这些关键数据,在硬盘中怎么存储,啥格式存储,存储在数据库中还是文件中?
后续服务器重启了,如何读取上述数据,把内存中的内容回复过来?
这些都是我们要考虑的问题
上述这些工作的最终目标,就是实现一个 “分布式系统下” 这样的生产者消费者模型~~
但是在当前情况下,咱们的 broker server 并不支持分布式系统(集群功能),只有一个单机的 broker server,能够给多个生产者消费者提供服务~~
但是,人家专业的 mq,比如 RabbitMQ、kafka 等,这些都是支持集群的(集群)
三、模块划分
四、项目创建
这里创建SpringBoot项⽬.
使⽤SpringBoot2.7.17,Java8.
依赖引⼊SpringWeb和MyBatis(2.3.1).
五、创建核心类
先根据 模块划分;创建三个包
创建服务器下的核心概念
5.1 创建Exchange
Exchange 的整体结构如下
public class Exchange {private String name;private ExchangeType type = ExchangeType.DIRECT;private boolean durable = false;private boolean autoDelete = false;private Map<String, Object> arguments = new HashMap<>();// 省略 getter setter
}
public enum ExchangeType {DIRECT(0),FANOUT(1),TOPIC(2);private final int type;private ExchangeType(int type) {this.type = type;}public int getType() {return this.type;}
}
- name :交换机的名字.相当于交换机的⾝份标识.
- type :交换机的类型.三种取值,DIRECT,FANOUT,TOPIC.
- durable :交换机是否要持久化存储.true为持久化,false不持久化.
- autoDelete :使⽤完毕后是否⾃动删除.预留字段,暂时未使⽤.
- arguments :交换机的其他参数属性.预留字段,暂时未使⽤.
RabbitMQ中的交换机,⽀持autoDelete 和arguments ,咱们此处为了简单,暂时没有实现对 应功能,只是预留了字段,可以尝试⾃⼰完成.
5.2 创建MSGQueue
public class MSGQueue {private String name;private boolean durable;private boolean exclusive; private boolean autoDelete;private Map<String, Object> arguments = new HashMap<>();// 省略 getter setter
}
类名叫做MSGQueue,⽽不是Queue,是为了防⽌和标准库中的Queue混淆.
- name :队列的名字.相当于队列的⾝份标识.
- durable :交换机是否要持久化存储.true为持久化,false不持久化.
- exclusive :独占(排他),队列只能被⼀个消费者使⽤.
- autoDelete :使⽤完毕后是否⾃动删除.预留字段,暂时未使⽤.
- arguments :交换机的其他参数属性.预留字段,暂时未使⽤.
5.3 创建Binding
package com.example.mq2.mqserver.core;/*** 这个类表示绑定* 交换机和队列进行绑定*/
public class Binding {private String exchangeName;private String queueName;// bindingKey 就是在出题,要求领红包的人画个 “桌子” 出来~~private String bindingKey;// Binding 这个东西,依附于 Exchange 和 Queue 的!!!// 比如,对于持久化来说,如果 Exchange 和 Queue 任何一个没有持久化,// 此时你针对 Binding 持久化是没有任何意义的// 省略getter和setter
}
- exchangeName 交换机名字。
- queueName 队列名字
- bindingKey 只在交换机类型为TOPIC 时才有效.⽤于和消息中的routingKey 进⾏匹配。
5.4 创建Message
此处的
Message
,是需要能够在网络上传输,并且也需要能写入到文件中。
此时就需要针对 Message 进行序列化和反序列化 操作.
一个类只有实现了Serializable
接口,它的对象才能被序列化
package com.example.mq2.mqserver.core;import java.util.UUID;/*** 表示一个要传递的消息* 注意!!! 此处的 Message 对象,是需要能够在网络上传输,并且也需要能写入到文件中。* 此时就需要针对 Message 进行序列化和反序列化 操作*/
public class Message implements Serializable {// 消息的基本属性// 这里new一下,防止后面快速获取时,出现空指针异常private BasicProperties basicProperties = new BasicProperties();// 消息的内容private byte[] body;/*** 下面的属性则是辅助用的属性* Message 后续会存储到文件中(如果持久化的话)* 一个文件中会存储很多的消息。如何找到某个消息,在文件中的具体位置呢?* 使用下列的两个偏移量来进行表示。[offsetBeg,offsetEnd)*/// 这俩属性并不需要被序列化保存到文件中~~ 此时消息一旦被写入到文件后,所在的位置就固定了,不需要单独存储// 这俩属性存在的目的,主要就是为了让内存中的 Message 对象,能够快速找到对应的硬盘上的 Message 的位置private transient long offsetBegin = 0; // 消息数据的开头距离文件开头的位置偏移(字节)private transient long offsetEnd = 0; // 消息数据的结尾距离文件开头的位置便宜(字节)// 使用这个属性表示消息在文件中是否是有效消息。(逻辑删除)// 0x1 表示有效,0x0 表示无效private byte isValid = 0x1;/*** 创建一个工厂方法,让工厂方法帮我们封装一下创建 Message 对象的过程* 这方个法中创建 Message 对象,会自动生成唯一的 MessageId* 万一 routingKey 和 basicMessageKey 里的 routingKey 冲突,以外面的为主*/public static Message createMessageWithId(String routingKey, BasicProperties basicProperties, byte[] body) {Message message = new Message();if (basicProperties != null) {message.setBasicProperties(basicProperties);}message.setMessageId("M-" + UUID.randomUUID());message.setRoutingKey(routingKey);message.body = body;// 此处是把 body 和 basicProperties 先设置出来,他俩是 Message 的核心内容。// 而 offsetBeg,offsetEnd,isValid,则是消息持久化的时候才会用到,在把消息写入文件之前再进行设定// 此处只是在内存中创建一个 Message 对象return message;}// 因为消息的属性都在 basicProperties 中,这里写几个方法,方便直接获取到public String getMessageId() {return basicProperties.getMessageId();}public void setMessageId(String messageId) {basicProperties.setMessageId(messageId);}public String getRoutingKey() {return basicProperties.getRoutingKey();}public void setRoutingKey(String routingKey) {basicProperties.setRoutingKey(routingKey);}public int getDeliverMode() {return basicProperties.getDeliveryMode();}public void setDeliverMode(int deliverMode) {basicProperties.setDeliveryMode(deliverMode);}public BasicProperties getBasicProperties() {return basicProperties;}public void setBasicProperties(BasicProperties basicProperties) {this.basicProperties = basicProperties;}public byte[] getBody() {return body;}public void setBody(byte[] body) {this.body = body;}public long getOffsetBegin() {return offsetBegin;}public void setOffsetBegin(long offsetBegin) {this.offsetBegin = offsetBegin;}public long getOffsetEnd() {return offsetEnd;}public void setOffsetEnd(long offsetEnd) {this.offsetEnd = offsetEnd;}public byte getIsValid() {return isValid;}public void setIsValid(byte isValid) {this.isValid = isValid;}
}
package com.example.mq2.mqserver.core;import java.io.Serializable;public class BasicProperties implements Serializable {// 消息的唯一身份标识。此处为了保证 id 的唯一性,使用 UUID 来作为 message idprivate String messageId;// 是一个消息上带有的内容,和 bindingKey 做匹配。// 如果当前的交换机类型是 DIRECT, 此时 routingKey 就表示要转发的队列名// 如果当前的交换机类型是 FANOUT, 此时 routingKey 无意义(不使用)// 如果当前的交换机类型是 TOPIC, 此时 routingKey 就要和 bindingKey 做匹配。符合要求的才能转发给对应队列private String routingKey;// 这个属性表示是否要持久化。1 表示不持久化,2 表示持久化. (RabbitMQ 就是这么实现的)private int deliveryMode = 1;// 其实对于 RabbitMQ 来说,BasicProperties 里面还有很多别的属性。其他的属性暂时先不考虑了public String getMessageId() {return messageId;}public void setMessageId(String messageId) {this.messageId = messageId;}public String getRoutingKey() {return routingKey;}public void setRoutingKey(String routingKey) {this.routingKey = routingKey;}public int getDeliveryMode() {return deliveryMode;}public void setDeliveryMode(int deliveryMode) {this.deliveryMode = deliveryMode;}
}
六、数据库操作
6.1 SQLite 介绍
这里,数据库使用的是 SQLite。SQLite是一个轻量级的关系型数据库,运算速度快,占用资源少。
这里,咱们的MQ需要搭建 mqserver,我们知名的MySQL,是一个客户端服务器结构的程序,本身就比较重量;如果在给他配一个MySQL数据库,就会很麻烦,而且让整个环境变得复杂起来。因此这里使用 SQLite。
一个完整的 SQLite 数据库,只有一个单独的可执行文件 (不到1M);SQLite只是一个本地的数据库,这个数据库相当于是直接操纵本地的硬盘文件,不涉及到网络层面,更轻量,更便捷。
比如,MySQL还需要create/drop database等,而SQLite不需要,你创建一个数据库文件,就是一个数据库。
SQLite也是知名的服务器,应用也非常广泛,在一些性能不高的设备上,是首选的数据库,尤其是移动端和嵌入式设备(空调、冰箱、洗衣机等等)。Android 系统,就是内置的 SQLite.
6.2 Java中配置 SQLite
在Java中要想使用 SQLite,都不需要额外安装。直接使用maven,把 SQLite 的依赖给引入进来,就可以了!!!
在中央仓库中,找到并引入依赖:
<!-- https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc -->
<dependency><groupId>org.xerial</groupId><artifactId>sqlite-jdbc</artifactId><version>3.41.0.1</version>
</dependency>
配置yml:
spring:datasource:url: jdbc:sqlite:./data/meta.dbusername:password:driver-class-name: org.sqlite.JDBC
mybatis:mapper-locations: classpath:mapper/**Mapper.xml
url: jdbc:sqlite:./data/meta.db
,会在当前项目所在的路径下,创建一个 meta 的文件夹,与src同级
-
对于 SQLite来说,并不需要指定用户名和密码
- MySQL是一个客户端服务器结构的程序。一个数据库服务器,就会对应多个客户端来访问它。
- SQLite 则不是客户端服务器结构的程序,就只有自己一个人能访问(把数据放在本地文件上,和网络无关,就只有本地主机才能访问)
-
SQLite 虽然和 MySQL不太一样,但是都可以通过 MyBatis 这样的框架来使用
6.3 建库建表
以往我们使用 MySQL的时候,都是提前写好相应的建库建表代码,需要使用的时候,直接放到MySQL中执行;整个操作都是在部署阶段完成的。
这里使用 SQLite,希望能够自动完成上述操作,因此,就把相关代码,也向插入删除等语句一样,直接写到xml里。对于数据库,SQLlite 启动时,自动的创建;而表则是用到的时候,执行相应的代码
创建表
先在接口中声明
在 xml 中实现
这里使用 update 标签来完成
6.4 转化 arguments
在Java中,arguments 是一个 HashMap 表,而数据库中,没有这个类型。因此,就需要转化一下格式。
如何实现把 arguments 这个键值对, 和数据库中的字符串类型相互转换呢?
关键要点, 在于, MyBatis 在完成数据库操作的时候,会自动的调用到对象的 getter 和 setter.
- 比如 MyBatis 往数据库中写数据,就会调用对象的 getter 方法,拿到属性的值,再往数据库中写。如果这个过程中,让 getArguments 得到的结果是 String 类型的;此时,就可以直接把这个数据写到数据库了。
- 比如 MyBatis 从数据库读数据的时候,就会调用对象的 setter 方法,把数据库中读到的结果设置到对象的属性中。如果这个过程中,让 setArguments,参数是一个 String,并且在 setArguments 内部针对字符串解析,解析成一个 Map 对象
更新 getter 和 setter
- Exchange 类 和 MSGQueue 类都要更改
// 更改getter 和 setter,与json格式转化public String getArguments() {// 是把当前的 arguments 参数,从 Map 转成 String(JSON)ObjectMapper objectMapper = new ObjectMapper();try {return objectMapper.writeValueAsString(arguments);} catch (JsonProcessingException e) {e.printStackTrace();}// 如果代码真的异常了,返回一个空的 json 字符串就 okreturn "{}";}// 这个方法,是从数据库读取数据之后,构造 Exchange 对象,会自动调用到public void setArguments(String argumentsJson) {// 把参数中的 argumentsJson 按照 JSON 格式解析,转成 上述的 Map 对象ObjectMapper objectMapper = new ObjectMapper();try {this.arguments = objectMapper.readValue(argumentsJson, new TypeReference<Map<String,Object>>() {});} catch (JsonProcessingException e) {e.printStackTrace();}}
6.5 实现数据库的插入和删除
接口
// 对上述三个概念的 插入、查找、删除void insertExchange(Exchange exchange);List<Exchange> selectAllExchanges();void deleteExchange(String exchangeName);void insertQueue(MSGQueue queue);List<MSGQueue> selectAllQueues();void deleteQueue(String queueName);void insertBinding(Binding binding);List<Binding> selectAllBindings();void deleteBinding(Binding binding);
xml
<insert id="insertExchange" parameterType="com.example.mq.mqserver.core.Exchange">insert into exchange values(#{name}, #{type}, #{durable}, #{autoDelete}, #{arguments});</insert><select id="selectAllExchanges" resultType="com.example.mq.mqserver.core.Exchange">select * from exchange;</select><delete id="deleteExchange" parameterType="java.lang.String">delete from exchange where name = #{exchangeName};</delete><insert id="insertQueue" parameterType="com.example.mq.mqserver.core.MSGQueue">insert into queue values(#{name}, #{durable}, #{exclusive}, #{autoDelete}, #{arguments});</insert><select id="selectAllQueues" resultType="com.example.mq.mqserver.core.MSGQueue">select *from queue;</select><delete id="deleteQueue" parameterType="java.lang.String">delete from queue where name = #{queueName};</delete><insert id="insertBinding" parameterType="com.example.mq.mqserver.core.Binding">insert into binding values(#{exchangeName}, #{queueName}, #{bindingKey});</insert><select id="selectAllBindings" resultType="com.example.mq.mqserver.core.Binding">select * from binding;</select><delete id="deleteBinding" parameterType="com.example.mq.mqserver.core.Binding">delete from binding where exchangeName = #{exchangeName} and queueName = #{queueName};</delete>
这里,Mybatis 框架会通过反射机制自动调用 参数 (eg: Exchange)中的getter方法,获取对应属性的值,然后将这些值插入到数据库中;setter方法同理,把返回值构造成相应的对象
对于 交换机, 和 队列 这两个表, 由于使用 name 作为主键, 直接按照 name 进行删除即可~
对于 绑定来说, 此时没有主键,删除操作,其实是针对 exchangeName 和 queueName 两个维度进行筛选,因此直接传入Binding对象
6.6 实现DataBaseManager
文件位置
mqserver.datacenter.DataBaseManager
1)创建DataBaseManager类
通过这个类来封装针对数据库的操作.
6.6.1 初始化
数据库的初始化 = 建库建表 + 插入一些默认数据
我们期望,在咱们得 broker server 启动的时候,做出下列逻辑判定:
-
如果数据库已经存在了,(表啥的都有了),不做任何操作
-
如果数据库不存在, 则创建库,创建表,构造默认数据,
例如,现在把 broker server 部署到一个新的服务器上显然,此时是没有数据库.就需要让 broker server 启动的时候,自动的把对应的数据库创建好~~
但是如果是一个已经部署过的机器,broker server 重启了,就会发现,数据库已经有了,此时不做任何数据库相关操作.
// 针对数据进行初始化
public void init() {if (!checkExists()) {// 数据库不存在,就进行建库建表操作// 先创建一个 data 目录File dataDir = new File("./data/meta.db");dataDir.mkdirs();// 创建数据表createTable();// 插入默认数据createDefaultData();System.out.println("[DataBaseManager] 数据库初始化完成");} else {// 数据库已经存在了,啥都不做即可System.out.println("[DataBaseManager] 数据库已经存在了");}
}
一般谈到初始化,会想到构造方法,但是这里 咱们自己实现一个。
构造方法,一般是用来初始化类的属性~~一般不会涉及到太多的业务逻辑。此处的初始化,带有业务逻辑的,还是单独拎出来,手动来调用比较合适一点。
初始化中涉及到的方法:
private boolean checkExists() {File file = new File("./data/meta.db");if (file.exists()) {return true;}return false;}private void createTable() {metaMapper.createExchangeTable();metaMapper.createQueueTable();metaMapper.createBindingTable();System.out.println("[DataBaseManager] 创建表完成!");}// 给数据表中,添加默认的数据// 此处主要是添加一个默认的交换机// RabbitMQ 里有一个这样的设定:带有一个 匿名 的交换机,类型是 DIRECTprivate void createDefaultData() {// 构造一个默认的交换机Exchange exchange = new Exchange();exchange.setName("");exchange.setType(ExchangeType.DIRECT);exchange.setDurable(true);exchange.setAutoDelete(false);metaMapper.insertExchange(exchange);System.out.println("[DataBaseManger] 创建数据库完成!");}
6.6.2 手动管理 MetaMapper
这里,我们诸多操作都用到了 MetaMapper;这个类是 spring 帮我们管理的(加了注解 @Mapper),但是这里我们又不想让DataBaseManger被自动管理;因此,我们手动把 MetaMapper 构造出来。
更改启动类
@SpringBootApplication
public class Mq2Application {public static ConfigurableApplicationContext context;public static void main(String[] args) {context = SpringApplication.run(Mq2Application.class, args);}}
ConfigurableApplicationContext
是 Spring 应用程序上下文的接口,它代表了整个 Spring 容器。在这里,将
SpringApplication.run()
的返回值赋给了context
变量,这样可以在程序的其他地方使用这个上下文对象来获取 Spring 托管的 bean 等。
修改init
方法
在 init 方法首行,手动获取 bean
public void init() {// 手动获取到 MetaMappermetaMapper = Mq2Application.context.getBean(MetaMapper.class);
}
6.6.3 封装其他数据库操作
// 把其他的数据库的操作,也在这个类中封装一下public void insertExchange(Exchange exchange) {metaMapper.insertExchange(exchange);}public List<Exchange> selectAllExchanges() {return metaMapper.selectAllExchanges();}public void deleteExchange(String exchangeName) {metaMapper.deleteExchange(exchangeName);}public void insertQueue(MSGQueue queue) {metaMapper.insertQueue(queue);}public List<MSGQueue> selectAllQueues() {return metaMapper.selectAllQueues();}public void deleteQueue(String queueName) {metaMapper.deleteQueue(queueName);}public void insertBinding(Binding binding) {metaMapper.insertBinding(binding);}public List<Binding> selectAllBindings() {return metaMapper.selectAllBindings();}public void deleteBinding(Binding binding) {metaMapper.deleteBinding(binding);}
6.7 测试 DataBaseManger
现在对我们刚刚写好的DataBaseManger 进行单元测试,确保它目前的正确性
设计单元测试,要求 单元测试 用例和用例之间,是需要相互独立的,不能产生干扰。
比如:
6.7.1 初始化 和 收尾 方法
因此,先写个 初始化 和 收尾 方法,来处理后续的每个测试 (初始化 和 收尾 方法,在每个单元测试方法执行前后都会执行),这样 就能保证,每次执行一个 单元测试方法,都是全新的环境
// 加上这个注解,就会被识别为 单元测试类
@SpringBootTest
public class DataBaseManagerTests {private DataBaseManager dataBaseManager = new DataBaseManager();// 接下来下面这里需要编写多个 方法。每个方法都是一个/一组单元测试用例。// 还需要做一个准备工作。需要写两个方法,分别用于进行 “准备工作” 和 “收尾工作”/*** 使用这个方法,来执行准备工作,每个测试用例执行前,都要调用这个方法* 由于 init 中,需要通过 context 对象拿到 metaMapper 实例* 所以就需要先把 context 对象给搞出来(重新获取 context对象,让每个测试方法都是新的context,从而保证不受前一个测试方法的影响)* <p>* 虽然在应用的正常运行中,通常只需要获取一次应用上下文,并且可以在整个应用生命周期内共享该上下文。* 但在测试中,为了避免测试之间的相互影响,重新获取应用上下文是一个常见的做法*/@BeforeEachpublic void setUp() {Mq2Application.context = SpringApplication.run(Mq2Application.class);dataBaseManager.init();}/*** 使用这个方法,来执行收尾工作,每个用例执行后,都要调用这个方法* 这里要进行的操作就是把数据库给清空~~(把数据库文件,meta.db 直接删了即可)* <p>* MqApplication.context.close()的目的是关闭应用上下文,* 以确保在每个测试方法执行完毕后,释放资源并清理状态。关闭应用上下文会销毁应用上下文中的所有bean,并触发销毁回调(如果有定义的话)* <p>* 此处的 context 对象,持有了 MetaMapper 的实例,MetaMapper 的实例又打开了 meta.db 数据库文件* 如果 meta.db 被别人打开了,此时的删除文件操作是不会成功的 (window 系统的限制,Linux 则没有)* 另一方面,获取 context 操作,会占用 8080 端口,此处的 close 也是释放 8080*/@AfterEachpublic void tearDown() {Mq2Application.context.close();dataBaseManager.deleteDB();}
}
这里,收尾的时候,需要删除上次的数据库,因此,补充一个删除数据库的方法,这个方法一般只在 测试的时候使用
在 DataBaseManager
中加入如下代码
// 删除数据库文件【测试中用到】public void deleteDB() {File file = new File("./data/meta.db");boolean ret = file.delete();if (ret) {System.out.println("[DataBaseManger] 删除数据库文件成功");} else {System.out.println("[DataBaseManger] 删除数据库文件失败");}File dataDir = new File("./data");// 使用 delete 删除目录的时候,需要保证目录是空的ret = dataDir.delete();if (ret) {System.out.println("[DataBaseManger] 删除数据库目录成功");} else {System.out.println("[DataBaseManger] 删除数据库目录失败");}}
6.7.2 测试 初始化
@Testpublic void testInitTable() {// 由于 init 方法, 已经在上面 setUp 中调用过了. 直接在测试用例代码中, 检查当前的数据库状态即可.// 直接从数据库中查询. 看数据是否符合预期.// 查交换机表, 里面应该有一个数据(匿名的 exchange); 查队列表, 没有数据; 查绑定表, 没有数据.List<Exchange> exchangeList = dataBaseManager.selectAllExchanges();List<MSGQueue> queueList = dataBaseManager.selectAllQueues();List<Binding> bindingList = dataBaseManager.selectAllBindings();// 直接打印结果, 通过肉眼来检查结果, 固然也可以. 但是不优雅, 不方便.// 更好的办法是使用断言.// System.out.println(exchangeList.size());// assertEquals 判定结果是不是相等.// 注意这俩参数的顺序. 虽然比较相等, 谁在前谁在后, 无所谓.// 但是 assertEquals 的形参, 第一个形参叫做 expected (预期的), 第二个形参叫做 actual (实际的)Assertions.assertEquals(1, exchangeList.size());Assertions.assertEquals("", exchangeList.get(0).getName());Assertions.assertEquals(ExchangeType.DIRECT, exchangeList.get(0).getType());Assertions.assertEquals(0, queueList.size());Assertions.assertEquals(0, bindingList.size());}
出现问题:
更改 初始化方法
加上创建 目录的方法
更改后,测试通过
6.7.3 测试 交换机
测试插入
之前 交换机 的 getter 和 setter 是和数据库进行交互的,在方法内部已经进行了 序列化/反序列化。这里,我们测试的时候,需要根据交换机的属性来进行判断,序列化之后,不能很好的找到键值对,因此,这里再对 Exchange 写getter和setter方法,来帮助我们进行测试。
Exchange 类中新增 getter 和 setter 方法
// 在这里针对 arguments,再提供一组 getter setter,用来去更方便的获取/设置这里的键值对// 这一组在 Java 代码内部使用(比如测试的时候)public Object getArguments(String key) {return arguments.get(key);}public void setArguments(String key,Object value) {arguments.put(key, value);}
private Exchange createTestExchange(String exchangeName) {Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(ExchangeType.FANOUT);exchange.setAutoDelete(false);exchange.setDurable(true);exchange.setArguments("aaa", 1);exchange.setArguments("bbb", 2);return exchange;
}@Test
public void testInsertExchange() {// 构造一个 Exchange 对象, 插入到数据库中. 再查询出来, 看结果是否符合预期.Exchange exchange = createTestExchange("testExchange");dataBaseManager.insertExchange(exchange);// 插入完毕之后, 查询结果List<Exchange> exchangeList = dataBaseManager.selectAllExchanges();Assertions.assertEquals(2, exchangeList.size());Exchange newExchange = exchangeList.get(1);Assertions.assertEquals("testExchange", newExchange.getName());Assertions.assertEquals(ExchangeType.FANOUT, newExchange.getType());Assertions.assertEquals(false, newExchange.isAutoDelete());Assertions.assertEquals(true, newExchange.isDurable());Assertions.assertEquals(1, newExchange.getArguments("aaa"));Assertions.assertEquals(2, newExchange.getArguments("bbb"));
}
测试删除
@Test
public void testDeleteExchange() {// 先构造一个交换机, 插入数据库; 然后再按照名字删除即可!Exchange exchange = createTestExchange("testExchange");dataBaseManager.insertExchange(exchange);List<Exchange> exchangeList = dataBaseManager. selectAllExchanges();Assertions.assertEquals(2, exchangeList.size());Assertions.assertEquals("testExchange", exchangeList.get(1).getName());// 进行删除操作dataBaseManager.deleteExchange("testExchange");// 再次查询exchangeList = dataBaseManager.selectAllExchanges();Assertions.assertEquals(1, exchangeList.size());Assertions.assertEquals("", exchangeList.get(0).getName());
}
6.7.4 测试 队列
同理,也添加 新的 getter 和 setter 方法
public Object getArguments(String key) {return arguments.get(key);
}public void setArguments(String key, Object value) {arguments.put(key, value);
}
private MSGQueue createTestQueue(String queueName) {MSGQueue queue = new MSGQueue();queue.setName(queueName);queue.setDurable(true);queue.setAutoDelete(false);queue.setExclusive(false);queue.setArguments("aaa", 1);queue.setArguments("bbb", 2);return queue;
}@Test
public void testInsertQueue() {MSGQueue queue = createTestQueue("testQueue");dataBaseManager.insertQueue(queue);List<MSGQueue> queueList = dataBaseManager.selectAllQueues();// 之前没有数据,因此只有一个Assertions.assertEquals(1, queueList.size());MSGQueue newQueue = queueList.get(0);Assertions.assertEquals("testQueue", newQueue.getName());Assertions.assertEquals(true, newQueue.isDurable());Assertions.assertEquals(false, newQueue.isAutoDelete());Assertions.assertEquals(false, newQueue.isExclusive());Assertions.assertEquals(1, newQueue.getArguments("aaa"));Assertions.assertEquals(2, newQueue.getArguments("bbb"));
}@Test
public void testDeleteQueue() {MSGQueue queue = createTestQueue("testQueue");dataBaseManager.insertQueue(queue);List<MSGQueue> queueList = dataBaseManager.selectAllQueues();Assertions.assertEquals(1, queueList.size());// 进行删除dataBaseManager.deleteQueue("testQueue");queueList = dataBaseManager.selectAllQueues();Assertions.assertEquals(0, queueList.size());
}
6.7.5 测试绑定
private Binding createTestBinding(String exchangeName, String queueName) {Binding binding = new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(queueName);binding.setBindingKey("testBindingKey");return binding;}@Testpublic void testInsertBinding() {Binding binding = createTestBinding("testExchange", "testQueue");dataBaseManager.insertBinding(binding);List<Binding> bindingList = dataBaseManager.selectAllBindings();Assertions.assertEquals(1, bindingList.size());Assertions.assertEquals("testExchange", bindingList.get(0).getExchangeName());Assertions.assertEquals("testQueue", bindingList.get(0).getQueueName());Assertions.assertEquals("testBindingKey", bindingList.get(0).getBindingKey());}@Testpublic void testDeleteBinding() {Binding binding = createTestBinding("testExchange", "testQueue");dataBaseManager.insertBinding(binding);List<Binding> bindingList = dataBaseManager.selectAllBindings();Assertions.assertEquals(1, bindingList.size());// 删除Binding toDeleteBinding = createTestBinding("testExchange", "testQueue");dataBaseManager.deleteBinding(toDeleteBinding);bindingList = dataBaseManager.selectAllBindings();Assertions.assertEquals(0, bindingList.size());}
七、消息存储设计
7.1 设计思路
消息需要在硬盘上存储.但是并不直接放到数据库中,⽽是直接使⽤⽂件存储.
原因如下:
- 对于消息的操作并不需要复杂的增删改查.
- 对于⽂件的操作效率⽐数据库会⾼很多.
主流MQ的实现(包括RabbitMQ),都是把消息存储在文件中,而不是数据库中.
下面来设计一下,消息具体如何在文件中存储~~
我们知道,消息是依附于队列的。因此,存储的时候,就把 消息 按照 队列 维度展开。
之前,创建数据库的时候,已经有了一个 data
目录,(meta.db 就在这个目录下)
在 data
中,再创建一些子目录,每个队列都对应一个子目录。而子目录的名字就是队列名。
每个队列的子目录下,再分配两个文件,来存储消息。
整个目录结构如下:
下面来具体看一下保存消息的这两个文件
queue_data
queue_data
这个文件 是一个二进制格式的文件。
这里做出如下的约定(自己约定的):
每个文件中,包含若干个消息,每个消息都以二进制的方式存储,每个消息由2个部分构成:
消息的长度 和 消息的二进制数据
消息的长度,它的大小固定是四个字节,它里面存的值就代表第二部分的长度。
一个文件中,有那么多的消息,我们该怎么找到一个消息呢?
这里不要忘记了,之前在 Message 这个类中,我们定义了 两个变量,一个是 offsetBeg,一个是 offsetEnd。我们用 [offsetBeg,offsetEnd) 来表示一个消息。
我们存储消息的时候,Message 对象在内存中存了一份,同时在硬盘中也存一份。而内存中存到那一份消息,记录了当前的消息的 offsetBeg 和 offsetEnd。通过先找到内存中的消息,再根据该消息的两个变量值,就能找到硬盘中的消息数据了。
另外,注意到,Message 对象 还有一个属性是 isVaild,他是干什么的呢?
这个属性是用来标识当前 这个消息在文件中是否有效~~
对于 Broker Server 来说,消息是需要新增,也需要删除的。新增和删除, 对于内存中来说,好办(直接使用一些集合类),但是在文件上就麻烦了。新增消息,可以直接把新的消息追加到文件末尾;删除消息,不好搞。
如果要想直接删除中间的一个消息,就需要把他后面的所有消息都往前移动一个单位,类似于 “顺序表” 的删除;这样的操作。效率是非常低的。所以,这种删除的方式是行不通的~~
因此,使用逻辑删除的方式,是比较适合的
isVaild 为 1,表示该消息有效。
isVaild 为 0,表示该消息无效(已经被删除了)
7.2 垃圾回收
随着时间的推移,这个消息文件可能会越来越大~~ 并且,这里可能大部分都是无效的消息针对这种情况, 就需要考虑对当前消息数据文件,进行垃圾回收~~
此处我们采用的 复制算法,针对消息数据文件中的垃圾进行回收~~
具体的操作就是:直接遍历原有的消息数据文件,把所有的有效数据数据重新拷贝一份到新的文件中,新文件名字和原来文件名字相同,再把旧的文件直接删除掉。
【注意】复制算法:比较适用的前提是,当前的空间,有效数据不多,大多数都是无效的数据。
垃圾回收 有了,那么什么时候进行 垃圾回收呢?
这里就要用到刚刚说的 另外一个文件 queu_stat.txt
了,使用这个文件来保存消息的统计信息。
queue_stat.txt
,这个文件,只存一行数据,并且是文本格式,这一行里有两列:
第一列是 queue_data.txt
中总的消息的数目
第二列是queue_data.txt
中有效消息的数目
两者用 \t
分割
比如:2000\t1500, 代表该队列总共有 2000 条消息,其中有效消息为 1500 条。
那这两个数据有什么用呢?
这里,我们就约定,当消息总数超过 2000 条(为了避免 GC 太频繁,比如一共 4 个消息,其中 2 个消息无效了),并且有效消息数目低于总消息数的 50 %,就触发一次垃圾回收
当消息总数超过 2000 条,并且有效消息数目低于总消息数的 50 %
这 两个 数据,是我们自己设定的,你想设置成多少都可以,只要你觉得合理就行了。
如果当一个文件消息数目非常的多,而且都是有效信息,此时会导致整个消息的数据文件非常庞大,后续针对这个文件操作就会非常耗时。假设当前文件已经达到 10 个 G 了,那么此时如果触发一次 GC,整个耗时就会非常高。
对于 RabbitMQ 来说,解决方案是把一个大的文件,拆成若干个小的文件
文件拆分:当某个文件长度达到一定的阈值的时候,就会拆分成两个文件(拆着拆着就成了很多文件)
文件合并:每个单独的文件都会进行GC,如果GC之后,发现文件变小了,就会和相邻的其他文件合并
这样做,可以保证在消息特别多的时候,也能保证性能上的及时响应,
但是这块的逻辑比较复杂,这里就不实现了,只考虑单个文件的情况~~
如果要实现这个机制,大概的思路:
1.需要专门的数据结构,来存储当前队列中有多少个数据文件每个文件大小是多少,消息数目是多少,无效消息是多少
2.设计策略, 什么时候触发文件的拆分,什么时候触发文件的合并
7.3 创建MessageFileManager类
这个类,就是专门用来管理上述信息的
首先定义一个内部类,来表示消息的统计信息
static public class Stat {// 此处直接定义成 public,就不用再搞 getter setter 方法了// 对于这样的简单的类,就直接使用成员,类似于 C 的结构体了public int totalCount; // 总消息数量public int validCount; // 有效消息数量
}
接着,根据上面的设计,表示出消息文件的目录和文件名
// 约定消息文件所在目录和文件名
// 这个方法,用来获取到指定队列对应的消息所在路径
private String getQueueDir(String queueName) {return "./data/" + queueName;
}// 这个方法用来获取该队列的消息数据所在路径
// 注意,二进制文件,使用 txt 作为后缀,不太合适。txt 一般表示文本,此处咱们也就不改了
// 二进制一般多用 .bin / .dat
private String getQueueDataPath(String queueName) {return getQueueDir(queueName) + "/queue_data.txt";
}// 这个方法用来获取该队列的消息统计文件路径
private String getQueueStatPath(String queueName) {return getQueueDir(queueName) + "/queue_stat.txt";
}
下面来编写,统计文件的读写操作
private Stat readStat(String queueName) {// 由于当前的 消息统计文件 是文本文件,可以直接使用 Scanner 来读取文件内容Stat stat = new Stat();try (InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))) {Scanner scanner = new Scanner(inputStream);stat.totalCount = scanner.nextInt();stat.validCount = scanner.nextInt();} catch (IOException e) {e.printStackTrace();}return stat;
}private void writeStat(String queueName, Stat stat) {// 使用 PrintWriter 来写文件// OutputStream 打开文件,默认情况下,会直接把原文件清空,此时相当于新的数据覆盖了旧的try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))) {PrintWriter printWriter = new PrintWriter(outputStream);printWriter.write(stat.totalCount + "\t" + stat.validCount);printWriter.flush();} catch (IOException e) {e.printStackTrace();}
}
创建队列对应的文件和目录
// 创建队列相关的文件和目录public void createQueueFiles(String queueName) throws IOException {// 1. 创建队列对应的消息目录File baseDir = new File(getQueueDir(queueName));if (!baseDir.exists()) {// 如果不存在,就创建这个目录boolean ok = baseDir.mkdirs();if (!ok) {throw new IOException("创建目录失败!baseDir=" + baseDir.getAbsolutePath());}}// 2. 创建队列数据文件File queueDataFile = new File(getQueueDataPath(queueName));if (!queueDataFile.exists()) {// 也是不存在,则创建boolean ok = queueDataFile.createNewFile();if (!ok) {throw new IOException("创建文件失败!queueDataFile=" + queueDataFile.getAbsolutePath());}}// 3. 创建队列统计文件File queueStatFile = new File(getQueueStatPath(queueName));if (!queueStatFile.exists()) {boolean ok = queueStatFile.createNewFile();if (!ok) {throw new IOException("创建文件失败!queueStatFile=" + queueStatFile.getAbsolutePath());}}// 给统计文件设定初始值Stat stat = new Stat();stat.totalCount = 0;stat.validCount = 0;writeStat(queueName,stat);}
删除队列的目录和文件
队列也是可以被删除的。当队列删除之后,对应的消息文件啥的,自然也要被随之删除。
// 删除队列的目录和文件// 队列也是可以删除的,当队列删除之后,对应的消息文件啥的,自然也要随之删除public void destroyQueueFiles(String queueName) throws IOException {// 先删除里面的文件,再删除目录File queueDataFile = new File(getQueueDataPath(queueName));boolean ok1 = queueDataFile.delete();File queueSataFile = new File(getQueueStatPath(queueName));boolean ok2 = queueSataFile.delete();File baseDir = new File(getQueueDir(queueName));boolean ok3 = baseDir.delete();if (!ok1 || !ok2 || !ok3) {// 有任意一个删除失败,都算整体删除失败throw new IOException("删除队列目录和文件失败!baseDir=" + baseDir.getAbsolutePath());}}
检查队列的目录和文件是否都存在
比如后续有生产者给 broker server 生产消息了,这个消息就有可能需要记录到文件上(取决于消息是否要持久化)
// 检查队列的目录和文件是否都存在// 比如后续有生产者给 broker server 生产消息了,这个消息就有可能需要记录到文件上(取决于消息是否要持久化)public boolean checkFilesExists(String queueName) {// 判定队列的数据文件和统计文件是否都存在File queueDataFile = new File(getQueueDataPath(queueName));if (!queueDataFile.exists()) {return false;}File queueStatFile = new File(getQueueStatPath(queueName));if (!queueStatFile.exists()) {return false;}return true;}
7.4 消息序列化
在设计 Message
的时候,我们已经提到过序列化这个事情。
序列化:就是把一个对象 (结构化的数据) 转化成一个字符串/字节数组。
反序列化:把序列化之后的 (字符串/字节数组),在还原成一个对象 (结构化的数据)。
序列化之后,方便存储和传输。
- 存储一般就是存储到文件中,因为文件只能存字符串/二进制数据,不能直接存对象。
- 传输是指通过网络传输。
由于 Message
,里面存储的 body 部分(字节数组),是二进制数据,不太方便利用 JSON 序列化。
JSON 序列化得到的结果是文本数据.无法存储二进制”
因此,这里直接使用二进制的序列化方式,针对 Message
对象进行序列化
针对二进制序列化,也有很多种解决方案~~
- Java 标准库提供了序列化方案,ObjectInputStream 和 ObjectOutputStream
- Hessian 也是一个解决方案
- protobuffer
- thrift
方案 2 ,需要引入第三方库,操作很麻烦,不如使用 标准库
方案 3 和 方案 4,要用额外的文件来描述传输的格式,操作比较麻烦,但是高效。
此处就使用第一个方案,标准库自带的方案这个方案最大的好处,不必引入额外的依赖~~
序列化,其他的类也可能用到,这里就把序列化相关的代码放到公共类里
package com.example.mq.common;import java.io.*;// 下列的逻辑,并不仅仅是 Message,其他的 Java 中的对象,也是可以通过这种的逻辑进行序列化和反序列化的
// 如果要想让这个对象能够序列化或者反序列化,需要让这个类能够实现 Serializable 接口
public class BinaryTool {// 把一个对象序列化成一个数组// 方法设置成 static,方便其他类直接调用public static byte[] toBytes(Object object) throws IOException {// 这个流对象相当于一个变长的字节数组,因为并不知道消息有多长,所以先用变成数组// 就可以把 object 序列化的数据给逐渐的写入到 byteArrayOutputStream 中,在统一转成 byte[]try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {// 此处的 writeObject 就会把该对象进行序列化,生成的二进制字节数据,就会写入到 objectOutputStream 中// 而 objectOutputStream 又关联到了 byteArrayOutputStream,最终结果就会写入到 byteArrayOutputStreamobjectOutputStream.writeObject(object);// ObjectOutputStream 最终关联到那个对象,序列化之后的内容,就会到到那个对象里}// 这个操作就是把 byteArrayOutputStream 中持有的二进制数据取出来,转成 byte[]return byteArrayOutputStream.toByteArray();}}// 把一个字符数组,反序列化为一个对象public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {Object object = null;try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)) {try (ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);){// 此处的 readObject,就是从 data 这个 byte[] 中获取数据进行反序列化object = objectInputStream.readObject();}}return object;}}
7.5 把消息写入文件中(发送消息)
上面的操作,已经把可以把消息序列化了,而下面,就是把消息存储到文件中,也就是相应的队列对应的文件中。
因此,写入消息的时候,需要两个参数,一个是队列名,另外一个就是消息本身。
具体步骤:
-
先判断当前写入队列的文件在不在
-
把 Message 对象进行序列化,转换成二进制的字节数组
-
获取当前队列消息数据文件的长度,用这个长度来计算 offsetBeg 和 offsetEnd
- 设置该消息 offsetBeg = 当前文件长度 + 4
- 设置该消息 offsetEnd = 当前文件长度 + 4 + 当前二进制数组长度
-
把新的 message 数据,写入到文件的末尾处,采用追加方式
- 先写入 4 个字节的消息长度
- 再写入消息本体
-
更新统计文件,并重新写入
// 这个方法用来把一个新的消息,放到队列对应的文件中// queue 表示要把消息写入的队列。message 则是要写的消息public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {// 1. 检查一下当前要写入的队列对应的文件是否存在if (!checkFilesExists(queue.getName())) {throw new MqException("[MessageFileManger] 队列对应的文件不存在!queueName=" + queue.getName());}// 2. 把 Message 对象,进行序列化,转化成二进制的字节数组byte[] messageBinary = BinaryTool.toBytes(message);synchronized (queue) {// 3. 先获取到当前队列数据文件的长度,用这个来计算出改 Message 对象的 offsetBeg 和 offsetEnd// 把新的 Message 数据,写入到队列数据文件的末尾,此时 Message 对象的 offsetBeg,就是当前文件长度 + 4// offsetEnd 就是当前数据文件长度 + 4 + message 自身长度File queueDataFile = new File(getQueueDataPath(queue.getName()));// 通过这个方法 queueDataFile.length() 就能获取到文件的长度。单位字节message.setOffsetBegin(queueDataFile.length() + 4);message.setOffsetEnd(queueDataFile.length() + 4 + messageBinary.length);// 4. 写入消息到数据文件,注意,是追加写入到数据文件末尾try (OutputStream outputStream = new FileOutputStream(queueDataFile, true)) {try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {// 接下来要先写当前消息的长度,占据四个字节dataOutputStream.writeInt(messageBinary.length);// 写入消息本体dataOutputStream.write(messageBinary);}}// 5. 更新消息统计文件Stat stat = readStat(queue.getName());stat.totalCount += 1;stat.validCount += 1;writeStat(queue.getName(), stat);}}
上面的代码中,我们用到了
DataOutputStream
以及它的writeInt()
方法。为什么不直接使用OutPutStream
的Write()
方法呢?
具体原因如下:
OutPutStream
的Write()
方法,虽然 参数也是 一个int
,但是他实际上只会读取其中的一个字节。虽然我们也可以使用位运算,一个一个字节的去读取(如下图)
但是,Java标准库给我们提供了更方便的用法。就是使用DataOutputStream
的writeInt()
方法;writeInt()
就是直接写入一个字节。
但是,这里会有一个问题,就是在多线程的环境下会出问题。
比如:
- 往队列中插入消息的时候
- 线程A计算好消息的
offsetBegin
和offsetEnd
之后,这时突然线程B往 里面插入了一个消息,那么线程A 再去插入消息的时候,之前算好的位置就不正确了
- 线程A计算好消息的
- 更新消息统计数据的时候
validCount
和totalCount
++ 的时候,就会出现问题。
比如 totalCount++ 的时候,线程 A先拿到 totalCount 的值,再进行+1;但是这个过程中,线程B也拿到了 totalCount 的值,并+1,此时,线程 A 再执行 +1。这样导致虽然是两次+1.但是从结果上看,只加了一次。
针对上述问题,我们可以通过加锁来解决。这里,加锁的对象是同一个队列,因为不同队列的文件是没有关联的,只要保证 不同时修改同一个队列的数据文件就可以了。 修改把消息写入文件的代码:
public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {// 1. 检查一下当前要写入的队列的文件是否存在if (!checkFilesExists(queue.getName())) {// 这里,如果不存在,说明还没创建,直接抛出系统异常不太合适// 自己创建一个异常类,来处理业务内部的逻辑throw new MqException("[MessageFileManger] 队列对应的文件不存在!queueName=" + queue.getName());}// 2. 把 Message 对象序列化,转化成二进制的字节数组byte[] messageBinary = BinaryTool.toBytes(message);synchronized (queue) {// 3. 先获取到当前队列数据文件的长度,用这个来计算出改 Message 对象的 offsetBeg 和 offsetEnd// 把新的 Message 数据,写入到队列数据文件的末尾,此时 Message 对象的 offsetBeg,就是当前文件长度 + 4// offsetEnd 就是当前数据文件长度 + 4 + message 自身长度File queueDataFile = new File(getQueueDataPath(queue.getName()));// 通过这个方法 queueDataFile.length() 就能获取到文件的长度。单位字节message.setOffsetBegin(queueDataFile.length() + 4);message.setOffsetEnd(queueDataFile.length() + 4 + messageBinary.length);// 4. 写入消息到数据文件,注意,是追加写入到数据文件末尾try (OutputStream outputStream = new FileOutputStream(queueDataFile, true)) {try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {// 接下来要先写当前消息的长度,占据四个字节dataOutputStream.writeInt(messageBinary.length);// 写入消息本体dataOutputStream.write(messageBinary);}}// 5. 更新消息统计文件Stat stat = readStat(queue.getName());stat.totalCount += 1;stat.validCount += 1;writeStat(queue.getName(), stat);}}
7.6 删除消息
这里,主要是从文件中删除,也就是删除硬盘上的数据。采用的方式是逻辑删除,就是把 Messgae
对象的 isValid
属性设置成 0x0(16进制)
。
这里操纵文件使用的是 RamdomAccessFile
(他可以做到随机访问,能操纵光标)
它的 构造方法
1、RandomAccessFile(File file, String mode)
2、RandomAccessFile(String name, String mode)
两个构造方法的第一个参数不做介绍,
**mode :**第二个参数是指以什么模式创建读写流,此参数有固定的输入值,必须为"r"/“rw”/“rws”/"rwd"其中一个。
r:以只读方式打开指定文件。如果试图对该RandomAccessFile指定的文件执行写入方法则会抛出IOException
rw:以读取、写入方式打开指定文件。如果该文件不存在,则尝试创建文件
rws:以读取、写入方式打开指定文件。相对于rw模式,还要求对文件的内容或元数据的每个更新都同步写入到底层存储设备,默认情形下(rw模式下),是使用buffer的,只有cache满的或者使用RandomAccessFile.close()关闭流的时候儿才真正的写到文件
rwd:与rws类似,只是仅对文件的内容同步更新到磁盘,而不修改文件的元数据
这里用到了三种方法
- read() 读
- write() 写
- seek() 移动光标
删除消息的具体做法 :
- 先把文件中的这一段数据,读出来,还原回 Message 对象
- 把 isValid 改成 0
- 把上述数据重新写回到文件
// 这个是删除消息的方法// 这里的删除是逻辑删除,也就是把硬盘上存储的这个数据里面的那个 isValid 属性,设置成0// 1. 先把文件中的这一段数据,读出来,还原回 Message 对象// 2. 把 isValid 改成 0// 3. 把上述数据重新写回到文件// 此处这个参数中的 message 对象,必须得包含有效的 offsetBeg 和 offsetEnd// RandomAccessFile -- 对消息文件的随机访问public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException {synchronized (queue) {try (RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(queue.getName()), "rw")) {// 1. 先从文件中读取对应的 Message 数据byte[] bufferSrc = new byte[(int) (message.getOffsetEnd() - message.getOffsetBegin())];randomAccessFile.seek(message.getOffsetBegin());randomAccessFile.read(bufferSrc);// 2. 把当前读出来的二进制数据,转化成 Message 对象Message diskMessage = (Message) BinaryTool.fromBytes(bufferSrc);// 3. 把 isValid 设置成无效diskMessage.setIsValid((byte) 0x0);// 重新写入文件byte[] bufferDest = BinaryTool.toBytes(diskMessage);// 虽然上面已经 seek 过了,但是上面 seek 完了之后,进行了读操作,这一读,就导致,文件光标往后移动// 移动到下一个消息的位置了。因此要想让接下来的写入,能够刚好写回到之前的位置,就需要重新调整文件光标randomAccessFile.seek(message.getOffsetBegin());randomAccessFile.write(bufferDest);// 通过上述的折腾,对于文件来说,只有一个字节发生了改变而已~~}// 不要忘记,更新统计文件!!把一个消息设置成无效了,那么有效的消息数量就要 -1Stat stat = new Stat();if (stat.totalCount > 0) {stat.validCount -= 1;}writeStat(queue.getName(), stat);}}
7.7 加载消息到内存中
在设计的时候,我们已经说过了。消息,会存储两份,一份是在内存上,方便快速处理;另一份是在硬盘上,用于持久化存储。下面这个方法,就是把硬盘上的有效数据,加载到内存上。
上一个方法中提到了,内存上存储消息,是以集合的形式,这里,所谓的集合,我们采用链表。链表是为了方便后续的删除消息(头删)。
具体实现步骤:
- 首先,这里的参数只是一个 queueName,而不是一个 MSGQueue;因为 这个方法是在程序启动时调用,此时不会出现多个线程竞争的情况,上面方法的 MSGQueue 参数更多是作为 锁对象,方便加锁。
- 这里加载消息,采用 while 循环
- 先读取消息的长度
(这里不要忘记 我们的一个消息是由两部构成,第一个是 固定大小的4字节,代表消息体的长度;第二部分就是 消息体)- 然后根据这个长度,读取消息的内容
- 接着判断读到的消息大小是否匹配,如果不一致,说明文件有问题,格式错乱了
- 如果没有问题,把读到的数据反序列回 Messge 对象
- 判断该消息是否有效,若有效,继续下一步,否则 continue (然消息是无效数据,但是 offset 不要忘记更新)
- 若有效,设置 Message 对象的 offsetBeg 和 offsetEnd,之后更新 文件的 offset
- 最后,把消息添加到链表中
【注意】
那这里,什么时候循环结束呢?
这里,涉及到了一个新的用法,根据异常来判断。
我们在读取文件长度的时候,使用的是DataOutputStream
的readInt()
方法,这个方法,不像之前流,读到文件末尾返回-1
,而是直接抛出EOFException
,因此,我们可以捕获这个异常,代表文件读完了。
// 使用这个方法,从文件中,读取出所有的消息内容,加载到内存中(具体来说是放到一个链表里)// 这个方法,准备在程序启动的时候,进行调用// 这里使用一个 LinkedList,主要目的是为了后续进行头删操作// 这个方法的参数,只是一个 queueName,而不是 MSGQueue 对象,因为这个方法不需要加锁,只使用 queueName 就够了// 由于该方法是在程序启动时调用,此时服务器还不能处理请求呢~~ 不涉及多线程操作文件public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {LinkedList<Message> messages = new LinkedList<>();try (InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))) {try (DataInputStream dataInputStream = new DataInputStream(inputStream)) {// 这个变量记录当前文件光标,long currentOffset = 0;// 一个文件中包含了很多消息,此处势必要循环读取while (true) {// 1. 读取当前消息的长度,这里的 readInt 可能会读到文件的末尾(EOF)// readInt 方法,读到文件的末尾,会抛出 EOFException 异常。这一点和之前的很多流并不一样int messageSize = dataInputStream.readInt();// 2. 按照这个长度,读取消息内容byte[] buffer = new byte[messageSize];int actualSize = dataInputStream.read(buffer);if (messageSize != actualSize) {// 如果不匹配,说明文件有问题,格式错乱了!throw new MqException("[MessageFileManager] 文件格式错误!queueName=" + queueName);}// 3. 把这个读到的二进制数据,反序列化回 Message 对象Message message = (Message) BinaryTool.fromBytes(buffer);// 4. 判定一下看看这个消息对象,是不是无效对象if (message.getIsValid() != 0x1) {// 无效数据,直接跳过// 虽然消息是无效数据,但是 offset 不要忘记更新currentOffset += (4 + messageSize);continue;}// 5. 是有效数据,则需要把这个 Message 对象加入到链表中。加入之前还需要填写 offsetBeg 和 offsetEnd// 进行计算 offset 的时候,需要知道当前文件光标的位置的。由于当下使用的 DataInputStream 并不方便// 因此就需要手动计算下文件光标message.setOffsetBegin(currentOffset + 4);message.setOffsetEnd(currentOffset + 4 + messageSize);currentOffset += (4 + messageSize);messages.add(message);}} catch (EOFException e) {// 这个 catch 并非真是处理 “异常”,而是处理 “正常”的业务逻辑。文件读到末尾的时候,会被 readInt 抛出该异常// 这个 catch 语句中也不需要做啥特殊的事情,目的是为了让循环结束System.out.println("[MessageFileManger] 恢复 Message 数据完成!");}}return messages;}
7.8 实现消息文件垃圾回收
由于当前会不停的往消息文件中写入新消息,并且删除消息只是逻辑删除,这就可能导致消息文件越来越大,并且里面又包含大量的无效消息,因此,就需要处理掉那些无效的信息。
此处的垃圾回收,使用 复制算法
判定,当文件中消息总数超过 2000,并且有效消息的数目不足 50%,就要触发垃圾回收。就是把文件中所有有效的消息取出来,单独的再写入到一个新的文件中,然后删除旧文件,使用新文件代替。
// 检查当前是否要针对该队列的消息进行GCpublic boolean checkGC(String queueName) {// 判断是否要 GC,是根据消息数和有效消息数。这两个值都是在 消息统计文件 中的Stat stat = readStat(queueName);if (stat.totalCount > 2000 && (double) stat.validCount / (double) stat.totalCount < 0.5) {return true;}return false;}// 引入一个新的数据文件,来存放有效信息private String getQueueDataNewPath(String queueName) {return getQueueDir(queueName) + "/queue_data_new.txt";}// 通过这个方法,真正执行消息数据文件的垃圾回收操作// 使用复制算法来完成// 创建一个新的文件,名字就是 queue_data_new.txt// 把之前消息数据文件中的有效信息都读出来,写到新的文件中// 删除旧的文件,再把新的文件改回 queue_data.txt// 同时要记得更新消息统计文件public void gc(MSGQueue queue) throws MqException, IOException, ClassNotFoundException {// 进行 gc 的时候,是针对消息数据文件进行大洗牌。在这个过程中,其他线程不能针对该队列的消息文件做任何修改synchronized (queue) {// 由于 gc 操作可能比较耗时,此处统计一下执行消耗的时间。long gcBeg = System.currentTimeMillis();// 1. 创建一个新的文件File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));if (queueDataNewFile.exists()) {// 正常情况下,这个文件不应该存在。如果存在,就是意外~~ 说明上次 gc 了一半,程序意外崩溃了。throw new MqException("[MessageFileManger] gc 时发现该队列的 queue_data_new 已经存在!queueName=" + queueDataNewFile.getName());}boolean ok = queueDataNewFile.createNewFile();if (!ok) {throw new MqException("[MessageFileManger] 创建文件失败!queueDataNewFile=" + queueDataNewFile.getAbsolutePath());}// 2. 从旧的文件中,读取出所有的有效消息对象了。(这个逻辑直接调用上述方法即可,不用重新写了)LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());// 3. 把有效消息,写入到新的文件中try (OutputStream outputStream = new FileOutputStream(queueDataNewFile)) {try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {for (Message message : messages) {byte[] buffer = BinaryTool.toBytes(message);// 先写四个字节的长度dataOutputStream.writeInt(buffer.length);// 再写消息本体dataOutputStream.write(buffer);}}}// 4. 删除旧的数据文件,并且把新的文件进行重命名File queueDataOldFile = new File(getQueueDataPath(queue.getName()));ok = queueDataOldFile.delete();if (!ok) {throw new MqException("[MessageFileManger] 删除旧的数据文件失败!queueDataOldFile=" + queueDataOldFile.getAbsolutePath());}// 把 queue_data_new.txt => queue_data.txtok = queueDataNewFile.renameTo(queueDataOldFile);if (!ok) {throw new MqException("[MessageFileManger] 文件重命名失败!queueDataNewFile=" + queueDataNewFile.getAbsolutePath()+ ", queueDataOldFile=" + queueDataOldFile.getAbsolutePath());}// 5. 更新统计文件Stat stat = readStat(queue.getName());stat.totalCount = messages.size();stat.validCount = messages.size();writeStat(queue.getName(),stat);long gcEnd = System.currentTimeMillis();System.out.println("[MessageFileManger] gc 执行完毕!queueName=" + queue.getName() + ",time= "+ (gcEnd - gcBeg) + "ms");}}
7.9 测试MessageFileManager
7.9.1 准备阶段
MessageFileManager 这个类,主要是为了管理消息相关的数据,而消息事宜是以队列为维度展开的,因此准备工作就是先创建好测试要用的队列,在后续每个方法的开始前都会重新创建,该方法结束之后,就会删除该队列。
package com.example.mq2;import com.example.mq2.mqserver.datacenter.MessageFileManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.springframework.boot.test.context.SpringBootTest;import java.io.IOException;@SpringBootTest
public class MessageFileMangerTests {private MessageFileManager messageFileManager = new MessageFileManager();private static final String queueName1 = "testQueue1";private static final String queueName2 = "testQueue2";// 这个方法是每个测试用例执行之前的准备工作@BeforeEachpublic void setUp() throws IOException {// 准备阶段,创建出两个队列,以备后用messageFileManager.createQueueFiles(queueName1);messageFileManager.createQueueFiles(queueName2);}// 这个方法就是每个用例 执行完毕之后的收尾工作@AfterEachpublic void tearDown() throws IOException {// 收尾阶段,就是把刚才的队列给干掉messageFileManager.destroyQueueFiles(queueName1);messageFileManager.destroyQueueFiles(queueName2);}
}
7.9.2 测试创建队列文件
在准备工作阶段, 我们已经写好了创建 队列文件 的代码,因此,这里测试的时候主需要验证就行了
@Test
public void testCreateFiles() {// 创建队列文件已经在上面 setUp 阶段执行过了。此处主要是验证看看文件是否存在File queueDataFile1 = new File("./data/" + queueName1 + "/queue_data.txt");// 这里不是验证它是否存在,而是直接验证它是不是一个文件,更具体Assertions.assertEquals(true, queueDataFile1.isFile());File queueStatFile1 = new File("./data/" + queueName1 + "/queue_Stat.txt");Assertions.assertEquals(true, queueStatFile1.isFile());File queueDataFile2 = new File("./data/" + queueName2 + "/queue_data.txt");Assertions.assertEquals(true, queueDataFile2.isFile());File queueStatFile2 = new File("./data/" + queueName2 + "/queue_Stat.txt");Assertions.assertEquals(true, queueStatFile2.isFile());}
7.9.3 测试读写统计文件
// 测试读写统计文件@Testpublic void testReadWriteStat() {MessageFileManager.Stat stat = new MessageFileManager.Stat();stat.totalCount = 100;stat.validCount = 50;// 因为stat 的 write 和 read都是私有的,无法直接调用// 此处需要使用反射的形式,来调用 writerStat 和 readStat 了。// Java 原生的反射 API 其实非常难用~~// 此处使用 Spring 帮我们封装好的 反射 的工具类ReflectionTestUtils.invokeMethod(messageFileManager, "writeStat", queueName1, stat);// 写入完毕之后,在调用一下读取,验证读取的结果和写入的数据是一致的MessageFileManager.Stat newStat = ReflectionTestUtils.invokeMethod(messageFileManager, "readStat", queueName1);Assertions.assertEquals(100, newStat.totalCount);Assertions.assertEquals(50, newStat.validCount);}
这里,用到了一个新的知识点。MessageFileManager
这个类的 readStat()
和 writeStat()
方法,都是私有的,我们在类外,不能直接访问,因此,这里考虑反射来获取到对应的方法。Java 原生的反射相关 API 太复杂了,这里使用 Spring
提供的反射类 ReflectionTestUtils
.
具体的方法:ReflectionTestUtils.invokeMethod();
eg:ReflectionTestUtils.invokeMethod(messageFileManger,"writeStat",queueName1,stat);
eg:MessageFileManger.Stat stat = ReflectionTestUtils.invokeMethod(messageFileManger,"readStat",queueName1);
参数:
第一个:调用谁的方法
第二个:要调用的方法名
后面则是 该调用的方法要传的参数
7.9.4 测试 sendMessage
这个方法,也就是把消息放到 队列的文件中。因此,测试 sendMessage
之前,就需要先把 队列 和 消息准备好
// 测试 sendMessage@Testpublic void sendMessage() throws IOException, MqException, ClassNotFoundException {// 1. 构造出消息Message message = createTestMessage("testMessage");// 2. 构造出队列// 此处创建的 queue 对象的 name,不能随便写,只能用 queueName1 和 queueName2.// 需要保证这个队列对象对应的目录和文件啥的都存在才行MSGQueue queue = createTestQueue(queueName1);// 调用发送消息的方法messageFileManager.sendMessage(queue, message);// 检查 stat 文件MessageFileManager.Stat stat = ReflectionTestUtils.invokeMethod(messageFileManager,"readStat", queueName1);Assertions.assertEquals(1,stat.totalCount);Assertions.assertEquals(1,stat.validCount);// 检查 data 文件List<Message> messages = messageFileManager.loadAllMessageFromQueue(queue.getName());Assertions.assertEquals(1, messages.size());// 因为只有一个,这里直接获取Message curMessage = messages.get(0);Assertions.assertEquals(message.getMessageId(), curMessage.getMessageId());Assertions.assertEquals(message.getRoutingKey(), curMessage.getRoutingKey());Assertions.assertEquals(message.getDeliverMode(), curMessage.getDeliverMode());// 比较两个字节数组的内容是否相同,不能使用 assertEquals 了Assertions.assertArrayEquals(message.getBody(), curMessage.getBody());// 这里打印一下 message 的具体信息,在 Message 类里 重写 toString 方法System.out.println("message:" + message);}
7.9.5 测试 loadAllMessageFromQueue
虽然在上面的测试中,我们在核对发送前后的信息时,已经用到了这个方法,但是只有一条信息,而这里是单独测试这个方法,并且是多条信息。
测试过程:
向队列中发送一百条消息,然后从队列中取出来,和之前的一一对应。
@Testpublic void testLoadAllMessageFromQueue() throws IOException, MqException, ClassNotFoundException {// 往队列中插入 100 条消息,然后验证看看这 100 条消息从文件中读取之后,是否和最初是一致的MSGQueue queue = createTestQueue(queueName1);List<Message> expectedMessages = new LinkedList<>();for (int i = 0; i < 100; i++) {Message message = createTestMessage("testMessage" + i);messageFileManger.sendMessage(queue, message);expectedMessages.add(message);}// 读取所有消息LinkedList<Message> actualMessages = messageFileManger.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(expectedMessages.size(), actualMessages.size());for (int i = 0; i < actualMessages.size(); i++) {Message expectMessage = expectedMessages.get(i);Message actualMessage = actualMessages.get(i);System.out.println("[" + i + "] actualMessage=" + actualMessage);Assertions.assertEquals(expectMessage.getMessageId(), actualMessage.getMessageId());Assertions.assertEquals(expectMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectMessage.getBody(), actualMessage.getBody());Assertions.assertEquals(0x1, actualMessage.getIsValid());}}
7.9.6 测试 deleteMessage
测试删除消息,回顾一下咱们的删除消息,这里的删除是逻辑删除,也就是把硬盘上存储的这个数据里面的那个 isValid 属性,设置成0。
这里测试删除消息,就是 创建队列,写入 10 个消息,删除其中的几个消息,再把剩下的全部读出来,判断是否符合预期。
// 测试删除消息// 创建队列,写入 10 个消息,删除其中的几个消息,再把剩下的全部读出来,判断是否符合预期@Testpublic void testDeleteMessage() throws IOException, MqException, ClassNotFoundException {// 创建队列MSGQueue queue = createTestQueue(queueName1);// 写入 10 个消息List<Message> expectedMessages = new LinkedList<>();for (int i = 0; i < 10; i++) {Message message = createTestMessage("testMessage" + i);messageFileManager.sendMessage(queue, message);expectedMessages.add(message);}// 删除其中的几个消息,这里为了方便测试,直接删除后三个messageFileManager.deleteMessage(queue, expectedMessages.get(9));messageFileManager.deleteMessage(queue, expectedMessages.get(8));messageFileManager.deleteMessage(queue, expectedMessages.get(7));// 读取剩下的消息,对比内容是否正确LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(7, actualMessages.size());for (int i = 0; i < actualMessages.size(); i++) {Message expectedMessage = expectedMessages.get(i);Message actualMessage = actualMessages.get(i);System.out.println("[" + i + "] actualMessage=" + actualMessage);Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());Assertions.assertEquals(0x1, actualMessage.getIsValid());}}
八、统一硬盘操作(封装数据库和数据文件)
当前,我们已经使用了 数据库 管理了交换机,绑定队列;又使用 数据文件 管理了消息。
下面,我们创建一个类,通过这个类,来统一管理上述操作。这样,上层逻辑如果需要操作硬盘,统一都通过这个类来使用。(上层代码不关心数据是存储在数据库还是文件中)
package com.example.mq2.mqserver.datacenter;import com.example.mq2.common.MqException;
import com.example.mq2.mqserver.core.Binding;
import com.example.mq2.mqserver.core.Exchange;
import com.example.mq2.mqserver.core.MSGQueue;
import com.example.mq2.mqserver.core.Message;import java.io.IOException;
import java.util.LinkedList;
import java.util.List;/*** 使用这个类来管理硬盘上的数据* 1. 数据库:交换机,绑定,队列* 2. 数据文件:消息* 上层逻辑如果需要操作硬盘,统一都通过这个类来使用。(上层代码不关心数据是存储在数据库还是文件中)*/
public class DiskDataCenter {// 用这个实例来管理数据库中的数据private DataBaseManager dataBaseManager = new DataBaseManager();// 这个实例用来管理数据文件中的数据private MessageFileManager messageFileManager = new MessageFileManager();public void init() {// 针对上述两个实例进行初始化dataBaseManager.init();// 当前 messageFileManger.init 是空的方法,只是先列在这里,一旦后续需要扩展,就在这里进行初始化即可messageFileManager.init();}// 封装交换机操作public void insertExchange(Exchange exchange) {dataBaseManager.insertExchange(exchange);}public void deleteExchange(String exchangeName) {dataBaseManager.deleteExchange(exchangeName);}public List<Exchange> selectAllExchange() {return dataBaseManager.selectAllExchanges();}// 封装队列操作public void insertQueue(MSGQueue queue) throws IOException {dataBaseManager.insertQueue(queue);}public void deleteQueue(String queueName) throws IOException {dataBaseManager.deleteQueue(queueName);}public List<MSGQueue> selectALLQueues() {return dataBaseManager.selectAllQueues();}// 封装绑定操作public void insertBinding(Binding binding) {dataBaseManager.insertBinding(binding);}public void deleteBinding(Binding binding) {dataBaseManager.deleteBinding(binding);}public List<Binding> selectAllBindings() {return dataBaseManager.selectAllBindings();}// 封装消息操作public void sendMessage(MSGQueue queue, Message message) throws IOException, MqException {messageFileManager.sendMessage(queue, message);}public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException, MqException {messageFileManager.deleteMessage(queue, message);// 因为删除了信息,这里还要判断是否要 gcif (messageFileManager.checkGC(queue.getName())) {messageFileManager.gc(queue);}}public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {return messageFileManager.loadAllMessageFromQueue(queueName);}}
九、内存数据管理
上述提到的 交换机、队列、绑定、消息等,都是在硬盘上存储的,而我们之前设计的时候,已经说了,内存和硬盘上都要存储一份,内存存储数据为主;硬盘存储数据为辅(主要是为了持久化,重启之后,数据不丢失)。下面,就来实现在内存上存储上述数据。
9.1 设计数据结构
交换机:直接使用 HashMap,key 是 name,value 是 Exchange 对象。
【注意】
- 后面可能会在多线程环境下使用,因此,考虑使用 ConcurrentHashMap(它是线程安全的),来代替 HashMap(后面的也都是如此)。
- 因为 name 是唯一的 (之前设定的时候已经规定了,名字是唯一标识符),因此 使用 name 作为了 key
// key 是 exchangeName,value 是 Exchange 对象
private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();
队列:也是使用 HashMap,key 是 name,value 是 MSGQueue 对象。(也使用 ConcurrentHashMap 替代,后续也都是,就不在赘述了)
// key 是 queueName,value 是 MSGQueue 对象private ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>();
绑定:使用嵌套的 HashMap,key 是 exchangeName,value 是一个 HashMap;第二个 HashMap 的 key 是 queueName,value 是 Binding 对象
// 第一个 key 是 exchangeName,第二个 key 是 queueNameprivate ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();
消息:使用 HashMap,key 是 messageId,value 是 Message 对象。
// key 是 messageId,value 是 Message 对象private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();
表示队列和消息之间的关联:使用嵌套的 HashMap。key 是 queueName,value 是一个 LinkedList,LinkedList 中每个元素是一个 Message 对象。
// key 是 queueName,value 是一个 Message 的链表private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();
表示 “未被确认” 的消息: 使用嵌套的 HashMap,key 是 queueName,value 是 HashMap,第二个 HashMap 的 key 是 messageId,value 是 Message 对象。
// 第一个 key 是 queueName,第二个是 messageIdprivate ConcurrentHashMap<String, ConcurrentHashMap<String, Message>> queueMessageWaitAckMap = new ConcurrentHashMap<>();
- 后续实现消息确认的逻辑,需要根据 ack 响应的内容,这里会提供一个确认的messageld.根据这个 messageld 来把上述结构中的Message 对象找到并移除
- 咱们此处实现的 MQ, 支持两种应答模式(ACK)
自动应答:消费者取了元素,这个消息就算是被应答了,此时这个消息就可以被干掉了。
手动应答:消费者取了元素,这个消息还不算被应答,需要消费者主动再调用一个 basicAck 方法此时才认为是真正应答了,才能删除这个消息。
9.2 管理交换机
- 添加交换机
public void insertExchange(Exchange exchange) {exchangeMap.put(exchange.getName(), exchange);System.out.println("[MemoryDataCenter] 新交换机添加成功!exchangeName=" + exchange.getName());}
- 获取交换机
public Exchange getExchange(String exchangeName) {return exchangeMap.get(exchangeName);}
- 删除交换机
public void deleteExchange(String exchangeName) {exchangeMap.remove(exchangeName);System.out.println("[MemoryDataCenter] 交换机删除成功!exchangeName=" + exchangeName);}
9.3 管理队列
- 添加队列
public void insertQueue(MSGQueue queue) {queueMap.put(queue.getName(), queue);System.out.println("[MemoryDataCenter] 新队列添加成功!queueName=" + queue.getName());}
- 获取队列
public MSGQueue getQueue(String queueName) {return queueMap.get(queueName);}
- 删除队列
public void deleteQueue(String queueName) {queueMap.remove(queueName);System.out.println("[MemoryDataCenter] 队列删除成功!queueName=" + queueName);}
9.4 管理绑定
- 添加绑定
因为绑定表是一个嵌套的 Map 表,因此,现根据第一个 key(exchangeName),查找对应的 value(也是一个 Map 表,key 是 queueName,value 是 binding),如果不存在则创建;然后根据 第二个key(queueName),查找是否存在相应的 binding,如果存在就报错(已经存在了,就不能在添加同样的了)
public void insertBinding(Binding binding) throws MqException {// 先使用 exchangeName 查一下,对应的哈希表是否存在,不存在就创建一个
// ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
// if (bindingMap == null) {
// bindingMap = new ConcurrentHashMap<>();
// bindingsMap.put(binding.getExchangeName(),bindingMap);
// }// 上面这段代码,可以使用下面这个代替,逻辑是一样的ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),k -> new ConcurrentHashMap<>());synchronized (bindingMap) {// 再根据 queueName 查一下。如果已经存在,就抛出异常,不存在才能插入if (bindingMap.get(binding.getQueueName()) != null) {throw new MqException("[MemoryDataCenter] 绑定已经存在!exchangeName=" + binding.getExchangeName());}bindingMap.put(binding.getQueueName(), binding);}System.out.println("[MemoryDataCenter] 新绑定添加成功!exchangeName=" + binding.getExchangeName()+ "queueName" + binding.getQueueName());}
- 获取绑定
获取绑定,写两个版本
① 根据 exchangeName 和 queueName 确定唯一一个 Binding
② 根据 exchangeName 获取所有的 Binding
// 获取绑定,写两个版本// 1. 根据 exchangeName 和 queueName 确定唯一一个 Binding// 2. 根据 exchangeName 获取所有的 Bindingpublic Binding getBinding(String exchangeName, String queueName) {ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);if (bindingMap == null) {return null;}return bindingMap.get(queueName);}public ConcurrentHashMap<String, Binding> getBindings(String exchangeName) {return bindingsMap.get(exchangeName);}
- 删除绑定
public void deleteBinding(Binding binding) throws MqException {ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());if (bindingMap == null) {throw new MqException("[MemoryDataCenter] 绑定不存在!exchangeName" + binding.getExchangeName()+ "queueName" + binding.getQueueName());}bindingMap.remove(binding.getQueueName());System.out.println("[MemoryDataCenter] 绑定删除成功!exchangeName=" + binding.getExchangeName()+ "queueName" + binding.getQueueName());}
9.5 管理消息
- 添加消息
public void addMessage(Message message) {messageMap.put(message.getMessageId(), message);System.out.println("[MemoryDataCenter] 新消息添加成功!messageId=" + message.getMessageId());}
- 根据 id 查询信息
public Message getMessage(String messageId) {return messageMap.get(messageId);}
- 根据 id 删除信息
public void removeMessage(String messageId) {messageMap.remove(messageId);System.out.println("[MemoryDataCenter] 消息被移除!messageId=" + messageId);}
- 发送消息到指定队列
// 发送消息到指定队列public void sendMessage(MSGQueue queue, Message message) {// 把消息放到对应的队列数据结构中// 先根据队列的名字,找到该队列对应的消息链表
// LinkedList<Message> messages = queueMessageMap.get(queue.getName());
// if (messages == null) {
// messages = new LinkedList<>();
// queueMessageMap.put(queue.getName(),messages);
// }// computeIfAbsent 是线程安全的LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(), k -> new LinkedList<>());// 再把数据加到 messages 里面synchronized (messages) {messages.add(message);}// 在这里把该消息也往消息中心插入一下,即使 message 已经在 消息中心存在了,重复插入也没有关系// 主要是相同的 messageId,对应的 message 的内容一定是一样的。(服务器代码不会对 Message 内容(BasicProperties 和 body)做修改)addMessage(message);System.out.println("[MemoryDataCenter] 消息被投递到队列中!messageId=" + message.getMessageId());}
- 从队列中取消息
// 从队列中取消息
public Message pollMessage(String queueName) {// 根据队列名,查找一下,对应的队列的消息链表LinkedList<Message> messages = queueMessageMap.get(queueName);// 如果没找到,说明队列中没有任何消息if (messages == null) {return null;}synchronized (messages) {// 消息数为 0,这里也返回 nullif (messages.size() == 0) {return null;}// 链表中有元素,就进行头删Message currentMessage = messages.remove(0);System.out.println("[MemoryDataCenter] 消息从队列中取出!messageId" + currentMessage.getMessageId());return currentMessage;}}
- 获取指定队列中消息的个数
// 获取指定队列中消息的个数
public int getMessageCount(String queueName) {LinkedList<Message> messages = queueMessageMap.get(queueName);if (messages == null) {// 队列中没有消息return 0;}synchronized (messages) {return messages.size();}}
9.6 管理待确认的消息
- 添加未确认的消息
// 添加未确认的消息public void addMessageWaitAck(String queueName, Message message) {ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAckMap.computeIfAbsent(queueName,k -> new ConcurrentHashMap<>());messageHashMap.put(message.getMessageId(), message);System.out.println("[MemoryDataCenter] 消息进入待确认队列!messageId" + message.getMessageId());}
- 删除未确认的消息(消息已经确认了)
// 删除未确认的消息(消息已经确认了)public void removeMessageWaitAck(String queueName, String messageId) {ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAckMap.get(queueName);if (messageHashMap == null) {return;}messageHashMap.remove(messageId);System.out.println("[MemoryDataCenter] 消息从待确认队列删除!messageId" + messageId);}
- 获取指定的未确认的消息
// 获取指定的未确认的消息public Message getMessageWaitAck(String queueName, String messageId) {ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAckMap.get(queueName);if (messageHashMap == null) {return null;}return messageHashMap.get(messageId);}
9.7 从硬盘中恢复数据到内存
如果遇到一些突发情况,比如关机,服务器挂了等,可能会导致内存中的数据丢失,这时候,就需要从硬盘上,把这些数据恢复到内存中。(这也是把数据在硬盘上也存储一份的作用)。
具体步骤:
- 清空之前内存集合中的数据(直接清空,防止残留数据 对 新数据产生干扰)
- 恢复所有的交换机数据
- 恢复所有的队列数据
- 恢复所有的绑定数据
- 恢复所有的消息数据
注意:不需要恢复 等待确认 的消息。因为在 当消息在等待 ACK 的时候,服务器重启了,此时消息还没有被处理,就相当于还没有被取走,等到消费者用的时候,从 messageMap 里重新取一次就行了,不需要单独存起来。
// 这个方法就是从硬盘上读取数据,把硬盘中之前持久化存储的各个维度的数据都恢复到内存中
public void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException {// 0. 清空之前的所有数据exchangeMap.clear();queueMap.clear();bindingsMap.clear();messageMap.clear();queueMessageMap.clear();// 1. 恢复交换机的所有数据List<Exchange> exchanges = diskDataCenter.selectAllExchange();for (Exchange exchange : exchanges) {exchangeMap.put(exchange.getName(), exchange);}// 2. 恢复所有的队列数据List<MSGQueue> queues = diskDataCenter.selectALLQueues();for (MSGQueue queue : queues) {queueMap.put(queue.getName(), queue);}// 3. 恢复所有的绑定数据List<Binding> bindings = diskDataCenter.selectAllBindings();for (Binding binding : bindings) {ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),k -> new ConcurrentHashMap<>());bindingMap.put(binding.getQueueName(), binding);}// 4. 恢复所有的消息数据// 遍历所有的队列,根据每个队列的名字,来获取所有的消息for (MSGQueue queue : queues) {LinkedList<Message> messages = diskDataCenter.loadAllMessageFromQueue(queue.getName());queueMessageMap.put(queue.getName(), messages);for (Message message : messages) {messageMap.put(message.getMessageId(), message);}}// 注意!!针对 “未确认的消息” 这部分内存中的数据,不需要从硬盘恢复,之前考虑存储的时候,也没设定这一块// 一旦在等待 ack 的过程中,服务器重启了,此时这些 “未被确认的消息”,就恢复成 “未被取走的消息”。// 这个消息在硬盘上存储的时候,就是当做 “未被取走”
}
9.8 测试 MemoryDataCenter
9.8.1 准备阶段
- 先做好 初始化 和 收尾工作。
@SpringBootTest
public class MemoryDataCenterTests {private MemoryDataCenter memoryDataCenter = null;@BeforeEachpublic void setUp() {memoryDataCenter = new MemoryDataCenter();}@AfterEachpublic void tearDown() {// 因为数据是保存在内存上,为了避免影响,每次都要清空memoryDataCenter = null;}
}
这里,每个单元测试前,都要创建一个新的实例,并且该方法结束后,也要将该实例置为空;这是因为
MemoryDataCenter
的数据都是保存在内存上,因此需要这些操作来保证下一次运行,数据都是新的,不会受上次干扰。
与MessageFileManager
不同,MessageFileManager
的数据都是保存在文件上(下图是 MessageFileManager 的准备、收尾 工作。)
因此,每次只需要把文件上的内容销毁即可,不需要每次都创建新的实例。
- 准备好测试用的交换机和队列
// 创建一个测试交换机private Exchange createTestExchange(String exchangeName) {Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(ExchangeType.DIRECT);exchange.setDurable(true);exchange.setAutoDelete(false);return exchange;}// 创建一个测试队列private MSGQueue createTestQueue(String queueName) {MSGQueue queue = new MSGQueue();queue.setName(queueName);queue.setDurable(true);queue.setExclusive(false);queue.setAutoDelete(false);return queue;}
9.8.2 测试交换机
这里,使用一个测试方法,直接测试交换机的 创建、查找以及删除。
// 针对交换机进行测试@Testpublic void testExchange() {// 1. 先构造出一个交换机并插入Exchange expectExchange = createTestExchange("testExchange");memoryDataCenter.insertExchange(expectExchange);// 2. 查询出这个交换机,比较结果是否一致。此处直接比较这俩引用指向同一个对象,// 因为是在内存上存储的,保存在内存上的 Map 表中,实体只有一个,两个引用都是指向它Exchange actualExchange = memoryDataCenter.getExchange("testExchange");Assertions.assertEquals(expectExchange, actualExchange);// 3. 删除这个交换机memoryDataCenter.deleteExchange("testExchange");// 4. 再查一次,看是否就查不到了actualExchange = memoryDataCenter.getExchange("testExchange");Assertions.assertNull(actualExchange);}
9.8.3 测试队列
// 针对队列进行测试@Testpublic void testQueue() {// 1. 构造一个队列,并插入MSGQueue exceptQueue = createTestQueue("testQueue");memoryDataCenter.insertQueue(exceptQueue);// 2. 查询这个队列,并比较MSGQueue actualQueue = memoryDataCenter.getQueue("testQueue");Assertions.assertEquals(exceptQueue, actualQueue);// 3. 删除这个交换机memoryDataCenter.deleteQueue("testQueue");// 4. 再次查询队列,看是否能查到actualQueue = memoryDataCenter.getQueue("testQueue");Assertions.assertNull(actualQueue);}
9.8.4 测试绑定
// 针对绑定进行测试@Testpublic void testBinding() throws MqException {// 1. 创建绑定Binding expectBinding = new Binding();expectBinding.setExchangeName("testExchange");expectBinding.setQueueName("testQueue");expectBinding.setBindingKey("testBindingKey");memoryDataCenter.insertBinding(expectBinding);// 2. 查询 (我们有两个获取绑定的方法,因此查询两次)// 根据交换机和队列名。查询指定的绑定Binding actualBinding = memoryDataCenter.getBinding("testExchange", "testQueue");Assertions.assertEquals(expectBinding, actualBinding);// 根据交换机,查询所有的绑定ConcurrentHashMap<String, Binding> bindingMap = memoryDataCenter.getBindings("testExchange");Assertions.assertEquals(1, bindingMap.size());Assertions.assertEquals(expectBinding, bindingMap.get("testQueue"));// 3. 删除memoryDataCenter.deleteBinding(expectBinding);// 4. 查询看是否存在actualBinding = memoryDataCenter.getBinding("testExchange", "TestQueue");Assertions.assertNull(actualBinding);}
9.8.5 测试消息
- 测试消息本身的 添加、获取、删除
private Message createTestMessage(String content) {Message message = Message.createMessageWithId("testRoutingKey", null, content.getBytes());return message;
}@Test
public void testMessage() {Message expectMessage = createTestMessage("toMessage");memoryDataCenter.addMessage(expectMessage);Message actualMessage = memoryDataCenter.getMessage(expectMessage.getMessageId());Assertions.assertEquals(expectMessage, actualMessage);memoryDataCenter.removeMessage(expectMessage.getMessageId());actualMessage = memoryDataCenter.getMessage(expectMessage.getMessageId());Assertions.assertNull(actualMessage);
}
- 测试发送消息
- 创建一个队列,创建 10 条消息,把这些消息都插入队列中
- 从队列中取出这些消息
- 比较取出的这些消息和之前的消息是否一致
@Test
public void testSendMessage() {// 创建一个队列,创建 10 条消息,把这些消息都插入队列中MSGQueue queue = createTestQueue("testQueue");List<Message> expectMessages = new ArrayList<>();for (int i = 0; i < 10; i++) {Message message = createTestMessage("testMessage" + i);memoryDataCenter.sendMessage(queue, message);expectMessages.add(message);}// 2. 从队列中取出这些消息List<Message> actualMessage = new ArrayList<>();while (true) {Message message = memoryDataCenter.pollMessage("testQueue");if (message == null) {break;}actualMessage.add(message);}// 3. 比较取出的这些消息和之前的消息是否一致Assertions.assertEquals(expectMessages.size(), actualMessage.size());for (int i = 0; i < actualMessage.size(); i++) {Assertions.assertEquals(expectMessages.get(i), actualMessage.get(i));}
}
- 测试消息应答
@Testpublic void testMessageWaitAck() {Message expectMessage = createTestMessage("expectMessage");memoryDataCenter.addMessageWaitAck("testQueue", expectMessage);Message actualMessage = memoryDataCenter.getMessageWaitAck("testQueue", expectMessage.getMessageId());Assertions.assertEquals(expectMessage, actualMessage);memoryDataCenter.removeMessageWaitAck("testQueue", expectMessage.getMessageId());actualMessage = memoryDataCenter.getMessageWaitAck("testQueue", expectMessage.getMessageId());Assertions.assertNull(actualMessage);}
- 测试恢复信息(Recovery)
@Test
public void testRecovery() throws IOException, MqException, ClassNotFoundException {// 由于后续需要进行数据库操作,依赖 Mybatis,就需要先启动 SpringApplication,这样才能进行后续的操作MqApplication.context = SpringApplication.run(MqApplication.class);// 1. 在硬盘上构造数据DiskDataCenter diskDataCenter = new DiskDataCenter();diskDataCenter.init();// 构造交换机Exchange expectedExchange = createTestExchange("testExchange");diskDataCenter.insertExchange(expectedExchange);// 构造队列MSGQueue expectedQueue = createTestQueue("testQueue");diskDataCenter.insertQueue(expectedQueue);// 构造绑定Binding expectedBinding = new Binding();expectedBinding.setExchangeName("testExchange");expectedBinding.setQueueName("testQueue");expectedBinding.setBindingKey("testBindingKey");diskDataCenter.insertBinding(expectedBinding);// 构造消息Message expectedMessage = createTestMessage("testMessage");diskDataCenter.sendMessage(expectedQueue, expectedMessage);// 2. 执行恢复操作memoryDataCenter.recovery(diskDataCenter);// 3. 对比结果// 这里就不能直接对比引用了,因为他是从硬盘中反序列化之后得到的,这个过程创建了新的对象Exchange actualExchange = memoryDataCenter.getExchange("testExchange");Assertions.assertEquals(expectedExchange.getName(), actualExchange.getName());Assertions.assertEquals(expectedExchange.getType(), actualExchange.getType());Assertions.assertEquals(expectedExchange.isDurable(), actualExchange.isDurable());Assertions.assertEquals(expectedExchange.isAutoDelete(), actualExchange.isAutoDelete());MSGQueue actualQueue = memoryDataCenter.getQueue("testQueue");Assertions.assertEquals(expectedQueue.getName(), actualQueue.getName());Assertions.assertEquals(expectedQueue.isDurable(), actualQueue.isDurable());Assertions.assertEquals(expectedQueue.isAutoDelete(), actualQueue.isAutoDelete());Assertions.assertEquals(expectedQueue.isExclusive(), actualQueue.isExclusive());Binding actualBinding = memoryDataCenter.getBinding("testExchange", "testQueue");Assertions.assertEquals(expectedBinding.getExchangeName(), actualBinding.getExchangeName());Assertions.assertEquals(expectedBinding.getQueueName(), actualBinding.getQueueName());Assertions.assertEquals(expectedBinding.getBindingKey(), actualBinding.getBindingKey());Message actualMessage = memoryDataCenter.pollMessage("testQueue");Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());// 4. 清理硬盘的数据,把整个 data 目录里的内容都删掉(包含了 meta.db 和 队列的目录)MqApplication.context.close(); // 先关闭连接,才能正常删除掉(释放使用)File dataDir = new File("./data");FileUtils.deleteDirectory(dataDir); // 可以递归删除目录}
运行之后,发现报错了
这个错误说,我们的队列文件不存在,也就是 queue_data.txt 和 queue_stat.txt 这两个文件不存在(这里也体现了,我们打印错误信息的好处,不然有时候定位到代码,我们也不能快速反应什么问题,不如文字直入心灵)。因此,我们就根据这个点,来排查问题。
既然不存在,说明,创建文件的代码 (创建文件的代码在 MessageFileManger 中) 没有正确运行,所以我们先找到这个方法,
发现,这个方法只在测试代码中调用了,原因就是我们封装的时候,上层代码没有调用这个方法,因此找到他的上层先关代码
我们统一硬盘操作的时候,把 MessageFileManger
这个类封装在了 DiskDataCenter
这个类里,所以,应该由这个类,调用 MessageFileManger
中创建数据文件的代码,我们找到相应的代码
这是与队列相关的操作,可以发现,我们只是创建了队列,并没有创建队列文件,因此,添加上即可。同时,销毁队列时,也要删除队列文件。修改如下:
在 DiskDataCenter
中,添加相应的代码:
如图,添加上述两行代码即可。
9.9 总结
上述过程,主要是在内存中,而这就需要我们设计合理的数据结构来存储数据。
我们广泛使用了 哈希表、链表、嵌套的数据结构 等,来保存和管理 交换机、队列、绑定、消息;上述都是基本的数据结构,因此我们只有深刻理解这些数据结构,才能更好地去使用。
其次我们要考虑线程安全的问题:
要不要加锁?锁加到哪里?使用哪个对象作为锁对象?
这个没有标准的答案,要根据具体的情况来分析。
总的原则: 分析如果不加锁,这个代码会造成啥样的后果/问题?这个后果你觉得是否严重?
十、 虚拟主机(VirtualHost)设计
前面我们已经描述了虚拟主机是什么
此处为了简单,只实现单个虚拟主机;并不打算实现 添加/删除 虚拟主机。但是,仍会在设计数据结构时,留下这样一个扩展空间。
虚拟主机,不仅仅要管理数据,还要提供一些核心 API,供上层代码调用。
10.1 创建 VirtualHost 类
这里,做好准备工作,把 MemoryDataCenter
以及 DiskDataCenter
引入过来,并在构造方法中进行初始化,构造初始数据。
/*** 通过这个类,来表示 虚拟主机* 每个虚拟主机下面都管理着自己的 交换机,队列,绑定,消息,数据* 同时提供 api 供上层调用* 针对 VirtualHost 这个类,作为业务逻辑的整合者,就需要对于代码中抛出的异常进行处理了*/
public class VirtualHost {private String virtualHostName;private MemoryDataCenter memoryDataCenter = new MemoryDataCenter();private DiskDataCenter diskDataCenter = new DiskDataCenter();// 构造方法public VirtualHost(String name) {this.virtualHostName = name;// 对于 MemoryDataCenter 来说,不需要额外的初始化操作的// 但是,针对 DiskDataCenter 来说,则需要进行初始化操作,建库建表和初始数据的设定diskDataCenter.init();// 另外还需要针对硬盘的数据,进行恢复到内存中try {memoryDataCenter.recovery(diskDataCenter);} catch (IOException | MqException | ClassNotFoundException e) {e.printStackTrace();System.out.println("[VirtualHost] 恢复内存数据失败!");}}public String getVirtualHostName() {return virtualHostName;}public MemoryDataCenter getMemoryDataCenter() {return memoryDataCenter;}public DiskDataCenter getDiskDataCenter() {return diskDataCenter;}
}
10.2 创建交换机(exchangeDelcare)
一个虚拟主机上,肯能会存在多个交换机,那么,我们应该如何表示交换机和虚拟主机之间的从属关系呢?
- 方案一:参考数据库设计,“一对多”方案,比如给交换机表,添加个属性,用来表示虚拟主机的 id/name…
- 方案二:在 VirtualHost 这个类中,重新约定交换机的名字。让 交换机的名字 = 虚拟主机的名字 + 交换机真实的名字。
- 方案三:更优雅的办法,是给每个虚拟主机,分配一组不同的数据库和文件… (比方案二更麻烦)
虚拟主机存在的目的,就是为了保证隔离,让不同虚拟主机之间的内容不要有影响~~
比如:
在虚拟主机1 中 ,搞了个 exchange,叫做 testExchange;
在虚拟主机2 中,也搞一个 exchange,也叫做 testExchange
上述这种情况是可以的,虚机主机就是干这个的。此时我们就可以受用方法二,加上前缀:
virtualHost1testExchange
,virtualHost2testExchange
。
按照上述这种方式,也可以去区分不同的队列。进一步的,由于绑定是和交换机和队列都相关,此时绑定也就被隔离开了。
再进一步,消息和队列是强相关的,队列名区分开了,消息自然也就区分开了~~
这里,我们就采用 方案二(RabbitMQ也是这样实现的)来编写我们的代码。
编写交换机的步骤:
- 把交换机的名字,机上虚拟主机作为前缀
- 判断该交换机是否已经存在。直接通过内存查询,存在直接返回。
- 若不存在,则创建,并把交换机对象写入硬盘(如果持久化的话),随后再写入内存
上述逻辑,先写硬盘,后写内存。目的就是因为硬盘更容易写失败,如果硬盘写失败了,内存就不写了。要是先写内存。内存写成功了,硬盘写失败了,还需要把内存的数据给再删掉,就比较麻烦
// 创建交换机
// 如果交换机不存在,就创建,如果存在,直接返回
// 返回值是 boolean。 创建成功,返回 true。失败返回 false
public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,Map<String, Object> arguments) {// 把交换机的名字,加上虚拟主机名作为前缀exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker) {// 1. 判断该交换机是否已经存在,这里直接在内存上判断,因为硬盘上只是为了持久化存储,是为了 recovery// 而内存上,是所有的,不管有没有持久化存储,内存上都有Exchange existsExchange = memoryDataCenter.getExchange(exchangeName);if (existsExchange != null) {// 该交换机已经存在!System.out.println("[VirtualHost] 交换机已经存在!exchangeName=" + exchangeName);return true;}// 2. 不存在,真正创建交换机。先构造 Exchange 对象Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(exchangeType);exchange.setDurable(durable);exchange.setAutoDelete(autoDelete);exchange.setArguments(arguments);// 3. 把交换机对象写入硬盘if (durable) {diskDataCenter.insertExchange(exchange);}// 4. 把交换机写入内存memoryDataCenter.insertExchange(exchange);System.out.println("[VirtualHost] 交换机创建完成!exchangeName=" + exchangeName);// 上述逻辑,先写硬盘,后写内存。目的就是因为硬盘更容易写失败,如果硬盘写失败了,内存就不写了// 要是先写内存。内存写成功了,硬盘写失败了,还需要把内存的数据给再删掉,就比较麻烦}return true;} catch (Exception e) {System.out.println("[VirtualHost] 交换机创建失败!exchangeName=" + exchangeName);e.printStackTrace();return false;}
}
【注意1】
这里,因为我们方法有个参数是Map<String, Object> arguments
,但是,在对应的Exchange
类中却没有对应的getter
方法,因此,我们还需要再补充一个getter
方法。
在Exchange
类中补充getter
方法
public class Exchange {......public void setArguments(Map<String, Object> arguments) {this.arguments = arguments;}
}
【注意2】
这里,考虑到多线程的环境下,若多个线程同时创建交换机,如果要创建的交换机的名字相同,但是类型不同,那么最后创建的是哪个交换机呢?为了避免这种情况,我们就要考虑加锁。
添加一个锁对象,作为VirtualHost
的成员,这样后续与交换机相关的锁,都可以用这个对象。
private final Object exchangeLocker = new Object();
10.3 删除交换机(exchangeDelete)
- 根据交换机的名字找到对应的交换机
- 删除硬盘上的数据
- 删除内存上的数据
// 删除交换机
public boolean exchangeDelete(String exchangeName) {exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker) {// 1. 先找到对应的交换机Exchange toDelete = memoryDataCenter.getExchange(exchangeName);if (toDelete == null) {throw new MqException("[VirtualHost] 交换机不存在无法删除!");}// 2. 删除硬盘上的数据if (toDelete.isDurable()) {diskDataCenter.deleteExchange(exchangeName);}// 3. 删除内存中的数据memoryDataCenter.deleteExchange(exchangeName);System.out.println("[VirtualHost] 交换机删除成功!exchangeName=" + exchangeName);}return true;} catch (Exception e) {System.out.println("[VirtualHost] 交换机删除失败!exchangeName=" + exchangeName);e.printStackTrace();return false;}
}
10.4 创建队列(queueDelcare)
过程与创建交换机类似
- 拼接名字
- 判断是否存在,存在直接返回
- 不存在则创建,并写入到硬盘(如果持久化),最后写入内存
// 创建队列
public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) {// 把队列的名字,给拼接上虚拟主机的名字queueName = virtualHostName + queueName;try {synchronized (queueLocker) {// 1. 判断队列是否存在MSGQueue existsQueue = memoryDataCenter.getQueue(queueName);if (existsQueue != null) {System.out.println("[VirtualHost] 队列已经存在!queueName=" + queueName);return true;}// 2. 创建队列对象MSGQueue queue = new MSGQueue();queue.setName(queueName);queue.setDurable(durable);queue.setExclusive(exclusive);queue.setAutoDelete(autoDelete);queue.setArguments(arguments);// 3. 写硬盘if (durable) {diskDataCenter.insertQueue(queue);}// 4. 写内存memoryDataCenter.insertQueue(queue);System.out.println("[VirtualHost] 队列创建成功!queueName=" + queueName);}return true;} catch (Exception e) {System.out.println("[VirtualHost] 队列创建失败!queueName=" + queueName);e.printStackTrace();return false;}
}
【注意】
对于操作队列,和交换机一样,也是需要加锁的,并且操作交换机和操作队列是不同的,因此,不能用同一把锁。需要额外创建一个锁;同交换机,下面这个变量也是VirtualHost
的成员,专门用来给队列加锁。
private final Object queueLocker = new Object();
10.5 删除队列(queueDelete)
- 根据交换机的名字找到对应的交换机
- 删除硬盘数据
- 删除内存中数据
// 删除队列
public boolean queueDelete(String queueName) {queueName = virtualHostName + queueName;try {synchronized (queueLocker) {// 1. 根据队列名字,查询下当前的队列对象MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 队列不存在!无法删除!queueName=" + queueName);}// 2. 删除硬盘数据diskDataCenter.deleteQueue(queueName);// 3. 删除内存数据memoryDataCenter.deleteQueue(queueName);System.out.println("[VirtualHost] 删除队列成功!queueName=" + queueName);}return true;} catch (Exception e) {System.out.println("[VirtualHost] 删除队列失败!queueName=" + queueName);e.printStackTrace();return false;}
}
10.6 创建绑定(queueBind)
bindingKey
是进⾏ topic 转发时的⼀个关键概念,使⽤router 类
来检测是否是合法的 bindingKey。- 后续再介绍
router.checkBindingKey
的实现,此处先留空。 - 上述中涉及到的
router
相关的操作,我们后续会进行实现。router
是Router
类的一个实例 ,这里我们用这个类来实现交换机的转发规则,同时也借助这个类验证 bindingKey/routingKey 是否合法等操作。
创建绑定的,步骤:
- 拼接交换机和队列的名字
- 判断当前的绑定是否已经存在
- 验证 bindingKey 是否合法
- 创建binding对象
- 获取一下对应的交换机和队列。如果交换机或者队列不存在,这样的绑定也是无法创建的。
- 创建成功后,先写硬盘,后写内存
// 交换机队列绑定
public boolean queueBind(String queueName, String exchangeName, String bindingKey) {queueName = virtualHostName + queueName;exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker) {synchronized (queueLocker) {// 1. 判定当前的绑定是否已经存在了Binding existsBinding = memoryDataCenter.getBinding(exchangeName, queueName);if (existsBinding != null) {throw new MqException("[VirtualHost] binding 已经存在!queueName=" + queueName+ ",exchangeName=" + exchangeName);}// 2. 验证 bindingKey 是否合法if (!router.checkBindingKey(bindingKey)) {throw new MqException("[VirtualHost] bindingKey 非法!bindingKey=" + bindingKey);}// TODO 3,4 这里的逻辑应该能变一下,先判断,在创建// 3. 创建 Binding 对象Binding binding = new Binding();binding.setQueueName(queueName);binding.setExchangeName(exchangeName);binding.setBindingKey(bindingKey);// 4. 获取一下对应的交换机和队列。如果交换机或者队列不存在,这样的绑定也是无法创建的。MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 队列不存在!queueName=" + queueName);}Exchange exchange = memoryDataCenter.getExchange(exchangeName);if (exchange == null) {throw new MqException("[VirtualHost] 交换机不存在!exchangeName=" + exchangeName);}// 5. 先写硬盘if (queue.isDurable() && exchange.isDurable()) {diskDataCenter.insertBinding(binding);}// 6. 写入内存memoryDataCenter.insertBinding(binding);System.out.println("[VirtualHost] 绑定创建成功!exchangeName=" + exchangeName+ ",queueName=" + queueName);}}return true;} catch (Exception e) {System.out.println("[VirtualHost] 绑定创建失败!exchangeName=" + exchangeName+ ",queueName=" + queueName);e.printStackTrace();return false;}
}
【注意】
对于绑定,和 交换机 以及 队列 都相关,因此,加锁的时候,就要加两把锁,一把是对交换机的,另一把是对 队列的,缺一不可。并且,再删除绑定的时候,也是需要加两把锁的,注意,这两个方法的加锁顺序要相同,不然一个先对交换机加锁,另一个先对队列加锁,容易造成死锁的问题。
10.7 解除绑定(queueUnBind)
解除绑定,刚开始的思路设计如下:
也就是先验证绑定是否存在,然后再看该绑定对应的交换机和队列是否存在,如果有不成立的,那么该绑定就不能删除。
判断交换机和队列是否存在,是因为我们的 删除交换机和队列的操作,是直接删除的,没有考虑和绑定之间的关系,这样就会造成 如果我先删除了队列,那么绑定就删不掉了。
针对绑定删除时涉及到的这个问题,可选的解决方案,主要有两种:
- 第一种方式:参考类似于 mysq! 的外键一样.。删除队列/交换机的时候,判定一下看当前队列/交换机是否存在对应的绑定。如果存在,则禁止删除队列/交换机,要求先解除绑定,再尝试删除队列/交换机
- 上面这种方法,有点是更严谨,但是太麻烦,尤其是我们删除一个队列的时候,由于我们的 绑定表是嵌套的HashMap 表,需要一次遍历内外两层HashMap 去寻找 该队列对应的绑定,太慢,而且太复杂,这里学习阶段,就不使用这种方式了。
- 第二种方式就是,删除绑定的时候,就直接删除,不去验证 交换机/队列是否存在。 这种方式的有点就是简单,缺点就是不怎么严谨。但在我们学习阶段,基本不会这种出现问题
因此,这里就直接删除 绑定。
// 交换机队列解除绑定
public boolean queueUnBind(String queueName, String exchangeName) {queueName = virtualHostName + queueName;exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker) {synchronized (queueLocker) {// 1. 获取 binding 看是否已经存在~Binding binding = memoryDataCenter.getBinding(exchangeName, queueName);if (binding == null) {throw new MqException("[VirtualHost] 删除绑定失败!绑定不存在!exchangeName=" + exchangeName+ "queueName=" + queueName);}// 无论绑定是否持久化了,都尝试从硬盘上删一下,就算不存在,这个删除也无副作用diskDataCenter.deleteBinding(binding);// 4. 删除内存上的数据memoryDataCenter.deleteBinding(binding);System.out.println("[VirtualHost] 绑定删除成功");}}return true;} catch (Exception e) {System.out.println("[VirtualHost] 删除绑定失败");e.printStackTrace();return false;}
}
10.8 关于上述核心API线程安全的总结
一:
上述的 API,都加了很多锁,并且锁对象还是 VirtualHost
这个类的成员变量,这个锁的粒度是很大的。这样加锁,可能会出现效率问题,比如:我们针对 A 交换机进行操作,此时就会影响到 B 交换机的操作。正常情况下,这两个交换机是不能相互影响的。
针对上述情况,确实可以做出调整,使用更加细粒的锁,但是影响不大。
对于 Broker Server
, 创建交换机, 创建绑定,创建队列,删除交换机, 删除绑定,删除队列… 都属于 低频操作 !!! 既然是低频操作,所以遇到两个线程都去操作创建队列之类的情况本身就概率很低了。
因此,绝大多数不会触发锁冲突;并且 synchronized
首先是偏向锁状态,这个状态下加锁成本也还好;其实只有遇到竞争才真加锁,因此这里就不调整更加细粒的锁了。
二:
既然在这一层代码加锁了,那么里面的 MemoryDatacenter
中的操作是否就不必加锁了。那么 MemoryDatacenter
是否之前的加锁就都没意文了吗?
其实不是的,咱们也不知道 MemoryDatacenter
这个类的方法会给哪个类调用的。当前 VirtualHost
自身是保证了线程安全的,此时在 VirtualHos
内部调用MemoryDataCenter
, MemoryDataCenter
里面不加锁问题不大。但是如果是另一个别的类,也多线程调用 MemoryDataCenter
,这种情况就不好说了,因此 “先保证自身是安全的”。
10.9 发送消息到指定的交换机/队列中(basicPublish)
同上,代码中牵扯到的 router.checkRoutingKey(routingKey)
,也是在后面实现。
// 发送消息到指定的交换机/队列中
public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {try {// 1. 转换交换机的名字exchangeName = virtualHostName + exchangeName;// 2. 检查 routingKey 是否合法if (!router.checkRoutingKey(routingKey)) {throw new MqException("[VirtualHost] routingKey 非法!routingKey=" + routingKey);}// 3. 查找交换机对象Exchange exchange = memoryDataCenter.getExchange(exchangeName);if (exchange == null) {throw new MqException("[VirtualHost] 交换机不存在!exchangeName=" + exchangeName);}// 4. 判断交换机类型if (exchange.getType() == ExchangeType.DIRECT) {// 按照直接交换机的方式来转发消息// 这里的规则就是让 routingKey 的值就等于没有拼接前队列的名字,// 这样拼接后,队列的名字还是相当于还是 virtualHostName + queueName,就是相当于用 routingKey 指定要转发的队列名字String queueName = virtualHostName + routingKey;// 5. 构造消息对象Message message = Message.createMessageWithId(routingKey, basicProperties, body);// 6. 查找该队列名对应的对象MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 队列不存在!queueName=" + queueName);}// 7. 队列存在,直接给队列中写入消息sendMessage(queue, message);} else {// 按照 fanout 和 topic 的方式来转发// 5. 找到该交换机关联的所有绑定,并遍历这些绑定对象ConcurrentHashMap<String, Binding> bindingMap = memoryDataCenter.getBindings(exchangeName);for (Map.Entry<String, Binding> entry : bindingMap.entrySet()) {// 1) 获取到绑定对象,判定对应的队列是否存在Binding binding = entry.getValue();MSGQueue queue = memoryDataCenter.getQueue(binding.getQueueName());if (queue == null) {// 此处咱们就不抛出异常了,可能此处有多个这样的队列// 希望不要因为一个队列的失败,影响到其他队列的消息传输System.out.println("[VirtualHost] basisPublish 发布消息时,发现队列不存在!queueName=" + binding.getQueueName());continue;}// 2) 构造消息对象Message message = Message.createMessageWithId(routingKey, basicProperties, body);// 3) 判定这个消息是否能转发给该队列// 如果是 fanout,所有绑定的队列都要转发// 如果是 topic,还需要判定下,bindingKey 和 routingKey 是不是匹配if (!router.route(exchange.getType(), binding, message)) {continue;}// 4) 真正转发消息给队列sendMessage(queue, message);}}return true;} catch (Exception e) {System.out.println("[VirtualHost] 消息发送失败");e.printStackTrace();return false;}
}