当前位置: 首页 > news >正文

RabbitMQ—基础篇

MQ介绍

  • 应用场景

    1. 异步解耦:用于上下游模块通信,是更彻底的一种异步任务
    2. 流量削峰:短时间内高并发场景下,将大量的请求置于MQ中,防止流量冲击
    3. 数据收集:生产者只需将数据发送到 MQ 主题(Topic)或队列(Queue),消费者按需订阅数据
  • MQ分类

    1. ActiveMQ:单机万级吞吐量,消息丢失概率低;但现在已经基本不进行维护

    2. Kafka:为大数据而生的消息中间件,适合处理大量数据

    3. RocketMQ:单机吞吐量十万级,支持分布式,消息不会丢失;适合金融互联网领域

    4. RabbitMQ:万级吞吐量,性能好时效低;适合中小规模场景

  • MQ协议:MQ运行时遵循的传输协议

    1. AMQP:特性是 分布式事务支持、消息的持久化支持、高性能和高可靠的消息处理优势 ,RabbitMQ采用的就是此协议
    2. MQTT:特性是 轻量、结构简单、传输快、不支持事务、没有持久化设计
    3. Kafka:特性是 结构简单、解析速度快、无事务支持、有持久化设计
  • MQ协议和HTTP协议

    1. MQ协议是长连接,注重传输高性能;HTTP协议是短连接,注重资源轻量化
    2. MQ协议采用二进制帧或固定格式消息,传输效率较高;HTTP协议采用请求/响应模式,其安全性要求降低了传输效率

RabbitMQ运行原理

  • 核心组件

    1. 生产者/消费者:MQ的上下游模块,一般为相关微服务
    2. 代理服务器(Broker):一个Broker就是一个RabbitMQ节点,集群部署即多Broker,所有的Vhost共同组成了Broker
    3. 虚拟主机(Vhost):相当于一个小型RabbitMQ,一个Vhost由交换机、绑定、队列等组件组成,Vhost之间相互独立
    4. 交换机(Exchange):接收来自生产者的消息,根据Binding将消息转发给相应队列
    5. 绑定(Binding):指定交换机将消息路由到哪些队列,不同交换机类型有不同的绑定规则
    6. 队列(Queue):队列需要和交换机绑定,是真正存储消息的组件,本质是一个消息缓存区
  • 运行流程

    1. 生产者发出消息请求,获得连接中的一个Channel,进而连接到指定Vhost中的交换机
    2. 消息到达交换机后,根据消息中的routingKey和消息队列匹配(Binding),从而将消息分配到指定队列中
    3. 当消费者订阅队列后,RabbitMQ 会主动将消息推送给消费者
  • 设计思想

    1. Broker:可以认为Broker就是RabbitMQ服务,多Broker可以用于集群部署
    2. Vhost:用于逻辑隔离、权限控制和资源管理,尤其是在复杂系统中的安全、维护和性能问题
    3. Channel连接:解决实现并发安全和连接复用,多个Channel共享同一个Connection,减少网络开销并保证线程安全
    4. Exchang:下游模块只需关注自己的队列,交换机根据业务需求转发到相应队列
      ┌──────────────┐            ┌──────────────┐│   Producer   │            │   Consumer   ││  ┌─────────┐ │            │  ┌─────────┐ ││  │ Channel │ │            │  │ Channel │ ││  └─────────┘ │            │  └─────────┘ │└───────╋──────┘            └───────╋──────┘│                           │┌────────────╋───────────────────────────╋────────────────────────────┐     │            │  Broker(RabbitMQ Server)  │                            ││    ┌───────╋───────────────────────────╋─────────┐   ┌─────────┐    │ │    │       ▼         Vhost1            ▼         │   │  Vhost2 │    │ │    │  ┌────────┐   ┌───────┐   ┌────────────────┐│   │  Vhsot3 │    │ │    │  │Exchange│-->│Binding│-->│Queue1,Queue2...││   │  Vhost4 │    │ │    │  └────────┘   └───────┘   └────────────────┘│   │   ...   │    │ │    └─────────────────────────────────────────────┘   └─────────┘    │  └─────────────────────────────────────────────────────────────────────┘ 

消息确认机制

  • 目的:MQ实现了生产者和消费者之间的数据通信,其中每个步骤都可能导致数据丢失
生产者 ---(步骤1)---> 交换机 ---(步骤2)---> 队列 (步骤3:持久化)---(步骤4)---> 消费者

生产者确认机制

  • 目的:保证生产者成功发送消息到达RabbitMQ

    1. RabbitMQ重连机制:生产者端未成功连接到RabbitMQ,如果客户端连接失败会自动重试直至超时报错
    2. RabbitMQ确认机制:生产者成功发送了消息,但RabbitMQ处理消息失败时,会被检测到并处理
  • 生产者确认机制:支持以下两种机制

    1. Confirm机制:生产者发送消息后,交换机接收到消息即返回ACK,否则返回NACK
    2. Return机制:生产者发送消息后,交换机正确路由队列中返回ACK,否则返回NACK
    3. 工作中不建议使用Return机制,可以用死信队列处理失败:路由失败后会过期,自动转到死信队列
  • 失败处理

    1. requeue=true:生产者收到NACK时,会自动重发消息直至成功
    2. requeue=false:丢弃消息
  • 配置方法:需通过客户端代码或配置文件实现,见下文

