当前位置: 首页 > news >正文

RabbitMQ 学习

MQ 的相关概念

什么是 MQ

MQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是 message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常见的上下游“逻辑解耦 + 物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不用依赖其他服务。

为什么要用 MQ

使用 MQ 主要有3个目的:

  1. 流量消峰

举个例子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限制订单超过一万后不允许用户下单。使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好。

  1. 应用解耦

以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性。

在这里插入图片描述

  1. 异步处理

有些服务间调用是异步的,例如 A 调用 B,B 需要花费很长时间执行,但是 A 需要知道 B 什么时候可以执行完,以前一般有两种方式,A 过一段时间去调用 B 的查询 api 查询。或者 A 提供一个 callback api,B 执行完之后调用 api 通知 A 服务。这两种方式都不是很优雅,使用消息总线,可以很方便解决这个问题,A 调用 B 服务后,只需要监听 B 处理完成的消息,当 B 处理完成后,会发送一条消息给 MQ,MQ 会将此消息转发给 A 服务。这样 A 服务既不用循环调用 B 的查询 api,也不用提供 callback api。同样 B 服务也不用做这些操作。A 服务还能及时的得到异步处理成功的消息。

在这里插入图片描述

MQ 底层实现的两大主流方式

由于 MQ 执行的是跨应用的信息传递,所以制定底层通信标准非常重要。

目前主流的 MQ 通信协议标准包括:

  • AMQP(Advanced Message Queuing Protocol):通用西医,IBM 公司研发。
  • JMS(Java Message Service):专门为 java 语言服务,SUN 公司研发,一组由 java 接口组成的 java 标准。

AMQP 对比 JMS

AMQPJMS
七层网络模型传输层与会话层应用层
消息模型支持多种消息模型。包括点对点(P2P)和发布/订阅(Pub/Sub)主要支持点对点和发布/订阅两种消息模型
支持的编程语言和平台支持多种编程语言和平台,包括 Java、C++、Python 等主要支持 Java 平台
可靠性提供了强大的消息可靠性保证,包括消息持久化、事务性消息和消息确认机制支持消息持久化和事务性消息,但具体实现取决于消息传递系统的提供者
传输协议使用二进制协议进行消息传递,提供了高效、可靠的消息投递机制使用面向文本的协议,传输效率较低
拓展性和兼容性具有很好的拓展性和兼容性,可以在不同的消息代理之间交互操作在 Java 环境中有较好的拓展性和兼容性,但在非 Java 环境集成时受到限制

MQ 的分类

ActiveMQ

优点:单机吞吐量万级,时效性 ms 级,可用性高,基于主从架构实现高可用性,消息可靠性较低的概率丢失数据

缺点:官方社区现在对 ActiveMQ 5.x 维护越来越少,高吞吐量场景较少使用。

kafka

大数据的杀手锏,谈到大数据领域内的消息传输,则绕不开 Kafka,这款为大数据而生的消息中间件,以其百万级 TPS 的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥着举足轻重的作用。目前已经被 LinkedIn,Uber,Twitter,Netflix 等大公司所采纳。

优点:性能卓越,单机写入 TPS 约在百万条/秒,最大的优点,就是吞吐量高。时效性 ms 级可用性非常高,kafka 是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用,消费者采用 Pull 方式获取消息,消息有序,通过控制能够保证所有消息被消费且仅被消费一次;有优秀的第三方 Kafka Web 管理界面 Kafka-Manager;在日志领域比较成熟,被多家公司和多个开源项目使用;功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用。

缺点:Kafka 单机超过 64 个队列/分区,Load 会发生明显的飙高现象,队列越多,load 越高,发送消息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;支持消息顺序,但是一台代理宕机后,就会产生消息乱序,社区更新较慢。

RocketMQ

RocketMQ 出自阿里巴巴的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进。被阿里巴巴广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog 分发等场景。

优点:单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到 0 丢失,MQ 功能较为完善,还是分布式的,扩展性好,支持 10 亿级别的消息堆积,不会因为堆积导致性能下降,源码是 java 我们可以自己阅读源码,定制自己公司的 MQ

缺点:支持的客户端语言不多,目前是 java 及 c++,其中 c++不成熟;社区活跃度一般,没有在 MQ 核心中去实现 JMS 等接口,有些系统要迁移需要修改大量代码。

RabbitMQ

