当前位置: 首页 > news >正文

RabbitMQ消息队列——三个核心特性

1、消息过期机制

可以给每条消息指定一个有效期,一段时间内未被消费者处理,就过期了。

  • 示例场景:消费者(库存系统)挂了,一个订单15分钟还没被库存系统处理,这个订单其实已经失效了,哪怕库存系统再恢复,其实也不用扣减库存。
  • 示例场景:清理过期数据、模拟延迟队列的实现(不开会员就慢速)、专门让某个程序出路过期请求。

第一种方式是给队列中的所有消息指定一个统一的过期时间。也就是说,无论何时进入这个队列的消息,在特定的时间点都会过期失效。这种方式是针对整个队列而言。

第二种方式是给某条具体的消息指定过期时间。这意味着,针对特定的消息我们可以指定一个独立的过期时间。这样,在达到指定的时间后,这条消息将会过期并自动失效。

第一种方式
生产者代码:

package com.yupi.springbootinit.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;public class TtlProducer {// 定义队列名称为"ttl_queue"private final static String QUEUE_NAME = "ttl_queue";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");// 建立连接、创建频道try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 消息虽然可以重复声明,必须指定相同的参数,在消费者的创建队列要指定过期时间,// 后面要放args,在生产者你又想重新创建队列,又不指定参数,那肯定会有问题,// 所以要把这里的创建队列注释掉。// channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 发送消息String message = "Hello World!";// 使用默认的交换机,将消息发送到指定队列channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));System.out.println(" [x] Sent '" + message + "'");}}
}

消费者代码:

package com.yupi.springbootinit.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;public class TtlConsumer {// 定义我们正在监听的队列名称"ttl_queue"private final static String QUEUE_NAME = "ttl_queue";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 设置连接工厂的主机地址为 "localhost"factory.setHost("localhost");// 建立连接Connection connection = factory.newConnection();// 创建频道Channel channel = connection.createChannel();// 创建队列,指定消息过期参数Map<String, Object> args = new HashMap<String, Object>();// 设置消息过期时间为5秒args.put("x-message-ttl", 5000);// 创建队列,并传入队列名称、是否持久化、是否私有、是否自动删除,args 指定参数channel.queueDeclare(QUEUE_NAME, false, false, false, args);// 打印等待消息的提示信息System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 定义了如何处理消息的回调函数DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" [x] Received '" + message + "'");};// 消费消息,该方法会持续阻塞,等待接收消息channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}
}

第二种方式
生产者代码:

// 给消息指定过期时间
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()// 设置消息的过期时间为1000毫秒.expiration("1000").build();
// 发布消息到指定的交换机("my-exchange")和路由键("routing-key")
// 使用指定的属性(过期时间)和消息内容(UTF-8编码的字节数组)
channel.basicPublish("my-exchange", "routing-key", properties, message.getBytes(StandardCharsets.UTF_8));

2、消息确认机制

为了保证消息成功被消费(快递成功被取走),rabbitmg 提供了消息确认机制,当消费者接收到消息后,比如要给一个反馈:

  • ack:消费成功
  • nack:消费失败
  • reject:拒绝

如果告诉 rabbitmg 服务器消费成功,服务器才会放心地移除消息。

3、死信队列

