当前位置: 首页 > news >正文

RabbitMQ延时队列的两种实现方式

目录

一、延时插件实现

1、版本要求

2、为运行新容器时安装

3、为已运行的容器安装

4、验证安装

5、代码编写

1. 配置类

2. 生产者

3. 消费者

二、死信队列实现

1、代码编写

1. 配置类

2. 生产者

3. 消费者

三、踩坑记录

1、发送消息失败

2、消息过期后未能转发到死信队列

3、消费者消费报错


一、延时插件实现

1、版本要求

RabbitMQ 3.5.7以上

2、为运行新容器时安装

# 1. 拉取带管理界面的镜像
docker pull rabbitmq:3.11-management
​
# 2. 启动容器并启用插件
docker run -d \--name rabbitmq \-p 5672:5672 \-p 15672:15672 \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=password \rabbitmq:3.11-management \bash -c "rabbitmq-plugins enable rabbitmq_delayed_message_exchange && rabbitmq-server"

3、为已运行的容器安装

# 1. 进入正在运行的容器
docker exec -it rabbitmq /bin/bash
​
# 2. 在容器内执行插件安装
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
​
# 3. 退出容器
exit
​
# 4. 重启容器使插件生效
docker restart rabbitmq

4、验证安装

# 方法1:检查插件列表
docker exec rabbitmq rabbitmq-plugins list | grep delayed
​
# 方法2:登录管理界面
# 访问 http://localhost:15672 (使用设置的账号密码登录)
# 在 "Exchanges" 标签页创建交换机时,Type 下拉框会出现 "x-delayed-message" 选项

5、代码编写

1. 配置类

@Configuration
public class RabbitMqConfig {public static final String DELAYED_EXCHANGE = "delayed.exchange";public static final String DELAYED_QUEUE = "delayed.queue";public static final String DELAYED_ROUTING_KEY = "delayed_routing_key";@Beanpublic CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct"); // 交换机类型return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message", // 固定类型true,false,args);}
​@Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE, true);}
​@Beanpublic Binding delayedBinding(Queue delayedQueue, CustomExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}}

2. 生产者

public void send(String exchange, String routing_key,Object data, Integer delayMillis) {// 消息后处理器:设置延时和持久化MessagePostProcessor processor = message -> {// 毫秒message.getMessageProperties().setDelay(delayMillis);// 持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;};
​rabbitTemplate.convertAndSend(exchange, routingKey, data, processor);
}

3. 消费者

@Component
@RabbitListener(queues = RabbitMqConfig.DELAYED_QUEUE)
public class DelayedListener {
​@RabbitHandlerpublic void listener(String data, Channel channel, Message message) {log.warn("消息消费成功,消息内容:{}", data);MessageProperties properties = message.getMessageProperties();long deliveryTag = properties.getDeliveryTag()channel.basicAck(deliveryTag, false);}
​
}

二、死信队列实现

1、代码编写

1. 配置类

@Configuration
public class RabbitMqConfig {public static final String DELAYED_EXCHANGE = "delayed.exchange";public static final String DELAYED_QUEUE = "delayed.queue";public static final String DELAYED_ROUTING_KEY = "delayed_routing_key";
​public static final String NORMAL_EXCHANGE = "normal.exchange";public static final String NORMAL_QUEUE = "normal.queue";public static final String NORMAL_ROUTING_KEY = "normal_routing_key";// 死信队列(延时队列)@Beanpublic Queue delayedQueue() {return QueueBuilder.durable(DELAYED_QUEUE).build();}
​// 死信交换机@Beanpublic DirectExchange delayedExchange() {return new DirectExchange(DELAYED_EXCHANGE);}
​// 绑定死信队列到死信交换机@Beanpublic Binding delayedBinding(Queue delayedQueue, DirectExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY);}
​// 普通队列@Beanpublic Queue normalQueue() {return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DELAYED_EXCHANGE).deadLetterRoutingKey(DELAYED_ROUTING_KEY).build();}
​// 普通交换机@Beanpublic DirectExchange normalExchange() {return new DirectExchange(NORMAL_EXCHANGE);}
​// 绑定普通队列到普通交换机@Beanpublic Binding normalBinding(Queue normalQueue, DirectExchange normalExchange) {return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY);}}

2. 生产者

public void send(String exchange, String routing_key, Object data, Integer delayMillis) {String uuid = IdUtil.simpleUUID();// 消息入库略,uuid为主键MessageProperties properties = new MessageProperties();// 设置TTL,单位毫秒properties.setExpiration(String.valueOf(delayMillis));// 消息持久化(2 表示持久化)properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
​Message msg = rabbitTemplate.getMessageConverter().toMessage(data, properties);rabbitTemplate.send(exchange, routingKey, msg, new CorrelationData(uuid));
}

3. 消费者

@Component
@RabbitListener(queues = RabbitMqConfig.DELAYED_QUEUE)
public class DelayedListener {
​@RabbitHandlerpublic void listener(String data, Channel channel, Message message) {log.warn("消息消费成功,消息内容:{}", data);MessageProperties properties = message.getMessageProperties();long deliveryTag = properties.getDeliveryTag()channel.basicAck(deliveryTag, false);}
​
}

三、踩坑记录

1、发送消息失败

原因RabbitTemplate 配置了消息抵达确认,消息ID没有传值。

RabbitTemplate rabbitTemplate = new RabbitTemplate();
// 消息抵达确认通知
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {String msgId = data.getId();if (ack) {log.info("消息抵达队列成功:{}", data);} else {log.error("消息未能发送成功,消息ID:{}", data.getId(), cause);}
});

