使用aiohttp实现高并发爬虫
使用aiohttp来编写一个高并发的爬虫,想法很不错,现实很骨感。这里我们要知道,由于高并发可能会对目标服务器造成压力,请确保遵守目标网站的robots.txt,并合理设置并发量,避免被封IP。
我将通过示例代码,我将并发爬取多个URL,并打印出每个URL的响应状态和内容长度。
下面是一个使用 aiohttp
实现的高并发爬虫示例,包含详细注释和并发控制:
import asyncio
import aiohttp
from datetime import datetimeasync def fetch_url(session, url, semaphore):"""异步获取单个URL的内容:param session: aiohttp会话对象:param url: 目标URL:param semaphore: 控制并发的信号量"""async with semaphore: # 限制并发数量try:start_time = datetime.now()async with session.get(url, timeout=10) as response: # 10秒超时content = await response.text()return {"url": url,"status": response.status,"content_length": len(content),"time": (datetime.now() - start_time).total_seconds()}except Exception as e:return {"url": url,"error": str(e)}async def crawl(urls, max_concurrency=100):"""主爬虫函数:param urls: URL列表:param max_concurrency: 最大并发数"""results = []# 创建信号量控制并发semaphore = asyncio.Semaphore(max_concurrency)# 创建连接池(复用TCP连接)connector = aiohttp.TCPConnector(limit=0) # 0表示不限制连接池大小async with aiohttp.ClientSession(connector=connector,headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}) as session:tasks = []for url in urls:task = asyncio.create_task(fetch_url(session, url, semaphore))tasks.append(task)# 等待所有任务完成results = await asyncio.gather(*tasks)return resultsif __name__ == "__main__":# 示例URL列表(实际使用时可替换为真实目标)urls = ["https://httpbin.org/get?q=1","https://httpbin.org/get?q=2","https://httpbin.org/get?q=3","https://httpbin.org/delay/1", # 模拟延迟响应"https://httpbin.org/status/404","https://invalid.url" # 测试错误处理] * 5 # 复制5倍生成30个URLprint(f"开始爬取 {len(urls)} 个URL...")# 启动异步主程序start = datetime.now()results = asyncio.run(crawl(urls))elapsed = (datetime.now() - start).total_seconds()# 输出统计信息success = [r for r in results if "error" not in r]print(f"\n完成! 耗时: {elapsed:.2f}秒")print(f"成功: {len(success)} 失败: {len(results)-len(success)}")print(f"平均响应时间: {sum(r['time'] for r in success if 'time' in r)/len(success):.2f}秒")# 输出错误信息示例(可选)errors = [r for r in results if "error" in r]if errors:print("\n错误示例:")for e in errors[:3]:print(f"URL: {e['url']}\n错误: {e['error']}\n")
关键特性说明:
1、高并发控制:
- 使用
asyncio.Semaphore
限制最大并发量(默认100) - TCP连接池复用(
TCPConnector
)
2、错误处理:
- 自动捕获请求异常(超时、DNS错误等)
- 记录错误信息不中断程序
3、性能优化:
- 连接复用减少TCP握手开销
- 异步I/O避免线程切换成本
- 超时设置防止阻塞
4、结果分析:
- 统计成功率/失败率
- 计算平均响应时间
- 显示错误样本
运行效果:
开始爬取 30 个URL...
完成! 耗时: 1.82秒
成功: 25 失败: 5
平均响应时间: 1.12秒错误示例:
URL: https://invalid.url
错误: Cannot connect to host invalid.url:443 ssl:True...
扩展建议:
1、添加代理支持:
async with session.get(url, proxy="http://proxy.com:8080") as response:
2、实现限速:
await asyncio.sleep(0.1) # 每个请求后延迟
3、持久化存储:
# 在fetch_url中添加
with open(f"data/{url_hash}.html", "w") as f:f.write(content)
4、动态URL生成:
urls = [f"https://example.com/page/{i}" for i in range(1, 1000)]
5、重试机制:
for retry in range(3):try:return await fetch()except Exception:await asyncio.sleep(2**retry)
这个爬虫框架可在单机上实现每秒数百个请求的并发能力,但是最终实际性能取决于网络带宽和目标服务器响应速度。所以说想要获得高性能爬虫能力,配套的服务器和带宽一定要足,不然发挥不出其实力。