当前位置: 首页 > news >正文

RabbitMQ:延时消息(死信交换机、延迟消息插件)

目录

  • 一、死信交换机【不推荐】
  • 二、延迟消息插件【推荐】
    • 2.1 安装插件【Linux】
    • 2.2 安装插件【Windows】
    • 2.3 如何使用


延时消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。
延时任务:设置一定时间之后才执行的任务。

一、死信交换机【不推荐】

当一个队列的消息满足下列情况之一时,就会成为死信(dead letter):

  • 消费者使用basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false。
  • 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费。
  • 要投递的队列消息堆积满了,最早的消息可能成为死信。

如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机也称之为死信交换机

在这里插入图片描述
具体实现流程如下:

  1. 首先创建两个队列direct.queue、dlx.queue,需要注意的是在创建direct.queue队列时,需要绑定死信交换机。

在这里插入图片描述
如何绑定死信交换机:选中Dead letter exchange输入交换机的名称
在这里插入图片描述
2. 创建两个交换机分别绑定两个队列mt.direct、mt.dlx.direct

在这里插入图片描述
3. 消费者监听死信队列,并给mt.direct发送定时消息

@Test
public void dlxExchangeTest(){String exchangeName = "mt.direct";String message = "黄色警报 ......";rabbitTemplate.convertAndSend(exchangeName, "dlx", message, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("1000");  // 设置过期时间,单位ms,1000=1sreturn message;}});
}
@RabbitListener(queues = "dlx.queue")
public void listenDlxQueue(String message){System.out.println(String.format("消费者收到了dlx.queue: %s", message));
}

二、延迟消息插件【推荐】

要想使用延迟消息,需要先安装延迟消息插件rabbitmq_delayed_message_exchange,根据自己RabbitMQ的版本去下载。

2.1 安装插件【Linux】

去官网下载插件。
在这里插入图片描述
将插件放入RabbitMQ的plugins中,具体路径如下:

/usr/lib/rabbitmq/lib/rabbitmq_server-3.13.7/plugins

安装插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

重启RabbitMQ服务

systemctl restart rabbitmq-server

再次登录rabbitmq,如果exchange的类型中出现:x-delayed-message,说明该插件安装成功!
在这里插入图片描述

2.2 安装插件【Windows】

将插件放入RabbitMQ的plugins中,具体路径如下:

xxx\RabbitMQ Server\rabbitmq_server-3.12.10\plugins

然后进入到RabbitMQ额度sbin目录下,执行以下命令:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

2.3 如何使用

rabbitmq_delayed_message_exchange插件的实现原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一段时间,到期后在投递到队列。

Java注解的实现方式

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "mt.delay.direct", delayed = "true"),key = "delay"
))
public void listenDelayQueue(String message){System.out.println(String.format("消费者收到了delay.queue: %s", message));
}

Java Bean的实现方式

@Bean
public DirectExchange delayExchange() {return ExchangeBuilder.directExchange("mt.delay.direct").durable(true).delayed().build();
}

消费者

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "mt.delay.direct", delayed = "true"),key = "delay"
))
public void listenDelayQueue(String message){System.out.println(String.format("消费者收到了delay.queue: %s", message));
}

生产者

@Test
public void delayExchangeTest(){String exchangeName = "mt.delay.direct";String message = "延迟警报 ......";rabbitTemplate.convertAndSend(exchangeName, "delay", message, new DelayMessageProcessor(5000));
}
package com.ming.processor;import lombok.RequiredArgsConstructor;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;/*** @RequiredArgsConstructor 是Lombok库提供的一个注解,用于自动生成包含必需参数的构造函数。必需参数是指那些被声明为 final 或者有 @NonNull 注解的成员变量。*/
@RequiredArgsConstructor
public class DelayMessageProcessor implements MessagePostProcessor {/*** 定义延迟时间*/private final int delay;@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(delay);return message;}
}
http://www.dtcms.com/a/342090.html

相关文章:

  • 领域专用AI模型训练指南:医疗、法律、金融三大垂直领域微调效果对比
  • 28、工业网络资产漏洞扫描与风险评估 (模拟) - /安全与维护组件/industrial-network-scanner
  • 深度解析Atlassian 团队协作套件(Jira、Confluence、Loom、Rovo)如何赋能全球分布式团队协作
  • Whisk for Mac 网页编辑器 PHP开发
  • 牛客:链表的回文结构详解
  • NewsNow搭建喂饭级教程
  • SQL中对视图的操作命令汇总
  • STM32H750 CoreMark跑分测试
  • [最新]Dify v1.7.2版本更新:工作流可视化和节点搜索
  • 2025 年 8 月《GPT-5 家族 SQL 能力评测报告》发布
  • SQL视图、存储过程和触发器
  • OBCP第四章 OceanBase SQL 调优学习笔记:通俗解读与实践指南
  • CentOS 7安装FFmpeg
  • QT QProcess, WinExec, ShellExecute中文路径带空格程序或者脚本执行并带参数
  • Qt实现TabWidget通过addTab函数添加的页,页内控件自适应窗口大小
  • Qt文件压缩工具项目开发教程
  • 【Bug】CentOS 7 使用vim命令报错vim: command not found
  • 开源 C++ QT Widget 开发(四)文件--二进制文件查看编辑
  • Elasticsearch官方文档学习-未完待续
  • java项目:如何优化JVM参数?
  • 【深入理解 Linux 网络】收包原理与内核实现(下) 从 TCP 传输层到应用
  • 遥感机器学习入门实战教程|Sklearn案例⑤:集成学习方法全览
  • ES_flattened
  • Nacos部署微服务
  • Python机器学习入门:用scikit-learn构建你的第一个预测模型
  • 安装nvtop编译报错:fatal error: linux/kcmp.h: No such file or directory
  • 亚远景科技助力力邦合信通过ASPICE CL2评估
  • 今日科技焦点 | A股科技芯片受追捧,美股科技股承压——技术赛道的资本与市场博弈
  • 云计算下数据隐私保护系统的设计与实现(LW+源码+讲解+部署)
  • 2025高性能氢气传感器领域的创新引领者:杭州德克西智能科技有限公司