当前位置: 首页 > wzjs >正文

淄博周村网站建设定制电子商务书店网站设计实验

淄博周村网站建设定制,电子商务书店网站设计实验,资料大全正版资料,电话约建设网站 客户📌 引言 在分布式系统中,RabbitMQ 是一个非常流行的消息队列中间件,广泛用于解耦系统、异步处理任务、提高系统性能。然而,在实际使用中,消费端可能因为 代码异常、数据库故障、网络问题 等原因导致消息消费失败。 如…

📌 引言

在分布式系统中,RabbitMQ 是一个非常流行的消息队列中间件,广泛用于解耦系统、异步处理任务、提高系统性能。然而,在实际使用中,消费端可能因为 代码异常、数据库故障、网络问题 等原因导致消息消费失败。

如果不对这些失败消息进行处理,可能会导致数据丢失,影响业务流程。因此,我们需要一个 可靠的失败消息清洗和重试机制 来保证消息最终被成功消费或进行合理的存储处理。


📌 1. 失败消息的常见问题

在 RabbitMQ 消费过程中,可能会遇到以下问题:

  • 消息处理逻辑异常(如空指针异常、数据格式转换错误)。
  • 数据库异常(如主键冲突、唯一索引冲突、数据库连接失败)。
  • 外部依赖不可用(如调用第三方 API 超时)。
  • 消息重复消费(由于网络抖动或 RabbitMQ 配置问题)。

这些异常可能导致:

  • 消息丢失:如果 RabbitMQ 没有进行消息重试,消息可能永远丢失。
  • 死循环消费:如果未配置死信队列(DLX),消息可能会不断重新入队,造成死循环。
  • 系统压力过大:频繁失败的消息可能会影响正常消息的消费,拖垮整个消费端。

因此,我们需要一个 完整的失败消息清洗与重试机制 来保障系统的稳定性。


📌 2. 解决方案

✅ 目标

  1. 记录失败的消息,存入数据库,方便后续排查。
  2. 实现自动重试,允许在一定次数内自动重试消费失败的消息。
  3. 配置延迟队列,避免立即重试导致的瞬时高并发问题。
  4. 超出重试次数后进入死信队列,避免死循环消费,便于后续人工干预。

📝 数据库表设计

我们创建一个 failed_message_log 表来存储失败的消息:

