RabbitMQ-api开发
前言
MQ就是接收并转发消息
核心概念
admin是用户
每个虚拟机上都有多个交换机
快速入门
引入依赖
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.22.0</version></dependency>
生产者
建立连接
消费者
其实这里消费了两条消息的,因为释放太快了,有一条还来不及打印
如果没有释放资源的话
这两个就会一直存在东西
connection关闭的话,channel会自动关闭的,所以会报错这里
工作模式的介绍
官网
简单模式,一个生产者,一个消费者
工作队列模式:一个生产者,多个消费者,c1和c2共同消费p生产的消息
交换机类型
作⽤: ⽣产者将消息发送到Exchange, 由交换机将消息按⼀定规则路由到⼀个或多个队列中(上图中⽣产
者将消息投递到队列中, 实际上这个在RabbitMQ中不会发⽣. )
RabbitMQ交换机有四种类型: fanout,direct, topic, headers, 不同类型有着不同的路由策略. AMQP协
议⾥还有另外两种类型, System和⾃定义, 此处不再描述.
- Fanout:⼴播,将消息交给所有绑定到交换机的队列(Publish/Subscribe模式),就是消息会重复消费的,交换机是广播类型
- Direct:定向,把消息交给符合指定routing key的队列(Routing模式)
- Topic:通配符,把消息交给符合routing pattern(路由模式)的队列(Topics模式),就是bingdingkey是通配符,但是routingkey一定是字符串,看匹不匹配
- headers类型的交换器不依赖于路由键的匹配规则来路由消息, ⽽是根据发送的消息内容中的
headers属性进⾏匹配. headers类型的交换器性能会很差,⽽且也不实⽤,基本上不会看到它的存在.
Exchange(交换机)只负责转发消息, 不具备存储消息的能⼒, 因此如果没有任何队列与Exchange绑
定,或者没有符合路由规则的队列,那么消息就会丢失
RoutingKey: 路由键.⽣产者将消息发给交换器时, 指定的⼀个字符串, ⽤来告诉交换机应该如何处理这个消息.
Binding Key:绑定. RabbitMQ中通过Binding(绑定)将交换器与队列关联起来, 在绑定的时候⼀般会指
定⼀个Binding Key, 这样RabbitMQ就知道如何正确地将消息路由到队列了
就是看routingkey匹配哪个bindingkey,匹配哪个的话,就去哪个队列
当消息的Routing key与队列绑定的Bindingkey相匹配时,消息才会被路由到这个队列
BindingKey其实也属于路由键中的⼀种, 官⽅解释为:the routingkey to use for the binding.
可以翻译为:在绑定的时候使⽤的路由键. ⼤多数时候,包括官⽅⽂档和RabbitMQJava API 中都把
BindingKey和RoutingKey看作RoutingKey, 为了避免混淆,可以这么理解:
- 在使⽤绑定的时候,需要的路由键是BindingKey.
- 在发送消息的时候,需要的路由键是RoutingKey
第三个,发布订阅模式,x表示的是交换机,有特殊作用,每个生产者都收到的相同消息
也是广播模式
第四个路由模式,增加了routingkey,发布订阅模式没有bingdingkey,会转发到匹配的队列,发布订阅模式的升级
第五个是通配符模式,是路由模式的升级,就是bindingkey支持通配符
星表示一个单词,#表示多个单词
第六个是RPC模式,RPC通信
- 客⼾端发送消息到⼀个指定的队列, 并在消息属性中设置replyTo字段, 这个字段指定了⼀个回调队
列, ⽤于接收服务端的响应. - 服务端接收到请求后, 处理请求并发送响应消息到replyTo指定的回调队列
- 客⼾端在回调队列上等待响应消息. ⼀旦收到响应,客⼾端会检查消息的correlationId属性,以
确保它是所期望的响应
第七个发布确认模式
Publisher Confirms模式是RabbitMQ提供的⼀种确保消息可靠发送到RabbitMQ服务器的机制。在这
种模式下,⽣产者可以等待RabbitMQ服务器的确认,以确保消息已经被服务器接收并处理.
- ⽣产者将Channel设置为confirm模式(通过调⽤channel.confirmSelect()完成)后, 发布的每⼀条消
息都会获得⼀个唯⼀的ID, ⽣产者可以将这些序列号与消息关联起来,以便跟踪消息的状态. - 当消息被RabbitMQ服务器接收并处理后,服务器会异步地向⽣产者发送⼀个确认(ACK)给⽣产者
(包含消息的唯⼀ID),表明消息已经送达
通过Publisher Confirms模式,⽣产者可以确保消息被RabbitMQ服务器成功接收, 从⽽避免消息丢失
的问题
是生产者和队列之间的保证
工作队列模式
简单模式中的生产者,就是把消息转发到和routingkey相同的队列中
多个消费者
因为使用的是默认交换机,所以就可以根据routingkey找到相同队列名称
先启动消费者,因为先启动生产者的话,消费者一个就一下消费完了
而且消费者要关闭close
工作队列模式就是共同消费的意思
一个生产者,一个队列,多个消费者,默认交换机
发布订阅模式
多了exchange的角色
在发布/订阅模型中,多了⼀个Exchange⻆⾊.
Exchange 常⻅有三种类型, 分别代表不同的路由规则
a) Fanout:⼴播,将消息交给所有绑定到交换机的队列(Publish/Subscribe模式)
b) Direct:定向,把消息交给符合指定routing key的队列(Routing模式)
c) Topic:通配符,把消息交给符合routing pattern(路由模式)的队列(Topics模式)
也就分别对应不同的⼯作模式
就是广播了
与上面的区别就是生产者要声明交换机,然后队列与交换机的绑定关系
发布订阅模式就是把收到的消息全部转发到所有的队列中,每个队列都有这个消息,所以routingkey为空
消费者的话,就是两个消费者分别消费两个队列
路由模式
发布订阅模式就是会把消息发给所有和这个交换机绑定的队列
而路由模式交换机类型不同,第二就是与队列建立绑定关系的时候,要建立bindingkey
第三就是发送消息的时候,要指定routingkey,routingkey与bindingkey一致的时候就会发送了
第三个参数就是bindingkey
通配符模式
在topic类型的交换机在匹配规则上, 有些要求:
- RoutingKey 是⼀系列由点( . )分隔的单词, ⽐如 " stock.usd.nyse ", " nyse.vmw ",
" quick.orange.rabbit " - BindingKey 和RoutingKey⼀样, 也是点( . )分割的字符串.
- Binding Key中可以存在两种特殊字符串, ⽤于模糊匹配
◦ * 表⽰⼀个单词,不是一个字母
◦ # 表⽰多个单词(0-N个)
消费者获取到消息,默认就删除了
RPC通信-客户端
RPC(Remote Procedure Call), 即远程过程调⽤. 它是⼀种通过⽹络从远程计算机上请求服务, ⽽不需要了解底层⽹络的技术. 类似于Http远程调⽤.
RabbitMQ实现RPC通信的过程, ⼤概是通过两个队列实现⼀个可回调的过程
RabbitMQ 消息确定机制
在RabbitMQ中,basicConsume⽅法的autoAck参数⽤于指定消费者是否应该⾃动向消息队列确认
消息
⾃动确认(autoAck=true): 消息队列在将消息发送给消费者后, 会⽴即从内存中删除该消息. 这意味
着, 如果消费者处理消息失败,消息将丢失,因为消息队列认为消息已经被成功消费
⼿动确认(autoAck=false): 消息队列在将消息发送给消费者后,需要消费者显式地调⽤basicAck
⽅法来确认消息. ⼿动确认提供了更⾼的可靠性, 确保消息不会被意外丢失, 适⽤于消息处理重要且需
要确保每个消息都被正确处理的场景
发布确认
就是最后一个模式
作为消息中间件, 都会⾯临消息丢失的问题.
消息丢失⼤概分为三种情况:
- ⽣产者问题. 因为应⽤程序故障, ⽹络抖动等各种原因, ⽣产者没有成功向broker发送消息.
- 消息中间件⾃⾝问题. ⽣产者成功发送给了Broker, 但是Broker没有把消息保存好, 导致消息丢失.
- 消费者问题. Broker 发送消息到消费者, 消费者在消费消息时, 因为没有处理好, 导致broker将消费
失败的消息从队列中删除了
RabbitMQ也对上述问题给出了相应的解决⽅案. 问题2可以通过持久化机制. 问题3可以采⽤消息应答机制. (后⾯详细讲)
针对问题1, 可以采⽤发布确认(Publisher Confirms)机制实现.
⽣产者将信道设置成confirm(确认)模式, ⼀旦信道进⼊confirm模式, 所有在该信道上⾯发布的消息都
会被指派⼀个唯⼀的ID(从1开始), ⼀旦消息被投递到所有匹配的队列之后, RabbitMQ就会发送⼀个确认给⽣产者(包含消息的唯⼀ID), 这就使得⽣产者知道消息已经正确到达⽬的队列了, 如果消息和队列是可持久化的, 那么确认消息会在将消息写⼊磁盘之后发出. broker回传给⽣产者的确认消息中
deliveryTag 包含了确认消息的序号, 此外 broker 也可以设置channel.basicAck⽅法中的multiple参
数, 表⽰到这个序号之前的所有消息都已经得到了处理
第一个是批量确认,第二个不是
发送⽅确认机制最⼤的好处在于它是异步的, ⽣产者可以同时发布消息和等待信道返回确认消息.
- 当消息最终得到确认之后, ⽣产者可以通过回调⽅法来处理该确认消息.
- 如果RabbitMQ因为⾃⾝内部错误导致消息丢失, 就会发送⼀条nack(Basic.Nack)命令, ⽣产者同样
可以在回调⽅法中处理该nack命令.
使⽤发送确认机制, 必须要信道设置成confirm(确认)模式.
发布确认是 AMQP 0.9.1 协议的扩展, 默认情况下它不会被启⽤. ⽣产者通过channel.confirmSelect()将信道设置为confirm模式
有三种确认方式,单独确认,批量确认(就是发布数量达到一定数量的时候开始去确认),异步确认(另外开一个线程却确认)
可以去看官网
单独确认
这个确认的话,是虚拟机自己确认的,不关我们的事,不关消费者的事,这个一个一个确认就很慢
如果队列还要持久化这个数据的话,就更慢了
批量确认
异步确认
异步confirm⽅法的编程实现最为复杂. Channel 接⼝提供了⼀个⽅法addConfirmListener. 这个⽅法
可以添加ConfirmListener 回调接⼝.
ConfirmListener 接⼝中包含两个⽅法: handleAck(long deliveryTag, boolean
multiple) 和 handleNack(long deliveryTag, boolean multiple) , 分别对应处理
RabbitMQ发送给⽣产者的ack和nack.
deliveryTag 表⽰发送消息的序号. multiple 表⽰是否批量确认
我们需要为每⼀个Channel 维护⼀个已发送消息的序号集合. 当收到RabbitMQ的confirm 回调时, 从集
合中删除对应的消息. 当Channel开启confirm模式后, channel上发送消息都会附带⼀个从1开始递增的
deliveryTag序号. 我们可以使⽤SortedSet 的有序性来维护这个已发消息的集合.
- 当收到ack时, 从序列中删除该消息的序号. 如果为批量确认消息, 表⽰⼩于等于当前序号
deliveryTag的消息都收到了, 则清除对应集合 - 当收到nack时, 处理逻辑类似, 不过需要结合具体的业务情况, 进⾏消息重发等操作
可以一下发布多个消息,然后multiple =true就是批量确认,deliveryTag =3,就是确认它之前的,包括它,要么multiple =false,就是确认单个
如果数据量很大的话,异步确认就要比批量确认快得多