RabbitMQ基础
RabbitMQ基础知识
一、RabbitMQ简介
1.1 什么是RabbitMQ?
RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据。
1.2 核心概念
- Producer:消息生产者
- Consumer:消息消费者
- Exchange:交换机,负责消息路由
- Queue:队列,存储消息
- Binding:绑定,交换机和队列之间的关系
- Channel:信道,建立在Connection上的虚拟连接
- Connection:TCP连接
1.3 应用场景
- 异步处理
- 应用解耦
- 流量削峰
- 日志处理
- 消息通讯
二、安装与配置
2.1 安装RabbitMQ
# Windows安装
# 1. 安装Erlang
# 2. 下载并安装RabbitMQ
# 3. 启用管理插件
rabbitmq-plugins enable rabbitmq_management
# Linux安装
sudo apt-get update
sudo apt-get install rabbitmq-server
# Docker安装
docker run -d --hostname my-rabbit --name rabbit -p 5672:5672 -p 15672:15672 rabbitmq:management
2.2 基础配置
# application.properties
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
三、交换机类型
3.1 Direct Exchange
// 声明交换机
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct.exchange");
}
// 声明队列
@Bean
public Queue directQueue() {
return new Queue("direct.queue");
}
// 绑定关系
@Bean
public Binding directBinding() {
return BindingBuilder.bind(directQueue())
.to(directExchange())
.with("direct.routingKey");
}
3.2 Topic Exchange
// 声明交换机
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topic.exchange");
}
// 声明队列
@Bean
public Queue topicQueue1() {
return new Queue("topic.queue1");
}
// 绑定关系
@Bean
public Binding topicBinding1() {
return BindingBuilder.bind(topicQueue1())
.to(topicExchange())
.with("topic.#"); // #匹配0个或多个单词
}
3.3 Fanout Exchange
// 声明交换机
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout.exchange");
}
// 声明队列
@Bean
public Queue fanoutQueue1() {
return new Queue("fanout.queue1");
}
// 绑定关系
@Bean
public Binding fanoutBinding1() {
return BindingBuilder.bind(fanoutQueue1())
.to(fanoutExchange());
}
3.4 Headers Exchange
// 声明交换机
@Bean
public HeadersExchange headersExchange() {
return new HeadersExchange("headers.exchange");
}
// 声明队列
@Bean
public Queue headersQueue() {
return new Queue("headers.queue");
}
// 绑定关系
@Bean
public Binding headersBinding() {
return BindingBuilder.bind(headersQueue())
.to(headersExchange())
.whereAll("header1", "value1").match();
}
四、消息发送与接收
4.1 消息生产者
@Service
public class MessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
// 发送消息
public void sendMessage(String exchange, String routingKey, Object message) {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
// 发送带确认的消息
public void sendMessageWithConfirm(String exchange, String routingKey, Object message) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
}
}
4.2 消息消费者
@Component
public class MessageConsumer {
// 简单消费
@RabbitListener(queues = "direct.queue")
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
// 手动确认消费
@RabbitListener(queues = "direct.queue")
public void receiveMessageManual(String message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
// 处理消息
System.out.println("Received message: " + message);
// 手动确认
channel.basicAck(tag, false);
} catch (Exception e) {
// 消息重回队列
channel.basicNack(tag, false, true);
}
}
}
五、高级特性
5.1 消息确认机制
// 配置
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
// 实现确认回调
@Bean
public RabbitTemplate.ConfirmCallback confirmCallback() {
return (correlationData, ack, cause) -> {
if (ack) {
System.out.println("消息发送成功");
} else {
System.out.println("消息发送失败:" + cause);
}
};
}
// 实现返回回调
@Bean
public RabbitTemplate.ReturnCallback returnCallback() {
return (message, replyCode, replyText, exchange, routingKey) -> {
System.out.println("消息路由失败:" + replyText);
};
}
5.2 死信队列
// 声明死信交换机
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("dlx.exchange");
}
// 声明死信队列
@Bean
public Queue deadLetterQueue() {
return new Queue("dlx.queue");
}
// 声明普通队列,并指定死信交换机
@Bean
public Queue normalQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx.exchange");
args.put("x-dead-letter-routing-key", "dlx.routingKey");
return new Queue("normal.queue", true, false, false, args);
}
5.3 延迟队列
// 声明延迟交换机
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange("delay.exchange", "x-delayed-message",
true, false, args);
}
// 发送延迟消息
public void sendDelayMessage(String message, int delayTime) {
rabbitTemplate.convertAndSend("delay.exchange", "delay.routingKey",
message, messagePostProcessor -> {
messagePostProcessor.getMessageProperties()
.setDelay(delayTime);
return messagePostProcessor;
});
}
六、集群部署
6.1 普通集群模式
# 1. 配置hosts
192.168.1.101 rabbit1
192.168.1.102 rabbit2
192.168.1.103 rabbit3
# 2. 同步Erlang Cookie
scp /var/lib/rabbitmq/.erlang.cookie root@rabbit2:/var/lib/rabbitmq/
scp /var/lib/rabbitmq/.erlang.cookie root@rabbit3:/var/lib/rabbitmq/
# 3. 加入集群
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@rabbit1
rabbitmqctl start_app
6.2 镜像集群模式
# 设置镜像策略
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
七、监控与运维
7.1 监控指标
- 队列数量
- 消息数量
- 连接数量
- 消费者数量
- 交换机数量
7.2 常用命令
# 查看队列
rabbitmqctl list_queues
# 查看交换机
rabbitmqctl list_exchanges
# 查看绑定关系
rabbitmqctl list_bindings
# 查看连接
rabbitmqctl list_connections
# 查看消费者
rabbitmqctl list_consumers
八、最佳实践
8.1 开发规范
- 命名规范
- 使用有意义的交换机和队列名称
- 使用统一的命名规则
- 避免特殊字符
- 消息处理
- 保证消息可靠性
- 处理消息幂等性
- 合理设置重试机制
- 性能优化
- 合理使用预取数量
- 控制消息大小
- 适当的并发数量
8.2 运维规范
- 监控告警
- 监控队列积压
- 监控消费者状态
- 监控集群状态
- 容量规划
- 评估消息量
- 规划硬件资源
- 合理设置集群规模
- 安全管理
- 访问控制
- 加密传输
- 定期备份