当前位置: 首页 > news >正文

rabbitmq的多交换机(扇出为例)监听实现

  1. 配置文件,微服务或者单体都可以在yml里配置,也可以写死在java文件.
spring:rabbitmq:addresses: 127.0.0.1:5672username: guestpassword: guestvirtual-host: /connection-timeout: 15000#配置rabbitMq启用开关enable: truelistener:simple:acknowledge-mode: manualconcurrency: 15max-concurrency: 20
  1. 监听处理文件
package jzy.ziyuan.util.rabbitmq;import cn.hutool.json.JSONObject;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import jnpf.util.JsonUtil;
import jzy.model.mom.EventMessageMom;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;@Component
@ConditionalOnProperty(name = "spring.rabbitmq.enable", havingValue = "true", matchIfMissing = true)
public class MultiExchangeMessageListener {private static final Logger logger = LoggerFactory.getLogger(MultiExchangeMessageListener.class);@Autowiredprivate IdempotentMessageProcessor idempotentProcessor;@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 监听来自多个交换机的消息*/// 声明 Direct 交换机、队列,并绑定路由键@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "order.queue", durable = "true"),    // 队列exchange = @Exchange(name = "20250917test1", type = ExchangeTypes.FANOUT) // 交换机))@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "user.queue", durable = "true"),    @RabbitListener(queues = "#{tempQueue.name}") // 引用临时队列public void handleMessage(Message message) throws IOException {String messageId = message.getMessageProperties().getMessageId();String exchange = message.getMessageProperties().getReceivedExchange();long deliveryTag = message.getMessageProperties().getDeliveryTag();try {// 1. 幂等性检查if (idempotentProcessor.isProcessed(messageId)) {logger.info("消息已处理,跳过重复消费: messageId={}, exchange={}", messageId, exchange);// 手动确认消息rabbitTemplate.getConnectionFactory().createConnection().createChannel(false).basicAck(deliveryTag, false);return;}// 2. 处理消息内容String messageBody = new String(message.getBody(), "UTF-8");String processedMessage = replaceNbspWithSpace(messageBody);// 3. 解析消息JSONObject jsonObject = JsonUtil.getJsonToBean(processedMessage, JSONObject.class);EventMessageMom eventMessage = JsonUtil.getJsonToBean(jsonObject, EventMessageMom.class);logger.info("接收来自交换机的消息: exchange={}, messageId={}, content={}",exchange, messageId, eventMessage);switch (exchange) {default:break;}// 4. 执行业务逻辑// 5. 标记为已处理idempotentProcessor.markAsProcessed(messageId);// 6. 手动确认消息rabbitTemplate.getConnectionFactory().createConnection().createChannel(false).basicAck(deliveryTag, false);logger.info("消息处理成功: messageId={}, exchange={}", messageId, exchange);} catch (Exception e) {logger.error("消息处理失败: messageId={}, exchange={}", messageId, exchange, e);// 业务失败,拒绝消息并放入死信队列rabbitTemplate.getConnectionFactory().createConnection().createChannel(false).basicNack(deliveryTag, false, false); // 不重新入队}}/*** 过滤特殊符号*/public static String replaceNbspWithSpace(String str) {if (str == null) {return null;}return str.replaceAll("\\u00A0", " ");}
}
  1. rabbitmq配置文件
package jzy.ziyuan.util.rabbitmq;import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class MultiExchangeRabbitConfig {// 交换机名称列表 - 可根据实际需求扩展public static final String[] EXCHANGE_NAMES = {"exchange.20250918logs","exchange.20250918events","exchange.20250918notifications"};// 交换机类型public static final String EXCHANGE_TYPE = "fanout";// 死信交换机和队列public static final String DLX_EXCHANGE = "20250918exchange.dlx";public static final String DLX_QUEUE = "20250918queue.dlx";/*** 创建死信交换机*/@Beanpublic DirectExchange dlxExchange() {return ExchangeBuilder.directExchange(DLX_EXCHANGE).durable(true).build();}/*** 创建死信队列*/@Beanpublic Queue dlxQueue() {return QueueBuilder.durable(DLX_QUEUE).build();}/*** 绑定死信队列到死信交换机*/@Beanpublic Binding dlxBinding() {return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx.routing.key");}/*** 创建临时队列 - 非持久化、排他、自动删除*/@Beanpublic Queue tempQueue() {// 配置死信参数Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", DLX_EXCHANGE);args.put("x-dead-letter-routing-key", "dlx.routing.key");return QueueBuilder.nonDurable()    // 非持久化.exclusive()                // 排他队列.autoDelete()               // 自动删除.withArguments(args).build();}/*** 创建多个交换机并绑定到临时队列*/@Beanpublic Declarables exchangeBindings() {Declarables declarables = new Declarables();// 创建所有交换机并绑定到临时队列for (String exchangeName : EXCHANGE_NAMES) {// 创建交换机FanoutExchange exchange = ExchangeBuilder.fanoutExchange(exchangeName).durable(true).build();declarables.getDeclarables().add(exchange);// 绑定交换机到临时队列Binding binding = BindingBuilder.bind(tempQueue()).to(exchange);declarables.getDeclarables().add(binding);}return declarables;}/*** 配置监听容器工厂 - 手动ACK和QoS设置*/@Beanpublic RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动确认factory.setPrefetchCount(10); // QoS设置factory.setConcurrentConsumers(3); // 并发消费者数量factory.setMaxConcurrentConsumers(5); // 最大并发消费者数量// 配置自动重连机制factory.setRecoveryInterval(5000L); // 重连间隔5秒return factory;}
}
http://www.dtcms.com/a/507732.html

相关文章:

  • 做网站那个好做淘宝联盟网站要多少钱
  • 4A 架构(业务架构、数据架构、应用架构、技术架构)在智慧电网中的实战:从边缘 AI 到云边协同的代码级拆解
  • django 做网站wordpress使用七牛防止降权
  • 定制建站方案当当网站建设的目标
  • 阿里云创建交换分区、设置内存监控预警和自动处理内存占用过大进程的脚本
  • 网站建设费用做什么科目思明自助建站软件
  • php网站开发实战教程app和网站开发语言的区别
  • 引领未来交易:达普韦伯全链路Swap交易所系统开发解决方案
  • 2.CUDA编程模型
  • YOLOV4
  • MES系统如何实现生产过程数据采集与管控?
  • 医保局网站建设dw网页设计作品简单
  • 网站如何更换空间wordpress镜像什么意思
  • 使用Yum安装Redis
  • Verilog和FPGA的自学笔记7——流水灯与时序约束(XDC文件的编写)
  • 蜜蜂及飞行昆虫多类别检测数据集VOC+YOLO格式3630张6类别
  • 从零开始:在 TCP 基础上手动搭建 ModBus TCP 协议
  • 台州的网站建设wordpress动漫展主题
  • 外贸假发 网站南京网站设计制作公司排名
  • 【C++闯关笔记】模板的特化
  • iOS flutter 上架 4.3(a)【flutter 专讲】
  • 【鸿蒙flutter】flutter_echarts和webview_flutter 兼容问题解决
  • flutter 鸿蒙
  • 重庆潼南网站建设公司企业网站建设示范平台
  • 临沂做网站哪家好西安的互联网公司
  • Linux下的DNS配置文件/etc/resolv.conf详解(1)
  • MyBatis 中 #{ } 与 ${ } 的区别与使用场景
  • C++源代码行数统计工具的设计与实现
  • temBoard:一款开源PostgreSQL监控和管理工具
  • 模型上下文协议(MCP)——使用Java构建SQL数据库代理(MCP代理教程)