事务设置和消息分发
事务
RabbitMQ是基于AMQP协议实现的,该协议实现了事务机制,因此RabbitMQ也支持事务机制.
SpringAMQP也提供了对事务相关的操作,RabbitMQ事务允许开发者确保消息的发送和接收是原子性的,要么
全部成功,要么全部失败.|
前期准备工作:
//事务public static final String TRANS_QUEUE = "TRANS_QUEUE";public static final String TRANS_EXCHANGE = "TRANS_EXCHANGE";public static final String TRANS_KEY = "TRANS_KEY";
//事务@Bean("transQueue")public Queue transQueue() {return QueueBuilder.durable(MQConstants.TRANS_QUEUE).build();}@Bean("transExchange")public Exchange transExchange() {return ExchangeBuilder.directExchange(MQConstants.TRANS_EXCHANGE).build();}@Bean("transBinding")public Binding transBinding(@Qualifier("transExchange") Exchange exchange, @Qualifier("transQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(MQConstants.TRANS_KEY).noargs();}
配置事务管理器
@Configuration
public class TransactionConfig {@Beanpublic RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);}
}
添加 @Transactional
如果不添加 @Transactional,我们的事务管理器是不会在这个代码上生效的
@Transactional@RequestMapping("/trans")public String trans() {rabbitTemplate.convertAndSend(MQConstants.TRANS_EXCHANGE, MQConstants.TRANS_KEY, "trans1 ");int n = 10 / 0;rabbitTemplate.convertAndSend(MQConstants.TRANS_EXCHANGE, MQConstants.TRANS_KEY, "trans2 ");return "消息发送成功";}
经过观察,我们可以看到这两条消息要么同时发送成功,要么同时发送失败
消息分发
RabbitMQ队列拥有多个消费者时,队列会把收到的消息分派给不同的消费者,每条消息只会发送给订阅列表里的一个消费者,这种方式非常适合扩展,如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可。
默认情况下,RabbitMQ是以轮询的方法进行分发的,而不管消费者是否已经收到消费并已经确认了消息,这种方式是不太合理的,试想一下,如果某些消费者消费速度慢,而某些消费者消费速度快,就可能会导致某些消费者消息积压,某些消费者空闲,进而应用整体的吞吐量下降。
如何处理呢?我们可以使用前面章节讲到的**channel.basicQos(intprefetchCount)**方法,来限制当前信道上的消费者所能保持的最大未确认消息的数量比如:消费端调用了channelbasicQos(5),RabbitMQ会为该消费者数,发送一条消息计数+1,消费一条消息计数-1,当达到了设定的上限,RabbitMQ就不会再向它发送消息了,直到消费者确认了某条消息。
类似TCP/IP中的"滑动窗口".
prefetchCount设置为0时表示没有上限。
basicQos对拉模式的消费无效
限流和负载均衡
配置信息:
listener:simple:acknowledge-mode: manual # 设置确认模式prefetch: 5
前期准备:
常量类:
//限流public static final String QOS_QUEUE = "QOS_QUEUE";public static final String QOS_EXCHANGE = "QOS_EXCHANGE";public static final String QOS_KEY = "QOS_KEY";
声明:
//限流@Bean("qosQueue")public Queue qosQueue() {return QueueBuilder.durable(MQConstants.QOS_QUEUE).build();}@Bean("qosExchange")public Exchange qosExchange() {return ExchangeBuilder.directExchange(MQConstants.QOS_EXCHANGE).build();}@Bean("qosBinding")public Binding qosBinding(@Qualifier("qosExchange") Exchange exchange, @Qualifier("qosQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(MQConstants.QOS_KEY).noargs();}
生产者:
@RequestMapping("/qos")public String qos() {for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(MQConstants.QOS_EXCHANGE, MQConstants.QOS_KEY, "qos");}return "消息发送成功";}
如果我们不进行手动确认,观察最大未确认的消息接收量:
可以得知消费者最大接收到的未确认消息数量为 我们设置的 prefetch 值
我们可以通过限流这种方式实现负载均衡
@Component
public class QosListener {@RabbitListener(queues = MQConstants.QOS_QUEUE)public void handle1(String messageContent, Channel channel, Message message) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("消费消息:" + messageContent);Thread.sleep(10000);channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, false);}}@RabbitListener(queues = MQConstants.QOS_QUEUE)public void handle2(String messageContent, Channel channel, Message message) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("消费消息:" + messageContent);Thread.sleep(5000);channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, false);}}
}
可以观察到消费能力强的队列会持续消费消息,消费能力弱的队列消费的消息会相对较少