使用MQ解耦点赞通知功能
SpringBoot异步化存在的问题
线程占用问题:使用@Async注解实现异步化时,若异步任务执行耗时较长或容易出错,会导致线程长时间占用。当业务量大时,会开启大量线程。
线程池限制:即使配置线程池,当超过线程池容量时,异步任务会退化为同步执行,影响主业务流程。
资源耗尽风险:未配置线程池时,异步线程数量会持续增长直至耗尽服务器资源,导致原有业务无法执行。
使用RocketMQ解耦
1)RocketMQ介绍
解耦原理:将业务逻辑拆分为发送端和消费端,通过消息队列进行通信。即使消费端处理缓慢或故障,发送端仍可正常运作。
架构优势:MQ的发送与接收可部署在不同服务器,实现逻辑隔离。常见MQ产品包括RocketMQ、Kafka、ActiveMQ等。
核心组件:
NameServer:类似注册中心,用于管理Broker
Broker:实际处理消息的服务端
2)集成RocketMQ
环境准备:
下载RocketMQ压缩包并解压
链接: 下载链接
配置环境变量ROCKETMQ_HOME指向安装目录
启动顺序:先启动NameServer(mqnamesrv.cmd),再启动Broker(mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true)
在application.properties中配置
runbroker
和 mqnamesrv
命令名称 | 名称分解 | 核心职责 |
---|---|---|
mqnamesrv | mq + name + srv(Name Server) | NameServer 服务进程,负责路由信息管理、服务注册与发现。 |
runbroker | run + broker(代理) | 启动 Broker 服务进程,负责消息的接收、存储、投递等核心功能。 |
-
mqnamesrv
这个名称直接点明了其代表的 RocketMQ 核心组件 NameServer。mq
代表 Message Queue,表明了它是 RocketMQ 消息队列系统的一部分。name
+srv
代表 Name Server,即命名服务。这精准地描述了它的核心功能:在 RocketMQ 集群中扮演服务发现和路由管理中心的角色。Broker 会向所有 NameServer 注册自己的信息,生产者和消费者则通过查询 NameServer 来获取路由信息,从而找到正确的 Broker 进行消息收发。
-
runbroker
这个名称则清晰地表明了它的执行动作和目标对象。run
是一个在计算机领域中非常通用的命令,意为运行或启动。broker
在消息队列系统中,通常指代理,也就是真正处理消息(接收、存储和转发消息)的核心服务器。在 RocketMQ 中,Broker 负责消息的持久化、投递、查询以及高可用保证等。- 因此,
runbroker
组合起来,意思就是启动 Broker 服务进程。当你在 RocketMQ 的bin
目录下执行mqbroker.cmd
(Windows) 或mqbroker
(Linux/Mac) 脚本时,这些脚本内部最终会调用runbroker
相关的脚本来实际启动 JVM 并运行 Broker 的主类。
小结
总的来说,mqnamesrv
和 runbroker
(通过 mqbroker
脚本调用)这两个命令的命名,都非常直观地反映了它们各自在 RocketMQ 系统中承担的关键角色:一个作为路由协调者(NameServer),另一个作为消息处理者(Broker)。
项目集成:
添加依赖:rocketmq-spring-boot-starter
<!-- rocketmq--><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version></dependency>
3)应用案例
发送消息:
在DocService中
注入RocketMQTemplate
rocketMQTemplate.convertAndSend("VOTE_TOPIC",docDB.getName()+"被点赞了");
使用convertAndSend方法发送消息,需指定topic和消息内容
接收消息:
创建消费者类实现RocketMQListener接口
使用@RocketMQMessageListener注解指定consumerGroup和topic
在onMessage方法中处理消息内容
package com.panda.wiki.rocketmq;import com.panda.wiki.service.WebSocketService;
import com.panda.wiki.websocket.WebSocketServer;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;import javax.annotation.Resource;/*** 投票主题消息消费者* 用于接收和处理投票相关的RocketMQ消息* * 使用说明:* 1. 本消费者会自动订阅 VOTE_TOPIC 主题的消息* 2. 消费者组为 default,确保同一消费者组的多个实例负载均衡消费消息* 3. 实现 RocketMQListener<MessageExt> 接口以获取完整的消息元数据*/
@Service
// RocketMQ消息监听器注解,配置消费者属性和订阅关系
@RocketMQMessageListener(consumerGroup = "default", // 消费者组名称,用于集群消费和负载均衡topic = "VOTE_TOPIC" // 订阅的主题名称,接收该主题的所有消息
)
public class VoteTopicConsumer implements RocketMQListener<MessageExt> {// 日志记录器,用于输出消费日志private static final Logger LOG = LoggerFactory.getLogger(VoteTopicConsumer.class);@Resourceprivate WebSocketServer webSocketServer;/*** 消息处理回调方法* 当消费者收到消息时,RocketMQ会自动调用此方法* * @param messageExt RocketMQ消息对象,包含消息体、属性、标签等元数据* * 方法执行流程:* 1. 从MessageExt中提取消息体(byte[])* 2. 将字节数组转换为字符串* 3. 记录消息消费日志* 4. 实际业务中可在此处添加投票业务处理逻辑*/@Overridepublic void onMessage(MessageExt messageExt) {// 获取消息内容字节数组byte[] body = messageExt.getBody();// 将字节数组转换为字符串并记录日志// 实际项目中建议指定字符编码,如:new String(body, "UTF-8")LOG.info("ROCKETMQ收到消息: {}", new String(body));webSocketServer.sendInfo(new String(body));// 例如:投票统计、数据入库、通知其他系统等// 可以通过 messageExt 获取更多消息属性:// - messageExt.getMsgId() // 消息ID// - messageExt.getTopic() // 主题名称// - messageExt.getTags() // 消息标签(用于过滤)// - messageExt.getKeys() // 业务键(用于消息查询)// - messageExt.getBornTimestamp()// 消息产生时间// - messageExt.getBornHost() // 消息来源地址}
}
4)完整流程测试
测试验证:
点赞操作触发消息发送
消费端接收并打印消息日志
消费端调用WebSocket推送通知
前端展示点赞通知
5)代码提交
架构选择建议:
技术选型应根据实际需求,避免过度设计
对于简单业务,异步化已足够满足需求
复杂场景才需要考虑引入消息队列解耦
二、知识小结