当前位置: 首页 > news >正文

SpringBoot3.x入门到精通系列:3.2 整合 RabbitMQ 详解

🎯 RabbitMQ简介

RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据。它是使用Erlang语言编写的,并且基于AMQP协议。

核心概念

  • Producer: 消息生产者,发送消息的应用
  • Consumer: 消息消费者,接收消息的应用
  • Queue: 消息队列,存储消息的缓冲区
  • Exchange: 交换机,负责接收消息并路由到队列
  • Routing Key: 路由键,Exchange根据它来决定消息路由到哪个队列
  • Binding: 绑定,Exchange和Queue之间的连接关系

交换机类型

  • Direct: 直连交换机,完全匹配路由键
  • Topic: 主题交换机,支持通配符匹配
  • Fanout: 扇出交换机,广播到所有绑定的队列
  • Headers: 头交换机,根据消息头属性路由

🚀 快速开始

1. 添加依赖

<dependencies><!-- SpringBoot Starter Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- SpringBoot RabbitMQ Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- JSON处理 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><!-- 测试依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>

2. RabbitMQ配置

spring:# RabbitMQ配置rabbitmq:# RabbitMQ服务器地址host: localhost# RabbitMQ服务器端口port: 5672# 用户名username: guest# 密码password: guest# 虚拟主机virtual-host: /# 连接配置connection-timeout: 15000# 生产者配置publisher-confirm-type: correlated # 确认模式publisher-returns: true # 开启return机制# 消费者配置listener:simple:# 手动确认模式acknowledge-mode: manual# 并发消费者数量concurrency: 1# 最大并发消费者数量max-concurrency: 10# 每次从队列获取的消息数量prefetch: 1# 重试机制retry:enabled: trueinitial-interval: 1000max-attempts: 3max-interval: 10000multiplier: 1.0# 日志配置
logging:level:org.springframework.amqp: DEBUG

🔧 RabbitMQ配置类

1. 基础配置

package com.example.demo.config;import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {// 队列名称常量public static final String DIRECT_QUEUE = "direct.queue";public static final String TOPIC_QUEUE_1 = "topic.queue.1";public static final String TOPIC_QUEUE_2 = "topic.queue.2";public static final String FANOUT_QUEUE_1 = "fanout.queue.1";public static final String FANOUT_QUEUE_2 = "fanout.queue.2";public static final String DELAY_QUEUE = "delay.queue";public static final String DLX_QUEUE = "dlx.queue";// 交换机名称常量public static final String DIRECT_EXCHANGE = "direct.exchange";public static final String TOPIC_EXCHANGE = "topic.exchange";public static final String FANOUT_EXCHANGE = "fanout.exchange";public static final String DELAY_EXCHANGE = "delay.exchange";public static final String DLX_EXCHANGE = "dlx.exchange";/*** 消息转换器 - 使用JSON格式*/@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}/*** RabbitTemplate配置*/@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(messageConverter());// 设置确认回调rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("消息发送成功: " + correlationData);} else {System.out.println("消息发送失败: " + cause);}});// 设置返回回调rabbitTemplate.setReturnsCallback(returned -> {System.out.println("消息返回: " + returned.getMessage());System.out.println("回复码: " + returned.getReplyCode());System.out.println("回复文本: " + returned.getReplyText());System.out.println("交换机: " + returned.getExchange());System.out.println("路由键: " + returned.getRoutingKey());});return rabbitTemplate;}/*** 监听器容器工厂配置*/@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(messageConverter());return factory;}// ========================= Direct Exchange =========================@Beanpublic Queue directQueue() {return QueueBuilder.durable(DIRECT_QUEUE).build();}@Beanpublic DirectExchange directExchange() {return new DirectExchange(DIRECT_EXCHANGE);}@Beanpublic Binding directBinding() {return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct.routing.key");}// ========================= Topic Exchange =========================@Beanpublic Queue topicQueue1() {return QueueBuilder.durable(TOPIC_QUEUE_1).build();}@Beanpublic Queue topicQueue2() {return QueueBuilder.durable(TOPIC_QUEUE_2).build();}@Beanpublic TopicExchange topicExchange() {return new TopicExchange(TOPIC_EXCHANGE);}@Beanpublic Binding topicBinding1() {return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.*.message");}@Beanpublic Binding topicBinding2() {return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.#");}// ========================= Fanout Exchange =========================@Beanpublic Queue fanoutQueue1() {return QueueBuilder.durable(FANOUT_QUEUE_1).build();}@Beanpublic Queue fanoutQueue2() {return QueueBuilder.durable(FANOUT_QUEUE_2).build();}@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange(FANOUT_EXCHANGE);}@Beanpublic Binding fanoutBinding1() {return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());}@Beanpublic Binding fanoutBinding2() {return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());}// ========================= 延迟队列 =========================@Beanpublic Queue delayQueue() {return QueueBuilder.durable(DELAY_QUEUE).withArgument("x-message-ttl", 60000) // 消息TTL 60秒.withArgument("x-dead-letter-exchange", DLX_EXCHANGE) // 死信交换机.withArgument("x-dead-letter-routing-key", "dlx.routing.key") // 死信路由键.build();}@Beanpublic DirectExchange delayExchange() {return new DirectExchange(DELAY_EXCHANGE);}@Beanpublic Binding delayBinding() {return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("delay.routing.key");}// ========================= 死信队列 =========================@Beanpublic Queue dlxQueue() {return QueueBuilder.durable(DLX_QUEUE).build();}@Beanpublic DirectExchange dlxExchange() {return new DirectExchange(DLX_EXCHANGE);}@Beanpublic Binding dlxBinding() {return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx.routing.key");}
}

