当前位置: 首页 > news >正文

RocketMQ 详细教程(Spring Boot Spring Cloud Alibaba)

1. RocketMQ 简介

RocketMQ 是阿里巴巴开源的一款分布式消息队列,具有高吞吐量、低延迟、可靠性等特点,广泛应用于金融、电商、物联网等领域。

  • RocketMQ 的核心特性:
    • 高可靠性:支持消息存储、重复消费、失败重试等
    • 高可用性:分布式架构,支持主从复制
    • 高性能:高吞吐量、低延迟

2. 引入依赖

首先,在 Spring Boot 项目 中引入 RocketMQ 和 Spring Cloud Alibaba 的依赖。

2.1 配置依赖

pom.xml 中添加以下依赖:

<dependencies>
    <!-- Spring Boot -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    
    <!-- Spring Cloud Alibaba RocketMQ -->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-rocketmq</artifactId>
    </dependency>

    <!-- Spring Cloud Alibaba -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba</artifactId>
    </dependency>
</dependencies>

2.2 配置 RocketMQ

application.yml 中配置 RocketMQ 连接信息:

spring:
  cloud:
    alibaba:
      rocketmq:
        name-server: 127.0.0.1:9876  # RocketMQ NameServer 地址
        producer:
          group: my-producer-group   # 生产者组
        consumer:
          group: my-consumer-group   # 消费者组

3. 生产者代码实现

3.1 创建消息生产者

我们可以通过注解 @RocketMQMessageListener 来创建 RocketMQ 生产者,消息可以通过 @Value 传递或直接通过 Bean 注入。

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class RocketMQProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    // 发送普通消息
    public void sendMessage(String topic, String message) {
        rocketMQTemplate.convertAndSend(topic, message);
    }
}

3.2 发送消息

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

    @Autowired
    private RocketMQProducer rocketMQProducer;

    @GetMapping("/send")
    public String sendMessage() {
        String message = "Hello, RocketMQ!";
        rocketMQProducer.sendMessage("my-topic", message);
        return "Message sent: " + message;
    }
}

访问 http://localhost:8080/send 发送消息,RocketMQ 将开始处理消息。


4. 消费者代码实现

4.1 创建消息消费者

在消费者端,我们需要创建一个消息监听器,利用 @RocketMQMessageListener 注解监听 RocketMQ 消息。

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(topic = "my-topic", consumerGroup = "my-consumer-group")
public class RocketMQConsumer {

    @org.springframework.messaging.handler.annotation.MessageMapping
    public void listen(String message) {
        System.out.println("Received message: " + message);
    }
}

4.2 消费消息

每次生产者发送的消息,消费者都会通过 listen 方法进行接收。控制台会打印出收到的消息。


5. 顺序消息

RocketMQ 支持顺序消息,可以通过设置 MessageQueueSelector 来保证消息的顺序性。

5.1 发送顺序消息

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class OrderProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    // 发送顺序消息
    public void sendOrderedMessage(String topic, String message, int orderId) {
        rocketMQTemplate.convertAndSend(topic, message, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                // 根据orderId选择队列
                return mqs.get(orderId % mqs.size());
            }
        }, orderId);
    }
}

5.2 接收顺序消息

顺序消息的消费逻辑和普通消息相似,只不过要保证顺序消息的消费顺序。


6. 事务消息

RocketMQ 提供了事务消息功能,能够保证消息在分布式事务中的可靠性。我们通过 @RocketMQTransactionListener 来实现。

6.1 配置事务消息

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransaction;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@RocketMQTransactionListener
public class TransactionProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendTransactionMessage(String topic, String message) {
        rocketMQTemplate.sendMessageInTransaction(topic, message, null);
    }

    public RocketMQLocalTransaction executeLocalTransaction(String message, Object arg) {
        // 事务操作
        try {
            // 业务操作
            return RocketMQLocalTransaction.SUCCESS;
        } catch (Exception e) {
            return RocketMQLocalTransaction.ROLLBACK;
        }
    }
}

6.2 事务回查

RocketMQ 支持事务回查机制,如果事务消息发送后没有明确的提交或回滚,RocketMQ 会通过回查接口查询事务状态。


7. Spring Cloud Alibaba 集成

7.1 配置 Spring Cloud RocketMQ

application.yml 配置 Spring Cloud Alibaba 与 RocketMQ 集成。

spring:
  cloud:
    alibaba:
      rocketmq:
        name-server: 127.0.0.1:9876
        producer:
          group: my-producer-group
        consumer:
          group: my-consumer-group

7.2 集成 OpenFeign 与 RocketMQ

通过 OpenFeign 实现远程服务调用,可以和 RocketMQ 一起工作。例如,将 RocketMQ 生产者集成到一个微服务中,使用 OpenFeign 调用。

@FeignClient("rocketmq-producer-service")
public interface RocketMQFeignClient {

    @PostMapping("/send")
    void sendMessage(@RequestBody String message);
}

通过 Feign 客户端发送请求并触发 RocketMQ 生产者的消息发送。


8. 总结

功能总结

功能说明
消息生产使用 RocketMQTemplate 发送消息
消息消费使用 @RocketMQMessageListener 监听消息
顺序消息使用 MessageQueueSelector 保证消息顺序
事务消息使用 RocketMQTransactionListener 保证事务一致性
集成 Spring Cloud结合 Spring Cloud Alibaba RocketMQ 进行分布式消息通信

集成 RocketMQ 的好处

  • 提高系统解耦,避免直接调用远程服务。
  • 支持异步、可靠的消息传递。
  • 通过顺序消息保证业务流程的顺序性。
  • 事务消息保证分布式事务的一致性。

这篇教程将帮助你实现 Spring Boot 和 Spring Cloud Alibaba 集成 RocketMQ 的基本功能,之后可以根据业务需求进行扩展。如果这篇教程对你有帮助,记得点赞、收藏哦! 🚀

相关文章:

  • 靶场练习ing
  • 【A2DP】蓝牙A2DP协议剖析:从架构到规范
  • Linux进程基础知识
  • APIC 是什么?深度解析高级可编程中断控制器
  • Redis-配置文件
  • 【论文精读】Deformable DETR:用于端到端目标检测的可变形 Transformer
  • [Ai 力扣题单] 数组基本操作篇 27/704
  • 深陷帕金森困境,怎样重燃生活信心?
  • 基于 Vue 的Deepseek流式加载对话Demo
  • 网络安全基础知识:从零开始了解网络安全
  • 【网络协议】应用层协议HTTPS
  • C语言学习笔记-进阶(10)自定义类型:结构体
  • # linux有哪些桌面环境?有哪些显示服务器协议及显示服务器?有哪些用于开发图形用户界面的工具包?
  • 消息队列MQ使用场景有哪些?
  • 【VS+Qt】VS中编辑Qt中ui界面,代码中未更新
  • 如何在前后端分离项目当中调用星火大模型实现AI问答模块设计?前后端点亮星火:Vue+Node 全链路打通 AI 问答系统
  • 运维AI自动化配置的方案选型
  • Ubuntu 24.04 安装与配置 JetBrains Toolbox 指南
  • 【网络安全工程】任务12:网络安全设备
  • 第十一届蓝桥杯单片机国赛
  • wordpress管理地址/网站的优化
  • 哪个网站可以付费做淘宝推广/成都自然排名优化
  • 中国兼职设计师网/常德网站seo
  • 网站建设图标图片/著名营销策划公司
  • 企业网站哪家做的好/互联网运营自学课程
  • 重庆网站建设价格/seo运营推广