当前位置: 首页 > news >正文

rocketmq 的核心概念讲解

一、核心概念及关系回顾

概念作用核心关联关系
Producer消息生产者,发送消息到 Topic关联 Topic、ProducerGroup
Consumer消息消费者,从 Topic 订阅消息关联 Topic、ConsumerGroup、Tag
Topic消息主题(逻辑分类)包含多个 Queue,关联 Producer/Consumer
Queue物理存储单元,负载均衡最小单位隶属于 Topic,分布在 Broker 上
Broker消息服务器(存储/转发消息)由 NameServer 管理,承载 Topic 的 Queue
NameServer注册中心(维护 Broker 元数据)为 Producer/Consumer 提供路由信息
Tag/Key消息过滤/查询标识隶属于 Message
ProducerGroup生产者组(事务消息回查等场景)管理同类 Producer
ConsumerGroup消费者组(负载均衡/广播)管理同类 Consumer,分摊消费 Queue

二、多框架模型示例

1. RocketMQ Native Client(原生客户端)

原生客户端直接调用 RocketMQ 底层 API,需手动管理生产者/消费者生命周期,适合对底层细节有强控制需求的场景。

(1)生产者(Producer)
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class NativeProducer {public static void main(String[] args) throws Exception {// 1. 初始化生产者(指定 ProducerGroup)DefaultMQProducer producer = new DefaultMQProducer("native_producer_group");// 2. 关联 NameServerproducer.setNamesrvAddr("localhost:9876");// 3. 启动生产者producer.start();// 4. 创建消息(关联 Topic、Tag、Key)Message message = new Message("native_topic",       // Topic"tagA",               // Tag"order_123",          // Key"原生客户端消息".getBytes(RemotingHelper.DEFAULT_CHARSET));// 5. 发送消息producer.send(message);System.out.println("消息发送成功");// 6. 关闭生产者producer.shutdown();}
}
(2)消费者(Consumer,推模式)
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;public class NativeConsumer {public static void main(String[] args) throws Exception {// 1. 初始化消费者(指定 ConsumerGroup)DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("native_consumer_group");// 2. 关联 NameServerconsumer.setNamesrvAddr("localhost:9876");// 3. 订阅 Topic(过滤 TagA)consumer.subscribe("native_topic", "tagA");// 4. 注册消息监听器(消费逻辑)consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.println("消费消息:" +"Topic=" + msg.getTopic() +", Tag=" + msg.getTags() +", Key=" + msg.getKeys() +", 内容=" + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 5. 启动消费者consumer.start();System.out.println("消费者启动成功");}
}
2. RocketMQ Spring Boot Starter

基于 Spring Boot 封装,通过注解简化配置,自动管理生产者/消费者生命周期,适合 Spring Boot 项目快速集成。

(1)依赖配置(pom.xml)
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version>
</dependency>
(2)配置文件(application.yml)
rocketmq:name-server: localhost:9876  # 关联 NameServerproducer:group: springboot_producer_group  # 默认 ProducerGroup
(3)生产者(使用 RocketMQTemplate)
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class SpringBootProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;  // 封装后的生产者模板@GetMapping("/send")public String sendMessage() {// 发送消息:参数为 "topic:tag",消息体rocketMQTemplate.convertAndSend("springboot_topic:tagB", "Spring Boot 消息");return "消息发送成功";}
}
(4)消费者(使用 @RocketMQMessageListener 注解)
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Component
// 关联 ConsumerGroup、Topic、Tag
@RocketMQMessageListener(consumerGroup = "springboot_consumer_group",topic = "springboot_topic",selectorExpression = "tagB"  // 过滤 TagB
)
public class SpringBootConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {  // 消息体自动反序列化System.out.println("消费消息:" + message);}
}
3. Spring Cloud Stream 集成 RocketMQ

Spring Cloud Stream 是更高层次的抽象,通过“绑定器(Binder)”适配不同消息中间件,屏蔽底层差异,适合微服务架构中统一消息编程模型。

(1)依赖配置(pom.xml)
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId><version>2.2.3.RELEASE</version>
</dependency>
(2)配置文件(application.yml)
spring:cloud:stream:rocketmq:binder:name-server: localhost:9876  # 关联 NameServerbindings:output:  # 生产者绑定producer:group: stream_producer_group  # ProducerGroupinput:   # 消费者绑定consumer:group: stream_consumer_group  # ConsumerGroupselector-expression: tagC  # 过滤 TagCbindings:output:  # 输出通道(生产者)destination: stream_topic  # 关联 Topicinput:   # 输入通道(消费者)destination: stream_topic  # 关联 Topicgroup: stream_consumer_group  # 显式指定 ConsumerGroup
(3)生产者(使用 StreamBridge)
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class StreamProducer {@Autowiredprivate StreamBridge streamBridge;  // Stream 消息发送桥接器@GetMapping("/stream/send")public String send() {// 发送消息:参数为输出通道名、消息体streamBridge.send("output", "Spring Cloud Stream 消息");return "Stream 消息发送成功";}
}
(4)消费者(使用函数式编程)
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.function.Consumer;@Configuration
public class StreamConsumer {// 定义消费函数(函数名需与输入通道名一致,或通过配置映射)@Beanpublic Consumer<String> input() {return message -> {System.out.println("Stream 消费消息:" + message);};}
}

三、框架对比与核心概念映射

核心概念Native ClientRocketMQ Spring Boot StarterSpring Cloud Stream
ProducerDefaultMQProducerRocketMQTemplateStreamBridge
ConsumerDefaultMQPushConsumer@RocketMQMessageListener 注解类函数式 Consumer(如 input() 方法)
Topic直接指定字符串消息发送时指定(“topic:tag”)通过 destination 配置绑定
Tag消息构造时指定消息发送时通过 “topic:tag” 指定通过 selector-expression 配置
ProducerGroup构造函数指定配置文件 rocketmq.producer.group配置文件 rocketmq.bindings.output.producer.group
ConsumerGroup构造函数指定@RocketMQMessageListener 的 consumerGroup 属性配置文件 bindings.input.group
NameServersetNamesrvAddr 方法指定配置文件 rocketmq.name-server配置文件 rocketmq.binder.name-server

四 必须指定的核心概念

必须指定的核心概念
角色必须指定的概念原因
生产者Topic、消息体、NameServer确定消息目的地、内容和路由
消费者TopicConsumerGroupNameServer确定订阅来源、消费组(负载均衡)和路由
非必须但常用的概念(可选)
  • Tag:用于 Topic 内消息的进一步分类,消费者可通过 Tag 过滤消息(不指定则消费所有 Tag)。
  • Key:消息的业务标识,用于消息查询和追踪(不指定则为 null)。
  • ProducerGroup:生产者组,主要用于事务消息回查(不指定时框架会生成默认组,但建议显式指定)。

这些可选概念用于增强消息的灵活性和可维护性,但不影响消息的基本传输流程。

http://www.dtcms.com/a/575133.html

相关文章:

  • 注册了自己的网站中华始祖堂室内设计
  • 定制化网站建设假网站连接怎么做的
  • 中小企业网站建设免费注册电子邮箱
  • 建设银行企业版网站电脑上免费制作ppt的软件
  • 顺企网属于什么网站江西建设监理协会网站
  • 洛阳网站建设哪家公司好php网站建设文献综述
  • 功能测试与接口测试规范SOP流程
  • 网站项目计划书模板范文有做外贸个人网站
  • 可以做专利聚类分析的免费网站深圳自定义网站开发
  • 公司网站年费网站死链接怎么提交
  • 沈阳市绿云网站建设2023企业所得税300万以上
  • 网站设计机构排行榜福建省建设注册管理中心网站
  • 旅游网站开发研究现状模板建站费用
  • 学生模拟网站开发项目wordpress 移动建站
  • 五金配件网站建设报价广东省做网站推广公司
  • 【保研经验】双非26届计算机——中农ai、央民ai、北科ai、北邮网安、北交cs
  • 顺德大良那里做网站好wordpress您找的页面不存在
  • 丝路建设网站服务器做视频网站吗
  • C++之static_cast关键字
  • Ubuntu 20.04中复现LeRobot-ALOHA的仿真
  • wap网站开发协议浏览器无法打开住房和建设网站
  • 网站标题 关键词 描述之间的关系深圳龙岗建网站
  • 响应式网站企业明空网络做网站好不好
  • 基于轻量化卷积神经的甜瓜白粉病田间快速检测系统
  • 仿站容易还是建站容易个人网站 如何做推广
  • 基于python大数据的房价数据分析系统
  • 微信h5网站开发wordpress站点备份
  • 市场营销的十大理论网站优化排名易下拉用法
  • 淘宝店铺 发布网站建设餐饮行业网站建设
  • 做游戏 做网站wordpress文章长