📊 消息实体类

package com.example.demo.dto;import com.fasterxml.jackson.annotation.JsonFormat;
import java.io.Serializable;
import java.time.LocalDateTime;public class MessageDto implements Serializable {private static final long serialVersionUID = 1L;private String id;private String content;private String type;private String sender;@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")private LocalDateTime timestamp;// 构造函数public MessageDto() {this.timestamp = LocalDateTime.now();}public MessageDto(String id, String content, String type, String sender) {this();this.id = id;this.content = content;this.type = type;this.sender = sender;}// Getter和Setter方法public String getId() { return id; }public void setId(String id) { this.id = id; }public String getContent() { return content; }public void setContent(String content) { this.content = content; }public String getType() { return type; }public void setType(String type) { this.type = type; }public String getSender() { return sender; }public void setSender(String sender) { this.sender = sender; }public LocalDateTime getTimestamp() { return timestamp; }public void setTimestamp(LocalDateTime timestamp) { this.timestamp = timestamp; }@Overridepublic String toString() {return "MessageDto{" +"id='" + id + '\'' +", content='" + content + '\'' +", type='" + type + '\'' +", sender='" + sender + '\'' +", timestamp=" + timestamp +'}';}
}

📤 消息生产者

package com.example.demo.service;import com.example.demo.config.RabbitConfig;
import com.example.demo.dto.MessageDto;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.UUID;@Service
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送Direct消息*/public void sendDirectMessage(String content) {MessageDto message = new MessageDto(UUID.randomUUID().toString(),content,"DIRECT","Producer");rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE,"direct.routing.key",message);System.out.println("发送Direct消息: " + message);}/*** 发送Topic消息*/public void sendTopicMessage(String routingKey, String content) {MessageDto message = new MessageDto(UUID.randomUUID().toString(),content,"TOPIC","Producer");rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE,routingKey,message);System.out.println("发送Topic消息 [" + routingKey + "]: " + message);}/*** 发送Fanout消息*/public void sendFanoutMessage(String content) {MessageDto message = new MessageDto(UUID.randomUUID().toString(),content,"FANOUT","Producer");rabbitTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE,"", // Fanout交换机忽略路由键message);System.out.println("发送Fanout消息: " + message);}/*** 发送延迟消息*/public void sendDelayMessage(String content, int delaySeconds) {MessageDto message = new MessageDto(UUID.randomUUID().toString(),content,"DELAY","Producer");// 设置消息属性rabbitTemplate.convertAndSend(RabbitConfig.DELAY_EXCHANGE,"delay.routing.key",message,msg -> {// 设置消息过期时间msg.getMessageProperties().setExpiration(String.valueOf(delaySeconds * 1000));return msg;});System.out.println("发送延迟消息 [" + delaySeconds + "s]: " + message);}/*** 发送带优先级的消息*/public void sendPriorityMessage(String content, int priority) {MessageDto message = new MessageDto(UUID.randomUUID().toString(),content,"PRIORITY","Producer");rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE,"direct.routing.key",message,msg -> {msg.getMessageProperties().setPriority(priority);return msg;});System.out.println("发送优先级消息 [" + priority + "]: " + message);}
}

