Python 实现大文件的高并发下载
项目背景
基于一个 scrapy-redis 搭建的分布式系统,所有item都通过重写 pipeline 存储到 redis 的 list 中。这里我通过代码演示如何基于线程池 + 协程实现对 item 的中文件下载。
-
Item 结构
目的是为了下载 item 中
attachments保存的附件内容。{ "crawl_time": "20221017 12:00:00", "version": "20221017 12:00:00", "data": [{ "title": "", "attachments": [ { "ori_url": "https://www.baidu.com", # 文件地址 "path": "", # 文件本地保存路径 "filename": "xxx" # 文件名称 } ] }] }
一、批量获取 item
为了能够提高数据的存储效率,选择从 redis 中弹出多个 item,但当前部署的 redis 版本为 5.0,lpop 不支持同时弹出多个数据,需要通过 LRANGE 和 LTRIM 命令实现,但是两个命令执行不是原子操作,在多线程的情况下会导致数据异常,因此通过 lua 脚本执行批量弹出多个 item 。
1.1 lua 脚本

1.2 读取数据
设定好批量读取的大小,执行 lua 脚本,获取数据。

二、并发
2.1 线程池
使用线程池去管理这么多 item 下载任务的原因:
- 减少频繁创建和销毁线程的开销
- 控制并发数量,防止不断创建线程导致资源耗尽
- 复用线程,减少线程切换开销
将获取到的 data 进行分片,分片后的数据交给多个线程去下载,提高并发效率。

2.2 协程任务
每个线程新建一个事件循环对象 loop,用来管理分片后的 data 协程任务。
为了复用 TCP 连接和 session,选择让分片 data 共享一个 TCPConnector 和 ClientSession 对象。这是基于 data 分片大小大概率是同一个网站的数据设计的,可以降低连接创建会话管理的的资源消耗。

2.3 协程并发
通过 asyncio.gather 实现协程并发。

三、大文件分块
下载文件时,如果文件比较大,网络又不稳定的情况下,很容易导致下载失败,因此这里通过将文件分块下载优化流程。
3.1 分块
对文件分块之前,先要获取文件大小。向服务器发送一个预请求 head,来获取文件长度,这样可以避免获取整个文件,减少网络传输耗时。

然后对文件进行分块处理,在传输中,需要平衡 网络拥塞 和 请求频次 导致的消耗,这里选择将文件分为 1024 * 1024 也就是 1 MB 的块大小。
使用 asyncio.Semaphore 控制 同时进行的下载任务数量,避免过多并发导致服务器崩溃。

3.2 下载
修改 headers 中的 Range 获取文件指定块大小的内容。
通过装饰器实现文件的断点续传功能,防止因网络不稳定导致文件内容缺失。

当文件的某个块下载失败,超出重试次数时,取消所有该文件块的下载任务,暂时放弃该文件,记录到失败下载队列中保存,避免因为问件本就损坏这种情况导致不断重试。

异步装饰器的实现

3.3 拼接
result 按顺序返回请求的结果,将请求的文件块拼接完成。

