动态控制rabbitmq中的消费者监听的启动和停止
引言:
在业务系统中,消息中间件常用于实现流量削峰、异步通信等场景中发挥重要作用,通过消息监听机制可实现数据的异步获取与处理;而实际业务中常需根据特定场景(如系统维护、下游服务异常等)动态暂停或启动消息监听,对此即需要通过动态控制消费的启动和停止。以下操作方可实现该功能
一、引入依赖
引入springboot web以及rabbitmq的依赖
<!--springboot web -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency><!--springboot web -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
二、配置application.yaml 配置文件
spring:rabbitmq:addresses: 1127.0.0.1:5672username: adminpassword: konne20211220@.+virtual-host: /
三、创建消费服务
@Configuration
public class Consumer {@Autowiredprivate RabbitTemplate rabbitTemplate;//服务中运行的数量public int serverRunSite;Logger logger = LoggerFactory.getLogger(Consumer.class);/*** 消费rabbitmq 中 testConsumer 的数据,并设置手动确认消息* testListener 是rabbitmq中的唯一id,不能重复*/@RabbitListener(id = "testListener", queues = "testConsumer", ackMode = "MANUAL")public void receiveMessage(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long tag, Channel channel) {logger.info("收到方案新增信息:{}", msg);try {channel.basicAck(tag,false);} catch (Exception e) {logger.error("消息确认失败,msg:{}",msg);}int limitSize = 10;//服务限制的数量和服务当前运行的数量一致进行停止当前服务监听if(limitSize == serverRunSite){rabbitTemplate.convertAndSend("datasource_add", msg);logger.warn("当前服务消费数量已达上限,数据扔回队列");}else{//处理下游业务}}
}
四、实现rabbitmq的消费者启动和停止
@RestController
public class UpdateStatusController {//获取到rabbitmq的注册器@Autowiredprivate RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;/*** 更改状态* @param status 0 开启,1关闭*/@GetMapping("updateStatus")private void updateStatus(int status) {if(status == 0){this.startConsumption();}else{this.stopConsumption();}}/*** 通过rabbitListenerEndpointRegistry 根据唯一id获取container* 关闭监听*/public void stopConsumption() {AbstractMessageListenerContainer container = (AbstractMessageListenerContainer) rabbitListenerEndpointRegistry.getListenerContainer("testListener");container.stop();}/*** 开启监听*/public void startConsumption() {AbstractMessageListenerContainer container = (AbstractMessageListenerContainer) rabbitListenerEndpointRegistry.getListenerContainer("testListener");container.start();}}
按照以上步骤,方可实现通过接口控制rabbitmq中的消费者 启动 与 停止