当前位置: 首页 > news >正文

17、Rocket MQ快速实战以及核⼼概念详解

 MQ简介

MQMessageQueue消息队列。是在互联⽹中使⽤⾮常⼴泛的—系列服务中间件。 这个词可以分两个部分来看,

—是Message:消息。消息是在不同进程之间传递的数据。这些进程可以部署在同—台机器上,也可以 分布在不同机器上。

⼆是Queue:队列。队列原意是指—种具有FI FO(先进先出)特性的数据结构,是⽤来缓存  数据的。对于消息中间件产品来说,能不能保证FI FO特性 尚值得考量。但是,所有消息队列都是需要具备存 储消息,让消息排队的能⼒ 

⼴义上来说, 只要能够实现消息跨进程传输以及队列数据缓存,就可以称之为消息队列。例如我们常⽤的QQ 、微信 、阿⾥旺旺等就都具备了这样的功能。只不过他们对接的使⽤对象是⼈ ,⽽我们这⾥讨论的MQ产品 需要对接的使⽤对象是应⽤程序。

1、MQ的作⽤主要有以下三个⽅⾯:

  (1)异步

例⼦: 快递员发快递,直接到客户家效率会很低。引⼊菜⻦驿站后,快递员只需要把快递放到菜⻦驿站, 就可以继续发其他快递去了。客户再按⾃⼰的时间安排去菜⻦驿站取快递。

作⽤ :异步能提⾼系统的响应速度 、吞吐量。

 (2)解耦

例⼦:《Thinking in JAVA》很经典,但是都是英⽂ ,我们看不懂,所以需要编辑社,将⽂章翻译成其他 语⾔, 这样就可以完成英语与其他语⾔的交流。

作⽤ 

1 、服务之间进⾏解耦,才可以减少服务之间的影响。提⾼系统整体的稳定性以及可扩展性。

2 、另外,解耦后可以实现数据分发。⽣产者发送—个消息后,可以由—个或者多个消费者进⾏消费,并 且消费者的增加或者减少对⽣产者没有影响。

 

(3)削峰

例⼦: ⻓江每年都会涨⽔ ,但是下游出⽔⼝的速度是基本稳定的,所以会涨⽔ 。引⼊三峡⼤坝后,可以把 ⽔储存起来,下游慢慢排⽔。

作⽤  以稳定的系统资源应对突发的流量冲击。

 

 Rocket MQ产品特点

 1 、Rocket MQ介绍


        Rocket MQ是阿里巴巴开源的一个消息中间件,在阿里内部历经了双十一等很多高并发场景的考验,能够处理亿万级别的消息。2016年开源后捐赠给Apache,现在是Apache的—个顶级项目。
        早期阿里使用ActiveMQ ,但是,当消息开始逐渐增多后,ActiveMQ的IO性能很快达到了瓶颈。于是, 阿⾥开始关注Kafka 。但是Kafka是针对日志收集场景设计的,他的高级功能并不是很贴合阿里的业务场景。尤其当他的Topic过多时, 由于Partition文件也会过多,这就会加大文件索引的耗时,会严重影响IO性能。于是阿里才决定自研中间件, 最早叫做MetaQ ,后来改名成为Rocket MQ 。最早他所希望解决的最大问题就是多Topic下的IO 性能压力 。但是产品在阿里内部的不断改进, Rocket MQ开始体现出一些不一样的优势。


2 、Rocket MQ特点

当今互联⽹MQ产品众多,其中,影响⼒和使⽤范围最⼤的当数Apache Kafka RabbitMQ Apache Rocket MQ以及Apache Plusar。这⼏⼤产品虽然都是典型的MQ产品,但是由于设计和实现上的—些差异,造成他们适合于不同的细分场景。

优点

缺点

适合场景

Apache Kafka

吞吐量⾮常⼤ ,性能⾮常好,集群⾼可 

会有丢数据的可

能,功能⽐较单—

⽇志分析、

⼤数据采集

Rabbit MQ

消息可靠性⾼ ,功能全⾯ 

erlang语⾔不好定 制。吞吐量⽐较    低。

企业内部⼩

规模服务调

Apache Pulsar

基于Bookeeper构建, 消息可靠性⾮常 

周边⽣态还有差

距, ⽬前使⽤的公 司⽐较少。

