当前位置: 首页 > news >正文

rabbitMq内容整理

一、RabbitMQ基础概念

RabbitMQ是一个开源的、实现AMQP(高级消息队列协议)的消息代理中间件,负责应用间异步消息传递、解耦、削峰、异步处理任务等。

核心概念有:

  • 生产者(Producer):发送消息的程序。

  • 消费者(Consumer):接收并处理消息的程序。

  • Broker:消息服务器,RabbitMQ服务端程序。

  • Exchange(交换机):负责根据规则分发消息到队列。

  • Queue(队列):存放消息的地方。

  • Binding(绑定):Exchange和Queue之间的关系,包含路由规则。

  • Routing Key(路由键):决定消息如何路由到队列。


二、RabbitMQ的消息模型

RabbitMQ支持以下Exchange类型:

交换机类型工作方式使用场景示例
direct根据消息routing key精确匹配分发消息精准消息发送,如支付通知、订单确认
fanout忽略routing key广播到所有绑定的队列广播场景,如通知系统内所有服务刷新缓存
topic根据routing key模式匹配分发消息模糊匹配场景,如日志收集、监控数据分发
headers根据消息头部匹配规则极少使用,功能类似topic,但不关注routing key

常用场景举例:

  • direct模式

    • 一个exchange,一个queue绑定routingKey为user.register,生产者发送时携带user.register路由键,消息被精确发送到该队列。

  • fanout模式

    • 一个exchange绑定多个queue,发送消息时所有队列都能接收,适合发布订阅广播场景。

  • topic模式

    • 可以使用通配符*匹配单个单词,#匹配0或多个单词,例如:

      • log.error -> 精确匹配

      • log.* 匹配log.errorlog.info

      • log.# 匹配log.error.http


三、RabbitMQ重要特性与工作中常用知识点

1、消息可靠性(必掌握)

RabbitMQ实现消息可靠传递的机制:

  • 生产者端确认(Publisher Confirm)

    • RabbitMQ确认生产者的消息已成功到达Broker。

    • 设置channel.confirmSelect()开启确认模式。

  • Broker端持久化

    • Exchange、Queue、Message都需要开启持久化(durable=True),消息会落盘。

    • 注意:消息可靠性会影响性能。

  • 消费者端ACK机制

    • 消费者明确告知消息消费完成,Broker才会删除消息。

    • 自动ACK和手动ACK(推荐)两种模式:

      • 自动ACK (autoAck=true),消费异常时可能导致消息丢失。

      • 手动ACK (channel.basicAck(deliveryTag, false)),保障数据可靠性。

2、消息幂等性(必掌握)

  • 防止重复消息消费引发的数据一致性问题。

  • 一般做法:

    • 使用消息唯一ID(UUID)去重;

    • 利用Redis或数据库的唯一键做去重校验。

3、消息死信队列(DLX)

  • 用于处理未被正常消费的消息(超过重试次数、消息过期、队列满了)。

  • DLX机制:

    • 创建单独死信交换机和队列;

    • 将业务队列绑定到死信交换机,消息异常或超时后自动进入死信队列。

4、消息延迟队列

  • 利用TTL(消息存活时间)和DLX实现延迟队列功能:

    • 设置消息或队列TTL,消息过期进入DLX。

  • 常用场景:

    • 订单超时未支付自动取消;

    • 延迟发送通知。

5、消息优先级队列

  • 设置队列为priority模式:

    Map<String, Object> args = new HashMap<>();
    args.put("x-max-priority", 10);
    channel.queueDeclare(queue, durable, exclusive, autoDelete, args);
    

  • 消息发送时指定优先级:

    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().priority(5).build();
    

6、流控与消息积压问题处理(必掌握)

  • 消费端限流(QoS)

    java

    复制编辑

    channel.basicQos(10); // 一次最多处理10条未确认消息

  • 设置合理的消费者数量,防止消息堆积。

  • 监控队列长度和消费速率,及时预警处理。


四、RabbitMQ集群与高可用(常用)

  • 镜像队列(Mirror Queue)

    • 同步队列数据到多个节点,提高可用性,防止单点故障。

  • 集群模式(Cluster)

    • 主节点维护元数据,其他节点从节点复制数据。

  • Federation(联合)模式

    • 异地多数据中心间消息复制。