消费者确认机制

  • 目的:保证消费者成功消费了消息

    1. 消费者成功消费消息后,返回ACK给RabbitMQ
    2. 消费者消费失败后,返回NACK给RabbitMQ
  • 失败处理

    1. requeue=true:消息重新回到原队列,消费者会再次收到
    2. requeue=false:消息被丢弃或进入死信队列
    3. 消息未确认:消息处于挂起状态,直至消息处理完毕、TTL过期或者消费者端连接断开
  • 配置方法:需通过客户端代码或配置文件实现,见下文

数据持久化

  • 目的:备份数据,支持以下两种刷盘方式

    1. 异步刷盘(默认):RabbitMQ将数据写入内存中后即返回ACK给生产者,后续会异步持久化到硬盘中
    2. 同步刷盘:RabbitMQ将数据写入内存并成功持久化到硬盘后才返回ACK给生产者
  • 配置方法

    1. 创建队列/交换机时,选择durable属性即可开启组件持久化
    2. 发送消息时,设置消息属性 delivery_mode=2即可开启消息持久化
    3. 通过修改broker的配置文件实现同步刷盘/异步刷盘
  • 惰性队列:RabbitMQ提供了惰性队列功能,惰性队列会强制持久化消息,无论是否已配置持久化策略

    1. 创建队列时,加参数x-queue-mode = lazy即可
    2. RabbitMQ3.12版本后,所有队列默认都是惰性队列
    3. 消费消息时消息才会从硬盘加载到内存中,即内存中只保存最近的消息

死信队列

  • 目的:消息丢失的兜底方案,当消息异常时可以放入死信队列,后续单独处理异常信息

    1. 消息消费失败
    2. 消息堆积,即队列中的消息数量超过 x-max-length
    3. 消息TTL过期(消息重新投递不会重置TTL)
  • 配置方法

    1. 死信交换机就是Direct交换机,死信队列就是普通队列
    2. 绑定死信交换机和业务队列 service_queue,如下配置业务队列:
      x-dead-letter-exchange:死信交换机名称
      x-dead-letter-routing-key:死信路由键,死信交换机可以绑定多个队列,可以根据死信路由键路由到指定死信队列中
      其他参数:0(如 x-message-ttl 设置消息过期时间)按需配置
                       硬盘↑|内存↑|
生产者 ---> 交换机 ---> 队列 ---> 消费者↑           │        │ ↑         |└────ACK────┘        | └───ACK───┘↓ 死信交换机 ---> 死信队列 ---> 死信处理器

工作模式

  • RabbitMQ对于常见的业务场景,提供了多种工作模式,这些工作模式封装了相应的功能

  • 配置方法

    1. 调试时可以直接通过图形化控制台快速设置
    2. 开发阶段需要通过客户端代码或配置文件实现

工作队列模式

  • 说明:一个消息队列,多个消费者;多个消费者轮询消费此消息队列且不会重复消费

  • 应用场景:耗时任务的分布式处理,如订单处理、数据同步、文件上传等

  • 控制台模拟

    1. 创建队列,名称为work_queueRouting Key设置为队列名字
    2. 不显式绑定交换机,此时会默认绑定到AMQP default交换机上
    3. AMQP default发送消息,Routing Keywork_queue,观察simple_queue是否收到消息
    4. 多消费者同时消费消息,可以在消费者端设置公平分发,即处理完当前消息并发送 ACK 后,才会接收新消息
生产者 ---> 隐式交换机AMQP default ---> 单消息队列 ---> 消费者1,消费者2,..

广播模式

  • 说明:一个交换机,多个队列,多个消费者;交换机将消息转发给多个队列,一个消费者对应一个队列

  • 应用场景:通知系统、实时数据同步、日志分发等

  • 控制台模拟

    1. 创建交换机,类型为fanout,或者直接使用内置的amq.fanout交换机
    2. 创建多个队列,绑定到该交换机上,忽略Routing Key
    3. 向交换机发送消息,观察所有的队列是否都收到消息
					  ┌── (广播) ---> 队列1 ---> 消费者1,消费者2,..
生产者 ---> fanout交换机 |── (广播) ---> 队列2 ---> 消费者1,消费者2,..└── (广播) ---> 队列3 ---> 消费者1,消费者2,.....

路由模式

  • 说明:一个交换机,多个队列,多个消费者;交换机将消息根据Routing Key转发给指定队列,一个消费者对应一个队列

  • 应用场景:较于广播模式,路由模式按特定规则分发消息的场景,如日志分级处理、多租户系统消息路由等

  • 控制台模拟

    1. 创建交换机,类型为direct,或者直接使用内置的amq.direct
    2. 创建多个队列,绑定到该交换机上,不同的队列Routing Key互异
    3. 向交换机发送多条带有不同Routing Key的消息,观察匹配的队列是否收到消息
					  ┌── (Routing Key精确匹配) ---> 队列1 ---> 消费者,...
生产者 ---> direct交换机 |── (Routing Key精确匹配) ---> 队列2 ---> 消费者,...└── (Routing Key精确匹配) ---> 队列3 ---> 消费者,......

