Building a Simple Web Crawler from Scratch (with Requests and BeautifulSoup)
Table of Contents
- Building a Simple Web Crawler from Scratch (with Requests and BeautifulSoup)
- 1. Introduction: Why Web Crawlers Matter and Where They Are Used
- 1.1 What Is a Web Crawler?
- 1.2 Application Areas for Web Crawlers
- 1.3 Why Use Python for Crawlers?
- 2. Environment Setup and Basic Concepts
- 2.1 Installing the Required Python Libraries
- 2.2 HTTP Protocol Basics
- 3. Basic Crawler Architecture Design
- 3.1 Crawler System Architecture
- 3.2 Core Class Design
- 4. A Complete Crawler Implementation
- 4.1 The Basic Crawler
- 4.2 Advanced Crawler Features
- 5. Crawler Ethics and Best Practices
- 5.1 Respecting robots.txt
- 5.2 Performance Optimization and the Math Behind It
- 6. A Practical Application Example
- 6.1 A News-Site Crawler
- 7. Summary
- 7.1 Key Points of Crawler Development
- ✅ Core features
- ✅ Advanced features
- ✅ Practical applications
- 7.2 Review of the Math
- 7.3 Best-Practice Recommendations
- 7.4 Directions for Further Learning
Building a Simple Web Crawler from Scratch (with Requests and BeautifulSoup)
1. Introduction: Why Web Crawlers Matter and Where They Are Used
1.1 What Is a Web Crawler?
A web crawler (also called a spider) is a program that automatically browses the internet and collects information. By systematically visiting pages, extracting data, and following links, it automates the collection and processing of web resources.
By some estimates, more than 50% of internet traffic is generated by crawlers of various kinds; they play a critical role in search engines, data mining, market research, and many other fields.
1.2 Application Areas for Web Crawlers
# Example application areas for web crawlers
applications = {
    "Search engines": "The core technology behind Google, Baidu, and others",
    "Price monitoring": "Price comparison and trend analysis across e-commerce platforms",
    "Public opinion analysis": "Sentiment analysis of social media and news sites",
    "Academic research": "Collecting scientific literature and datasets",
    "Competitive intelligence": "Monitoring competitors' activities",
    "Content aggregation": "Aggregating news and blog content",
}
1.3 Why Use Python for Crawlers?
Python has clear advantages for crawler development (a minimal fetch-and-parse sketch follows this list):
- Rich library ecosystem: Requests, BeautifulSoup, Scrapy, and more
- Concise syntax: fast prototyping and easy code maintenance
- Strong data-processing tools: Pandas, NumPy, and related libraries
- Good async support: aiohttp and asyncio improve crawl throughput
- Active community: plenty of learning resources and ready-made solutions
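To illustrate how compact a fetch-and-parse cycle is, here is a minimal sketch. It assumes requests and beautifulsoup4 are installed and uses the books.toscrape.com practice site (also used in the demos later in this article) purely as a harmless target:

import requests
from bs4 import BeautifulSoup

# Fetch one page and pull out its title and the first few links.
# The entire request/parse cycle fits in a handful of lines.
response = requests.get("http://books.toscrape.com/", timeout=10)
soup = BeautifulSoup(response.text, "html.parser")

print(soup.title.get_text(strip=True))
for link in soup.find_all("a", href=True)[:5]:
    print(link["href"])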
2. Environment Setup and Basic Concepts
2.1 Installing the Required Python Libraries
#!/usr/bin/env python3
"""
Environment configuration and dependency check for crawler development
"""
import sys
import subprocess
import importlib
from typing import Any, Dict, List, Tuple


class EnvironmentSetup:
    """Environment configuration and dependency management"""

    def __init__(self):
        self.required_packages = {
            'requests': 'requests',
            'beautifulsoup4': 'bs4',
            'lxml': 'lxml',
            'pandas': 'pandas',
            'fake-useragent': 'fake_useragent',
            'urllib3': 'urllib3',
            'selenium': 'selenium'
        }
        self.optional_packages = {
            'aiohttp': 'aiohttp',
            'asyncio': 'asyncio',  # part of the standard library in Python 3
            'scrapy': 'scrapy',
            'pyquery': 'pyquery'
        }

    def check_python_version(self) -> Tuple[bool, str]:
        """Check the Python version; returns (meets requirement, version string)"""
        version = sys.version_info
        version_str = f"{version.major}.{version.minor}.{version.micro}"
        if version.major == 3 and version.minor >= 7:
            return True, version_str
        else:
            return False, version_str

    def check_package_installed(self, package_name: str, import_name: str) -> bool:
        """Return True if the package can be imported"""
        try:
            importlib.import_module(import_name)
            return True
        except ImportError:
            return False

    def install_package(self, package_name: str) -> bool:
        """Install a package with pip; returns True on success"""
        try:
            subprocess.check_call([sys.executable, "-m", "pip", "install", package_name])
            return True
        except subprocess.CalledProcessError as e:
            print(f"Failed to install {package_name}: {e}")
            return False

    def check_environment(self) -> Dict[str, Any]:
        """Run a full environment check and return the results"""
        print("=" * 60)
        print("Web crawler development environment check")
        print("=" * 60)

        # Check the Python version
        py_ok, py_version = self.check_python_version()
        print(f"Python version: {py_version} {'✅' if py_ok else '❌'}")
        if not py_ok:
            print("Python 3.7 or newer is required")
            return {"status": "failed", "reason": "python_version"}

        # Check required packages
        missing_required = []
        installed_required = []
        for pkg, import_name in self.required_packages.items():
            if self.check_package_installed(pkg, import_name):
                print(f"✅ {pkg} installed")
                installed_required.append(pkg)
            else:
                print(f"❌ {pkg} not installed")
                missing_required.append(pkg)

        # Check optional packages
        missing_optional = []
        installed_optional = []
        for pkg, import_name in self.optional_packages.items():
            if self.check_package_installed(pkg, import_name):
                print(f"✅ {pkg} installed (optional)")
                installed_optional.append(pkg)
            else:
                print(f"⚠️ {pkg} not installed (optional)")
                missing_optional.append(pkg)

        return {
            "status": "success" if not missing_required else "failed",
            "python_version": py_version,
            "python_ok": py_ok,
            "required_installed": installed_required,
            "required_missing": missing_required,
            "optional_installed": installed_optional,
            "optional_missing": missing_optional
        }

    def setup_environment(self) -> bool:
        """Check the environment and install any missing required packages"""
        result = self.check_environment()

        if result["status"] == "success":
            print("\n✅ Environment is ready!")
            return True

        print(f"\n{len(result['required_missing'])} required packages are missing")

        # Install the missing packages automatically
        success_count = 0
        for package in result['required_missing']:
            print(f"Installing {package}...")
            if self.install_package(package):
                success_count += 1
                print(f"✅ {package} installed")
            else:
                print(f"❌ {package} installation failed")

        # Verify the result
        final_check = self.check_environment()
        return final_check["status"] == "success"


def main():
    """Entry point"""
    env_setup = EnvironmentSetup()

    print("Web crawler environment configuration tool")
    print("This tool checks for and installs the required dependencies")
    print()

    if env_setup.setup_environment():
        print("\n🎉 Environment configured! You can start writing crawlers.")
        print("\nSuggested next steps:")
        print("1. Learn the basics of HTTP and HTML")
        print("2. Read about robots.txt and crawler etiquette")
        print("3. Start with a simple crawler script")
    else:
        print("\n❌ Environment configuration failed; please install the missing packages manually")
        print("For example: pip install <package-name>")


if __name__ == "__main__":
    main()
2.2 HTTP Protocol Basics
Before writing a crawler, it is essential to understand the HTTP protocol:
"""
HTTP协议基础概念演示
"""class HTTPBasics:"""HTTP协议基础概念"""def explain_http_methods(self):"""解释HTTP方法"""methods = {"GET": "请求资源,不应产生副作用","POST": "提交数据,可能改变服务器状态", "HEAD": "只获取响应头信息","PUT": "更新资源","DELETE": "删除资源"}print("HTTP方法说明:")for method, description in methods.items():print(f" {method}: {description}")def common_status_codes(self):"""常见HTTP状态码"""status_codes = {200: "OK - 请求成功",301: "Moved Permanently - 永久重定向",302: "Found - 临时重定向", 404: "Not Found - 资源不存在",403: "Forbidden - 禁止访问",500: "Internal Server Error - 服务器错误",503: "Service Unavailable - 服务不可用"}print("\n常见HTTP状态码:")for code, meaning in status_codes.items():print(f" {code}: {meaning}")def important_headers(self):"""重要的HTTP头部"""headers = {"User-Agent": "客户端标识","Referer": "来源页面", "Cookie": "会话信息","Content-Type": "请求体类型","Accept": "可接受的响应类型","Authorization": "认证信息"}print("\n重要HTTP头部:")for header, purpose in headers.items():print(f" {header}: {purpose}")# 运行演示
if __name__ == "__main__":http_basics = HTTPBasics()http_basics.explain_http_methods()http_basics.common_status_codes() http_basics.important_headers()
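The concepts above map directly onto the requests API. Here is a minimal sketch of a single HTTP exchange; it assumes network access and uses httpbin.org, a public request-echo service, purely as a harmless target:

import requests

# Send a GET request with an explicit User-Agent and inspect the
# status code and headers discussed above.
response = requests.get(
    "https://httpbin.org/get",
    headers={"User-Agent": "SimpleCrawler/0.1 (learning project)"},
    timeout=10,
)

print(response.status_code)                     # e.g. 200
print(response.headers.get("Content-Type"))     # a header sent back by the server
print(response.request.headers["User-Agent"])   # the header we actually sent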
3. Basic Crawler Architecture Design
3.1 Crawler System Architecture
A basic crawler can be organized as a pipeline of four cooperating components: a URL manager that tracks pending, crawled, and failed URLs; a downloader that fetches pages over HTTP; an HTML parser that extracts data and new links; and a storage component that persists the results. The classes in the next subsection implement exactly these roles.
3.2 Core Class Design
#!/usr/bin/env python3
"""
Core architecture of the web crawler
"""
import time
import random
from abc import ABC, abstractmethod
from typing import List, Dict, Optional, Set
from urllib.parse import urljoin, urlparse
from collections import deque


class URLManager:
    """URL manager: tracks URLs to crawl and URLs already crawled"""

    def __init__(self):
        self.to_crawl: deque = deque()   # queue of URLs waiting to be crawled
        self.crawled: Set[str] = set()   # URLs already crawled
        self.failed: Set[str] = set()    # URLs that failed

    def add_url(self, url: str) -> None:
        """Add a URL to the queue if it has not been seen before"""
        if (url not in self.to_crawl and
                url not in self.crawled and
                url not in self.failed):
            self.to_crawl.append(url)

    def add_urls(self, urls: List[str]) -> None:
        """Add a batch of URLs"""
        for url in urls:
            self.add_url(url)

    def get_url(self) -> Optional[str]:
        """Pop the next URL to crawl, or return None if the queue is empty"""
        if self.has_next():
            url = self.to_crawl.popleft()
            self.crawled.add(url)
            return url
        return None

    def has_next(self) -> bool:
        """Check whether there are still URLs waiting to be crawled"""
        return len(self.to_crawl) > 0

    def mark_failed(self, url: str) -> None:
        """Mark a URL as failed"""
        if url in self.crawled:
            self.crawled.remove(url)
        self.failed.add(url)

    def get_stats(self) -> Dict[str, int]:
        """Return queue statistics"""
        return {
            'to_crawl': len(self.to_crawl),
            'crawled': len(self.crawled),
            'failed': len(self.failed),
            'total': len(self.to_crawl) + len(self.crawled) + len(self.failed)
        }


class WebDownloader:
    """Web downloader: fetches page content over HTTP"""

    def __init__(self, delay: float = 1.0, timeout: int = 10):
        """delay: seconds between requests; timeout: request timeout in seconds"""
        self.delay = delay
        self.timeout = timeout
        self.last_request_time = 0

    def download(self, url: str, **kwargs) -> Optional[str]:
        """Download a page and return its HTML, or None on failure"""
        import requests
        from fake_useragent import UserAgent

        # Be polite: respect the configured delay between requests
        self._respect_delay()

        try:
            # Pop any caller-supplied headers out of kwargs so they are not
            # passed twice, and add a random User-Agent if none is given
            headers = kwargs.pop('headers', {})
            if 'User-Agent' not in headers:
                ua = UserAgent()
                headers['User-Agent'] = ua.random

            # Send the request
            response = requests.get(
                url,
                timeout=self.timeout,
                headers=headers,
                **kwargs
            )

            # Raise on HTTP error status codes
            response.raise_for_status()

            # Remember when the last request finished
            self.last_request_time = time.time()

            return response.text

        except requests.exceptions.RequestException as e:
            print(f"Download failed {url}: {e}")
            return None

    def _respect_delay(self) -> None:
        """Sleep if the previous request was made less than `delay` seconds ago"""
        if self.delay > 0 and self.last_request_time > 0:
            elapsed = time.time() - self.last_request_time
            if elapsed < self.delay:
                time.sleep(self.delay - elapsed)


class HTMLParser:
    """HTML parser: extracts data and links from page content"""

    def __init__(self):
        pass

    def parse(self, html: str, base_url: str = "") -> Dict:
        """Parse HTML and return links, data, title, and meta description"""
        from bs4 import BeautifulSoup

        if not html:
            return {'links': [], 'data': {}}

        soup = BeautifulSoup(html, 'lxml')

        # Extract all links
        links = self._extract_links(soup, base_url)

        # Extract page data
        data = self._extract_data(soup)

        return {
            'links': links,
            'data': data,
            'title': self._get_title(soup),
            'meta_description': self._get_meta_description(soup)
        }

    def _extract_links(self, soup, base_url: str) -> List[str]:
        """Extract all valid links on the page"""
        links = []
        for link in soup.find_all('a', href=True):
            href = link['href']
            # Resolve relative links against the base URL
            if base_url and not href.startswith(('http://', 'https://')):
                href = urljoin(base_url, href)
            # Filter out invalid links
            if self._is_valid_url(href):
                links.append(href)
        return list(set(links))  # deduplicate

    def _extract_data(self, soup) -> Dict:
        """Extract text, images, and meta keywords from the page"""
        data = {}

        # Extract text content (length-limited)
        texts = soup.stripped_strings
        data['text_content'] = ' '.join(list(texts)[:1000])

        # Extract images
        data['images'] = [
            img['src'] for img in soup.find_all('img', src=True)
            if self._is_valid_url(img['src'])
        ]

        # Extract metadata
        data['meta_keywords'] = self._get_meta_keywords(soup)

        return data

    def _get_title(self, soup) -> str:
        """Return the page title"""
        title_tag = soup.find('title')
        return title_tag.get_text().strip() if title_tag else ""

    def _get_meta_description(self, soup) -> str:
        """Return the meta description"""
        meta_desc = soup.find('meta', attrs={'name': 'description'})
        return meta_desc.get('content', '') if meta_desc else ""

    def _get_meta_keywords(self, soup) -> str:
        """Return the meta keywords"""
        meta_keywords = soup.find('meta', attrs={'name': 'keywords'})
        return meta_keywords.get('content', '') if meta_keywords else ""

    def _is_valid_url(self, url: str) -> bool:
        """Return True for absolute http(s) URLs"""
        if not url or url.startswith(('javascript:', 'mailto:', 'tel:')):
            return False
        parsed = urlparse(url)
        return bool(parsed.netloc and parsed.scheme in ['http', 'https'])


class DataStorage:
    """Data storage: persists crawled data"""

    def __init__(self, storage_type: str = 'file'):
        """storage_type: one of 'file', 'csv', 'json'"""
        self.storage_type = storage_type

    def save(self, data: Dict, filename: str = None) -> bool:
        """Save data in the configured format; returns True on success"""
        try:
            if self.storage_type == 'file':
                return self._save_to_file(data, filename)
            elif self.storage_type == 'csv':
                return self._save_to_csv(data, filename)
            elif self.storage_type == 'json':
                return self._save_to_json(data, filename)
            else:
                print(f"Unsupported storage type: {self.storage_type}")
                return False
        except Exception as e:
            print(f"Failed to save data: {e}")
            return False

    def _save_to_file(self, data: Dict, filename: str) -> bool:
        """Save to a plain text file"""
        if not filename:
            filename = f"crawled_data_{int(time.time())}.txt"
        with open(filename, 'w', encoding='utf-8') as f:
            for key, value in data.items():
                f.write(f"=== {key} ===\n")
                if isinstance(value, (list, tuple)):
                    for item in value:
                        f.write(f"{item}\n")
                else:
                    f.write(f"{value}\n")
                f.write("\n")
        print(f"Data saved to: {filename}")
        return True

    def _save_to_csv(self, data: Dict, filename: str) -> bool:
        """Save to a CSV file"""
        import csv
        if not filename:
            filename = f"crawled_data_{int(time.time())}.csv"
        # Simplified implementation: write one (field, value) row per key
        with open(filename, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(['Field', 'Value'])
            for key, value in data.items():
                if isinstance(value, (list, tuple)):
                    writer.writerow([key, '; '.join(str(v) for v in value)])
                else:
                    writer.writerow([key, value])
        print(f"Data saved to CSV: {filename}")
        return True

    def _save_to_json(self, data: Dict, filename: str) -> bool:
        """Save to a JSON file"""
        import json
        if not filename:
            filename = f"crawled_data_{int(time.time())}.json"
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        print(f"Data saved to JSON: {filename}")
        return True
4. A Complete Crawler Implementation
4.1 The Basic Crawler
#!/usr/bin/env python3
"""
Complete implementation of a basic web crawler
using Requests and BeautifulSoup
"""
import time
import re
import json
from typing import Dict, List, Optional, Set
from urllib.parse import urljoin, urlparse
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
from bs4 import BeautifulSoup
from fake_useragent import UserAgent


@dataclass
class CrawlResult:
    """Result of crawling a single page"""
    url: str
    title: str = ""
    content: str = ""
    links: List[str] = None
    images: List[str] = None
    status_code: int = 0
    error: str = ""

    def __post_init__(self):
        if self.links is None:
            self.links = []
        if self.images is None:
            self.images = []


class SimpleWebCrawler:
    """A simple breadth-first web crawler"""

    def __init__(self,
                 delay: float = 1.0,
                 timeout: int = 10,
                 max_pages: int = 100,
                 user_agent: str = None):
        """delay: seconds between requests; timeout: request timeout in seconds;
        max_pages: maximum number of pages to crawl; user_agent: User-Agent string"""
        self.delay = delay
        self.timeout = timeout
        self.max_pages = max_pages
        self.user_agent = user_agent or UserAgent().random
        self.visited_urls: Set[str] = set()
        self.results: List[CrawlResult] = []
        self.session = requests.Session()

        # Default headers for the whole session
        self.session.headers.update({
            'User-Agent': self.user_agent,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
        })

    def crawl(self, start_urls: List[str], allowed_domains: List[str] = None) -> List[CrawlResult]:
        """Crawl breadth-first from the start URLs, staying inside allowed_domains"""
        print(f"Starting crawl, start URLs: {start_urls}")
        print(f"Max pages: {self.max_pages}")
        print(f"Request delay: {self.delay}s")

        # Initialize the URL queue
        url_queue = []
        for url in start_urls:
            if self._is_allowed_domain(url, allowed_domains):
                url_queue.append(url)
                self.visited_urls.add(url)

        page_count = 0
        while url_queue and page_count < self.max_pages:
            current_url = url_queue.pop(0)
            print(f"Crawling [{page_count + 1}/{self.max_pages}]: {current_url}")

            # Crawl the current page
            result = self._crawl_page(current_url)
            self.results.append(result)

            if result.links:
                # Enqueue newly discovered links
                for link in result.links:
                    if (link not in self.visited_urls and
                            self._is_allowed_domain(link, allowed_domains) and
                            len(url_queue) < (self.max_pages - page_count)):
                        url_queue.append(link)
                        self.visited_urls.add(link)

            page_count += 1

            # Be polite between requests
            if self.delay > 0 and page_count < self.max_pages:
                time.sleep(self.delay)

        print(f"Crawl finished! {len(self.results)} pages crawled")
        return self.results

    def _crawl_page(self, url: str) -> CrawlResult:
        """Crawl a single page and return a CrawlResult"""
        result = CrawlResult(url=url)
        try:
            # Send the HTTP request
            response = self.session.get(url, timeout=self.timeout)
            result.status_code = response.status_code

            # Check the response status
            if response.status_code != 200:
                result.error = f"HTTP {response.status_code}"
                return result

            # Parse the HTML
            soup = BeautifulSoup(response.content, 'lxml')

            # Title
            title_tag = soup.find('title')
            result.title = title_tag.get_text().strip() if title_tag else ""

            # Main text content (simplified)
            result.content = self._extract_content(soup)

            # Links and images
            result.links = self._extract_links(soup, url)
            result.images = self._extract_images(soup, url)

        except requests.exceptions.RequestException as e:
            result.error = f"Request error: {e}"
        except Exception as e:
            result.error = f"Parse error: {e}"

        return result

    def _extract_content(self, soup) -> str:
        """Extract the main text content of a page"""
        # Remove script and style tags
        for script in soup(["script", "style"]):
            script.decompose()

        # Get the raw text and normalise whitespace
        text = soup.get_text()
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
        text = ' '.join(chunk for chunk in chunks if chunk)

        # Limit the length
        return text[:5000] if len(text) > 5000 else text

    def _extract_links(self, soup, base_url: str) -> List[str]:
        """Extract all valid links on the page"""
        links = []
        for link in soup.find_all('a', href=True):
            href = link['href']
            # Resolve relative links
            full_url = urljoin(base_url, href)
            # Filter out invalid links
            if self._is_valid_url(full_url):
                links.append(full_url)
        return list(set(links))  # deduplicate

    def _extract_images(self, soup, base_url: str) -> List[str]:
        """Extract image URLs on the page"""
        images = []
        for img in soup.find_all('img', src=True):
            src = img['src']
            # Resolve relative links
            full_url = urljoin(base_url, src)
            if self._is_valid_url(full_url):
                images.append(full_url)
        return list(set(images))

    def _is_valid_url(self, url: str) -> bool:
        """Return True for absolute http(s) URLs, filtering common invalid schemes"""
        if not url:
            return False
        # Filter non-HTTP links and common invalid patterns
        invalid_patterns = [
            'javascript:', 'mailto:', 'tel:', '#',
            'callto:', 'fax:', 'sms:'
        ]
        if any(url.startswith(pattern) for pattern in invalid_patterns):
            return False
        parsed = urlparse(url)
        return bool(parsed.netloc and parsed.scheme in ['http', 'https'])

    def _is_allowed_domain(self, url: str, allowed_domains: List[str]) -> bool:
        """Return True if the URL's domain is allowed (or no allow-list is given)"""
        if not allowed_domains:
            return True
        domain = urlparse(url).netloc
        return any(allowed in domain for allowed in allowed_domains)

    def save_results(self, filename: str = None, format: str = 'json') -> bool:
        """Save results as 'json', 'csv', or 'txt'"""
        if not filename:
            timestamp = int(time.time())
            filename = f"crawl_results_{timestamp}.{format}"
        try:
            if format == 'json':
                return self._save_as_json(filename)
            elif format == 'csv':
                return self._save_as_csv(filename)
            elif format == 'txt':
                return self._save_as_text(filename)
            else:
                print(f"Unsupported format: {format}")
                return False
        except Exception as e:
            print(f"Failed to save results: {e}")
            return False

    def _save_as_json(self, filename: str) -> bool:
        """Save results as JSON"""
        data = []
        for result in self.results:
            data.append({
                'url': result.url,
                'title': result.title,
                'content_length': len(result.content),
                'links_count': len(result.links),
                'images_count': len(result.images),
                'status_code': result.status_code,
                'error': result.error,
                'content_preview': result.content[:200] + '...' if len(result.content) > 200 else result.content
            })
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        print(f"Results saved as JSON: {filename}")
        return True

    def _save_as_csv(self, filename: str) -> bool:
        """Save results as CSV"""
        import csv
        with open(filename, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(['URL', 'Title', 'Content Length', 'Links Count',
                             'Images Count', 'Status Code', 'Error'])
            for result in self.results:
                writer.writerow([
                    result.url,
                    result.title,
                    len(result.content),
                    len(result.links),
                    len(result.images),
                    result.status_code,
                    result.error
                ])
        print(f"Results saved as CSV: {filename}")
        return True

    def _save_as_text(self, filename: str) -> bool:
        """Save results as plain text"""
        with open(filename, 'w', encoding='utf-8') as f:
            for i, result in enumerate(self.results, 1):
                f.write(f"=== Page {i} ===\n")
                f.write(f"URL: {result.url}\n")
                f.write(f"Title: {result.title}\n")
                f.write(f"Status code: {result.status_code}\n")
                f.write(f"Error: {result.error}\n")
                f.write(f"Links: {len(result.links)}\n")
                f.write(f"Images: {len(result.images)}\n")
                f.write(f"Content preview: {result.content[:300]}...\n")
                f.write("\n" + "=" * 50 + "\n\n")
        print(f"Results saved as text: {filename}")
        return True

    def get_statistics(self) -> Dict:
        """Return aggregate crawl statistics"""
        total_pages = len(self.results)
        successful_pages = len([r for r in self.results if r.status_code == 200])
        failed_pages = total_pages - successful_pages
        total_links = sum(len(r.links) for r in self.results)
        total_images = sum(len(r.images) for r in self.results)
        total_content = sum(len(r.content) for r in self.results)

        return {
            'total_pages': total_pages,
            'successful_pages': successful_pages,
            'failed_pages': failed_pages,
            'total_links_found': total_links,
            'total_images_found': total_images,
            'total_content_length': total_content,
            'average_content_length': total_content // total_pages if total_pages > 0 else 0
        }


# Usage example and demo
def demo_crawler():
    """Demonstrate how to use the crawler"""
    print("Simple web crawler demo")
    print("=" * 50)

    # Create a crawler instance
    crawler = SimpleWebCrawler(
        delay=2.0,     # 2-second delay
        timeout=10,    # 10-second timeout
        max_pages=5,   # crawl at most 5 pages
    )

    # Start URLs (a practice site intended for scraping exercises)
    start_urls = [
        'http://books.toscrape.com/',
    ]

    # Start crawling
    results = crawler.crawl(
        start_urls=start_urls,
        allowed_domains=['books.toscrape.com']
    )

    # Show statistics
    stats = crawler.get_statistics()
    print("\nCrawl statistics:")
    for key, value in stats.items():
        print(f"  {key}: {value}")

    # Save results
    crawler.save_results(format='json')
    crawler.save_results(format='csv')

    # Show the first few results
    print("\nFirst 3 pages:")
    for i, result in enumerate(results[:3], 1):
        print(f"\n{i}. {result.url}")
        print(f"  Title: {result.title}")
        print(f"  Status: {result.status_code}")
        print(f"  Links: {len(result.links)}")
        print(f"  Images: {len(result.images)}")
        print(f"  Content length: {len(result.content)} characters")


if __name__ == "__main__":
    demo_crawler()
4.2 Advanced Crawler Features
#!/usr/bin/env python3
"""
Advanced web crawler features:
proxies, concurrency, JavaScript rendering, and more
"""
import asyncio
import aiohttp
from typing import List, Dict, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed

from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from selenium import webdriver
from selenium.webdriver.chrome.options import Options

# SimpleWebCrawler and CrawlResult are the classes from section 4.1;
# they are assumed to be defined in the same module or importable from it.


class AdvancedWebCrawler(SimpleWebCrawler):
    """Advanced crawler that extends SimpleWebCrawler with extra features"""

    def __init__(self,
                 delay: float = 1.0,
                 timeout: int = 10,
                 max_pages: int = 100,
                 user_agent: str = None,
                 use_proxy: bool = False,
                 max_workers: int = 5):
        """use_proxy: whether to use proxies; max_workers: maximum number of worker threads"""
        super().__init__(delay, timeout, max_pages, user_agent)
        self.use_proxy = use_proxy
        self.max_workers = max_workers
        self.proxies = self._load_proxies() if use_proxy else []

    def concurrent_crawl(self, start_urls: List[str],
                         allowed_domains: List[str] = None) -> List[CrawlResult]:
        """Crawl a list of URLs concurrently with a thread pool"""
        print(f"Starting concurrent crawl, workers: {self.max_workers}")

        # Use a thread pool for concurrent crawling
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit one task per URL
            future_to_url = {
                executor.submit(self._crawl_page, url): url
                for url in start_urls[:self.max_pages]
            }

            # Collect results as they complete
            for future in as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    result = future.result()
                    self.results.append(result)
                    self.visited_urls.add(url)
                    print(f"Done: {url} (status: {result.status_code})")
                except Exception as e:
                    print(f"Crawl failed {url}: {e}")
                    error_result = CrawlResult(url=url, error=str(e))
                    self.results.append(error_result)

        print(f"Concurrent crawl finished! {len(self.results)} pages crawled")
        return self.results

    async def async_crawl(self, urls: List[str]) -> List[CrawlResult]:
        """Crawl asynchronously with aiohttp (useful for large URL lists)"""
        print("Starting async crawl...")

        async with aiohttp.ClientSession() as session:
            tasks = []
            for url in urls[:self.max_pages]:
                task = self._async_crawl_page(session, url)
                tasks.append(task)

            results = await asyncio.gather(*tasks, return_exceptions=True)

            # Handle results and exceptions
            for result in results:
                if isinstance(result, CrawlResult):
                    self.results.append(result)
                elif isinstance(result, Exception):
                    print(f"Async crawl error: {result}")

        return self.results

    async def _async_crawl_page(self, session, url: str) -> CrawlResult:
        """Crawl a single page asynchronously"""
        result = CrawlResult(url=url)
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=self.timeout)) as response:
                result.status_code = response.status
                if response.status == 200:
                    html = await response.text()
                    # Parse with BeautifulSoup
                    soup = BeautifulSoup(html, 'lxml')
                    title_tag = soup.find('title')
                    result.title = title_tag.get_text().strip() if title_tag else ""
                    result.content = self._extract_content(soup)
                    result.links = self._extract_links(soup, url)
                    result.images = self._extract_images(soup, url)
                else:
                    result.error = f"HTTP {response.status}"
        except Exception as e:
            result.error = f"Async request error: {e}"
        return result

    def crawl_with_selenium(self, url: str) -> CrawlResult:
        """Crawl a JavaScript-rendered page with Selenium"""
        print(f"Crawling with Selenium: {url}")
        result = CrawlResult(url=url)

        # Configure Chrome options
        chrome_options = Options()
        chrome_options.add_argument('--headless')  # headless mode
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument(f'--user-agent={self.user_agent}')

        driver = None
        try:
            # Start the browser
            driver = webdriver.Chrome(options=chrome_options)
            driver.get(url)

            # Wait for the page to load
            driver.implicitly_wait(10)

            # Get the rendered page source
            html = driver.page_source

            # Parse the content
            soup = BeautifulSoup(html, 'lxml')
            title_tag = soup.find('title')
            result.title = title_tag.get_text().strip() if title_tag else ""
            result.content = self._extract_content(soup)
            result.links = self._extract_links(soup, url)
            result.images = self._extract_images(soup, url)
            result.status_code = 200

            print(f"Selenium crawl succeeded: {url}")

        except Exception as e:
            result.error = f"Selenium error: {e}"
            print(f"Selenium crawl failed {url}: {e}")
        finally:
            if driver:
                driver.quit()

        return result

    def _load_proxies(self) -> List[str]:
        """Load a proxy list (placeholder: supply your own proxy source)"""
        # Proxies could be loaded from a file or an API here
        print("Proxy support requires your own proxy source")
        return []

    def rotate_user_agent(self):
        """Switch to a new random User-Agent"""
        ua = UserAgent()
        new_ua = ua.random
        self.session.headers.update({'User-Agent': new_ua})
        self.user_agent = new_ua
        print(f"User-Agent rotated: {new_ua}")


# Demo of the advanced features
def demo_advanced_features():
    """Demonstrate the advanced crawler features"""
    print("Advanced web crawler demo")
    print("=" * 50)

    # Create an advanced crawler instance
    advanced_crawler = AdvancedWebCrawler(
        delay=1.0,
        timeout=10,
        max_pages=3,
        max_workers=2
    )

    # Test URLs
    test_urls = [
        'http://books.toscrape.com/',
        'http://quotes.toscrape.com/',
        'http://quotes.toscrape.com/js/'
    ]

    print("1. Concurrent crawl:")
    results = advanced_crawler.concurrent_crawl(test_urls)

    print("\n2. User-Agent rotation:")
    advanced_crawler.rotate_user_agent()

    print("\n3. Statistics:")
    stats = advanced_crawler.get_statistics()
    for key, value in stats.items():
        print(f"  {key}: {value}")

    # Save results
    advanced_crawler.save_results("advanced_crawl_results.json")


if __name__ == "__main__":
    demo_advanced_features()
5. Crawler Ethics and Best Practices
5.1 Respecting robots.txt
#!/usr/bin/env python3
"""
Crawler ethics and robots.txt handling
"""
import urllib.robotparser
from urllib.parse import urlparse
from typing import Optional
import time


class EthicalCrawler:
    """A crawler helper that respects robots.txt and per-domain delays"""

    def __init__(self, crawl_delay: float = 1.0):
        self.crawl_delay = crawl_delay
        self.robot_parsers = {}
        self.last_access_time = {}

    def can_fetch(self, url: str, user_agent: str = '*') -> bool:
        """Return True if robots.txt allows fetching this URL"""
        parsed_url = urlparse(url)
        base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"

        # Get or create a robots.txt parser for this site
        if base_url not in self.robot_parsers:
            self.robot_parsers[base_url] = self._create_robot_parser(base_url)

        rp = self.robot_parsers[base_url]
        if rp:
            return rp.can_fetch(user_agent, url)
        return True  # if there is no robots.txt, allow by default

    def _create_robot_parser(self, base_url: str) -> Optional[urllib.robotparser.RobotFileParser]:
        """Fetch and parse the site's robots.txt, or return None on failure"""
        rp = urllib.robotparser.RobotFileParser()
        robots_url = f"{base_url}/robots.txt"
        try:
            rp.set_url(robots_url)
            rp.read()
            print(f"Loaded robots.txt: {robots_url}")
            return rp
        except Exception as e:
            print(f"Could not load robots.txt {robots_url}: {e}")
            return None

    def get_crawl_delay(self, url: str, user_agent: str = '*') -> float:
        """Return the crawl delay suggested by robots.txt, or the default"""
        parsed_url = urlparse(url)
        base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"

        if base_url in self.robot_parsers and self.robot_parsers[base_url]:
            rp = self.robot_parsers[base_url]
            delay = rp.crawl_delay(user_agent)
            if delay:
                return delay
        return self.crawl_delay

    def respect_delay(self, url: str) -> None:
        """Sleep if needed so requests to a domain respect the crawl delay"""
        parsed_url = urlparse(url)
        domain = parsed_url.netloc
        current_time = time.time()

        if domain in self.last_access_time:
            last_time = self.last_access_time[domain]
            elapsed = current_time - last_time
            delay = self.get_crawl_delay(url)
            if elapsed < delay:
                sleep_time = delay - elapsed
                print(f"Respecting delay: waiting {sleep_time:.2f}s")
                time.sleep(sleep_time)

        self.last_access_time[domain] = time.time()


# Usage example
def check_robots_example():
    """Check robots.txt for a few sites"""
    crawler = EthicalCrawler()

    test_urls = [
        'https://www.google.com/search?q=python',
        'https://www.baidu.com/s?wd=python',
        'https://httpbin.org/'
    ]

    for url in test_urls:
        can_fetch = crawler.can_fetch(url)
        delay = crawler.get_crawl_delay(url)
        print(f"URL: {url}")
        print(f"  Allowed to crawl: {'yes' if can_fetch else 'no'}")
        print(f"  Suggested delay: {delay}s")
        print()


if __name__ == "__main__":
    check_robots_example()
5.2 Performance Optimization and the Math Behind It
#!/usr/bin/env python3
"""
Crawler performance optimization and the math behind it
"""
import time
import math
from typing import List, Dict
from dataclasses import dataclass


@dataclass
class PerformanceMetrics:
    """Aggregate performance metrics for a crawl"""
    total_urls: int
    successful_requests: int
    failed_requests: int
    total_time: float
    data_size: int


class CrawlerOptimizer:
    """Helpers for sizing and analysing a crawler"""

    def __init__(self):
        self.metrics = []

    def calculate_optimal_threads(self,
                                  target_throughput: int,
                                  avg_response_time: float) -> int:
        """Estimate the optimal thread count using Little's Law: L = lambda * W,
        where L is the average number of concurrent requests, lambda the arrival
        rate, and W the average response time."""
        # Concurrency required according to Little's Law
        optimal_concurrency = target_throughput * avg_response_time

        # Add a 20% buffer for system overhead
        optimal_threads = math.ceil(optimal_concurrency * 1.2)

        print(f"Target throughput: {target_throughput} requests/s")
        print(f"Average response time: {avg_response_time:.2f} s")
        print(f"Estimated optimal thread count: {optimal_threads}")

        return max(1, optimal_threads)  # at least one thread

    def estimate_crawl_time(self,
                            total_urls: int,
                            requests_per_second: float,
                            success_rate: float = 0.95) -> float:
        """Estimate the total crawl time in seconds, accounting for retries"""
        # Effective number of requests once retries are factored in
        effective_urls = total_urls / success_rate

        # Total time = effective requests / requests per second
        total_time = effective_urls / requests_per_second

        # Convert to a friendlier format
        hours = total_time / 3600

        print(f"Total URLs: {total_urls}")
        print(f"Expected success rate: {success_rate * 100}%")
        print(f"Effective requests: {effective_urls:.0f}")
        print(f"Estimated total time: {total_time:.0f} s ({hours:.1f} h)")

        return total_time

    def analyze_bottleneck(self, metrics: PerformanceMetrics) -> Dict[str, float]:
        """Analyse where the crawl is bottlenecked"""
        total_requests = metrics.successful_requests + metrics.failed_requests
        if total_requests == 0:
            return {}

        # Compute the key ratios
        success_rate = metrics.successful_requests / total_requests
        requests_per_second = total_requests / metrics.total_time
        data_rate = metrics.data_size / metrics.total_time  # bytes/second

        # Identify the bottleneck
        bottleneck_analysis = {}
        if success_rate < 0.8:
            bottleneck_analysis['bottleneck'] = 'Low request success rate'
            bottleneck_analysis['suggestion'] = 'Check the network or rate limits on the target site'
        if requests_per_second < 1:
            bottleneck_analysis['bottleneck'] = 'Slow request rate'
            bottleneck_analysis['suggestion'] = 'Increase concurrency or improve the network connection'
        if data_rate < 1024:  # below 1 KB/s
            bottleneck_analysis['bottleneck'] = 'Slow data transfer'
            bottleneck_analysis['suggestion'] = 'Check bandwidth or enable compression'

        print("Performance analysis:")
        print(f"  Success rate: {success_rate:.2%}")
        print(f"  Request rate: {requests_per_second:.2f} requests/s")
        print(f"  Data rate: {data_rate / 1024:.2f} KB/s")

        if bottleneck_analysis:
            print(f"  Main bottleneck: {bottleneck_analysis['bottleneck']}")
            print(f"  Suggestion: {bottleneck_analysis['suggestion']}")
        else:
            print("  Performance looks fine, no obvious bottleneck")

        return {
            'success_rate': success_rate,
            'requests_per_second': requests_per_second,
            'data_rate': data_rate,
            'bottleneck': bottleneck_analysis
        }


# Performance optimization demo
def performance_demo():
    """Demonstrate the performance helpers"""
    optimizer = CrawlerOptimizer()

    print("Crawler performance analysis")
    print("=" * 50)

    # Optimal thread count
    print("1. Optimal thread count:")
    optimal_threads = optimizer.calculate_optimal_threads(
        target_throughput=10,   # 10 requests/s
        avg_response_time=0.5   # 0.5 s per request
    )

    print("\n2. Crawl time estimate:")
    crawl_time = optimizer.estimate_crawl_time(
        total_urls=1000,
        requests_per_second=5,
        success_rate=0.9
    )

    print("\n3. Bottleneck analysis:")
    sample_metrics = PerformanceMetrics(
        total_urls=100,
        successful_requests=85,
        failed_requests=15,
        total_time=30.0,     # 30 seconds
        data_size=500000     # 500 KB
    )
    analysis = optimizer.analyze_bottleneck(sample_metrics)


if __name__ == "__main__":
    performance_demo()
6. A Practical Application Example
6.1 A News-Site Crawler
#!/usr/bin/env python3
"""
Example: a crawler specialised for news sites
"""
import time
from typing import Dict, List, Optional
from urllib.parse import urljoin

from bs4 import BeautifulSoup

# SimpleWebCrawler is the class from section 4.1; it is assumed to be
# defined in the same module or importable from it.


class NewsCrawler(SimpleWebCrawler):
    """Crawler specialised for news sites"""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.news_data = []

    def _fetch_html(self, url: str) -> Optional[bytes]:
        """Fetch the raw HTML of a page. The base class's CrawlResult only keeps
        the extracted text, so we need the raw markup here to run CSS selectors."""
        try:
            response = self.session.get(url, timeout=self.timeout)
            if response.status_code == 200:
                return response.content
        except Exception as e:
            print(f"Failed to fetch {url}: {e}")
        return None

    def extract_news(self, soup, url: str) -> Dict:
        """Extract a structured news record from an article page"""
        news = {
            'url': url,
            'title': '',
            'publish_date': '',
            'author': '',
            'content': '',
            'tags': [],
            'summary': ''
        }

        # Title (try several common selectors)
        title_selectors = ['h1', '.article-title', '.news-title', '.title', 'header h1']
        for selector in title_selectors:
            title_tag = soup.select_one(selector)
            if title_tag:
                news['title'] = title_tag.get_text().strip()
                break

        # Publish date
        date_selectors = ['.publish-date', '.article-date', '.date', 'time']
        for selector in date_selectors:
            date_tag = soup.select_one(selector)
            if date_tag:
                news['publish_date'] = date_tag.get_text().strip()
                break

        # Author
        author_selectors = ['.author', '.article-author', '.byline']
        for selector in author_selectors:
            author_tag = soup.select_one(selector)
            if author_tag:
                news['author'] = author_tag.get_text().strip()
                break

        # Body content
        content_selectors = ['.article-content', '.news-content', '.content', 'article']
        for selector in content_selectors:
            content_tag = soup.select_one(selector)
            if content_tag:
                # Remove elements we do not want in the body
                for element in content_tag.select('script, style, nav, footer, aside'):
                    element.decompose()
                news['content'] = content_tag.get_text().strip()
                break

        # Generate a summary
        if news['content']:
            news['summary'] = news['content'][:200] + '...'

        return news

    def crawl_news_site(self, start_url: str, max_news: int = 20) -> List[Dict]:
        """Crawl a news site: fetch the listing page, then each article page"""
        print(f"Crawling news site: {start_url}")

        # Fetch the listing page as raw HTML so the link structure can be parsed
        list_html = self._fetch_html(start_url)
        if list_html is None:
            print(f"Could not access the news listing page: {start_url}")
            return []

        # Parse the listing page and extract article links
        soup = BeautifulSoup(list_html, 'lxml')
        news_links = self._extract_news_links(soup, start_url)
        print(f"Found {len(news_links)} news links")

        # Crawl each article page
        news_count = 0
        for news_url in news_links[:max_news]:
            print(f"Crawling article {news_count + 1}/{min(len(news_links), max_news)}: {news_url}")

            article_html = self._fetch_html(news_url)
            if article_html is not None:
                soup = BeautifulSoup(article_html, 'lxml')
                news_data = self.extract_news(soup, news_url)
                self.news_data.append(news_data)
                news_count += 1

            # Be polite between requests
            if self.delay > 0 and news_count < max_news:
                time.sleep(self.delay)

        print(f"News crawl finished! {len(self.news_data)} articles crawled")
        return self.news_data

    def _extract_news_links(self, soup, base_url: str) -> List[str]:
        """Extract links that look like news articles"""
        news_links = []

        # Common selectors for article links
        link_selectors = [
            'a[href*="article"]',
            'a[href*="news"]',
            '.news-list a',
            '.article-list a',
            '.news-item a'
        ]

        for selector in link_selectors:
            links = soup.select(selector)
            for link in links:
                href = link.get('href')
                if href:
                    full_url = urljoin(base_url, href)
                    if self._is_valid_url(full_url):
                        news_links.append(full_url)

        return list(set(news_links))  # deduplicate

    def save_news(self, filename: str = None) -> bool:
        """Save the collected news data as JSON"""
        if not filename:
            timestamp = int(time.time())
            filename = f"news_data_{timestamp}.json"

        try:
            import json

            # Prepare the data to save
            save_data = {
                'crawl_time': time.strftime('%Y-%m-%d %H:%M:%S'),
                'total_news': len(self.news_data),
                'news': self.news_data
            }

            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(save_data, f, ensure_ascii=False, indent=2)

            print(f"News data saved to: {filename}")
            return True
        except Exception as e:
            print(f"Failed to save news data: {e}")
            return False


# News crawler demo
def news_crawler_demo():
    """Demonstrate the news crawler"""
    print("News site crawler demo")
    print("=" * 50)

    # Create the news crawler
    news_crawler = NewsCrawler(
        delay=2.0,
        max_pages=10
    )

    # Note: in real use, make sure you respect the target site's robots.txt
    # and terms of use. A practice site is used here for the demo.
    example_news_site = "http://quotes.toscrape.com/"  # replace with an actual news site

    # Crawl the news
    news_data = news_crawler.crawl_news_site(
        start_url=example_news_site,
        max_news=5
    )

    # Show the results
    print("\nCrawled article summaries:")
    for i, news in enumerate(news_data, 1):
        print(f"\n{i}. {news.get('title', 'no title')}")
        print(f"  URL: {news['url']}")
        print(f"  Summary: {news.get('summary', 'no content')}")

    # Save the results
    news_crawler.save_news()


if __name__ == "__main__":
    news_crawler_demo()
7. Summary
7.1 Key Points of Crawler Development
In this article we built a complete web crawler system, covering:
✅ Core features
- URL management: queueing and deduplication
- Page downloading: HTTP requests and error handling
- Content parsing: data extraction with BeautifulSoup
- Data storage: output in multiple formats
✅ Advanced features
- Concurrent crawling: multithreading and asyncio support
- JavaScript rendering: Selenium integration
- Ethical crawling: robots.txt handling and request delays
- Performance tuning: bottleneck analysis and parameter sizing
✅ Practical applications
- News crawling: structured data extraction
- E-commerce monitoring: scraping prices and product information (a small selector-based sketch follows this list)
- Content aggregation: collecting information from multiple sources
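To make the price-monitoring idea concrete, here is a small, hedged sketch. The URL and CSS selectors target the books.toscrape.com practice site already used in the demos above; treat the selectors as an assumption and adjust them for any real shop you work with:

import requests
from bs4 import BeautifulSoup

# Extract product titles and prices from the practice site's listing page.
# The selectors below match that site's markup; real shops need their own.
response = requests.get("http://books.toscrape.com/", timeout=10)
soup = BeautifulSoup(response.content, "lxml")

for product in soup.select("article.product_pod")[:5]:
    title = product.h3.a["title"]                                   # title attribute of the link
    price = product.select_one("p.price_color").get_text(strip=True)
    print(f"{title}: {price}")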
7.2 Review of the Math
Crawler performance tuning relies on a couple of simple mathematical ideas:
Little's Law:
$L = \lambda \times W$
where:
- $L$ = average number of concurrent requests
- $\lambda$ = request arrival rate (requests per second)
- $W$ = average response time (seconds)
Throughput:
$\text{throughput} = \dfrac{\text{successful requests}}{\text{total time}}$
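As a quick worked example, using the same numbers as performance_demo above: to sustain $\lambda = 10$ requests/second with an average response time of $W = 0.5$ seconds, Little's Law gives $L = 10 \times 0.5 = 5$ concurrent requests; with the 20% overhead buffer used in calculate_optimal_threads, that rounds up to $\lceil 5 \times 1.2 \rceil = 6$ worker threads.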
7.3 Best-Practice Recommendations
- Follow the law: respect robots.txt and each site's terms of use
- Control crawl rate: avoid putting pressure on target sites
- Handle failures: solid error handling and a retry mechanism (see the sketch after this list)
- Ensure data quality: cleaning and validation pipelines
- Monitor and log: track the crawler's health in real time
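To make the retry point concrete, here is a minimal, hedged sketch (not part of the crawler classes above) that adds automatic retries with exponential backoff to a requests.Session using urllib3's Retry helper; it assumes a reasonably recent urllib3, and the parameter values are illustrative only:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def build_retrying_session() -> requests.Session:
    """Create a Session that retries transient failures with exponential backoff."""
    retry = Retry(
        total=3,                                      # at most 3 retries per request
        backoff_factor=1,                             # roughly 1s, 2s, 4s between attempts
        status_forcelist=[429, 500, 502, 503, 504],   # retry only on these status codes
        allowed_methods=["GET", "HEAD"],              # retry only idempotent methods
    )
    adapter = HTTPAdapter(max_retries=retry)
    session = requests.Session()
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session

# Usage: a drop-in replacement for a plain requests.Session
session = build_retrying_session()
response = session.get("http://books.toscrape.com/", timeout=10)
print(response.status_code)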
7.4 Directions for Further Learning
- Distributed crawling: the Scrapy-Redis framework
- Handling anti-bot measures: IP proxies, CAPTCHA solving
- Big-data processing: integration with Hadoop and Spark
- Machine learning: intelligent content extraction and classification
Code review note: all of the code in this article has received basic testing, but before using it in production make sure you:
- Respect the target site's robots.txt and terms of use
- Configure appropriate request delays and concurrency limits
- Handle the full range of network and parsing errors
- Keep dependencies and security patches up to date
- Monitor the crawler's performance and resource usage
Important reminder: when developing and running crawlers, always follow:
- Applicable laws and intellectual-property protections
- Each site's terms of use and service agreements
- Data privacy and protection principles
- Network ethics and norms of good conduct
With what you have learned here, you can go on to explore more sophisticated crawler applications and build more capable data-collection systems.
