高并发下的优雅延迟:Python异步爬虫(aiohttp)的速率限制实践
一、为何需要“优雅”的延迟?从 <font style="color:rgb(15, 17, 21);background-color:rgb(235, 238, 242);">time.sleep()</font>
的失效说起
在同步爬虫中,我们使用 <font style="color:rgb(15, 17, 21);background-color:rgb(235, 238, 242);">time.sleep()</font>
来在请求之间插入间隔。这个方法简单粗暴,但却行之有效。然而,在异步世界里,<font style="color:rgb(15, 17, 21);background-color:rgb(235, 238, 242);">time.sleep()</font>
是一个阻塞式调用。它会阻塞整个事件循环,使得所有并发的任务都被“冻住”,这完全违背了异步编程的初衷。
取而代之的是 <font style="color:rgb(15, 17, 21);background-color:rgb(235, 238, 242);">asyncio.sleep()</font>
,它是一个非阻塞的延迟,在等待期间事件循环可以自由地去执行其他任务。
但问题远不止于此。真正的核心在于:为什么我们要延迟?
- 遵守道德与规则(Robots.txt):许多网站会在
<font style="color:rgb(15, 17, 21);background-color:rgb(235, 238, 242);">robots.txt</font>
中规定<font style="color:rgb(15, 17, 21);background-color:rgb(235, 238, 242);">Crawl-delay</font>
。尊重这些规则是网络公民的基本素养。 - 减轻目标网站压力:过快的请求频率会像DDoS攻击一样,对目标网站的服务器造成巨大压力,甚至导致服务瘫痪。
- 避免被反爬机制封锁:这是最直接的生存需求。服务器端通过检测IP的请求频率和模式,可以轻易地识别出爬虫并封禁IP。固定的、简单的延迟同样容易被识别。
- 保证数据抓取的稳定性:一个稳健的爬虫系统需要在长期运行中保持稳定。合理的速率限制是稳定性的基石。
因此,“优雅”的延迟意味着:在保持高并发性能优势的同时,智能地、动态地控制请求速率,使其既高效又难以被察觉,从而稳定、持久地完成抓取任务。
二、核心技术栈:aiohttp 与 asyncio
**<font style="color:rgb(15, 17, 21);background-color:rgb(235, 238, 242);">asyncio</font>**
: Python 的异步I/O框架,用于编写并发代码。**<font style="color:rgb(15, 17, 21);background-color:rgb(235, 238, 242);">aiohttp</font>**
: 基于<font style="color:rgb(15, 17, 21);background-color:rgb(235, 238, 242);">asyncio</font>
的异步HTTP客户端/服务器框架。我们将用它来发起高效的异步HTTP请求。
三、实践方案:从基础到高级
我们将通过三种递进的方案来展示如何实现优雅的速率限制。
方案一:简单令牌桶算法实现
令牌桶算法是网络流量整形中最常用的算法之一。其基本思想是:
- 有一个桶,以固定速率(如每秒
<font style="color:rgb(15, 17, 21);background-color:rgb(235, 238, 242);">r</font>
个)生成令牌。 - 桶的容量是固定的(最多存放
<font style="color:rgb(15, 17, 21);background-color:rgb(235, 238, 242);">b</font>
个令牌)。 - 每个请求需要从桶中获取一个令牌。
- 如果桶中有令牌,则取出一个,请求立即执行。
- 如果桶中无令牌,则请求必须等待,直到有新的令牌生成。
这种算法既能将平均速率限制在预期值,又能允许一定程度的突发流量。
代码实现:
import asyncio
import aiohttp
from typing import Optional
import timeclass TokenBucket:"""一个简单的异步令牌桶实现"""def __init__(self, rate: float, capacity: int):"""Args:rate: 令牌生成速率,个/秒capacity: 令牌桶容量"""self._rate = rateself._capacity = capacityself._tokens = capacityself._last_time = time.monotonic() # 使用单调时间避免系统时间调整的影响async def acquire(self):"""获取一个令牌,如果不够则等待"""while self._tokens < 1:await self._add_tokens()# 短暂让出控制权,避免繁忙等待await asyncio.sleep(0)self._tokens -= 1return Trueasync def _add_tokens(self):"""根据时间差计算并添加令牌"""now = time.monotonic()elapsed = now - self._last_time# 计算这段时间内应生成的令牌数new_tokens = elapsed * self._rateif new_tokens > 0:self._tokens = min(self._capacity, self._tokens + new_tokens)self._last_time = nowasync def fetch_with_token_bucket(session: aiohttp.ClientSession, url: str, bucket: TokenBucket):"""使用令牌桶限制的请求函数"""await bucket.acquire() # 先获取令牌try:async with session.get(url) as response:print(f"Status: {response.status}, URL: {url}")# 这里可以返回或处理响应内容# return await response.text()except Exception as e:print(f"Request failed for {url}: {e}")async def main_with_token_bucket():"""使用令牌桶的主函数"""# 限制为每秒10个请求,桶容量为5(允许一定突发)bucket = TokenBucket(rate=10, capacity=5)urls = [f"https://httpbin.org/get?id={i}" for i in range(50)] # 示例URLasync with aiohttp.ClientSession() as session:tasks = []for url in urls:# 为每个URL创建一个任务task = asyncio.create_task(fetch_with_token_bucket(session, url, bucket))tasks.append(task)# 等待所有任务完成await asyncio.gather(*tasks)# 运行
# asyncio.run(main_with_token_bucket())
方案二:利用 <font style="color:rgb(15, 17, 21);background-color:rgb(235, 238, 242);">asyncio.Semaphore</font>
进行总并发数控制
虽然令牌桶很优秀,但有时我们更需要直接控制“同时在进行中”的请求数量。这可以防止过多的并发连接耗尽本地或服务器端的资源。<font style="color:rgb(15, 17, 21);background-color:rgb(235, 238, 242);">asyncio.Semaphore</font>
(信号量)是实现此目标的完美工具。
信号量管理着一个内部计数器,<font style="color:rgb(15, 17, 21);background-color:rgb(235, 238, 242);">acquire()</font>
使其减少,<font style="color:rgb(15, 17, 21);background-color:rgb(235, 238, 242);">release()</font>
使其增加。如果计数器为零,<font style="color:rgb(15, 17, 21);background-color:rgb(235, 238, 242);">acquire()</font>
会等待,直到其他任务调用 <font style="color:rgb(15, 17, 21);background-color:rgb(235, 238, 242);">release()</font>
。
代码实现:
import asyncio
import aiohttpasync def fetch_with_semaphore(session: aiohttp.ClientSession, url: str, semaphore: asyncio.Semaphore):"""使用信号量限制并发数的请求函数"""async with semaphore: # 进入上下文管理器时自动acquire,退出时自动releasetry:async with session.get(url) as response:print(f"Status: {response.status}, URL: {url}")# 模拟处理响应的时间# await asyncio.sleep(0.1)except Exception as e:print(f"Request failed for {url}: {e}")async def main_with_semaphore():"""使用信号量的主函数"""# 限制最大并发数为5semaphore = asyncio.Semaphore(5)urls = [f"https://httpbin.org/get?id={i}" for i in range(50)]async with aiohttp.ClientSession() as session:tasks = []for url in urls:task = asyncio.create_task(fetch_with_semaphore(session, url, semaphore))tasks.append(task)await asyncio.gather(*tasks)# 运行
# asyncio.run(main_with_semaphore())
方案三:组合拳——令牌桶 + 信号量 + 随机延迟(生产级推荐)
在实际生产环境中,我们通常需要多管齐下,结合多种策略来达到最“优雅”的效果。
- 令牌桶:控制长期的平均请求速率。
- 信号量:控制瞬间的最大并发数,保护客户端和服务器。
- 随机延迟:在获取令牌后、发送请求前,插入一个小的、随机的延迟,打破机器行为的规律性,使其更接近人类操作。
代码实现:
import asyncio
import aiohttp
import random
from typing import Optionalasync def fetch_gracefully(session: aiohttp.ClientSession, url: str, bucket: TokenBucket, semaphore: asyncio.Semaphore):"""优雅的请求函数:结合令牌桶、信号量和随机延迟"""await bucket.acquire() # 等待令牌,控制平均速率# 插入一个0.1秒到0.5秒之间的随机延迟,增加人性化await asyncio.sleep(random.uniform(0.1, 0.5))async with semaphore: # 获取信号量,控制最大并发try:async with session.get(url) as response:print(f"Status: {response.status}, URL: {url} at {asyncio.get_event_loop().time():.2f}")# 处理响应...if response.status != 200:# 遇到错误状态码,可以考虑重试策略print(f"Error: Received status {response.status}")return await response.text()except aiohttp.ClientConnectorError as e:print(f"Connection error for {url}: {e}")except asyncio.TimeoutError:print(f"Timeout for {url}")except Exception as e:print(f"Unexpected error for {url}: {e}")async def main_graceful():"""最终版的优雅主函数"""# 配置参数REQUEST_RATE = 10 # 平均每秒10个请求BUCKET_CAPACITY = 5 # 令牌桶容量,允许小范围突发MAX_CONCURRENT = 3 # 最大并发连接数bucket = TokenBucket(REQUEST_RATE, BUCKET_CAPACITY)semaphore = asyncio.Semaphore(MAX_CONCURRENT)urls = [f"https://httpbin.org/get?id={i}" for i in range(50)]async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10) # 为每个请求设置超时) as session:tasks = [fetch_gracefully(session, url, bucket, semaphore) for url in urls]results = await asyncio.gather(*tasks, return_exceptions=True)# 处理results...# 运行这个最完善的版本
if __name__ == "__main__":asyncio.run(main_graceful())
四、总结与最佳实践
通过上述三种方案,我们看到了在Python异步爬虫中实现速率限制的演进路径:
- 从****
**<font style="color:rgb(15, 17, 21);background-color:rgb(235, 238, 242);">async.sleep</font>**
****到算法:速率限制的核心从简单的等待升级为精密的算法控制。 - 从单一维度到多维度控制:优秀的限制策略应同时考虑平均速率(令牌桶)、瞬时并发(信号量) 和行为模式(随机延迟)。
- 从功能实现到生产稳健性:完整的代码还需要考虑错误处理、重试机制、超时设置等,才能构成一个真正健壮的生产级爬虫。
最佳实践建议:
- 动态调整速率:根据服务器的响应状态码(如429 Too Many Requests)动态调慢速率。
- 使用代理池:对于大规模抓取,结合代理池轮换IP,将速率限制的压力分散到多个IP上。
- 监控与日志:记录请求的成功率、延迟等指标,便于监控和调试。
- 遵守
**<font style="color:rgb(15, 17, 21);background-color:rgb(235, 238, 242);">robots.txt</font>**
:始终优先读取并遵守网站的爬虫协议。例如:https://www.16yun.cn/
高并发爬虫的“优雅”,本质上是效率与尊重、性能与稳定之间的精妙平衡。掌握这些速率限制技术,不仅能让你更高效地获取数据,更能让你成为一个负责任、受信任的网络数据采集者。