【RabbitMQ】高级特性—事务、消息分发详解
文章目录
- 事务
- 1. 配置事务管理器
- 2. 声明队列
- 3. 生产者
- 4. 测试
- 消息分发
- 概念
- 应用场景
- 限流
- 代码示例:
- 负载均衡
- 代码示例
事务
RabbitMQ
是基于 AMQP
协议实现的,该协议实现了事务机制,因此 RabbitMQ
也支持事务机制。Spring AMQP
也提供了对事务相关的操作,RabbitMQ
事务允许开发者确保消息的发送和接收是原子性的,要么全部成功,要么全部失败
1. 配置事务管理器
@Configuration
public class TransactionConfig { @Bean public RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) { return new RabbitTransactionManager(connectionFactory); } public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setChannelTransacted(true); return rabbitTemplate; }
}
2. 声明队列
@Bean("transQueue")
public Queue transQueue() { return QueueBuilder.durable("trans_queue").build();
}
3. 生产者
@RestController
@RequestMapping("/trans")
public class TransactionProducer { @Autowired private RabbitTemplate rabbitTemplate; @Transactional @RequestMapping("/send") public String send() { rabbitTemplate.convertAndSend("", "trans_queue", "trans test 1..."); int a = 50 / 0; rabbitTemplate.convertAndSend("", "trans_queue", "trans test 2..."); return "发送成功"; }
}
4. 测试
- 不加
@Transactional
,会发现消息 1 发送成功 - 添加
@Transactional
,消息 1 和消息 2 全部发送失败
消息分发
概念
RabbitMQ
队列拥有多个消费者时,队列会把收到的消息分派给不同的消费者。每条消息只会发送给订阅列表的一个消费者。这种方式非常适合扩展,如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可
默认情况下 RabbitMQ
是以轮询的方法进行分发的,而不管消费者是否已经消费并已经确认了消息。这种方式是不太合理的,试想一下,如果某些消费者消费速度慢,而某些消费者速度很快,就可能会导致某些消费者消息积压,某些消费者空闲,进而应用整体的吞吐量下降
如何处理呢?
- 我们可以使用前面说到的
channel.basicQos(int prefetchCount)
方法,来限制当前信道上的消费者所能保持的最大未确认消息的数量
比如:消费端调用了 channelbasicQos(5)
,RabbitMQ
会为该消费者计数,发送一条消息消息计数 +1
,消费一条消息计数 -1
,当达到了设定的上限,RabbitMQ
就不会再向它发送消息了,直到消费者确认了某条消息
- 类似
TCP/IP
中的滑动窗口
应用场景
消息分发的常见场景有如下:
- 限流
- 非公平分发
限流
如下使用场景:
订单系统每秒最多处理 5000
请求,正常情况下,订单系统可以正常满足需求,但是在秒杀时间点,请求瞬间增多,每秒 1万
个请求,如果这些请求全部通过 MQ
发送到订单系统,无疑会把订单系统压垮
RabbitMQ
提供了限流机制,可以控制消费端一次只拉取N
个请求- 通过设置
prefetchCount
参数,同时也必须要设置消息应答方式为手动应答 prefetchCount
:控制消费者从队列中预取(prefetch
)消息的数量,以此来实现流量控制和负载均衡
代码示例:
- 配置
prefetch
参数,设置应答方式为手动应答
listener: simple: acknowledge-mode: manual # 消息接收确认 prefetch: 5
- 声明队列和交换机
// 消息分发——限流
public static final String QOS_EXCHANGE_NAME = "qos_exchange";
public static final String QOS_QUEUE = "qos_queue";
- 配置交换机,队列
@Configuration
public class QosConfig { // 1. 交换机 @Bean("qosExchange") public Exchange qosExchange() { return ExchangeBuilder.directExchange(Constant.QOS_EXCHANGE_NAME).durable(true).build(); } // 2. 队列 @Bean("qosQueue") public Queue qosQueue() { return QueueBuilder.durable(Constant.QOS_QUEUE).build(); } // 3. 队列和交换机绑定 @Bean("qosBinding") public Binding qosBinding(@Qualifier("qosExchange") Exchange exchange, @Qualifier("qosQueue") Queue queue) { return BindingBuilder.bind(queue).to(exchange).with("qos").noargs(); }
}
- 发送消息,一次发送 20 条消息
@RequestMapping("/qos")
public String qos() { // 发送消息 for (int i = 0; i < 20; i++) { rabbitTemplate.convertAndSend(Constant.QOS_EXCHANGE_NAME,"qos", "qos test..." + i); } return "发送成功";
}
- 消费者监听
@Component
public class QosQueueListener { // 指定监听队列的名称 @RabbitListener(queues = Constant.QOS_QUEUE) public void ListenerQueue(Message message, Channel channel) throws Exception{ long deliveryTag = message.getMessageProperties().getDeliveryTag(); System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"), deliveryTag); // 3. 手动签收 //channel.basicAck(deliveryTag, true); }
}
- 测试
调用接口,发送消息:
- 发送消息时,需要先把手动确认注释掉,不然会直接消费掉
- 可以看到,控制台只打印了 5 条信息
我们观察管理平台
- 可以看到,
ready
,也就是待发送15
条,未确认的5
条(因为代码未手动ack
)
把配置里面的 prefetch: 5
注掉,然后在观察运行结果。
- 从日志和控制台上可以看到:消费者会一次性把
20
条消息全部收到
管理平台:
负载均衡
我们也可以用此配置,来实现“负载均衡”
如下图所示,在有两个消费者的情况下,一个消费者处理任务非常快,另一个非常慢,就会造成一个消费老会一直很忙,而另一个消费者很闲
- 这是因为
RabbitMQ
只是在消息进入队列时分派消息,他不考虑消费者未确认消息的数量
我们可以使用设置 prefetch=1
的方式,告诉 RabbitMQ
一次只给一个消费者一条信息,也就是说,在处理并确认前一条消息之前,不要向该消费者发送新消息。相反,它会将它分派给下一个不忙的消费者
代码示例
- 配置
prefetch
参数,设置应答方式为手动应答
listener: simple: acknowledge-mode: manual # 消息接收确认 prefetch: 1
- 启动两个消费者
- 使用
Thread.sleep(100)
来模拟消费满
- 使用
@Component
public class QosQueueListener { // 指定监听队列的名称 @RabbitListener(queues = Constant.QOS_QUEUE) public void ListenerQueue(Message message, Channel channel) throws Exception{ long deliveryTag = message.getMessageProperties().getDeliveryTag(); System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"), deliveryTag); // 3. 手动签收 channel.basicAck(deliveryTag, true); } // 指定监听队列的名称 @RabbitListener(queues = Constant.QOS_QUEUE) public void ListenerQueue2(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); System.out.printf("消费者2接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"), deliveryTag); // 模拟处理流程慢 Thread.sleep(100); // 手动签收 channel.basicAck(deliveryTag, true); }
}
- 测试
调用接口,发送消息
- 通过日志观察两个消费者消费的消息