当前位置: 首页 > news >正文

Spring-rabbit使用实战六

目录

优雅实现 Spring RabbitMQ 多交换机多队列绑定

一、核心设计思路

二、完整实现代码

1. 配置定义(枚举方式)

2. 声明式配置工厂

3. 生产者服务(通用发送方法)

4. 消费者抽象基类

5. 具体消费者实现

6. 配置中心扩展(YAML 配置)

三、设计优势与扩展点

1. 架构优势

2. 扩展点设计

四、最佳实践建议

1.命名规范

2.监控增强

3.单元测试方案

4.消费者弹性配置

五、架构演进方向

1.配置中心集成

2.消息路由服务

3.流量控制中间件


优雅实现 Spring RabbitMQ 多交换机多队列绑定

在需要创建多个交换机、每个交换机绑定多个队列的场景中,通过合理设计可以显著提高代码的复用性和扩展性。以下是针对 3 个交换机各绑定 3 个队列的优雅实现方案:

一、核心设计思路

  1. 配置驱动:使用枚举或配置类定义交换机、队列和路由键

  2. 工厂模式:创建交换机、队列和绑定的工厂方法

  3. 自动绑定:通过反射或配置自动创建绑定关系

  4. 模板方法:复用消费者处理逻辑

二、完整实现代码

1. 配置定义(枚举方式)

public enum RabbitConfigEnum {// 订单业务ORDER_EXCHANGE("order.direct.exchange", ExchangeTypes.DIRECT,Arrays.asList(new QueueConfig("order.create.queue", "order.create"),new QueueConfig("order.pay.queue", "order.pay"),new QueueConfig("order.cancel.queue", "order.cancel"))),// 库存业务INVENTORY_EXCHANGE("inventory.topic.exchange", ExchangeTypes.TOPIC,Arrays.asList(new QueueConfig("inventory.deduct.queue", "inventory.deduct"),new QueueConfig("inventory.restore.queue", "inventory.restore.*"),new QueueConfig("inventory.alert.queue", "inventory.alert.#"))),// 通知业务NOTIFICATION_EXCHANGE("notification.fanout.exchange", ExchangeTypes.FANOUT,Arrays.asList(new QueueConfig("email.notification.queue", ""), // Fanout 不需要路由键new QueueConfig("sms.notification.queue", ""),new QueueConfig("push.notification.queue", "")));private final String exchangeName;private final ExchangeType exchangeType;private final List<QueueConfig> queueConfigs;RabbitConfigEnum(String exchangeName, ExchangeType exchangeType, List<QueueConfig> queueConfigs) {this.exchangeName = exchangeName;this.exchangeType = exchangeType;this.queueConfigs = queueConfigs;}// 队列配置内部类@Getter@AllArgsConstructorpublic static class QueueConfig {private final String queueName;private final String routingKey;}// 交换机类型枚举public enum ExchangeType {DIRECT, TOPIC, FANOUT}// 获取所有配置public static List<RabbitConfigEnum> getAllConfigs() {return Arrays.asList(values());}
}

2. 声明式配置工厂

