《如何在 Spring 中实现 MQ 消息的自动重连:监听与发送双通道策略》
大家好,我是G探险者!
📌 背景场景
在高可用分布式系统中,我们经常面临:
- MQ 集群重启 → 消息监听中断
- MQ 网络短暂抖动 → 发送端连接失败
- 一端恢复正常,另一端仍处于挂死状态
如果你只配置了“连接工厂层”的重连,却忽略了监听容器或发送客户端的容错设计,重连机制可能失效,业务陷入长时间不可用。
✅ 核心理念:监听和发送是两个不同的连接“通道”
通道 | 用途 | 组件 |
---|---|---|
监听通道 | 从 MQ 拉取消息 | Spring JMS 的 MessageListenerContainer |
发送通道 | 发送消息到 MQ | Spring 的 JmsTemplate |
这两个通道各自有自己的连接池和生命周期,不能指望一个设置就解决全部问题。
🔁 一、监听端的自动重连机制
推荐做法:使用 DefaultMessageListenerContainer
并设置重连间隔
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setDestinationName("MY.QUEUE");
container.setMessageListener(new MyListener());// ✅ 开启事务模式可选
container.setSessionTransacted(true);// ✅ 开启自动重连机制(默认是 true)
container.setRecoveryInterval(5000L); // 每 5 秒重试连接一次container.afterPropertiesSet();
container.start();
DefaultMessageListenerContainer
内部会捕获ConnectionException
等连接中断异常,自动重试连接。
📤 二、发送端的容灾重连策略
监听容器有容器帮你维护连接,而 发送端(JmsTemplate)则需要连接池支撑。
推荐:配合使用 CachingConnectionFactory
ConnectionFactory factory = createIBMConnectionFactory(); // 原始 MQ 工厂
CachingConnectionFactory cachingFactory = new CachingConnectionFactory(factory);// 可选设置缓存大小(缓存 session 的数量)
cachingFactory.setSessionCacheSize(10);JmsTemplate jmsTemplate = new JmsTemplate(cachingFactory);
jmsTemplate.convertAndSend("MY.QUEUE", "Hello MQ");
📌 为啥要用 CachingConnectionFactory
?
原因 | 描述 |
---|---|
重用连接 | 避免每次发送都新建连接(开销大) |
支持连接断开重建 | 内部封装连接失效后重建逻辑 |
提供 session 缓存 | 提升发送效率,降低资源消耗 |
🧰 三、JMS 厂商参数补充(IBM MQ 举例)
若你使用 IBM MQ,可以在底层工厂设置:
MQQueueConnectionFactory factory = new MQQueueConnectionFactory();factory.setHostName("192.168.1.102");
factory.setPort(1414);
factory.setQueueManager("QM1");
factory.setChannel("CHANNEL1");
factory.setTransportType(WMQConstants.WMQ_CM_CLIENT);// ✅ 启用自动重连
factory.setIntProperty(WMQConstants.WMQ_CLIENT_RECONNECT_OPTIONS,WMQConstants.WMQ_CLIENT_RECONNECT);// ✅ 设置最大重连时间(秒)
factory.setIntProperty(WMQConstants.WMQ_CLIENT_RECONNECT_TIMEOUT, 30);// ✅ 设置连接列表(用于集群 HA)
factory.setStringProperty(WMQConstants.WMQ_CONNECTION_NAME_LIST,"192.168.1.102(1414),192.168.1.103(1414)");
🔗 四、总结策略建议表
场景 | 推荐设置 |
---|---|
MQ监听端 | DefaultMessageListenerContainer + setRecoveryInterval |
MQ发送端 | JmsTemplate + CachingConnectionFactory |
多 broker/集群 | 设置 CONNECTION_NAME_LIST |
事务性保障 | setSessionTransacted(true) + onMessage() 异常触发 rollback |
监听不生效 | 检查是否调用了 afterPropertiesSet() |
📘 下一篇预告:
《JMS事务性会话彻底解析:消息监听中的 commit、rollback 和幂等设计》
我们将深入剖析如何使用事务控制 MQ 消息的消费与回滚,Spring 容器如何自动帮你 commit/rollback,以及如何设计幂等保证系统不重复处理失败消息。