Redis+Kafka实现动态延时任务
对于未上传完的文件片占用的磁盘空间,则是通过Redis+Kafka实现动态延时任务的存储与下发执行,保证与MySQL的最终一致性
从 “业务背景”、“技术方案”、“为何用Redis+Kafka”、“如何保证最终一致性” 四个角度来分析:
🧩 一、业务背景
在实现大文件 分片上传、断点续传 功能时,有以下问题:
- 用户上传的文件被分成多个小片,每个片段会先暂存在服务器硬盘的临时目录中;
- 文件合并前,这些文件片会一直占据磁盘空间;
- 如果用户上传到一半就不继续了(无论是误操作还是恶意攻击),这些碎片就永远不会被合并清理;
- 从而造成 磁盘空间长期占用、资源浪费、甚至导致服务崩溃(被攻击压垮)。
所以:必须有一种机制自动清理这些未完成上传的文件碎片。
🔧 二、技术方案总览
核心目标:每个文件片上传完后,给它设置一个“延时任务”,比如:如果 5 秒内没有继续上传,那就清理这个文件的所有碎片。
技术选型:
Redis
:用来快速缓存上传状态、延时任务(高性能读写);Kafka
:作为可靠的异步消息中间件,负责延时任务的可靠下发和最终落地执行(高可靠性);MySQL
:真正的数据持久化落地(低频写、稳定性要求高)。
🚀 三、为什么用 Redis + Kafka?
✅ Redis 的作用:
- 用
ZSet
(有序集合)存储所有文件上传任务及其过期时间; - 每次上传文件片,都会刷新延时时间(例如当前时间 + 5秒);
- 定时调度器(比如每秒执行)扫描 Redis 中到期的任务;
- 如果任务到期未上传完,就发消息到 Kafka。
👉 Redis 保证快速更新和调度精度,但本身不做复杂业务处理。
✅ Kafka 的作用:
- Kafka 消费端监听 Redis 发过来的超时任务;
- 收到后执行任务:删除对应磁盘碎片,更新数据库中“未完成上传空间”等信息;
- Kafka 保证消息可靠投递、可重试、可顺序、具备持久化和幂等保障。
👉 Kafka 确保任务“最终会被执行”,即使 Redis/调度器短暂异常,消息也不会丢。
🧩 四、如何保证与 MySQL 的最终一致性?
这是句子中的关键——“最终一致性”的意思是:即使中间发生延迟、系统抖动等,只要系统恢复,MySQL 里的数据最终会被正确更新,和 Redis 的状态保持一致。
⚙️ 具体机制:
阶段 | 数据存储层 | 行为 |
---|---|---|
上传过程中 | Redis | 每次上传一个文件片,就更新 Redis 中该任务的延迟时间;同时更新 Redis 中该用户“未完成上传空间”字段 |
任务过期 | Kafka | Redis 将过期任务发到 Kafka 任务队列 |
任务执行 | 消费端处理 | 1. 删除磁盘碎片; 2. 将 Redis 中统计的空间同步到 MySQL(更新“未完成上传空间”字段) 3. 删除 Redis 中该任务记录 |
上传完成 | 应用服务 | 合并文件,Redis 中清除上传状态,同时从 Redis 和数据库中扣除“未完成上传空间”并加到“已完成上传空间” |
只要 Kafka 不丢消息,消费端一定会最终完成这个任务 → 保证 MySQL 和 Redis 中的数据最终一致。
✅ 总结
“对于未上传完的文件片占用的磁盘空间,则是通过Redis+Kafka实现动态延时任务的存储与下发执行,保证与MySQL的最终一致性”
可以如下理解:
- 未上传完的文件片会临时占用磁盘,需要被及时清理;
- 每个上传任务会被缓存到 Redis(使用 ZSet);
- 到期的任务由 Redis 调度器识别后,通过 Kafka 发送消息通知执行清理;
- Kafka 消费端负责真正去清理碎片文件 + 同步更新数据库;
- 即便系统某一刻异常,只要 Kafka 的消息还在,最终一定会将 MySQL 的状态修正正确;
- 这样就达到了:高性能缓存 + 高可靠调度 + 数据最终一致。
✅ 简历中表达方式:
为应对大文件上传中因中断或恶意攻击导致的磁盘碎片堆积问题,设计并实现基于
Redis ZSet + Kafka
的动态延时清理机制:
- 使用 Redis 高性能缓存上传任务及延时状态;
- 超时任务通过 Kafka 异步下发,保障任务可靠落地执行;
- 任务执行过程更新用户未完成上传空间并持久化至 MySQL,最终实现数据库与缓存状态一致。
当然可以,下面我通过一个实际例子,完整演示如何用 Redis + Kafka
来管理未上传完成的文件片所引发的磁盘空间占用问题,以及如何保证最终一致性。
🎯 背景场景
假设用户 user123 在上传一个视频文件,文件大小为 100MB,分成了 10个片段(每片10MB)。系统配置:
- 用户上传空间上限:500MB
- 磁盘碎片的保存目录按月存储,例如
/tmp/upload/2025-06/user123/abcd1234/
- 延时清理时间为:5秒未上传下一个片段就触发清理任务
🧩 实际操作流程
🪛 第一次上传(上传第1个文件片):
-
用户上传第1片(10MB),服务器接收后将它缓存在
/tmp/upload/2025-06/user123/abcd1234/part1
。 -
系统计算
md5 = abcd1234
(该文件的唯一标识)。 -
系统将任务写入 Redis:
ZSet 名称:zset_upload_tasks_1 key: abcd1234+user123 score: 当前时间戳 + 5秒(延时清理时间)
-
在 Redis 的另一个
Hash
中记录:user123 -> unfinishedSize = 10MB
-
任务调度器每秒扫描一次 ZSet,发现该任务未超时 → 不处理。
🧱 上传第2片(第2次上传):
- 用户紧接着上传第2片,系统发现已有这个任务 → 刷新 Redis 中的延时任务时间(当前时间 + 5秒)。
unfinishedSize
累加至 20MB。
🧨 用户断开连接,上传中断
假设用户上传完第3片(共30MB)后就关闭浏览器了,系统检测不到任何新的上传。
⏱️ Redis 延时任务触发
-
5秒过去,调度器再次扫描 ZSet:
- 发现
abcd1234+user123
的任务已超时。
- 发现
-
系统将该任务信息通过 Kafka 发送出去:
{"type": "upload_timeout","userId": "user123","fileMd5": "abcd1234","size": 30MB,"tmpPath": "/tmp/upload/2025-06/user123/abcd1234/"
}
🔁 Kafka 消费端处理任务
Kafka 消费者监听到这条消息,进行以下操作:
-
删除临时目录
/tmp/upload/2025-06/user123/abcd1234/
下所有文件片(节省磁盘空间)。 -
将 Redis 中
unfinishedSize = 30MB
减去该任务对应的大小。 -
同步更新 MySQL 数据库中:
use_space_unfinished = use_space_unfinished - 30MB
-
日志记录该任务已完成,确保幂等。
📌 最终效果(总结)
时间点 | 状态 | 磁盘使用 | Redis 状态 | 数据库状态 |
---|---|---|---|---|
上传中 | 正常写入 | +30MB | unfinishedSize = 30MB | unchanged |
超时后 | Kafka触发 | -30MB | unfinishedSize = 0MB | use_space_unfinished - 30MB |
🎓 一句话总结
“用户上传文件中断后,Redis 中的延时任务自动触发清理逻辑,通过 Kafka 下发异步清理任务,释放磁盘空间并同步更新 MySQL 中的已用空间,确保缓存和持久化层的最终一致性。”
- 分布式缓存与消息队列协作机制的理解;
- Redis 延时调度能力;
- Kafka 幂等消费处理;
- 数据一致性策略(最终一致);
如何基于 Redis + Kafka 管理未上传完成的文件片,防止服务器硬盘被恶意占用并确保最终一致性
📘 技术分享稿:未上传文件片的清理与一致性保障机制
在开发 SmartDrive 云盘系统 时,我们遇到一个潜在的系统稳定性问题:如果用户上传文件过程中中断(例如恶意攻击、频繁取消上传等),未完成的文件分片会持续占用服务器磁盘空间。由于这些文件尚未合并,数据库中的用户“已使用空间”不会更新,长此以往可能导致服务器磁盘资源被占满,影响服务可用性。
为了解决这一问题,我们设计并实现了一个基于 Redis + Kafka 的动态延时任务系统,实现文件片清理、用户空间回收以及数据一致性保障。
🔧 实现方案
-
文件上传分片记录与限时清理机制
- 用户上传每一个文件片时,我们通过
文件MD5 + 用户ID
构造唯一标识,并将该上传任务存入 Redis 的分布式 ZSet(有序集合)中,设置延迟时间(如5秒)作为 score。 - 若用户持续上传,则不断刷新该任务的过期时间;若中断,延迟任务最终会被触发。
- 用户上传每一个文件片时,我们通过
-
空间占用统计
- Redis 中为每个用户维护两个字段:
use_space_finished
(已上传完毕)和use_space_unfinished
(上传中)。 - 每次分片上传成功后,
use_space_unfinished
累加对应片段大小,但不立即写入数据库,以减少数据库压力。
- Redis 中为每个用户维护两个字段:
-
Kafka 异步任务分发
- 当调度器检测到某个上传任务超时后,会将其作为“上传中断”事件写入 Kafka 任务队列。
- Kafka 消费者接收到该事件后,执行清理逻辑:删除临时目录下的文件片、扣减 Redis 和数据库中的
use_space_unfinished
,同时记录日志确保幂等性。
-
一致性保障
- Redis 负责高频写操作,Kafka 消费者在空闲时批量落库,保证 Redis 与 MySQL 的最终一致性。
- 通过唯一任务 ID 和幂等机制,确保即使任务重复下发也不会出现重复扣减或误删文件。
💡 举例说明
假设用户 user123
上传一个 100MB 的视频,被分成 10 片。上传前3片后中断,系统记录:
- Redis 中
use_space_unfinished = 30MB
- 临时目录占用磁盘空间为 30MB
当用户5秒内未继续上传,调度器触发 Kafka 任务:
- 删除
/tmp/upload/.../user123/md5xyz/
下所有片段 - Kafka 消费者更新 Redis 和 MySQL,使
use_space_unfinished -= 30MB
最终,系统磁盘被及时释放,数据状态一致,避免了无效数据积压。
📌 效果与优势
- ⚙ 系统可用性提升:及时清理无效数据,防止磁盘被打爆
- ⚡ 高性能:Redis 实现高频写入,Kafka 异步处理延时任务,系统抗压能力强
- 🔐 安全性好:即使遭遇恶意攻击,也能快速识别并清理
- 📈 一致性保障:Redis 与 MySQL 数据最终一致,确保用户体验无误
这个机制目前已经稳定运行在我们云盘系统的上传链路中,极大地提升了系统的健壮性与可维护性。如果大家有类似文件上传、延时处理或一致性问题,也可以参考我们这套 Redis + Kafka 架构模式。
架构优点、对比其他方案、以及可选替代方案
✅ 为什么选择 Redis + Kafka 架构模式?
一、Redis 的优势:高速缓存 + 精准调度
- Redis 支持毫秒级延时处理(通过
ZSet + score + timestamp
实现),适合存储上传任务及调度时间。 - 具备高并发处理能力,可支持上传过程中对用户空间的快速写入与实时更新。
- 使用内存存储避免频繁访问数据库,缓解数据库压力。
二、Kafka 的优势:高吞吐 + 异步解耦
- Kafka 具备高可用、高吞吐、可回溯等特性,适合作为任务通知的“缓冲带”。
- Redis 负责状态判断和触发,Kafka 负责任务发出后的“慢处理”,避免阻塞请求线程。
- 异步解耦:上传请求不需要等待清理逻辑完成,极大提升响应速度与系统稳定性。
三、二者组合的优势
- Redis 做“调度器”,Kafka 做“执行者”,既确保调度精度,又实现任务缓冲与最终一致性。
- Redis 失效后 Kafka 仍可保留任务;Kafka 消费异常可重试,具备很好的鲁棒性和可恢复性。
🔁 与其他架构方案对比
架构模式 | 优势 | 缺点 |
---|---|---|
✅ Redis + Kafka | 高性能、高解耦、最终一致性强 | 系统设计复杂度略高 |
❌ 直接写数据库 | 简单直观 | 并发高时容易写崩库,写放大严重,影响主业务 |
❌ 仅用 Redis 实现延时清理 | 写性能好 | 无法可靠落盘,易丢失任务;需要自行实现幂等与持久化 |
❌ Quartz / ScheduledExecutorService | 精度较低,线程消耗高 | 不适合大规模任务调度,任务量大时调度不稳定 |
❌ RabbitMQ / 延迟队列 | 可替代 Kafka | 但吞吐与可靠性不如 Kafka,且不易追踪任务执行状态 |
🌟 其他可选实现方式(如果 Redis + Kafka 不可用)
1. 使用 Redis + 延迟队列中间件(如 RabbitMQ 的延迟插件、RocketMQ 延迟消息)
- 替代 Kafka
- 适合中小型系统,且部署运维成本较低
2. 使用 Redis + 定时任务轮询(如 ScheduledExecutor + Redis)
- 定期扫描 Redis 中即将过期的任务并执行
- 适用于任务量不大、调度精度要求不高的场景
3. 利用 Redisson 的 RDelayedQueue
- 支持分布式延迟队列,适用于 Redis 原生不支持的延迟任务功能
- 配合业务逻辑处理较轻的场景,快速上线
🧠 总结:为什么 Redis + Kafka 更好?
Redis 解决了高频访问场景下的快速读写 + 精准延时调度,Kafka 解决了异步、解耦、幂等、高吞吐处理的问题,两者结合:
- 性能强、可靠性高
- 调度精度好
- 任务追踪容易
- 系统扩展性强
这套架构非常适合大型上传系统中复杂的上传状态追踪、用户空间管控与数据一致性需求,能够在面对高并发、突发流量、异常上传行为时保持系统稳定。
Redis ZSet + Kafka 在上传文件片时的使用场景中的技术细节解析
🔧 背景场景简介
当用户上传大文件时,通常会进行分片上传。但如果恶意用户仅上传部分文件片(不合并完成),这些碎片可能长时间占用磁盘资源,导致服务器空间耗尽。
为此,系统需监控这些“未合并文件片”的生命周期,并在长时间未完成上传的情况下及时清理无效文件片。
这就引出了 Redis + Kafka 的组合使用:
✅ Redis ZSet 的作用(任务调度器)
技术细节:
- 使用 Redis 的有序集合(ZSet) 存储每个未完成上传任务。
- Key:如
upload_timeout_task_bucket_{n}
(分桶方案) - Member:任务唯一 ID,例如
md5_userid
- Score:当前时间戳 + 超时时间(例如 5 分钟)
示例:
ZADD upload_timeout_task_bucket_1 1718178000 md5_1234
表示用户1234上传的某个文件片,在 1718178000
(约5分钟后)仍未完成,则视为超时。
扩展细节:
- 使用内存 Map 缓存
md5+userid → zset桶编号
映射,提高查找性能。 - 每次用户上传一个新分片时,就更新对应任务的超时时间,延迟5秒或更长时间再次触发判断。
- 如果任务上传完成,则从 ZSet 删除,避免误清理。
✅ Kafka 的作用(任务下发执行者)
技术细节:
- Redis 的调度器定期扫描 ZSet 中即将到期的任务(例如每秒扫描
score ≤ 当前时间戳
的任务)。 - 对这些任务使用 Kafka 发送清理事件,异步通知后端执行清理动作。
Kafka 消息示例:
{"taskId": "md5_1234","userId": "1234","action": "clean_unfinished_chunks","timestamp": 1718178000
}
- 消息被下游服务异步消费,删除文件碎片、释放空间,并更新数据库与缓存状态。
- 具备 幂等处理能力:每条消息带唯一 ID,消费者侧使用去重机制防止重复清理。
🔐 并发与一致性保障机制
分桶设计:
- 将所有延时任务分散到多个 ZSet,例如 10 个桶:
upload_timeout_task_bucket_0
到_9
,按哈希值取模分配。 - 减少单个 ZSet 的 size,提升调度器扫描效率和延迟控制精度。
幂等性控制:
- Kafka 消息处理必须是幂等的,即同一条任务消息不管消费几次,最终状态一致。
- 可在数据库或 Redis 中记录任务处理状态(如 taskId → processed)防止重复执行。
与数据库一致性:
- Redis 在内存中快速处理临时状态;
- Kafka 负责写操作的异步最终一致性通知;
- 最终由数据库记录实际使用空间(如已上传/未完成空间大小字段)。
📌 总结一句话
使用 Redis ZSet 精准调度未完成上传的文件片生命周期,Kafka 异步可靠下发清理任务,两者协作实现了高并发场景下的磁盘保护、状态可追踪、任务幂等、最终一致性处理,有效防止恶意上传攻击,保障系统稳定性。
ZSet 分桶
“ZSet 分桶”是一个在高并发或大规模数据处理场景下的性能优化策略。它的核心思想是:将原本存储在一个 Redis 有序集合(ZSet)中的大量任务,拆分成多个 ZSet 存储,分散访问压力,提高查询效率与调度精度。
🧠 为什么要“分桶”?
当你把所有延迟任务都放在一个 Redis ZSet 里,比如叫 upload_timeout_tasks
,随着时间推移,这个集合会变得非常大。ZSet 的查询效率虽然不错,但:
ZRANGEBYSCORE
查询的是有序数据,任务一多,扫描就慢;- 每秒调度器都要扫描一次“过期任务”,访问压力集中;
- 单个 ZSet 容量太大,也可能成为 Redis 内存碎片化或阻塞操作的隐患。
为了解决这个问题,我们“分桶”。
🧰 ZSet 分桶的做法
举个例子:
假设我们要存储 100 万个上传文件的延时清理任务,不再用一个 ZSet,而是:
upload_timeout_task_bucket_0
upload_timeout_task_bucket_1
...
upload_timeout_task_bucket_9
共 10 个“桶”(ZSet)。我们把任务“均匀分布”到这些桶中。
如何分布任务?
可以根据任务的哈希值取模分桶,例如:
int bucketIndex = (md5 + userId).hashCode() % 10;
将这个任务放入第 bucketIndex
个桶中。
这样每个 ZSet 只维护一小部分任务,大大减少了单个桶的查询开销。
🔄 调度器如何配合分桶?
原来调度器每秒只扫描一个 ZSet:
ZRANGEBYSCORE upload_timeout_tasks 0 currentTime
现在变成轮询每个桶:
for i in 0..9:ZRANGEBYSCORE upload_timeout_task_bucket_i 0 currentTime
这样每次每个桶扫描的数据量变小了,调度延迟更小、吞吐量更高,也方便并发处理。
✅ 总结
特性 | 说明 |
---|---|
🎯 目的 | 降低单个 Redis ZSet 的压力,提高调度效率和查询性能 |
⚙️ 方式 | 将任务分散放入多个 ZSet(桶)中,按照哈希取模分配 |
🚀 优势 | 并发性能更强、查询更快、调度更精准、避免单点瓶颈 |
💡 适用场景 | 上传分片延时清理、定时任务调度、过期资源管理等场景 |
Kafka
🧠 一、Kafka 是什么?
Kafka 是一个高吞吐、可持久化的分布式消息队列系统,主要特点:
特性 | 说明 |
---|---|
发布-订阅模式 | 生产者发布消息,消费者订阅处理消息 |
高吞吐 | 每秒处理百万级消息 |
持久化 | 数据写入磁盘,支持消息持久保存 |
可扩展性 | 支持多 Broker 组成集群,水平扩展 |
容错性强 | 支持副本机制,节点宕机也不会丢消息 |
🧰 二、Kafka 在“文件片延迟清理”中的作用
在我们的系统中,用户上传文件是通过分片方式进行的:
- 如果某些分片长时间未上传完成,它们就一直占据服务器磁盘空间;
- 我们通过 Redis 的 ZSet 创建动态延时任务,比如“10分钟后检查是否上传完成”;
- 但延时任务并不直接清理磁盘,而是将清理请求发送到 Kafka 这个消息队列中;
- Kafka 中的消费者再异步消费任务,做清理、更新数据库等工作。
🔄 三、流程图示(简化)
[用户上传文件片] ↓
[Redis ZSet 生成延时任务](上传超时5分钟)↓
[调度器扫描 ZSet,到期后将任务发送到 Kafka Topic]↓
[Kafka 消费者消费消息]↓
[清理临时分片 + 更新 Redis/MySQL 空间使用数据]
📦 四、Kafka 的使用细节
1. Producer(生产者)发送消息
当某个文件片超时未合并:
// 伪代码:发送清理任务
kafkaTemplate.send("file-cleanup-topic", msg);
消息内容一般包括:
- 用户 ID
- 文件 MD5
- 分片路径
- 文件大小
2. Consumer(消费者)消费清理任务
@KafkaListener(topics = "file-cleanup-topic")
public void cleanupHandler(String msgJson) {// 解析消息// 删除磁盘中的分片// 更新 Redis 的未完成空间大小// 更新数据库(最终一致性)
}
3. 幂等性保证
Kafka 提供:
-
消息持久化:不怕宕机
-
消息重复消费:你需要在处理逻辑中加入幂等性设计,比如:
- 先检查文件是否已清理过;
- 或根据“唯一ID”判断是否是同一任务。
✅ 五、使用 Kafka 的优势
优势 | 说明 |
---|---|
解耦调度与处理逻辑 | Redis 延时任务调度只负责“发通知”,真正清理由 Kafka 消费者异步处理 |
提升系统性能与可扩展性 | 通过 Kafka 实现异步批量处理,避免同步阻塞 |
高可用保障 | Kafka 的持久化机制确保任务不会丢失 |
支持幂等处理 | 可以防止重复删除、误删等操作 |
💬 一句话总结:
在文件上传场景中,为防止未完成分片长期占用磁盘空间,我们基于 Redis ZSet 构建延迟任务调度机制,并结合 Kafka 进行任务异步下发和消费,实现高吞吐、高可用的碎片清理架构,同时通过唯一任务 ID 实现幂等处理与最终一致性保障。
系统性地解释整个链路
🧭 场景背景
用户上传大文件时,会被切分为多个文件片。为防止用户上传未完成就退出(或恶意攻击),我们使用Redis + Kafka架构,实现:
- 文件片超时未合并则定时清理(延迟任务)
- 上传空间计算最终一致性(Redis缓存+MySQL同步)
🔄 整体流程图
上传分片 → Redis ZSet 记录延时任务 → 到期 → 发送 Kafka 消息 → Kafka 消费者执行任务↓ ↓
更新 Redis 缓存使用量(未完成) 清理临时文件 + 更新 Redis/MySQL 空间使用情况
🧪 一、如何将任务发送到 Kafka Topic?
我们使用 Spring Boot + Kafka 的集成方式(Spring for Apache Kafka)。
示例代码(发送任务):
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;public void sendCleanupTask(String userId, String fileMd5, long unfinishedSize) {JSONObject task = new JSONObject();task.put("userId", userId);task.put("fileMd5", fileMd5);task.put("unfinishedSize", unfinishedSize);kafkaTemplate.send("file-cleanup-topic", task.toJSONString());
}
当 Redis 的 ZSet 检测到任务到期(即当前时间 > score),就调用该方法将任务发到 Kafka 的 file-cleanup-topic
主题中。
📥 二、Kafka 消费者如何消费消息?
你可以通过 @KafkaListener
注解监听某个 Topic:
@KafkaListener(topics = "file-cleanup-topic", groupId = "file-cleaner-group")
public void consumeCleanupTask(String messageJson) {JSONObject task = JSONObject.parseObject(messageJson);String userId = task.getString("userId");String fileMd5 = task.getString("fileMd5");long size = task.getLong("unfinishedSize");// 1. 删除 Redis 中对应的文件分片记录redisTemplate.delete("chunk:" + userId + ":" + fileMd5);// 2. 删除磁盘临时分片fileService.deleteTempChunks(userId, fileMd5);// 3. 更新 Redis 中的 use_space_unfinished 字段redisTemplate.opsForHash().increment("user:" + userId, "use_space_unfinished", -size);// 4. 将最终结果异步持久化到数据库userMapper.decreaseUnfinishedSize(userId, size);
}
🧠 三、Redis 与 MySQL 的数据更新方式
Redis 缓存的设计
我们使用 Hash 存储用户空间信息:
Key: user:{userId}
Field: use_space // 已上传完毕的空间
Field: use_space_unfinished // 未上传完毕的空间
更新逻辑:
- 每上传一片:
use_space_unfinished += chunkSize
- 清理:
use_space_unfinished -= chunkSize
- 合并:
use_space_unfinished -= totalSize
,use_space += totalSize
MySQL 最终一致性(异步批量同步)
UPDATE user_space
SET use_space_unfinished = use_space_unfinished - #{size}
WHERE user_id = #{userId};
这一步是为了防止 Redis 异常丢失数据时,系统还能恢复一致性。
✅ 总结一下完整链路
步骤 | 描述 |
---|---|
1. 上传分片 | 用户上传某个分片时,记录上传大小,更新 Redis 中的 use_space_unfinished |
2. 创建延迟任务 | 使用 Redis ZSet 记录(fileMd5+userId)+ 上传时间戳 |
3. 定时扫描任务 | 到期后调用 KafkaTemplate.send() 发送清理任务到 Kafka |
4. Kafka 消费者处理 | 监听 topic,执行任务:清文件 + 更新 Redis + 更新数据库 |
5. 最终一致性 | Redis 快速缓存写,MySQL 异步持久化,确保数据准确 |
💬 一句话总结:
为了防止文件片上传未完成导致磁盘资源被长期占用,我们使用 Redis ZSet 实现延时任务调度,通过 Kafka 实现任务异步消费。Redis 记录用户未完成空间信息以减轻数据库压力,Kafka 消费者在任务触发后清理磁盘分片并更新 Redis 与数据库,最终实现空间使用信息的一致性同步和系统高性能处理。