当前位置: 首页 > news >正文

spring中的@RabbitListener注解详解

    • 基本用法
    • 主要属性
      • 1. queues / queueNames
      • 2. containerFactory
      • 3. id
      • 4. concurrency
      • 5. ackMode
      • 6. priority
      • 7. bindings
    • 高级特性
      • 1. 消息转换器
      • 2. 手动确认
      • 3. 条件监听
      • 4. 错误处理
    • 配置监听容器工厂
    • 注意事项
    • 完整示例
    • 循环依赖解决
      • 1. 使用 Setter 注入
      • 2. 使用 `@Lazy` 注解
      • 3. 重构代码结构
      • 4. 使用事件驱动架构
      • 5. 使用 `@PostConstruct` 方法
    • @RabbitListener注解与@Profile注解组合使用
      • 基本用法
      • 配置文件激活
        • 1. 在 `application.properties` 或 `application.yml` 中设置
        • 2. 在启动应用程序时通过命令行参数设置
        • 3. 在 IDE 中设置
      • 高级用法
      • 注意事项
    • @RabbitListener的autoStartup配置说明
      • 作用
      • 使用场景
      • 示例代码
      • 手动启动监听器


@RabbitListener 是 Spring AMQP 框架中的一个核心注解,用于简化 RabbitMQ 消息监听器的开发。下面详细介绍这个注解的用法和特性。

基本用法

@RabbitListener 可以标注在方法上,表示该方法是一个 RabbitMQ 消息监听器。

@Component
public class MyRabbitListener {@RabbitListener(queues = "myQueue")public void receiveMessage(String message) {System.out.println("Received <" + message + ">");}
}

在这里插入图片描述

主要属性

1. queues / queueNames

指定要监听的队列名称:

@RabbitListener(queues = "myQueue")
// 或
@RabbitListener(queueNames = "myQueue")

可以监听多个队列:

@RabbitListener(queues = {"queue1", "queue2"})

2. containerFactory

指定使用的消息监听容器工厂:

@RabbitListener(queues = "myQueue", containerFactory = "myFactory")

3. id

为监听器指定一个唯一ID:

@RabbitListener(id = "myListener", queues = "myQueue")

4. concurrency

设置并发消费者数量:

@RabbitListener(queues = "myQueue", concurrency = "3-5")

5. ackMode

设置确认模式:

@RabbitListener(queues = "myQueue", ackMode = "MANUAL")

6. priority

设置监听器优先级:

@RabbitListener(queues = "myQueue", priority = "10")

7. bindings

使用绑定器配置(更复杂的路由配置):

@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "myQueue", durable = "true"),exchange = @Exchange(value = "myExchange", type = ExchangeTypes.TOPIC),key = "myRoutingKey"
))

高级特性

1. 消息转换器

可以指定消息转换器:

@RabbitListener(queues = "myQueue")
public void receiveMessage(@Payload MyObject obj, @Header(AmqpHeaders.CONTENT_TYPE) String contentType) {// ...
}

2. 手动确认

@RabbitListener(queues = "myQueue")
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {try {// 处理消息channel.basicAck(tag, false);} catch (Exception e) {channel.basicNack(tag, false, true);}
}

3. 条件监听

使用 SpEL 表达式控制是否处理消息:

@RabbitListener(queues = "myQueue", condition = "headers['type']=='important'")
public void receiveImportantMessage(String message) {// 只处理带有特定header的消息
}

4. 错误处理

可以结合 @RabbitHandler@ServiceActivator 实现错误处理:

@RabbitListener(queues = "myQueue")
public void receiveMessage(String message) {// ...
}@ServiceActivator(inputChannel = "errorChannel")
public void handleError(ErrorMessage errorMessage) {// 处理错误
}

配置监听容器工厂

通常需要配置一个 SimpleRabbitListenerContainerFactory

@Configuration
public class RabbitConfig {@Beanpublic SimpleRabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setConcurrentConsumers(3);factory.setMaxConcurrentConsumers(10);factory.setPrefetchCount(10);factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);return factory;}
}

注意事项

  1. 监听器方法应该是 public 的
  2. 方法可以有多个参数,Spring 会尝试匹配消息头和内容
  3. 默认情况下,消息会自动确认,除非抛出异常
  4. 对于 POJO 消息,需要配置适当的消息转换器

完整示例

