当前位置: 首页 > news >正文

【RabbitMQ】高级特性—事务、消息分发详解

文章目录

  • 事务
    • 1. 配置事务管理器
    • 2. 声明队列
    • 3. 生产者
    • 4. 测试
  • 消息分发
    • 概念
    • 应用场景
      • 限流
        • 代码示例:
      • 负载均衡
        • 代码示例

事务

RabbitMQ 是基于 AMQP 协议实现的,该协议实现了事务机制,因此 RabbitMQ 也支持事务机制。Spring AMQP 也提供了对事务相关的操作,RabbitMQ 事务允许开发者确保消息的发送和接收是原子性的,要么全部成功,要么全部失败

1. 配置事务管理器

@Configuration  
public class TransactionConfig {  @Bean  public RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {  return new RabbitTransactionManager(connectionFactory);  }  public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {  RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);  rabbitTemplate.setChannelTransacted(true);  return rabbitTemplate;  }  
}

2. 声明队列

@Bean("transQueue")  
public Queue transQueue() {  return QueueBuilder.durable("trans_queue").build();  
}

3. 生产者

@RestController  
@RequestMapping("/trans")  
public class TransactionProducer {  @Autowired  private RabbitTemplate rabbitTemplate;  @Transactional  @RequestMapping("/send")  public String send() {  rabbitTemplate.convertAndSend("", "trans_queue", "trans test 1...");  int a = 50 / 0;  rabbitTemplate.convertAndSend("", "trans_queue", "trans test 2...");  return "发送成功";  }  
}

4. 测试

  1. 不加 @Transactional,会发现消息 1 发送成功
  2. 添加 @Transactional,消息 1 和消息 2 全部发送失败

消息分发

概念

RabbitMQ 队列拥有多个消费者时,队列会把收到的消息分派给不同的消费者。每条消息只会发送给订阅列表的一个消费者。这种方式非常适合扩展,如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可

默认情况下 RabbitMQ 是以轮询的方法进行分发的,而不管消费者是否已经消费并已经确认了消息。这种方式是不太合理的,试想一下,如果某些消费者消费速度慢,而某些消费者速度很快,就可能会导致某些消费者消息积压,某些消费者空闲,进而应用整体的吞吐量下降


如何处理呢?

  • 我们可以使用前面说到的 channel.basicQos(int prefetchCount) 方法,来限制当前信道上的消费者所能保持的最大未确认消息的数量

比如:消费端调用了 channelbasicQos(5)RabbitMQ 会为该消费者计数,发送一条消息消息计数 +1,消费一条消息计数 -1,当达到了设定的上限,RabbitMQ 就不会再向它发送消息了,直到消费者确认了某条消息

  • 类似 TCP/IP 中的滑动窗口

应用场景

消息分发的常见场景有如下:

  1. 限流
  2. 非公平分发

限流

如下使用场景:
订单系统每秒最多处理 5000 请求,正常情况下,订单系统可以正常满足需求,但是在秒杀时间点,请求瞬间增多,每秒 1万 个请求,如果这些请求全部通过 MQ 发送到订单系统,无疑会把订单系统压垮image.png

  • RabbitMQ 提供了限流机制,可以控制消费端一次只拉取 N 个请求
  • 通过设置 prefetchCount 参数,同时也必须要设置消息应答方式为手动应答
  • prefetchCount:控制消费者从队列中预取(prefetch)消息的数量,以此来实现流量控制和负载均衡
代码示例:
  1. 配置 prefetch 参数,设置应答方式为手动应答
listener:  simple:  acknowledge-mode: manual  # 消息接收确认   prefetch: 5
  1. 声明队列和交换机
// 消息分发——限流  
public static final String QOS_EXCHANGE_NAME = "qos_exchange";  
public static final String QOS_QUEUE = "qos_queue";
  1. 配置交换机,队列
@Configuration  
public class QosConfig {  // 1. 交换机  @Bean("qosExchange")  public Exchange qosExchange() {  return ExchangeBuilder.directExchange(Constant.QOS_EXCHANGE_NAME).durable(true).build();  }  // 2. 队列  @Bean("qosQueue")  public Queue qosQueue() {  return QueueBuilder.durable(Constant.QOS_QUEUE).build();  }  // 3. 队列和交换机绑定  @Bean("qosBinding")  public Binding qosBinding(@Qualifier("qosExchange") Exchange exchange, @Qualifier("qosQueue") Queue queue) {  return BindingBuilder.bind(queue).to(exchange).with("qos").noargs();  }  
}
  1. 发送消息,一次发送 20 条消息
@RequestMapping("/qos")  
public String qos() {  // 发送消息  for (int i = 0; i < 20; i++) {  rabbitTemplate.convertAndSend(Constant.QOS_EXCHANGE_NAME,"qos", "qos test..." + i);  }  return "发送成功";  
}
  1. 消费者监听