2007 年发布,是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。

优点:由于 erlang 语言的高并发特性,性能较好,吞吐量到万级,MQ 功能比较完备,健壮、稳定、易用、跨平台、支持多种语言 如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持 AJAX 文档齐全,开源提供的管理界面非常棒,用起来很好用,社区活跃度高,更新频率相当高

缺点:商业版需要收费,学习成本较高。

MQ的选择

kafka

Kafka 主要特点是基于 Pull 的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务,大型公司建议可以选用,如果有日志采集功能,肯定是首选 kafka 了

RocketMQ

天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况,RoketMQ 在稳定性上可能更值得信赖,这些业务场景在阿里双11已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择 RocketMQ。

RabbitMQ

结合 erlang 语言本身的并发优势,性能好时效性微秒级,社区活跃度也比较高,管理界面用起来十分方便,如果你的数据量没有那么大,中小型公司优先选择功能比较完备的 RabbitMQ。

RabbitMQ

概念

RabbitMQ 是一个消息中间件,它接受并转发消息,你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是一个快递站,一个快递员帮你传递快件,RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据。

四大核心概念

  1. 生产者

    产生数据发送消息的程序是生产者

  2. 消费者

    消费与接收具有相似的含义,消费者大多时候是一个等待接收消息的程序,请注意生产者,消费者和消息中间件很多时候并不在同一机器上,同一个应用程序既可以是生产者又是可以是消费者

  3. 交换机

    交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中,交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定

  4. 队列

    队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中,队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区,许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据,这就是我们使用队列的方式

RabbitMQ核心部分

在这里插入图片描述

  1. 简单模式
  2. 工作模式
  3. 发布订阅模式
  4. 路由模式
  5. 主题模式
  6. RPC模式

名词介绍

在这里插入图片描述

  • Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
  • Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念,当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等
  • Connection:publisher/consumer 和 broker 之间的 TCP 连接
  • Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
  • Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  • Queue:消息最终被送到这里等待 consumer 取走
  • Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

安装

  1. 官网地址:https://www.rabbitmq.com/download.html
  2. 文件上传:上传到 /usr/local/software 目录下(如果没有 software 需要自己创建)

在这里插入图片描述

  1. 安装文件:分别按照以下顺序安装
rpm -ivh erlang-21.3-1.el7.x86_64.rpm
yum install socat -y
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
  1. 常用命令
# 添加开机启动 RabbitMQ 服务
chkconfig rabbitmq-server on
# 启动服务
/sbin/service rabbitmq-server start
# 查看服务状态
/sbin/service rabbitmq-server status
# 停止服务
/sbin/service rabbitmq-server stop
# 开启 web 管理插件
rabbitmq-plugins enable rabbitmq_management
  1. 添加一个新的用户
# 创建账号
rabbitmqctl add_user admin 123
# 设置用户角色
rabbitmqctl set_user_tags admin administrator
# 设置用户权限
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
# 用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限
# 当前用户和角色
rabbitmqctl list_users
  1. 再次利用 admin 用户登录

在这里插入图片描述

  1. 重置命令
# 关闭应用的命令为
rabbitmqctl stop_app
# 清除的命令为
rabbitmqctl reset
# 重新启动命令为
rabbitmqctl start_app

windows 启动 rabbitmq 服务

  • 配置 erlang 环境
  • 下载 rabbitmq 服务
  • 进入sbin目录下执行命令
rabbitmq-plugins enable rabbitmq_management
# 启动服务
rabbitmq-server.bat
  • 在浏览器访问http://localhost:15672

初始化账号和密码都是:guest

RabbitMQ 交换机

RabbitMQ 交换机是消息代理的核心组件,负责接收生产者发送的消息,并根据路由规则将消息分发到绑定队列。它在 AMQP 模型中扮演着关键角色,支持高可靠性、高吞吐量的消息路由。

交换机的定义和作用

交换机是 RabbitMQ 的消息路由枢纽。生产者发送消息时,会指定交换机名称和路由键;交换机基于绑定规则决定消息是否转发到一个或多个队列。这样可以解耦生产者和消费者,实现灵活的消息分发。RabbitMQ 默认提供一个无名的 Direct 类型交换机(名称是空字符串),绑定键就是队列名,因此有时感觉“不需要交换机也能工作”。在实际应用中,自定义交换机可提升系统灵活性和可维护性。

主要交换机类型及工作原理

