当前位置: 首页 > news >正文

RabbitMQ--消息顺序性

看本章之前强烈建议先去看博主的这篇博客

              RabbitMQ--消费端单线程与多线程-CSDN博客

一、消息顺序性概念

消息顺序性是指消息在生产者发送的顺序消费者接收处理的顺序保持一致。


二、RabbitMQ 顺序性保证机制

情况顺序保证情况备注
单队列,单消费者消息严格按发送顺序消费最简单且唯一保证顺序的场景
单队列,多个消费者无法保证全局顺序,但可以设置 QoS 保证消费者串行处理自己收到的消息通过 basicQos(1) 保证每个消费者一次只处理一条消息,但整体队列消息按消费者分配,顺序不保证
消息确认和重发机制如果未正确使用 ack,消息重发可能导致顺序乱需开启手动确认,确保消息处理完毕后才 ack
消息重试与死信机制可能导致消息顺序错乱需要设计合理的重试策略和死信队列策略


三、顺序性的保证方式

  1. 单队列单消费者

    • 保证消息完全顺序消费。适合严格顺序场景。

  2. 消息确认机制

    • 使用手动确认 autoAck=false,处理完后再 basicAck,防止消息乱序重发。

  3. QoS(basicQos)

    • 设置 basicQos(1),保证消费者一次只处理一条消息,避免多条消息并发处理导致乱序。

  4. 业务分区设计

    • 按某个字段(比如订单ID)分区到不同队列,保证分区内顺序。


四、原生 Java 示例


1. 依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version>
</dependency>

2. 生产者代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {private static final String QUEUE_NAME = "order_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明队列,持久化channel.queueDeclare(QUEUE_NAME, true, false, false, null);for (int i = 1; i <= 10; i++) {String message = "Order Message " + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("Sent: " + message);Thread.sleep(100);  // 模拟发送间隔}}}
}

3. 消费者代码(单个消费者,保证顺序)

import com.rabbitmq.client.*;public class Consumer {private static final String QUEUE_NAME = "order_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 设置每次只处理一条消息,避免乱序channel.basicQos(1);System.out.println("Waiting for messages...");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());System.out.println("Received: " + message);try {// 模拟处理消息Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();} finally {// 手动确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);System.out.println("Ack sent for: " + message);}};// 关闭自动确认,开启手动确认channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});}
}

4. 多消费者并发消费注意事项

  • 多个消费者消费同一队列,消息分发是轮询,整体消息顺序无法保证

  • basicQos(1) 只保证单个消费者串行处理自己拿到的消息,但多个消费者间消息顺序无保证。

  • 若需要严格顺序,需要保证单消费者消费或者分队列处理。


五、Spring Boot 示例


1. pom.xml 依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. application.yml

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestlistener:simple:# 每个消费者预取消息数量,类似 basicQos(1)prefetch: 1

3. RabbitMQ 配置类

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {public static final String QUEUE_NAME = "order_queue";@Beanpublic Queue orderQueue() {return new Queue(QUEUE_NAME, true); // 持久化队列}
}

4. 生产者代码

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class Producer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessages() throws InterruptedException {for (int i = 1; i <= 10; i++) {String message = "Order Message " + i;rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_NAME, message);System.out.println("Sent: " + message);Thread.sleep(100);}}
}

