RabbitMQ 核心原理与Spring Boot整合实战
一、RabbitMQ 核心架构
1.1 AMQP 协议模型
组件 | 作用描述 |
---|
Producer | 消息生产者,发送消息到Exchange |
Consumer | 消息消费者,从队列获取消息处理 |
Exchange | 接收消息并根据规则路由到队列 |
Queue | 存储消息的缓冲区 |
Binding | 定义Exchange和Queue之间的关系规则 |
1.2 交换机类型对比
类型 | 路由规则 | 典型应用场景 |
---|
Direct | 精确匹配Routing Key | 点对点精确路由 |
Fanout | 广播到所有绑定队列 | 发布/订阅模式 |
Topic | 通配符匹配Routing Key | 多条件复杂路由 |
Headers | 根据Header属性匹配 | 非路由键匹配场景 |
二、Spring Boot 整合配置
2.1 基础依赖配置
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.2 配置文件示例
spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /listener:simple:prefetch: 10 concurrency: 5 max-concurrency: 10
三、消息生产消费实战
3.1 生产者配置模板
@Configuration
public class RabbitProducerConfig {@Beanpublic DirectExchange orderExchange() {return new DirectExchange("order.exchange");}@Beanpublic Queue orderQueue() {return new Queue("order.queue", true);}@Beanpublic Binding orderBinding() {return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("order.routingKey");}
}@Component
public class OrderProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendOrder(Order order) {rabbitTemplate.convertAndSend("order.exchange","order.routingKey",order,message -> {message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;});}
}
3.2 消费者监听实现
@Component
public class OrderConsumer {@RabbitListener(queues = "order.queue")@RabbitHandlerpublic void processOrder(Order order, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {try {handleOrder(order);channel.basicAck(tag, false);} catch (Exception e) {channel.basicNack(tag, false, true); }}@RabbitListener(queues = "dead.letter.queue")public void handleDeadLetter(Message message, Channel channel) throws IOException {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}
四、高级消息模式
4.1 延迟消息实现
@Bean
public DirectExchange delayExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new DirectExchange("delay.exchange", true, false, args);
}
public void sendDelayMessage(String message, int delayTime) {rabbitTemplate.convertAndSend("delay.exchange","delay.routingKey",message,msg -> {msg.getMessageProperties().setHeader("x-delay", delayTime);return msg;});
}
4.2 消息可靠性保证
五、监控与维护
5.1 常用监控指标
指标名称 | 描述 | 健康阈值 |
---|
queue_messages | 队列中待处理消息数 | <1000 |
message_ack_rate | 消息确认率 | >99% |
consumer_utilization | 消费者利用率 | 40%-80% |
deliver_get | 每秒投递消息数 | 根据硬件配置调整 |
5.2 管理命令示例
rabbitmqctl list_queues name messages_ready messages_unacknowledged
rabbitmqctl list_consumers
rabbitmqctl purge_queue order.queue
六、最佳实践指南
6.1 消息设计规范
- 消息体大小:单条消息建议不超过1MB
- 序列化格式:优先使用JSON格式
- 幂等处理:消费端需要保证重复消息处理安全
- 过期时间:设置合理的TTL(Time-To-Live)
6.2 集群配置建议
扩展学习: