自己动手造轮子:用Requests和线程池构建一个轻量级高并发爬虫框架
在数据采集场景中,复杂框架往往带来冗余设计,而简单脚本又难以应对高并发需求。本文将带你从零开始,用 Python 的 Requests 库和线程池,打造一个轻量、灵活、高效的高并发爬虫框架,既保留手动编码的可控性,又能满足中小规模数据采集的性能要求。
一、技术选型核心逻辑
选择 Requests + 线程池的组合,核心是平衡 “易用性” 与 “高性能”:
- Requests:简洁的 API 设计,无需复杂配置即可完成 HTTP 请求,支持 cookie、代理、请求头定制,完美适配各类网站的基础访问需求。
- 线程池(concurrent.futures.ThreadPoolExecutor):Python 标准库内置模块,无需额外安装,能高效管理线程生命周期,避免手动创建线程的资源泄露风险,特别适合 IO 密集型的爬虫任务。
- 轻量级定位:不依赖 Scrapy 等重型框架的复杂组件,核心代码仅数百行,易调试、易修改,可根据具体需求快速迭代。
二、框架核心架构设计
框架采用模块化拆分,分为 5 个核心组件,各司其职且低耦合:
- 配置模块:统一管理请求头、线程数、目标 URL 列表、存储路径等参数。
- 请求模块:封装 Requests 请求逻辑,处理重试、超时、反爬伪装。
- 解析模块:提取目标数据(支持 XPath、BeautifulSoup 或正则)。
- 并发模块:通过线程池调度任务,控制并发数。
- 存储模块:将解析后的数据保存到文件(CSV/JSON)或数据库。
整体流程:读取配置 → 生成任务队列 → 线程池执行请求 → 解析数据 → 异步存储 → 任务完成。
三、分步实现框架核心代码
3.1 环境准备
首先安装依赖(仅需 Requests,解析模块可选 BeautifulSoup):
bash
pip install requests beautifulsoup4 lxml
3.2 配置模块:统一管理参数
将可变参数集中配置,方便后续修改维护:
python
运行
class SpiderConfig:# 并发配置MAX_WORKERS = 10 # 线程池最大线程数(根据目标网站抗压能力调整)TIMEOUT = 10 # 请求超时时间(秒)RETRY_TIMES = 3 # 失败重试次数# 请求配置HEADERS = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36","Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8","Referer": "https://www.baidu.com"}PROXIES = None # 代理配置(需使用时填写:{"http": "http://ip:port", "https": "https://ip:port"})# 任务配置TARGET_URLS = [] # 待爬取URL列表(可从文件读取或动态生成)SAVE_PATH = "spider_result.csv" # 数据存储路径
3.3 请求模块:稳健的 HTTP 请求封装
处理网络波动、超时、反爬等问题,确保请求成功率:
python
运行
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retryclass RequestHandler:def __init__(self, config: SpiderConfig):self.config = config# 初始化session,配置重试机制self.session = requests.Session()retry_strategy = Retry(total=config.RETRY_TIMES,backoff_factor=0.5, # 重试间隔:0.5, 1, 1.5...秒status_forcelist=[429, 500, 502, 503, 504] # 针对这些状态码重试)self.session.mount("http://", HTTPAdapter(max_retries=retry_strategy))self.session.mount("https://", HTTPAdapter(max_retries=retry_strategy))self.session.headers.update(config.HEADERS)self.session.proxies = config.PROXIESdef get(self, url: str, params=None) -> requests.Response:"""发送GET请求,返回响应对象"""try:response = self.session.get(url=url,params=params,timeout=self.config.TIMEOUT,verify=False # 忽略SSL证书验证(根据需求选择))response.raise_for_status() # 触发HTTP错误(4xx/5xx)response.encoding = response.apparent_encoding # 自动识别编码return responseexcept Exception as e:print(f"请求失败:{url},错误信息:{str(e)}")return None
3.4 解析模块:灵活提取目标数据
采用 “抽象基类 + 自定义实现” 的方式,支持不同解析规则:
python
运行
from abc import ABC, abstractmethod
from bs4 import BeautifulSoupclass ParserBase(ABC):@abstractmethoddef parse(self, response: requests.Response) -> dict:"""解析响应数据,返回字典格式结果"""pass# 示例:解析博客列表(可根据目标网站修改)
class BlogListParser(ParserBase):def parse(self, response: requests.Response) -> dict:if not response:return {}soup = BeautifulSoup(response.text, "lxml")data = []# 假设页面中博客项的CSS选择器为".blog-item"for item in soup.select(".blog-item"):title = item.select_one(".blog-title").get_text(strip=True) if item.select_one(".blog-title") else ""author = item.select_one(".blog-author").get_text(strip=True) if item.select_one(".blog-author") else ""publish_time = item.select_one(".publish-time").get_text(strip=True) if item.select_one(".publish-time") else ""data.append({"title": title,"author": author,"publish_time": publish_time,"url": response.url})return {"data": data}
3.5 存储模块:异步保存数据
避免存储操作阻塞爬虫进程,采用简单的文件存储(支持扩展到数据库):
python
运行
import csv
import json
from threading import Lockclass DataStorage:def __init__(self, save_path: str):self.save_path = save_pathself.lock = Lock() # 解决多线程写文件冲突def save_to_csv(self, data: list):"""保存数据到CSV文件"""if not data:return# 获取字段名(从第一条数据提取)fieldnames = data[0].keys()with self.lock:# 追加模式写入,不存在则创建with open(self.save_path, "a+", newline="", encoding="utf-8") as f:writer = csv.DictWriter(f, fieldnames=fieldnames)# 判断文件是否为空,为空则写入表头f.seek(0)if not f.readline():writer.writeheader()writer.writerows(data)def save_to_json(self, data: list):"""保存数据到JSON文件(追加模式)"""if not data:returnwith self.lock:with open(self.save_path.replace(".csv", ".json"), "a+", encoding="utf-8") as f:for item in data:json.dump(item, f, ensure_ascii=False)f.write("\n")
3.6 并发核心:线程池调度
整合所有模块,实现高并发任务执行:
python
运行
from concurrent.futures import ThreadPoolExecutor, as_completedclass LightweightSpider:def __init__(self, config: SpiderConfig, parser: ParserBase):self.config = configself.request_handler = RequestHandler(config)self.parser = parserself.storage = DataStorage(config.SAVE_PATH)def run_single_task(self, url: str) -> None:"""执行单个爬取任务:请求→解析→存储"""response = self.request_handler.get(url)parsed_data = self.parser.parse(response)if parsed_data.get("data"):self.storage.save_to_csv(parsed_data["data"])# 可选:同时保存JSON格式# self.storage.save_to_json(parsed_data["data"])print(f"成功爬取:{url},获取{len(parsed_data['data'])}条数据")def run(self) -> None:"""启动爬虫,线程池调度任务"""print(f"启动爬虫,并发线程数:{self.config.MAX_WORKERS}")print(f"待爬取URL总数:{len(self.config.TARGET_URLS)}")# 使用线程池执行任务with ThreadPoolExecutor(max_workers=self.config.MAX_WORKERS) as executor:# 提交所有任务futures = [executor.submit(self.run_single_task, url) for url in self.config.TARGET_URLS]# 等待所有任务完成for future in as_completed(futures):future.result() # 捕获任务中的异常(若有)print("所有爬取任务执行完毕!")
四、框架使用示例
以爬取某技术博客列表为例,演示完整使用流程:
python
运行
if __name__ == "__main__":# 1. 配置参数(根据目标网站调整)config = SpiderConfig()config.MAX_WORKERS = 15 # 调整并发数config.TARGET_URLS = [f"https://example-blog.com/list?page={i}" for i in range(1, 51) # 爬取1-50页]config.SAVE_PATH = "blog_data.csv"# 可选:添加代理(应对反爬)# config.PROXIES = {"http": "http://127.0.0.1:7890", "https": "https://127.0.0.1:7890"}# 2. 选择解析器(自定义解析规则)parser = BlogListParser()# 3. 启动爬虫spider = LightweightSpider(config, parser)spider.run()
五、优化与扩展方向
5.1 基础优化
- 动态请求头:随机切换 User-Agent、Referer,降低被识别为爬虫的概率。
- 代理池集成:对接免费 / 付费代理池,自动切换代理,突破 IP 限制。
- 任务去重:用 Redis 或集合存储已爬取 URL,避免重复请求。
- 限速控制:通过
time.sleep或信号量控制请求频率,保护目标网站。
5.2 功能扩展
- 分布式爬取:将任务队列迁移到 Redis,多台机器共享任务,提升爬取规模。
- 动态渲染支持:集成 Selenium 或 Playwright,应对 JavaScript 渲染的网站。
- 数据验证:添加数据字段校验逻辑,过滤无效数据。
- 日志系统:替换 print 为 logging 模块,记录爬取日志,方便问题排查。
六、总结
这套轻量级爬虫框架基于 Requests 和线程池,仅需少量代码即可实现高并发数据采集。核心优势在于 “灵活可控”—— 无需依赖复杂框架的冗余功能,可根据具体需求快速修改请求、解析、存储逻辑。
对于中小规模的数据采集场景(如爬取博客、新闻列表、商品信息等),它既能满足性能要求,又能降低开发和维护成本。而通过扩展代理池、分布式等功能,还能应对更复杂的反爬场景。