📥 消息消费者

package com.example.demo.service;import com.example.demo.config.RabbitConfig;
import com.example.demo.dto.MessageDto;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;import java.io.IOException;@Service
public class MessageConsumer {/*** 消费Direct消息*/@RabbitListener(queues = RabbitConfig.DIRECT_QUEUE)public void consumeDirectMessage(MessageDto message, Message msg, Channel channel) throws IOException {try {System.out.println("接收到Direct消息: " + message);// 模拟业务处理Thread.sleep(1000);// 手动确认消息channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);System.out.println("Direct消息处理完成");} catch (Exception e) {System.err.println("处理Direct消息失败: " + e.getMessage());// 拒绝消息并重新入队channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);}}/*** 消费Topic消息 - 队列1*/@RabbitListener(queues = RabbitConfig.TOPIC_QUEUE_1)public void consumeTopicMessage1(MessageDto message, Message msg, Channel channel) throws IOException {try {System.out.println("队列1接收到Topic消息: " + message);// 模拟业务处理Thread.sleep(500);// 手动确认消息channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);System.out.println("队列1 Topic消息处理完成");} catch (Exception e) {System.err.println("队列1处理Topic消息失败: " + e.getMessage());channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);}}/*** 消费Topic消息 - 队列2*/@RabbitListener(queues = RabbitConfig.TOPIC_QUEUE_2)public void consumeTopicMessage2(MessageDto message, Message msg, Channel channel) throws IOException {try {System.out.println("队列2接收到Topic消息: " + message);// 模拟业务处理Thread.sleep(500);// 手动确认消息channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);System.out.println("队列2 Topic消息处理完成");} catch (Exception e) {System.err.println("队列2处理Topic消息失败: " + e.getMessage());channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);}}/*** 消费Fanout消息 - 队列1*/@RabbitListener(queues = RabbitConfig.FANOUT_QUEUE_1)public void consumeFanoutMessage1(MessageDto message, Message msg, Channel channel) throws IOException {try {System.out.println("Fanout队列1接收到消息: " + message);// 模拟业务处理Thread.sleep(300);// 手动确认消息channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);System.out.println("Fanout队列1消息处理完成");} catch (Exception e) {System.err.println("Fanout队列1处理消息失败: " + e.getMessage());channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);}}/*** 消费Fanout消息 - 队列2*/@RabbitListener(queues = RabbitConfig.FANOUT_QUEUE_2)public void consumeFanoutMessage2(MessageDto message, Message msg, Channel channel) throws IOException {try {System.out.println("Fanout队列2接收到消息: " + message);// 模拟业务处理Thread.sleep(300);// 手动确认消息channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);System.out.println("Fanout队列2消息处理完成");} catch (Exception e) {System.err.println("Fanout队列2处理消息失败: " + e.getMessage());channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);}}/*** 消费死信消息*/@RabbitListener(queues = RabbitConfig.DLX_QUEUE)public void consumeDlxMessage(MessageDto message, Message msg, Channel channel) throws IOException {try {System.out.println("接收到死信消息: " + message);// 处理死信消息的业务逻辑// 比如记录日志、发送告警、人工处理等// 手动确认消息channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);System.out.println("死信消息处理完成");} catch (Exception e) {System.err.println("处理死信消息失败: " + e.getMessage());channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, false);}}
}

🎮 Controller层

package com.example.demo.controller;import com.example.demo.service.MessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;import java.util.HashMap;
import java.util.Map;@RestController
@RequestMapping("/api/rabbitmq")
@CrossOrigin(origins = "*")
public class RabbitMQController {@Autowiredprivate MessageProducer messageProducer;/*** 发送Direct消息*/@PostMapping("/direct")public ResponseEntity<Map<String, String>> sendDirectMessage(@RequestBody Map<String, String> request) {String content = request.get("content");messageProducer.sendDirectMessage(content);Map<String, String> response = new HashMap<>();response.put("status", "success");response.put("message", "Direct消息发送成功");return ResponseEntity.ok(response);}/*** 发送Topic消息*/@PostMapping("/topic")public ResponseEntity<Map<String, String>> sendTopicMessage(@RequestBody Map<String, String> request) {String content = request.get("content");String routingKey = request.getOrDefault("routingKey", "topic.test.message");messageProducer.sendTopicMessage(routingKey, content);Map<String, String> response = new HashMap<>();response.put("status", "success");response.put("message", "Topic消息发送成功");response.put("routingKey", routingKey);return ResponseEntity.ok(response);}/*** 发送Fanout消息*/@PostMapping("/fanout")public ResponseEntity<Map<String, String>> sendFanoutMessage(@RequestBody Map<String, String> request) {String content = request.get("content");messageProducer.sendFanoutMessage(content);Map<String, String> response = new HashMap<>();response.put("status", "success");response.put("message", "Fanout消息发送成功");return ResponseEntity.ok(response);}/*** 发送延迟消息*/@PostMapping("/delay")public ResponseEntity<Map<String, String>> sendDelayMessage(@RequestBody Map<String, Object> request) {String content = (String) request.get("content");Integer delaySeconds = (Integer) request.getOrDefault("delaySeconds", 10);messageProducer.sendDelayMessage(content, delaySeconds);Map<String, String> response = new HashMap<>();response.put("status", "success");response.put("message", "延迟消息发送成功");response.put("delaySeconds", delaySeconds.toString());return ResponseEntity.ok(response);}/*** 发送优先级消息*/@PostMapping("/priority")public ResponseEntity<Map<String, String>> sendPriorityMessage(@RequestBody Map<String, Object> request) {String content = (String) request.get("content");Integer priority = (Integer) request.getOrDefault("priority", 0);messageProducer.sendPriorityMessage(content, priority);Map<String, String> response = new HashMap<>();response.put("status", "success");response.put("message", "优先级消息发送成功");response.put("priority", priority.toString());return ResponseEntity.ok(response);}
}

📊 最佳实践

1. 消息可靠性

