RocketMQ介绍与部署
RocketMQ介绍
MQ:MessageQueue,消息队列。
MQ的作⽤主要:
- 异步能提⾼系统的响应速度、吞吐量。
- 解耦:服务之间进⾏解耦,才可以减少服务之间的影响。解耦后可以实现数据分发,⽣产者发送⼀个消息后,可以由⼀个或者多个消费者进⾏消费,并且消费者的增加或者减少对⽣产者没有影响。
- 削峰:以稳定的系统资源应对突发的流量冲击。
RocketMQ部署
Binary 下载
RocketMQ官网
修改默认配置
测试环境配置不高,修改choose_gc_options函数内的jvm相关参数
vim bin/runserver.sh
修改choose_gc_log_directory下一行的jvm相关参数
vim bin/runbroker.sh
启动
启动NameServer
nohup bin/mqnamesrv &
启动Broker
- 先修改环境变量,让Broker知道NameServer的地址端口
vim ~/.bash_profileexport NAMESRV_ADDR='localhost:9876'source ~/.bash_profile
- 启动Broker
nohup bin/mqbroker &
消息测试
- 生产消息
bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
- 消费消息
bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
搭建Java客户端项目
- 在pom.xml中引⼊以下核⼼依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>对应的服务端的版本</version></dependenc>
- 创建⼀个简单的消息⽣产者
public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {//初始化一个消息生产者DefaultMQProducer producer = new DefaultMQProducer("DemoProducer");// 指定nameserver地址producer.setNamesrvAddr("192.168.234.141:9876");//设置发送超时时间和客户端api超时时间producer.setSendMsgTimeout(10000);producer.setMqClientApiTimeout(10000);// 启动消息生产者服务producer.start();for (int i = 0; i < 1000; i++) {try {// 创建消息。消息由Topic,Tag和body三个属性组成,其中Body就是消息内容Message msg = new Message("TopicTest","TagA",("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET));//发送消息,获取发送结果SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}//消息发送完后,停止消息生产者服务。producer.shutdown();}
}
- 创建⼀个消息消费者
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {//构建一个消息消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DemoConsumer");//指定nameserver地址consumer.setNamesrvAddr("192.168.234.141:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);// 订阅一个感兴趣的话题,这个话题需要与消息的topic一致consumer.subscribe("TopicTest", "*");// 注册一个消息回调函数,消费到消息后就会触发回调。consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {msgs.forEach(messageExt -> {try {System.out.println("收到消息:"+new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));} catch (UnsupportedEncodingException e) {}});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//设置客户端api超时时间10000毫秒consumer.setMqClientApiTimeout(10000);//启动消费者服务consumer.start();System.out.print("Consumer Started");}
}
RocketMQ可视化管理服务
- 源码下载
官网Dashboard服务下载 - 编译
mvn clean package -Dmaven.test.skip=true
- 修改配置文件
在jar包所在的⽬录下创建⼀个application.yml配置⽂件
rocketmq: config: namesrvAddrs: - 192.168.65.112:9876
- 运行
java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar 1>dashboard.log 2>&1 &