Python中使用HTTP 206状态码实现大文件下载的完整指南
在文件下载过程中,我们经常会遇到大文件的下载需求。传统的下载方式在面对网络不稳定或大文件时往往效率低下,而HTTP 206状态码(部分内容)则提供了更好的解决方案。本文将详细介绍如何使用Python实现支持断点续传的大文件下载。
目录
- HTTP 206状态码简介
- 基础分块下载实现
- 多线程分块下载优化
- 带进度显示的下载器
- 完整的下载管理器
- 总结
HTTP 206状态码简介
HTTP 206状态码表示"部分内容"(Partial Content):当客户端通过Range请求头请求资源的特定部分时,支持范围请求的服务器会以206状态码返回所请求的字节范围。这对于大文件下载具有以下优势:
- 断点续传:网络中断后可以从断点继续下载
- 多线程下载:可以同时下载文件的不同部分
- 节省带宽:只下载需要的部分内容
基础分块下载实现
首先,我们实现一个基础的分块下载器,支持断点续传功能:
python
import os

import requests


def _remote_total_size(response):
    """Return the total resource size from a Content-Range header, or None.

    The header looks like ``bytes 0-99/1234`` on a 206 response and
    ``bytes */1234`` on a 416 response; the part after the slash is the
    complete size, or ``*`` when the server does not report it.
    """
    total = response.headers.get('Content-Range', '').rpartition('/')[2].strip()
    return int(total) if total.isdigit() else None


def download_large_file(url, filename, chunk_size=8192, timeout=30):
    """Download a large file, resuming from a partial local copy if present.

    Args:
        url: Address of the file to download.
        filename: Local path the file is saved to.
        chunk_size: Size of each streamed chunk in bytes (default 8 KB).
        timeout: Connect/read timeout in seconds passed to ``requests``.

    Returns:
        True when the file is complete on disk, False on any failure.
    """
    # Resume only when there is something on disk to resume from.
    offset = os.path.getsize(filename) if os.path.exists(filename) else 0
    headers = {'Range': f'bytes={offset}-'} if offset > 0 else {}

    try:
        # The context manager releases the connection even if writing fails.
        with requests.get(url, headers=headers, stream=True,
                          timeout=timeout) as response:
            status = response.status_code
            if status == 206:  # Partial content: the server honoured Range.
                print(f"继续下载: 从 {offset} 字节开始")
                mode = 'ab'
            elif status == 200:  # Full body: start over from byte 0.
                print("开始新下载")
                mode = 'wb'
                offset = 0
            elif status == 416 and _remote_total_size(response) == offset:
                # The requested range starts at the end of the resource, so
                # the local copy is already complete.
                print(f"文件已完整,无需重新下载: {filename}")
                return True
            else:
                print(f"下载失败,状态码: {status}")
                return False

            # Content-Length covers only the remaining bytes on a 206.
            total_size = int(response.headers.get('content-length', 0)) + offset
            downloaded = offset

            with open(filename, mode) as file:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    if not chunk:
                        continue
                    file.write(chunk)
                    downloaded += len(chunk)
                    progress = (downloaded / total_size) * 100 if total_size > 0 else 0
                    print(f"\r下载进度: {progress:.1f}% ({downloaded}/{total_size} bytes)", end='')

        print(f"\n下载完成: {filename}")
        return True
    except (requests.RequestException, OSError, ValueError) as e:
        # Network errors, file-system errors and malformed size headers.
        print(f"下载出错: {e}")
        return False


# Usage example
if __name__ == "__main__":
    url = "https://example.com/large-file.zip"
    filename = "large-file.zip"
    download_large_file(url, filename)
多线程分块下载优化
对于超大文件,我们可以使用多线程同时下载不同部分来提升下载速度:
python
import os
import threading
from concurrent.futures import ThreadPoolExecutor

import requests