@Configuration
public class RabbitMQConfigFactory {private static final Logger logger = LoggerFactory.getLogger(RabbitMQConfigFactory.class);@Beanpublic Declarables createRabbitElements() {List<Declarable> declarables = new ArrayList<>();for (RabbitConfigEnum config : RabbitConfigEnum.getAllConfigs()) {// 创建交换机AbstractExchange exchange = createExchange(config.getExchangeName(), config.getExchangeType());declarables.add(exchange);logger.info("Created exchange: {}", config.getExchangeName());// 为交换机创建队列和绑定for (RabbitConfigEnum.QueueConfig queueConfig : config.getQueueConfigs()) {Queue queue = new Queue(queueConfig.getQueueName(), true);declarables.add(queue);Binding binding = createBinding(exchange, queue, queueConfig.getRoutingKey(), config.getExchangeType());declarables.add(binding);logger.info("Created queue '{}' bound to '{}' with routing key '{}'",queueConfig.getQueueName(),config.getExchangeName(),queueConfig.getRoutingKey());}}return new Declarables(declarables);}private AbstractExchange createExchange(String name, RabbitConfigEnum.ExchangeType type) {return switch (type) {case DIRECT -> new DirectExchange(name, true, false);case TOPIC -> new TopicExchange(name, true, false);case FANOUT -> new FanoutExchange(name, true, false);};}private Binding createBinding(AbstractExchange exchange, Queue queue, String routingKey, RabbitConfigEnum.ExchangeType type) {return switch (type) {case DIRECT, TOPIC -> BindingBuilder.bind(queue).to(exchange).with(routingKey);case FANOUT -> BindingBuilder.bind(queue).to((FanoutExchange) exchange);};}// 统一消息转换器@Beanpublic MessageConverter jsonMessageConverter() {return new Jackson2JsonMessageConverter();}
}

3. 生产者服务(通用发送方法)

@Service
public class RabbitMQProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 通用消息发送方法* * @param exchange 交换机名称* @param routingKey 路由键* @param message 消息内容*/public void sendMessage(String exchange, String routingKey, Object message) {rabbitTemplate.convertAndSend(exchange, routingKey, message, m -> {MessageProperties props = m.getMessageProperties();props.setContentType(MessageProperties.CONTENT_TYPE_JSON);props.setTimestamp(new Date());return m;});logger.debug("Sent message to {}[{}]: {}", exchange, routingKey, message);}/*** 按业务发送消息(推荐)* * @param config 业务配置* @param routingKey 路由键* @param message 消息内容*/public void sendByConfig(RabbitConfigEnum config, String routingKey, Object message) {sendMessage(config.getExchangeName(), routingKey, message);}
}

4. 消费者抽象基类

public abstract class AbstractRabbitConsumer<T> {private static final Logger logger = LoggerFactory.getLogger(AbstractRabbitConsumer.class);/*** 通用消息处理模板* * @param message 消息内容* @param channel RabbitMQ通道* @param tag 消息标签*/@RabbitHandlerpublic void handleMessage(T message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {try {// 1. 业务处理processMessage(message);// 2. 手动ACK确认channel.basicAck(tag, false);logger.debug("Message processed: {}", message);} catch (BusinessException e) {// 业务异常处理handleBusinessException(e, message, channel, tag);} catch (Exception e) {// 系统异常处理handleSystemException(e, message, channel, tag);}}/*** 业务处理抽象方法(子类实现)*/protected abstract void processMessage(T message) throws BusinessException;/*** 业务异常处理(可重写)*/protected void handleBusinessException(BusinessException e, T message, Channel channel, long tag) throws IOException {logger.error("Business error processing message: {}", message, e);// 拒绝消息但不重试channel.basicReject(tag, false);}/*** 系统异常处理(可重写)*/protected void handleSystemException(Exception e, T message, Channel channel, long tag) throws IOException {logger.error("System error processing message: {}", message, e);// 拒绝消息并重新入队channel.basicReject(tag, true);}
}

5. 具体消费者实现

// 订单创建消费者
@Component
@RabbitListener(queues = "order.create.queue")
public class OrderCreateConsumer extends AbstractRabbitConsumer<Order> {@Autowiredprivate InventoryService inventoryService;@Overrideprotected void processMessage(Order order) throws BusinessException {// 减库存inventoryService.deductStock(order.getProductId(), order.getQuantity());// 记录订单orderService.saveOrder(order);// 发送创建事件eventPublisher.publishOrderCreated(order);}// 重写异常处理@Overrideprotected void handleBusinessException(BusinessException e, Order order, Channel channel, long tag) throws IOException {if (e instanceof InventoryShortageException) {// 库存不足特殊处理orderService.markAsPending(order);channel.basicAck(tag, false);} else {super.handleBusinessException(e, order, channel, tag);}}
}// 库存告警消费者
@Component
@RabbitListener(queues = "inventory.alert.queue")
public class InventoryAlertConsumer extends AbstractRabbitConsumer<InventoryAlert> {@Overrideprotected void processMessage(InventoryAlert alert) {// 发送告警通知notificationService.sendAlert(alert.getProductId(), alert.getCurrentLevel());// 记录告警日志alertService.logAlert(alert);}
}

6. 配置中心扩展(YAML 配置)

# application.yml
spring:rabbitmq:host: rabbitmq-prod.example.comport: 5672username: ${RABBIT_USER}password: ${RABBIT_PASS}virtual-host: /prodlistener:simple:acknowledge-mode: manualconcurrency: 3max-concurrency: 10prefetch: 20# 自定义交换机配置(可选扩展)
rabbit:exchanges:- name: order.direct.exchangetype: DIRECTqueues:- name: order.create.queuerouting-key: order.create- name: order.pay.queuerouting-key: order.pay- name: order.cancel.queuerouting-key: order.cancel- name: inventory.topic.exchangetype: TOPICqueues:- name: inventory.deduct.queuerouting-key: inventory.deduct- name: inventory.restore.queuerouting-key: inventory.restore.*- name: inventory.alert.queuerouting-key: inventory.alert.#

三、设计优势与扩展点

1. 架构优势

设计特点优势应用场景
配置枚举化集中管理所有配置,避免硬编码多环境部署
工厂模式统一创建逻辑,减少重复代码新增交换机/队列
抽象消费者统一异常处理和ACK机制所有消费者
通用生产者简化消息发送接口所有业务场景

2. 扩展点设计

扩展点 1:动态添加新交换机

// 添加新业务配置
RabbitConfigEnum.NEW_EXCHANGE = new RabbitConfigEnum("new.exchange",ExchangeTypes.DIRECT,Arrays.asList(new QueueConfig("new.queue1", "key1"),new QueueConfig("new.queue2", "key2"))
);

扩展点 2:自定义绑定逻辑

// 重写绑定工厂方法
private Binding createCustomBinding(AbstractExchange exchange, Queue queue, String routingKey, ExchangeType type) {if ("special.binding".equals(routingKey)) {return BindingBuilder.bind(queue).to(exchange).with(routingKey).and(createCustomArguments()); // 自定义参数}return createBinding(exchange, queue, routingKey, type);
}

扩展点 3:基于配置文件的动态配置

@Configuration
@ConfigurationProperties(prefix = "rabbit")
public class DynamicRabbitConfig {private List<ExchangeConfig> exchanges;@Beanpublic Declarables dynamicDeclarables() {// 类似工厂方法实现,从配置文件读取}@Getter @Setterpublic static class ExchangeConfig {private String name;private String type;private List<QueueBinding> queues;}@Getter @Setterpublic static class QueueBinding {private String name;private String routingKey;}
}

四、最佳实践建议

1.命名规范

// 业务.类型.功能
String exchangeName = "order.direct.exchange";
String queueName = "inventory.topic.alert.queue";
String routingKey = "order.payment.completed";

2.监控增强

// 在生产者中添加监控埋点
public void sendMessage(String exchange, String routingKey, Object message) {Timer.Sample sample = Timer.start(metricsRegistry);// ...发送逻辑sample.stop(metricsRegistry.timer("rabbit.produce.time", "exchange", exchange, "routingKey", routingKey));
}

3.单元测试方案

@SpringBootTest
public class RabbitConfigTest {@Autowiredprivate RabbitAdmin rabbitAdmin;@Testpublic void testExchangeAndQueueCreation() {// 验证所有交换机已创建for (RabbitConfigEnum config : RabbitConfigEnum.values()) {Exchange exchange = new DirectExchange(config.getExchangeName());assertTrue(rabbitAdmin.getExchangeInfo(exchange.getName()) != null);// 验证队列绑定for (QueueConfig qc : config.getQueueConfigs()) {Queue queue = new Queue(qc.getQueueName());assertTrue(rabbitAdmin.getQueueInfo(queue.getName()) != null);}}}
}

4.消费者弹性配置

# 针对不同队列配置不同消费者参数
spring:rabbitmq:listener:order:concurrency: 5max-concurrency: 20notification:concurrency: 2max-concurrency: 5

五、架构演进方向

1.配置中心集成

2.消息路由服务

@Service
public class MessageRouter {private Map<MessageType, RabbitConfigEnum> routingMap;public void routeMessage(MessageType type, Object message) {RabbitConfigEnum config = routingMap.get(type);producer.sendByConfig(config, config.getDefaultKey(), message);}
}

3.流量控制中间件

@Around("@annotation(rabbitListener)")
public Object rateLimit(ProceedingJoinPoint joinPoint) {if (!rateLimiter.tryAcquire()) {// 返回特殊响应,触发消费者暂停return new RateLimitExceededResponse();}return joinPoint.proceed();
}

这种设计通过配置驱动、工厂模式和模板方法,实现了高可复用的 RabbitMQ 集成方案,能够轻松应对业务扩展需求,同时保持代码的简洁性和可维护性。

http://www.dtcms.com/a/315729.html

相关文章:

  • 国产三防平板电脑是什么?三防平板推荐
  • Spark内核调度
  • RTC实时时钟RX8900SA国产替代FRTC8900S
  • 使用maven-shade-plugin解决es跨版本冲突
  • 微信小程序功能实现:页面导航与跳转
  • jenkins插件Active Choices的使用通过参数动态控制多选参数的选项
  • LHA6958D是一款代替AD7606的芯片
  • 【前端】网站favicon图标制作
  • MyBatisPlus查询数据库中所有表的数据(AI)
  • 使标签垂直水平居中的多种方法
  • 自动驾驶控制算法——MPC控制算法
  • 数据结构 实现单链表
  • Vue3核心语法进阶(Props)
  • C语言:选择排序算法深度剖析!
  • nodejs 编码初体验
  • JAVA无人共享球杆柜系统球杆柜租赁系统源码支持微信小程序
  • 嵌入式硬件中运放的基本控制原理
  • 基于k8s环境下的pulsar常用命令(上)
  • 达梦分布式集群DPC_分布式任务执行拆分流程_yxy
  • 安全测绘之敏感网络资产排查指南
  • 在Linux上部署RabbitMQ、Redis、ElasticSearch
  • Taro Hooks 完整分类详解
  • 深度解析随机森林 API:参数奥秘与调优指南
  • 在AI时代,如何制定有效的职业规划?AI时代职业规划+AI产品经理角色
  • 【学习笔记】NTP时间同步验证
  • Kali Linux 2025.2基于MITRE ATTCK框架
  • DPU(数据处理单元)架构中,SoC(系统级芯片)与FPGA(现场可编程门阵列)之间的数据交互
  • 山东移动e企组网技术分析:底层架构与实现方式
  • 第12届蓝桥杯Scratch_选拔赛_初级组_真题2020年11月21日
  • SpringBoot3.x入门到精通系列:4.2 整合 Kafka 详解