当前位置: 首页 > news >正文

深入理解 RabbitMQ:从底层原理到实战落地的全维度指南

引言:

  • 本文总字数:约 18500 字
  • 预计阅读时间:45 分钟

为什么我们需要 RabbitMQ?

在当今分布式系统架构中,消息队列已成为不可或缺的核心组件。想象一下,当你在电商平台下单时,系统需要处理库存扣减、订单创建、支付处理、物流通知等一系列操作。如果这些操作都同步执行,任何一个环节的延迟都会导致整个流程卡顿,用户体验将大打折扣。

RabbitMQ 作为一款高性能、可靠的消息中间件,正是为解决这类问题而生。它采用先进的消息传递机制,实现了系统间的异步通信,不仅提高了系统吞吐量,还增强了系统的容错能力和可扩展性。

根据 RabbitMQ 官方文档(RabbitMQ Documentation | RabbitMQ),RabbitMQ 是一个开源的消息代理软件,实现了高级消息队列协议(AMQP),支持多种消息传递模式,能够满足复杂分布式系统的通信需求。

一、RabbitMQ 核心概念与架构

1.1 核心组件

RabbitMQ 的核心组件包括:

  • 生产者(Producer):发送消息的应用程序
  • 消费者(Consumer):接收并处理消息的应用程序
  • 交换机(Exchange):接收生产者发送的消息,并根据路由规则将消息路由到一个或多个队列
  • 队列(Queue):存储消息的缓冲区
  • 绑定(Binding):交换机和队列之间的关联关系,包含路由规则
  • 路由键(Routing Key):消息的属性,用于交换机将消息路由到合适的队列

1.2 工作流程

RabbitMQ 的基本工作流程如下:

  1. 生产者创建消息,并指定消息的路由键和交换机
  2. 生产者将消息发送到指定的交换机
  3. 交换机根据预设的绑定规则和消息的路由键,将消息路由到一个或多个队列
  4. 消费者监听队列,当有消息到达时,接收并处理消息

1.3 为什么选择 RabbitMQ?

与其他消息队列相比,RabbitMQ 具有以下优势:

  1. 可靠性高:支持消息持久化、确认机制和镜像队列,确保消息不丢失
  2. 灵活的路由机制:提供多种交换机类型,支持复杂的路由场景
  3. 多协议支持:除了 AMQP,还支持 STOMP、MQTT 等多种协议
  4. 易于扩展:支持集群部署,可根据需求动态扩展
  5. 丰富的客户端:几乎所有主流编程语言都有 RabbitMQ 客户端
  6. 管理界面友好:提供直观的 Web 管理界面,方便监控和管理

根据 2023 年 JetBrains 开发者生态系统调查,RabbitMQ 在消息队列领域的使用率排名第二,仅次于 Kafka,尤其在企业级应用中广泛采用。

二、RabbitMQ 环境搭建

2.1 安装 RabbitMQ

Docker 安装(推荐)
# 拉取RabbitMQ镜像(带管理界面)
docker pull rabbitmq:3.13-management# 启动RabbitMQ容器
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=admin \rabbitmq:3.13-management
手动安装(以 Ubuntu 为例)
# 更新包列表
sudo apt update# 安装Erlang(RabbitMQ依赖)
sudo apt install -y erlang# 添加RabbitMQ仓库
echo "deb https://dl.bintray.com/rabbitmq/debian bionic main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list# 导入签名密钥
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -# 安装RabbitMQ
sudo apt update
sudo apt install -y rabbitmq-server# 启动RabbitMQ服务
sudo systemctl start rabbitmq-server# 启用管理插件
sudo rabbitmq-plugins enable rabbitmq_management# 设置开机自启
sudo systemctl enable rabbitmq-server

安装完成后,可通过访问http://localhost:15672 打开管理界面,使用默认账号 guest/guest 登录(注意:默认账号只允许本地访问)。

2.2 配置 RabbitMQ

创建自定义用户和虚拟主机:

# 进入容器
docker exec -it rabbitmq bash# 创建用户
rabbitmqctl add_user jamguo password# 设置用户角色为管理员
rabbitmqctl set_user_tags jamguo administrator# 创建虚拟主机
rabbitmqctl add_vhost my_vhost# 授权用户访问虚拟主机
rabbitmqctl set_permissions -p my_vhost jamguo ".*" ".*" ".*"

三、Java 客户端开发环境搭建

3.1 Maven 依赖配置

创建一个 Spring Boot 项目,在 pom.xml 中添加以下依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.2.0</version><relativePath/></parent><groupId>com.jamguo.rabbitmq</groupId><artifactId>rabbitmq-demo</artifactId><version>0.0.1-SNAPSHOT</version><name>rabbitmq-demo</name><description>RabbitMQ Demo Project by JamGuo</description><properties><java.version>17</java.version></properties><dependencies><!-- Spring Boot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- Spring AMQP (RabbitMQ) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- Lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.30</version><scope>provided</scope></dependency><!-- Commons Lang3 --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.14.0</version></dependency><!-- Spring Boot Starter Test --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- Spring AMQP Test --><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency><!-- Swagger3 --><dependency><groupId>org.springdoc</groupId><artifactId>springdoc-openapi-starter-webmvc-ui</artifactId><version>2.1.0</version></dependency><!-- MyBatis-Plus --><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.5</version></dependency><!-- MySQL Driver --><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><version>8.3.0</version><scope>runtime</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build>
</project>

3.2 配置文件

创建 application.yml 配置文件:

spring:application:name: rabbitmq-demorabbitmq:host: localhostport: 5672username: jamguopassword: passwordvirtual-host: my_vhost# 连接超时时间(毫秒)connection-timeout: 10000# 生产者配置publisher-confirm-type: correlatedpublisher-returns: true# 消费者配置listener:simple:# 手动确认模式acknowledge-mode: manual# 并发消费者数量concurrency: 5# 最大并发消费者数量max-concurrency: 10# 限制消费者在单个请求中预取的消息数量prefetch: 10# 数据库配置datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/rabbitmq_demo?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghaiusername: rootpassword: root# MyBatis-Plus配置
mybatis-plus:mapper-locations: classpath*:mapper/**/*.xmltype-aliases-package: com.jamguo.rabbitmq.entityconfiguration:map-underscore-to-camel-case: truelog-impl: org.apache.ibatis.logging.stdout.StdOutImpl# Swagger配置
springdoc:api-docs:path: /api-docsswagger-ui:path: /swagger-ui.htmloperationsSorter: method# 日志配置
logging:level:com.jamguo.rabbitmq: debugorg.springframework.amqp: info

3.3 启动类

package com.jamguo.rabbitmq;import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.info.Info;/*** RabbitMQ示例应用启动类* * @author jamguo*/
@SpringBootApplication
@MapperScan("com.jamguo.rabbitmq.mapper")
@EnableScheduling
@OpenAPIDefinition(info = @Info(title = "RabbitMQ示例API",version = "1.0",description = "RabbitMQ各种用法示例接口文档")
)
public class RabbitmqDemoApplication {public static void main(String[] args) {SpringApplication.run(RabbitmqDemoApplication.class, args);}}

四、RabbitMQ 交换机类型详解

RabbitMQ 提供了多种交换机类型,每种类型有不同的路由策略,适用于不同的业务场景。

4.1 直接交换机(Direct Exchange)

直接交换机是最简单的交换机类型,它根据消息的路由键(Routing Key)与绑定的路由键进行精确匹配,将消息路由到对应的队列。

工作原理

代码实现

1. 常量定义

package com.jamguo.rabbitmq.constant;/*** RabbitMQ常量类* * @author jamguo*/
public class RabbitMqConstant {/*** 直接交换机名称*/public static final String DIRECT_EXCHANGE = "direct_exchange";/*** 订单创建队列*/public static final String ORDER_CREATE_QUEUE = "order_create_queue";/*** 订单支付队列*/public static final String ORDER_PAY_QUEUE = "order_pay_queue";/*** 订单创建路由键*/public static final String ORDER_CREATE_ROUTING_KEY = "order.create";/*** 订单支付路由键*/public static final String ORDER_PAY_ROUTING_KEY = "order.pay";}

2. 配置类