@Component  
public class QosQueueListener {  // 指定监听队列的名称  @RabbitListener(queues = Constant.QOS_QUEUE)  public void ListenerQueue(Message message, Channel channel) throws Exception{  long deliveryTag = message.getMessageProperties().getDeliveryTag();  System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"), deliveryTag);  // 3. 手动签收  //channel.basicAck(deliveryTag, true);  }  
}
  1. 测试

调用接口,发送消息:

  • 发送消息时,需要先把手动确认注释掉,不然会直接消费掉
  • 可以看到,控制台只打印了 5 条信息image.png|359

我们观察管理平台image.png

  • 可以看到,ready,也就是待发送 15 条,未确认的 5 条(因为代码未手动 ack

把配置里面的 prefetch: 5 注掉,然后在观察运行结果。

  • 从日志和控制台上可以看到:消费者会一次性把 20 条消息全部收到image.png

管理平台:image.png

负载均衡

我们也可以用此配置,来实现“负载均衡”

如下图所示,在有两个消费者的情况下,一个消费者处理任务非常快,另一个非常慢,就会造成一个消费老会一直很忙,而另一个消费者很闲

  • 这是因为 RabbitMQ 只是在消息进入队列时分派消息,他不考虑消费者未确认消息的数量image.png

我们可以使用设置 prefetch=1 的方式,告诉 RabbitMQ 一次只给一个消费者一条信息,也就是说,在处理并确认前一条消息之前,不要向该消费者发送新消息。相反,它会将它分派给下一个不忙的消费者

代码示例
  1. 配置 prefetch 参数,设置应答方式为手动应答
listener:  simple:  acknowledge-mode: manual  # 消息接收确认   prefetch: 1
  1. 启动两个消费者
    • 使用 Thread.sleep(100) 来模拟消费满
  
@Component  
public class QosQueueListener {  // 指定监听队列的名称  @RabbitListener(queues = Constant.QOS_QUEUE)  public void ListenerQueue(Message message, Channel channel) throws Exception{  long deliveryTag = message.getMessageProperties().getDeliveryTag();  System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"), deliveryTag);  // 3. 手动签收  channel.basicAck(deliveryTag, true);  }  // 指定监听队列的名称  @RabbitListener(queues = Constant.QOS_QUEUE)  public void ListenerQueue2(Message message, Channel channel) throws Exception {  long deliveryTag = message.getMessageProperties().getDeliveryTag();  System.out.printf("消费者2接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"), deliveryTag);  // 模拟处理流程慢  Thread.sleep(100);  // 手动签收  channel.basicAck(deliveryTag, true);  }  
}
  1. 测试

调用接口,发送消息

  • 通过日志观察两个消费者消费的消息image.png|334
http://www.dtcms.com/a/322418.html

相关文章:

  • 【n8n】学习n8n【10】:Github的项目n8n-workflows:本地安装2,053 个 n8n 工作流程集合:随时看随时抄/学习~
  • 基于开源AI大模型、AI智能名片与S2B2C商城小程序的零售智能化升级路径研究
  • Python训练Day38
  • Nginx 反向代理与负载均衡架构
  • 基于开源AI大模型、AI智能名片与S2B2C商城小程序的学习型社群构建与运营模式创新研究
  • 深度学习中基于响应的模型知识蒸馏实现示例
  • 开发手札:UnrealEngine和Unity3d坐标系问题
  • K-means聚类学习:原理、实践与API解析
  • AI大语言模型在生活场景中的应用日益广泛,主要包括四大类需求:文本处理、信息获取、决策支持和创意生成。
  • 《Learning To Count Everything》论文阅读
  • 动态路由菜单:根据用户角色动态生成菜单栏的实践(包含子菜单)
  • 使用加密技术实现个人密码本保护
  • try/catch/throw 简明指南
  • orcad的操作(1)
  • 写 SPSS文件系统
  • Docker容器
  • 多级缓存详解
  • RAG-大模型课程《李宏毅 2025》作业1笔记
  • 从“人拉肩扛”到“智能协同”——AGV重构消防智能仓储价值链
  • 我用C++和零拷贝重构了文件服务器,性能飙升3倍,CPU占用降低80%
  • 202506 电子学会青少年等级考试机器人二级理论综合真题
  • Spark02 - SparkContext介绍
  • 304 引发的 SEO 难题:缓存策略与内容更新如何两全?
  • 【ref、toRef、toRefs、reactive】ai
  • 比较useCallback、useMemo 和 React.memo
  • kafka架构原理快速入门
  • Opencv[七]——补充
  • 基于HTML的政策问答
  • java组件安全vulhub靶场
  • HTML金色流星雨