为了保证消息的可靠性,比如每条消息都成功消费,需要提供一个容错机制,即:失败的消息怎么处理?

  • 死信:指过期的消息、被拒收的消息、消息队列已满以及处理失败的消息的统称。
  • 死信队列:专门用来处理死信的队列(实际上是一个普通的队列,但被专门用来接收并处理死信消息。可以将它理解为名为"死信队列"的队列)。
  • 死信交换机:用于将死信消息转发到死信队列的交换机,也可以设置路由绑定来确定消息的路由规则(是一个普通的交换机,只是被专门用来将消息发送到死信队列。可以将其理解为名为"死信交换机“的交换机)。

死信可以通过将死信交换机绑定到死信队列来实现。这样,当消息被标记为死信时,它将被转发到死信交换机,并最终路由到死信队列进行处理。

在这里插入图片描述
死信队列主要用于处理以下情况下的死信消息。根据官方文档的说明,有以下三种情况:
1、消息被拒绝:当消费者使用 basic.reject或 basic.nack拒绝消息,并将 requeue参数设置为 false ,意味着不将消息重新放回队列,这时消息就成为了死信。
2、消息过期:当消息的过期时间设置,并且消息在队列中等待时间超过了设定的过期时间,该消息就会变成死信。
3、队列长度限制:当队列达到了设置的最大长度限制,新进入的消息将无法被存储,而被直接丢弃。这些无法进入队列的消息被视为死信。

生产者代码:

package com.yupi.springbootinit.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.util.Scanner;public class DlxDirectProducer {// 定义死信交换机名称为"dlx-direct-exchange"private static final String DEAD_EXCHANGE_NAME = "dlx-direct-exchange";// 定义本来的业务交换机名称为"direct2-exchange"private static final String WORK_EXCHANGE_NAME = "direct2-exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明死信交换机channel.exchangeDeclare(DEAD_EXCHANGE_NAME, "direct");// 创建老板的死信队列,随机分配一个队列名称String queueName = "laoban_dlx_queue";channel.queueDeclare(queueName, true, false, false, null);channel.queueBind(queueName, DEAD_EXCHANGE_NAME, "laoban");// 创建外包的死信队列,随机分配一个队列名称String queueName2 = "waibao_dlx_queue";channel.queueDeclare(queueName2, true, false, false, null);channel.queueBind(queueName2, DEAD_EXCHANGE_NAME, "waibao");// 创建用于处理老板死信队列消息的回调函数,当接收到消息时,拒绝消息并打印消息内容DeliverCallback laobanDeliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");// 拒绝消息,并且不要重新将消息放回队列,只拒绝当前消息channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);System.out.println(" [laoban] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};// 创建用于处理外包死信队列消息的回调函数,当接收到消息时,拒绝消息并打印消息内容DeliverCallback waibaoDeliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");// 拒绝消息,并且不要重新将消息放回队列,只拒绝当前消息channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);System.out.println(" [waibao] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};// 注册消费者,用于消费老板的死信队列,绑定回调函数channel.basicConsume(queueName, false, laobanDeliverCallback, consumerTag -> {});// 注册消费者,用于消费外包的死信队列,绑定回调函数channel.basicConsume(queueName2, false, waibaoDeliverCallback, consumerTag -> {});// 创建一个Scanner对象用于从控制台读取用户输入Scanner scanner = new Scanner(System.in);// 进入循环,等待用户输入消息和路由键while (scanner.hasNext()) {String userInput = scanner.nextLine();String[] strings = userInput.split(" ");if (strings.length < 1) {continue;}String message = strings[0];String routingKey = strings[1];// 发布消息到业务交换机,带上指定的路由键channel.basicPublish(WORK_EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));// 打印发送的消息和路由键System.out.println(" [x] Sent '" + message + " with routing:" + routingKey + "'");}}}//..
}

消费者代码:

package com.yupi.springbootinit.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;public class DlxDirectConsumer {// 定义我们正在监听的死信交换机名称"dlx-direct-exchange"private static final String DEAD_EXCHANGE_NAME = "dlx-direct-exchange";// 定义我们正在监听的业务交换机名称"direct2-exchange"private static final String WORK_EXCHANGE_NAME = "direct2-exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(WORK_EXCHANGE_NAME, "direct");// 创建用于指定死信队列的参数的Map对象Map<String, Object> args = new HashMap<>();// 将要创建的队列绑定到指定的交换机,并设置死信队列的参数args.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);// 指定死信要转发到外包死信队列args.put("x-dead-letter-routing-key", "waibao");// 创建新的小狗队列,并将其绑定到业务交换机,使用"xiaodog"作为路由键String queueName = "xiaodog_queue";channel.queueDeclare(queueName, true, false, false, args);channel.queueBind(queueName, WORK_EXCHANGE_NAME, "xiaodog");Map<String, Object> args2 = new HashMap<>();args2.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);// 指定死信要转发到老板死信队列args2.put("x-dead-letter-routing-key", "laoban");// 创建新的小猫队列,并将其绑定到业务交换机,使用"xiaocat"作为路由键String queueName2 = "xiaocat_queue";channel.queueDeclare(queueName2, true, false, false, args2);channel.queueBind(queueName2, WORK_EXCHANGE_NAME, "xiaocat");// 打印等待消息的提示信息System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 创建用于处理小狗队列消息的回调函数,当接收到消息时,拒绝消息并打印消息内容DeliverCallback xiaoyuDeliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");// 拒绝消息,并且不要重新将消息放回队列,只拒绝当前消息channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);System.out.println(" [xiaodog] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};// 创建用于处理小猫队列消息的回调函数,当接收到消息时,拒绝消息并打印消息内容DeliverCallback xiaopiDeliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");// 拒绝消息,并且不要重新将消息放回队列,只拒绝当前消息channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);System.out.println(" [xiaocat] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};// 注册消费者,用于消费小狗队列,绑定回调函数,自动确认消息改为falsechannel.basicConsume(queueName, false, xiaoyuDeliverCallback, consumerTag -> {});// 注册消费者,用于消费小猫队列,绑定回调函数channel.basicConsume(queueName2, false, xiaopiDeliverCallback, consumerTag -> {});}
}
http://www.dtcms.com/a/272960.html

相关文章:

  • LeetCode 1652. 拆炸弹
  • AI时代的接口调试与文档生成:Apipost 与 Apifox 的表现对比
  • Leetcode刷题营第十九题:对链表进行插入排序
  • Python 网络爬虫中 robots 协议使用的常见问题及解决方法
  • 图解 BFS 路径搜索:LeetCode1971
  • 芯片I/O脚先于电源脚上电会导致Latch-up(闩锁效应)吗?
  • Logback日志框架配置实战指南
  • 5种使用USB数据线将文件从安卓设备传输到电脑的方法
  • 【JavaScript 函数、闭包与 this 绑定机制深度解析】
  • 【C语言】指针笔试题2
  • 模块三:现代C++工程实践(4篇)第二篇《性能调优:Profile驱动优化与汇编级分析》
  • FlashAttention 快速安装指南(避免长时间编译)
  • QT网络通信底层实现详解:UDP/TCP实战指南
  • Centos 7下使用C++使用Rdkafka库实现生产者消费者
  • 【LeetCode 热题 100】19. 删除链表的倒数第 N 个结点——双指针+哨兵
  • 学习 Flutter (一)
  • html的outline: none;
  • C++STL-deque
  • 1. COLA-DDD的实战
  • 【基础架构】——软件系统复杂度的来源(低成本、安全、规模)
  • 告别卡顿与慢响应!现代 Web 应用性能优化:从前端渲染到后端算法的全面提速指南
  • IDEA运行Spring项目报错:java: 警告: 源发行版 17 需要目标发行版 17,java: 无效的目标发行版: 17
  • Cargo.toml 配置详解
  • 【科研绘图系列】R语言探索生物多样性与地理分布的可视化之旅
  • 网安-解决pikachu-rce乱码问题
  • 访问Windows服务器备份SQL SERVER数据库
  • (C++)任务管理系统(文件存储)(正式版)(迭代器)(list列表基础教程)(STL基础知识)
  • x86交叉编译ros 工程给jetson nano运行
  • Rust and the Linux Kernel
  • Sophix、Tinker 和 Robust 三大主流 Android 热修复框架的详细对比