RabbitMQ 支持多种交换机类型,每种通过不同的路由机制处理消息分发。以下是核心类型及其使用场景:

  • Direct Exchange(直连交换机)

    工作原理:消息的路由键必须与队列绑定键完全匹配。如果匹配成功,消息被转发到对应队列;否则丢弃。这是一种点对点精确路由模型。

    使用场景:常用于一对一消息分发,例如订单系统将 order.paid 路由到支付队列。

    示例:声明交换机时设置类型为 “direct”。生产者发送消息时需指定匹配的 Routing Key。

  • Topic Exchange(主题交换机)

    工作原理:基于通配符模式匹配路由。绑定键可以包含通配符:*(匹配一级单词)和 #(匹配多级单词)。交换机将路由键与绑定键的模式进行匹配,支持灵活路由。

    使用场景:适用于复杂的路由逻辑,例如日志系统中 log.error.* 匹配所有错误日志队列(如 log.error.app1log.error.app2)。

    公式示例:设绑定键为 log.error.*,路由键为 log.error.network 时匹配成功,分发到相关队列。

  • Fanout Exchange(扇型交换机)

    工作原理:忽略路由键,直接将消息广播到所有绑定队列。类似于“发布-订阅”模型。

    使用场景:适合广播通知,例如系统公告或事件通知(如通知所有用户服务更新)。

    优点:高吞吐量,因为不需要计算路由匹配。

  • Headers Exchange(头交换机)

    工作原理:不使用路由键,而是基于消息头的键值对匹配绑定键。支持更复杂的逻辑(如多条件匹配),但性能较低。

    使用场景:用于高级路由需求,例如过滤包含特定元数据的消息(如 priority=high)。

    示例:绑定规则可为 headers: {priority: high},仅匹配消息头中有 priority=high 的消息。

  • Dead Letter Exchange(死信交换机,DLX)

    工作原理:专门处理“死信”消息(如消息过期、队列满拒绝或重试失败)。需将队列绑定到 DLX,并设置规则(如 TTL 超时)。DLX 通常配合主题或直连交换机实现。

    优化方法:RabbitMQ 官方提供 Delay Exchange 插件,通过插件简化延迟消息处理,无需复杂的 DLX + TTL 组合。

    使用场景:错误处理和延迟任务,例如订单超时取消(消息在队列中 TTL 过期后路由到 DLX)。

交换机属性和配置

在声明交换机时,需指定属性以满足不同需求:

  • Name:交换机名称,标识路由目标。
  • Type:类型(direct/topic/fanout/headers/dead letter),决定了路由行为。
  • Durability:持久性。设置 durable=true 时,RabbitMQ 重启后交换机保留;否则临时创建(提高性能但风险丢失)。
  • Auto-delete:自动删除。当所有绑定队列被删除时,交换机自动移除。
  • Internal:是否内部使用(默认为 false)。设置为 true 后,消息只能路由到其他交换机(不直接到队列),适合构建多层路由拓扑。
  • 其他属性:如 Arguments(自定义参数),用于高级配置。

工作流程和最佳实践

  • 消息路由流程
    1. 生产者发送消息到交换机(指定交换机名称和路由键)。
    2. 交换机根据类型检查路由规则:
      • Direct/Topic:匹配绑定键和路由键。
      • Fanout:忽略路由键,广播所有队列。
      • DLX:处理死信事件(如 TTL 超时)。
    3. 匹配成功则消息入队;否则丢弃或转移到 DLX。
    4. 消费者从队列获取消息处理。
  • 最佳实践
    • 优先使用 Direct 或 Topic 类型以提高可读性和性能。
    • 启用持久化保证消息不丢。
    • 使用 DLX 处理失败消息,避免消息无限重试。
    • 结合 Delay 插件简化延迟任务,减少代码复杂度。
  • 默认交换机行为:未指定交换机时,RabbitMQ 使用无名 Direct 交换机,路由键自动作为队列名。适合简单场景,但不鼓励生产中依赖此方式。

总结

RabbitMQ 交换机是实现高效消息路由的核心,Direct、Topic、Fanout、Headers 和 DLX 类型覆盖了从精确匹配到广播分发、延迟处理的多样需求。正确配置属性和理解工作原理(如路由键匹配)是优化系统的关键。建议实际开发中结合监控工具(如 RabbitMQ Management UI)调试路由规则。