CREATE TABLE failed_message_log (id BIGINT PRIMARY KEY AUTO_INCREMENT,message_id VARCHAR(64) NOT NULL,exchange_name VARCHAR(255),routing_key VARCHAR(255),queue_name VARCHAR(255),message TEXT NOT NULL,error_reason TEXT NOT NULL,retry_count INT DEFAULT 0,status ENUM('PENDING', 'FAILED', 'RETRYING', 'SUCCESS') DEFAULT 'PENDING',created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

字段说明:

  • message_id:消息的唯一标识符。
  • exchange_name / routing_key / queue_name:记录消息来源,方便追溯。
  • message:存储原始消息内容。
  • error_reason:记录失败的具体原因。
  • retry_count:当前消息的重试次数。
  • status:消息状态(PENDING:待处理,FAILED:失败,RETRYING:正在重试,SUCCESS:成功)。
  • created_at / updated_at:时间戳,方便查询和统计。

📌 3. 消费端实现

📍 监听 RabbitMQ 消息

@Slf4j // 使用 Lombok 提供的日志功能,简化日志记录
@Component // 让 Spring 管理该组件
public class RabbitMqConsumer {private static final int MAX_RETRY_COUNT = 3; // 最大重试次数,超过后将消息放入死信队列@Autowiredprivate FailedMessageLogMapper failedMessageLogMapper; // 失败消息日志的数据库访问对象,用于记录失败消息@Autowiredprivate RabbitTemplate rabbitTemplate; // RabbitMQ 消息发送模板,用于消息重试和死信处理/*** 监听 RabbitMQ 队列,处理消息** @param messageId   消息的唯一标识 ID* @param redelivered 是否是 RabbitMQ 重新投递的消息(用于判断是否是重试消息)* @param message     消息内容(JSON 格式字符串)*/@RabbitListener(queues = "your_queue_name") // 监听指定队列,自动消费消息public void processMessage(@Header("messageId") String messageId,@Header(AmqpHeaders.REDELIVERED) Boolean redelivered,String message) {try {log.info("Processing message: {}", message);JSONObject jsonObject = JSON.parseObject(message); // 解析 JSON 格式的消息handleMessage(jsonObject); // 处理业务逻辑} catch (Exception e) {log.error("Message processing failed: {}", message, e); // 记录错误日志// 查询数据库中是否已存在该失败消息的记录FailedMessageLog failedMessage = failedMessageLogMapper.findByMessageId(messageId);if (failedMessage == null) {// 如果数据库中没有记录,则插入一条新的失败记录failedMessage = new FailedMessageLog(messageId, message, e.getMessage(), 0, "PENDING");failedMessageLogMapper.insert(failedMessage);} else {// 如果已存在记录,则增加重试次数failedMessage.setRetryCount(failedMessage.getRetryCount() + 1);failedMessageLogMapper.update(failedMessage);}// 判断重试次数是否超过最大限制if (failedMessage.getRetryCount() >= MAX_RETRY_COUNT) {log.warn("Message {} reached max retry limit, moving to dead letter queue", messageId);failedMessage.setStatus("FAILED"); // 标记消息为失败failedMessageLogMapper.update(failedMessage); // 更新数据库记录// 将消息发送到死信队列(DLX),用于后续人工干预rabbitTemplate.convertAndSend("dlx_exchange", "dlx_routing_key", message);} else {log.warn("Retrying message: {}, attempt {}", messageId, failedMessage.getRetryCount());failedMessage.setStatus("RETRYING"); // 更新状态为重试中failedMessageLogMapper.update(failedMessage);// 发送到延迟队列,等待 5 秒后再重试,避免频繁失败造成系统压力rabbitTemplate.convertAndSend("delayed_exchange", "delayed_routing_key", message, msg -> {msg.getMessageProperties().setDelay(5000); // 设置消息延迟 5 秒return msg;});}}}/*** 业务处理逻辑(模拟成功处理)** @param jsonObject 解析后的消息对象*/private void handleMessage(JSONObject jsonObject) {log.info("业务处理成功: {}", jsonObject);}
}

📌 4. 配置延迟队列 & 死信队列

📍 延迟队列

@Bean
public Queue delayedQueue() {return QueueBuilder.durable("delayed_queue").withArgument("x-dead-letter-exchange", "dlx_exchange")  .withArgument("x-dead-letter-routing-key", "dlx_routing_key").withArgument("x-message-ttl", 10000)  // 10秒延迟.build();
}

📍 死信队列

@Bean
public Queue dlxQueue() {return QueueBuilder.durable("dlx_queue").build();
}

📌 5. 总结

方案作用
数据库存储失败消息记录失败原因,方便排查
延迟队列避免瞬时失败,允许重试
最大重试次数防止死循环消费
死信队列彻底失败后进入死信队列,人工干预

这样,我们就可以 保证 RabbitMQ 消息消费的高可用性,同时避免消息丢失和死循环消费的问题。🚀🚀🚀


文章转载自:

http://6A2HMT33.dwzwm.cn
http://XrLZ04q6.dwzwm.cn
http://CVOgrYux.dwzwm.cn
http://1ZYylvjC.dwzwm.cn
http://pstvHvf8.dwzwm.cn
http://xdg0UazQ.dwzwm.cn
http://e3RIM2HF.dwzwm.cn
http://z17I8xlP.dwzwm.cn
http://LlMMd7c5.dwzwm.cn
http://Qo5AAEpr.dwzwm.cn
http://U2B4ptF2.dwzwm.cn
http://rEp4yF7H.dwzwm.cn
http://5xjqk1Pg.dwzwm.cn
http://V6CzglUO.dwzwm.cn
http://AlkCI9s0.dwzwm.cn
http://oVFwqMoU.dwzwm.cn
http://267qiyaW.dwzwm.cn
http://ELfdhX9y.dwzwm.cn
http://ba3hCJO0.dwzwm.cn
http://j2mrkNXe.dwzwm.cn
http://OS5ypN24.dwzwm.cn
http://KpNTZPIO.dwzwm.cn
http://iXP2VROW.dwzwm.cn
http://gCIjY0v8.dwzwm.cn
http://EQQbOtV2.dwzwm.cn
http://sHVYyB3r.dwzwm.cn
http://MvzXSONw.dwzwm.cn
http://HRifnGLC.dwzwm.cn
http://ITgtXMKd.dwzwm.cn
http://wwhOZNWF.dwzwm.cn
http://www.dtcms.com/wzjs/678151.html

相关文章:

  • 苏州做网站好的公司手机网站改app
  • 宣讲家网站 家风建设江苏省建设厅网站官网
  • 有关网站建设的毕业设计八戒logo设计网
  • 建设互联网站是什么免费ppt模板的网站
  • 石家庄58同城大型seo公司
  • 安卓网站开发前景wordpress评论框高度
  • 怎么给网站做301做网站设计都做些什么
  • 镇江做网站公司网站开发常用的谷歌插件
  • 企业网站建设外包服务合同哈巴狗模式网站开发
  • 电子商务网站备案网站备案机构
  • 营业执照咋做网等网站广州建设六马路小学网站
  • 什么网站做软件任务挣钱成都人才网
  • 做网站图片大小网上哪里可以注册公司
  • 老网站做seo能不能重新注册哪个素材网站做美工最好
  • 盐城网站开发公司上海建设网站是国家级吗
  • 前端网站如何做全景图查看网站访问量
  • 可信的免费网站建设淘宝关键词挖掘工具
  • 电子商务网站平台不包括网站的导航页怎么做
  • 哔哩网站开发需求分析模板北京 互联网公司
  • 热门网站有哪些北京建筑信息网
  • seo 网站地图优化国内最大的app开发公司
  • 做网站设计的网站南昌知名网站建设公司
  • 目前做美术的网站以及app学习通网页版
  • 腾讯云图床wordpress免费网站seo
  • 怎么看网站用什么平台做的深圳网站建设制作网络公司
  • 一个购物交易网站怎么做婺源做网站有吗
  • 2017年用什么语言做网站案例查询网站
  • 全国网站打开速度象山专业网站建设
  • wordpress网站如何播放视频jsp网站开发之html入门知识
  • 个人网站 怎么备案整站网站优化价格