当前位置: 首页 > news >正文

RocketMQ介绍与部署

RocketMQ介绍

MQ:MessageQueue,消息队列。
MQ的作⽤主要:

  • 异步能提⾼系统的响应速度、吞吐量。
  • 解耦:服务之间进⾏解耦,才可以减少服务之间的影响。解耦后可以实现数据分发,⽣产者发送⼀个消息后,可以由⼀个或者多个消费者进⾏消费,并且消费者的增加或者减少对⽣产者没有影响。
  • 削峰:以稳定的系统资源应对突发的流量冲击。

RocketMQ部署

Binary 下载

RocketMQ官网

修改默认配置

测试环境配置不高,修改choose_gc_options函数内的jvm相关参数

vim bin/runserver.sh

修改choose_gc_log_directory下一行的jvm相关参数

vim bin/runbroker.sh

启动

启动NameServer

nohup bin/mqnamesrv &

启动Broker

  • 先修改环境变量,让Broker知道NameServer的地址端口
vim ~/.bash_profileexport NAMESRV_ADDR='localhost:9876'source ~/.bash_profile
  • 启动Broker
nohup bin/mqbroker &

消息测试

  • 生产消息
 bin/tools.sh org.apache.rocketmq.example.quickstart.Producer 
  • 消费消息
bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

搭建Java客户端项目

  • 在pom.xml中引⼊以下核⼼依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>对应的服务端的版本</version></dependenc>
  • 创建⼀个简单的消息⽣产者
public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {//初始化一个消息生产者DefaultMQProducer producer = new DefaultMQProducer("DemoProducer");// 指定nameserver地址producer.setNamesrvAddr("192.168.234.141:9876");//设置发送超时时间和客户端api超时时间producer.setSendMsgTimeout(10000);producer.setMqClientApiTimeout(10000);// 启动消息生产者服务producer.start();for (int i = 0; i < 1000; i++) {try {// 创建消息。消息由Topic,Tag和body三个属性组成,其中Body就是消息内容Message msg = new Message("TopicTest","TagA",("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET));//发送消息,获取发送结果SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}//消息发送完后,停止消息生产者服务。producer.shutdown();}
}
  • 创建⼀个消息消费者
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {//构建一个消息消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DemoConsumer");//指定nameserver地址consumer.setNamesrvAddr("192.168.234.141:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);// 订阅一个感兴趣的话题,这个话题需要与消息的topic一致consumer.subscribe("TopicTest", "*");// 注册一个消息回调函数,消费到消息后就会触发回调。consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {msgs.forEach(messageExt -> {try {System.out.println("收到消息:"+new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));} catch (UnsupportedEncodingException e) {}});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//设置客户端api超时时间10000毫秒consumer.setMqClientApiTimeout(10000);//启动消费者服务consumer.start();System.out.print("Consumer Started");}
}

RocketMQ可视化管理服务

  • 源码下载
    官网Dashboard服务下载
  • 编译
 mvn clean package -Dmaven.test.skip=true
  • 修改配置文件
    在jar包所在的⽬录下创建⼀个application.yml配置⽂件
rocketmq: config: namesrvAddrs: - 192.168.65.112:9876
  • 运行
java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar 1>dashboard.log 2>&1 &

在这里插入图片描述

相关文章:

  • NodeJS全栈WEB3面试题——P6安全与最佳实践
  • SDU棋界精灵——实现硬件程序ESP32的FreeRTOS任务
  • 【LeetCode 热题100】动态规划实战:打家劫舍、完全平方数与零钱兑换(LeetCode 198 / 279 / 322)(Go语言版)
  • 【QT控件】QWidget 常用核心属性介绍 -- 万字详解
  • Laplace 噪声
  • 案例:TASK OA
  • YOLOv5 :训练自己的数据集
  • wow Warlock shushia [Dreadsteed]
  • 简单了解string类的特性及使用(C++)
  • MDP的curriculums部分
  • volatile,synchronized,原子操作实现原理,缓存一致性协议
  • 基于Python学习《Head First设计模式》第四章 工厂方法+抽象工厂
  • “等待-通知”机制优化(一次性申请)循环等待
  • HarmonyOS5 仓颉入门:和 ArkTs 互操作
  • 初识vue3(vue简介,环境配置,setup语法糖)
  • RGB888色彩格式转RGB565格式
  • VMware安装Ubuntu全攻略
  • 记忆解码 | 从神经机制到记忆逻辑的科学探索
  • Google机器学习实践指南(TensorFlow六大优化器)
  • Python----目标检测(Ultralytics安装和YOLO-V8快速上手)
  • 可以做甩货的电商网站/百度小说排行榜完本
  • 没有域名可以先做网站吗/东莞疫情最新消息今天新增
  • 电商网站有哪些功能/百度站长平台有哪些功能
  • 餐厅网站设计模板下载/西安网站seo工作室
  • 网站建设教程搭建芽嘱湖南岚鸿信赖/网络推广自学
  • 有哪些专做旅游定制的网站/网站推广郑州