class MultiThreadDownloader:
    """Download one file in several byte ranges using a thread pool."""

    def __init__(self, url, filename, thread_num=4):
        """Store the download target.

        Args:
            url: Address of the file to download.
            filename: Local path the file is saved to.
            thread_num: Number of ranges fetched concurrently.
        """
        self.url = url
        self.filename = filename
        self.thread_num = thread_num
        self.file_size = 0
        self.chunk_size = 1024 * 1024  # 1 MB streamed per write
        self.timeout = 30  # seconds, connect/read timeout for each request

    def get_file_size(self):
        """Return the total file size in bytes, or 0 when it is unknown."""
        # requests.head() does not follow redirects by default; without
        # allow_redirects the length would describe the redirect response.
        response = requests.head(self.url, allow_redirects=True,
                                 timeout=self.timeout)
        if 'content-length' in response.headers:
            return int(response.headers['content-length'])
        return 0

    def download_chunk(self, start_byte, end_byte, part_num):
        """Download the inclusive byte range and write it at its file offset.

        Args:
            start_byte: First byte of the range.
            end_byte: Last byte of the range (inclusive).
            part_num: 1-based part index, used only in messages.

        Returns:
            True when the part was written, False otherwise.
        """
        headers = {'Range': f'bytes={start_byte}-{end_byte}'}
        try:
            with requests.get(self.url, headers=headers, stream=True,
                              timeout=self.timeout) as response:
                if response.status_code != 206:
                    # A 200 here means the server ignored Range and is
                    # sending the whole file; it must not be written at
                    # this offset.
                    print(f"部分 {part_num} 下载失败,状态码: {response.status_code}")
                    return False

                # Each worker uses its own handle, so seek positions are
                # independent between threads.
                with open(self.filename, 'r+b') as file:
                    file.seek(start_byte)
                    # Stream to disk instead of buffering the whole part.
                    for piece in response.iter_content(chunk_size=self.chunk_size):
                        if piece:
                            file.write(piece)
            print(f"部分 {part_num} 下载完成: {start_byte}-{end_byte}")
            return True
        except (requests.RequestException, OSError) as e:
            print(f"部分 {part_num} 下载出错: {e}")
            return False

    def download(self):
        """Download the whole file in parallel parts.

        Returns:
            True only when every part was downloaded successfully.
        """
        try:
            self.file_size = self.get_file_size()
        except (requests.RequestException, ValueError) as e:
            print(f"无法获取文件大小: {e}")
            return False

        if self.file_size == 0:
            print("无法获取文件大小")
            return False
        print(f"文件总大小: {self.file_size} bytes")

        # Pre-allocate the file so every part can be written at its offset.
        with open(self.filename, 'wb') as file:
            file.truncate(self.file_size)

        # Never use more parts than bytes, otherwise ranges become empty.
        parts = max(1, min(self.thread_num, self.file_size))
        part_size = self.file_size // parts
        ranges = []
        for index in range(parts):
            start = index * part_size
            # The last part absorbs the remainder of the integer division.
            end = self.file_size - 1 if index == parts - 1 else start + part_size - 1
            ranges.append((start, end, index + 1))

        with ThreadPoolExecutor(max_workers=parts) as executor:
            futures = [
                executor.submit(self.download_chunk, start, end, part_num)
                for start, end, part_num in ranges
            ]
        # Leaving the executor block waits for all workers to finish.
        failed = [
            part_num
            for (_, _, part_num), future in zip(ranges, futures)
            if not future.result()
        ]
        if failed:
            print(f"以下部分下载失败: {failed}")
            return False

        print("所有部分下载完成")
        return True


# Usage example
if __name__ == "__main__":
    downloader = MultiThreadDownloader(
        url="https://example.com/large-file.zip",
        filename="large-file.zip",
        thread_num=4,
    )
    downloader.download()
带进度显示的下载器
为了更好地监控下载进度,我们可以添加进度条显示:
python
import os
import time

import requests
from tqdm import tqdm