@Configuration
public class RabbitConfig {@Beanpublic Queue myQueue() {return new Queue("myQueue", true);}@Beanpublic TopicExchange myExchange() {return new TopicExchange("myExchange");}@Beanpublic Binding binding() {return BindingBuilder.bind(myQueue()).to(myExchange()).with("myRoutingKey");}@Beanpublic SimpleRabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setConcurrentConsumers(3);return factory;}
}@Component
public class MyRabbitListener {@RabbitListener(queues = "myQueue",containerFactory = "myFactory",ackMode = "MANUAL")public void receiveMessage(@Payload MyObject obj,@Header(AmqpHeaders.DELIVERY_TAG) long tag,Channel channel) throws IOException {try {// 处理消息System.out.println("Received: " + obj);channel.basicAck(tag, false);} catch (Exception e) {channel.basicNack(tag, false, true);}}
}

@RabbitListener 注解极大地简化了 RabbitMQ 消费者的开发,通过合理的配置可以满足各种消息处理场景的需求。

循环依赖解决

在使用 Spring 的 @RabbitListener 注解时,循环依赖问题通常发生在监听器方法中直接或间接地引用了其他需要注入的 Bean,而这些 Bean 又依赖于监听器本身或其相关组件。以下是解决这类循环依赖问题的几种方法:

1. 使用 Setter 注入

Setter 注入可以帮助解决循环依赖问题,因为 Spring 可以在创建 Bean 后设置依赖关系。

@Component
public class MyService {private MyListener myListener;@Autowiredpublic void setMyListener(MyListener myListener) {this.myListener = myListener;}
}@Component
public class MyListener {private final MyService myService;@Autowiredpublic MyListener(MyService myService) {this.myService = myService;}@RabbitListener(queues = "myQueue")public void receiveMessage(String message) {myService.processMessage(message);}
}

2. 使用 @Lazy 注解

@Lazy 注解可以延迟 Bean 的初始化,从而打破循环依赖。

@Component
public class MyListener {private final MyService myService;@Autowiredpublic MyListener(@Lazy MyService myService) {this.myService = myService;}@RabbitListener(queues = "myQueue")public void receiveMessage(String message) {myService.processMessage(message);}
}

3. 重构代码结构

重构代码以消除循环依赖。这通常涉及将共享逻辑提取到一个单独的 Bean 中,或者重新设计组件之间的交互。

@Component
public class MessageProcessor {public void processMessage(String message) {// 处理消息的逻辑}
}@Component
public class MyListener {private final MessageProcessor messageProcessor;@Autowiredpublic MyListener(MessageProcessor messageProcessor) {this.messageProcessor = messageProcessor;}@RabbitListener(queues = "myQueue")public void receiveMessage(String message) {messageProcessor.processMessage(message);}
}@Component
public class MyService {private final MessageProcessor messageProcessor;@Autowiredpublic MyService(MessageProcessor messageProcessor) {this.messageProcessor = messageProcessor;}public void someMethod() {// 使用 messageProcessor}
}

4. 使用事件驱动架构

通过引入事件驱动架构,可以将监听器与业务逻辑解耦。监听器只负责发布事件,具体的业务逻辑由事件监听器处理。

@Component
public class MyListener {private final ApplicationEventPublisher eventPublisher;@Autowiredpublic MyListener(ApplicationEventPublisher eventPublisher) {this.eventPublisher = eventPublisher;}@RabbitListener(queues = "myQueue")public void receiveMessage(String message) {eventPublisher.publishEvent(new MessageReceivedEvent(message));}
}@Component
public class MessageEventListener {@EventListenerpublic void handleMessageEvent(MessageReceivedEvent event) {// 处理消息事件}
}

5. 使用 @PostConstruct 方法

在某些情况下,可以使用 @PostConstruct 方法来初始化依赖关系。

@Component
public class MyListener {private MyService myService;@Autowiredpublic void setMyService(MyService myService) {this.myService = myService;}@PostConstructpublic void init() {// 确保在初始化时设置好所有依赖}@RabbitListener(queues = "myQueue")public void receiveMessage(String message) {myService.processMessage(message);}
}

通过以上方法,可以有效地解决在使用 @RabbitListener 注解时可能遇到的循环依赖问题。选择哪种方法取决于具体的应用场景和架构设计。

@RabbitListener注解与@Profile注解组合使用

在 Spring 中,@RabbitListener@Profile 注解可以组合使用,以便根据不同的环境或配置文件来启用或禁用特定的消息监听器。这在需要根据环境(如开发、测试、生产)来调整消息处理逻辑时非常有用。

基本用法

你可以将 @Profile 注解与 @RabbitListener 注解一起使用,以便只有在特定配置文件激活时,监听器才会被注册和运行。

@Component
@Profile("dev") // 只有在 "dev" 配置文件激活时,这个 Bean 才会被创建
public class DevRabbitListener {@RabbitListener(queues = "devQueue")public void receiveMessage(String message) {System.out.println("Dev environment received: " + message);}
}@Component
@Profile("prod") // 只有在 "prod" 配置文件激活时,这个 Bean 才会被创建
public class ProdRabbitListener {@RabbitListener(queues = "prodQueue")public void receiveMessage(String message) {System.out.println("Prod environment received: " + message);}
}

配置文件激活

要激活特定的配置文件,可以在应用程序启动时设置 spring.profiles.active 属性。这可以通过多种方式实现:

1. 在 application.propertiesapplication.yml 中设置
spring.profiles.active=dev
2. 在启动应用程序时通过命令行参数设置
java -jar myapp.jar --spring.profiles.active=dev
3. 在 IDE 中设置

在 IntelliJ IDEA 或 Eclipse 中,可以在运行配置中设置激活的配置文件。

高级用法

如果需要更复杂的逻辑,可以使用 @ConditionalOnProperty 或其他条件注解来进一步控制监听器的行为。

@Component
@Profile("dev")
public class DevRabbitListener {@RabbitListener(queues = "devQueue")public void receiveMessage(String message) {System.out.println("Dev environment received: " + message);}
}@Component
@Profile("prod")
public class ProdRabbitListener {@RabbitListener(queues = "prodQueue")public void receiveMessage(String message) {System.out.println("Prod environment received: " + message);}
}@Component
@Profile("!dev & !prod") // 默认情况下激活,如果 dev 和 prod 都没有激活
public class DefaultRabbitListener {@RabbitListener(queues = "defaultQueue")public void receiveMessage(String message) {System.out.println("Default environment received: " + message);}
}

注意事项

