[Python Async & Multithreading] Async and Multi-threaded Crawler Code Examples
Python multithreading and async code examples generated by Claude, simulating a crawl of 20 web pages, where each page is assumed to take 0.5-2 seconds to fetch.
Python Multi-threaded Crawler Tutorial
Core Concepts
Multithreading: lets a program execute multiple tasks concurrently, improving throughput for I/O-bound work such as network requests.
Thread pool: a fixed number of threads created up front, avoiding the overhead of repeatedly creating and destroying threads.
1. Basic Single-threaded Version
import time
import random
def simulate_fetch(url):
    """Simulate a network request with a random 0.5-2 second delay"""
    delay = random.uniform(0.5, 2.0)
    time.sleep(delay)
    return f"Data from {url} (took {delay:.2f}s)"

def single_thread_crawler(urls):
    """Single-threaded crawler: fetch URLs one after another"""
    results = []
    start_time = time.time()
    for url in urls:
        result = simulate_fetch(url)
        results.append(result)
        print(f"✓ {result}")
    print(f"Single-thread total time: {time.time() - start_time:.2f}s")
    return results
2. Multi-threaded Version (Thread class)
import threading

class CrawlerThread(threading.Thread):
    def __init__(self, url, results, lock):
        super().__init__()
        self.url = url
        self.results = results
        self.lock = lock  # thread lock protecting the shared results list

    def run(self):
        result = simulate_fetch(self.url)
        # protect shared data with the lock
        with self.lock:
            self.results.append(result)
            print(f"✓ {result}")

def multi_thread_crawler_basic(urls):
    """Basic multi-threaded crawler: one Thread per URL"""
    results = []
    lock = threading.Lock()
    threads = []
    start_time = time.time()
    # create and start the threads
    for url in urls:
        thread = CrawlerThread(url, results, lock)
        threads.append(thread)
        thread.start()
    # wait for all threads to finish
    for thread in threads:
        thread.join()
    print(f"Multi-thread total time: {time.time() - start_time:.2f}s")
    return results
3. Thread Pool Version (Recommended)
from concurrent.futures import ThreadPoolExecutor, as_completed

def multi_thread_crawler_pool(urls, max_workers=5):
    """Multi-threaded crawler built on a thread pool"""
    results = []
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # submit all tasks
        future_to_url = {executor.submit(simulate_fetch, url): url for url in urls}
        # collect results as they complete
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                result = future.result()
                results.append(result)
                print(f"✓ {result}")
            except Exception as e:
                print(f"✗ {url} failed: {e}")
    print(f"Thread pool total time: {time.time() - start_time:.2f}s")
    return results
4. Async Version (asyncio)
import asyncio
# aiohttp and its ClientSession are only needed for real HTTP requests;
# this demo simulates the I/O wait with asyncio.sleep instead.

async def async_fetch(session, url):
    """Asynchronously simulate a network request"""
    delay = random.uniform(0.5, 2.0)
    await asyncio.sleep(delay)  # non-blocking wait
    return f"Data from {url} (took {delay:.2f}s)"

async def async_crawler(urls, max_concurrent=5):
    """Async crawler"""
    results = []
    start_time = time.time()
    # a semaphore caps the number of requests in flight
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_with_semaphore(session, url):
        async with semaphore:
            return await async_fetch(session, url)

    # placeholder session (use aiohttp.ClientSession for real requests)
    session = None
    # create all tasks
    tasks = [fetch_with_semaphore(session, url) for url in urls]
    # run them concurrently, handling results as they finish
    for coro in asyncio.as_completed(tasks):
        result = await coro
        results.append(result)
        print(f"✓ {result}")
    print(f"Async total time: {time.time() - start_time:.2f}s")
    return results

def run_async_crawler(urls):
    """Entry point: run the async crawler with asyncio.run"""
    return asyncio.run(async_crawler(urls))
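For real pages, the simulated delay above would be replaced by an actual HTTP call. Below is a minimal sketch of that variant, assuming aiohttp is installed; the function names, timeout, and error policy are illustrative rather than part of the original example.

import asyncio
import aiohttp

async def real_fetch(session, url):
    """Fetch one URL with aiohttp and return its body as text"""
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
        resp.raise_for_status()
        return await resp.text()

async def real_crawler(urls, max_concurrent=5):
    """Share one ClientSession and cap concurrency with a semaphore"""
    semaphore = asyncio.Semaphore(max_concurrent)
    async with aiohttp.ClientSession() as session:
        async def bounded(url):
            async with semaphore:
                return await real_fetch(session, url)
        return await asyncio.gather(*(bounded(url) for url in urls))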
5. Complete Test Code
def main():
    # test data: 20 URLs
    urls = [f"https://example.com/page{i}" for i in range(1, 21)]

    print("=" * 50)
    print("Single-thread crawler test")
    print("=" * 50)
    single_thread_crawler(urls[:5])  # only 5 URLs so the wait stays short

    print("\n" + "=" * 50)
    print("Multi-thread crawler test (Thread class)")
    print("=" * 50)
    multi_thread_crawler_basic(urls[:10])

    print("\n" + "=" * 50)
    print("Thread pool crawler test (recommended)")
    print("=" * 50)
    multi_thread_crawler_pool(urls, max_workers=5)

    print("\n" + "=" * 50)
    print("Async crawler test")
    print("=" * 50)
    run_async_crawler(urls)

if __name__ == "__main__":
    main()
Key Points
Thread lock (Lock)
lock = threading.Lock()
with lock:  # acquires and releases the lock automatically
    # critical section
    shared_data.append(result)
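The with statement above is shorthand for acquiring and releasing the lock by hand; the equivalent long form, shown only for illustration, makes the automatic release explicit.

lock.acquire()
try:
    # critical section
    shared_data.append(result)
finally:
    lock.release()  # released even if the critical section raises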
Advantages of the thread pool
- Manages thread lifecycles automatically
- Caps the number of concurrent workers, preventing resource exhaustion
- More robust exception handling
- More concise code (see the executor.map sketch after this list)
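As a sketch of the last point: when every task calls the same function and results may be consumed in submission order, executor.map keeps the whole crawl to a few lines. This reuses simulate_fetch from above; the helper name is illustrative.

from concurrent.futures import ThreadPoolExecutor

def crawl_with_map(urls, max_workers=5):
    """Thread-pool crawl in one call; results come back in input order"""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(simulate_fetch, urls))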
Async vs. multithreading
- Async: a single thread whose event loop overlaps I/O waits (see the sketch after this list)
- Multithreading: multiple OS threads run concurrently, so blocking I/O in one thread does not stall the others
- Which to pick: prefer async for I/O-bound work; for CPU-bound work, Python threads are constrained by the GIL, so consider multiprocessing instead
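A quick way to see the event-loop point: the sketch below starts 100 one-second waits concurrently, and they all finish in roughly one second on a single thread, because the loop switches between coroutines whenever one is waiting.

import asyncio
import time

async def demo_event_loop():
    start = time.time()
    # 100 concurrent 1-second "I/O waits" handled by one thread
    await asyncio.gather(*(asyncio.sleep(1) for _ in range(100)))
    print(f"100 waits finished in {time.time() - start:.2f}s")

asyncio.run(demo_event_loop())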
Performance comparison
In a test with all 20 URLs:
- Single-threaded: roughly 10-40 seconds (the sum of all delays, about 25 seconds on average)
- Multi-threaded (pool of 5 workers): about 4-8 seconds
- Async (5 concurrent requests): about 4-8 seconds
Practical recommendations
- Simple scenarios: use ThreadPoolExecutor
- Large-scale crawlers: use asyncio + aiohttp
- Mixed workloads: combine multithreading and async
- Caveats:
  - Limit concurrency so the target site does not block you
  - Add retry logic and exception handling (see the sketch after this list)
  - Respect robots.txt and the site's terms of service
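As a sketch of the first two caveats, the wrapper below caps concurrency with a semaphore and retries failed requests with a simple backoff; the retry count and delays are illustrative assumptions, and it reuses async_fetch from section 4.

import asyncio
import random

async def fetch_with_retry(url, semaphore, retries=3, backoff=1.0):
    """Cap concurrency with a semaphore and retry failures with growing delays"""
    for attempt in range(1, retries + 1):
        try:
            async with semaphore:
                return await async_fetch(None, url)
        except Exception:
            if attempt == retries:
                raise
            # wait a little longer after each failure, plus some jitter
            await asyncio.sleep(backoff * attempt + random.random())

async def polite_crawler(urls, max_concurrent=3):
    """Keep concurrency low and let failing URLs retry before giving up"""
    semaphore = asyncio.Semaphore(max_concurrent)
    return await asyncio.gather(*(fetch_with_retry(url, semaphore) for url in urls))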