Windows11安装rockerMq5.0+以及springboot集成rockerMq
安装jdk17,rockermq5.0+需要jdk11+,我这里使用jdk17
配置系统环境变量
ROCKETMQ_HOME
D:\work\mmq\rocketmq-all-5.2.0-bin-release
编写启动脚本
D:
cd D:\work\mmq\rocketmq-all-5.2.0-bin-release\bin
start mqnamesrv.cmd
start mqbroker.cmd -n 127.0.0.1:9876 -c ..\conf\broker.conf
启动 会弹出2个黑窗口 说明启动成功
Windows版RockerMq5.0+服务端的包-包含dashboard的图形化包
启动 dashboard 图形化界面
访问地址: http://localhost:8080/#/
创建一个springboot的项目 集成rockermq
引入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.3.0</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.xulk</groupId><artifactId>spring-boot</artifactId><version>0.0.1-SNAPSHOT</version><name>springAI</name><description>Demo project for Spring Boot</description><packaging>jar</packaging><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.4.2</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- <dependency><groupId>com.xuxueli</groupId><artifactId>xxl-job-core</artifactId><version>3.0.0</version></dependency>--><!-- RocketMQ --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.3.2</version> <!-- 使用与您RocketMQ服务端匹配的版本 --></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.7</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><executions><execution><goals><goal>repackage</goal></goals></execution></executions></plugin></plugins></build><repositories><repository><id>spring-milestones</id><name>Spring Milestones</name><url>https://repo.spring.io/milestone</url><snapshots><enabled>false</enabled></snapshots></repository></repositories></project>
创建配置文件
logging:config: classpath:logback.xml
server:port: 8081# RocketMQ Config
rocketmq:name-server: 127.0.0.1:9876producer:group: my-consumer_group # 生产者组名
消息生产者 普通消息和延迟消息
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.text.SimpleDateFormat;
import java.util.Date;/*** 消息生产者*/@Service
public class MqProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** 发送消息* @param topic 消息主题* @param message 消息内容*/public void sendMessage(String topic, String message) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String currentTime = sdf.format(new Date());rocketMQTemplate.convertAndSend(topic, currentTime + message);}//定时/延时消息发送public void sendDeliveryTimestampMessage( ) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String currentTime = sdf.format(new Date());System.out.println( "发送时间: " + currentTime );currentTime = currentTime + " ==== AAAAAAAAAAAAAAAAAAAAAAAAAA";Message message = new Message("my-topic", "", "", currentTime.getBytes());//time这里是消费者的接收时间,单位是毫秒。Long time = System.currentTimeMillis() + 15 * 60 * 1000;message.setDeliverTimeMs(time);DefaultMQProducer producer = rocketMQTemplate.getProducer();try {producer.send(message);}catch (Exception e){System.out.println( "发送异常----{}" + e.getLocalizedMessage() );}}}
消息消费者
package com.xxl.job.executor.mq;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;import java.text.SimpleDateFormat;
import java.util.Date;/*** 消息消费者*/@Service
@RocketMQMessageListener(topic = "my-topic", consumerGroup = "my-consumer_group")
public class MqConsumer implements RocketMQListener<String> {/*** 消费对应 topic 主题的消息* @param message 生产者发送过来的消息*/@Overridepublic void onMessage(String message) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String currentTime = sdf.format(new Date());System.out.println( "消费端 ======== 接收时间: " + currentTime );System.out.println("消费端 消费消息" + " ========== " + message);}}
创建一个 Controller
import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class RocketMqController {@ResourceMqProducer mqProducer;/*** 发送消息*/@GetMapping("send")public void send( ) {
// mqProducer.sendMessage("my-topic", "666666666666668888888");mqProducer.sendDeliveryTimestampMessage();}}
浏览器请求
http://localhost:8081/send