RabbitMQ的概述
Rabbit是一个公司名,MQ(Message queue)消息队列的意思,RabbitMQ是Rabbit企业下的一个消息队列产品。
RabbitMQ是一个实现了AMQP的消息队列的服务,是当前主流的消息中间件之一。
什么是MQ:
MQ本质是一个队列,FIFO先入先出,只不过队列中存放的内容是消息(Message)而已,消息是非常简单的,比如只包含文本字符串,JSON,也可以很复杂,比如内嵌对象。
MQ多用于分布式系统之间通信。
系统之间的调用通常有两种方式:
1.同步通信:
直接调用对方的服务,数据从一端出发后立即就可以达到另一端。
2.异步通信:
数据从一端发出后,先进入一个容器进行临时储存,当达到某种条件后,再由这个容器发送给另一端,这个容器的一个具体实现就是MQ(Message queue)
MQ的作用:
1.异步解耦:
在业务流程中,有些操作可能非常耗时,但并不需要立即返回结果,可以借助MQ把这些操作异步化,比如用户注册后发送注册短信或者邮件通知,可以作为异步任务进行处理,而不必等待这些操作完成后才告知用户注册成功。
2.流量削峰:
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样突发流量并不常见,但是如果投入大量资源来解决这类峰值,无疑是巨大的浪费,使用MQ能够使用关键组件支撑突发访问压力,不会因为突发流量而崩溃,比如秒杀或者促销,可以使用MQ来控制流量,将请求排队,然后根据自己的处理能力来消化这些请求。
3.消息分发:
当多个系统需要对同一数据做出响应,可以使用MQ来进行消息分发,比如支付成功,支付系统可以向MQ发送消息,其他系统订阅该消息,而无需轮询数据库。
4.延迟通知:
在需要在特定时间后发送通知场景,可以使用MQ的延迟队列消息功能,比如在电子商务平台中,如果用户下单后在一定时间内未付款,可以使用延迟队列在超时后自动取消订单。
RabbitMQ的工作流程:
先看看RabbitMQ的工作流程图:
Producer和Consumer:
Producer:生产者,是RabbitMQ Server的客户端,向RabbitMQ发送消息
Comsumer:消费者,也是RabbitMQ Server的客户端,从RabbitMQ中接受消息
Broker:就是RabbitMQ Server,主要是接收消息和收发消息。
Connection和Channel:
Connection:连接,是客户端和RabbitMQ服务器之间的一个TCP连接,这个连接是建立消息传递的基础,他负责传输客户端和服务器之间所有的数据和控制信息。
Channel:通道,信道,Channel是在Connection之上的一个抽象层,在RabbitMQ中,一个TCP连接有多个Channel,每个Channel都是独立的的虚拟链接,消息的发送和接收都是基于Channel的。
Virtual host:
Virtual host:虚拟主机,这是一个虚拟的概念,他为消息队列提供了一种逻辑上的隔离,对于RabbitMQ而言,一个BrokerServer上可以存在多个Virtual host,当多个不同用户使用同一个RabbitMQ Server提供服务时,可以虚拟划分出多个vhost,每个用户在自己的Virtual host中创建exchange/queue等。
Queue队列:
Queue:队列,是RabbitMQ内部对象,用于储存消息。
多个订阅者还可以订阅同一个队列。
Exchange:
Exchange:交换机,Message到达Broker的第一站,他负责接受生产者发送到的消息,并根据特定的规则把这些消息路由对应的一个或者多个Queue队列中。
工作流程:
RabbitMQ快速入门:
1.引入依赖:
<dependency><groupId>com.rabbitmq</groupId><artifacId>amqp-client</artifacId><version>5.7</version>
</dependency>
2.生产者代码:
1.建立连接:
//创建工厂
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);//默认端口号
factory.setVirtualHost("虚拟机名称");
factory.setUssername("用户名");
factory.setPassword("密码");
//创建连接Connection
Connection connection =factory.newConnection();
2.创建channel:
Channel channel=connection.createChannel();
3.声明一个队列:
一个队列中有以下参数,但是创建队列的时,并不是每个参数都会用到:
queueDeclare(String queue,boolean durable,boolean exclusive,boolean autoDelete,Map<String,Object> arguments)
queue:队列的名称。
durable:是否可持久化,true-设为持久化,持久化的队列是会存盘的,服务器重启后,消息不会丢失。
exclusive:
是否独占,只能有一个消费者监听队列
当Connection关闭时,是否删除队列。
autoDelete:是否自动删除,当没有Consumer时,自动删除。
argument:一些参数。
channel.queueDeclare("队列名称",true,false,false,null);
4.发送消息:
当一个新的RabbitMQ节点启动时,会预声明几个内置的交换机,内置交换机名称是空字符串(""),生产者发送的消息会根据队列名称直接路由到对应的队列。
basicPublish(String exchange,String routingKey,AMQP.BasicProperties props。byte[] body);
exchange:交换机的名字,发送消息到哪个交换机中,简单模式下,默认使用""。
routingKey:路由名称,routingKey=队列名称。
props:配置的信息。
body:发送消息的数据。
String msg="Hello world!";
channel.basicPublish("","队列名称",null,msg.getBytes());
System.out.println(msg+"消息发送成功!");
5.释放资源:
channel.close();
connection.close();
3.消费者代码:
1.建立连接。
2.创建channel
3.声明一个队列
上面三个步骤的代码和生产者一样。
4.消费消息:
消费当前队列:
basicConsume(String queue,boolean autoAck,Consumer callback);
queue:队列名称。(从哪个队列获取消息)
autoAck:是否自动确认,消费者收到消息后,自动和MQ确认。
callback:回调对象。
String basicConsume(String queue,boolean autoAck,Consume callback) throws IOException
Consumer:
Consumer用于定义消费者的行为,当我们需要从RabbitMQ接收到消息,需要提供一个实现了Consumer接口的对象。
DefaultConsumer是RabbitMQ提供的一个默认消费者,实现了Consumer接口。
核心方法:
handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body),从队列接收到消息后,会自动执行这个方法。
这个方法可以定义如何处理收到的消息,例如打印消息内容,处理业务逻辑或者将消息储存到数据库等。
consumerTag:消费者标签,通常是消费者在订阅队列时指定的。
envelope:包含消息的封包信息,如队列名称,交换机等。
properties:一些配置文件。
body:消息的具体内容。
DefaultConsumer consumer=new DefaultConsumer(channel){@Overridepublic void headleDelivery (String consumerTag,Envelope envelop,AMQP.BasicProperties properties,byte[] body) throws IOExeception{System.out.println("接收到消息:"+new String(body));
}
};
channel.basicConsume("队列名称",true,consumer);
5.释放资源:
//等待回调函数执行完毕之后,关闭资源
TimeUnit.SECONDS.sleep(5);
channel.close();
connection.close();