RabbitMQ的文档介绍及使用
消息队列是为应用之间建立异步通信的工具。应用之间的通信方式有很多,比如我们所熟知的最原始的基于 TCP 协议的 socket 直接调用;基于 http 协议的 RESTful;或者是基于文件系统的 FTP 等等。
-
消息中间件的介绍
目前主流的消息中间件是存在4中的,常见的是Kafka 、RabbitMQ、RocketMQ、ActiveMQ这四种,
对比图如下
然后作为综合性能比较好的RabbitMQ,本文将着重讲解
-
RabbitMQ的系统架构
安装的教程可以看笔者的另外一篇文章:
https://blog.csdn.net/LMS2022223000/article/details/150118038?fromshare=blogdetail&sharetype=blogdetail&sharerId=150118038&sharerefer=PC&sharesource=LMS2022223000&sharefrom=from_link
-
Broker:要使用 RabbitMQ 来收发消息,必须要安装一个 RabbitMQ 的服务,可以安装在 Windows 上面也可以安装在 Linux 上面,默认是 5672 的端口。这台安装 RabbitMQ的服务器的机器我们把它叫做 Broker。
-
VirtualHost虚拟机:在一个Broker上面划分出多个隔离的环境,这多个环境就可以理解成是VirtualHost虚拟机。每个VirtualHost虚拟机相当月一个相对独立的RabbitMQ服务器,每个VirtualHost虚拟机之间是相互隔离的。一个VirtualHost虚拟机里面可以有若干个Exchange和Queue,同一个VirtualHost虚拟机里面不能有相同名称的Exchange或者Queue,每个VirtualHost虚拟机中的exchange、queue、message不能互通。
-
Connection:无论是生产者还是消费者,都需要和 Broker 建立连接,这个连接就是Connection,这个连接是一个 TCP 的长连接。一个生产者或一个消费者与 Broker 之间只有一个Connection,即只有一条TCP连接。
-
Channel:消息推送使用的通道,如果每一次访问消息队列中间件都建立一个TCP连接的话,那么系统资源会被大量的占用,效率也会降低,所以AMQP提供了Channel机制,共享同一个TCP连接,而一个TCP连接里可以有大量的Channel,不同的Channel之间是完全隔离的。
-
Exchange:交换机,用于接收消息,可根据路由键将消息转发到绑定的队列。
-
Queue:也称作Message Queue,即消息队列,用于保存消息并将他们转发给消费者。
-
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来,在进行绑定的时候一般会指定一个binding key。
-
Routing Key:一个路由规则,生产者将消息发送到交换机时,会在消息头上携带一个 key,这个 key就是routing key,来指定这个消息的路由规则。
-
Producer:消息生产者,就是投递消息的程序。
-
Consumer:消息消费者,就是接受消息的程序。
-
交换机的类型
交换机负责接收消息,并根据不同类型的路由规则,将消息转发到一个或多个队列
-
每个交换机都有
-
名称(可以为空)
-
类型(direct、fanout、topic、headers)
-
绑定关系(Binding)用于指定哪些队列接收哪些消息
-
交换机的四种类型
类型 | 名称 | 特点说明 |
直连型 | direct | 精准路由,依据 routingKey 完全匹配 |
扇出型 | fanout | 广播模式,忽略 routingKey, 发送给所有队列 |
主题型 | topic | 模糊匹配,支持通配符(* 和 #) |
头部型 | headers | 根据消息头(headers)匹配路由规则 |
-
direct(直连)交换机
特点
-
routingKey 精准匹配
-
每条信息只会进入到绑定了指定 routingKey 的队列
-
声明一个名为hmall.direct的交换机
-
声明队列direct.queue1,绑定hmall.direct,bindingKey为blud和red
-
声明队列direct.queue2,绑定hmall.direct,bindingKey为yellow和red
-
在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
-
在publisher中编写测试方法,向hmall.direct发送消息
消息接收:
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) { System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
消息发送:
@Test
public void testSendDirectExchange() { // 交换机名称 String exchangeName = "hmall.direct"; // 消息 String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "bule", message);
}
消息将被投递到绑定了 "blue"
的队列。
-
fanout(扇出)交换机
特点
-
不关心 routingKey
-
消息被 广播到所有绑定的队列
-
-
-
1) 可以有多个队列
-
2) 每个队列都要绑定到Exchange(交换机)
-
3) 生产者发送的消息,只能发送到交换机
-
4) 交换机把消息发送给绑定过的所有队列
-
5) 订阅队列的消费者都能拿到消息
消息发送:
@Test
public void testFanoutExchange() { // 交换机名称 String exchangeName = "hmall.fanout"; // 消息 String message = "hello, everyone!"; rabbitTemplate.convertAndSend(exchangeName, "", message);
}
消息接收:
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) { System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) { System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
✅ 应用场景
-
广播通知(如:发短信、发邮件)
-
多系统日志收集
-
topic(主题)交换机
特点
-
支持模糊匹配 routingKey(以点分隔)
-
通配符说明:
-
*
匹配一个单词(例如order.*
) -
#
匹配多个单词(例如order.#
)
-
假如此时publisher发送的消息使用的RoutingKey共有四种:
-
china.news 代表有中国的新闻消息;
-
china.weather 代表中国的天气消息;
-
japan.news 则代表日本新闻
-
japan.weather 代表日本的天气消息;
解释:
-
topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:
-
china.news
-
china.weather
-
topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:
-
china.news
-
japan.news
消息发送:
/** * topicExchange */ @Test
public void testSendTopicExchange() { // 交换机名称String exchangeName = "hmall.topic"; // 消息 String message = "喜报!孙悟空大战哥斯拉,胜!"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
消息接收:
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg){ System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg){ System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}
-
headers(头部)交换机
✨ 特点
-
routingKey
不再使用 -
根据消息头属性进行路由(
headers
字段) -
支持
x-match=all
或x-match=any
(全部匹配 / 任意匹配)
🧪 示例
Map<String, Object> headers = new HashMap<>();
headers.put("format", "pdf");
headers.put("type", "report");
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() .headers(headers) .build(); channel.basicPublish("headersExchange", "", props, body);
Map<String, Object> args = new HashMap<>();
args.put("x-match", "all");
args.put("format", "pdf");
args.put("type", "report");
channel.queueBind("reportQueue", "headersExchange", "", args);
✅ 应用场景
-
消息内容非常复杂,不适合用
routingKey
-
使用
HTTP-style
路由方式
完美撒花~