企业内部⼤

规模服务调

Apache 

Rocket MQ

⾼吞吐 、⾼性能 、⾼可⽤ 。功能全⾯ 

客户端协议丰富。使⽤java语⾔开发,  便定制。

服务加载⽐较慢。

⼏乎全场

景,特别适

合⾦融场景

其中Rocket MQ ,孵化⾃阿⾥巴巴。历经阿⾥多年双十一的严格考验, Rocket MQ可以说是从全世界最严苛的 ⾼并发场景中摸爬滚打出来的过硬产品,也是少数⼏个在⾦融场景⽐较适⽤的MQ产品。从横向对⽐来看,Rocket MQKafkaRabbitMQ相⽐ Rocket MQ的消息吞吐量虽然和Kafka⽐还是稍有差距,但是却⽐RabbitMQ⾼很多。在阿⾥内部, Rocket MQ集群每天处理的请求数超过5万亿次,⽀持的核⼼应⽤超过3000  个。⽽Rocket MQ最⼤的优势就是他天⽣就为⾦融互联⽹⽽ 。他的消息可靠性相⽐Kafka也有了很⼤的提升, ⽽消息吞吐量相⽐RabbitMQ也有很⼤的提升。另外, Rocket MQ的⾼级功能也越来越全⾯ ,⼴播消费 、延迟队 、死信队列等等⾼级功能一应俱全, 甚⾄某些业务功能⽐如事务消息, 已经呈现出领先潮流的趋势。

三、RocketMQ快速实战

1.快速搭建RocketMQ

RocketMQ的官网地址: http://rocketmq.apache.org 。在下载页面可以获取RocketMQ的源码包以及运行包。下载页面地址:https://rocketmq.apache.org/download。

当前最新的版本是5.x,这是一个着眼于云原生的新版本,给 RocketMQ 带来了非常多很亮眼的新特性。但是目前来看,企业中用得还比较少。因此,我们这里采用的还是更为稳定的4.9.5版本。  

注:在2020年下半年,RocketMQ新推出了5.0的大版本,这对于RocketMQ来说,是一个里程碑式的大版本。在这个大版本中,RocketMQ对整体功能做了一次大的升级。增加了很多非常有用的新特性,也对已有功能重新做了升级。
​ 比如在具体功能方面,在4.x版本中,对于定时消息,只能设定几个固定的延迟级别,而5.0版本中,已经可以指定具体的发送时间了。在客户端语言方面,4.x版本,RocketMQ原生只支持基于Netty框架的Java客户端。而在5.0版本中,增加了对Grpc协议的支持,这基本上就解除了对客户端语言的限制。在服务端架构方面,4.x版本只支持固定角色的普通集群和可以动态切换角色的Dledger集群,而在5.0版本中,增加了Dledger Controller混合集群模式,即可以混合使用Dledger的集群机制以及 Broker 本地的文件管理机制。
​ 但是功能强大,同时也意味着问题会很多。所以目前来看,企业中直接用新版本的还比较少。小部分使用新版本的企业,也大都是使用内部的改造优化版本。

 运⾏只需要下载Binary运⾏版本就可以了。 当然,源码包也建议下载下来,后续会进⾏解读。运⾏包下载下  来后,就可以直接解压,上传到服务器上。我们这⾥会上传到/app/rocketmq⽬录。解压后⼏个重要的⽬录如:

 

默认情况下, Rocket MQ建议的运⾏环境需要⾄少12G的内存, 这是⽣产环境⽐较理想的资源配置。但是,  习阶段,如果你的服务器没有这么⼤的内存空间,那么就需要做—下调整。进⼊bin ⽬录,对其中的runserver.shrunbroker.sh两个脚本进⾏—下修改。

使⽤vi runserver.sh指令,编辑这个脚本,找到下⾯的—⾏配置,调整Java进程的内存⼤⼩。 

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:MetaspaceSize=128m - XX:MaxMetaspaceSize=320m"

接下来, 同样调整runbroker.sh中的内存⼤⼩。 

JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g"
修改为:
JAVA_OPT="${JAVA_OPT} -server -Xms2g -Xmx2g"

 修改配置时, 注意要根据你的JDK版本调整对应的配置⾏ 。Rocket MQ是—个典型的Java应⽤ ,所以需要 提前安装JDK 。我们这⾥采⽤的是1 .8版本。JDK的安装过程略。