5. 消费者代码

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class Consumer {@RabbitListener(queues = RabbitConfig.QUEUE_NAME)public void receiveMessage(String message) throws InterruptedException {System.out.println("Received: " + message);// 模拟消息处理时间,确保消息顺序Thread.sleep(500);System.out.println("Processed: " + message);}
}

6. 主启动类

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class RabbitOrderApplication implements CommandLineRunner {@Autowiredprivate Producer producer;public static void main(String[] args) {SpringApplication.run(RabbitOrderApplication.class, args);}@Overridepublic void run(String... args) throws Exception {producer.sendMessages();}
}

六、总结

方面说明
单队列单消费者保证严格消息顺序,消息先进先出。
单队列多消费者消息轮询分发,整体顺序无法保证;设置 basicQos(1) 保证单个消费者顺序处理自己的消息。
消息确认机制手动 ack,避免消息未处理完成就确认导致顺序乱。
Spring Boot 配置spring.rabbitmq.listener.simple.prefetch=1 控制每个消费者预取消息数。
业务设计建议对于严格顺序场景,推荐单队列单消费者或消息分区+单消费者方案。

如果要严格保证消息顺序性:

        1. 单队列单消费者 

        2. 多消费者分区顺序

                当你只要求 “某一类业务 ID 下的顺序”一致,如订单、用户、设备号等,而不要求全局顺序时,这种方案很好。

                不能做到全局顺序消费!

                        不同队列之间顺序是无法控制的

                        比如 order_1order_5 属于不同分区,它们的处理时间会交叉,整体顺序就乱了。

多消费者分区顺序代码样例

  • 利用多个队列(分区),每个队列绑定一个消费者,保证队列内消息顺序;

  • 生产者根据某个分区键(如订单ID哈希)选择发送到对应队列,保证同一个分区的消息顺序。


多消费者分区顺序消费示例(Spring Boot)


1. 项目结构与依赖

pom.xml 添加:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 配置类:定义多个队列与交换机绑定

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {public static final int PARTITION_COUNT = 3;@Beanpublic DirectExchange directExchange() {return new DirectExchange("order_exchange");}@Beanpublic Queue queue0() {return new Queue("order_queue_0", true);}@Beanpublic Queue queue1() {return new Queue("order_queue_1", true);}@Beanpublic Queue queue2() {return new Queue("order_queue_2", true);}@Beanpublic Binding binding0(Queue queue0, DirectExchange directExchange) {return BindingBuilder.bind(queue0).to(directExchange).with("partition_0");}@Beanpublic Binding binding1(Queue queue1, DirectExchange directExchange) {return BindingBuilder.bind(queue1).to(directExchange).with("partition_1");}@Beanpublic Binding binding2(Queue queue2, DirectExchange directExchange) {return BindingBuilder.bind(queue2).to(directExchange).with("partition_2");}
}

3. 生产者:根据订单ID哈希选择分区,发送到对应RoutingKey

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class Producer {@Autowiredprivate RabbitTemplate rabbitTemplate;private static final int PARTITION_COUNT = RabbitConfig.PARTITION_COUNT;public void sendOrder(String orderId, String message) {int partition = Math.abs(orderId.hashCode()) % PARTITION_COUNT;String routingKey = "partition_" + partition;rabbitTemplate.convertAndSend("order_exchange", routingKey, message);System.out.println("Sent to " + routingKey + ": " + message);}
}

4. 消费者:为每个队列配置单独消费者,保证分区顺序

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class Consumer {@RabbitListener(queues = "order_queue_0")public void receivePartition0(String message) {System.out.println("Partition 0 received: " + message);// 业务处理,保证队列内顺序}@RabbitListener(queues = "order_queue_1")public void receivePartition1(String message) {System.out.println("Partition 1 received: " + message);}@RabbitListener(queues = "order_queue_2")public void receivePartition2(String message) {System.out.println("Partition 2 received: " + message);}
}

5. 测试调用示例(主程序)

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class PartitionOrderApplication implements CommandLineRunner {@Autowiredprivate Producer producer;public static void main(String[] args) {SpringApplication.run(PartitionOrderApplication.class, args);}@Overridepublic void run(String... args) throws Exception {// 发送多条订单消息,orderId不同分区for (int i = 0; i < 20; i++) {String orderId = "order" + i;String message = "Order message for " + orderId;producer.sendOrder(orderId, message);Thread.sleep(100);}}
}

6. 说明

  • 消息根据订单ID哈希决定发送哪个队列

  • 每个队列由单个消费者消费,保证该分区消息顺序

  • 多个队列+多消费者,实现并发消费和分区顺序

🔁 顺序保证范围

粒度保证情况
同一个 orderId✅ 顺序消费(始终落在同一队列)
不同 orderId❌ 不保证顺序(本来就不是要求)

✅ 结论

你这套方案:

  • 👍 是 Spring Boot 下 RabbitMQ 顺序消费的推荐做法

  • 👍 保证了“每个订单 ID 的消息顺序

  • 👍 可扩展,增加分区数提升并发能力

http://www.dtcms.com/a/295220.html

相关文章:

  • Java集合去重
  • OpenMed 项目深度分析:推动医疗 NLP 领域的开源革命
  • pcie常用的查看寄存器方法
  • node.js中的path模块
  • 低速信号设计之 QSPI 篇
  • 【LeetCode数据结构】二叉树的应用(一)——单值二叉树问题、相同的树问题、对称二叉树问题、另一棵树的子树问题详解
  • Faiss中L2欧式距离与余弦相似度:究竟该如何选择?
  • Web前端入门:JavaScript 哪些地方需要 try...catch 异常捕获
  • 【图论】倍增与lca
  • Avalonia 基于MVVM的时间统计/系统时间显示 示例
  • EPSON爱普生全系列废墨垫已满清零工具分享附教程下载
  • EasyExcel 模板导出数据 + 自定义策略(合并单元格)
  • 基于深度学习的胸部 X 光图像肺炎分类系统(三)
  • Turbo Intruder 并发插件无法试用--更换新版Burpsuit解决(简单解决安装、破解问题)
  • 开源Qwen凌晨暴击闭源Claude!刷新AI编程SOTA,支持1M上下文
  • 跨境支付入门~国际支付结算(结算篇)
  • AtCoder Beginner Contest 415(ABCDE)
  • `neutron router-gateway-set` 操作失败的可能原因及解决方案
  • 深度分析Java多线程机制
  • 【智能协同云图库】智能协同云图库第六弹:空间模块开发
  • 微服务的编程测评系统6-管理员登录前端-前端路由优化
  • 【开源】WPF的数据可视化大屏解决方案——WpfMap
  • 洛谷 P11378 [GESP202412 七级] 燃烧-普及/提高-
  • fdbus4.2 timer的使用
  • AI时代,我的编程工作搭子
  • ospf单区域实验
  • Windows批量工具,直接起飞!
  • 外部存档(External Archive)机制
  • MNIST 手写数字识别模型分析
  • 无人机电池通讯接口应用:CANFD工业级芯片的选型与技术要点