RabbitMQ 可靠性

RabbitMQ 的可靠性保障围绕三大核心环节构建,形成完整的消息传递保障闭环:

生产者可靠性

核心机制

  1. 消息持久化(防止 Broker 宕机丢失消息)

    channel.basicPublish("exchange", "routingKey",MessageProperties.PERSISTENT_TEXT_PLAIN, // 消息持久化message.getBytes());
    
  2. 生产者确认机制(异步确认消息投递)

    channel.confirmSelect(); // 开启确认模式
    channel.addConfirmListener((sequenceNumber, multiple) -> {// 消息成功到达Broker回调log.info("消息{}确认成功", sequenceNumber);
    }, (sequenceNumber, multiple) -> {// 消息投递失败回调log.error("消息{}投递失败", sequenceNumber);// 重试逻辑:指数退避策略retryWithBackoff(sequenceNumber);
    });
    
  3. 本地事务+消息落库(双重保障)

    @Transactional
    public void processOrder(Order order) {// 1. 业务数据落库orderDao.save(order);// 2. 发送消息(附带业务ID)rabbitTemplate.convertAndSend("orderExchange", "order.new", order.getId(), message -> {message.getMessageProperties().setCorrelationId(order.getId());return message;});
    }
    

Broker 可靠性

核心机制

  1. 队列持久化

    channel.queueDeclare("orderQueue", true, false, false, null); // durable=true
    
  2. 镜像队列配置(高可用)

    rabbitmqctl set_policy ha-all "^order." '{"ha-mode":"all","ha-sync-mode":"automatic"}'
    
  3. 死信队列配置

    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", "dlxExchange");
    args.put("x-dead-letter-routing-key", "deadOrder");
    channel.queueDeclare("orderQueue", true, false, false, args);
    

消费者可靠性

核心机制

  1. 手动 ACK 模式(防止消息丢失)

    channel.basicConsume("orderQueue", false, (consumerTag, delivery) -> {try {processOrder(new String(delivery.getBody()));channel.basicAck(deliveryTag, false); // 显式确认} catch (Exception e) {channel.basicNack(deliveryTag, false, false); // 转入死信队列}
    }, consumerTag -> {});
    
  2. 消费者幂等性设计

    public void processOrder(String orderId) {// 幂等检查if(orderCache.exists(orderId)) return;// 业务处理orderService.createOrder(orderId);// 记录已处理orderCache.set(orderId);
    }
    
  3. QoS预取限制(防止消息堆积)

    channel.basicQos(10); // 每次最多处理10条
    

完整 java 实现示例

1. 持久化+事务
2. 镜像队列
2. 镜像队列
3. 死信路由
4. 手动ACK
处理成功
处理失败
回调确认
生产者
RabbitMQ Broker
Node1
Node2
DLX Exchange
Dead Letter Queue
消费者
业务系统

生产者代码

public class ReliableProducer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 启用生产者确认channel.confirmSelect();// 定义消息属性(持久化)AMQP.BasicProperties props = MessageProperties.PERSISTENT_TEXT_PLAIN;// 发送消息(带唯一业务ID)String message = "Order-1001";channel.basicPublish("orderExchange", "order.new", props, message.getBytes(StandardCharsets.UTF_8));// 异步确认处理channel.addConfirmListener(new ConfirmCallback() {@Overridepublic void handle(long seq, boolean multiple) {// 成功处理log.info("消息[{}]已确认", seq);}}, (seq, multiple) -> {// 失败处理(指数退避重试)new RetryHandler().retryWithBackoff(seq, 5, 1000);});}}
}

消费者代码

public class ReliableConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// QoS设置(预取限制)channel.basicQos(10);// 定义消费者(禁用自动ACK)DeliverCallback callback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");try {// 幂等处理if(OrderService.processWithIdempotency(message)) {channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);} else {channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);}} catch (Exception e) {// 异常转入死信队列channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);}};channel.basicConsume("orderQueue", false, callback, consumerTag -> {});}
}

RabbitMQ 死信队列

死信队列核心概念

死信队列(Dead Letter Queue)是 RabbitMQ 中处理异常消息的核心机制,当消息满足特定条件时,会被重新路由到指定的 DLQ 中。消息成为死信的三种情况:

  1. 被消费者拒绝且不重新入队(NACK/Reject)
  2. 消息在队列中超时(TTL 过期)
  3. 队列达到最大长度被强制溢出

