Python Web Scraping, Lesson 7: Multithreading and Asynchronous Crawling
Table of Contents
- Course Objectives
- 1. Concurrency Basics
- 1.1 Why Do Crawlers Need Concurrency?
- 1.2 Comparing Concurrency Approaches
- 2. Multithreaded Crawlers
- 2.1 A Basic Multithreaded Crawler
- 2.2 Using ThreadPoolExecutor
- 3. Asynchronous Crawlers
- 3.1 A Basic Asynchronous Crawler
- 3.2 An Advanced Asynchronous Crawler
- 4. Concurrency Control and Resource Management
- 4.1 Rate Limiting
- 4.2 Connection Pool Management
- 4.3 Memory Management
- 5. Case Study: A Concurrent News-Site Crawler
- 6. Performance Optimization Tips
- 6.1 Choosing the Right Concurrency Level
- 6.2 Connection Reuse Optimization
- 7. Hands-on Exercises
- Exercise 1: Multithreaded Image Downloader
- Exercise 2: Asynchronous API Data Collection
- Exercise 3: Concurrent Crawler Monitoring
- 8. Lesson Summary
- 9. Next Lesson Preview
- 10. Homework
Course Objectives
- Understand the basic concepts of concurrent programming
- Master how to implement multithreaded crawlers
- Learn to use asynchronous programming to improve crawler efficiency
- Understand concurrency control and resource management
1. Concurrency Basics
1.1 Why Do Crawlers Need Concurrency?
- Network I/O is the main bottleneck for crawlers
- A single-threaded crawler spends most of its time waiting for network responses
- Concurrency can dramatically improve crawling throughput
- Concurrency makes full use of system resources
1.2 Comparing Concurrency Approaches
import time
import requests
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import asyncio
import aiohttp

# Test URL list
urls = [
    'https://httpbin.org/delay/1',
    'https://httpbin.org/delay/1',
    'https://httpbin.org/delay/1',
    'https://httpbin.org/delay/1',
    'https://httpbin.org/delay/1'
]

def fetch_sync(url):
    """Synchronous request"""
    response = requests.get(url)
    return response.status_code

def test_sync():
    """Benchmark the synchronous crawler"""
    start_time = time.time()
    results = []
    for url in urls:
        result = fetch_sync(url)
        results.append(result)
    end_time = time.time()
    print(f"Synchronous crawler took {end_time - start_time:.2f}s")
    return results

def test_threading():
    """Benchmark the multithreaded crawler"""
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=5) as executor:
        results = list(executor.map(fetch_sync, urls))
    end_time = time.time()
    print(f"Multithreaded crawler took {end_time - start_time:.2f}s")
    return results

async def fetch_async(session, url):
    """Asynchronous request"""
    async with session.get(url) as response:
        return response.status

async def test_async():
    """Benchmark the asynchronous crawler"""
    start_time = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_async(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    end_time = time.time()
    print(f"Asynchronous crawler took {end_time - start_time:.2f}s")
    return results

# Run the benchmarks
if __name__ == "__main__":
    test_sync()                  # ~5 seconds
    test_threading()             # ~1 second
    asyncio.run(test_async())    # ~1 second
2. Multithreaded Crawlers
2.1 A Basic Multithreaded Crawler
import threading
import requests
from queue import Queue
import time

class ThreadSpider:
    def __init__(self, max_workers=5):
        self.max_workers = max_workers
        self.url_queue = Queue()
        self.result_queue = Queue()
        self.session = requests.Session()
        # Set default request headers
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

    def worker(self):
        """Worker thread function"""
        while True:
            url = self.url_queue.get()
            if url is None:
                break
            try:
                response = self.session.get(url, timeout=10)
                result = {
                    'url': url,
                    'status_code': response.status_code,
                    'content': response.text,
                    'headers': dict(response.headers)
                }
                self.result_queue.put(result)
                print(f"✓ Crawled: {url}")
            except Exception as e:
                error_result = {'url': url, 'error': str(e)}
                self.result_queue.put(error_result)
                print(f"✗ Failed: {url} - {e}")
            finally:
                self.url_queue.task_done()

    def start_workers(self):
        """Start the worker threads"""
        self.threads = []
        for i in range(self.max_workers):
            thread = threading.Thread(target=self.worker)
            thread.daemon = True
            thread.start()
            self.threads.append(thread)

    def add_urls(self, urls):
        """Add URLs to the queue"""
        for url in urls:
            self.url_queue.put(url)

    def crawl(self, urls):
        """Start crawling"""
        print(f"Crawling {len(urls)} URLs with {self.max_workers} threads")
        # Start worker threads
        self.start_workers()
        # Enqueue the URLs
        self.add_urls(urls)
        # Wait for all tasks to finish
        self.url_queue.join()
        # Stop the worker threads
        for _ in range(self.max_workers):
            self.url_queue.put(None)
        for thread in self.threads:
            thread.join()
        # Collect results
        results = []
        while not self.result_queue.empty():
            results.append(self.result_queue.get())
        return results

# Usage example
urls = [
    'https://httpbin.org/get',
    'https://httpbin.org/user-agent',
    'https://httpbin.org/headers',
    'https://httpbin.org/ip',
    'https://httpbin.org/uuid'
]

spider = ThreadSpider(max_workers=3)
results = spider.crawl(urls)

for result in results:
    if 'error' in result:
        print(f"Error: {result['url']} - {result['error']}")
    else:
        print(f"OK: {result['url']} - status {result['status_code']}")
2.2 Using ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import time

class AdvancedThreadSpider:
    def __init__(self, max_workers=5, timeout=10):
        self.max_workers = max_workers
        self.timeout = timeout
        self.session = requests.Session()
        # Configure the session
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        # Configure the connection pool
        adapter = requests.adapters.HTTPAdapter(
            pool_connections=max_workers,
            pool_maxsize=max_workers * 2,
            max_retries=3
        )
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)

    def fetch_url(self, url):
        """Fetch a single URL"""
        try:
            start_time = time.time()
            response = self.session.get(url, timeout=self.timeout)
            end_time = time.time()
            return {
                'url': url,
                'status_code': response.status_code,
                'content_length': len(response.content),
                'response_time': end_time - start_time,
                'content': response.text,
                'success': True
            }
        except Exception as e:
            return {
                'url': url,
                'error': str(e),
                'success': False
            }

    def crawl_urls(self, urls, callback=None):
        """Crawl a list of URLs"""
        results = []
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks
            future_to_url = {executor.submit(self.fetch_url, url): url for url in urls}
            # Handle tasks as they complete
            for future in as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    result = future.result()
                    results.append(result)
                    # Invoke the callback, if any
                    if callback:
                        callback(result)
                except Exception as e:
                    error_result = {
                        'url': url,
                        'error': str(e),
                        'success': False
                    }
                    results.append(error_result)
                    if callback:
                        callback(error_result)
        return results

    def crawl_with_progress(self, urls):
        """Crawl with progress reporting"""
        total = len(urls)
        completed = 0

        def progress_callback(result):
            nonlocal completed
            completed += 1
            if result['success']:
                print(f"[{completed}/{total}] ✓ {result['url']} - {result['response_time']:.2f}s")
            else:
                print(f"[{completed}/{total}] ✗ {result['url']} - {result['error']}")

        return self.crawl_urls(urls, callback=progress_callback)

# Usage example
urls = [
    'https://httpbin.org/delay/1',
    'https://httpbin.org/delay/2',
    'https://httpbin.org/status/200',
    'https://httpbin.org/status/404',
    'https://httpbin.org/json'
] * 3  # repeated 3 times, 15 URLs in total

spider = AdvancedThreadSpider(max_workers=5)
start_time = time.time()
results = spider.crawl_with_progress(urls)
end_time = time.time()

print(f"\nTotal time: {end_time - start_time:.2f}s")
print(f"Succeeded: {sum(1 for r in results if r['success'])}")
print(f"Failed:    {sum(1 for r in results if not r['success'])}")
3. Asynchronous Crawlers
3.1 A Basic Asynchronous Crawler
import asyncio
import aiohttp
import time
from aiohttp import ClientTimeout, ClientSession

class AsyncSpider:
    def __init__(self, max_concurrent=10, timeout=10):
        self.max_concurrent = max_concurrent
        self.timeout = ClientTimeout(total=timeout)
        self.semaphore = asyncio.Semaphore(max_concurrent)
        # Connector configuration
        self.connector = aiohttp.TCPConnector(
            limit=100,                      # total connection pool size
            limit_per_host=max_concurrent,  # connections per host
            ttl_dns_cache=300,              # DNS cache TTL in seconds
            use_dns_cache=True,
        )

    async def fetch_url(self, session, url):
        """Fetch a single URL asynchronously"""
        async with self.semaphore:  # limit concurrency
            try:
                start_time = time.time()
                async with session.get(url) as response:
                    content = await response.text()
                    end_time = time.time()
                    return {
                        'url': url,
                        'status_code': response.status,
                        'content_length': len(content),
                        'response_time': end_time - start_time,
                        'content': content,
                        'success': True
                    }
            except Exception as e:
                return {
                    'url': url,
                    'error': str(e),
                    'success': False
                }

    async def crawl_urls(self, urls):
        """Crawl a list of URLs asynchronously"""
        async with ClientSession(
            timeout=self.timeout,
            connector=self.connector,
            headers={'User-Agent': 'AsyncSpider/1.0'}
        ) as session:
            # Create all tasks
            tasks = [self.fetch_url(session, url) for url in urls]
            # Wait for all tasks to finish
            results = await asyncio.gather(*tasks, return_exceptions=True)
            # Normalize exception results
            processed_results = []
            for i, result in enumerate(results):
                if isinstance(result, Exception):
                    processed_results.append({
                        'url': urls[i],
                        'error': str(result),
                        'success': False
                    })
                else:
                    processed_results.append(result)
            return processed_results

    async def crawl_with_progress(self, urls):
        """Crawl asynchronously with progress reporting"""
        total = len(urls)
        completed = 0
        results = []

        async def fetch_with_progress(session, url):
            nonlocal completed
            result = await self.fetch_url(session, url)
            completed += 1
            if result['success']:
                print(f"[{completed}/{total}] ✓ {result['url']} - {result['response_time']:.2f}s")
            else:
                print(f"[{completed}/{total}] ✗ {result['url']} - {result['error']}")
            return result

        async with ClientSession(
            timeout=self.timeout,
            connector=self.connector,
            headers={'User-Agent': 'AsyncSpider/1.0'}
        ) as session:
            tasks = [fetch_with_progress(session, url) for url in urls]
            results = await asyncio.gather(*tasks)
        return results

# Usage example
async def main():
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/json',
        'https://httpbin.org/user-agent',
        'https://httpbin.org/headers'
    ] * 4  # repeated 4 times, 20 URLs in total
    spider = AsyncSpider(max_concurrent=5)
    start_time = time.time()
    results = await spider.crawl_with_progress(urls)
    end_time = time.time()
    print(f"\nTotal time: {end_time - start_time:.2f}s")
    print(f"Succeeded: {sum(1 for r in results if r['success'])}")
    print(f"Failed:    {sum(1 for r in results if not r['success'])}")

# Run the async spider
asyncio.run(main())
3.2 An Advanced Asynchronous Crawler
import asyncio
import aiohttp
import aiofiles
import json
from urllib.parse import urljoin, urlparse
import time
from dataclasses import dataclass
from typing import List, Optional, Callable

@dataclass
class CrawlResult:
    url: str
    status_code: Optional[int] = None
    content: Optional[str] = None
    error: Optional[str] = None
    response_time: Optional[float] = None
    success: bool = False

class AdvancedAsyncSpider:
    def __init__(self,
                 max_concurrent=20,
                 timeout=30,
                 retry_times=3,
                 retry_delay=1):
        self.max_concurrent = max_concurrent
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.retry_times = retry_times
        self.retry_delay = retry_delay
        self.semaphore = asyncio.Semaphore(max_concurrent)
        # Crawl statistics
        self.stats = {
            'total': 0,
            'success': 0,
            'failed': 0,
            'start_time': None,
            'end_time': None
        }

    async def fetch_with_retry(self, session, url):
        """Asynchronous request with retries"""
        for attempt in range(self.retry_times + 1):
            try:
                start_time = time.time()
                async with session.get(url) as response:
                    content = await response.text()
                    end_time = time.time()
                    return CrawlResult(
                        url=url,
                        status_code=response.status,
                        content=content,
                        response_time=end_time - start_time,
                        success=True
                    )
            except Exception as e:
                if attempt < self.retry_times:
                    await asyncio.sleep(self.retry_delay * (attempt + 1))
                    continue
                else:
                    return CrawlResult(url=url, error=str(e), success=False)

    async def process_url(self, session, url, processor=None):
        """Process a single URL"""
        async with self.semaphore:
            result = await self.fetch_with_retry(session, url)
            # Update statistics
            if result.success:
                self.stats['success'] += 1
            else:
                self.stats['failed'] += 1
            # Run the custom processor, if any
            if processor and result.success:
                try:
                    processed_data = await processor(result)
                    result.processed_data = processed_data
                except Exception as e:
                    result.error = f"Processor error: {e}"
                    result.success = False
            return result

    async def crawl_batch(self, urls, processor=None, progress_callback=None):
        """Crawl a batch of URLs"""
        self.stats['total'] = len(urls)
        self.stats['start_time'] = time.time()
        connector = aiohttp.TCPConnector(
            limit=100,
            limit_per_host=self.max_concurrent,
            ttl_dns_cache=300,
            use_dns_cache=True
        )
        async with aiohttp.ClientSession(
            timeout=self.timeout,
            connector=connector,
            headers={
                'User-Agent': 'AdvancedAsyncSpider/1.0',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'
            }
        ) as session:
            # Create the tasks
            tasks = []
            for url in urls:
                task = self.process_url(session, url, processor)
                tasks.append(task)
            # Run the tasks and collect results as they complete
            results = []
            for coro in asyncio.as_completed(tasks):
                result = await coro
                results.append(result)
                # Progress callback
                if progress_callback:
                    await progress_callback(result, len(results), len(urls))
        self.stats['end_time'] = time.time()
        return results

    async def save_results(self, results, filename='results.json'):
        """Save results to a file"""
        data = []
        for result in results:
            item = {
                'url': result.url,
                'status_code': result.status_code,
                'success': result.success,
                'response_time': result.response_time,
                'error': result.error
            }
            if hasattr(result, 'processed_data'):
                item['processed_data'] = result.processed_data
            data.append(item)
        async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
            await f.write(json.dumps(data, ensure_ascii=False, indent=2))
        print(f"Results saved to {filename}")

    def print_stats(self):
        """Print crawl statistics"""
        if self.stats['start_time'] and self.stats['end_time']:
            duration = self.stats['end_time'] - self.stats['start_time']
            rate = self.stats['total'] / duration if duration > 0 else 0
            print("\n=== Crawl statistics ===")
            print(f"Total:   {self.stats['total']}")
            print(f"Success: {self.stats['success']}")
            print(f"Failed:  {self.stats['failed']}")
            print(f"Elapsed: {duration:.2f}s")
            print(f"Rate:    {rate:.2f} URLs/s")

# Example custom processor
async def json_processor(result: CrawlResult):
    """JSON payload processor"""
    try:
        data = json.loads(result.content)
        return {
            'type': 'json',
            'keys': list(data.keys()) if isinstance(data, dict) else None,
            'length': len(data) if isinstance(data, (list, dict)) else None
        }
    except Exception:
        return {'type': 'text', 'length': len(result.content)}

async def progress_callback(result, completed, total):
    """Progress callback"""
    percentage = (completed / total) * 100
    status = "✓" if result.success else "✗"
    print(f"[{completed}/{total}] {status} {result.url} ({percentage:.1f}%)")

# Usage example
async def main():
    urls = [
        'https://httpbin.org/json',
        'https://httpbin.org/user-agent',
        'https://httpbin.org/headers',
        'https://httpbin.org/ip',
        'https://httpbin.org/uuid'
    ] * 10  # 50 URLs
    spider = AdvancedAsyncSpider(max_concurrent=10)
    results = await spider.crawl_batch(
        urls,
        processor=json_processor,
        progress_callback=progress_callback
    )
    spider.print_stats()
    await spider.save_results(results)

# Run
asyncio.run(main())
4. Concurrency Control and Resource Management
4.1 Rate Limiting
import asyncio
import time
from collections import deque

class RateLimiter:
    def __init__(self, max_calls, time_window):
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = deque()
        self.lock = asyncio.Lock()

    async def acquire(self):
        """Acquire permission for one call, waiting if the rate limit is reached"""
        async with self.lock:
            while True:
                now = time.time()
                # Drop call records that have left the time window
                while self.calls and self.calls[0] <= now - self.time_window:
                    self.calls.popleft()
                # Under the limit: record this call and proceed
                if len(self.calls) < self.max_calls:
                    self.calls.append(now)
                    return
                # Otherwise wait until the oldest call leaves the window, then re-check
                sleep_time = self.time_window - (now - self.calls[0])
                await asyncio.sleep(max(sleep_time, 0))

class RateLimitedSpider:
    def __init__(self, max_concurrent=10, rate_limit=None):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.rate_limiter = rate_limit

    async def fetch_url(self, session, url):
        """Request subject to the rate limit"""
        async with self.semaphore:
            # Apply the rate limit
            if self.rate_limiter:
                await self.rate_limiter.acquire()
            try:
                async with session.get(url) as response:
                    return await response.text()
            except Exception as e:
                print(f"Request failed: {url} - {e}")
                return None

# Usage example: at most 5 requests per second
rate_limiter = RateLimiter(max_calls=5, time_window=1.0)
spider = RateLimitedSpider(max_concurrent=10, rate_limit=rate_limiter)
4.2 Connection Pool Management
import aiohttp
import asyncio

class ConnectionPoolManager:
    def __init__(self,
                 total_connections=100,
                 connections_per_host=10,
                 keepalive_timeout=30):
        self.connector = aiohttp.TCPConnector(
            limit=total_connections,
            limit_per_host=connections_per_host,
            keepalive_timeout=keepalive_timeout,
            enable_cleanup_closed=True,
            ttl_dns_cache=300,
            use_dns_cache=True
        )
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            connector=self.connector,
            timeout=aiohttp.ClientTimeout(total=30),
            headers={'User-Agent': 'ConnectionPoolSpider/1.0'}
        )
        return self.session

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
        await self.connector.close()

# Usage example
async def main():
    urls = ['https://httpbin.org/get'] * 100
    async with ConnectionPoolManager() as session:
        tasks = []
        for url in urls:
            task = session.get(url)
            tasks.append(task)
        responses = await asyncio.gather(*tasks)
        for response in responses:
            print(f"Status: {response.status}")
            response.close()

asyncio.run(main())
4.3 Memory Management
import asyncio
import aiohttp
import gc
import psutil
import os

class MemoryManagedSpider:
    def __init__(self, max_memory_mb=500):
        self.max_memory_mb = max_memory_mb
        self.process = psutil.Process(os.getpid())

    def get_memory_usage(self):
        """Return current memory usage in MB"""
        return self.process.memory_info().rss / 1024 / 1024

    async def check_memory(self):
        """Check memory usage and run garbage collection if it is too high"""
        memory_usage = self.get_memory_usage()
        if memory_usage > self.max_memory_mb:
            print(f"Memory usage too high: {memory_usage:.1f}MB, running garbage collection")
            gc.collect()
            await asyncio.sleep(0.1)  # give garbage collection a moment to finish
            new_usage = self.get_memory_usage()
            print(f"Memory usage after garbage collection: {new_usage:.1f}MB")

    async def fetch_with_memory_check(self, session, url):
        """Request with a memory check"""
        await self.check_memory()
        try:
            async with session.get(url) as response:
                content = await response.text()
                # Release the large object as soon as it has been processed
                result = len(content)
                del content
                return result
        except Exception:
            return None

# Usage example
async def main():
    spider = MemoryManagedSpider(max_memory_mb=200)
    async with aiohttp.ClientSession() as session:
        tasks = []
        for i in range(1000):
            url = f'https://httpbin.org/bytes/{1024 * 10}'  # 10 KB of data
            task = spider.fetch_with_memory_check(session, url)
            tasks.append(task)
        results = await asyncio.gather(*tasks)
        print(f"Handled {len([r for r in results if r])} requests")

asyncio.run(main())
5. Case Study: A Concurrent News-Site Crawler
import asyncio
import aiohttp
import aiofiles
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import json
import time
from dataclasses import dataclass, asdict
from typing import List, Optional

@dataclass
class NewsArticle:
    title: str
    url: str
    content: str
    publish_time: Optional[str] = None
    author: Optional[str] = None
    category: Optional[str] = None

class NewsSpider:
    def __init__(self, max_concurrent=20, delay=1):
        self.max_concurrent = max_concurrent
        self.delay = delay
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.articles = []
        # Crawl statistics
        self.stats = {
            'pages_crawled': 0,
            'articles_found': 0,
            'errors': 0
        }

    async def fetch_page(self, session, url):
        """Fetch a page"""
        async with self.semaphore:
            try:
                await asyncio.sleep(self.delay)  # polite delay between requests
                async with session.get(url) as response:
                    if response.status == 200:
                        content = await response.text()
                        self.stats['pages_crawled'] += 1
                        return content
                    else:
                        print(f"HTTP error {response.status}: {url}")
                        self.stats['errors'] += 1
                        return None
            except Exception as e:
                print(f"Request failed: {url} - {e}")
                self.stats['errors'] += 1
                return None

    def parse_article_list(self, html, base_url):
        """Parse an article list page"""
        soup = BeautifulSoup(html, 'html.parser')
        article_links = []
        # Adjust the selector to match the target site's structure
        for link in soup.select('a[href*="/article/"]'):
            href = link.get('href')
            if href:
                full_url = urljoin(base_url, href)
                article_links.append(full_url)
        return article_links

    def parse_article_detail(self, html, url):
        """Parse an article detail page"""
        soup = BeautifulSoup(html, 'html.parser')
        try:
            # Adjust the selectors to match the target site's structure
            title = soup.select_one('h1.article-title')
            title = title.get_text(strip=True) if title else "Untitled"
            content_elem = soup.select_one('.article-content')
            content = content_elem.get_text(strip=True) if content_elem else ""
            author_elem = soup.select_one('.article-author')
            author = author_elem.get_text(strip=True) if author_elem else None
            time_elem = soup.select_one('.publish-time')
            publish_time = time_elem.get_text(strip=True) if time_elem else None
            category_elem = soup.select_one('.article-category')
            category = category_elem.get_text(strip=True) if category_elem else None
            return NewsArticle(
                title=title,
                url=url,
                content=content,
                author=author,
                publish_time=publish_time,
                category=category
            )
        except Exception as e:
            print(f"Failed to parse article: {url} - {e}")
            return None

    async def crawl_article_list(self, session, list_url):
        """Crawl an article list page"""
        html = await self.fetch_page(session, list_url)
        if html:
            return self.parse_article_list(html, list_url)
        return []

    async def crawl_article_detail(self, session, article_url):
        """Crawl an article detail page"""
        html = await self.fetch_page(session, article_url)
        if html:
            article = self.parse_article_detail(html, article_url)
            if article:
                self.articles.append(article)
                self.stats['articles_found'] += 1
                print(f"✓ Crawled article: {article.title}")
                return article
        return None

    async def crawl_news_site(self, base_url, max_pages=5):
        """Crawl the news site"""
        print(f"Crawling news site: {base_url}")
        connector = aiohttp.TCPConnector(
            limit=100,
            limit_per_host=self.max_concurrent
        )
        async with aiohttp.ClientSession(
            connector=connector,
            timeout=aiohttp.ClientTimeout(total=30),
            headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
        ) as session:
            # Step 1: collect article links from the list pages
            article_urls = set()
            list_tasks = []
            for page in range(1, max_pages + 1):
                list_url = f"{base_url}/news/page/{page}"
                task = self.crawl_article_list(session, list_url)
                list_tasks.append(task)
            # Gather all article links
            list_results = await asyncio.gather(*list_tasks)
            for urls in list_results:
                article_urls.update(urls)
            print(f"Found {len(article_urls)} articles")
            # Step 2: crawl the article details
            detail_tasks = []
            for url in article_urls:
                task = self.crawl_article_detail(session, url)
                detail_tasks.append(task)
            # Process in batches to keep memory usage bounded
            batch_size = 50
            for i in range(0, len(detail_tasks), batch_size):
                batch = detail_tasks[i:i + batch_size]
                await asyncio.gather(*batch)
                print(f"Processed {min(i + batch_size, len(detail_tasks))}/{len(detail_tasks)} articles")

    async def save_articles(self, filename='news_articles.json'):
        """Save the articles to a file"""
        data = [asdict(article) for article in self.articles]
        async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
            await f.write(json.dumps(data, ensure_ascii=False, indent=2))
        print(f"Saved {len(self.articles)} articles to {filename}")

    def print_stats(self):
        """Print crawl statistics"""
        print("\n=== Crawl statistics ===")
        print(f"Pages crawled:  {self.stats['pages_crawled']}")
        print(f"Articles found: {self.stats['articles_found']}")
        print(f"Errors:         {self.stats['errors']}")

# Usage example
async def main():
    spider = NewsSpider(max_concurrent=10, delay=0.5)
    start_time = time.time()
    # Crawl the news site (example URL; replace it with a real site)
    await spider.crawl_news_site('https://example-news.com', max_pages=3)
    end_time = time.time()
    # Save the results
    await spider.save_articles()
    # Print statistics
    spider.print_stats()
    print(f"Total time: {end_time - start_time:.2f}s")

# Run the spider
asyncio.run(main())
6. Performance Optimization Tips
6.1 Choosing the Right Concurrency Level
import asyncio
import aiohttp
import time

async def benchmark_concurrency():
    """Benchmark different concurrency levels"""
    url = 'https://httpbin.org/delay/0.5'
    total_requests = 100
    concurrency_levels = [1, 5, 10, 20, 50, 100]

    for concurrency in concurrency_levels:
        semaphore = asyncio.Semaphore(concurrency)

        async def fetch(session):
            async with semaphore:
                async with session.get(url) as response:
                    return response.status

        start_time = time.time()
        async with aiohttp.ClientSession() as session:
            tasks = [fetch(session) for _ in range(total_requests)]
            results = await asyncio.gather(*tasks)
        end_time = time.time()

        duration = end_time - start_time
        rate = total_requests / duration
        print(f"Concurrency {concurrency:3d}: {duration:6.2f}s, {rate:6.1f} req/s")

asyncio.run(benchmark_concurrency())
6.2 Connection Reuse Optimization
import aiohttp
import asyncio

class OptimizedSpider:
    def __init__(self):
        # Tuned connector configuration
        self.connector = aiohttp.TCPConnector(
            limit=100,                          # total connections
            limit_per_host=20,                  # connections per host
            keepalive_timeout=60,               # how long to keep idle connections alive
            enable_cleanup_closed=True,         # clean up closed connections
            ttl_dns_cache=300,                  # DNS cache TTL in seconds
            use_dns_cache=True,                 # enable DNS caching
            resolver=aiohttp.AsyncResolver(),   # async DNS resolution (requires aiodns)
        )
        # Tuned timeout settings
        self.timeout = aiohttp.ClientTimeout(
            total=30,      # overall timeout
            connect=10,    # connect timeout
            sock_read=10   # socket read timeout
        )

    async def create_session(self):
        return aiohttp.ClientSession(
            connector=self.connector,
            timeout=self.timeout,
            headers={
                'User-Agent': 'OptimizedSpider/1.0',
                'Accept-Encoding': 'gzip, deflate',  # enable compression
                'Connection': 'keep-alive'           # reuse connections
            }
        )

    async def close(self):
        await self.connector.close()
7. Hands-on Exercises
Exercise 1: Multithreaded Image Downloader
Write a multithreaded image downloader that supports downloading images in batches.
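If you want a starting point, here is a minimal sketch (the httpbin image URLs and the images output directory are placeholders, not part of the lesson): it fans downloads out over a ThreadPoolExecutor and writes each file to disk.

import os
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

def download_image(url, save_dir='images'):
    """Download a single image and save it under save_dir."""
    os.makedirs(save_dir, exist_ok=True)
    filename = os.path.join(save_dir, url.rsplit('/', 1)[-1] or 'image.jpg')
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    with open(filename, 'wb') as f:
        f.write(response.content)
    return filename

def download_all(urls, max_workers=5):
    """Download a batch of images with a thread pool."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(download_image, url): url for url in urls}
        for future in as_completed(futures):
            url = futures[future]
            try:
                print(f"Saved {url} -> {future.result()}")
            except Exception as e:
                print(f"Failed {url}: {e}")

# Placeholder URLs for illustration only
download_all(['https://httpbin.org/image/png', 'https://httpbin.org/image/jpeg'])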
Exercise 2: Asynchronous API Data Collection
Collect data from multiple APIs asynchronously and merge the results.
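A possible skeleton, assuming each API returns JSON (the httpbin endpoints below are stand-ins for real APIs): fetch all endpoints concurrently with aiohttp and merge the responses into one dict keyed by API name.

import asyncio
import aiohttp

async def fetch_json(session, name, url):
    """Fetch one API endpoint and return its JSON payload."""
    async with session.get(url) as response:
        return name, await response.json()

async def collect(api_map):
    """Query all APIs concurrently and merge the results keyed by API name."""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_json(session, name, url) for name, url in api_map.items()]
        results = await asyncio.gather(*tasks)
    return dict(results)

# Stand-in endpoints for illustration
apis = {'ip': 'https://httpbin.org/ip', 'agent': 'https://httpbin.org/user-agent'}
merged = asyncio.run(collect(apis))
print(merged)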
Exercise 3: Concurrent Crawler Monitoring
Add real-time monitoring to a concurrent crawler so it reports crawl progress and performance metrics.
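One way to approach this, sketched under the assumption that the tasks share a simple counter dict: a background coroutine samples the counters once per second and prints progress and throughput.

import asyncio
import time
import aiohttp

async def monitor(stats, total, interval=1.0):
    """Periodically print progress and throughput until all tasks are done."""
    start = time.time()
    while stats['done'] < total:
        elapsed = time.time() - start
        rate = stats['done'] / elapsed if elapsed > 0 else 0
        print(f"progress: {stats['done']}/{total}, errors: {stats['errors']}, {rate:.1f} req/s")
        await asyncio.sleep(interval)

async def fetch(session, url, stats):
    """Fetch one URL and update the shared counters."""
    try:
        async with session.get(url) as response:
            await response.read()
    except Exception:
        stats['errors'] += 1
    finally:
        stats['done'] += 1

async def main():
    urls = ['https://httpbin.org/delay/1'] * 20   # placeholder workload
    stats = {'done': 0, 'errors': 0}
    async with aiohttp.ClientSession() as session:
        monitor_task = asyncio.create_task(monitor(stats, len(urls)))
        await asyncio.gather(*(fetch(session, url, stats) for url in urls))
        await monitor_task

asyncio.run(main())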
8. Lesson Summary
In this lesson we covered:
- The basic concepts and benefits of concurrent programming
- How to implement multithreaded crawlers
- Advanced asynchronous crawling techniques
- Concurrency control and resource management
- Practical performance-optimization tips
9. Next Lesson Preview
In the next lesson we will cover:
- Building and using proxy pools
- IP rotation and anti-anti-scraping techniques
- User-Agent spoofing
- Request-header optimization strategies
10. Homework
- Implement a multithreaded downloader that supports resuming interrupted downloads (see the Range-request sketch after this list for a starting point)
- Write an asynchronous crawler that scrapes product information from an e-commerce site
- Compare the performance of the different concurrency approaches
- Implement a distributed crawler system with monitoring
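For the first homework item, a hint rather than a full solution: parallel downloading via HTTP Range requests is the usual building block. The sketch below splits a file into byte ranges and fetches them with a thread pool; persisting which ranges have finished (so a restart can skip them) is the actual resume logic left to you. The URL in the final comment is hypothetical, and the target server must support Range requests.

import requests
from concurrent.futures import ThreadPoolExecutor

def download_range(url, start, end, path):
    """Download one byte range and write it at the matching offset."""
    headers = {'Range': f'bytes={start}-{end}'}
    data = requests.get(url, headers=headers, timeout=30).content
    with open(path, 'r+b') as f:
        f.seek(start)
        f.write(data)

def multithread_download(url, path, workers=4, chunk=1024 * 1024):
    """Split the file into ranges and fetch them in parallel."""
    size = int(requests.head(url, timeout=10).headers['Content-Length'])
    # Pre-allocate the target file so each thread can seek and write into it
    with open(path, 'wb') as f:
        f.truncate(size)
    ranges = [(i, min(i + chunk, size) - 1) for i in range(0, size, chunk)]
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(download_range, url, s, e, path) for s, e in ranges]
        for future in futures:
            future.result()  # surface any download errors
    print(f"Downloaded {size} bytes to {path}")

# multithread_download('https://example.com/bigfile.zip', 'bigfile.zip')  # hypothetical URL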
Tip: a reasonable concurrency level is usually between 10 and 50; excessive concurrency can get your crawler blocked by the target site or exhaust system resources.