⽣产环境不建议调整。这—系列参数实际上就是Rocket MQ的JVM调优结果。

 Rocket MQ的后端服务分为nameserverbroker两个服务,关于他们的作⽤ ,后⾯会给你分享。接下来我们 先将这两个服务启动起来。

第⼀步:启动nameserver服务。 

 

cd /app/rocketmq/rocketmq-all-5.3.0-bin-release
nohup bin/mqnames rv &

指令执⾏后,会⽣成—个nohup.out的⽇志⽂件。在这个⽇志⽂件⾥如果看到下⾯这—条关键⽇志,就表示 nameserver服务启动成功了。

Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and
will likely be removed in a future release.
The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876

 接下来,可以通过jsp指令进⾏验证。使⽤jps指令后,可以看到有—个NamesrvStartup的进程运⾏ ,也表示 nameserver服务启动完成。

第⼆步:启动broker服务

broker也是—个Java服务, 只需要调整conf⽬录下的broker.conf⽂件, 进⾏—些定制。然后就可以启动了。

具体配置项参⻅官⽅⽂档, 这⾥尽量⾛默认配置。

如果你的服务器配置了多张⽹卡, 建议配置brokerIP1属性。⽐如阿⾥云,腾讯云这样的云服务器,他们 通常有内⽹⽹卡和外⽹⽹卡两张⽹卡,那么需要增加配置brokerIP1属性,指向服务器的外⽹IP 地址,  样才能确保从其他服务器上访问到Rocket MQ 服务。

在启动broker服务前,需要先指定NameServer的服务地址。Rocket MQ可以使⽤—个NAMESRV_ADDR的环 境变量指定NameServer服务地址。 

export NAMESRV_ADDR= I localhost:9876 I

 9876是nameserver的默认服务端⼝ 。

然后也可以⽤之前的⽅式启动broker服务。启动broker服务的指令是mqbroker 

cd /app/rocketmq/rocketmq-all-5.3.0-bin-release
nohup bin/mqbroker &

 启动完成后, 同样检查nohup.out⽇志⽂件,有如下—条关键⽇志,就表示broker务启动正常了。

The broker [xxxxx] boot success. serializeType=JSON and name server is localhost:9876

 

 1、在实际服务部署时,通常会将Rocket MQ的部署地址添加到环境变量当中。例如使⽤vi ~/.bash_profile指令,添加以下内容。

export ROCKETMQ_HOME=/app/rocketmq/rocketmq-all-5.3.0-bin-release PATH=$ROCKETMQ_HOME/bin:$PATH

export PATH

这样就不必每次进⼊Rocket MQ的安装⽬录了。直接可以使⽤mqnamesrv mqbroker指令。

2 、停⽌Rocket MQ服务可以通过mqshutdown指令进⾏

mqshutdown namesrv # 关闭nameserver服务mqshutdown broker # 关闭broker服务

同样使⽤jps指令可以检查服务的启动状态。使jps指令后,可以看到—个名为BrokerStartup的进程,则表示 broker服务启动完成。 

2 、快速实现消息收发

Rocket MQ后端服务启动完成后,就可以启动客户端的消息⽣产者和消息消费者进⾏消息转发了。接下来,我 们会先通过Rocket MQ提供的命令⾏⼯具快速体验—下Rocket MQ消息收发的功能。然后,再动⼿搭建—个Maven项⽬ ,在项⽬中使⽤Rocket MQ进⾏消息收发。

(1)命令⾏快速实现消息收发

1 :通过指令启动Rocket MQ的消息⽣产者发送消息。
cd /app/rocketmq/rocketmq-all-5.3.0-bin-release
bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

这个指令会默认往Rocket MQ中发送1000条消息。在命令⾏窗⼝可以看到发送消息的⽇志: 

 

.....
SendResult [sendStatus=SEND_OK, msgId=C0A841708122246B179D98C9E31103E6,
offsetMsgId=C0A8417000002A9F000000000003AEFE, messageQueue=MessageQueue [topic=TopicTest,
brokerName=192-168-65-112, queueId=1], queueOffset=249]
SendResult [sendStatus=SEND_OK, msgId=C0A841708122246B179D98C9E31203E7,
offsetMsgId=C0A8417000002A9F000000000003AFF0, messageQueue=MessageQueue [topic=TopicTest,
brokerName=192-168-65-112, queueId=2], queueOffset=249]

 这部分⽇志中,并没有打印出发送了什么消息。上⾯Send Result开头部分是消息发送到Broker后的结果。最 后两⾏⽇志表示消息⽣产者发完消息后,服务正常关闭了。

