当前位置: 首页 > news >正文

如何让rabbitmq保存服务断开重连?保证高可用?

在 Spring Boot 集成 RabbitMQ 时,可以通过以下几种方式让 RabbitMQ 保存服务断开重连,以保证高可用:

配置自动重连

  • application.properties 配置 :在 Spring Boot 的配置文件 application.properties 中,可以设置 RabbitMQ 的连接工厂相关参数来开启自动重连功能。

    • spring.rabbitmq.listener.simple.recovery-interval:设置自动重连的时间间隔,在指定的时间间隔后会尝试重新建立连接。 spring.rabbitmq.connection-timeout:设置连接超时时间,单位为毫秒。如果在指定的时间内无法完成连接,就会认为连接失败,之后会根据其他重连机制进行尝试。

  • 代码配置 :可以在配置类中通过编程的方式配置连接工厂的相关参数来实现自动重连。

@Bean
public ConnectionFactory rabbitConnectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");connectionFactory.setConnectionTimeout(5000);connectionFactory.setRequestedHeartbeat(20);connectionFactory.setAutomaticRecoveryEnabled(true);connectionFactory.setTopologyRecoveryEnabled(true);return connectionFactory;
}

其中,setAutomaticRecoveryEnabled(true) 启用自动重连功能,setTopologyRecoveryEnabled(true) 表示在重连时会自动恢复队列、交换器等拓扑结构。

使用重试机制

  • @Retryable 注解 :在发送消息的方法上添加@Retryable注解,当发送消息失败时,会根据注解的配置进行重试。

@Retryable(value = {AmqpException.class}, maxAttempts = 3, backoff = @Backoff(delay = 3000))
public void sendMessage(String message) {rabbitTemplate.convertAndSend("exchange", "routingKey", message);
}

这里maxAttempts指定了最大重试次数为 3 次,backoff指定了重试的延迟时间为 3000 毫秒。

  • RetryTemplate 配置 :可以通过配置RetryTemplate来自定义重试策略,例如设置重试的间隔、重试的次数等,然后将其注入到消息发送相关的类中使用。

@Bean
public RetryTemplate retryTemplate() {RetryTemplate retryTemplate = new RetryTemplate();FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();fixedBackOffPolicy.setBackOffPeriod(3000);retryTemplate.setBackOffPolicy(fixedBackOffPolicy);SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();retryPolicy.setMaxAttempts(3);retryTemplate.setRetryPolicy(retryPolicy);return retryTemplate;
}

在发送消息的方法中使用retryTemplate.execute来执行发送消息的操作。

集群模式

  • 搭建 RabbitMQ 集群 :将多个 RabbitMQ 服务器组成一个集群,当一个节点出现故障时,客户端可以自动连接到其他可用的节点上。在 Spring Boot 中,可以通过配置多个 RabbitMQ 节点的地址来实现与集群的连接。

@Bean
public ConnectionFactory rabbitConnectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();List<String> addresses = new ArrayList<>();addresses.add("amqp://guest:guest@localhost:5672");addresses.add("amqp://guest:guest@localhost:5673");addresses.add("amqp://guest:guest@localhost:5674");connectionFactory.setAddresses(addresses);connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");return connectionFactory;
}

这样,当连接到某个节点失败时,会自动尝试连接其他节点。

消息持久化

  • 消息持久化 :将消息设置为持久化存储,这样在 RabbitMQ 服务重启后,消息不会丢失。可以通过在发送消息时设置消息的持久化标志来实现。

rabbitTemplate.convertAndSend("exchange", "routingKey", message, messagePostProcessor -> {messagePostProcessor.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return messagePostProcessor;
});

 同时,还需要将队列设置为持久化,在声明队列时指定durable参数为true

Queue queue = new Queue("queueName", true);

消费者确认机制

  • 手动确认 :在消息消费完成后,手动发送确认消息,这样在服务断开重连后,RabbitMQ 会重新发送未确认的消息,保证消息的可靠性。

@RabbitListener(queues = "queueName")
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {try {// 消息处理逻辑channel.basicAck(tag, false);} catch (Exception e) {channel.basicNack(tag, false, true);}
}

如果消息处理成功,则发送basicAck确认消息;如果处理失败,则发送basicNack拒绝消息,并将消息重新入队。

监控和告警

  • 监控工具 :使用监控工具如 Prometheus 和 Grafana 等来监控 RabbitMQ 的运行状态,包括连接数、队列长度、消息速率等。当出现异常情况时,及时发现并进行处理。

  • 告警机制 :配置告警规则,当监控指标超过设定的阈值时,触发告警通知,及时通知运维人员进行干预,确保 RabbitMQ 的稳定运行。

相关文章:

  • tryhackme——Lateral Movement and Pivoting
  • 学习threejs,使用Physijs物理引擎,加载各种几何体网格对象
  • 【Web前端开发】CSS基础
  • vue2关闭eslint
  • 通义千问席卷日本!开源界“卷王”阿里通义千问成为日本AI发展新基石
  • Kotlin 懒初始化值
  • Cabot:开源免费的 PagerDuty 替代品,让系统监控更简单高效
  • 力扣 旋转图像
  • 十五、多态与虚函数
  • 在MyBatis Plus里处理LocalDateTime类型
  • Termius ssh连接服务器 vim打开的文件无法复制问题
  • 【Java ee初阶】IP协议
  • 进程和线程
  • GTC2025——英伟达布局推理领域加速
  • 什么是Vim
  • 神经生物学+图论双buff,揭示大脑语言系统的拓扑结构
  • 探秘高可用负载均衡集群:企业网络架构的稳固基石
  • EnumUtils:你的枚举“变形金刚“——让枚举操作不再手工作业
  • ARM-CortexM固件升级相关问题研究
  • 模型上下文协议(MCP):AI的“万能插座”
  • 李公明 | 一周画记:印巴交火会否升级为第四次印巴战争?
  • “犍陀罗艺术与亚洲文明”在浙大对外展出
  • 价格周报|供需回归僵局,本周生猪均价与上周基本持平
  • 婚姻登记“全国通办”首日,上海虹口登记中心迎来“甜蜜高峰”
  • 国家主席习近平会见斯洛伐克总理菲佐
  • 马上评丨维护学术诚信别陷入“唯AI检测”误区