  • 开启生产者确认机制
  • 使用持久化队列和消息
  • 实现消费者手动确认
  • 配置死信队列处理失败消息

2. 性能优化

  • 合理设置预取数量
  • 使用批量操作
  • 优化序列化方式
  • 监控队列长度

3. 高可用性

  • 配置集群模式
  • 使用镜像队列
  • 实现故障转移
  • 监控系统状态

本文关键词: RabbitMQ, 消息队列, AMQP, 异步通信, 微服务, 解耦

http://www.dtcms.com/a/313881.html

相关文章:

  • Ethereum:智能合约开发者的“瑞士军刀”OpenZeppelin
  • 白杨SEO:百度搜索开放平台发布AI计划是什么?MCP网站红利来了?顺带说说其它
  • 剧本杀小程序系统开发:开启沉浸式推理社交新纪元
  • 力扣 hot100 Day65
  • 《Python 实用项目与工具制作指南》 · 前言
  • [自动化Adapt] GUI交互(窗口/元素) | 系统配置 | 非侵入式定制化
  • [特殊字符]️ 整个键盘控制无人机系统框架
  • Qt按键响应
  • 更智能的 RibbonBar Spread.NET 18.2Crack
  • QT:交叉编译mysql驱动库
  • 基于鼠标位置的相机缩放和平移命令的实现(原理+源码)
  • Prompt Engineering
  • 赛博威携手Dify,助力AI在企业的场景化落地
  • 【数据库】使用Sql Server创建索引优化查询速度,一般2万多数据后,通过非索引时间字段排序查询出现超时情况
  • Linux(centos)安全狗
  • Linux 用户与组管理全解析
  • 采购管理工具的实施方法论:三阶段框架与常见问题解决方案
  • RHCA02
  • How To Say - AI多语言表达工具
  • 【前端:Html】--1.3.基础语法
  • el-table高度自适应vue页面指令
  • 第二十三天(APP应用产权渠道服务资产通讯抓包静态提取动态调试测试范围)
  • Jetbrains IDE总是弹出“需要身份验证”窗口
  • 算法11. 盛最多水的容器
  • leetcode-sql-3497分析订阅转化
  • 优选算法 力扣 11. 盛最多水的容器 双指针降低时间复杂度 贪心策略 C++题解 每日一题
  • 验证码等待时间技术在酒店自助入住、美容自助与社区场景中的应用必要性研究—仙盟创梦IDE
  • Flask + HTML 项目开发思路
  • 【BUUCTF系列】[极客大挑战 2019]LoveSQL 1
  • 同质无向加权图:理论基础、算法演进与应用前沿