当前位置: 首页 > news >正文

学习和掌握RabbitMQ及其与springboot的整合实践(篇二)

目录

  • 前言
  • 流程通览
  • 项目配置与连接
  • 声明队列、交换机和绑定
    • 方式一:基于配置类的声明
    • 基于@RabbitListener注解的声明
  • 生产者端
    • yaml相关配置
    • 回调函数
    • 发送消息
  • 消费者端
    • yaml相关配置
    • 预取计数、并发线程
    • 本地重试
    • 消费者确认
    • 接收并处理异步消息
  • 序列化配置
    • 为什么要改变序列化配置
    • 如何配置JSON序列化

前言

这篇博客主要讲述rabbitMQ的理论基础和springboot整合指南,由于篇幅过长,我将这篇博客分为两个文章讲述,学习和掌握RabbitMQ及其与springboot的整合实践(篇一)主要讲述rabbitMQ的相关概念、理论基础,本篇主要讲述springboot与rabbitMQ的整合实践。如果还未安装部署rabbitMQ,可以参考我的这篇博客RabbitMQ 的部署安装

流程通览

springoot整合rabbitMQ的流程非常简单直接,主要包括依赖引入、项目配置、声明创建交换机/队列、生产者/消费者配置与使用等等。

在这里插入图片描述

项目配置与连接

  1. 依赖引入,在boot项目的pom.xml中添加amqp的起步依赖

    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  2. 在 application.yml配置文件中配置RabbitMQ服务器的连接信息,别忘了指定正确的虚拟主机

    spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest# 可选配置虚拟主机virtual-host: /
    

声明队列、交换机和绑定

SpringBoot 整合 RabbitMQ 时,可以声明队列、交换机和绑定,这样在程序启动时,会自动在rabbitMQ服务器端创建声明的交换机和队列。springboot中声明队列、交换机和绑定主要有三种方式。

方式一:基于配置类的声明

这是最经典和常见的方式,通过 @Configuration配置类和 @Bean方法来定义组件,结构清晰,集中管理,支持所有交换机类型和丰富的队列参数。

@Configuration
public class RabbitMQConfig {// 声明队列@Beanpublic Queue demoQueue() {//方式1 new 对象创建队列return new Queue("demo.queue", true); // true 表示队列持久化//方式2 工厂方法创建队列return QueueBuilder.durable("demo.queue").build(); //也是一个持久化队列//声明一个普通队列,关联到死信交换机Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "dlx.exchange"); //死信交换机名称args.put("x-dead-letter-routing-key", "dead"); //routing key 可不配置return QueueBuilder.durable(NORMAL_QUEUE).withArguments(args).build();}// 声明DirectExchange、FanoutExchange、TopicExchange交换机,以DirectExchange举例@Beanpublic DirectExchange directExchange() {//方式1:new 对象创建交换机return new DirectExchange("direct.exchange",true,false); //持久化,非自动删除//方式2:工厂方法创建交换机return ExchangeBuilder.directExchange("direct.exchange")//.delayed() 延时交换机.durable(true)//.autoDelete().build();}// 绑定队列到交换机,并指定binging键@Beanpublic Binding binding(Queue demoQueue, DirectExchange directExchange) {return BindingBuilder.bind(demoQueue).to(directExchange).with("demo.routing.key");}//如果有多组交换机和队列,可以通过传参方法返回值 的方式 进行绑定@Beanpublic Binding binding() {return BindingBuilder.bind(demoQueue()).to(delayedExchange()).with("demo.routing.key");}
}

基于@RabbitListener注解的声明

这种方式直接将声明信息消息消费者结合在一起,使用 @RabbitListener注解在消费者类上一次性完成队列、交换机的创建和绑定,非常简洁,适合简单的direct类型或topic主题订阅场景

@Component
public class MessageConsumer {@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "email.topic.queue", durable = "true",arguments = {@Argument(name = "x-dead-letter-exchange", value = "dlx.topic.exchange"), // 指定死信交换机@Argument(name = "x-dead-letter-routing-key", value = "dlx.email") // 死信路由键(可选)}),exchange = @Exchange(value = "topic.order.exchange", type = ExchangeTypes.TOPIC), // 声明交换机key = "*.email" // 路由键))public void receiveMessage(String message) {// 处理消息System.out.println("Received message: " + message);}
}

生产者端

yaml相关配置

生产者端的配置包括 本地连接重试消息可靠性投递(包括消息确认confirm、消息退回returns)配置

spring:rabbitmq:# 项目配置与连接# ---省略# 消息可靠性投递配置publisher-confirm-type: correlated  # 开启确认回调publisher-returns: true             # 开启退回回调template:mandatory: true # 必须设置为 true,ReturnCallback 才会生效# tcp连接重试配置retry:enabled: true                   # 开启重试max-attempts: 3                 # 最大重试3次initial-interval: 1000ms        # 首次重试等待1秒multiplier: 2                   # 下次间隔乘数max-interval: 10000ms           # 最大重试间隔10秒

回调函数