class ProgressDownloader:
    """File downloader that resumes partial files and shows a progress bar."""

    def __init__(self, url, filename):
        """Store the download target.

        Args:
            url: Address of the file to download.
            filename: Local path the file is saved to.
        """
        self.url = url
        self.filename = filename

    @staticmethod
    def _total_from_content_range(response):
        """Return the size after the slash in Content-Range, or None.

        The header is ``bytes 0-99/1234`` on a 206 and ``bytes */1234`` on
        a 416; the size may be ``*`` when the server does not report it.
        """
        total = response.headers.get('content-range', '').rpartition('/')[2].strip()
        return int(total) if total.isdigit() else None

    def download_with_progress(self):
        """Download the file while displaying a tqdm progress bar.

        Returns:
            True when the file is complete on disk, False when the server
            answers with an unexpected status code.
        """
        # HEAD gives the full size up front; follow redirects so the length
        # describes the real resource rather than a redirect response.
        head = requests.head(self.url, allow_redirects=True, timeout=30)
        total_size = int(head.headers.get('content-length', 0))

        # Bytes already on disk from an earlier, interrupted run.
        downloaded_size = 0
        if os.path.exists(self.filename):
            downloaded_size = os.path.getsize(self.filename)

        headers = {}
        if downloaded_size > 0:
            headers['Range'] = f'bytes={downloaded_size}-'

        with requests.get(self.url, headers=headers, stream=True,
                          timeout=30) as response:
            status = response.status_code

            if status == 416:
                # The range starts at or past the end of the resource; if
                # the sizes match, the local copy is already complete.
                remote_total = self._total_from_content_range(response) or total_size
                if remote_total and remote_total == downloaded_size:
                    print("下载完成!")
                    return True

            if status not in [200, 206]:
                print(f"下载失败,状态码: {status}")
                return False

            if status == 206:
                # Content-Range carries the full size for a resumed transfer.
                total_size = self._total_from_content_range(response) or total_size
                mode = 'ab'
            else:
                # A 200 means the server sent the whole file (Range ignored
                # or not sent): overwrite instead of appending.
                downloaded_size = 0
                mode = 'wb'

            # total=None makes tqdm show a plain counter when size is unknown.
            with open(self.filename, mode) as file, tqdm(
                total=total_size or None,
                initial=downloaded_size,
                unit='B',
                unit_scale=True,
                desc="下载进度",
            ) as progress_bar:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        file.write(chunk)
                        progress_bar.update(len(chunk))

        print("下载完成!")
        return True


# Usage example
if __name__ == "__main__":
    downloader = ProgressDownloader(
        url="https://example.com/large-file.zip",
        filename="large-file.zip",
    )
    downloader.download_with_progress()
完整的下载管理器
最后,我们整合所有功能,创建一个功能完整的下载管理器:
python
import hashlib
import os
import time

import requests