  • 确保在配置文件中正确设置了激活的配置文件,否则相关的监听器不会被注册。
  • 使用 @Profile 注解时,确保在应用程序上下文初始化时配置文件已经被正确加载。
  • 可以通过组合多个配置文件来实现更复杂的条件逻辑。

通过将 @RabbitListener@Profile 注解结合使用,可以灵活地管理不同环境下的消息处理逻辑,使应用程序在不同环境中表现不同的行为。

@RabbitListener的autoStartup配置说明

@RabbitListenerautoStartup 配置用于控制监听器容器在应用启动时是否自动启动,以下是详细说明:

作用

autoStartup 属性是一个布尔值,默认情况下其值为 true,表示监听器容器将会在应用启动时自动启动,开始监听指定队列中的消息。当设置为 false 时,监听器容器在应用启动时不会自动启动,需要手动触发启动。

使用场景

  • 测试场景:在测试环境中,可能需要关闭某些监听器,以便单独测试其他组件,或者模拟特定的消息处理场景。此时可以将 autoStartup 设置为 false,在需要时再手动启动监听器。
  • 条件性启动:根据某些条件(如配置文件中的参数、环境变量等)来决定是否启动监听器。例如,只有在特定的配置文件激活时才启动某个监听器,可以通过结合 @Profile 注解和 autoStartup 属性来实现。
  • 资源优化:如果某些监听器在应用启动初期并不需要立即运行,可以将其 autoStartup 设置为 false,以减少应用启动时的资源占用,在需要时再启动。

示例代码

@Component
public class MyListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "myQueue", durable = "true"),exchange = @Exchange(value = "myExchange", type = ExchangeTypes.TOPIC),key = "myRoutingKey"),autoStartup = "false" // 设置为false,监听器容器在应用启动时不会自动启动)public void receiveMessage(String message) {System.out.println("Received: " + message);}
}

手动启动监听器

如果将 autoStartup 设置为 false,可以通过编程方式手动启动监听器容器。例如,可以在一个 @PostConstruct 方法中启动监听器:

@Component
public class ListenerStarter {@Autowiredprivate RabbitListenerEndpointRegistry registry;@PostConstructpublic void startListeners() {registry.getListenerContainer("myListenerContainerId").start(); // "myListenerContainerId"是监听器的ID,如果未指定,可以使用默认生成的ID}
}

或者,也可以使用 RabbitAdmin 或其他管理组件来控制监听器的启动和停止。

相关文章:

  • 腾讯 ovCompose 跨平台框架发布,几年后还会有人用吗?
  • SSM spring Bean实例化
  • matlab 2024a ​工具箱Aerospsce Toolbox报错​
  • 【力扣链表篇】19.删除链表的倒数第N个节点
  • 2025年06月07日Github流行趋势
  • Vue3 项目的基本架构解读
  • 2012-2023年 上市公司-知识重组创造、知识重组再利用数据-社科经管实证数据
  • 《从零掌握MIPI CSI-2: 协议精解与FPGA摄像头开发实战》-- CSI-2 协议详细解析LLP (二)
  • 备份还原打印机驱动
  • 数据库管理与高可用-MySQL高可用
  • Java基于SpringBoot的校园闲置物品交易系统,附源码+文档说明
  • 以智能管理为基础,楼宇自控打造建筑碳中和新路径
  • WebFuture 系统升级提示外键约束的问题处理
  • WebWorker-----高频面试题(浏览器篇)
  • 30、memory-order-relaxed
  • 从零开始开发纯血鸿蒙应用之网络检测
  • A Execllent Software Project Review and Solutions
  • 【物联网-ModBus-RTU
  • 【Go语言基础【14】】defer与异常处理(panic、recover)
  • 【HarmonyOS 5】拍摄美化开发实践介绍以及详细案例
  • 专门找图片素材的网站/最新国际新闻50条简短
  • 开发网站教程/怎么做手工
  • 福建建设执业资格网站报名系统/seo建站是什么意思
  • 自建网站怎么做优化/百度网址链接
  • 网站开发检测用户微信号/推广app有哪些
  • 手机wap网站模板下载/营销型网站制作建设