SpringBoot集成RocketMQ的两种方式
一:原生RocketMQ
配置复杂,需要手动配置product、consumer、监听器、序列化、消息过滤、事物消息;
非常灵活,所有RocketMQ底层能力都能用上。
使用建议: 对 RocketMQ 特性依赖较深(如事务消息、消息延迟等级、过滤等)。 项目中对性能和精细化控制要求高。
1.添加依赖:
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>5.3.0</version>
</dependency>
2.生产者:
public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {//1.初始化一个消息生产者,指定组名DefaultMQProducer producer = new DefaultMQProducer("DemoProducer");// 2.指定nameserver地址producer.setNamesrvAddr("192.168.65.112:9876");// 3.启动消息生产者服务producer.start();for (int i = 0; i < 2; i++) {try {// 4.创建消息。消息由Topic,Tag和body三个属性组成,其中Body就是消息内容Message msg = new Message("TopicTest","TagA",("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET));//5.发送消息,获取发送结果SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}//6.消息发送完后,停止消息生产者服务。producer.shutdown();}
}
3.消费者:
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {//1.构建一个消息消费者指定消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");//2.指定nameserver地址consumer.setNamesrvAddr("192.168.65.112:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);// 3.订阅一个感兴趣的话题,这个话题需要与消息的topic一致,不实用tag过滤consumer.subscribe("TopicTest", "*");// 4.注册一个消息回调函数,消费到消息后就会触发回调。consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {msgs.forEach(messageExt -> {try {System.out.println("收到消息:"+new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));} catch (UnsupportedEncodingException e) {}});//返回消费状态return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者服务consumer.start();System.out.print("Consumer Started");}
}
二:spring-boot-starter-rocketmq
使用 @RocketMQMessageListener 和 RocketMQTemplate 快速开发,封装好 producer 和 consumer 配置简单,SpringBoot自动装配,不够灵活
1.添加依赖(使用springboot集成时,版本特别关键,稍微有偏差就会报错)
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.3.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>3.0.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>3.0.4</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>
2.配置文件
rocketmq.name-server=192.168.65.112:9876
rocketmq.producer.group=springBootGroup
#如果这⾥不配,那就需要在消费者的注解中配。
#rocketmq.consumer.topic=
rocketmq.consumer.group=testGroup
server.port=9000
3.生产者
RocketMQTemplate 不光可以发送消息还可以拉消息
@Resource
private RocketMQTemplate rocketMQTemplate;
public void sendMessage(String topic,String msg){
this.rocketMQTemplate.convertAndSend(topic,msg);
}
}
4.消费者
消费者的声明也很简单。所有属性通过@RocketMQMessageListener注解声明
@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic",consumeMode=
ConsumeMode.CONCURRENTLY,messageModel= MessageModel.BROADCASTING)
public class SpringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("Received message : "+ message);
}
}
SpringBoot框架中对消息的封装与原⽣API的消息封装是不⼀样的
在springBoot封装的rocketmq中,默认的RocketMQTemplate只能处理一开始初始化的生产者组,特别不方便,可以通过@ExtRocketMQTemplateConfiguration()自己在额外配置需要的。
原生的代码量多,但是更加令快,SpinrgBoot封装的则适合场景简单快速开发,具体以自己业务场景为主。
Producer发送的Message对象是没有msgId属性的。Broker端接收到Producer发过来的消息后,会给每条消息单独分配⼀个唯⼀的msgId。这个msgID可以作为消息的唯⼀主键来使⽤。
但是需要注意,对于客户端来说,毕竟是不知道这个msgId是如何产⽣的。实际上,在RocketMQ内部,也会针对批量消息、事务消息等特殊的消息机制,有特殊的msgId分配机制。因此,在复杂业务场景下,不建议使⽤msgId来作为消息的唯⼀索引,⽽建议采⽤下⾯的key属性,⾃⾏指定业务层⾯上的唯⼀索引。
针对key这⼀个属性,建议在业务中可以添加⼀些带有业务唯⼀性的数据,作为MessageId的补充。RocketMQ基于Keys属性,实现了消息溯源、消息压缩等⼀系列功能。