生产者实际发送消息未传消息ID:

错误格式

rabbitTemplate.convertAndSend(exchange, routingKey, data);

正确格式

String uuid = IdUtil.simpleUUID();
rabbitTemplate.convertAndSend(exchange, routingKey, data, new CorrelationData(uuid));

2、消息过期后未能转发到死信队列

原因:正常消息未绑定死信队列,消息过期自动删除,而不会转发到死信队列中。

错误格式

@Bean
public Queue delayedNormalQueue() {return QueueBuilder.durable(NORMAL_QUEUE).build();
}

正确格式

@Bean
public Queue delayedNormalQueue() {return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DELAYED_EXCHANGE) // 指定死信交换机.deadLetterRoutingKey(DELAYED_ROUTING_KEY) // 指定死信路由键.build();
}

3、消费者消费报错

原因:发送的消息由于自定义的 MessageProperties ,其中缺失了 contentType 参数,需要使用转化器进行转换,而不是直接发送消息。

错误格式

MessageProperties properties = new MessageProperties();
properties.setExpiration(String.valueOf(delayMillis));
​
Message msg = new Message(message.getBytes(), properties);
rabbitTemplate.convertAndSend(exchange, routingKey, msg, new CorrelationData(uuid));

正确格式

MessageProperties properties = new MessageProperties();
properties.setExpiration(String.valueOf(delayMillis));
​
Message msg = rabbitTemplate.getMessageConverter().toMessage(message, properties);
rabbitTemplate.send(exchange, routingKey, msg, new CorrelationData(uuid));

http://www.dtcms.com/a/316841.html

相关文章:

  • C++算法竞赛篇(九)字符数组题型讲解
  • 坚鹏:AI智能体软件是知行学成为AI智能体创新应用引领者的抓手
  • uvm-register-backdoor-access
  • SpringBoot AI心理学训练实战
  • 更改CodeBuddy的默认terminal为Git Bash
  • 随机森林算法详解:从集成学习原理到代码实现
  • Java技术栈/面试题合集(11)-设计模式篇
  • java web 未完成项目,本来想做个超市管理系统,前端技术还没学。前端是个简单的html。后端接口比较完善。
  • MySQL内外连接详解
  • 学习笔记-相似度匹配改进2
  • 机器学习——随机森林
  • Python高级编程与实践:Python高级数据结构与编程技巧
  • 【C++】Stack and Queue and Functor
  • C++二级考试核心知识点【内附操作题真题及解析】
  • Juc高级篇:可见性,有序性,cas,不可变,设计模式
  • SpringMVC(一)
  • Design Compiler:布图规划探索(ICC)
  • 《失落王国》v1.2.8中文版,单人或联机冒险的低多边形迷宫寻宝游戏
  • Modbus tcp 批量写线圈状态
  • centos7上如何安装Mysql5.5数据库?
  • 跨域场景下的Iframe事件监听
  • 【机器学习深度学习】模型量化
  • OSPF作业
  • Linux 基础
  • vue3 计算方式
  • GPS信号捕获尝试(上)
  • 【android bluetooth 协议分析 01】【HCI 层介绍 30】【hci_event和le_meta_event如何上报到btu层】
  • 【三个数公因数】2022-10-7
  • MySQL CONV()函数
  • 永磁同步电机无速度算法--基于二自由度结构的反推观测器TSBO