package com.jamguo.rabbitmq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import com.jamguo.rabbitmq.constant.RabbitMqConstant;import lombok.extern.slf4j.Slf4j;/*** 直接交换机配置类* * @author jamguo*/
@Configuration
@Slf4j
public class DirectExchangeConfig {/*** 创建直接交换机* * @return 直接交换机实例*/@Beanpublic DirectExchange directExchange() {// 参数说明:交换机名称、是否持久化、是否自动删除、附加参数DirectExchange exchange = new DirectExchange(RabbitMqConstant.DIRECT_EXCHANGE, true, false, null);log.info("创建直接交换机: {}", RabbitMqConstant.DIRECT_EXCHANGE);return exchange;}/*** 创建订单创建队列* * @return 队列实例*/@Beanpublic Queue orderCreateQueue() {// 参数说明:队列名称、是否持久化、是否排他、是否自动删除、附加参数Queue queue = new Queue(RabbitMqConstant.ORDER_CREATE_QUEUE, true, false, false, null);log.info("创建订单创建队列: {}", RabbitMqConstant.ORDER_CREATE_QUEUE);return queue;}/*** 创建订单支付队列* * @return 队列实例*/@Beanpublic Queue orderPayQueue() {Queue queue = new Queue(RabbitMqConstant.ORDER_PAY_QUEUE, true, false, false, null);log.info("创建订单支付队列: {}", RabbitMqConstant.ORDER_PAY_QUEUE);return queue;}/*** 绑定订单创建队列到直接交换机* * @param orderCreateQueue 订单创建队列* @param directExchange 直接交换机* @return 绑定关系*/@Beanpublic Binding bindOrderCreateQueue(Queue orderCreateQueue, DirectExchange directExchange) {Binding binding = BindingBuilder.bind(orderCreateQueue).to(directExchange).with(RabbitMqConstant.ORDER_CREATE_ROUTING_KEY);log.info("绑定队列 {} 到交换机 {},路由键: {}", RabbitMqConstant.ORDER_CREATE_QUEUE,RabbitMqConstant.DIRECT_EXCHANGE,RabbitMqConstant.ORDER_CREATE_ROUTING_KEY);return binding;}/*** 绑定订单支付队列到直接交换机* * @param orderPayQueue 订单支付队列* @param directExchange 直接交换机* @return 绑定关系*/@Beanpublic Binding bindOrderPayQueue(Queue orderPayQueue, DirectExchange directExchange) {Binding binding = BindingBuilder.bind(orderPayQueue).to(directExchange).with(RabbitMqConstant.ORDER_PAY_ROUTING_KEY);log.info("绑定队列 {} 到交换机 {},路由键: {}", RabbitMqConstant.ORDER_PAY_QUEUE,RabbitMqConstant.DIRECT_EXCHANGE,RabbitMqConstant.ORDER_PAY_ROUTING_KEY);return binding;}}

3. 消息实体类

package com.jamguo.rabbitmq.entity;import java.io.Serializable;
import java.math.BigDecimal;
import java.time.LocalDateTime;import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;/*** 订单消息实体类* * @author jamguo*/
@Data
@Schema(description = "订单消息实体")
public class OrderMessage implements Serializable {private static final long serialVersionUID = 1L;@Schema(description = "订单ID")private Long orderId;@Schema(description = "用户ID")private Long userId;@Schema(description = "订单金额")private BigDecimal amount;@Schema(description = "订单状态")private Integer status;@Schema(description = "创建时间")@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")@JsonSerialize(using = LocalDateTimeSerializer.class)@JsonDeserialize(using = LocalDateTimeDeserializer.class)private LocalDateTime createTime;}

4. 生产者

package com.jamguo.rabbitmq.producer;import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.UUID;import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import com.jamguo.rabbitmq.constant.RabbitMqConstant;
import com.jamguo.rabbitmq.entity.OrderMessage;import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;/*** 直接交换机生产者* * @author jamguo*/
@Component
@Slf4j
@Tag(name = "直接交换机生产者", description = "用于发送消息到直接交换机")
public class DirectExchangeProducer {private final RabbitTemplate rabbitTemplate;@Autowiredpublic DirectExchangeProducer(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;// 设置确认回调this.rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {String messageId = correlationData != null ? correlationData.getId() : "unknown";if (ack) {log.info("消息 [{}] 成功发送到交换机", messageId);} else {log.error("消息 [{}] 发送到交换机失败,原因: {}", messageId, cause);}});// 设置返回回调this.rabbitTemplate.setReturnsCallback(returnedMessage -> {log.error("消息路由失败: 交换机={}, 路由键={}, 消息={}, 回复码={}, 回复文本={}",returnedMessage.getExchange(),returnedMessage.getRoutingKey(),new String(returnedMessage.getMessage().getBody()),returnedMessage.getReplyCode(),returnedMessage.getReplyText());});}/*** 发送订单创建消息* * @param orderId 订单ID* @param userId 用户ID* @param amount 订单金额*/@Operation(summary = "发送订单创建消息", description = "创建订单后发送消息到订单创建队列")public void sendOrderCreateMessage(Long orderId, Long userId, BigDecimal amount) {Objects.requireNonNull(orderId, "订单ID不能为空");Objects.requireNonNull(userId, "用户ID不能为空");Objects.requireNonNull(amount, "订单金额不能为空");// 创建订单消息OrderMessage orderMessage = new OrderMessage();orderMessage.setOrderId(orderId);orderMessage.setUserId(userId);orderMessage.setAmount(amount);orderMessage.setStatus(1); // 1表示订单创建orderMessage.setCreateTime(LocalDateTime.now());// 生成消息IDString messageId = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(messageId);log.info("准备发送订单创建消息: {}", orderMessage);// 发送消息rabbitTemplate.convertAndSend(RabbitMqConstant.DIRECT_EXCHANGE,RabbitMqConstant.ORDER_CREATE_ROUTING_KEY,orderMessage,correlationData);}/*** 发送订单支付消息* * @param orderId 订单ID* @param userId 用户ID* @param amount 支付金额*/@Operation(summary = "发送订单支付消息", description = "订单支付后发送消息到订单支付队列")public void sendOrderPayMessage(Long orderId, Long userId, BigDecimal amount) {Objects.requireNonNull(orderId, "订单ID不能为空");Objects.requireNonNull(userId, "用户ID不能为空");Objects.requireNonNull(amount, "支付金额不能为空");// 创建订单消息OrderMessage orderMessage = new OrderMessage();orderMessage.setOrderId(orderId);orderMessage.setUserId(userId);orderMessage.setAmount(amount);orderMessage.setStatus(2); // 2表示订单已支付orderMessage.setCreateTime(LocalDateTime.now());// 生成消息IDString messageId = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(messageId);log.info("准备发送订单支付消息: {}", orderMessage);// 发送消息rabbitTemplate.convertAndSend(RabbitMqConstant.DIRECT_EXCHANGE,RabbitMqConstant.ORDER_PAY_ROUTING_KEY,orderMessage,correlationData);}}

5. 消费者

package com.jamguo.rabbitmq.consumer;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import com.fasterxml.jackson.databind.ObjectMapper;
import com.jamguo.rabbitmq.constant.RabbitMqConstant;
import com.jamguo.rabbitmq.entity.OrderMessage;
import com.rabbitmq.client.Channel;import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;import java.io.IOException;
import java.util.Objects;/*** 直接交换机消费者* * @author jamguo*/
@Component
@Slf4j
@Tag(name = "直接交换机消费者", description = "用于消费直接交换机路由的消息")
public class DirectExchangeConsumer {private final ObjectMapper objectMapper;public DirectExchangeConsumer(ObjectMapper objectMapper) {this.objectMapper = objectMapper;}/*** 处理订单创建消息* * @param message 消息对象* @param channel 通道对象* @throws IOException IO异常*/@RabbitListener(queues = RabbitMqConstant.ORDER_CREATE_QUEUE)public void handleOrderCreateMessage(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {// 解析消息OrderMessage orderMessage = objectMapper.readValue(message.getBody(), OrderMessage.class);log.info("收到订单创建消息: {}", orderMessage);// 处理订单创建业务逻辑(示例)processOrderCreation(orderMessage);// 手动确认消息channel.basicAck(deliveryTag, false);log.info("订单创建消息处理完成,消息ID: {}", message.getMessageProperties().getMessageId());} catch (Exception e) {log.error("处理订单创建消息失败", e);// 判断消息是否已经被处理过if (message.getMessageProperties().getRedelivered()) {log.error("消息已经重试过,拒绝再次处理并丢弃: {}", new String(message.getBody()));// 拒绝消息,并设置为不重新入队channel.basicReject(deliveryTag, false);} else {log.error("消息将重新入队,等待再次处理: {}", new String(message.getBody()));// 拒绝消息,并设置为重新入队channel.basicNack(deliveryTag, false, true);}}}/*** 处理订单支付消息* * @param message 消息对象* @param channel 通道对象* @throws IOException IO异常*/@RabbitListener(queues = RabbitMqConstant.ORDER_PAY_QUEUE)public void handleOrderPayMessage(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {// 解析消息OrderMessage orderMessage = objectMapper.readValue(message.getBody(), OrderMessage.class);log.info("收到订单支付消息: {}", orderMessage);// 处理订单支付业务逻辑(示例)processOrderPayment(orderMessage);// 手动确认消息channel.basicAck(deliveryTag, false);log.info("订单支付消息处理完成,消息ID: {}", message.getMessageProperties().getMessageId());} catch (Exception e) {log.error("处理订单支付消息失败", e);// 判断消息是否已经被处理过if (message.getMessageProperties().getRedelivered()) {log.error("消息已经重试过,拒绝再次处理并丢弃: {}", new String(message.getBody()));// 拒绝消息,并设置为不重新入队channel.basicReject(deliveryTag, false);} else {log.error("消息将重新入队,等待再次处理: {}", new String(message.getBody()));// 拒绝消息,并设置为重新入队channel.basicNack(deliveryTag, false, true);}}}/*** 处理订单创建业务逻辑* * @param orderMessage 订单消息*/private void processOrderCreation(OrderMessage orderMessage) {// 这里是订单创建的业务逻辑处理// 例如:更新订单状态、通知库存系统等log.info("处理订单创建: 订单ID={}, 用户ID={}, 金额={}",orderMessage.getOrderId(),orderMessage.getUserId(),orderMessage.getAmount());}/*** 处理订单支付业务逻辑* * @param orderMessage 订单消息*/private void processOrderPayment(OrderMessage orderMessage) {// 这里是订单支付的业务逻辑处理// 例如:更新支付状态、通知物流系统等log.info("处理订单支付: 订单ID={}, 用户ID={}, 支付金额={}",orderMessage.getOrderId(),orderMessage.getUserId(),orderMessage.getAmount());}}

6. 测试控制器

package com.jamguo.rabbitmq.controller;import java.math.BigDecimal;
import java.util.Random;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import com.jamguo.rabbitmq.producer.DirectExchangeProducer;import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;/*** 直接交换机测试控制器* * @author jamguo*/
@RestController
@RequestMapping("/api/direct")
@Slf4j
@Tag(name = "直接交换机测试接口", description = "用于测试直接交换机的消息发送")
public class DirectExchangeController {private final DirectExchangeProducer directExchangeProducer;private final Random random = new Random();@Autowiredpublic DirectExchangeController(DirectExchangeProducer directExchangeProducer) {this.directExchangeProducer = directExchangeProducer;}/*** 发送订单创建消息* * @param userId 用户ID* @param amount 订单金额* @return 响应信息*/@PostMapping("/sendOrderCreate")@Operation(summary = "发送订单创建消息", description = "创建订单并发送消息到订单创建队列")public String sendOrderCreateMessage(@Parameter(description = "用户ID", required = true) @RequestParam Long userId,@Parameter(description = "订单金额", required = true) @RequestParam BigDecimal amount) {// 生成随机订单IDLong orderId = System.currentTimeMillis() + random.nextLong(1000);directExchangeProducer.sendOrderCreateMessage(orderId, userId, amount);return "订单创建消息已发送,订单ID: " + orderId;}/*** 发送订单支付消息* * @param orderId 订单ID* @param userId 用户ID* @param amount 支付金额* @return 响应信息*/@PostMapping("/sendOrderPay")@Operation(summary = "发送订单支付消息", description = "订单支付后发送消息到订单支付队列")public String sendOrderPayMessage(@Parameter(description = "订单ID", required = true) @RequestParam Long orderId,@Parameter(description = "用户ID", required = true) @RequestParam Long userId,@Parameter(description = "支付金额", required = true) @RequestParam BigDecimal amount) {directExchangeProducer.sendOrderPayMessage(orderId, userId, amount);return "订单支付消息已发送,订单ID: " + orderId;}}
适用场景

直接交换机适用于需要精确路由的场景,例如:

  • 不同类型的事件处理(如订单创建、支付、取消等)
  • 任务分发系统,根据任务类型路由到不同的处理队列

4.2 主题交换机(Topic Exchange)

主题交换机是一种更灵活的交换机类型,它支持使用通配符进行路由键匹配,适合实现消息的多播路由。

工作原理

主题交换机使用点分隔的路由键(如 "order.create"、"user.login"),并支持两种通配符:

  • *:匹配一个单词
  • #:匹配零个或多个单词

例如:

  • "order.*" 可以匹配 "order.create"、"order.pay",但不能匹配 "order.create.success"
  • "order.#" 可以匹配 "order.create"、"order.pay"、"order.create.success"

代码实现

1. 常量定义(在 RabbitMqConstant 中添加)

/*** 主题交换机名称*/
public static final String TOPIC_EXCHANGE = "topic_exchange";/*** 订单服务队列*/
public static final String ORDER_SERVICE_QUEUE = "order_service_queue";/*** 登录日志队列*/
public static final String LOGIN_LOG_QUEUE = "login_log_queue";/*** 全量日志队列*/
public static final String FULL_LOG_QUEUE = "full_log_queue";/*** 订单相关路由键前缀*/
public static final String ORDER_ROUTING_KEY_PREFIX = "order.";/*** 登录相关路由键*/
public static final String USER_LOGIN_ROUTING_KEY = "user.login";

2. 配置类

package com.jamguo.rabbitmq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import com.jamguo.rabbitmq.constant.RabbitMqConstant;import lombok.extern.slf4j.Slf4j;/*** 主题交换机配置类* * @author jamguo*/
@Configuration
@Slf4j
public class TopicExchangeConfig {/*** 创建主题交换机* * @return 主题交换机实例*/@Beanpublic TopicExchange topicExchange() {TopicExchange exchange = new TopicExchange(RabbitMqConstant.TOPIC_EXCHANGE,true,false,null);log.info("创建主题交换机: {}", RabbitMqConstant.TOPIC_EXCHANGE);return exchange;}/*** 创建订单服务队列* * @return 队列实例*/@Beanpublic Queue orderServiceQueue() {Queue queue = new Queue(RabbitMqConstant.ORDER_SERVICE_QUEUE,true,false,false,null);log.info("创建订单服务队列: {}", RabbitMqConstant.ORDER_SERVICE_QUEUE);return queue;}/*** 创建登录日志队列* * @return 队列实例*/@Beanpublic Queue loginLogQueue() {Queue queue = new Queue(RabbitMqConstant.LOGIN_LOG_QUEUE,true,false,false,null);log.info("创建登录日志队列: {}", RabbitMqConstant.LOGIN_LOG_QUEUE);return queue;}/*** 创建全量日志队列* * @return 队列实例*/@Beanpublic Queue fullLogQueue() {Queue queue = new Queue(RabbitMqConstant.FULL_LOG_QUEUE,true,false,false,null);log.info("创建全量日志队列: {}", RabbitMqConstant.FULL_LOG_QUEUE);return queue;}/*** 绑定订单服务队列到主题交换机* * @param orderServiceQueue 订单服务队列* @param topicExchange 主题交换机* @return 绑定关系*/@Beanpublic Binding bindOrderServiceQueue(Queue orderServiceQueue, TopicExchange topicExchange) {// 绑定键为 "order.*",匹配所有以 "order." 开头的路由键Binding binding = BindingBuilder.bind(orderServiceQueue).to(topicExchange).with("order.*");log.info("绑定队列 {} 到交换机 {},绑定键: order.*",RabbitMqConstant.ORDER_SERVICE_QUEUE,RabbitMqConstant.TOPIC_EXCHANGE);return binding;}/*** 绑定登录日志队列到主题交换机* * @param loginLogQueue 登录日志队列* @param topicExchange 主题交换机* @return 绑定关系*/@Beanpublic Binding bindLoginLogQueue(Queue loginLogQueue, TopicExchange topicExchange) {// 绑定键为 "*.login",匹配所有以 ".login" 结尾的路由键Binding binding = BindingBuilder.bind(loginLogQueue).to(topicExchange).with("*.login");log.info("绑定队列 {} 到交换机 {},绑定键: *.login",RabbitMqConstant.LOGIN_LOG_QUEUE,RabbitMqConstant.TOPIC_EXCHANGE);return binding;}/*** 绑定全量日志队列到主题交换机* * @param fullLogQueue 全量日志队列* @param topicExchange 主题交换机* @return 绑定关系*/@Beanpublic Binding bindFullLogQueue(Queue fullLogQueue, TopicExchange topicExchange) {// 绑定键为 "#",匹配所有路由键Binding binding = BindingBuilder.bind(fullLogQueue).to(topicExchange).with("#");log.info("绑定队列 {} 到交换机 {},绑定键: #",RabbitMqConstant.FULL_LOG_QUEUE,RabbitMqConstant.TOPIC_EXCHANGE);return binding;}}

3. 消息实体类

package com.jamguo.rabbitmq.entity;import java.io.Serializable;
import java.time.LocalDateTime;import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;/*** 系统日志消息实体类* * @author jamguo*/
@Data
@Schema(description = "系统日志消息实体")
public class SystemLogMessage implements Serializable {private static final long serialVersionUID = 1L;@Schema(description = "日志ID")private String logId;@Schema(description = "日志类型")private String logType;@Schema(description = "用户ID")private Long userId;@Schema(description = "用户名")private String username;@Schema(description = "操作内容")private String operation;@Schema(description = "操作时间")@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")@JsonSerialize(using = LocalDateTimeSerializer.class)@JsonDeserialize(using = LocalDateTimeDeserializer.class)private LocalDateTime operationTime;@Schema(description = "IP地址")private String ipAddress;}

4. 生产者

package com.jamguo.rabbitmq.producer;import java.time.LocalDateTime;
import java.util.UUID;import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import com.jamguo.rabbitmq.constant.RabbitMqConstant;
import com.jamguo.rabbitmq.entity.SystemLogMessage;import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;import java.util.Objects;/*** 主题交换机生产者* * @author jamguo*/
@Component
@Slf4j
@Tag(name = "主题交换机生产者", description = "用于发送消息到主题交换机")
public class TopicExchangeProducer {private final RabbitTemplate rabbitTemplate;@Autowiredpublic TopicExchangeProducer(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;// 设置确认回调this.rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {String messageId = correlationData != null ? correlationData.getId() : "unknown";if (ack) {log.info("主题消息 [{}] 成功发送到交换机", messageId);} else {log.error("主题消息 [{}] 发送到交换机失败,原因: {}", messageId, cause);}});// 设置返回回调this.rabbitTemplate.setReturnsCallback(returnedMessage -> {log.error("主题消息路由失败: 交换机={}, 路由键={}, 消息={}, 回复码={}, 回复文本={}",returnedMessage.getExchange(),returnedMessage.getRoutingKey(),new String(returnedMessage.getMessage().getBody()),returnedMessage.getReplyCode(),returnedMessage.getReplyText());});}/*** 发送订单相关消息* * @param logType 日志类型* @param userId 用户ID* @param username 用户名* @param operation 操作内容* @param ipAddress IP地址*/@Operation(summary = "发送订单相关消息", description = "发送订单相关的日志消息")public void sendOrderLogMessage(String logType, Long userId, String username, String operation, String ipAddress) {StringUtils.hasText(logType, "日志类型不能为空");Objects.requireNonNull(userId, "用户ID不能为空");StringUtils.hasText(username, "用户名不能为空");StringUtils.hasText(operation, "操作内容不能为空");// 创建系统日志消息SystemLogMessage logMessage = new SystemLogMessage();logMessage.setLogId(UUID.randomUUID().toString());logMessage.setLogType(logType);logMessage.setUserId(userId);logMessage.setUsername(username);logMessage.setOperation(operation);logMessage.setOperationTime(LocalDateTime.now());logMessage.setIpAddress(StringUtils.defaultIfBlank(ipAddress, "unknown"));// 生成路由键:order.{logType}String routingKey = RabbitMqConstant.ORDER_ROUTING_KEY_PREFIX + logType;// 生成消息IDString messageId = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(messageId);log.info("准备发送订单日志消息,路由键: {}, 消息: {}", routingKey, logMessage);// 发送消息rabbitTemplate.convertAndSend(RabbitMqConstant.TOPIC_EXCHANGE,routingKey,logMessage,correlationData);}/*** 发送用户登录消息* * @param userId 用户ID* @param username 用户名* @param ipAddress IP地址*/@Operation(summary = "发送用户登录消息", description = "发送用户登录的日志消息")public void sendUserLoginMessage(Long userId, String username, String ipAddress) {Objects.requireNonNull(userId, "用户ID不能为空");StringUtils.hasText(username, "用户名不能为空");// 创建系统日志消息SystemLogMessage logMessage = new SystemLogMessage();logMessage.setLogId(UUID.randomUUID().toString());logMessage.setLogType("login");logMessage.setUserId(userId);logMessage.setUsername(username);logMessage.setOperation("用户登录系统");logMessage.setOperationTime(LocalDateTime.now());logMessage.setIpAddress(StringUtils.defaultIfBlank(ipAddress, "unknown"));// 生成消息IDString messageId = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(messageId);log.info("准备发送用户登录消息,路由键: {}, 消息: {}", RabbitMqConstant.USER_LOGIN_ROUTING_KEY, logMessage);// 发送消息rabbitTemplate.convertAndSend(RabbitMqConstant.TOPIC_EXCHANGE,RabbitMqConstant.USER_LOGIN_ROUTING_KEY,logMessage,correlationData);}}

5. 消费者

package com.jamguo.rabbitmq.consumer;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import com.fasterxml.jackson.databind.ObjectMapper;
import com.jamguo.rabbitmq.constant.RabbitMqConstant;
import com.jamguo.rabbitmq.entity.SystemLogMessage;
import com.rabbitmq.client.Channel;import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.util.Objects;/*** 主题交换机消费者* * @author jamguo*/
@Component
@Slf4j
@Tag(name = "主题交换机消费者", description = "用于消费主题交换机路由的消息")
public class TopicExchangeConsumer {private final ObjectMapper objectMapper;public TopicExchangeConsumer(ObjectMapper objectMapper) {this.objectMapper = objectMapper;}/*** 处理订单服务消息* * @param message 消息对象* @param channel 通道对象* @throws IOException IO异常*/@RabbitListener(queues = RabbitMqConstant.ORDER_SERVICE_QUEUE)public void handleOrderServiceMessage(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {// 解析消息SystemLogMessage logMessage = objectMapper.readValue(message.getBody(), SystemLogMessage.class);log.info("订单服务收到消息: {}", logMessage);// 处理订单相关业务逻辑processOrderMessage(logMessage);// 手动确认消息channel.basicAck(deliveryTag, false);log.info("订单服务消息处理完成,消息ID: {}", logMessage.getLogId());} catch (Exception e) {log.error("处理订单服务消息失败", e);// 消息处理失败,根据业务需求决定是否重新入队channel.basicNack(deliveryTag, false, true);}}/*** 处理登录日志消息* * @param message 消息对象* @param channel 通道对象* @throws IOException IO异常*/@RabbitListener(queues = RabbitMqConstant.LOGIN_LOG_QUEUE)public void handleLoginLogMessage(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {// 解析消息SystemLogMessage logMessage = objectMapper.readValue(message.getBody(), SystemLogMessage.class);log.info("登录日志服务收到消息: {}", logMessage);// 处理登录日志业务逻辑processLoginLog(logMessage);// 手动确认消息channel.basicAck(deliveryTag, false);log.info("登录日志消息处理完成,消息ID: {}", logMessage.getLogId());} catch (Exception e) {log.error("处理登录日志消息失败", e);// 消息处理失败,根据业务需求决定是否重新入队channel.basicNack(deliveryTag, false, true);}}/*** 处理全量日志消息* * @param message 消息对象* @param channel 通道对象* @throws IOException IO异常*/@RabbitListener(queues = RabbitMqConstant.FULL_LOG_QUEUE)public void handleFullLogMessage(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {// 解析消息SystemLogMessage logMessage = objectMapper.readValue(message.getBody(), SystemLogMessage.class);log.info("全量日志服务收到消息: {}", logMessage);// 处理全量日志业务逻辑processFullLog(logMessage);// 手动确认消息channel.basicAck(deliveryTag, false);log.info("全量日志消息处理完成,消息ID: {}", logMessage.getLogId());} catch (Exception e) {log.error("处理全量日志消息失败", e);// 消息处理失败,根据业务需求决定是否重新入队channel.basicNack(deliveryTag, false, true);}}/*** 处理订单相关消息* * @param logMessage 日志消息*/private void processOrderMessage(SystemLogMessage logMessage) {// 处理订单相关业务逻辑log.info("处理订单消息: 用户[{}]在[{}]进行了[{}]操作",logMessage.getUsername(),logMessage.getOperationTime(),logMessage.getOperation());}/*** 处理登录日志* * @param logMessage 日志消息*/private void processLoginLog(SystemLogMessage logMessage) {// 处理登录日志业务逻辑log.info("记录登录日志: 用户[{}]在[{}]从IP[{}]登录系统",logMessage.getUsername(),logMessage.getOperationTime(),logMessage.getIpAddress());}/*** 处理全量日志* * @param logMessage 日志消息*/private void processFullLog(SystemLogMessage logMessage) {// 处理全量日志业务逻辑log.info("记录系统日志: 类型[{}], 用户[{}], 操作[{}]",logMessage.getLogType(),logMessage.getUsername(),logMessage.getOperation());}}

6. 测试控制器

package com.jamguo.rabbitmq.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import com.jamguo.rabbitmq.producer.TopicExchangeProducer;import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;/*** 主题交换机测试控制器* * @author jamguo*/
@RestController
@RequestMapping("/api/topic")
@Slf4j
@Tag(name = "主题交换机测试接口", description = "用于测试主题交换机的消息发送")
public class TopicExchangeController {private final TopicExchangeProducer topicExchangeProducer;public TopicExchangeController(TopicExchangeProducer topicExchangeProducer) {this.topicExchangeProducer = topicExchangeProducer;}/*** 发送订单创建日志消息* * @param userId 用户ID* @param username 用户名* @param ipAddress IP地址* @return 响应信息*/@PostMapping("/sendOrderCreateLog")@Operation(summary = "发送订单创建日志消息", description = "发送订单创建的日志消息")public String sendOrderCreateLog(@Parameter(description = "用户ID", required = true) @RequestParam Long userId,@Parameter(description = "用户名", required = true) @RequestParam String username,@Parameter(description = "IP地址") @RequestParam(required = false) String ipAddress) {topicExchangeProducer.sendOrderLogMessage("create", userId, username, "创建新订单", ipAddress);return "订单创建日志消息已发送";}/*** 发送订单支付日志消息* * @param userId 用户ID* @param username 用户名* @param ipAddress IP地址* @return 响应信息*/@PostMapping("/sendOrderPayLog")@Operation(summary = "发送订单支付日志消息", description = "发送订单支付的日志消息")public String sendOrderPayLog(@Parameter(description = "用户ID", required = true) @RequestParam Long userId,@Parameter(description = "用户名", required = true) @RequestParam String username,@Parameter(description = "IP地址") @RequestParam(required = false) String ipAddress) {topicExchangeProducer.sendOrderLogMessage("pay", userId, username, "支付订单", ipAddress);return "订单支付日志消息已发送";}/*** 发送用户登录消息* * @param userId 用户ID* @param username 用户名* @param ipAddress IP地址* @return 响应信息*/@PostMapping("/sendUserLogin")@Operation(summary = "发送用户登录消息", description = "发送用户登录的日志消息")public String sendUserLogin(@Parameter(description = "用户ID", required = true) @RequestParam Long userId,@Parameter(description = "用户名", required = true) @RequestParam String username,@Parameter(description = "IP地址") @RequestParam(required = false) String ipAddress) {topicExchangeProducer.sendUserLoginMessage(userId, username, ipAddress);return "用户登录消息已发送";}}
适用场景

主题交换机适用于需要灵活路由的场景,例如:

  • 日志收集系统,根据日志类型和级别进行分类处理
  • 新闻推送系统,根据用户兴趣标签推送相关新闻
  • 事件总线,实现不同模块间的解耦通信

4.3 扇形交换机(Fanout Exchange)

扇形交换机是一种广播类型的交换机,它会将消息路由到所有绑定到它的队列,忽略路由键的存在。

工作原理

扇形交换机不处理路由键,无论消息携带什么路由键,都会被发送到所有绑定到该交换机的队列。这是一种最简单的广播模式。

代码实现

这里只展示关键代码,其他辅助代码(如常量定义、消息实体等)与前面类似,不再重复。

1. 配置类

package com.jamguo.rabbitmq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import com.jamguo.rabbitmq.constant.RabbitMqConstant;import lombok.extern.slf4j.Slf4j;/*** 扇形交换机配置类* * @author jamguo*/
@Configuration
@Slf4j
public class FanoutExchangeConfig {/*** 创建扇形交换机* * @return 扇形交换机实例*/@Beanpublic FanoutExchange fanoutExchange() {FanoutExchange exchange = new FanoutExchange(RabbitMqConstant.FANOUT_EXCHANGE,true,false,null);log.info("创建扇形交换机: {}", RabbitMqConstant.FANOUT_EXCHANGE);return exchange;}/*** 创建库存服务队列* * @return 队列实例*/@Beanpublic Queue inventoryServiceQueue() {Queue queue = new Queue(RabbitMqConstant.INVENTORY_SERVICE_QUEUE,true,false,false,null);log.info("创建库存服务队列: {}", RabbitMqConstant.INVENTORY_SERVICE_QUEUE);return queue;}/*** 创建物流服务队列* * @return 队列实例*/@Beanpublic Queue logisticsServiceQueue() {Queue queue = new Queue(RabbitMqConstant.LOGISTICS_SERVICE_QUEUE,true,false,false,null);log.info("创建物流服务队列: {}", RabbitMqConstant.LOGISTICS_SERVICE_QUEUE);return queue;}/*** 创建通知服务队列* * @return 队列实例*/@Beanpublic Queue notificationServiceQueue() {Queue queue = new Queue(RabbitMqConstant.NOTIFICATION_SERVICE_QUEUE,true,false,false,null);log.info("创建通知服务队列: {}", RabbitMqConstant.NOTIFICATION_SERVICE_QUEUE);return queue;}/*** 绑定库存服务队列到扇形交换机* * @param inventoryServiceQueue 库存服务队列* @param fanoutExchange 扇形交换机* @return 绑定关系*/@Beanpublic Binding bindInventoryServiceQueue(Queue inventoryServiceQueue, FanoutExchange fanoutExchange) {// 扇形交换机忽略路由键,所以这里可以随便填Binding binding = BindingBuilder.bind(inventoryServiceQueue).to(fanoutExchange);log.info("绑定队列 {} 到交换机 {}",RabbitMqConstant.INVENTORY_SERVICE_QUEUE,RabbitMqConstant.FANOUT_EXCHANGE);return binding;}/*** 绑定物流服务队列到扇形交换机* * @param logisticsServiceQueue 物流服务队列* @param fanoutExchange 扇形交换机* @return 绑定关系*/@Beanpublic Binding bindLogisticsServiceQueue(Queue logisticsServiceQueue, FanoutExchange fanoutExchange) {Binding binding = BindingBuilder.bind(logisticsServiceQueue).to(fanoutExchange);log.info("绑定队列 {} 到交换机 {}",RabbitMqConstant.LOGISTICS_SERVICE_QUEUE,RabbitMqConstant.FANOUT_EXCHANGE);return binding;}/*** 绑定通知服务队列到扇形交换机* * @param notificationServiceQueue 通知服务队列* @param fanoutExchange 扇形交换机* @return 绑定关系*/@Beanpublic Binding bindNotificationServiceQueue(Queue notificationServiceQueue, FanoutExchange fanoutExchange) {Binding binding = BindingBuilder.bind(notificationServiceQueue).to(fanoutExchange);log.info("绑定队列 {} 到交换机 {}",RabbitMqConstant.NOTIFICATION_SERVICE_QUEUE,RabbitMqConstant.FANOUT_EXCHANGE);return binding;}}
适用场景

扇形交换机适用于需要广播消息的场景,例如:

  • 系统通知,需要多个服务同时收到通知
  • 数据同步,多个节点需要保持数据一致
  • 分布式系统中的事件广播

4.4 Headers 交换机(Headers Exchange)

Headers 交换机与其他类型的交换机不同,它不依赖路由键进行路由,而是根据消息的 headers 属性进行匹配。

工作原理

Headers 交换机使用消息的 headers 属性进行路由匹配,类似于 HTTP 请求的 headers。绑定队列时可以指定一组键值对,消息被路由到队列的条件是:

  • 消息的 headers 包含绑定中指定的所有键
  • 消息 headers 中对应的值与绑定中指定的值匹配(或者使用 x-match=any,表示只要有一个匹配即可)

Headers 交换机在实际应用中使用较少,因为主题交换机通常可以更简洁地实现类似功能。

4.5 默认交换机(Default Exchange)

默认交换机是 RabbitMQ 自带的一个直接交换机,它有以下特点:

  • 没有名字(空字符串)
  • 每个队列都会自动绑定到默认交换机,绑定键是队列的名称
  • 当发送消息时不指定交换机,消息会被发送到默认交换机

默认交换机简化了简单场景的使用,例如只需要将消息发送到指定队列的情况。

五、RabbitMQ 高级特性

5.1 消息确认机制

为了确保消息的可靠传递,RabbitMQ 提供了完善的消息确认机制,包括生产者确认和消费者确认。

生产者确认(Publisher Confirm)

生产者确认机制确保消息成功发送到交换机。在配置文件中开启:

spring:rabbitmq:publisher-confirm-type: correlated
  • none:禁用确认机制
  • simple:简单确认模式,同步等待确认
  • correlated:关联确认模式,异步回调确认

代码实现如前面生产者示例中所示,通过设置ConfirmCallback来处理确认结果。

生产者返回(Publisher Return)

当消息成功发送到交换机,但无法路由到任何队列时,可以通过返回机制获取通知:

spring:rabbitmq:publisher-returns: true

代码实现如前面生产者示例中所示,通过设置ReturnsCallback来处理返回消息。

消费者确认(Consumer Acknowledgment)

消费者确认机制确保消息被成功处理。在配置文件中设置:

spring:rabbitmq:listener:simple:acknowledge-mode: manual
  • none:自动确认,消息一旦被接收,立即确认
  • auto:根据消息处理情况自动确认
  • manual:手动确认,需要调用channel.basicAck()方法

手动确认的三种方式:

  1. channel.basicAck(deliveryTag, multiple):确认消息成功处理
  2. channel.basicNack(deliveryTag, multiple, requeue):否定确认,可以选择是否重新入队
  3. channel.basicReject(deliveryTag, requeue):否定单个消息,可以选择是否重新入队

5.2 消息持久化

为了防止 RabbitMQ 服务器重启导致消息丢失,需要配置消息持久化:

  1. 交换机持久化:创建交换机时设置durable=true
  2. 队列持久化:创建队列时设置durable=true
  3. 消息持久化:发送消息时设置消息的deliveryMode=2

在 Spring AMQP 中,默认情况下交换机和队列都是持久化的,消息持久化可以通过以下方式设置:

// 发送持久化消息
rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return msg;
});

5.3 死信队列(Dead Letter Queue)

死信队列用于处理无法被正常消费的消息,当消息满足以下条件之一时,会被发送到死信队列:

  1. 消息被消费者拒绝(basicRejectbasicNack),并且requeue=false
  2. 消息过期
  3. 队列达到最大长度,无法再添加新消息
死信队列配置
/*** 死信交换机*/
@Bean
public DirectExchange deadLetterExchange() {return new DirectExchange(RabbitMqConstant.DEAD_LETTER_EXCHANGE, true, false);
}/*** 死信队列*/
@Bean
public Queue deadLetterQueue() {return QueueBuilder.durable(RabbitMqConstant.DEAD_LETTER_QUEUE).build();
}/*** 绑定死信队列到死信交换机*/
@Bean
public Binding bindDeadLetterQueue(Queue deadLetterQueue, DirectExchange deadLetterExchange) {return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(RabbitMqConstant.DEAD_LETTER_ROUTING_KEY);
}/*** 业务队列(设置死信相关参数)*/
@Bean
public Queue businessQueue() {return QueueBuilder.durable(RabbitMqConstant.BUSINESS_QUEUE)// 设置死信交换机.withArgument("x-dead-letter-exchange", RabbitMqConstant.DEAD_LETTER_EXCHANGE)// 设置死信路由键.withArgument("x-dead-letter-routing-key", RabbitMqConstant.DEAD_LETTER_ROUTING_KEY)// 设置消息过期时间(毫秒).withArgument("x-message-ttl", 60000)// 设置队列最大长度.withArgument("x-max-length", 1000).build();
}

5.4 延迟队列

延迟队列用于处理需要延迟执行的任务,例如订单超时未支付自动取消、定时提醒等。RabbitMQ 本身不直接支持延迟队列,但可以通过以下两种方式实现:

  1. 使用消息的过期时间(TTL)结合死信队列
  2. 使用 rabbitmq-delayed-message-exchange 插件
使用插件实现延迟队列

首先安装插件:

# 下载插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez# 复制到容器中
docker cp rabbitmq_delayed_message_exchange-3.13.0.ez rabbitmq:/plugins# 进入容器
docker exec -it rabbitmq bash# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

配置延迟交换机:

/*** 创建延迟交换机*/
@Bean
public CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>(1);// 设置交换机类型args.put("x-delayed-type", "direct");// 参数说明:交换机名称、类型、是否持久化、是否自动删除、附加参数return new CustomExchange(RabbitMqConstant.DELAYED_EXCHANGE,"x-delayed-message",true,false,args);
}

发送延迟消息:

/*** 发送延迟消息*/
public void sendDelayedMessage(OrderMessage message, long delayMillis) {Objects.requireNonNull(message, "消息不能为空");if (delayMillis <= 0) {throw new IllegalArgumentException("延迟时间必须大于0");}String messageId = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(messageId);log.info("准备发送延迟消息,延迟时间: {}ms,消息: {}", delayMillis, message);// 发送延迟消息,通过x-delay header设置延迟时间(毫秒)rabbitTemplate.convertAndSend(RabbitMqConstant.DELAYED_EXCHANGE,RabbitMqConstant.DELAYED_ROUTING_KEY,message,msg -> {msg.getMessageProperties().setHeader("x-delay", delayMillis);return msg;},correlationData);
}

5.5 优先级队列

优先级队列允许消息按照优先级进行排序,优先级高的消息会被优先消费。适用于需要处理紧急任务的场景。

配置优先级队列:

/*** 创建优先级队列*/
@Bean
public Queue priorityQueue() {// 设置队列最大优先级为10Map<String, Object> args = new HashMap<>(1);args.put("x-max-priority", 10);return QueueBuilder.durable(RabbitMqConstant.PRIORITY_QUEUE).withArguments(args).build();
}

发送带优先级的消息:

/*** 发送带优先级的消息*/
public void sendPriorityMessage(String content, int priority) {StringUtils.hasText(content, "消息内容不能为空");if (priority < 0 || priority > 10) {throw new IllegalArgumentException("优先级必须在0-10之间");}String messageId = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(messageId);log.info("准备发送优先级消息,优先级: {},内容: {}", priority, content);// 发送带优先级的消息rabbitTemplate.convertAndSend(RabbitMqConstant.PRIORITY_EXCHANGE,RabbitMqConstant.PRIORITY_ROUTING_KEY,content,msg -> {msg.getMessageProperties().setPriority(priority);return msg;},correlationData);
}

六、RabbitMQ 集群与高可用

对于生产环境,单节点的 RabbitMQ 存在单点故障风险,因此需要部署 RabbitMQ 集群以提高可用性。

6.1 集群架构

RabbitMQ 集群通常由多个节点组成,分为以下角色:

  • 磁盘节点(Disk Node):存储集群元数据(交换机、队列、绑定等)
  • 内存节点(RAM Node):元数据存储在内存中,性能更好

集群中至少需要一个磁盘节点,建议生产环境配置 3 个节点(2 个磁盘节点 + 1 个内存节点)。

6.2 集群搭建(Docker Compose)

创建 docker-compose.yml 文件:

version: '3.8'services:rabbitmq1:image: rabbitmq:3.13-managementcontainer_name: rabbitmq1ports:- "5672:5672"- "15672:15672"environment:- RABBITMQ_DEFAULT_USER=admin- RABBITMQ_DEFAULT_PASS=admin- RABBITMQ_ERLANG_COOKIE=rabbitmq_cookievolumes:- rabbitmq1_data:/var/lib/rabbitmqnetworks:- rabbitmq_networkrabbitmq2:image: rabbitmq:3.13-managementcontainer_name: rabbitmq2ports:- "5673:5672"- "15673:15672"environment:- RABBITMQ_DEFAULT_USER=admin- RABBITMQ_DEFAULT_PASS=admin- RABBITMQ_ERLANG_COOKIE=rabbitmq_cookievolumes:- rabbitmq2_data:/var/lib/rabbitmqnetworks:- rabbitmq_networkdepends_on:- rabbitmq1rabbitmq3:image: rabbitmq:3.13-managementcontainer_name: rabbitmq3ports:- "5674:5672"- "15674:15672"environment:- RABBITMQ_DEFAULT_USER=admin- RABBITMQ_DEFAULT_PASS=admin- RABBITMQ_ERLANG_COOKIE=rabbitmq_cookievolumes:- rabbitmq3_data:/var/lib/rabbitmqnetworks:- rabbitmq_networkdepends_on:- rabbitmq1networks:rabbitmq_network:driver: bridgevolumes:rabbitmq1_data:rabbitmq2_data:rabbitmq3_data:

启动集群:

docker-compose up -d

将节点加入集群:

# 进入rabbitmq2容器
docker exec -it rabbitmq2 bash# 停止节点
rabbitmqctl stop_app# 加入集群(以内存节点方式)
rabbitmqctl join_cluster --ram rabbit@rabbitmq1# 启动节点
rabbitmqctl start_app# 退出容器
exit# 进入rabbitmq3容器
docker exec -it rabbitmq3 bash# 停止节点
rabbitmqctl stop_app# 加入集群(以内存节点方式)
rabbitmqctl join_cluster --ram rabbit@rabbitmq1# 启动节点
rabbitmqctl start_app# 退出容器
exit

查看集群状态:

docker exec -it rabbitmq1 rabbitmqctl cluster_status

6.3 镜像队列(Mirror Queue)

镜像队列是 RabbitMQ 提供的高可用方案,它会将队列镜像到集群中的多个节点,当主节点故障时,镜像节点会自动接管。

设置镜像策略:

# 为所有队列设置镜像策略,同步到所有节点
docker exec -it rabbitmq1 rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'# 为特定队列设置镜像策略,同步到2个节点
docker exec -it rabbitmq1 rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
  • ha-mode: 镜像模式
    • all: 镜像到所有节点
    • exactly: 镜像到指定数量的节点
    • nodes: 镜像到指定的节点列表
  • ha-params: 配合 ha-mode 使用的参数
  • ha-sync-mode: 同步模式
    • manual: 手动同步
    • automatic: 自动同步

6.4 客户端连接集群

Spring Boot 客户端配置集群:

spring:rabbitmq:addresses: localhost:5672,localhost:5673,localhost:5674username: adminpassword: adminvirtual-host: /connection-timeout: 10000# 开启自动恢复连接publisher-confirm-type: correlatedpublisher-returns: truelistener:simple:acknowledge-mode: manualconcurrency: 5max-concurrency: 10prefetch: 10

七、RabbitMQ 性能优化

为了充分发挥 RabbitMQ 的性能,需要进行合理的配置和优化。

7.1 生产者优化

  1. 使用批量发送:减少网络交互次数
// 批量发送消息
public void batchSendMessages(List<OrderMessage> messages) {if (CollectionUtils.isEmpty(messages)) {log.warn("批量发送消息列表为空,不执行发送操作");return;}log.info("准备批量发送 {} 条消息", messages.size());rabbitTemplate.invoke(operations -> {for (OrderMessage message : messages) {operations.convertAndSend(RabbitMqConstant.DIRECT_EXCHANGE,RabbitMqConstant.ORDER_CREATE_ROUTING_KEY,message,new CorrelationData(UUID.randomUUID().toString()));}return null;});log.info("批量发送消息完成,共发送 {} 条", messages.size());
}
  1. 使用异步发送:避免阻塞主线程
  2. 合理设置消息大小:太大的消息会影响性能,建议拆分或使用外部存储

7.2 消费者优化

  1. 合理设置并发数:根据服务器性能调整消费者并发数
spring:rabbitmq:listener:simple:concurrency: 5    # 最小并发数max-concurrency: 10 # 最大并发数
  1. 设置 prefetch count:控制消费者一次预取的消息数量
spring:rabbitmq:listener:simple:prefetch: 10  # 每次预取10条消息
  1. 消息处理异步化:消费者接收到消息后,尽快确认,然后异步处理业务逻辑

7.3 服务器优化

  1. 调整内存限制:默认情况下,当 RabbitMQ 使用的内存超过节点内存的 40% 时,会停止接收新消息
# 设置内存限制为50%
rabbitmqctl set_vm_memory_high_watermark 0.5# 设置内存限制为固定值(如2GB)
rabbitmqctl set_vm_memory_high_watermark absolute 2GB
  1. 设置磁盘空间限制:当磁盘空间低于指定阈值时,RabbitMQ 会停止接收新消息
# 设置磁盘空间限制为500MB
rabbitmqctl set_disk_free_limit 500MB
  1. 合理选择存储类型:根据业务需求选择合适的存储类型(如持久化或非持久化)

八、RabbitMQ 在实际项目中的应用

8.1 异步处理

场景:用户下单后,需要发送短信通知、更新库存、记录日志等操作。

使用 RabbitMQ 将这些操作异步化,提高主流程的响应速度:

/*** 订单服务*/
@Service
@Slf4j
public class OrderService {private final OrderMapper orderMapper;private final RabbitTemplate rabbitTemplate;@Autowiredpublic OrderService(OrderMapper orderMapper, RabbitTemplate rabbitTemplate) {this.orderMapper = orderMapper;this.rabbitTemplate = rabbitTemplate;}/*** 创建订单*/@Transactionalpublic Order createOrder(OrderCreateDTO orderCreateDTO) {// 参数校验Objects.requireNonNull(orderCreateDTO, "订单创建参数不能为空");StringUtils.hasText(orderCreateDTO.getUserId(), "用户ID不能为空");Objects.requireNonNull(orderCreateDTO.getAmount(), "订单金额不能为空");// 创建订单Order order = new Order();order.setOrderId(System.currentTimeMillis());order.setUserId(orderCreateDTO.getUserId());order.setAmount(orderCreateDTO.getAmount());order.setStatus(OrderStatus.CREATED.getCode());order.setCreateTime(LocalDateTime.now());// 保存订单int rows = orderMapper.insert(order);if (rows != 1) {throw new BusinessException("创建订单失败");}log.info("订单创建成功,订单ID: {}", order.getOrderId());// 发送订单创建消息(异步处理后续操作)sendOrderCreatedMessage(order);return order;}/*** 发送订单创建消息*/private void sendOrderCreatedMessage(Order order) {OrderMessage message = new OrderMessage();message.setOrderId(order.getOrderId());message.setUserId(order.getUserId());message.setAmount(order.getAmount());message.setStatus(order.getStatus());message.setCreateTime(order.getCreateTime());rabbitTemplate.convertAndSend(RabbitMqConstant.ORDER_EXCHANGE,RabbitMqConstant.ORDER_CREATED_ROUTING_KEY,message,new CorrelationData(UUID.randomUUID().toString()));log.info("订单创建消息已发送,订单ID: {}", order.getOrderId());}
}

8.2 应用解耦

场景:电商系统中,订单系统、库存系统、支付系统、物流系统需要相互协作,但又要保持松耦合。

使用 RabbitMQ 实现系统间的解耦,每个系统只需要关注自己感兴趣的事件:

  • 订单系统:发送订单创建、订单取消事件
  • 库存系统:监听订单创建事件,扣减库存;监听订单取消事件,恢复库存
  • 支付系统:监听订单创建事件,生成支付单
  • 物流系统:监听支付成功事件,创建物流单

8.3 流量削峰

场景:秒杀活动中,瞬间会有大量请求涌入,可能导致系统过载。

使用 RabbitMQ 作为缓冲,控制请求处理速度:

  1. 前端将秒杀请求发送到 RabbitMQ
  2. 后端消费者以固定速率处理消息,避免系统过载
  3. 对于超出处理能力的请求,直接返回 "秒杀失败" 或 "排队中"
/*** 秒杀服务*/
@Service
@Slf4j
public class SeckillService {private final RabbitTemplate rabbitTemplate;private final StringRedisTemplate redisTemplate;// 秒杀商品库存Redis键前缀private static final String SECKILL_STOCK_KEY_PREFIX = "seckill:stock:";@Autowiredpublic SeckillService(RabbitTemplate rabbitTemplate, StringRedisTemplate redisTemplate) {this.rabbitTemplate = rabbitTemplate;this.redisTemplate = redisTemplate;}/*** 提交秒杀请求*/public SeckillResultDTO submitSeckill(SeckillRequestDTO requestDTO) {// 参数校验Objects.requireNonNull(requestDTO, "秒杀请求参数不能为空");StringUtils.hasText(requestDTO.getUserId(), "用户ID不能为空");Objects.requireNonNull(requestDTO.getGoodsId(), "商品ID不能为空");log.info("用户 {} 提交秒杀请求,商品ID: {}", requestDTO.getUserId(), requestDTO.getGoodsId());// 先检查库存(Redis中预减库存)String stockKey = SECKILL_STOCK_KEY_PREFIX + requestDTO.getGoodsId();Long remainStock = redisTemplate.opsForValue().decrement(stockKey);if (remainStock == null || remainStock < 0) {// 库存不足,恢复计数器if (remainStock != null && remainStock < 0) {redisTemplate.opsForValue().increment(stockKey);}log.warn("商品 {} 库存不足,用户 {} 秒杀失败", requestDTO.getGoodsId(), requestDTO.getUserId());return new SeckillResultDTO(false, "商品已抢完", null);}// 库存充足,发送秒杀消息到队列SeckillMessage message = new SeckillMessage();message.setUserId(requestDTO.getUserId());message.setGoodsId(requestDTO.getGoodsId());message.setRequestTime(LocalDateTime.now());String messageId = UUID.randomUUID().toString();rabbitTemplate.convertAndSend(RabbitMqConstant.SECKILL_EXCHANGE,RabbitMqConstant.SECKILL_ROUTING_KEY,message,new CorrelationData(messageId));log.info("用户 {} 秒杀请求已加入队列,商品ID: {}", requestDTO.getUserId(), requestDTO.getGoodsId());// 返回排队中状态,前端可以轮询查询结果return new SeckillResultDTO(true, "正在排队中", messageId);}
}

九、常见问题与解决方案

9.1 消息丢失问题

可能的原因及解决方案:

  1. 生产者发送消息丢失

    • 启用生产者确认机制(publisher confirm)
    • 实现消息重发机制
  2. RabbitMQ 服务器丢失消息

    • 配置交换机、队列、消息的持久化
    • 部署 RabbitMQ 集群,使用镜像队列
  3. 消费者处理消息丢失

    • 使用手动确认模式(manual ack)
    • 确保消息处理完成后再确认

9.2 消息重复消费问题

可能的原因及解决方案:

  1. 网络问题导致确认消息丢失

    • 实现消息幂等性处理
    • 使用消息 ID 去重
  2. 消费者处理超时

    • 合理设置消息处理超时时间
    • 拆分长任务为多个短任务

实现幂等性的示例:

/*** 幂等性处理器*/
@Component
public class IdempotentHandler {private final StringRedisTemplate redisTemplate;// 消息处理记录前缀private static final String MESSAGE_PROCESSED_PREFIX = "message:processed:";// 锁前缀private static final String LOCK_PREFIX = "lock:message:";// 锁过期时间(秒)private static final long LOCK_EXPIRE_SECONDS = 30;@Autowiredpublic IdempotentHandler(StringRedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;}/*** 处理消息,确保幂等性* * @param messageId 消息ID* @param handler 消息处理器* @return 处理结果*/public <T> T handleWithIdempotency(String messageId, Supplier<T> handler) {StringUtils.hasText(messageId, "消息ID不能为空");Objects.requireNonNull(handler, "消息处理器不能为空");// 检查消息是否已处理String processedKey = MESSAGE_PROCESSED_PREFIX + messageId;Boolean isProcessed = redisTemplate.hasKey(processedKey);if (Boolean.TRUE.equals(isProcessed)) {log.info("消息 {} 已处理,跳过重复处理", messageId);return null;}// 获取分布式锁String lockKey = LOCK_PREFIX + messageId;Boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", LOCK_EXPIRE_SECONDS, TimeUnit.SECONDS);if (Boolean.FALSE.equals(locked)) {log.warn("消息 {} 获取锁失败,可能有其他进程正在处理", messageId);return null;}try {// 再次检查,防止并发问题isProcessed = redisTemplate.hasKey(processedKey);if (Boolean.TRUE.equals(isProcessed)) {log.info("消息 {} 已处理,跳过重复处理", messageId);return null;}// 处理消息T result = handler.get();// 标记消息已处理,设置过期时间(例如24小时)redisTemplate.opsForValue().set(processedKey, "1", 24, TimeUnit.HOURS);return result;} finally {// 释放锁redisTemplate.delete(lockKey);}}
}

9.3 消息堆积问题

可能的原因及解决方案:

  1. 生产者发送速度过快

    • 实现流量控制
    • 使用令牌桶算法限制发送速度
  2. 消费者处理速度过慢

    • 增加消费者数量
    • 优化消费逻辑,提高处理速度
    • 拆分大消息为小消息
  3. 系统资源不足

    • 增加服务器资源
    • 优化 JVM 参数

监控消息堆积情况:

/*** 消息队列监控服务*/
@Component
@Slf4j
public class QueueMonitorService {private final RabbitAdmin rabbitAdmin;// 队列最大消息数阈值private static final int QUEUE_MAX_MESSAGE_THRESHOLD = 10000;@Autowiredpublic QueueMonitorService(RabbitAdmin rabbitAdmin) {this.rabbitAdmin = rabbitAdmin;}/*** 检查队列消息堆积情况*/@Scheduled(fixedRate = 60000) // 每分钟检查一次public void checkQueueMessageCount() {log.info("开始检查队列消息堆积情况");// 获取所有队列List<Queue> queues = rabbitAdmin.getQueues();if (CollectionUtils.isEmpty(queues)) {log.info("没有找到任何队列");return;}for (Queue queue : queues) {// 获取队列信息QueueInformation queueInfo = rabbitAdmin.getQueueInfo(queue.getName());if (queueInfo == null) {log.warn("获取队列 {} 信息失败", queue.getName());continue;}// 队列消息数long messageCount = queueInfo.getMessageCount();log.info("队列 {} 当前消息数: {}", queue.getName(), messageCount);// 如果消息数超过阈值,发送告警if (messageCount > QUEUE_MAX_MESSAGE_THRESHOLD) {log.error("队列 {} 消息堆积严重,当前消息数: {},阈值: {}",queue.getName(), messageCount, QUEUE_MAX_MESSAGE_THRESHOLD);// 发送告警(可以是邮件、短信、企业微信等)sendQueueAlert(queue.getName(), messageCount);}}log.info("队列消息堆积情况检查完成");}/*** 发送队列告警*/private void sendQueueAlert(String queueName, long messageCount) {// 实现告警逻辑// 例如:调用告警服务发送通知}
}

十、总结与展望

RabbitMQ 作为一款成熟稳定的消息中间件,凭借其灵活的路由机制、可靠的消息传递和丰富的高级特性,在分布式系统中得到了广泛应用。本文从核心概念、环境搭建、交换机类型、高级特性、集群部署、性能优化等多个方面详细介绍了 RabbitMQ,并通过实例代码展示了其在实际项目中的应用。

随着微服务架构的普及,消息队列作为服务间通信的重要手段,其作用将更加凸显。RabbitMQ 社区也在不断发展,未来可能会推出更多新特性,如更好的流处理支持、更优的性能等。

对于开发者而言,掌握 RabbitMQ 不仅能够解决实际项目中的问题,还能加深对分布式系统设计思想的理解。在使用 RabbitMQ 时,应根据具体业务场景选择合适的交换机类型和高级特性,同时关注消息的可靠性、一致性和系统的性能。

    参考资料

    1. RabbitMQ 官方文档:RabbitMQ Documentation | RabbitMQ
    2. Spring AMQP 官方文档:https://docs.spring.io/spring-amqp/docs/current/reference/html/
    3. 《RabbitMQ 实战指南》,朱忠华著
    4. 《Spring Cloud 微服务实战》,翟永超
    5. RabbitMQ GitHub 仓库:https://github.com/rabbitmq/rabbitmq-server
    http://www.dtcms.com/a/358136.html

    相关文章:

  • (纯新手教学)计算机视觉(opencv)实战十——轮廓特征(轮廓面积、 轮廓周长、外接圆与外接矩形)
  • 在Kotlin中安全的管理资源
  • 突破视界的边界:16公里远距离无人机图传模块全面解析
  • 神经网络激活函数:从ReLU到前沿SwiGLU
  • 华为对“业务对象”是怎样定义与应用的?
  • Linux网络服务发现在VPS云服务器自动化配置的关键技术与实践
  • 运维底线:一场关于原则与妥协的思辨
  • 4-ATSAM3X8E-FLASH写入
  • var maxScore = Int.MinValue 详解
  • 简易TCP网络程序
  • Kafka 主题级配置从创建到优化
  • CSS学习与心得分享
  • 【lua】table基础操作
  • 欧司朗对Spider Farmer提起专利诉讼
  • Vue常用指令和生命周期
  • TimeDP Learning to Generate Multi-Domain Time Series with Domain Prompts论文阅读笔记
  • Kubernetes 部署与发布完全指南:从 Pod 到高级发布策略
  • 一款支持动态定义路径的JAVA内存马维权工具Agenst
  • ifconfig 和 ip addr show 输出详细解读
  • `basic_filebuf`、`basic_ifstream`、`basic_ofstream`和 `basic_fstream`。
  • 【高级机器学习】 4. 假设复杂度与泛化理论详解
  • 【超全汇总】MySQL服务启动命令手册(Linux+Windows+macOS)(上)
  • React前端开发_Day10
  • 针对 “TCP 连接建立阶段” 的攻击
  • PAT 1088 Rational Arithmetic
  • android adb调试 鸿蒙
  • 微信小程序长按识别图片二维码
  • mysql的内置函数
  • psql介绍(PostgreSQL命令行工具)(pgAdmin内置、DBeaver、Azure Data Studio)数据库命令行工具
  • 三数之和,leetCode热题100,C++实现