rocketmq 的核心概念讲解
一、核心概念及关系回顾
| 概念 | 作用 | 核心关联关系 |
|---|---|---|
| Producer | 消息生产者,发送消息到 Topic | 关联 Topic、ProducerGroup |
| Consumer | 消息消费者,从 Topic 订阅消息 | 关联 Topic、ConsumerGroup、Tag |
| Topic | 消息主题(逻辑分类) | 包含多个 Queue,关联 Producer/Consumer |
| Queue | 物理存储单元,负载均衡最小单位 | 隶属于 Topic,分布在 Broker 上 |
| Broker | 消息服务器(存储/转发消息) | 由 NameServer 管理,承载 Topic 的 Queue |
| NameServer | 注册中心(维护 Broker 元数据) | 为 Producer/Consumer 提供路由信息 |
| Tag/Key | 消息过滤/查询标识 | 隶属于 Message |
| ProducerGroup | 生产者组(事务消息回查等场景) | 管理同类 Producer |
| ConsumerGroup | 消费者组(负载均衡/广播) | 管理同类 Consumer,分摊消费 Queue |
二、多框架模型示例
1. RocketMQ Native Client(原生客户端)
原生客户端直接调用 RocketMQ 底层 API,需手动管理生产者/消费者生命周期,适合对底层细节有强控制需求的场景。
(1)生产者(Producer)
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class NativeProducer {public static void main(String[] args) throws Exception {// 1. 初始化生产者(指定 ProducerGroup)DefaultMQProducer producer = new DefaultMQProducer("native_producer_group");// 2. 关联 NameServerproducer.setNamesrvAddr("localhost:9876");// 3. 启动生产者producer.start();// 4. 创建消息(关联 Topic、Tag、Key)Message message = new Message("native_topic", // Topic"tagA", // Tag"order_123", // Key"原生客户端消息".getBytes(RemotingHelper.DEFAULT_CHARSET));// 5. 发送消息producer.send(message);System.out.println("消息发送成功");// 6. 关闭生产者producer.shutdown();}
}
(2)消费者(Consumer,推模式)
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;public class NativeConsumer {public static void main(String[] args) throws Exception {// 1. 初始化消费者(指定 ConsumerGroup)DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("native_consumer_group");// 2. 关联 NameServerconsumer.setNamesrvAddr("localhost:9876");// 3. 订阅 Topic(过滤 TagA)consumer.subscribe("native_topic", "tagA");// 4. 注册消息监听器(消费逻辑)consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.println("消费消息:" +"Topic=" + msg.getTopic() +", Tag=" + msg.getTags() +", Key=" + msg.getKeys() +", 内容=" + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 5. 启动消费者consumer.start();System.out.println("消费者启动成功");}
}
2. RocketMQ Spring Boot Starter
基于 Spring Boot 封装,通过注解简化配置,自动管理生产者/消费者生命周期,适合 Spring Boot 项目快速集成。
(1)依赖配置(pom.xml)
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version>
</dependency>
(2)配置文件(application.yml)
rocketmq:name-server: localhost:9876 # 关联 NameServerproducer:group: springboot_producer_group # 默认 ProducerGroup
(3)生产者(使用 RocketMQTemplate)
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class SpringBootProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate; // 封装后的生产者模板@GetMapping("/send")public String sendMessage() {// 发送消息:参数为 "topic:tag",消息体rocketMQTemplate.convertAndSend("springboot_topic:tagB", "Spring Boot 消息");return "消息发送成功";}
}
(4)消费者(使用 @RocketMQMessageListener 注解)
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Component
// 关联 ConsumerGroup、Topic、Tag
@RocketMQMessageListener(consumerGroup = "springboot_consumer_group",topic = "springboot_topic",selectorExpression = "tagB" // 过滤 TagB
)
public class SpringBootConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) { // 消息体自动反序列化System.out.println("消费消息:" + message);}
}
3. Spring Cloud Stream 集成 RocketMQ
Spring Cloud Stream 是更高层次的抽象,通过“绑定器(Binder)”适配不同消息中间件,屏蔽底层差异,适合微服务架构中统一消息编程模型。
(1)依赖配置(pom.xml)
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId><version>2.2.3.RELEASE</version>
</dependency>
(2)配置文件(application.yml)
spring:cloud:stream:rocketmq:binder:name-server: localhost:9876 # 关联 NameServerbindings:output: # 生产者绑定producer:group: stream_producer_group # ProducerGroupinput: # 消费者绑定consumer:group: stream_consumer_group # ConsumerGroupselector-expression: tagC # 过滤 TagCbindings:output: # 输出通道(生产者)destination: stream_topic # 关联 Topicinput: # 输入通道(消费者)destination: stream_topic # 关联 Topicgroup: stream_consumer_group # 显式指定 ConsumerGroup
(3)生产者(使用 StreamBridge)
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class StreamProducer {@Autowiredprivate StreamBridge streamBridge; // Stream 消息发送桥接器@GetMapping("/stream/send")public String send() {// 发送消息:参数为输出通道名、消息体streamBridge.send("output", "Spring Cloud Stream 消息");return "Stream 消息发送成功";}
}
(4)消费者(使用函数式编程)
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.function.Consumer;@Configuration
public class StreamConsumer {// 定义消费函数(函数名需与输入通道名一致,或通过配置映射)@Beanpublic Consumer<String> input() {return message -> {System.out.println("Stream 消费消息:" + message);};}
}
三、框架对比与核心概念映射
| 核心概念 | Native Client | RocketMQ Spring Boot Starter | Spring Cloud Stream |
|---|---|---|---|
| Producer | DefaultMQProducer | RocketMQTemplate | StreamBridge |
| Consumer | DefaultMQPushConsumer | @RocketMQMessageListener 注解类 | 函数式 Consumer(如 input() 方法) |
| Topic | 直接指定字符串 | 消息发送时指定(“topic:tag”) | 通过 destination 配置绑定 |
| Tag | 消息构造时指定 | 消息发送时通过 “topic:tag” 指定 | 通过 selector-expression 配置 |
| ProducerGroup | 构造函数指定 | 配置文件 rocketmq.producer.group | 配置文件 rocketmq.bindings.output.producer.group |
| ConsumerGroup | 构造函数指定 | @RocketMQMessageListener 的 consumerGroup 属性 | 配置文件 bindings.input.group |
| NameServer | setNamesrvAddr 方法指定 | 配置文件 rocketmq.name-server | 配置文件 rocketmq.binder.name-server |
四 必须指定的核心概念
必须指定的核心概念
| 角色 | 必须指定的概念 | 原因 |
|---|---|---|
| 生产者 | Topic、消息体、NameServer | 确定消息目的地、内容和路由 |
| 消费者 | Topic、ConsumerGroup、NameServer | 确定订阅来源、消费组(负载均衡)和路由 |
非必须但常用的概念(可选)
Tag:用于 Topic 内消息的进一步分类,消费者可通过 Tag 过滤消息(不指定则消费所有 Tag)。Key:消息的业务标识,用于消息查询和追踪(不指定则为 null)。ProducerGroup:生产者组,主要用于事务消息回查(不指定时框架会生成默认组,但建议显式指定)。
这些可选概念用于增强消息的灵活性和可维护性,但不影响消息的基本传输流程。
