【 mq】 mq学习笔记
目录
MQ的主要作用有以下几个方面
异步
解耦合
削峰
RocketMq的核心概念
主题 (Topic)
标签 (Tag)
生产者 (Producer)
消费者 (Consumer)
代理服务器 (Broker)
名称服务器 (NameServer)
消息队列 (MessageQueue)
RocketMq代码实现
一、pom.xml文件
二、核心代码实现
1. 配置文件 (application.yml)
2. 实体类 (Entity)
3. MQ消息体 (DTO)
4. 控制器 (Controller)
5. MQ消费者 (Consumer)
消费者的调用链路是什么样的
Spring Boot应用启动 (入口起点):
监听器容器初始化:
MQ客户端长连接拉取消息:
消息到达,触发回调 (真正的调用入口):
你的业务逻辑执行:
6. 服务层 (Service)
三、流程总结与学习要点
MQ的主要作用有以下几个方面
异步
举一个例子:如果快递员发送快递直接发送给收件人,每寄一个然后对方收一个,收完之后再返回,这样显然效率太低了。因为快递员只有等待上一个邮件签收完毕之后才可以发送下一个。
如果这个时候引入了一个“蜂巢”或者“菜鸟驿站”这样的产品,快递员把快递放在这些地方,然后收件人等有时间了就去收件,这样子效率就会明显提高了。
解耦合
比如,点击了“下单”之后,订单服务会生产出来一个订单。同时库存服务,物流系统,营销系统也会对这些订单进行“消费”。但是这个时候,如果订单服务必须要等待其他服务都创建完成,这样才可以完成下单成功,这样明显是不合理的。因此,有必要实现“解耦合”,也就是:
订单服务发送了一个消息给到“中间件”,然后其他服务就通过这个“中间价”去消费这个内容,不必等到其他服务消费完成,这样就可以实现“解耦合”了。
削峰
MQ就像一个巨大的水库或缓冲带,挡在应用服务器和数据库之间。
流程变成了:
用户请求 -> 应用服务器 -> [消息队列MQ] -> 下游系统(如数据库)
-
秒杀开始,一秒内依然来了10万个下单请求。
-
应用服务器 的处理逻辑变得非常简单:只做最基本的校验(如用户是否登录),然后迅速地将订单信息作为一个“消息”扔到MQ里,就立即返回用户“排队中,请等待结果”的提示。这个操作非常快,可能一秒钟就能处理好几千个。
-
此时,用户的请求压力在MQ这里被挡住了。10万个订单消息在MQ的队列里整齐地排着队,而不是直接压垮数据库。这就是“削峰”——把一瞬间的巨峰,削成了一个平缓的小山坡。
-
下游系统(消费者),按照自己能承受的速度,平稳地从MQ里拉取消息(比如每秒拉取200个),然后慢慢地写入数据库。
最终效果:
前端用户:不会看到系统崩溃,只会收到“排队中”的友好提示,体验更好。
应用服务器:因为快速投递消息后就能释放连接,所以不会被拖垮,始终保持健康。
数据库:按照自己最舒服的节奏处理任务,压力平稳,永远不会被冲垮。
整体系统:可用性和稳定性极大提高。
RocketMq的核心概念
-
主题 (Topic)
-
比喻:邮件的目的地或类型。比如“家书”、“商品订单”、“物流通知”。你想寄哪种类型的信,就放到对应的邮筒里。
-
解释:这是消息的分类标签,生产者和消费者都基于Topic进行通信。比如,所有下单的消息都发到
ORDER_TOPIC
,所有支付的消息都发到PAYMENT_TOPIC
。
-
-
标签 (Tag)
-
比喻:对Topic的进一步细分。比如“家书”这个Topic下,可以有“急件”、“平信”、“问候卡”等Tag。
-
解释:用来对同一个Topic下的消息进行更精细的分类。消费者可以只订阅他们感兴趣的特定Tag,实现消息过滤。例如,可以只处理
ORDER_TOPIC
下TagA
为“创建订单”的消息,而不处理“取消订单”的消息。
-
-
生产者 (Producer)
-
比喻:寄信的人。负责写好信、填好地址、贴上邮票,然后把信投递到邮筒。
-
解释:消息的发送方(通常是你的业务系统)。它创建消息,指定Topic和Tag,然后发送给RocketMQ服务器。
-
-
消费者 (Consumer)
-
比喻:收信的人。每天会去邮箱检查有没有寄给自己的信,如果有就取出来处理(阅读、执行信里的要求)。
-
解释:消息的接收方(另一个业务系统)。它向RocketMQ订阅它关心的Topic(和Tag),然后MQ就会把相关的消息推送给它(或它主动拉取),它拿到消息后执行自己的业务逻辑(如更新库存、发短信)。
-
-
代理服务器 (Broker)
-
比喻:邮局。是整个邮政系统的核心枢纽。负责从邮筒(接收Producer的信件)、分拣、存储、最终把信派送到收件人的邮箱。
-
解释:RocketMQ的核心节点,负责消息的接收、存储、投递。消息真正是存在Broker上的。它非常强大,要处理高并发、海量数据,保证消息不丢。
-
-
名称服务器 (NameServer)
-
比喻:通讯录或问讯处。它知道整个城市里所有邮局(Broker)的地址列表以及每个邮局负责哪些区域(Topic)。
-
解释:一个简单的注册中心,功能有点像DNS。Producer和Consumer在发送或消费消息前,先去NameServer问:“
ORDER_TOPIC
这个Topic在哪个Broker上?” NameServer返回Broker的地址,然后客户端再直接和Broker打交道。它是无状态的,意味着每个NameServer节点都很简单,互相独立,因此非常稳定,不容易出故障。
-
-
消息队列 (MessageQueue)
-
比喻:邮局(Broker)内部的分拣流水线。一个邮局里有多个流水线并行工作,效率更高。
-
解释:一个Topic在物理上会被分成多个Queue,分布在不同的Broker上。这是RocketMQ实现高并发、高吞吐量和顺序消息的关键机制。
-
并行消费:多个消费者可以同时从同一个Topic的不同Queue消费消息,极大提高速度。
-
顺序消息:保证发送到同一个Queue的消息,被同一个消费者按顺序处理。(比如,一个订单的创建、付款、发货消息必须按顺序处理,那就让它们都进入同一个Queue)。
-
-
RocketMq代码实现
一、pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.taobao.demo</groupId><artifactId>618-promotion</artifactId><version>1.0-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.18</version> <!-- 选用一个稳定版本 --><relativePath/></parent><properties><java.version>1.8</java.version><rocketmq-spring-boot-starter.version>2.2.3</rocketmq-spring-boot-starter.version><mybatis-plus-boot-starter.version>3.5.5</mybatis-plus-boot-starter.version></properties><dependencies><!-- Spring Boot Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- RocketMQ Spring Boot Starter --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>${rocketmq-spring-boot-starter.version}</version></dependency><!-- MyBatis-Plus --><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>${mybatis-plus-boot-starter.version}</version></dependency><!-- MySQL Driver --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><!-- Redis --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!-- Lombok (简化实体类代码) --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!-- HikariCP (Spring Boot 2.x 默认自带,也可显式引入) --><dependency><groupId>com.zaxxer</groupId><artifactId>HikariCP</artifactId></dependency><!-- Test --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>
二、核心代码实现
1. 配置文件 (application.yml
)
server:port: 8080spring:datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/taobao_618?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghaiusername: rootpassword: 123456redis:host: localhostport: 6379database: 0# password: 如果你的Redis有密码rocketmq:name-server: localhost:98766 # RocketMQ NameServer 地址producer:group: 618-producer-group # 生产者组名
2. 实体类 (Entity
)
// Order.java
@Data
@TableName("t_order")
public class Order {@TableId(type = IdType.AUTO)private Long id;private String orderSn; // 订单号private Long userId; // 用户IDprivate Long productId; // 商品IDprivate Integer quantity; // 购买数量private BigDecimal amount; // 订单金额private Integer status; // 订单状态 (0:待付款,1:已付款,2:已发货...)private Date createTime;
}// Product.java (商品实体,简化)
@Data
@TableName("t_product")
public class Product {@TableId(type = IdType.AUTO)private Long id;private String name;private BigDecimal price;private Integer stock; // 数据库库存
}
3. MQ消息体 (DTO
)
// OrderMessage.java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderMessage implements Serializable { // 必须序列化private Long userId;private Long productId;private Integer quantity;
}
4. 控制器 (Controller
)
// SecKillController.java
@RestController
@RequestMapping("/api/seckill")
@Slf4j
public class SecKillController {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Autowiredprivate RocketMQTemplate rocketMQTemplate;private static final String PRODUCT_STOCK_PREFIX = "product:stock:";private static final String ORDER_TOPIC = "618_ORDER_TOPIC";/*** 秒杀接口* 1. 检查Redis预库存* 2. 扣减Redis预库存* 3. 发送创建订单消息到MQ*/@PostMapping("/{productId}")public String seckill(@PathVariable Long productId, @RequestParam Long userId, @RequestParam Integer quantity) {String stockKey = PRODUCT_STOCK_PREFIX + productId;// 1. 检查库存 (使用Redis的原子操作,避免超卖)Long stock = redisTemplate.opsForValue().decrement(stockKey, quantity);if (stock == null || stock < 0) {// 库存不足,需要把减掉的库存加回来redisTemplate.opsForValue().increment(stockKey, quantity);log.warn("商品[{}]库存不足,抢购失败", productId);return "库存不足,抢购失败!";}// 2. 库存扣减成功,组装订单消息,发送到MQOrderMessage orderMessage = new OrderMessage(userId, productId, quantity);try {// 使用同步发送,确保消息一定发送成功,如果失败会抛异常,进入catch块SendResult sendResult = rocketMQTemplate.syncSend(ORDER_TOPIC, orderMessage);log.info("用户[{}]抢购商品[{}]成功,消息已发送至MQ。MessageId: {}", userId, productId, sendResult.getMsgId());return "抢购成功,订单处理中...";} catch (Exception e) {log.error("MQ消息发送失败:", e);// MQ发送失败,代表订单创建可能会失败,需要回滚Redis库存redisTemplate.opsForValue().increment(stockKey, quantity);return "系统繁忙,请重试!";}}
}
5. MQ消费者 (Consumer
)
// OrderConsumer.java
@Component
@RocketMQMessageListener(topic = "618_ORDER_TOPIC",consumerGroup = "618-order-consumer-group"
)
@Slf4j
public class OrderConsumer implements RocketMQListener<OrderMessage> {@Autowiredprivate OrderService orderService;@Overridepublic void onMessage(OrderMessage message) {log.info("收到创建订单消息: {}", message);try {// 真正地创建订单、扣减数据库库存等耗时操作orderService.createOrder(message);} catch (Exception e) {log.error("创建订单失败,消息: {}, 错误原因: {}", message, e.getMessage());// 此处可以根据业务需要进行重试(RocketMQ自带重试机制)或者将消息存入死信队列人工处理// 如果创建订单失败,也需要回滚Redis中的库存(可选,根据业务严谨性决定)// throw new RuntimeException(e); // 抛出异常,MQ会自动进行重试}}
}
消费者的调用链路是什么样的
-
Spring Boot应用启动 (
入口起点
):-
当你的SpringBoot应用启动时,它会进行自动配置和组件扫描。
-
它扫描到你的
OrderConsumer
类带有@Component
和@RocketMQMessageListener
注解。
-
-
监听器容器初始化:
-
RocketMQSpringBoot
的自动配置类会为你创建一个消息监听器容器(MessageListenerContainer)。 -
这个容器会根据
@RocketMQMessageListener
注解上的配置(topic
,consumerGroup
,nameServer
等),在后台自动创建一个RocketMQ的消费者实例(DefaultMQPushConsumer),并订阅你指定的Topic。
-
-
MQ客户端长连接拉取消息:
-
这个RocketMQ消费者实例会与
NameServer
通信,找到对应的Broker
,并与Broker建立长连接。 -
它会不断地、自动地向Broker拉取(Pull) 或者等待Broker推送(Push) 属于它消费组的消息。这个过程对你来说是完全透明的,你不需要写任何网络通信代码。
-
-
消息到达,触发回调 (
真正的调用入口
):-
当Broker有新的消息到达
618_ORDER_TOPIC
时,RocketMQ的客户端SDK会接收到这条消息。 -
客户端SDK然后会回调它内部注册好的消息处理方法。
-
因为你的
OrderConsumer
类实现了RocketMQListener<OrderMessage>
接口,所以SDK最终调用的就是这个接口唯一的onMessage(OrderMessage message)
方法。
-
-
你的业务逻辑执行:
-
现在,流程就进入了你写的
onMessage
方法内部。message
参数已经被Spring和RocketMQ的客户端自动反序列化成了OrderMessage
对象。 -
接着,你调用
orderService.createOrder(message)
,执行你自己的业务逻辑。
-
类比帮助理解
你可以把它类比成一个更加熟悉的东西:Spring MVC中的
@RestController
。
在Web中:
你用一个
@RestController
和@PostMapping("/orders")
标注一个方法。Tomcat(Servlet容器) 在启动时注册了这个路由。
当有HTTP POST请求发到
/orders
时,Tomcat接收到请求,然后Spring MVC框架负责解析请求参数,最后自动调用你的方法。你不需要自己写代码去监听HTTP端口。
在RocketMQ中:
你用一个
@Component
和@RocketMQMessageListener
标注一个类。RocketMQListenerContainer(监听器容器) 在启动时注册了这个消费者。
当Broker有消息到达指定Topic时,RocketMQ客户端SDK接收到消息,然后RocketMQ-Spring框架负责解析消息体,最后自动调用你的
onMessage
方法。你不需要自己写代码去连接MQ和拉取消息。
6. 服务层 (Service
)
// OrderService.java
public interface OrderService {void createOrder(OrderMessage message);
}// OrderServiceImpl.java
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {@Autowiredprivate OrderMapper orderMapper;@Autowiredprivate ProductMapper productMapper;@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Autowiredprivate RocketMQTemplate rocketMQTemplate;private static final String PRODUCT_STOCK_PREFIX = "product:stock:";private static final String ORDER_SUCCESS_TOPIC = "ORDER_SUCCESS_TOPIC";@Transactional(rollbackFor = Exception.class) // 本地事务@Overridepublic void createOrder(OrderMessage message) {Long productId = message.getProductId();// 1. 再次检查数据库库存(严谨性检查)Product product = productMapper.selectById(productId);if (product.getStock() < message.getQuantity()) {log.error("数据库库存不足,商品ID: {}", productId);// 回滚Redis库存redisTemplate.opsForValue().increment(PRODUCT_STOCK_PREFIX + productId, message.getQuantity());throw new RuntimeException("数据库库存不足");}// 2. 扣减数据库库存int updateCount = productMapper.deductStock(productId, message.getQuantity());if (updateCount == 0) { // 使用乐观锁等方式更新,影响行数为0代表扣减失败log.error("扣减数据库库存失败,商品可能已被抢光,商品ID: {}", productId);redisTemplate.opsForValue().increment(PRODUCT_STOCK_PREFIX + productId, message.getQuantity());throw new RuntimeException("扣减库存失败");}// 3. 创建订单Order order = new Order();order.setUserId(message.getUserId());order.setProductId(productId);order.setQuantity(message.getQuantity());order.setAmount(product.getPrice().multiply(BigDecimal.valueOf(message.getQuantity())));order.setOrderSn(generateOrderSn()); // 生成唯一订单号order.setStatus(0); // 待付款order.setCreateTime(new Date());orderMapper.insert(order);log.info("订单创建成功,订单ID: {}, 订单号: {}", order.getId(), order.getOrderSn());// 4. 订单创建成功,发送成功消息到另一个Topic,通知其他服务(库存服务、短信服务、积分服务等)rocketMQTemplate.convertAndSend(ORDER_SUCCESS_TOPIC, order);}private String generateOrderSn() {return "ORDER" + System.currentTimeMillis() + (new Random().nextInt(9000) + 1000);}
}
三、流程总结与学习要点
-
用户请求
/api/seckill/{productId}
:-
瞬间流量被
Redis
的原子操作decrement
承接,性能极高,防止数据库被击垮。 -
判断结果立即返回给用户(成功/失败),用户体验好。
-
-
RocketMQ 削峰与异步化:
-
抢购成功的请求被瞬间转换成消息存入
RocketMQ
,实现了流量削峰。 -
主流程(抢购)和后续复杂流程(创建订单)解耦。
-
-
订单消费者:
-
OrderConsumer
慢慢从MQ中取出消息,以数据库能承受的速率执行真正的创建订单、扣减数据库库存等操作。
-
-
最终一致性:
-
通过
Redis
预扣库存和MQ
重试机制,保证了库存数据和订单状态的最终一致性。
-