  • 生产者端confirm回调机制是确认消息成功发送到rabbitMQ服务器端的交换机,消息成功转发到交换机,Broker 返回的确认ACK回执,投递失败则会返回NACK回执,无论返回的是ACK还是NACK,只要生产者端开启的是correlated 类型生产者确认配置,都会回调confirm回调函数

  • 消息退回returns是只有当消息从交换机路由到任何一个队列失败时​​,才会回调生产者设置的回调函数

  • 下面的程序是配置两个回调函数的标准方式

  • 创建回调实现类
    创建一个类,同时实现 RabbitTemplate.ConfirmCallback和 RabbitTemplate.ReturnsCallback接口(在较新版本的 Spring Boot 中,ReturnsCallback是推荐的方式,旧版可能使用 ReturnCallback),注册到容器中。

    
    @Component
    public class RabbitMQConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {/*** ConfirmCallback 接口方法实现* 用于确认消息是否成功发送到交换机 (Exchange)** @param correlationData 发送消息时设置的关联数据* @param ack true 表示消息成功到达交换机,false 表示失败* @param cause 失败的原因,当 ack 为 true 时,此参数为 null*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String messageId = (correlationData != null) ? correlationData.getId() : "null";if (ack) {log.info("消息成功发送到交换机,消息ID: {}", messageId);} else {log.error("消息未能到达交换机,消息ID: {}, 原因: {}", messageId, cause);// 此处可根据业务需要进行重发或其他错误处理}}/*** ReturnsCallback 接口方法实现 (新版Spring Boot推荐)* 用于处理消息成功到达交换机,但无法路由到队列的情况** @param returned 包含返回消息、回复码、回复文本、交换机名和路由键等信息的对象*/@Overridepublic void returnedMessage(org.springframework.amqp.rabbit.support.ReturnedMessage returned) {logger.error("消息从交换机路由到队列失败,消息将被退回。");logger.error("交换机: {}", returned.getExchange());logger.error("路由键: {}", returned.getRoutingKey());logger.error("回复码: {}, 原因: {}", returned.getReplyCode(), returned.getReplyText());// 可以对返回的消息进行进一步处理,如记录日志、通知人工干预等}
    }
    
  • 配置 RabbitTemplate
    创建一个配置类,将上面实现的回调类设置到 RabbitTemplate中

    @Configuration
    public class RabbitTemplateConfig {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate RabbitMQConfirmCallback rabbitMQConfirmCallback;/*** 使用 @PostConstruct 在 Bean 初始化后设置回调*/@PostConstructpublic void initTemplate() {// 设置 ConfirmCallbackrabbitTemplate.setConfirmCallback(rabbitMQConfirmCallback);// 设置 ReturnsCallbackrabbitTemplate.setReturnsCallback(rabbitMQConfirmCallback);// 确保 mandatory 设置为 true(通常在 yml 中设置)// rabbitTemplate.setMandatory(true);}
    }
    
  • 发送消息时使用 CorrelationData
    发送消息时,务必创建并传递 CorrelationData对象,以便在回调中能够追踪到具体的消息

