一篇文章入门RabbitMQ:基本概念与Java使用
Java配置RabbitMQ
spring:rabbitmq:host: 192.168.88.130port: 5672virtual-host: /hostusername: rootpassword: 123
发送消息
@SpringBootTest
public class SpringAmqpTest {@Resourceprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage() {String queueName = "simple.queue";String msg = "hello,amqp";rabbitTemplate.convertAndSend(queueName,msg);}
}
接收消息
一个队列绑定多个消费者的情形
轮询
默认情况:会将消息轮询投递给消费者
只是简单的分配消息,没有考虑消息的复杂程度,可能会出现消息堆积
能者多劳
修改application.yml设置preFerch为1,确保同一时刻最多投递消费者1条消息,消费者处理完才能获取下一个消息
listener:simple:prefetch: 1
交换机
接受publisher发送的消息
Fanout交换机
将消息广播到每一个跟其绑定的queue
Direct交换机
每个queue与exchange都会设置BindingKey
发布者发布消息时指定消息的RoutingKey
exchange将消息路由到BindingKey与RoutingKey一致的队列
Topic交换器
Routing Key可以是多个单词的列表,并且以.分割
通配符: xx.# / #.xx
声明队列与交换机
控制台
SpringAMQP
基于Bean
- Queue:声明队列,用工厂类QueueBuilder构建
- Exchange:声明交换机,用工厂类ExchangeBuilder构建
- Binding:声明队列和交换机的绑定关系,用工厂类BindingBuilder构建
基于注解
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "hmall.queue1"),exchange = @Exchange(name = "hmall.exchange1", type = "direct"),key = {"red","blue"}//创建两个队列,Key分别为red和blue
))
消息转换器
当发送对象时,spring会把对象转换为字节
JDK序列化
Spring默认MessageConvert处理,实现是SimpleMessageConverter,基于JDK的ObjectOutputStream序列化
- JDK的序列化有安全风险
- 消息太大
- 消息可读性差
JSON序列化
用JSON序列化代替JDK序列化
<!-- JSON--><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
在publisher和consumer启动项/配置类中都要配置MessageConverter
@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}
可靠性问题
生产者可靠性
生产者重连
spring:rabbitmq:connection-timeout: 1s #MQ的连接超时时间template:retry:enabled: true #开启超时重试机制initial-interval: 1000ms #失败后的初始等待时间multiplier: 1 #失败后下次的等待时长倍数max-attempts: 3 #最大重试次数
是阻塞式的重试,在多次重试的等待过程中,当前线程是被阻塞的,影响业务性能
生产者确认
开启确认机制后,MQ收到消息会将确认消息返回给生产者
spring:rabbitmq:publisher-confirm-type: correlated #开始publisher confirm机制,#none关闭机制,simple同步阻塞等待回执消息,correlated异步返回回执消息 publisher-returns: true #开启publisher returns机制,返回失败消息
在publisher项目启动项配置ReturnCallback
@Configuration
@Slf4j
public class MqConfirmConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 设置回调rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){@Overridepublic void returnedMessage(ReturnedMessage returned) {log.debug("收到消息的return callback,{},{},{},{},{}",returned.getExchange(),returned.getReplyCode(),returned.getReplyText(),returned.getRoutingKey(),returned.getMessage());}});}
}
发送消息,指定消息ID,消息ConfirmCallback
@Testvoid testConfirmCallback(){//创建cdCorrelationData cd = new CorrelationData(UUID.randomUUID().toString());//添加confirmcallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onSuccess(CorrelationData.Confirm result) {log.info("消息发送成功");if(result.isAck()){log.info("消息发送成功");//收到ack}else{//收到nacklog.error("消息发送失败");}}@Overridepublic void onFailure(Throwable ex) {log.error("消息发送失败,原因{}",result.getReason);}});rabbitTemplate.convertAndSend("confirm.exchange","1","hello,confirm",cd);}
MQ可靠性
默认情况下,RabbitMQ会将收到的消息保存在内存中以降低收发消息的延迟,会导致两个问题
- MQ宕机,内存中的消息会消失
- 内存空间有限,消费者故障或消息处理过慢,会导致消息堆积,引发MQ阻塞
数据持久化
- 持久化队列(Persistent Queue)
队列本身的元数据(如名称、配置)和队列中的消息会被持久化存储(通常写入磁盘或数据库)。即使消息队列服务重启、崩溃或服务器断电,队列和消息也不会丢失,恢复服务后可继续处理。 - 交换机持久化
- 消息持久化 delivery_mode = 2
Lazy Queue
- 接收到消息直接存入磁盘而非内存(内存只保留最近的消息,默认2048条)
- 消费者要消费信息时才会从磁盘中读取到内存
- 支持数百万条消息存储
消费者可靠性
消费者确认机制
消费者处理消息结束后,应该向RabbitMQ发送回执,说明信息处理状态
- ack: 成功处理消息,RabbitMQ从队列中删除该消息
- nack: 失败,需要再次投递消息
- reject: 失败并拒绝该消息,RabbitMQ从队列中删除该消息
spring:rabbitmq:listener:simple:prefetch: 1acknowledge-mode: auto #none关闭, auto自动, manual手动
none:消息投递到消费者立刻返回ack
消息失败处理
当消费者出现异常时,消息会不断requeue,无限循环,使mq的消息处理飙升
解决方法:使用retry机制
spring:rabbitmq:listener:simple:prefetch: 1retry:enabled: true #开启消费者失败重试max-attempts: 3 #最大重试次数initial-interval: 1000 #第一次重试间隔multiplier: 1 #每次重试间隔倍数stateless: true #无状态;false有状态。默认无状态,如果业务中包含事务改为false
如果重试次数耗尽消息依然失败,则需要有MessageRecoverer接口来处理
- RejectAndDontRequeueRecoverer:重试耗尽后直接reject,默认
- ImmediateRequeueMessageRecovere: 返回nack,消息重新入队
- RepublishMessageRecoverer: 将失败消息投递到指定的交换机,如error.direct,之后由人工处理(由该交换机路由到对应的错误队列(如
error.queue
)。)
业务幂等性
幂等 指的是f(x) = f(f(x))。在程序开发中指同一个业务执行一次或多次对业务状态的影响是一致的。
- 给每个消息设置唯一的id,业务处理成功后将id放入数据库,下次接受消息在数据库查询id是否存在
@Bean
public MessageConverter jacksonMessageConvertor(){Jackson2JsonMessageConverter jjmc = new Jackson2JasonMessageConverter();jjmc.setCreateMessageIds(true);//发送消息时附带UUIDreturn jjmc;
}
- 结合业务本身
延迟消息
生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息
应用:延迟任务
死信交换机
成为死信(dead letter)条件:
- 消费者使用reject,nack声明消费失败,并且requeue为false
- 消息是一个过期消息,超时无人消费
- 消息队列堆积满了,最早的消息可能成为死信
如果队列通过dead-letter-exchange属性指定了一个交换机,死信就会投递到这个交换机中,这个交换机为死信交换机(Dead Letter Exchange,DLX)
延迟消息插件
原理是设计了一种支持延迟消息功能的交换机,消息投递到交换机后可以暂存一定时间,到期后再投递到队列
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "hmall.queue1",durable = "true"),exchange = @Exchange(name = "hmall.exchange1", type = "direct",delayer = "true"),key = {"red","blue"}//创建两个队列,Key分别为red和blue,延迟消息设为true,持久化队列
))