五、RabbitMQ性能优化(常用)

  • 使用长连接,避免频繁开关。

  • 批量发送消息,减少网络IO。

  • 调整消息持久化策略,非关键数据可关闭。

  • 合理配置消费并发数线程池


六、工作中常见问题与解决方法(重点)

问题常见原因解决办法
消息重复消费消费端未正确ACK、消息超时重投保证幂等、正确ACK
消息丢失Broker未持久化、生产者未确认、未ACK确认模式、开启持久化
队列消息堆积消费速率慢、消息大量堆积限流、增加消费者数量
连接数过多没用连接池、频繁创建连接连接池、长连接
消息乱序RabbitMQ默认先进先出,但并不完全保证顺序业务逻辑自行排序或用Kafka


七、监控管理(必掌握)

  • 使用RabbitMQ Management Plugin

    • 默认端口15672

    • 查看队列、连接、交换机、消息堆积、消费者情况;

    • 可手动删除消息、重置连接。

  • Prometheus + Grafana 监控方案:

    • 深入监控队列深度、消息消费速率、连接数、资源消耗等。

分界线

-------------------------------------------------------------------------------------------------------------

一、为什么选择 RabbitMQ 而不是其他中间件(如 Kafka、RocketMQ)?

面试官一般想了解你对常用MQ的理解差异,考察你技术选型能力。

功能点/场景RabbitMQKafkaRocketMQ
协议AMQP 标准自定义协议自定义协议
消息可靠性非常高(事务、Confirm、ACK)较高
吞吐量中(万级)极高(百万级)高(十万级)
延迟消息原生TTL+DLX支持不支持原生延迟原生支持
消息顺序性不严格保证分区内严格保证严格保证
场景偏好高可靠事务性消息、业务解耦日志、埋点、大数据传输电商场景
使用难度简单、易管理较难(生态复杂)较容易(偏Java)

举例场景

  • RabbitMQ适合高可靠业务通知场景,如订单支付结果通知;

  • Kafka适合日志传输和大数据处理场景;

  • RocketMQ更适合电商领域,有严格顺序消息需求。


二、如何保证RabbitMQ的消息可靠性、幂等性?

1. 消息可靠性(生产者→MQ→消费者):

生产者

  • 开启confirm确认机制

channel.confirmSelect(); // 开启确认模式
channel.basicPublish(exchange, routingKey, null, message.getBytes());
if (channel.waitForConfirms()) {System.out.println("消息投递成功");
} else {System.out.println("消息投递失败,需要重试或记录");
}

MQ服务器

  • 设置队列和Exchange持久化:

channel.exchangeDeclare("exchange", "direct", true);
channel.queueDeclare("queue", true, false, false, null);
channel.queueBind("queue", "exchange", "routingKey");
  • 消息持久化发送:

AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().deliveryMode(2)  // 2代表持久化消息.build();
channel.basicPublish("exchange", "routingKey", props, message.getBytes());

消费者

  • 使用手动ACK机制(关键):