    import java.util.UUID;@Service
    public class MessageSenderService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage(String exchange, String routingKey, String messageContent) {// 创建关联数据,并设置一个唯一ID(如业务ID、订单号等)CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 发送消息rabbitTemplate.convertAndSend(exchange, routingKey, messageContent, correlationData);// 可以记录日志,表示消息已尝试发送System.out.println("消息发送调用完成,消息ID: " + correlationData.getId());}
    }
    

发送消息

在引入依赖后,amqp起步依赖会自动向容器中注入 RabbitTemplate工具类,其发送消息到RabbitMQ非常便捷,它封装了与RabbitMQ交互的常用操作。下面表格总结了几个最常用的RabbitMQ发送消息的重载方法。

方法名适用场景示例
convertAndSend(String routingKey, Object message)发送到默认交换机(通常为空字符串),使用指定的路由键rabbitTemplate.convertAndSend(“my.queue”, “Hello”);
convertAndSend(String exchange, String routingKey, Object message)最常用​​。指定目标交换机和路由键。rabbitTemplate.convertAndSend(“order.exchange”, “order.create”, order);
convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor postProcessor)发送前对消息进行额外处理,如设置消息头、过期时间等。详见下方代码示例
convertAndSend(String exchange, String routingKey, Object message, CorrelationData correlationData)用于​​消息可靠性投递​​,关联确认数据。详见可靠性配置部分
  • 使用 MessagePostProcessor的示例​​,在convertAndSend内部会将异步消息封装成Message类型对象,然后会进行postProcessMessage方法的后置处理,为消息添加过期时间或者其他属性。

    // 发送消息,并通过MessagePostProcessor设置消息属性
    rabbitTemplate.convertAndSend("my.exchange", "my.routing.key", "Hello Message", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 设置消息的过期时间(毫秒)message.getMessageProperties().setExpiration("5000");// 设置自定义消息头message.getMessageProperties().setHeader("trace-id", "12345");return message;}
    });
    

消费者端

yaml相关配置

消费者端的配置包括 预取计数本地重试消费者确认配置

spring:rabbitmq:# 项目配置与连接 省略# 消费者监听器配置listener:simple:acknowledge-mode: manual # 使用手动ACKconcurrency: 5 # 初始5个消费者max-concurrency: 10 # 最大可扩展到10个消费者prefetch: 1 # 每个消费者每次预取1条消息retry:enabled: true # 开启重试max-attempts: 3 # 最多重试3次initial-interval: 2000ms # 首次重试等待2秒

预取计数、并发线程

  • 上述配置设置concurrency=5,意思是项目启动后,会为每个被@RabbitListener注解的方法创建5个消费者线程(Channel)来同时监听和处理消息,然后prefetch设置为1,每个线程每次只能预处理一次异步信息。举个例子,如果有个消费者线程正在处理一个异步消息,还没有返回消费者确认回执,此时这个消费者线程就是出于繁忙状态,监听的rabbitMQ队列不会向当前消费者分发消息。

本地重试

  • 当前配置的本地重试策略,即max-attempts=3,当消费者监听方法执行失败时,会本地重新运行当前方法3次

消费者确认

  • 当前项目配置的消费者确认策略为手动,即acknowledge-mode: manual,表示监听方法处理完成消息之后,无论成功与否,需要手动返回确认回执ack/nack/reject