主题模式

  • 说明:一个交换机,多个队列,多组消费者;`一个队列可以被多个消费者轮询,相当于 路由模式+工作模式

  • 应用场景:较于路由模式,主题模式的Routing Key带有通配符,可以模糊匹配队列,适合模糊匹配的复杂路由场景

  • 控制台模拟

    1. 创建交换机,类型为topic,或者直接使用内置的amq.topic
    2. 创建多个队列,绑定到该交换机上,设置相应的Routing Key
    3. 向交换机发送多条带有不同Routing Key的消息,观察匹配的队列是否收到消息
  • 通配符使用以下两种符号:

    1. *(星号):匹配 单个单词(以 . 分隔)
    2. #(井号):匹配 零个或多个单词(包括空单词)
					 ┌── (Routing Key模糊匹配) ---> 队列1 ---> 消费者,...
生产者 ---> topic交换机 |── (Routing Key模糊匹配) ---> 队列2 ---> 消费者,...└── (Routing Key模糊匹配) ---> 队列3 ---> 消费者,......

头模式

  • 说明:RabbitMQ的消息支持传递Headers参数(键值对形式),头模式即采用headers匹配

    1. 至少两个参数:一个参数必须是x-match = all或any,其他参数为自定义的key-value对,例如type=1
    2. 如果x-match = all,消息中的所有headers必须与绑定中的参数完全匹配
    3. 如果x-match = any,消息中的headers中只要有一个与绑定中的参数匹配即可
    4. 如果匹配成功,消息将被路由到绑定的队列中;如果匹配失败,消息将被丢弃或发送到死信队列
  • 应用场景:需要基于消息元数据(如 JSON、XML 格式消息)路由的场景

  • 控制台模拟

    1. 创建交换机,类型为headers,或者直接使用内置的amq.headers
    2. 创建多个队列,绑定到该交换机上,设置相应的Arguments
    3. 向交换机发送多条带有不同headers的消息,观察匹配的队列是否收到消息
					   ┌── (headers匹配) ---> 队列1 ---> 消费者,...
生产者 ---> headers交换机 |── (headers匹配) ---> 队列2 ---> 消费者,...└── (headers匹配) ---> 队列3 ---> 消费者,......

Java客户端

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

快速配置

spring:rabbitmq:# 连接配置host: your.rabbitmq.server.com  # RabbitMQ服务器地址port: 5672                     # 默认端口5672username: your_username        # 用户名password: your_password        # 密码virtual-host: /     # 虚拟主机,默认为/# 连接超时和心跳设置connection-timeout: 5000       # 连接超时时间(毫秒)requested-heartbeat: 60        # 心跳间隔(秒)# 缓存配置cache:channel:size: 25                   # 通道缓存大小connection:mode: channel              # 连接缓存模式(channel|connection)size: 1                    # 连接缓存大小# 发布者配置publisher-confirm-type: correlated  # 发布者确认类型(none|simple|correlated)publisher-returns: true              # 是否启用返回机制   # 模板配置template:retry:enabled: true              # 是否启用重试initial-interval: 1000ms   # 初始重试间隔max-attempts: 3            # 最大重试次数multiplier: 1.0            # 重试间隔乘数mandatory: true              # 是否强制消息确认receive-timeout: 1000ms      # 接收超时时间# 监听器配置listener:simple:acknowledge-mode: auto     # 确认模式(auto|manual|none)auto-startup: true         # 是否自动启动监听器concurrency: 1             # 最小消费者数量max-concurrency: 1         # 最大消费者数量prefetch: 250              # 预取消息数量,防止消息堆积default-requeue-rejected: true # 是否重新排队被拒绝的消息retry:enabled: true            # 是否启用监听器重试initial-interval: 1000ms # 初始重试间隔max-attempts: 3          # 最大重试次数multiplier: 1.0          # 重试间隔乘数stateless: true          # 是否无状态重试# 声明队列queues:- name: my-queuedurable: trueexclusive: falseauto-delete: falsearguments:x-queue-type: quorum # 仲裁队列# 声明交换机exchanges:- name: my-exchangetype: directdurable: trueauto-delete: false# 声明绑定bindings:- queue: my-queueexchange: my-exchangerouting-key: my.routing.key

高级配置

  • 连接配置
@Configuration
public class RabbitMQConfig {// 配置连接工厂,用于配置rabbitmq连接@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("123456");connectionFactory.setVirtualHost("/");// 可选配置connectionFactory.setConnectionTimeout(5000); // 连接超时connectionFactory.setRequestedHeartBeat(60);   // 心跳间隔connectionFactory.setPublisherConfirms(true); // 开启生产者comfirm机制// connectionFactory.setPublisherReturns(true); // 开启生产者return机制return connectionFactory;}// 配置RabbitTemplate,用于发送和接收消息@Beanpublic RabbitTemplate rabbitTemplate() {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());// 配置生产者comfirm机制rabbitTemplate.setConfirmCallback((correlation, ack, cause) -> {if (ack) {log.info("消息 {} 发送成功", correlation.getId());} else {log.error("消息 {} 发送失败: {}", correlation.getId(), cause);}});// 配置生产者return机制// rabbitTemplate.setReturnsCallback(returned -> {//     System.out.println("消息无法路由到队列: " + returned.getMessage());// });// rabbitTemplate.setMandatory(true); // 设置mandatory标志为true,确保消息无法路由时触发returnsCallback// 配置重试规则(哪些异常需要重试)RetryTemplate retryTemplate = new RetryTemplate();Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();retryableExceptions.put(org.springframework.amqp.AmqpIOException.class, true); // 网络异常retryableExceptions.put(org.springframework.amqp.AmqpConnectException.class, true); // 连接异常retryableExceptions.put(org.springframework.amqp.UncategorizedAmqpException.class, true); // 其他 AMQP 异常SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(3, retryableExceptions); // 最多重试 3 次retryTemplate.setRetryPolicy(retryPolicy);// 配置退避策略(重试间隔)ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();backOffPolicy.setInitialInterval(1000); // 初始间隔 1sbackOffPolicy.setMultiplier(2.0);      // 间隔倍数(1s -> 2s -> 4s)backOffPolicy.setMaxInterval(10000);   // 最大间隔 10sretryTemplate.setBackOffPolicy(backOffPolicy);        // 设置 RetryTemplaterabbitTemplate.setRetryTemplate(retryTemplate());return rabbitTemplate;}// 配置RabbitAdmin,用于管理RabbitMQ的拓扑结构(如交换机、队列、绑定关系)@Beanpublic AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {return new RabbitAdmin(connectionFactory);}
}
  • 配置组件:如果组件已存在,则不会覆盖创建,但如果同名组件参数不一致会报错
@Configuration
public class RabbitMQDeclarablesConfig {// ========== 交换机和队列 ==========@Beanpublic DirectExchange directExchange() { // 创建交换机return ExchangeBuilder // 优先使用建造者模式创建.directExchange("direct.exchange") // 交换机名称.durable(true) // 是否持久化// .autoDelete() // 默认自动删除功能为false,如果显式加了autoDelete()就是true.build();// 构造函数创建,可以支持所有属性和扩展参数// return new DirectExchange("direct.exchange", true, false); // return new TopicExchange("topic.exchange", true, false);// return new FanoutExchange("fanout.exchange", true, false);}@Beanpublic Queue directQueue() { // 创建队列,并绑定死信交换机Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "dlx.exchange"); // 指定死信交换器args.put("x-dead-letter-routing-key", "dlx.routing.key"); // 指定死信路由键args.put("x-message-ttl", 10000); // 可选:设置消息TTL,10秒后过期成为死信args.put("x-max-length", 10); // 可选:设置队列最大长度,超过10条消息后,新消息成为死信return new Queue("direct.queue", true, false, false, args);/*public Queue(String name,          // 队列名称boolean durable,      // 是否持久化boolean exclusive,    // 是否排他(仅限当前连接)boolean autoDelete,   // 是否自动删除Map<String, Object> arguments // 额外参数(如死信队列、TTL 等))*/}@Beanpublic Binding bindingDirectQueue() { // 绑定Direct交换机和业务队列return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct.routing.key"); // 主题交换机可以含有通配符,例如direct.#}// ==========  死信交换机(DLX)和死信队列 ==========@Beanpublic DirectExchange dlxExchange() { // 死信交换机return new DirectExchange("dlx.exchange", true, false);}@Beanpublic Queue dlxQueue() {return new Queue("dlx.queue", true); // 死信队列}@Beanpublic Binding bindingDLX() { // 绑定死信交换机和死信队列return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx.routing.key"); // 死信路由键}// ========== Headers 交换机配置 ==========@Beanpublic HeadersExchange headersExchange() {return new HeadersExchange("headers.exchange", true, false);}@Beanpublic Queue headersQueue() {return new Queue("headers.queue", true);}@Beanpublic Binding bindingHeadersQueue(Queue headersQueue, HeadersExchange headersExchange) {return BindingBuilder.bind(headersQueue).to(headersExchange).whereAll("key1", "key2").match("value1", "value2");}
}
  • 配置监听器工厂:@RabbitListener标注的每一个Bean都会据此创建配置
@Configuration
@Slf4j
public class RabbitMQListenerConfig {private final ConnectionFactory connectionFactory;public RabbitMQListenerConfig(ConnectionFactory connectionFactory) {this.connectionFactory = connectionFactory;}// 简单消息监听容器工厂配置@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setConcurrentConsumers(3); // 初始消费者数量factory.setMaxConcurrentConsumers(10); // 最大消费者数量factory.setPrefetchCount(50); // 消费者一次性处理未确认消息的最大数量factory.setDefaultRequeueRejected(false); // 不重新排队被拒绝的消息// 配置消息确认模式:框架自动处理|手动确认|消息异常直接丢弃factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // AUTO | MANUAL | NONEfactory.setBatchListener(true); // 启用批量监听factory.setBatchSize(10); // 每批处理10条消息return factory;}
}

生产者发送消息

@Service
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage(String message) {// 如果入参是对象,RabbitTemplate会默认进行jdk序列化,建议先手动Json序列化后再发送ObjectMapper objectMapper = new ObjectMapper();String json = objectMapper.writeValueAsString(new Book("西游记", "吴承恩"));rabbitTemplate.convertAndSend("direct.exchange", // 交换机名称"routing.key",     // Routing KeyjsonMessage,msg -> {// 设置消息 TTL(单位:毫秒),如果同时设置了消息级 TTL 和队列级 TTL,以两者中较小的值为准msg.getMessageProperties().setExpiration(String.valueOf(5000));return msg;});}public void sendCompleteMessage(String payload) {// 创建消息并设置 HeaderMessage message = MessageBuilder.withBody(payload.getBytes()).setHeader("messageId", UUID.randomUUID().toString()) // 唯一ID.setHeader("timestamp", System.currentTimeMillis())    // 时间戳.setHeader("type", "order_created")                   // 业务类型.build();// 发送消息rabbitTemplate.send("exchange_name", "routing_key", message);}
}

消费者监听

  • 手动ACK说明

    1. channel.basicNack(deliveryTag(消息标识符),multiple(是否批量处理),requeue(true会无限重试,false会进入死信));
    2. 单条消息处理:channel.basicReject(deliveryTag(消息标识符), requeue(true会无限重试,false会进入死信));
  • @RabbitListener 标注的监听器Bean默认使用多线程异步处理消息

    • Spring AMQP 会为每个监听容器创建一个线程池(线程数量由PrefetchCount决定)
    • 每个消息由线程池中的空闲线程处理,无需等待上一条消息处理完成
    • 并发消费:如果线程池有足够线程,可以同时处理多条消息
@Component
public class OrderConsumer {@RabbitListener(queues = "order.queue") // 声明监听队列// @RabbitListener(queues = {"order.queue", "payment.queue"}) // 可以监听多个队列public void handleOrderWithManualAck(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {try {System.out.println("Received: " + message);channel.basicAck(deliveryTag, false); // 手动ACK} catch (Exception e) {channel.basicNack(deliveryTag, false, false); // 进入死信队列}}@RabbitListener(queues = "order.queue") // 工作队列模式下,可以多个消费者监听同一个队列public void handleBatch(List<String> messages, // 需要监听容器开启了批量功能,入参使用List接受Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { // 一批消息对应一个标识符try {for (String message : messages) {System.out.println("Received: " + message);}channel.basicAck(deliveryTag, true); // 批量确认} catch (Exception e) {channel.basicNack(deliveryTag, true, false); // 进入死信队列}}
}

消费异常处理

  • 对于核心业务,建议加入重试机制

    1. 如果未达到最大重试次数 → NACK(requeue=true)(继续重试)
    2. 如果达到最大次数 → NACK(requeue=false)Reject(requeue=false),消息进入死信队列
  • 方法1:使用Spring Retry拦截器,直接重试,失败后手动入死信

    1. 此方法保证消息顺序一致
    2. 放弃重试后会自动进入对应死信,或者直接丢弃
    3. factory.setAdviceChain也可以实现重试,但配置后不可动态调整且Spring Retry更灵活
@Configuration
@Slf4j
public class RabbitMQListenerConfig {private final ConnectionFactory connectionFactory;public RabbitMQListenerConfig(ConnectionFactory connectionFactory) {this.connectionFactory = connectionFactory;}// 简单消息监听容器工厂配置@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setConcurrentConsumers(3); // 初始消费者数量factory.setMaxConcurrentConsumers(10); // 最大消费者数量factory.setPrefetchCount(50); // 预取消息数量factory.setDefaultRequeueRejected(false); // 不重新排队被拒绝的消息// 配置消息确认模式:框架自动处理|手动确认|消息异常直接丢弃factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // AUTO | MANUAL | NONEfactory.setBatchListener(true); // 启用批量监听factory.setBatchSize(10); // 每批处理10条消息// 配置重试模板(RetryTemplate)RetryTemplate retryTemplate = new RetryTemplate();// 设置重试策略(最多重试3次)Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();retryableExceptions.put(Exception.class, true); // 所有异常均触发重试SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(3, retryableExceptions);retryTemplate.setRetryPolicy(retryPolicy);// 设置退避策略(指数退避:首次2s,后续翻倍,最大10s)ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();backOffPolicy.setInitialInterval(2000);backOffPolicy.setMultiplier(2);backOffPolicy.setMaxInterval(10000);retryTemplate.setBackOffPolicy(backOffPolicy);factory.setRetryTemplate(retryTemplate);// 也可以快速配置重试策略
//        factory.setAdviceChain(
//                RetryInterceptorBuilder.stateless()
//                        .maxAttempts(3)
//                        .backOffOptions(1000, 2.0, 10000)
//                        .recoverer((message, throwable) ->
//                                log.error("消息重试失败:{},已转入死信队列:{}",
//                                        throwable.getMessage(),
//                                        new String(message.getBody()))
//                        )
//                        .build()
//        );return factory;}
}
  • 方法2:手动实现重试逻辑,消费异常后重新入队,失败后进入死信队列

    1. 此方法可能会打乱消息顺序,但性能高
    2. 此方法可以对于不同的队列进行自定义处理
@Component
@Slf4j
public class RabbitMQConsumer {@Autowiredprivate RabbitTemplate rabbitTemplate;@RabbitListener(queues = "direct.queue") // 声明监听队列public void handleOrderWithManualAck(Message message, Channel channel) throws IOException {try {System.out.println(new String(message.getBody()));throw new RuntimeException();// 业务处理成功// channel.basicAck(deliveryTag, false);} catch (Exception e) {int retryCount = Optional.ofNullable((Integer) message.getMessageProperties().getHeaders().get("x-retry-count")).orElse(0);if (retryCount < 3) {message.getMessageProperties().setHeader("x-retry-count", retryCount + 1);System.out.println("重试中");// 拒绝当前消息(不重新入队)channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);// 手动重新发布消息(确保 Header 被持久化)rabbitTemplate.send(message.getMessageProperties().getReceivedExchange(),message.getMessageProperties().getReceivedRoutingKey(),message);} else {// 重试耗尽,拒绝消息(如果配置了DLX会路由到死信队列)System.out.println("重试失败");channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);}}}
}

RabbitMQ应用

异步解耦

  • 目的:将耗时操作或不需要立即返回结果的操作通过MQ异步执行,提高系统的响应速度和吞吐量
  • 举例:用户抢购完成后,异步操作创建订单,发送通知等
@Service
public class OrderServiceImpl implements IOrderService {@Autowiredprivate RedissonClient redissonClient;@Autowiredprivate RabbitTemplate rabbitTemplate;/** 抢购商品* @param goodId 商品id* @return 抢购结果*/@Overridepublic Result panicBuy(Long goodId) {Long userId = ThreadLocalDto.threadLocal.get().getUserId();RMap<String, String> goodMap = redissonClient.getMap("good:" + goodId); //商品信息bucket// 1.不在抢购期内直接失败Long begin = Long.parseLong(goodMap.get("beginTime"));Long end = Long.parseLong(goodMap.get("endTime"));if (LocalDateTime.ofInstant(Instant.ofEpochMilli(begin),ZoneOffset.of("+8")).isAfter(LocalDateTime.now()) || LocalDateTime.ofInstant(Instant.ofEpochMilli(end),ZoneOffset.of("+8")).isBefore(LocalDateTime.now())) {return Result.fail("不在秒杀时间中");}// 分布式锁:用商品ID作为锁的KeyRLock lock = redissonClient.getLock("lock:good:" + goodId);try{// 2.尝试拿redisson分布式锁boolean isLock = lock.tryLock(2, 10, TimeUnit.SECONDS);if (!isLock) { //重试后最终拿锁失败,返回错误信息return Result.fail("系统繁忙,请重试");}// 3.拿锁成功,开始尝试扣减库存if (Long.parseLong(goodMap.get("stoke")) <= 0){return Result.fail("没有库存");}goodMap.addAndGet("stoke", -1); // 4.抢购成功,使用MQ异步创建临时订单 OrderDto orderDto = OrderDto.builder().goodId(goodId).userId(userId).createTime(LocalDateTime.now()).build();rabbitTemplate.convertAndSend("direct.order", "direct.order", JSON.toJSONString(orderDto));return Result.ok();}finally{if(isLock){// 5.释放锁lock.unlock();}}    }
}
@Component
@Slf4j
public class OrderConsumer {@Autowiredprivate RedissonClient redissonClient;@RabbitListener(queues = "direct.order") // 声明监听队列public void handleOrderWithManualAck(String order,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {try {OrderDto orderDto = JSON.parseObject(order, OrderDto.class);Long orderId = RedisIdUtil.getId("order"); //分布式ID作为订单idorderDto.setOrderId(orderId);redissonClient.getBucket("order:"+orderId).set(JSON.toJSONString(orderDto)); //存入订单信息channel.basicAck(deliveryTag, false); // 手动ACK} catch (Exception e) {log.error("Processing failed: " + e.getMessage());channel.basicReject(deliveryTag, false); // 拒绝单条消息(不重试,直接进入死信队列)}}
}

流量削峰

  • 目的:短时间高并发访问时限流
  • 原理:先将请求发给MQ,消费者再处理请求,通过配置消费者并发数量控制流量
  • 配置交换机和队列
@Configuration
public class RabbitMQConfig {// 创建队列@Beanpublic Queue peakCuttingQueue() {return QueueBuilder.durable("peak.cutting.queue").build();}// 创建直连交换机@Beanpublic DirectExchange peakCuttingExchange() {return new DirectExchange("peak.cutting.exchange");}// 绑定队列和交换机@Beanpublic Binding bindingPeakCutting() {return BindingBuilder.bind(peakCuttingQueue()).to(peakCuttingExchange()).with("peak.cutting.routing.key");}
}
  • Controller转发请求
@RestController
@RequestMapping("/api/traffic")
public class TrafficController {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostMapping("/peak")public String handlePeakRequest(@RequestBody String payload) {// 将请求发送到削峰队列String messageId = UUID.randomUUID().toString();CorrelationData corrData = new CorrelationData(messageId);Message message = MessageBuilder.withBody(payload.getBytes()).setHeader("x-message-id", messageId).build();rabbitTemplate.send("exchange", "routingKey", message, corrData);return Result.ok();}
}
  • 异步处理请求
@Component
public class MessageConsumer {/*** 监听削峰队列* @param message 消息内容* @param channel 通道* @param deliveryTag 消息标签*/@RabbitListener(queues = "peak.cutting.queue")public void processPeakCuttingMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {try {System.out.println("开始处理消息: " + message);// 模拟业务处理时间(实际业务中替换为你的业务逻辑)Thread.sleep(1000);// 手动确认消息channel.basicAck(deliveryTag, false);System.out.println("消息处理完成: " + message);} catch (InterruptedException | IOException e) {try {// 处理失败,拒绝消息并重新入队(可以根据需要调整)channel.basicNack(deliveryTag, false, true);System.err.println("消息处理失败,已重新入队: " + message);} catch (IOException ex) {System.err.println("消息确认失败: " + ex.getMessage());}}}
}

消费幂等

  • 目的:保证消费者不会重复消费消息

    1. 生产者可能会因为网络等原因重复发消息
    2. 如果消费者未 ACK 消息且连接断开(如进程崩溃、网络中断),RabbitMQ 会重新投递给其他消费者*
  • 原理

    1. 在redis中记录消息状态,消费消息前先查询消息状态:消费中|未消费|已消费
    2. 可以根据数据库的一些特性(例如集合唯一性,MySQL唯一索引)保证幂等性
@Component
@Slf4j
public class RabbitMQConsumer {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate StringRedisTemplate redisTemplate;@RabbitListener(queues = "my.queue")public void handleMessage(Message message, Channel channel) throws IOException {String messageId = message.getMessageProperties().getHeader("messageId");if (messageId == null) {// 如果没有唯一ID,拒绝消息并记录日志channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);return;}// 使用 Redis 检查是否已处理Boolean isProcessed = redisTemplate.opsForValue().setIfAbsent("msg:processed:" + messageId,  // Redis Key"1",                           // Value(无意义)24,                            // TTL(小时),避免长期占用内存TimeUnit.HOURS);if (Boolean.TRUE.equals(isProcessed)) {try {// 首次处理:执行业务逻辑doBusiness(message);// 成功则ACKchannel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 处理失败,删除Redis记录(可选)redisTemplate.delete("msg:processed:" + messageId);channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}} else {// 已处理过,直接ACK(避免重复入队)channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}
}

延迟消息

  • 目的:消息投递给RabbitMQ后,指定时间段后再进行消费

  • 方案

    1. 利用TTL-死信队列实现(推荐):死信队列中的消息是到期消息,消费端关注死信队列即可
    2. 使用RabbitMQ的延迟队列插件:暂存在交换机的内部存储,等暂存时间过了再将此消息发送给消费者
    3. 使用定时任务功能:循环查询headers中的时间参数,校验过期就消费

死信队列实现

  • 配置方法

    1. 对业务队列绑定死信队列
    2. 设置队列级别TTL,或者发送消息时设置消息级别TTL(以较小TTL为准)
    3. 消费者监听死信队列消费
  • 绑定死信队列

@Configuration
public class RabbitMQDeclarablesConfig {@Beanpublic DirectExchange directExchange() { // 创建交换机return ExchangeBuilder // 优先使用建造者模式创建.directExchange("direct.exchange") // 交换机名称.durable(true) // 是否持久化.build();}@Beanpublic Queue directQueue() { // 创建队列,并绑定死信交换机Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "dlx.exchange"); // 指定死信交换器args.put("x-dead-letter-routing-key", "dlx.routing.key"); // 指定死信路由键args.put("x-message-ttl", 10000); // 可选:设置消息TTL,10秒后过期成为死信args.put("x-max-length", 10); // 可选:设置队列最大长度,超过10条消息后,新消息成为死信return new Queue("direct.queue", true, false, false, args);}@Beanpublic Binding bindingDirectQueue() { // 绑定Direct交换机和业务队列return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct.routing.key"); // 主题交换机可以含有通配符,例如direct.#}@Beanpublic DirectExchange dlxExchange() { // 死信交换机return new DirectExchange("dlx.exchange", true, false);}@Beanpublic Queue dlxQueue() {return new Queue("dlx.queue", true); // 死信队列}@Beanpublic Binding bindingDLX() { // 绑定死信交换机和死信队列return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx.routing.key"); // 死信路由键}
}
  • 发送延迟消息
@Service
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendWithMessageTTL(String message, int ttl) {rabbitTemplate.convertAndSend("direct.exchange", "direct.routing.key", message, msg -> {msg.getMessageProperties().setExpiration(String.valueOf(ttl));return msg;});}
}

延迟插件实现

  • rabbitmq_delayed_message_exchange 插件版本与RabbitMQ大版本一致即可: rabbitmq/rabbitmq-delayed-message-exchange

  • 配置方法

    1. 将插件放入挂载目录:rabbitmq-plugins映射的目录下
    2. 进入容器执行:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    3. 重启RabbitMQ:docker restart rabbitmq
  • 声明延迟队列

@Configuration
public class DelayedQueueConfig {// 声明延迟交换机@Beanpublic CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct"); // 指定底层交换机类型为 directreturn new CustomExchange("delayed.exchange",  // 交换机名称"x-delayed-message", // 交换机类型(延迟消息交换机)true,                // 是否持久化false,               // 是否自动删除args                 // 交换机参数);}// 声明队列@Beanpublic Queue delayedQueue() {return new Queue("delayed.queue", true);}// 绑定队列与交换机@Beanpublic Binding bindingDelayedQueue() {return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delayed.routing.key").noargs();}
}
  • 发送延迟消息
@RestController
public class DelayedMessageController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendDelayed")public String sendDelayedMessage(@RequestParam String message, @RequestParam long delayTime) {rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE,DelayedQueueConfig.DELAYED_ROUTING_KEY,message,msg -> {msg.getMessageProperties().setDelay(delayTime); // 设置延迟时间(毫秒)return msg;});return "Delayed message sent: " + message + " (delay: " + delayTime + "ms)";}
}

RPC通信

  • RabbitMQ并没有直接提供RPC模式的功能,需要开发者手动实现的一种通信模式

    1. 直接让客户端和服务端进行通信虽然简单直接,但在某些场景下会面临扩展性、解耦、容错性等方面的限制
    2. RabbitMQ实现RPC模式可以解决上述问题,客户端发送请求并等待响应,服务端处理请求后返回结果
  • 模拟步骤

    1. 创建至少两个队列,rpc_queue用于接收请求,callback_queue用于接收响应,建议附correlation_id 为客户端唯一标识,方便多客户端时互相辨别
    2. 客户端通过交换机匹配correlation_id发消息给rpc_queue
    3. 服务端消费指定服务端rpc_queue中的消息,处理完毕后将响应发给callback_queue
---> 微服务1 ---> direct交换机 ---> rpc_queue队列 ---> 微服务2 ---> direct交换机 ---> callback_queue队列 ---> 下游模块

日志收集

  • 自定义 Logback Appender
@Component
public class RabbitMQLogAppender extends AppenderBase<ILoggingEvent> {@Autowiredprivate RabbitTemplate rabbitTemplate;@Overrideprotected void append(ILoggingEvent event) {// 将日志转换为 JSON 或字符串String logMessage = String.format("[%s] %s - %s",event.getLevel(),event.getLoggerName(),event.getFormattedMessage());// 发送到 RabbitMQrabbitTemplate.convertAndSend(RabbitMQLogConfig.LOG_EXCHANGE,"", // Fanout 交换机不需要 routingKeylogMessage);}
}
  • 配置 Logback:logback-spring.xml
<configuration><appender name="RABBITMQ" class="com.example.RabbitMQLogAppender"/><root level="INFO"><appender-ref ref="RABBITMQ"/></root>
</configuration>
  • 消费者监听队列
@Service
public class LogConsumerService {@Autowiredprivate SystemLogRepository logRepository;@RabbitListener(queues = RabbitMQLogConfig.LOG_QUEUE)public void receiveLog(String logMessage) {// 解析日志(简单示例,实际可用正则或JSON解析)String[] parts = logMessage.split(" - ");if (parts.length >= 2) {String level = parts[0].substring(1, parts[0].indexOf(']'));String loggerName = parts[0].substring(parts[0].indexOf(']') + 2);String message = parts[1];// 存储到数据库SystemLog log = new SystemLog();log.setLevel(level);log.setLoggerName(loggerName);log.setMessage(message);log.setTimestamp(LocalDateTime.now());logRepository.save(log);}}
}

文章转载自:

http://GXPPhL2b.nrgdc.cn
http://Zkhf0g7n.nrgdc.cn
http://QFf0EPxa.nrgdc.cn
http://3cA81Dgj.nrgdc.cn
http://t0skNj4K.nrgdc.cn
http://xU0BlFiM.nrgdc.cn
http://cGjEahRU.nrgdc.cn
http://4syCS5vn.nrgdc.cn
http://CS76EnPP.nrgdc.cn
http://ZnhHWBtg.nrgdc.cn
http://fisbIjrQ.nrgdc.cn
http://Ci2g4MPB.nrgdc.cn
http://WWQVEBUz.nrgdc.cn
http://MS18GoMS.nrgdc.cn
http://z6ZGsXZB.nrgdc.cn
http://XP6idu7r.nrgdc.cn
http://0h7Fi7oz.nrgdc.cn
http://qTTsm1nF.nrgdc.cn
http://KLw9e5BZ.nrgdc.cn
http://NLqyTJv9.nrgdc.cn
http://5K22vvWv.nrgdc.cn
http://ETgJVFQB.nrgdc.cn
http://m0rzCu0c.nrgdc.cn
http://xCwsXVez.nrgdc.cn
http://gb2VRQCT.nrgdc.cn
http://ALywYpTR.nrgdc.cn
http://7Z2jYcmh.nrgdc.cn
http://zU8e5Ls1.nrgdc.cn
http://ROdJ9bvh.nrgdc.cn
http://l2W73SKw.nrgdc.cn
http://www.dtcms.com/a/385535.html

相关文章:

  • 介绍一下 Test-Time Training 技术
  • 【LangChain指南】Document loaders
  • 日语学习-日语知识点小记-进阶-JLPT-N1阶段蓝宝书,共120语法(10):91-100语法+考え方13
  • 2021/07 JLPT听力原文 问题四
  • MySQL 视图的更新与删除:从操作规范到风险防控
  • 【SQLMap】获取 Shell
  • Java之异常处理
  • C# 通过 TCP/IP 控制 Keysight 34465A 万用表(保姆级教程)
  • TVS二极管详解:原理、选型与应用实战
  • C++实现文件中单词统计等
  • 数据库(四)MySQL读写分离原理和实现
  • 关于数据库的导入和导出
  • 【氮化镓】GaN中受主的氢相关钝化余激活
  • AI 进课堂 - 语文教学流程重塑
  • 最近一些机器github解析到本地回环地址127.0.0.1
  • P6352 [COCI 2007/2008 #3] CETIRI
  • 【LeetCode 每日一题】37. 解数独
  • 多项式回归:线性回归的扩展
  • AI生成到无缝PBR材质:Firefly+第三方AI+Substance工作流
  • Java分布式锁实战指南:从理论到实践
  • 【CSS】层叠上下文和z-index
  • inline-block元素错位原因及解决方法
  • 【Java】P3 Java基础:关键字、标识符与变量详解
  • Golang语言入门篇003_Go源代码结构
  • 【Docker】报错Data page checksums are disabled.
  • Viper:Go语言中强大的配置管理库入门教程
  • ISO/PAS 5112 附录A 与21434 WPs的映射关系
  • 机器学习-Bagging
  • OpenCV 图像拼接实战:从特征检测到全景融合
  • Atlas-Chain:一个灵活的Java责任链框架设计与实现