class FileDownloader:
    """Download manager with resume, retries and optional MD5 verification."""

    def __init__(self, max_retries=3, timeout=30):
        """Create a downloader that reuses one HTTP session.

        Args:
            max_retries: Maximum number of download attempts per file.
            timeout: Connect/read timeout in seconds for each request.
        """
        self.max_retries = max_retries
        self.timeout = timeout
        self.session = requests.Session()

    def calculate_md5(self, filename):
        """Return the hexadecimal MD5 digest of the file's contents."""
        hash_md5 = hashlib.md5()
        with open(filename, "rb") as f:
            # Read in small blocks so large files are never held in memory.
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    @staticmethod
    def _total_from_content_range(response):
        """Return the size after the slash in Content-Range, or None."""
        total = response.headers.get('Content-Range', '').rpartition('/')[2].strip()
        return int(total) if total.isdigit() else None

    def _download_once(self, url, filename):
        """Run one download attempt; return True when the file is complete."""
        offset = os.path.getsize(filename) if os.path.exists(filename) else 0
        headers = {'Range': f'bytes={offset}-'} if offset > 0 else {}

        with self.session.get(url, headers=headers, stream=True,
                              timeout=self.timeout) as response:
            status = response.status_code

            if (status == 416 and offset > 0
                    and self._total_from_content_range(response) == offset):
                # Nothing left to fetch: the local copy is already complete.
                print(f"\n下载完成: {filename}")
                return True

            if status not in [200, 206]:
                print(f"下载失败,状态码: {status}")
                return False

            if status == 200:
                # The server sent the whole file (Range ignored or not
                # sent): overwrite instead of appending to the partial copy.
                offset = 0
                mode = 'wb'
            else:
                mode = 'ab'

            # Content-Length covers only the remaining bytes on a 206.
            total_size = int(response.headers.get('content-length', 0)) + offset

            print(f"开始下载: {filename}")
            print(f"文件大小: {total_size} bytes")

            downloaded = offset
            start_time = time.time()

            with open(filename, mode) as file:
                for chunk in response.iter_content(chunk_size=8192):
                    if not chunk:
                        continue
                    file.write(chunk)
                    downloaded += len(chunk)

                    # Speed counts only bytes received in this attempt, not
                    # bytes already on disk.
                    elapsed = time.time() - start_time
                    speed = (downloaded - offset) / elapsed if elapsed > 0 else 0
                    progress = (downloaded / total_size) * 100 if total_size > 0 else 0
                    print(f"\r进度: {progress:.1f}% | "
                          f"速度: {speed/1024/1024:.1f} MB/s | "
                          f"{downloaded}/{total_size} bytes", end='')

        print(f"\n下载完成: {filename}")
        return True

    def download_file(self, url, filename, verify_md5=None):
        """Download a file with resume support and optional MD5 verification.

        Args:
            url: Address of the file to download.
            filename: Local path the file is saved to.
            verify_md5: Optional expected MD5 hex digest of the full file.

        Returns:
            True when the file was downloaded (and verified, if requested);
            False when all attempts failed or the MD5 check failed. A file
            that fails the MD5 check is deleted.
        """
        for attempt in range(self.max_retries):
            try:
                if self._download_once(url, filename):
                    if not verify_md5:
                        return True

                    file_md5 = self.calculate_md5(filename)
                    # hexdigest() is lowercase; accept upper-case input too.
                    if file_md5 == verify_md5.strip().lower():
                        print("MD5验证成功")
                        return True

                    print(f"MD5验证失败: 期望 {verify_md5}, 实际 {file_md5}")
                    os.remove(filename)  # Remove the corrupted file.
                    return False
            except (requests.RequestException, OSError, ValueError) as e:
                print(f"下载尝试 {attempt + 1} 失败: {e}")

            # Wait before the next attempt, but not after the last one.
            if attempt < self.max_retries - 1:
                time.sleep(2)

        return False


# Usage example
if __name__ == "__main__":
    downloader = FileDownloader()

    # Download the file (MD5 verification is optional).
    success = downloader.download_file(
        url="https://example.com/large-file.zip",
        filename="large-file.zip",
        verify_md5="expected_md5_hash_here",  # optional
    )

    if success:
        print("文件下载成功!")
    else:
        print("文件下载失败!")
总结
本文介绍了四种使用Python实现HTTP 206大文件下载的方法:
- 基础分块下载:适合简单的断点续传需求
- 多线程分块下载:适合超大文件的高速下载
- 带进度显示的下载器:提供良好的用户体验
- 完整的下载管理器:适合生产环境,包含完整的错误处理和验证机制
关键技术点
- Range头:使用 `Range: bytes=start-end` 请求特定范围的内容
- 流式下载:使用 `stream=True` 避免内存溢出
- 状态码处理:正确处理206和200状态码
- 错误重试:实现重试机制提高下载成功率
- 文件验证:通过MD5校验确保文件完整性
适用场景
- 大文件下载(视频、镜像文件等)
- 网络不稳定的环境
- 需要断点续传的应用
- 需要显示下载进度的GUI应用