channel.basicConsume("queue", false, (consumerTag, delivery) -> {try {// 处理消息System.out.println("收到消息: " + new String(delivery.getBody()));// 手动确认channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);} catch (Exception e) {// 拒绝消息并重回队列channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);}
}, consumerTag -> {});

2. 消息幂等性(防止消息重复消费):

幂等性就是同一消息多次处理结果不变。

  • 常见实现方式:

数据库唯一索引去重

-- 使用订单号做唯一索引,消费前insert去重 CREATE UNIQUE INDEX order_unique ON order_table(order_id);

Redis去重(推荐)

String msgId = message.getMsgId(); // 消息唯一标识
if (redis.setnx("msg:"+msgId, "1") == 1) {redis.expire("msg:"+msgId, 3600); // 设置过期时间1小时processMessage(message);
} else {// 消息重复,直接丢弃
}

三、RabbitMQ消费端如何限流?

消费者限流主要依靠basicQos:

// 同一时间最多允许处理10条消息(未确认状态) channel.basicQos(10);

解释
RabbitMQ不会一次性推送超过设定值的消息给消费者,未ACK的消息数量达到10时,队列暂停向该消费者发送新消息。


四、RabbitMQ消息堆积如何处理?

常见处理办法:

  • 增加消费者数量或并发数;

  • 消费者限流合理设置;

  • 临时创建新队列和消费者分流;

  • 严重时,考虑清理无用消息。

优化方案举例

  • 增加消费者(并发处理):

// 启动多个consumer实例(独立进程或线程) for (int i = 0; i < 10; i++) { new Thread(new ConsumerRunnable()).start(); }


五、如何使用RabbitMQ实现延迟消息功能?

延迟消息通过TTL(消息生存时间)+ DLX(死信队列)实现。

实现步骤(详细示例)

① 定义正常队列与死信队列

// 死信交换机和队列
channel.exchangeDeclare("dlx_exchange", "direct", true);
channel.queueDeclare("dlx_queue", true, false, false, null);
channel.queueBind("dlx_queue", "dlx_exchange", "timeout");// 正常业务队列绑定死信交换机
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx_exchange");
args.put("x-dead-letter-routing-key", "timeout");
args.put("x-message-ttl", 60000); // 队列TTL 60秒channel.queueDeclare("order_queue", true, false, false, args);

② 发送消息到order_queue,过期进入死信队列:

channel.basicPublish("", "order_queue", null, message.getBytes());

③ 消费死信队列,实现延迟处理:

channel.basicConsume("dlx_queue", true, (consumerTag, delivery) -> {System.out.println("延迟消息:" + new String(delivery.getBody()));
}, consumerTag -> {});

六、RabbitMQ的确认机制与死信队列机制怎么用?

  • 确认机制前文(第二节)已详细解释。

  • 死信队列适用于消息无法被消费(超时、异常):

关键场景

  • 消息TTL过期;

  • 队列满了;

  • 消费者拒绝消息(basicNack)。

实际使用

  • 常用于异常消息降级处理、失败重试,防止无限重试导致队列拥堵。


七、RabbitMQ集群有哪些方式?如何实现高可用?

1. 集群类型:

  • 普通模式(集群元数据同步,无队列同步)

  • 镜像队列模式(常用,高可用)

  • Federation模式(跨数据中心场景)

2. 镜像队列实现高可用(推荐):

通过策略实现自动镜像队列:

shell

复制编辑

rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

解释

  • ^表示所有队列;

  • ha-mode:all表示复制到所有节点。

http://www.dtcms.com/a/315654.html

相关文章:

  • PromptPilot搭配Doubao-seed-1.6:定制你需要的AI提示prompt
  • 云计算一阶段Ⅱ——11. Linux 防火墙管理
  • LeetCood算法题~水果成篮
  • [element-plus] ClickOutside点击其他地方
  • 【IDEA】IntelliJ IDEA 中文官方文档全面介绍与总结
  • Docker 部署工程基本命令记录
  • uniapp renderjs 逻辑层,视图层互相传递数据封装
  • 星图云开发者平台赋能商储油安全管控数字化转型
  • 漏洞分析:90分钟安全革命
  • NLP自然语言处理 03 Transformer架构
  • 基于 FFmpeg 与 V4L2 的多路摄像头视频采集,图像处理处理与 RTMP 推流项目(开源)
  • GPU 基础矩阵精规组织教程:从基础作用到实战应用
  • EAGLE-2:通过动态草稿树加速语言模型推理
  • 国内办公安全平台新标杆:iOA一体化办公安全解决方案
  • 用 PyTorch 实现一个简单的神经网络:从数据到预测
  • Tdengine 时序库年月日小时分组汇总问题
  • EP01:【DL 第二弹】张量(Tensor)的创建和常用方法
  • 利用DeepSeek编写带缓冲输出的V语言程序
  • centos通过DockerCompose搭建开源MediaCMS
  • 信息收集--基础篇
  • 高效稳定:Spring Boot集成腾讯云OSS实现大文件分片上传与全路径获取
  • systemui 的启动流程是怎么样的?
  • 深入浅出 RabbitMQ-交换机详解与发布订阅模型实战
  • 软件版本、Nodejs中 ~、*、^
  • centos7 个人网站搭建之gitlab私有化部署实现线上发布
  • 鸿蒙OS 系统安全
  • 14.Linux : nfs与autofs的使用
  • 计算机基础速通--数据结构·栈与队列应用
  • 国内外大模型体验与评测技术
  • 安科瑞智慧能源管理系统在啤酒厂5MW分布式光伏防逆流控制实践