当前位置: 首页 > news >正文

动态控制rabbitmq中的消费者监听的启动和停止

引言:

在业务系统中,消息中间件常用于实现流量削峰、异步通信等场景中发挥重要作用,通过消息监听机制可实现数据的异步获取与处理;而实际业务中常需根据特定场景(如系统维护、下游服务异常等)动态暂停或启动消息监听,对此即需要通过动态控制消费的启动和停止。以下操作方可实现该功能

一、引入依赖

引入springboot web以及rabbitmq的依赖

<!--springboot web -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency><!--springboot web -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

二、配置application.yaml 配置文件

spring:rabbitmq:addresses: 1127.0.0.1:5672username: adminpassword: konne20211220@.+virtual-host: /

三、创建消费服务

@Configuration
public class Consumer {@Autowiredprivate RabbitTemplate rabbitTemplate;//服务中运行的数量public int serverRunSite;Logger logger = LoggerFactory.getLogger(Consumer.class);/*** 消费rabbitmq 中 testConsumer 的数据,并设置手动确认消息*  testListener 是rabbitmq中的唯一id,不能重复*/@RabbitListener(id = "testListener", queues = "testConsumer", ackMode = "MANUAL")public  void receiveMessage(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long tag, Channel channel) {logger.info("收到方案新增信息:{}", msg);try {channel.basicAck(tag,false);} catch (Exception e) {logger.error("消息确认失败,msg:{}",msg);}int limitSize = 10;//服务限制的数量和服务当前运行的数量一致进行停止当前服务监听if(limitSize == serverRunSite){rabbitTemplate.convertAndSend("datasource_add", msg);logger.warn("当前服务消费数量已达上限,数据扔回队列");}else{//处理下游业务}}
}

四、实现rabbitmq的消费者启动和停止

@RestController
public class UpdateStatusController {//获取到rabbitmq的注册器@Autowiredprivate RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;/*** 更改状态* @param status 0 开启,1关闭*/@GetMapping("updateStatus")private  void updateStatus(int status) {if(status == 0){this.startConsumption();}else{this.stopConsumption();}}/*** 通过rabbitListenerEndpointRegistry 根据唯一id获取container* 关闭监听*/public void stopConsumption() {AbstractMessageListenerContainer container = (AbstractMessageListenerContainer) rabbitListenerEndpointRegistry.getListenerContainer("testListener");container.stop();}/*** 开启监听*/public void startConsumption() {AbstractMessageListenerContainer container = (AbstractMessageListenerContainer) rabbitListenerEndpointRegistry.getListenerContainer("testListener");container.start();}}

按照以上步骤,方可实现通过接口控制rabbitmq中的消费者 启动停止


文章转载自:

http://6YZqq3ur.brmbm.cn
http://mRSQ8aTm.brmbm.cn
http://2SMyK8tv.brmbm.cn
http://eALrX2Pg.brmbm.cn
http://irFiMSKo.brmbm.cn
http://PVW73PdA.brmbm.cn
http://N887Nm0M.brmbm.cn
http://zZAKgwqT.brmbm.cn
http://QJx3xAvn.brmbm.cn
http://BciKdGkb.brmbm.cn
http://cFkcbvdz.brmbm.cn
http://qseFN4kJ.brmbm.cn
http://fD3AC5VQ.brmbm.cn
http://cIcX0jzi.brmbm.cn
http://0h1maMhu.brmbm.cn
http://joqIKIS5.brmbm.cn
http://LeQ3WJgQ.brmbm.cn
http://rjgepZ1r.brmbm.cn
http://HdT4M07Z.brmbm.cn
http://Ld4btm8v.brmbm.cn
http://83f8CPz0.brmbm.cn
http://cg3MBQYe.brmbm.cn
http://03dOKe06.brmbm.cn
http://9E8OhWnr.brmbm.cn
http://boY85VZ8.brmbm.cn
http://nEoHhZJd.brmbm.cn
http://YtKmU5Rn.brmbm.cn
http://wq12hhiC.brmbm.cn
http://7Girkmcr.brmbm.cn
http://f9XVdG40.brmbm.cn
http://www.dtcms.com/a/375576.html

相关文章:

  • C# 基于halcon的视觉工作流-章30-圆圆距离测量
  • Android Studio 构建项目时 Gradle 下载失败的解决方案
  • 【STM32项目开源】STM32单片机智能恒温箱控制系统
  • 苹果ios的系统app应用WebClip免签应用开源及方式原理
  • Java数据库连接JDBC完全解析
  • Node-RED 究竟是否适合工业场景?
  • zephyr开发环境搭建
  • OpenCV 实战:基于模板匹配的身份证号自动识别系统
  • java中将租户ID包装为JSQLParser的StringValue表达式对象,JSQLParser指的是?
  • CMake工程指南
  • 单北斗GNSS该如何在变形监测中发挥最大效能?
  • 大数据毕业设计-基于大数据的高考志愿填报推荐系统(高分计算机毕业设计选题·定制开发·真正大数据)
  • 分布式锁redis
  • Java学习之——“IO流“的进阶流之转换流的学习
  • git 如何直接拉去远程仓库的内容且忽略本地与远端不一致的commit
  • 每日一算:分发糖果
  • 神经算子学习
  • AI大模型入门1.1-python基础字符串代码
  • Tlias管理系统(多表查询-内连接外连接)
  • win11家庭版配置远程桌面
  • 8. LangChain4j + 提示词工程详细说明
  • ChatGPT大模型训练指南:如何借助动态代理IP提高训练效率
  • 利用git进行版本控制
  • 深入理解synchronized:从使用到原理的进阶指南
  • 第八章 矩阵按键实验
  • 【CSS 3D 实战】从零实现旋转立方体:理解 3D 空间的核心原理
  • C++互斥锁使用详解与案例分析
  • Python+DRVT 从外部调用 Revit:批量创建柱
  • Matlab机器人工具箱6.2 导入stl模型——用urdf文件描述
  • 网页设计模板 HTML源码网站模板下载