RabbitMQ 之仲裁队列
1. 问题引入
在日常开发中,通常会在集群中部署 RabbitMQ 服务,而不是使用单机程序。在集群模式下,就造成了数据不同步的问题。
现有三台机器组成了一个集群模式,并且均部署了 RabbitMQ 服务。现声明一个普通队列,主节点为 rabbit,如下图所示:
向这个队列中发送一条消息,由于这三台机器构成了一个集群,那么每一台机器都能访问这个队列,如图所示:
当我们停止 rabbit 服务后,观察另外两个服务,发现这个队列中的消息全都不见了,并且队列也处于无法访问的阶段,如下图所示:
那么在这种情况下,就会造成数据丢失,影响服务进行。于是,就需要引入仲裁队列。
2. 什么是仲裁队列
仲裁队列是基于 Raft 一致性算法实现的持久化、复制的 FIFO 队列。仲裁队列提供队列复制的能力,保障数据的高可用性与安全性,使用仲裁队列可以在节点间进行数据复制,从而达到在一个节点宕机时,队列数据不丢失,依然可以提供服务。
3. 使用仲裁队列
在 RabbitMQ 客户端中创建仲裁队列,如下图所示:
这里选择 rabbit2 作为主节点,队列创建成功后如下图所示:
使用 amqp-client 创建,代码如下:
Map<String, Object> param = new HashMap<>();
param.put("x-queue-type", "quorum");
channel.queueDeclare("quorum_queue",true,false,false,param);
使用 spring 框架创建,代码如下:
@Configuration
public class RabbitMQConfig {/*** 创建仲裁队列* @return*/@Bean("quorumQueue")public Queue quorumQueue() {return QueueBuilder.durable(Constants.QUORUM_QUEUE).quorum().build();}
}
使用 quorum() 方法表示声明的队列为仲裁队列。
生产者代码如下:
@RequestMapping("/quorum")public String quorum() {String message = "quorum ...";rabbitTemplate.convertAndSend("", Constants.QUORUM_QUEUE, message);return "消息发送成功";}
代码运行后,观察 RabbitMQ 客户端:
该仲裁队列是以 rabbitmq 为主节点,并且队列中已经有一条消息。
现在将 rabbitmq 服务终止,观察队列以及队列中的消息是否存在:
现在 rabbitmq 服务已经停止
队列依然存在,队列中的消息也依然存在,并且由于主节点宕机,根据 Raft 算法重新选举了 rabbit3 作为主节点。
从上面的测试结果中可以看出,使用仲裁队列后可以有效防止集群中某个节点宕机导致消息丢失。