配置方法详解

声明死信交换器

// 创建死信交换器(类型通常为 Direct 或 Topic)
channel.exchangeDeclare("dlx_exchange", "direct", true);
channel.queueDeclare("dlx_queue", true, false, false, null);
channel.queueBind("dlx_queue", "dlx_exchange", "dlx_routing_key");

绑定普通队列到死信队列

Map<String, Object> args = new HashMap<>();
// 关键参数配置
args.put("x-dead-letter-exchange", "dlx_exchange");  // 指定死信交换器
args.put("x-dead-letter-routing-key", "dlx_routing_key"); // 死信路由键
args.put("x-message-ttl", 60000);  // 消息TTL(毫秒)[^1]
args.put("x-max-length", 1000);    // 队列最大长度channel.queueDeclare("normal_queue", true, false, false, args);

核心使用场景

  1. 消息异常处理

    1.消息处理失败
    正常队列
    死信交换器
    死信队列
    异常分析系统
  2. 延迟队列实现

    // 设置消息过期时间
    AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().expiration("60000") // 60秒后成为死信[^1].build();
    channel.basicPublish("", "normal_queue", props, message.getBytes());
    
  3. 系统过载保护

    // 当队列长度>1000时,自动转移消息到DLQ
    args.put("x-max-length", 1000);  // 超过此长度消息成为死信[^4]
    
  4. 消息重试机制

    try {processMessage(message);channel.basicAck(deliveryTag, false);
    } catch (Exception e) {// 重试3次后进入死信队列if(retryCount.get() > 3) {channel.basicReject(deliveryTag, false);} else {channel.basicNack(deliveryTag, false, true);}
    }
    

完整案例

  1. 生产者
public class DLQProducer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection conn = factory.newConnection();Channel channel = conn.createChannel()) {// 发送带TTL的消息(60秒过期)AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().expiration("60000").build();channel.basicPublish("", "normal_queue", props, "测试死信消息".getBytes());System.out.println("已发送TTL消息");// 发送立即过期的消息(进入DLQ)AMQP.BasicProperties expiredProps = new AMQP.BasicProperties.Builder().expiration("1") // 立即过期.build();channel.basicPublish("", "normal_queue", expiredProps, "立即死信消息".getBytes());System.out.println("已发送立即过期消息");}}
}
  1. 消费者
public class DLQConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection conn = factory.newConnection();Channel channel = conn.createChannel();// 消费普通队列消息DeliverCallback callback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");try {// 模拟业务处理异常if(message.contains("异常")) {throw new Exception("模拟业务异常");}System.out.println("正常处理: " + message);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);} catch (Exception e) {// 拒绝消息并进入死信队列channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);System.err.println("消息转入死信队列: " + message);}};channel.basicConsume("normal_queue", false, callback, consumerTag -> {});}
}

监控与管理关键点

监控维度
队列深度
TTL消息占比
拒绝率
异常类型分析
重试率统计

关键命令:

# 监控死信队列堆积[^4]
rabbitmqctl list_queues name messages | grep dlq# 检查内存使用率
rabbitmq-diagnostics memory

RabbitMQ 延迟队列

RabbitMQ 实现延迟队列的核心是让消息在指定时间后触发消费,主要用于订单超时关闭、定时通知等场景。以下是两种主流实现方式及其细节:

TTL + 死信队列方案(原生方案)

原理

通过消息/队列的 TTL 特性与死信交换器组合实现:

发送带TTL的消息
TTL过期
延迟到达
生产者
普通队列
死信交换器
死信队列
消费者

实现步骤

  1. 配置死信路由(需提前声明):
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx_exchange"); // 死信交换器
args.put("x-dead-letter-routing-key", "dlx_key");    // 死信路由键
channel.queueDeclare("delay_queue", true, false, false, args);
  1. 发送延迟消息
// 设置消息60秒后过期
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().expiration("60000")  // TTL毫秒数.build();
channel.basicPublish("", "delay_queue", props, message.getBytes());
  1. 消费者处理死信队列
channel.basicConsume("dlx_queue", (consumerTag, delivery) -> {String msg = new String(delivery.getBody());System.out.println("延迟消息到达: " + msg); // 实际延迟业务处理
}, consumerTag -> {});