    /**** channel.basicAck(deliveryTag, multiple),multiple:是否批量确认(true 表示确认所有小于当前 deliveryTag 的消息)* channel.basicNack(deliveryTag, multiple, requeue):拒绝消息,requeue:true 表示将消息重新放回队列,false 表示直接丢弃成为死信* channel.basicReject(deliveryTag, requeue):拒绝单条消息*/
    @RabbitListener(queues = "test_queue")
    public void handleMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {try {// 处理业务逻辑System.out.println("处理消息:" + message);// 手动确认消息(单条确认)channel.basicAck(deliveryTag, false);} catch (Exception e) {// 处理失败,将消息重新放回队列(或拒绝)channel.basicNack(deliveryTag, false, true); // requeue=true 重新入队}
    }
    
  • channel 是 RabbitMQ 客户端与服务器通信的核心接口,就是消费者线程的抽象表示,在手动确认模式中,它提供了消息确认、拒绝等关键操作的方法,是手动控制消息生命周期的核心工具。

  • @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,deliveryTag 是 RabbitMQ 为每个消息生成的唯一标识(长整数),用于区分同一信道(Channel)中传递的不同消息,消息的 deliveryTag 从 1 开始递增,不同信道的 deliveryTag 相互独立。

  • deliveryTag 通过 @Header(AmqpHeaders.DELIVERY_TAG) 注解从消息头中提取,AmqpHeaders.DELIVERY_TAG 是 Spring AMQP 定义的常量,对应 RabbitMQ 消息头中的标识。

  • 例如,channel.basicAck(deliveryTag, false) 表示 “确认 deliveryTag 对应的这条消息”。

接收并处理异步消息

在 Spring Boot 整合 RabbitMQ 时,@RabbitListener 注解是消费者接收并处理消息的核心注解,其方法定义方式灵活多样,可根据消息格式、业务需求选择不同形式。以下是常见的方法定义方式汇总

  • 基础形式:直接接收消息体(最常用)
    适用于消息体为简单类型(如字符串、JSON 字符串等),直接获取消息内容进行处理,若消息是 JSON 格式,结合 @Payload 注解(可选,默认自动解析),Spring 会自动将 JSON 转为指定对象

    // 监听指定队列,直接接收消息体(字符串为例)
    @RabbitListener(queues = "test.queue")
    public void handleSimpleMessage(String message) {System.out.println("接收消息:" + message);
    }
    // 自动将 JSON 消息转为 User 对象
    @RabbitListener(queues = "user.queue")
    public void handleUserMessage(@Payload User user) {System.out.println("接收用户消息:" + user);
    }
    
  • 接收消息体 + 消息头(元数据)
    需获取消息的附加信息(如消息 ID、时间戳、自定义头信息等)时,使用 @Header 注解提取指定头信息,或 @Headers 提取全部头信息。

    @RabbitListener(queues = "meta.queue")
    public void handleMessageWithMeta(String message,  // 消息体@Header(AmqpHeaders.MESSAGE_ID) String messageId,  // 消息ID@Header("custom-header") String customHeader,  // 自定义头信息@Headers MessageHeaders headers  // 全部头信息(Map结构)
    ) {System.out.println("消息体:" + message);System.out.println("消息ID:" + messageId);System.out.println("自定义头:" + customHeader);System.out.println("全部头信息:" + headers);
    }
    

    常用内置头信息(AmqpHeaders 常量)

    消息头解释
    MESSAGE_ID消息唯一标识
    DELIVERY_TAG手动确认时的消息标识(见手动确认模式)
    RECEIVED_ROUTING_KEY接收消息的路由键
    TIMESTAMP消息发送时间戳
  • 接收完整 Message 对象
    获取封装了消息体、头信息、属性的完整 Message 对象(Spring AMQP 的 Message 类),适合需要全面操作消息元数据的场景。

    import org.springframework.amqp.core.Message;@RabbitListener(queues = "full.message.queue")
    public void handleFullMessage(Message message) {// 获取消息体(字节数组,需手动转换)String body = new String(message.getBody());// 获取消息头信息MessageProperties properties = message.getMessageProperties();String messageId = properties.getMessageId();System.out.println("消息体:" + body);System.out.println("消息ID:" + messageId);
    }
    
  • 手动确认模式:结合 Channel 和 deliveryTag,见上一小节消费者确认。

    	@RabbitListener(queues = "test_queue")public void handleMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {try {// 处理业务逻辑System.out.println("处理消息:" + message);// 手动确认消息(单条确认)channel.basicAck(deliveryTag, false);} catch (Exception e) {// 处理失败,将消息重新放回队列(或拒绝)channel.basicNack(deliveryTag, false, true); // requeue=true 重新入队}}
    

序列化配置

为什么要改变序列化配置

  • 在 Spring Boot 整合 RabbitMQ 时,消息的序列化 / 反序列化是核心环节。默认情况下,RabbitMQ 使用 JDK 自带的序列化方式,但实际开发中更推荐使用 JSON 序列化。

  • 什么是序列化,RabbitMQ 底层依赖 AMQP 协议,消息在网络传输中需转换为字节流(序列化),消费者接收后再转回对象(反序列化)。Spring AMQP 默认使用 JDK 序列化,但它存在明显缺陷:

    缺陷解释
    兼容性差JDK 序列化要求消息实体类必须实现 Serializable 接口,且序列化后的数据与类结构强绑定(如类名、包路径、字段增减都会导致反序列化失败),跨语言(如 Java 发送、Python 接收)时完全无法解析
    数据体积大JDK 序列化会携带大量类元数据(如类描述、继承关系),导致序列化后的字节流体积较大,增加网络传输成本和 RabbitMQ 存储压力
    可读性差序列化后的字节流是二进制数据,无法直接查看内容,调试和问题排查困难
  • JSON 序列化 能解决以上问题,体积小,JSON是通用数据格式,跨语言兼容性好,可读性强,可参与排查问题等等

如何配置JSON序列化

使用Spring Boot官方推荐的Jackson库来配置JSON序列化是最常见和简便的方式。配置分为​​生产者端​​和​​消费者端​​

  1. 添加依赖,确保我们的pom.xml中包含AMQP和Jackson的依赖,通常Jackson依赖已被其他起步依赖间接引入。

    	<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- Jackson(JSON 处理,若 starter 已包含可省略) --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
    
  2. 生产者端配置:发送JSON消息
    在生产者端,我们只需要将Jackson提供的JSON消息转换器Jackson2JsonMessageConverter配置到容器中的RabbitTemplate Bean即可,在使用rabbitTemplate 发送异步消息时会自动进行json消息的转换。

    @Configuration
    public class RabbitMQProducerConfig {@Beanpublic MessageConverter jsonMessageConverter() {return new Jackson2JsonMessageConverter();}// 为生产者显式配置RabbitTemplate
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter jsonMessageConverter) {RabbitTemplate template = new RabbitTemplate(connectionFactory);// 核心步骤2:主动将转换器设置到RabbitTemplate中template.setMessageConverter(jsonMessageConverter);// 还可以在此设置ConfirmCallback、ReturnCallback等template.setMandatory(true);return template;
    }
    }
    

    这个ConnectionFactory是一个负责创建和管理到RabbitMQ服务器tcp连接的工厂类​​,而 RabbitTemplate需要它是因为​​所有与RabbitMQ的通信都必须基于一个有效的连接​​,可以这样理解,RabbitTemplate发送消息时,需要从配置的ConnectionFactory tcp连接池中获取有效信道 ,才能将消息成功发送到rabbitMQ服务器

  3. 消费者端配置:接收JSON消息
    消费者端消息监听器SimpleMessageConverter 默认采用JDK 序列化,无法解析 JSON 格式,必须替换为 Jackson2JsonMessageConverter 才能实现 JSON 到对象的自动转换,替换方式跟生产者端别无二致。

    @Configuration
    public class RabbitConsumerConfig {// 配置消费者容器工厂,指定 JSON 消息转换器@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);// 设置 JSON 消息转换器,替代默认的 JDK 序列化factory.setMessageConverter(new Jackson2JsonMessageConverter());return factory;}
    }
    

    配置完成后,消费者方法可直接声明接收目标实体类(如 User),转换器会自动将消息体的 JSON 字符串反序列化为该对象。

    // 直接接收 User 对象,转换器自动完成 JSON 反序列化@RabbitListener(queues = "user.queue")public void handleUserMessage(User user) {System.out.println("接收用户:" + user.getName());}
    
http://www.dtcms.com/a/544978.html

相关文章:

  • Flink、Storm、Spark 区别
  • 当 AI Agent 遇上工作流编排:微软 Agent Framework 的 Workflow 深度解析
  • 5步构建多模式内容策略:统一品牌信息,最大化内容影响力
  • STP 转换为 3DXML 的技术指南及迪威模型网在线转换推荐
  • 如何建设视频网站好的网站设计题目
  • 深入理解 Vite 开发服务器的 Local 与 Network 地址
  • 免费建立网站的网站吗免费软件视频
  • 和利时 PLC 配网
  • 时间序列数据预测:14种机器学习与深度学习模型
  • 手机网站编程语言finecms
  • 第六部分:VTK进阶(第178章 网格质量评估vtkMeshQuality)
  • 多模态+CLIP | 视觉语言交互的终极形态?CLIP融合AIGC与持续学习,重塑多模态AI边界
  • Linux下CMake工具使用与Makefile生成完全指南
  • 关系型数据库、非关系型数据库、结构化数据、半结构化数据、非结构化数据、OLAP、OLTP的关系和区分
  • 成都市成华区建设局网站拐角型布局网站
  • java 文本内容 相似度比对
  • 切换jdk17
  • 定制型网站 成功案例网站建设费 税前扣除吗
  • 【SpringMVC】SpringMVC 请求与响应全解析:从 Cookie/Session 到状态码、Header 配置
  • 兰州网站建设ulezhi郑州网站建设培训短期班
  • 8.1.2 大数据方法论与实践指南-埋点实现方式分类
  • 7.1.5 大数据方法论与实践指南-日志系统+监控报警
  • Node.js Stream:深入理解与高效使用
  • 7.1.1 大数据方法论与实践指南-数仓元数据平台(数据地图)
  • 网站建设会计处理重庆网络公司产品设计
  • LeetCode 2001.可互换矩形的组数
  • 哈尔滨做网站哪家好电脑外设网站建设论文
  • 【Linux】数据链路层
  • 基于CentOS安装LNMP
  • Vue八股问题