2):可以启动消息消费者接收之前发送的消息 
bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

 消费者启动完成后,可以看到消费到的消息

......
ConsumeMessageThread_please_rename_unique_group_name_4_18 Receive New Messages:
[MessageExt [brokerName=192-168-65-112, queueId=1, storeSize=242, queueOffset=211,
sysFlag=0, bornTimestamp=1725004967502, bornHost=/192.168.65.112:52748,
storeTimestamp=1725004967502, storeHost=/192.168.65.112:10911,
msgId=C0A8417000002A9F0000000000031F4E, commitLogOffset=204622, bodyCRC=47888112,
reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic= ITopicTest I,
flag=0, properties={CONSUME_START_TIME=1725005058184, MSG_REGION=DefaultRegion,
UNIQ_KEY=C0A841708122246B179D98C9E24E034E, CLUSTER=DefaultCluster, MIN_OFFSET=0,
TAGS=TagA, WAIT=true, TRACE_ON=true, MAX_OFFSET=250}, body= [72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 56, 52, 54], transactionId= Inull I}]]

 每—条这样的⽇志信息就表示消费者接收到了—条消息。

这个Consumer消费者的指令并不会主动结束,他会继续挂起,等待消费新的消息。我们可以使⽤CTRL+C ⽌该进程。 

注:在Rocket MQ提供的这个简单示例中并没有打印出传递的消息内容,⽽是打印出了消息相关的很多重 要的属性。

其中有⼏个⽐较重要的属性: brokerId,brokerName,queueId,msgId,topic,cluster。这些属性的作⽤会在 后续—起分享, 这⾥你不妨先找—下这些属性是什么,消费者与⽣产者之间有什么样的对应关系。

3 、搭建Java客户端项⽬

之前的步骤实际上是在服务器上快速验证Rocket MQ的服务状态,接下来我们动⼿搭建—个Rocket MQ的客户 端应⽤ ,在实际应⽤中集成使⽤Rocket MQ

第⼀步 :创建—个标准的maven项⽬ ,在pom.xml中引⼊以下核⼼依赖

 

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>5.3.0</version>
</dependency>

第⼆步:**就可以直接创建—个简单的消息⽣产者

 

相关文章:

  • Vscode自定义代码快捷方式
  • MySQL-日志+事务
  • 海拔案例分享-门店业绩管理小程序
  • uniapp+vue3做小程序,获取容器高度
  • 短期项目与长期目标如何同时兼顾
  • 华为云 Flexus+DeepSeek 征文|增值税发票智能提取小工具:基于大模型的自动化信息解析实践
  • 【面板数据】上市公司投资者保护指数(2010-2023年)
  • 【达梦数据库】忘记SYSDBA密码处理方法-已适配
  • 第十六届蓝桥杯C/C++程序设计研究生组国赛 国二
  • JavaScript中的10种排序算法:从入门到精通
  • 【源码+文档+调试讲解】基于web的运动健康小程序的设计与实现y196
  • VMware安装Ubuntu22.04详细教程
  • 基于协议转换的 PROFIBUS DP 与 ETHERNET/IP 在石化生产中的协同运行实践
  • Docker镜像制作
  • 从Java API调用者到架构思考:我的Elasticsearch认知升级之路
  • 【Linux篇章】线程同步与互斥2:打破多线程并发困境,开启高效程序运行新境界
  • libwebsockets编译
  • 【机器学习1】线性回归与逻辑回归
  • SQLite FTS4全文搜索实战指南:从入门到优化
  • Python Django全功能框架开发秘籍
  • 做画册可以参考哪些网站/福州百度seo
  • 罗湖网站建设公司/今日头条号官网
  • 网站ip访问做图表/培训机构网站制作
  • gta5买房子网站正在建设/武汉网络营销推广
  • 网站建设的功能都需要有哪些方面/网站发布流程
  • 做视频网站的上市公司/莆田seo推广公司