当前位置: 首页 > news >正文

RabbitMQ 之仲裁队列

1. 问题引入

在日常开发中,通常会在集群中部署 RabbitMQ 服务,而不是使用单机程序。在集群模式下,就造成了数据不同步的问题。

现有三台机器组成了一个集群模式,并且均部署了 RabbitMQ 服务。现声明一个普通队列,主节点为 rabbit,如下图所示:

向这个队列中发送一条消息,由于这三台机器构成了一个集群,那么每一台机器都能访问这个队列,如图所示:

当我们停止 rabbit 服务后,观察另外两个服务,发现这个队列中的消息全都不见了,并且队列也处于无法访问的阶段,如下图所示:

那么在这种情况下,就会造成数据丢失,影响服务进行。于是,就需要引入仲裁队列。

2. 什么是仲裁队列

仲裁队列是基于 Raft 一致性算法实现的持久化、复制的 FIFO 队列。仲裁队列提供队列复制的能力,保障数据的高可用性与安全性,使用仲裁队列可以在节点间进行数据复制,从而达到在一个节点宕机时,队列数据不丢失,依然可以提供服务。

3. 使用仲裁队列

在 RabbitMQ 客户端中创建仲裁队列,如下图所示:

这里选择 rabbit2 作为主节点,队列创建成功后如下图所示:

使用 amqp-client 创建,代码如下:

Map<String, Object> param = new HashMap<>();
param.put("x-queue-type", "quorum");
channel.queueDeclare("quorum_queue",true,false,false,param);

 使用 spring 框架创建,代码如下:

@Configuration
public class RabbitMQConfig {/*** 创建仲裁队列* @return*/@Bean("quorumQueue")public Queue quorumQueue() {return QueueBuilder.durable(Constants.QUORUM_QUEUE).quorum().build();}
}

使用 quorum() 方法表示声明的队列为仲裁队列。

生产者代码如下:

    @RequestMapping("/quorum")public String quorum() {String message = "quorum ...";rabbitTemplate.convertAndSend("", Constants.QUORUM_QUEUE, message);return "消息发送成功";}

代码运行后,观察 RabbitMQ 客户端:

该仲裁队列是以 rabbitmq 为主节点,并且队列中已经有一条消息。

现在将 rabbitmq 服务终止,观察队列以及队列中的消息是否存在:

 

现在 rabbitmq 服务已经停止

 

队列依然存在,队列中的消息也依然存在,并且由于主节点宕机,根据 Raft 算法重新选举了 rabbit3 作为主节点。

从上面的测试结果中可以看出,使用仲裁队列后可以有效防止集群中某个节点宕机导致消息丢失。 

http://www.dtcms.com/a/277331.html

相关文章:

  • Matplotlib 中 plt.pcolormesh 函数的使用详解
  • 【sql学习之拉链表】
  • 【LLM-Agent】Qwen-Agent智能体框架使用
  • trySend、Channel 和 Flow 的工作原理
  • 【CMake】CMake创建、安装、使用静态库和动态库
  • 操作系统-第四章存储器管理和第五章设备管理-知识点整理(知识点学习 / 期末复习 / 面试 / 笔试)
  • 【hivesql 已知维度父子关系加工层级表】
  • C++每日刷题day2025.7.13
  • 什么是RAG(Retrieval-Augmented Generation)?一文读懂检索增强生成
  • RabbitMQ面试精讲 Day 2:RabbitMQ工作模型与消息流转
  • 12.I/O复用
  • 前端性能与可靠性工程:资源优化 - 加载性能的“低垂果实”
  • 从零开始学习深度学习-水果分类之PyQt5App
  • SpringBoot集成Redis、SpringCache
  • C++ 强制类型转换
  • 【操作系统】strace 跟踪系统调用(一)
  • (LeetCode 每日一题) 2410. 运动员和训练师的最大匹配数(排序、双指针)
  • es里为什么node和shard不是一对一的关系
  • Augment AI 0.502.0版本深度解析:Task、Guidelines、Memory三大核心功能实战指南
  • 将 NumPy 数组展平并转换为 Python 列表
  • 1.1.5 模块与包——AI教你学Django
  • OpenLayers 入门指南【二】:坐标系与投影转换
  • 把 DNA 当 PCIe:一条 365 nt 链实现 64 Gbps 片上光互连——基于链式 Förster 共振的分子级波分复用链路
  • 理解 Robots 协议:爬虫该遵守的“游戏规则”
  • MySQL逻辑删除与唯一索引冲突解决
  • M00224-小范围疫情防控元胞自动机模拟matlab
  • 【unitrix】 5.1 第二套类型级二进制数基本结构体(types2.rs)
  • 深入解析Hadoop架构设计:原理、组件与应用
  • OpenLayers使用
  • (2)从零开发 Chrome 插件:实现 API 登录与本地存储功能