⚠️ 关键限制

  • 时间精度问题:消息在队列头部阻塞时,后续消息即使过期也需等待
  • 无序性缺陷:后发但 TTL 更短的消息无法优先处理
  • 最大延迟限制:TTL 最长约49天( 2^32毫秒)

延迟消息插件方案(rabbitmq-delayed-message-exchange)

原理

通过官方插件内置的时间轮算法实现精准延时:

发送到延迟交换器
内部时间轮
触发
生产者
延迟插件
到期检测
目标队列
消费者

安装与配置

  1. 安装插件(Docker示例):
docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  1. 声明延迟交换器:
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");  // 底层路由类型
channel.exchangeDeclare("delayed_exchange", "x-delayed-message",  // 特殊交换器类型true, false, args
);
  1. 发送延迟消息:
Map<String, Object> headers = new HashMap<>();
headers.put("x-delay", 60000);  // 延迟60秒AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(headers).build();channel.basicPublish("delayed_exchange", "", props, message.getBytes());

✅ 方案优势

  • 高精度定时:毫秒级延迟触发,支持乱序消息的时序修正
  • 吞吐量优化:单队列可承载千万级消息
  • 简化架构:无需额外死信队列,降低运维复杂度

⚠️ 注意事项

  • 延迟上限 2天
  • 重启节点时未触发消息会丢失(需持久化保障)
  • 集群环境下需确保插件在所有节点启用

方案对比与选型建议

维度TTL + DLX 方案插件方案
延迟精度低(队列阻塞影响时序)高(时间轮算法)
消息时序无法保证严格保证
最大延迟约49天2天
吞吐量依赖 DLQ 性能单队列千万级
适用场景简单延迟需求高并发精准延迟(如金融交易)

选型建议

  • 短期延迟(≤2天)且需要严格时序 → 插件方案
  • 长期延迟或低资源环境 → TTL+DLX方案
  • 关键业务 → 两种方案叠加(插件为主,DLX兜底)

实际测试中,插件方案在10万/秒消息量下延迟误差<50ms,而TTL方案在队列积压时误差可达分钟级

生产实践建议

  1. 消息持久化

    同时设置消息和队列的持久化标志:

    AMQP.BasicProperties props = new Builder().deliveryMode(2) // 持久化消息.headers(delayHeaders).build();
    
  2. 监控告警

    重点监控指标:

    # 插件方案监控
    rabbitmqctl list_exchanges name type | grep delayed
    # DLQ方案监控
    rabbitmqctl list_queues messages name | grep dlx
    
  3. 延迟补偿机制

    添加重试逻辑防止网络抖动:

    if (System.currentTimeMillis() - sendTime > delayTime + 30000) {log.warn("延迟消息超时补偿: {}", msgId);resendWithBackoff(msg); // 指数退避重发
    }
    

相关文章:

  • Power Query动态追加查询
  • 品牌坚持电商控价的底层逻辑与实施价值
  • 【Elasticsearch】映射:Nested 类型
  • 神经网络-Day46
  • 单元测试与QTestLib框架使用
  • Python打卡训练营学习记录Day46
  • WiFi通信应用开发【保姆级】+配置ESP8266芯片的WiFi station和soft-AP + station工作模式!!!
  • 基于 actix-web 框架的简单 demo
  • Proxmox Mail Gateway安装指南:从零开始配置高效邮件过滤系统
  • DIY|Mac 搭建 ESP-IDF 开发环境及编译小智 AI
  • Pycharm中添加不了新建的Conda环境(此篇专门给Daidai写的)
  • 衡量嵌入向量的相似性的方法
  • 服务器磁盘空间被Docker容器日志占满处理方法
  • 青少年编程与数学 01-011 系统软件简介 05 macOS操作系统
  • 基于IDIG-GAN的小样本电机轴承故障诊断
  • Mac下Android Studio扫描根目录卡死问题记录
  • WebRTC 与 WebSocket 的关联关系
  • 代码安全规范1.1
  • QuaggaJS用法详解
  • elasticsearch基本操作笔记
  • 如何做网站内链/怎样在百度上发帖子
  • wordpress仿阿里主题/seo分析报告
  • 模仿网站制作/seo课程心得体会
  • 企业营销型网站类型/软文有哪些推广渠道
  • 做一家购物网站要多少钱/新闻株洲最新
  • 天津网站建站公司/seo优化服务