爬取新浪新闻网的全部策略
!说在前面,学而时习之不亦乐乎,我们学了的东西能够运用到我们的项目中,这既是复习,也是学习
爬取网站 新浪新闻中心
教程 大规模异步新闻爬虫:实现功能强大、简洁易用的网址池(URL Pool) - 猿人学Python爬虫
我们先看一般爬虫的思路
#!/usr/bin/env python3
# Author: veelionimport re
import time
import requests
import tldextractdef save_to_db(url, html):# 保存网页到数据库,我们暂时用打印相关信息代替print('%s : %s' % (url, len(html)))def crawl():# 1. download baidu newshub_url = 'http://news.baidu.com/'res = requests.get(hub_url)html = res.text# 2. extract news links## 2.1 extract all links with 'href'links = re.findall(r'href=[\'"]?(.*?)[\'"\s]', html)print('find links:', len(links))news_links = []## 2.2 filter non-news linkfor link in links:if not link.startswith('http'):continuetld = tldextract.extract(link)if tld.domain == 'baidu':continuenews_links.append(link)print('find news links:', len(news_links))# 3. download news and save to databasefor link in news_links:html = requests.get(link).textsave_to_db(link, html)print('works done!')def main():while 1:crawl()time.sleep(300)if __name__ == '__main__':main()
步骤
1. 使用requests下载百度新闻首页;
2. 先用正则表达式提取a标签的href属性,也就是网页中的链接;然后找出新闻的链接,方法是:假定非百度的外链都是新闻链接;
3. 逐个下载找到的所有新闻链接并保存到数据库;保存到数据库的函数暂时用打印相关信息代替。
4. 每隔300秒重复1-3步,以抓取更新的新闻。
以上代码能工作,但也仅仅是能工作,槽点多得也不是一点半点,那就让我们一起边吐槽边完善这个爬虫吧。
即爬取,清取,存储。
但不可忽略的需要下面的几点作为巩固
. 增加异常处理
在写爬虫,尤其是网络请求相关的代码,一定要有异常处理。目标服务器是否正常,当时的网络连接是否顺畅(超时)等状况都是爬虫无法控制的,所以在处理网络请求时必须要处理异常。网络请求最好设置timeout,别在某个请求耗费太多时间。timeout 导致的识别,有可能是服务器响应不过来,也可能是暂时的网络出问题。所以,对于timeout的异常,我们需要过段时间再尝试。
2. 要对服务器返回的状态,如404,500等做出处理
服务器返回的状态很重要,这决定着我们爬虫下一步该怎么做。需要处理的常见状态有:
- 301, 该URL被永久转移到其它URL,以后请求的话就请求被转移的URL
- 404,基本上是这个网站已经失效了,后面也就别试了
- 500,服务器内部出错了,可能是暂时的,后面要再次请求试试
3. 管理好URL的状态
记录下此次失败的URL,以便后面再试一次。对于timeout的URL,需要后面再次抓取,所以需要记录所有URL的各种状态,包括:
- 已经下载成功
- 下载多次失败无需再下载
- 正在下载
- 下载失败要再次尝试
增加了对网络请求的各种处理,这个爬虫就健壮多了,不会动不动就异常退出,给后面运维带来很多的工作量。
返回的数据,编码格式不同: res.text判断中文编码时有时候会出错,还是自己通过cchardet(用C语言实现的chardet)获取更准确。这里,
然后封装一个downloder的函数
import requests # 导入 requests 库用于发送 HTTP 请求
import cchardet # 导入 cchardet 库用于检测字符编码
import traceback # 导入 traceback 库用于打印详细的错误堆栈信息def downloader(url, timeout=10, headers=None, debug=False, binary=False):"""下载指定 URL 的内容,支持文本和二进制格式参数:url (str): 要下载的 URLtimeout (int): 请求超时时间(秒),默认 10 秒headers (dict): 自定义请求头,默认使用 IE9 浏览器的 User-Agentdebug (bool): 是否打印调试信息,默认 Falsebinary (bool): 是否以二进制模式返回内容,默认 False(返回文本)返回:tuple: (状态码, 内容, 最终请求的 URL)"""# 设置默认请求头,模拟 IE9 浏览器_headers = {'User-Agent': ('Mozilla/5.0 (compatible; MSIE 9.0; ''Windows NT 6.1; Win64; x64; Trident/5.0)'),}redirected_url = url # 初始化最终 URL 为原始 URL# 如果提供了自定义请求头,则使用自定义头if headers:_headers = headerstry:# 发送 HTTP GET 请求r = requests.get(url, headers=_headers, timeout=timeout)# 根据 binary 参数决定返回二进制内容还是文本内容if binary:html = r.content # 二进制内容else:# 使用 cchardet 检测内容编码并解码为字符串encoding = cchardet.detect(r.content)['encoding']html = r.content.decode(encoding)status = r.status_code # 获取 HTTP 状态码redirected_url = r.url # 获取最终请求的 URL(处理重定向)except:# 发生异常时的处理if debug:traceback.print_exc() # 打印详细的错误堆栈msg = 'failed download: {}'.format(url)print(msg) # 打印错误信息# 根据 binary 参数返回空内容if binary:html = b'' # 空二进制内容else:html = '' # 空字符串status = 0 # 状态码设为 0 表示下载失败return status, html, redirected_url # 返回三元组if __name__ == '__main__':url = 'http://news.baidu.com/' # 测试 URL# 调用下载函数,获取状态码、内容和最终 URLs, html, redirected_url = downloader(url)# 打印状态码、内容长度和最终 URLprint(s, len(html), redirected_url)
这个downloader()函数,内置了默认的User-Agent模拟成一个IE9浏览器,同时接受调用者自定义的headers和timeout。使用cchardet来处理编码问题,返回数据包括:状态码:如果出现异常,设置为0
内容: 默认返回str内容。但是URL链接的是图片等二进制内容时,注意调用时要设binary=True
重定向URL: 有些URL会被重定向,最终页面的url包含在响应对象里面
主要功能
个downloader()函数,内置了默认的User-Agent模拟成一个IE9浏览器,同时接受调用者自定义的headers和timeout。使用cchardet
来处理编码问题,返回数据包括:
- 状态码:如果出现异常,设置为0
- 内容: 默认返回str内容。但是URL链接的是图片等二进制内容时,注意调用时要设binary=True
- 重定向URL: 有些URL会被重定向,最终页面的url包含在响应对象里面
url库
因为我们在爬取一个网站中所有数据的时候,有很多url,在这个时候就得
我们从网址池的使用目的出发来设计网址池的接口,它应该具有以下功能:
- 往池子里面添加URL;
- 从池子里面取URL以下载;
- 池子内部要管理URL状态;
前面我提到URL的状态有以下4中:
- 已经下载成功
- 下载多次失败无需再下载
- 正在下载
- 下载失败要再次尝试
保证这个状态
对于比较大型的爬虫来说,URL管理的管理是个核心问题,管理不好,就可能重复下载,也可能遗漏下载。这里,我们设计一个URL Pool来管理URL。
这个URL Pool就是一个生产者-消费者模式:生产者-消费者流程图
我们从网址池的使用目的出发来设计网址池的接口,它应该具有以下功能:往池子里面添加URL;
从池子里面取URL以下载;
池子内部要管理URL状态;
前面我提到URL的状态有以下4中:已经下载成功
下载多次失败无需再下载
正在下载
下载失败要再次尝试
前两个是永久状态,也就是已经下载成功的不再下载,多次尝试后仍失败的也就不再下载,它们需要永久存储起来,以便爬虫重启后,这种永久状态记录不会消失,已经成功下载的URL不再被重复下载。永久存储的方法有很多种:比如,直接写入文本文件,但它不利于查找某个URL是否已经存在文本中;
比如,直接写入MySQL等关系型数据库,它利用查找,但是速度又比较慢;
比如,使用key-value数据库,查找和速度都符合要求,是不错的选择!我们这个URL Pool选用LevelDB来作为URL状态的永久存储。LevelDB是Google开源的一个key-value数据库,速度非常快,同时自动压缩数据。我们用它先来实现一个UrlDB作为永久存储数据库。
爬虫-网网址解析,然后爬取,然后再解析
以下是添加了详细注释的代码:```python
import leveldbclass UrlDB:'''使用 LevelDB 存储已处理的 URL 及其状态(成功或失败)'''status_failure = b'0' # 失败状态标识(字节类型)status_success = b'1' # 成功状态标识(字节类型)def __init__(self, db_name):"""初始化 URL 数据库参数:db_name (str): 数据库名称,实际存储为 db_name.urldb"""self.name = db_name + '.urldb' # 数据库文件名self.db = leveldb.LevelDB(self.name) # 打开/创建 LevelDB 数据库def set_success(self, url):"""标记 URL 为成功状态参数:url (str): 需要标记的 URL返回:bool: 操作是否成功"""if isinstance(url, str):url = url.encode('utf8') # 将字符串转换为 UTF-8 字节try:self.db.Put(url, self.status_success) # 存储 URL 及其成功状态s = Trueexcept:s = Falsereturn sdef set_failure(self, url):"""标记 URL 为失败状态参数:url (str): 需要标记的 URL返回:bool: 操作是否成功"""if isinstance(url, str):url = url.encode('utf8') # 将字符串转换为 UTF-8 字节try:self.db.Put(url, self.status_failure) # 存储 URL 及其失败状态s = Trueexcept:s = Falsereturn sdef has(self, url):"""检查 URL 是否已存在于数据库中,并返回其状态参数:url (str): 需要检查的 URL返回:bytes: 如果存在,返回状态字节(b'0' 或 b'1')False: 如果不存在"""if isinstance(url, str):url = url.encode('utf8') # 将字符串转换为 UTF-8 字节try:attr = self.db.Get(url) # 获取 URL 对应的状态return attrexcept:pass # 捕获异常但不处理(表示 URL 不存在)return False
```### 代码功能说明这段代码实现了一个基于 LevelDB 的 URL 状态管理系统,主要功能包括:1. **状态存储**:- 使用 `status_success` (b'1') 表示 URL 已成功处理- 使用 `status_failure` (b'0') 表示 URL 处理失败2. **数据库操作**:- `__init__`:初始化并连接到 LevelDB 数据库- `set_success`:将 URL 标记为成功状态- `set_failure`:将 URL 标记为失败状态- `has`:检查 URL 是否已存在并返回其状态3. **编码处理**:- 所有字符串在存储前都会转换为 UTF-8 字节- 确保中文等非 ASCII 字符能正确存储### 注意事项1. **异常处理**:- 所有 LevelDB 操作都包含异常处理- 操作失败时返回布尔值,而不是抛出异常- `has` 方法在 URL 不存在时返回 `False`,存在时返回状态字节2. **性能特点**:- LevelDB 是高性能的键值存储,适合大量 URL 的快速读写- 数据存储在磁盘上,重启程序后数据不会丢失3. **使用建议**:- 在爬虫开始处理 URL 前,先调用 `has` 方法检查是否已处理- 处理成功后调用 `set_success`,失败后调用 `set_failure`- 可通过检查状态值区分成功和失败的 URL这个类提供了一个简单而高效的方法来跟踪和管理爬虫处理过的 URL,避免重复抓取相同内容。
。同样也是可以对字典UrlDB将被UrlPool使用,主要有三个方法被使用:has(url) 查看是否已经存在某url
set_success(url) 存储url状态为成功
set_failure(url) 存储url状态为失败以下是添加了详细注释的代码:```python
#Author: veelionimport pickle
import leveldb
import time
import urllib.parse as urlparse
from urllib.parse import urlparseclass UrlPool:'''爬虫URL池管理类,用于调度和跟踪URL状态'''def __init__(self, pool_name):"""初始化URL池参数:pool_name (str): URL池名称,用于标识数据库和缓存文件"""self.name = pool_name # URL池名称self.db = UrlDB(pool_name) # 底层存储:使用UrlDB记录URL处理状态(成功/失败)# 核心数据结构self.waiting = {} # {主机: URL集合} 按主机分组的待下载URLself.pending = {} # {URL: 取出时间} 已取出但未完成下载的URL(处理中)self.failure = {} # {URL: 失败次数} 下载失败的URL及其重试次数self.failure_threshold = 3 # 最大失败次数,超过则标记为永久失败self.pending_threshold = 10 # pending状态超时时间(秒),超时后重新加入队列self.waiting_count = 0 # 待下载URL总数self.max_hosts = ['', 0] # 记录当前URL最多的主机及其数量self.hub_pool = {} # {枢纽URL: 上次访问时间} 存放重要URL(如首页)self.hub_refresh_span = 0 # 枢纽URL刷新间隔(秒)self.load_cache() # 初始化时加载缓存数据def __del__(self):"""析构函数:程序退出时自动保存缓存"""self.dump_cache()def load_cache(self):"""从文件加载缓存数据(waiting状态)"""path = self.name + '.pkl'try:with open(path, 'rb') as f:self.waiting = pickle.load(f) # 加载待下载URL分组数据# 计算总URL数并打印日志cc = [len(v) for k, v in self.waiting.items()]print(f'saved pool loaded! urls: {sum(cc)}')except:pass # 首次运行或缓存损坏时忽略错误def dump_cache(self):"""将缓存数据(waiting状态)保存到文件"""path = self.name + '.pkl'try:with open(path, 'wb') as f:pickle.dump(self.waiting, f) # 序列化待下载URL分组数据print('self.waiting saved!')except:pass # 保存失败时忽略错误def set_hubs(self, urls, hub_refresh_span):"""设置枢纽URL及其刷新间隔参数:urls (list): 枢纽URL列表(如网站首页)hub_refresh_span (int): 刷新间隔(秒),控制枢纽URL的抓取频率"""self.hub_refresh_span = hub_refresh_spanself.hub_pool = {url: 0 for url in urls} # 初始化枢纽URL池def set_status(self, url, status_code):"""更新URL状态(成功/失败/重试)参数:url (str): 目标URLstatus_code (int): 下载状态码(200=成功,404=永久失败,其他=可重试)"""if url in self.pending:self.pending.pop(url) # 从处理中列表移除if status_code == 200:self.db.set_success(url) # 标记为成功elif status_code == 404:self.db.set_failure(url) # 标记为永久失败else:# 处理可重试的失败情况if url in self.failure:self.failure[url] += 1# 超过失败阈值则标记为永久失败if self.failure[url] > self.failure_threshold:self.db.set_failure(url)self.failure.pop(url)else:self.add(url) # 重新加入待下载队列else:self.failure[url] = 1self.add(url) # 首次失败,加入队列def push_to_pool(self, url):"""将URL按主机分组加入待下载队列参数:url (str): 目标URL返回:bool: 操作是否成功(非法URL会被过滤)"""# 解析URL主机部分,过滤非法URLparsed = urlparse(url)host = parsed.netlocif not host or '.' not in host:print(f'try to push_to_pool with bad url: {url}, len: {len(url)}')return False# 更新主机分组数据if host in self.waiting:if url in self.waiting[host]:return True # 避免重复添加self.waiting[host].add(url)# 更新最大URL主机记录if len(self.waiting[host]) > self.max_hosts[1]:self.max_hosts = [host, len(self.waiting[host])]else:self.waiting[host] = {url} # 初始化主机分组self.waiting_count += 1 # 总URL数+1return Truedef add(self, url, always=False):"""向URL池添加URL(自动去重和状态检查)参数:url (str): 目标URLalways (bool): 是否强制添加(忽略已存在状态)"""if always:return self.push_to_pool(url) # 强制添加# 检查是否正在处理中(pending状态)pended_time = self.pending.get(url, 0)if time.time() - pended_time < self.pending_threshold:print(f'being downloading: {url}')return # 处理中,跳过# 检查是否已处理(成功/失败)if self.db.has(url):return # 已处理,跳过# 移除旧的pending记录if pended_time:self.pending.pop(url)return self.push_to_pool(url) # 正常添加def addmany(self, urls, always=False):"""批量添加URL(支持字符串或列表)参数:urls (str/list): URL或URL列表always (bool): 是否强制添加(忽略已存在状态)"""if isinstance(urls, str):print(f'urls is a str !!!! {urls}')self.add(urls, always)else:for url in urls:self.add(url, always)def pop(self, count, hub_percent=50):"""按策略取出URL(优先枢纽URL和热门主机)参数:count (int): 需取出的URL数量hub_percent (int): 枢纽URL占比(%)返回:dict: {URL: 类型标识},类型1=枢纽URL,0=普通URL"""print(f"\n\tmax of host: {self.max_hosts}")url_attr_url = 0 # 普通URL标识url_attr_hub = 1 # 枢纽URL标识# 1. 优先取出枢纽URLhubs = {}hub_count = count * hub_percent // 100 # 计算枢纽URL数量for hub in list(self.hub_pool.keys()): # 使用list()避免迭代时修改字典span = time.time() - self.hub_pool[hub]if span >= self.hub_refresh_span: # 超过刷新间隔则取出hubs[hub] = url_attr_hubself.hub_pool[hub] = time.time() # 更新最后访问时间if len(hubs) >= hub_count:break # 达到数量上限则停止# 2. 取出普通URL(按主机分组优先,热门主机优先)left_count = count - len(hubs)urls = {}for host in list(self.waiting.keys()): # 使用list()避免迭代时修改字典if not self.waiting[host]:continue # 空分组跳过# 从分组中取出一个URLurl = self.waiting[host].pop()urls[url] = url_attr_urlself.pending[url] = time.time() # 标记为处理中# 更新热门主机记录if host == self.max_hosts[0]:self.max_hosts[1] -= 1if self.max_hosts[1] == 0:self.max_hosts[0] = '' # 主机分组为空时重置if len(urls) >= left_count:break # 达到数量上限则停止self.waiting_count -= len(urls) # 更新待下载总数print(f'To pop:{count}, hubs: {len(hubs)}, urls: {len(urls)}, hosts:{len(self.waiting)}')urls.update(hubs) # 合并枢纽URL和普通URLreturn urlsdef size(self):"""返回待下载URL总数"""return self.waiting_countdef empty(self):"""判断URL池是否为空"""return self.waiting_count == 0
```### 核心功能模块说明1. **数据存储与缓存**:- **LevelDB**:底层使用`UrlDB`存储URL处理状态(成功/失败),避免重复抓取- **Pickle缓存**:定期保存`waiting`状态到文件,防止程序中断导致URL队列丢失2. **URL调度策略**:- **枢纽URL优先**:通过`set_hubs()`设置重要URL(如首页),按指定间隔定期抓取- **主机分组管理**:将URL按主机分组,优先处理热门主机(`max_hosts`),避免集中请求单一站点- **状态管理**:- `waiting`:待下载URL(按主机分组)- `pending`:已取出但未完成下载的URL(处理中)- `failure`:记录失败URL的重试次数,超过阈值则标记为永久失败3. **去重与过滤**:- 添加URL时自动检查`db.has(url)`,避免重复添加已处理的URL- 过滤非法URL(无主机或主机格式错误)4. **异常处理与重试**:- 下载失败时根据状态码判断是否重试(404直接标记失败,其他状态码允许重试)- `pending_threshold`防止长时间占用URL(处理超时后重新加入队列)### 关键参数说明| 参数名称 | 作用 |
|------------------------|----------------------------------------------------------------------|
| `failure_threshold` | 最大失败次数,超过则标记URL为永久失败 |
| `pending_threshold` | 处理中URL的超时时间,超时后重新加入待下载队列 |
| `hub_refresh_span` | 枢纽URL的刷新间隔,控制其抓取频率 |
| `hub_percent` | 每次取出URL时枢纽URL的占比,平衡重要内容和新内容的抓取 |
| `max_hosts` | 跟踪当前URL最多的主机,实现热门主机优先的调度策略 |### 使用建议1. **初始化流程**:```pythonpool = UrlPool("news_pool") # 创建URL池pool.set_hubs(["https://news.example.com"], 86400) # 设置枢纽URL(每天抓取一次)```2. **添加URL**:```pythonpool.addmany(["https://news.example.com/article1.html", "https://news.example.com/article2.html"])```3. **取出URL进行下载**:```pythonurls = pool.pop(10) # 每次取出10个URL(默认50%为枢纽URL)for url, attr in urls.items():# 执行下载逻辑status_code = download(url)pool.set_status(url, status_code) # 更新URL状态```4. **监控状态**:```pythonprint(f"待下载URL数: {pool.size()}")print(f"热门主机: {pool.max_hosts[0]} ({pool.max_hosts[1]}个URL)")```这个URL池实现了爬虫中常见的调度策略,结合了优先级管理、去重、失败重试和状态持久化,适用于大规模新闻抓取场景。先看看它的主要成员及其用途:self.db 是一个UrlDB的示例,用来永久存储url的永久状态
self.pool 是用来存放url的,它是一个字典(dict)结构,key是url的host,value是一个用来存储这个host的所有url的集合(set)。
self.pending 用来管理正在下载的url状态。它是一个字典结构,key是url,value是它被pop的时间戳。当一个url被pop()时,就是它被下载的开始。当该url被set_status()时,就是下载结束的时刻。如果一个url被add() 入pool时,发现它已经被pended的时间超过pending_threshold时,就可以再次入库等待被下载。否则,暂不入池。
self.failue 是一个字典,key是url,value是识别的次数,超过failure_threshold就会被永久记录为失败,不再尝试下载。
hub_pool 是一个用来存储hub页面的字典,key是hub url,value是上次刷新该hub页面的时间.
以上成员就构成了我们这个网址池的数据结构,再通过以下成员方法对这个网址池进行操作:1. load_cache() 和 dump_cache() 对网址池进行缓存
load_cache() 在 init()中调用,创建pool的时候,尝试去加载上次退出时缓存的URL pool;
dump_cache() 在 del() 中调用,也就是在网址池销毁前(比如爬虫意外退出),把内存中的URL pool缓存到硬盘。
这里使用了pickle 模块,这是一个把内存数据序列化到硬盘的工具。** 2. set_hubs() 方法设置hub URL**
hub网页就是像百度新闻那样的页面,整个页面都是新闻的标题和链接,是我们真正需要的新闻的聚合页面,并且这样的页面会不断更新,把最新的新闻聚合到这样的页面,我们称它们为hub页面,其URL就是hub url。在新闻爬虫中添加大量的这样的url,有助于爬虫及时发现并抓取最新的新闻。
该方法就是将这样的hub url列表传给网址池,在爬虫从池中取URL时,根据时间间隔(self.hub_refresh_span)来取hub url。** 3. add(), addmany(), push_to_pool() 对网址池进行入池操作**
把url放入网址池时,先检查内存中的self.pending是否存在该url,即是否正在下载该url。如果正在下载就不入池;如果正下载或已经超时,就进行到下一步;
接着检查该url是否已经在leveldb中存在,存在就表明之前已经成功下载或彻底失败,不再下载了也不入池。如果没有则进行到下一步;
最后通过push_to_pool() 把url放入self.pool中。存放的规则是,按照url的host进行分类,相同host的url放到一起,在取出时每个host取一个url,尽量保证每次取出的一批url都是指向不同的服务器的,这样做的目的也是为了尽量减少对抓取目标服务器的请求压力。力争做一个服务器友好的爬虫 O(∩_∩)O** 4. pop() 对网址池进行出池操作**
爬虫通过该方法,从网址池中获取一批url去下载。取出url分两步:
第一步,先从self.hub_pool中获得,方法是遍历hub_pool,检查每个hub-url距上次被pop的时间间隔是否超过hub页面刷新间隔(self.hub_refresh_span),来决定hub-url是否应该被pop。
第二步,从self.pool中获取。前面push_to_pool中,介绍了pop的原则,就是每次取出的一批url都是指向不同服务器的,有了self.pool的特殊数据结构,安装这个原则获取url就简单了,按host(self.pool的key)遍历self.pool即可。** 5. set_status() 方法设置网址池中url的状态**
其参数status_code 是http响应的状态码。爬虫在下载完URL后进行url状态设置。
首先,把该url成self.pending中删除,已经下载完毕,不再是pending状态;
接着,根据status_code来设置url状态,200和404的直接设置为永久状态;其它status就记录失败次数,并再次入池进行后续下载尝试。通过以上成员变量和方法,我们把这个网址池(UrlPool)解析的清清楚楚。小猿们可以毫不客气的收藏起来,今后在写爬虫时可以用它方便的管理URL,并且这个实现只有一个py文件,方便加入到任何项目中。爬虫知识点
1. 网址的管理
网址的管理,其目的就是为了:不重抓,不漏抓。2. pickle 模块
把内存数据保存到硬盘,再把硬盘数据重新加载到内存,这是很多程序停止和启动的必要步骤。pickle就是实现数据在内存和硬盘之间转移的模块。3. leveldb 模块
这是一个经典且强大的硬盘型key-value数据库,非常适合url-status这种结构的存储。4. urllib.parse 4. urllib.解析
解析网址的模块,在处理url时首先想到的模块就应该是它。下一篇我们把mysql再封装一下。
大规模异步新闻爬虫:实现功能强大、简洁易用的网址池(URL Pool) - 猿人学Python爬虫
这里的主要,函数解释
init是初始函数,这个是永远
- self.db 是一个UrlDB的示例,用来永久存储url的永久状态
- self.pool 是用来存放url的,它是一个字典(dict)结构,key是url的host,value是一个用来存储这个host的所有url的集合(set)。
- self.pending 用来管理正在下载的url状态。它是一个字典结构,key是url,value是它被pop的时间戳。当一个url被pop()时,就是它被下载的开始。当该url被set_status()时,就是下载结束的时刻。如果一个url被add() 入pool时,发现它已经被pended的时间超过pending_threshold时,就可以再次入库等待被下载。否则,暂不入池。
- self.failue 是一个字典,key是url,value是识别的次数,超过failure_threshold就会被永久记录为失败,不再尝试下载。
- hub_pool 是一个用来存储hub页面的字典,key是hub url,value是上次刷新该hub页面的时间.
两个类,数据库类和和url类
数据库类,记录状态
成功,失败,是否存在数据库中
这里用了三个函数实现
def set_success(self, url):
"""
标记 URL 为成功状态
参数:
url (str): 需要标记的 URL
返回:
bool: 操作是否成功
"""
if isinstance(url, str):
url = url.encode('utf8') # 将字符串转换为 UTF-8 字节
try:
self.db.Put(url, self.status_success) # 存储 URL 及其成功状态
s = True
except:
s = False
return s
- 如果这个url是str的子类,就将其转化为utf-8的格式
- 然后尝试用初始化中init初始化中
def __init__(self, db_name):
"""
初始化 URL 数据库
参数:
db_name (str): 数据库名称,实际存储为 db_name.urldb
"""
self.name = db_name + '.urldb' # 数据库文件名
self.db = leveldb.LevelDB(self.name) # 打开/创建 LevelDB 数据库
这个的self.db.Put(url, self.status_success) # 存储 URL 及其成功状态,放到数据库中,并且给予一个状态,放入成功的话,就将其为true
- 然后reutrn这个s 的状态函数的值
def set_failure(self, url):
"""
标记 URL 为失败状态
参数:
url (str): 需要标记的 URL
返回:
bool: 操作是否成功
"""
if isinstance(url, str):
url = url.encode('utf8') # 将字符串转换为 UTF-8 字节
try:
self.db.Put(url, self.status_failure) # 存储 URL 及其失败状态
s = True
except:
s = False
return s
如果url放入的话是失败状态的话,同样存储其失败的状态
def has(self, url):"""检查 URL 是否已存在于数据库中,并返回其状态参数:url (str): 需要检查的 URL返回:bytes: 如果存在,返回状态字节(b'0' 或 b'1')False: 如果不存在"""if isinstance(url, str):url = url.encode('utf8') # 将字符串转换为 UTF-8 字节try:attr = self.db.Get(url) # 获取 URL 对应的状态return attrexcept:pass # 捕获异常但不处理(表示 URL 不存在)return False
再用得到的状态,从数据库中拿出这个url,则可以知道这个状态是什么
. **状态存储**:
- 使用 `status_success` (b'1') 表示 URL 已成功处理
- 使用 `status_failure` (b'0') 表示 URL 处理失败2. **数据库操作**:
- `__init__`:初始化并连接到 LevelDB 数据库
- `set_success`:将 URL 标记为成功状态
- `set_failure`:将 URL 标记为失败状态
- `has`:检查 URL 是否已存在并返回其状态3. **编码处理**:
- 所有字符串在存储前都会转换为 UTF-8 字节
- 确保中文等非 ASCII 字符能正确存储### 注意事项
1. **异常处理**:
- 所有 LevelDB 操作都包含异常处理
- 操作失败时返回布尔值,而不是抛出异常
- `has` 方法在 URL 不存在时返回 `False`,存在时返回状态字节2. **性能特点**:
- LevelDB 是高性能的键值存储,适合大量 URL 的快速读写
- 数据存储在磁盘上,重启程序后数据不会丢失3. **使用建议**:
- 在爬虫开始处理 URL 前,先调用 `has` 方法检查是否已处理
- 处理成功后调用 `set_success`,失败后调用 `set_failure`
- 可通过检查状态值区分成功和失败的 URL这个类提供了一个简单而高效的方法来跟踪和管理爬虫处理过的 URL,避免重复抓取相同内容。
。同样也是可以对字典UrlDB将被UrlPool使用,主要有三个方法被使用:has(url) 查看是否已经存在某url
set_success(url) 存储url状态为成功
set_failure(url) 存储url状态为失败
urlpool
`python
#Author: veelionimport pickle
import leveldb
import time
import urllib.parse as urlparse
from urllib.parse import urlparseclass UrlPool:'''爬虫URL池管理类,用于调度和跟踪URL状态'''def __init__(self, pool_name):"""初始化URL池参数:pool_name (str): URL池名称,用于标识数据库和缓存文件"""self.name = pool_name # URL池名称self.db = UrlDB(pool_name) # 底层存储:使用UrlDB记录URL处理状态(成功/失败)# 核心数据结构self.waiting = {} # {主机: URL集合} 按主机分组的待下载URLself.pending = {} # {URL: 取出时间} 已取出但未完成下载的URL(处理中)self.failure = {} # {URL: 失败次数} 下载失败的URL及其重试次数self.failure_threshold = 3 # 最大失败次数,超过则标记为永久失败self.pending_threshold = 10 # pending状态超时时间(秒),超时后重新加入队列self.waiting_count = 0 # 待下载URL总数self.max_hosts = ['', 0] # 记录当前URL最多的主机及其数量self.hub_pool = {} # {枢纽URL: 上次访问时间} 存放重要URL(如首页)self.hub_refresh_span = 0 # 枢纽URL刷新间隔(秒)self.load_cache() # 初始化时加载缓存数据def __del__(self):"""析构函数:程序退出时自动保存缓存"""self.dump_cache()def load_cache(self):"""从文件加载缓存数据(waiting状态)"""path = self.name + '.pkl'try:with open(path, 'rb') as f:self.waiting = pickle.load(f) # 加载待下载URL分组数据# 计算总URL数并打印日志cc = [len(v) for k, v in self.waiting.items()]print(f'saved pool loaded! urls: {sum(cc)}')except:pass # 首次运行或缓存损坏时忽略错误def dump_cache(self):"""将缓存数据(waiting状态)保存到文件"""path = self.name + '.pkl'try:with open(path, 'wb') as f:pickle.dump(self.waiting, f) # 序列化待下载URL分组数据print('self.waiting saved!')except:pass # 保存失败时忽略错误def set_hubs(self, urls, hub_refresh_span):"""设置枢纽URL及其刷新间隔参数:urls (list): 枢纽URL列表(如网站首页)hub_refresh_span (int): 刷新间隔(秒),控制枢纽URL的抓取频率"""self.hub_refresh_span = hub_refresh_spanself.hub_pool = {url: 0 for url in urls} # 初始化枢纽URL池def set_status(self, url, status_code):"""更新URL状态(成功/失败/重试)参数:url (str): 目标URLstatus_code (int): 下载状态码(200=成功,404=永久失败,其他=可重试)"""if url in self.pending:self.pending.pop(url) # 从处理中列表移除if status_code == 200:self.db.set_success(url) # 标记为成功elif status_code == 404:self.db.set_failure(url) # 标记为永久失败else:# 处理可重试的失败情况if url in self.failure:self.failure[url] += 1# 超过失败阈值则标记为永久失败if self.failure[url] > self.failure_threshold:self.db.set_failure(url)self.failure.pop(url)else:self.add(url) # 重新加入待下载队列else:self.failure[url] = 1self.add(url) # 首次失败,加入队列def push_to_pool(self, url):"""将URL按主机分组加入待下载队列参数:url (str): 目标URL返回:bool: 操作是否成功(非法URL会被过滤)"""# 解析URL主机部分,过滤非法URLparsed = urlparse(url)host = parsed.netlocif not host or '.' not in host:print(f'try to push_to_pool with bad url: {url}, len: {len(url)}')return False# 更新主机分组数据if host in self.waiting:if url in self.waiting[host]:return True # 避免重复添加self.waiting[host].add(url)# 更新最大URL主机记录if len(self.waiting[host]) > self.max_hosts[1]:self.max_hosts = [host, len(self.waiting[host])]else:self.waiting[host] = {url} # 初始化主机分组self.waiting_count += 1 # 总URL数+1return Truedef add(self, url, always=False):"""向URL池添加URL(自动去重和状态检查)参数:url (str): 目标URLalways (bool): 是否强制添加(忽略已存在状态)"""if always:return self.push_to_pool(url) # 强制添加# 检查是否正在处理中(pending状态)pended_time = self.pending.get(url, 0)if time.time() - pended_time < self.pending_threshold:print(f'being downloading: {url}')return # 处理中,跳过# 检查是否已处理(成功/失败)if self.db.has(url):return # 已处理,跳过# 移除旧的pending记录if pended_time:self.pending.pop(url)return self.push_to_pool(url) # 正常添加def addmany(self, urls, always=False):"""批量添加URL(支持字符串或列表)参数:urls (str/list): URL或URL列表always (bool): 是否强制添加(忽略已存在状态)"""if isinstance(urls, str):print(f'urls is a str !!!! {urls}')self.add(urls, always)else:for url in urls:self.add(url, always)def pop(self, count, hub_percent=50):"""按策略取出URL(优先枢纽URL和热门主机)参数:count (int): 需取出的URL数量hub_percent (int): 枢纽URL占比(%)返回:dict: {URL: 类型标识},类型1=枢纽URL,0=普通URL"""print(f"\n\tmax of host: {self.max_hosts}")url_attr_url = 0 # 普通URL标识url_attr_hub = 1 # 枢纽URL标识# 1. 优先取出枢纽URLhubs = {}hub_count = count * hub_percent // 100 # 计算枢纽URL数量for hub in list(self.hub_pool.keys()): # 使用list()避免迭代时修改字典span = time.time() - self.hub_pool[hub]if span >= self.hub_refresh_span: # 超过刷新间隔则取出hubs[hub] = url_attr_hubself.hub_pool[hub] = time.time() # 更新最后访问时间if len(hubs) >= hub_count:break # 达到数量上限则停止# 2. 取出普通URL(按主机分组优先,热门主机优先)left_count = count - len(hubs)urls = {}for host in list(self.waiting.keys()): # 使用list()避免迭代时修改字典if not self.waiting[host]:continue # 空分组跳过# 从分组中取出一个URLurl = self.waiting[host].pop()urls[url] = url_attr_urlself.pending[url] = time.time() # 标记为处理中# 更新热门主机记录if host == self.max_hosts[0]:self.max_hosts[1] -= 1if self.max_hosts[1] == 0:self.max_hosts[0] = '' # 主机分组为空时重置if len(urls) >= left_count:break # 达到数量上限则停止self.waiting_count -= len(urls) # 更新待下载总数print(f'To pop:{count}, hubs: {len(hubs)}, urls: {len(urls)}, hosts:{len(self.waiting)}')urls.update(hubs) # 合并枢纽URL和普通URLreturn urlsdef size(self):"""返回待下载URL总数"""return self.waiting_countdef empty(self):"""判断URL池是否为空"""return self.waiting_count == 0
```
我们构建了一个urlpool池类
这个池子初始状态
ef __init__(self, pool_name):"""初始化URL池参数:pool_name (str): URL池名称,用于标识数据库和缓存文件"""self.name = pool_name # URL池名称self.db = UrlDB(pool_name) # 底层存储:使用UrlDB记录URL处理状态(成功/失败)# 核心数据结构self.waiting = {} # {主机: URL集合} 按主机分组的待下载URLself.pending = {} # {URL: 取出时间} 已取出但未完成下载的URL(处理中)self.failure = {} # {URL: 失败次数} 下载失败的URL及其重试次数self.failure_threshold = 3 # 最大失败次数,超过则标记为永久失败self.pending_threshold = 10 # pending状态超时时间(秒),超时后重新加入队列self.waiting_count = 0 # 待下载URL总数self.max_hosts = ['', 0] # 记录当前URL最多的主机及其数量self.hub_pool = {} # {枢纽URL: 上次访问时间} 存放重要URL(如首页)self.hub_refresh_span = 0 # 枢纽URL刷新间隔(秒)self.load_cache() # 初始化时加载缓存数据
. 网址的管理
网址的管理,其目的就是为了:不重抓,不漏抓。
,然后我们后面
这里的分组是为了避免多次请求一个url
def __del__(self):
"""析构函数:程序退出时自动保存缓存"""
self.dump_cache()
用这个进行缓存设置,
def load_cache(self):
"""从文件加载缓存数据(waiting状态)"""
path = self.name + '.pkl'
try:
with open(path, 'rb') as f:
self.waiting = pickle.load(f) # 加载待下载URL分组数据
# 计算总URL数并打印日志
cc = [len(v) for k, v in self.waiting.items()]
print(f'saved pool loaded! urls: {sum(cc)}')
except:
pass # 首次运行或缓存损坏时忽略错误
用的库是pkl库,在内存中进行加载,缓存,先把文件中的url放入缓存池子中
然后构建一个等待的池子
def dump_cache(self):
"""将缓存数据(waiting状态)保存到文件"""
path = self.name + '.pkl'
try:
with open(path, 'wb') as f:
pickle.dump(self.waiting, f) # 序列化待下载URL分组数据
print('self.waiting saved!')
except:
pass # 保存失败时忽略错误
将这个序列化等待的url,将这里全部装进去
后面就是抓取门户网站首页了
def set_hubs(self, urls, hub_refresh_span):
"""设置枢纽URL及其刷新间隔
参数:
urls (list): 枢纽URL列表(如网站首页)
hub_refresh_span (int): 刷新间隔(秒),控制枢纽URL的抓取频率
"""
self.hub_refresh_span = hub_refresh_span
self.hub_pool = {url: 0 for url in urls} # 初始化枢纽URL池
更新url状态池子
def set_status(self, url, status_code):
"""更新URL状态(成功/失败/重试)
参数:
url (str): 目标URL
status_code (int): 下载状态码(200=成功,404=永久失败,其他=可重试)
"""
if url in self.pending:
self.pending.pop(url) # 从处理中列表移除
if status_code == 200:
self.db.set_success(url) # 标记为成功
elif status_code == 404:
self.db.set_failure(url) # 标记为永久失败
else:
# 处理可重试的失败情况
if url in self.failure:
self.failure[url] += 1
# 超过失败阈值则标记为永久失败
if self.failure[url] > self.failure_threshold:
self.db.set_failure(url)
self.failure.pop(url)
else:
self.add(url) # 重新加入待下载队列
else:
self.failure[url] = 1
self.add(url) # 首次失败,加入队列
这里就是更新url状态码了。这里有三种状态,成功、失败、、重试
如果当前url在pending 中,即超时列表中,请求时间太长了 ,那么重新请求后,就将这个移除了
再用if status_code == 200:
self.db.set_success(url) # 标记为成功
elif status_code == 404:
self.db.set_failure(url) # 标记为永久失败
else:
这个返回的状态码的情况,看哪些是成功,哪些是失败
这里失败后的情况,也分几种
if url in self.failure:
self.failure[url] += 1
# 超过失败阈值则标记为永久失败
if self.failure[url] > self.failure_threshold:
self.db.set_failure(url)
self.failure.pop(url)
else:
self.add(url) # 重新加入待下载队列
else:
self.failure[url] = 1
self.add(url。
如果这个url在失败url中,在这个失败的队列中的url的次数增加1(这个url的),因为是字典,键值对,。如果尝试的次数大于这个阈值的话,就永久失效,将这个在数据库中标记,并且在失败字典中弹出去
否则再次尝试。
如果是第一次失败,将其加入失败队列中。
def push_to_pool(self, url):
"""将URL按主机分组加入待下载队列
参数:
url (str): 目标URL
返回:
bool: 操作是否成功(非法URL会被过滤)
"""
# 解析URL主机部分,过滤非法URL
parsed = urlparse(url)
host = parsed.netloc
if not host or '.' not in host:
print(f'try to push_to_pool with bad url: {url}, len: {len(url)}')
return False
# 更新主机分组数据
if host in self.waiting:
if url in self.waiting[host]:
return True # 避免重复添加
self.waiting[host].add(url)
# 更新最大URL主机记录
if len(self.waiting[host]) > self.max_hosts[1]:
self.max_hosts = [host, len(self.waiting[host])]
else:
self.waiting[host] = {url} # 初始化主机分组
self.waiting_count += 1 # 总URL数+1
return True
这里有过滤url,避免那些乱七八糟的url进入,
if host in self.waiting:
if url in self.waiting[host]:
return True # 避免重复添加
self.waiting[host].add(url)
host主域名添加进去
代码功能总览
这段代码的作用是将 URL 按主机(域名)分组存储到self.waiting
字典中,并:
- 检查同一主机下是否已存在该 URL,避免重复添加
- 跟踪每个主机的 URL 数量,更新「热门主机」记录
- 维护待下载 URL 的总数统计
代码逐行解析
python
运行
if host in self.waiting:# 情况1:主机分组已存在if url in self.waiting[host]:return True # 同一主机下已存在该URL,直接返回(去重)self.waiting[host].add(url) # 向主机分组添加新URL# 更新「热门主机」记录if len(self.waiting[host]) > self.max_hosts[1]:self.max_hosts = [host, len(self.waiting[host])]
else:# 情况2:主机分组不存在,初始化新分组self.waiting[host] = {url} # 创建新的主机分组并添加URLself.waiting_count += 1 # 待下载URL总数+1
return True
核心逻辑拆解
1. 主机分组管理
- 数据结构:
self.waiting
是一个字典,格式为{主机: URL集合}
,例如:python
运行
{"example.com": {"http://example.com/a", "http://example.com/b"},"news.com": {"http://news.com/c", "http://news.com/d"} }
- 分组目的:按主机分组可实现「同一主机每次只取一个 URL」的调度策略,避免集中请求同一服务器,降低被封 IP 的风险。
2. URL 去重机制
- 当向主机分组添加 URL 时,先检查
self.waiting[host]
中是否已存在该 URL - 若存在则直接返回
True
,跳过添加操作,确保同一主机下无重复 URL
3. 热门主机跟踪
self.max_hosts
记录当前 URL 最多的主机及其数量,格式为[主机名, 数量]
- 每次向主机分组添加 URL 后,若该分组的 URL 数量超过当前
max_hosts
,则更新max_hosts
- 作用:在
pop()
方法中优先处理热门主机的 URL,提高爬虫效率
4. 总数统计
self.waiting_count
维护待下载 URL 的总数,用于size()
和empty()
方法的快速查询
设计意图与应用场景
为什么按主机分组?
- 爬虫友好性:避免对单一服务器发起大量请求,符合「爬虫伦理」
- 调度优化:热门主机(URL 多的主机)会被优先处理,提升抓取效率
- 故障隔离:若某主机响应慢或封禁 IP,仅影响该主机的 URL 处理
与其他模块的联动
- 与
pop()
方法配合:按主机分组取出 URL 时,每次从不同主机取一个,实现「分散请求」 - 与
max_hosts
配合:优先处理热门主机,确保重要网站的 URL 被及时抓取 - 与
self.waiting_count
配合:快速判断 URL 池是否为空,控制爬虫流程
优化建议
- 线程安全问题:若在多线程环境下使用,需添加锁机制避免
self.waiting
的并发修改 - 主机分组大小限制:可添加阈值,当某主机 URL 数量超过阈值时暂停添加,防止单一主机占用过多资源
- URL 去重增强:可结合布隆过滤器(Bloom Filter)优化去重性能,减少内存占用
分完后,实现添加url的功能
def add(self, url, always=False):
"""向URL池添加URL(自动去重和状态检查)
参数:
url (str): 目标URL
always (bool): 是否强制添加(忽略已存在状态)
"""
if always:
return self.push_to_pool(url) # 强制添加
# 检查是否正在处理中(pending状态)
pended_time = self.pending.get(url, 0)
if time.time() - pended_time < self.pending_threshold:
print(f'being downloading: {url}')
return # 处理中,跳过
# 检查是否已处理(成功/失败)
if self.db.has(url):
return # 已处理,跳过
# 移除旧的pending记录
if pended_time:
self.pending.pop(url)
return self.push_to_pool(url) # 正常添加
一个是强制添加的。self.push_to_pool(url)]
如果db已经有了跳过
如果正在添加,那么将 if pended_time:
self.pending.pop(url)这个移除
多个url,批次添加,如果是[url,url2.url3]这样子的情况,就需要for循环或者,其他方式添加
def addmany(self, urls, always=False):
"""批量添加URL(支持字符串或列表)
参数:
urls (str/list): URL或URL列表
always (bool): 是否强制添加(忽略已存在状态)
"""
if isinstance(urls, str):
print(f'urls is a str !!!! {urls}')
self.add(urls, always)
else:
for url in urls:
self.add(url, always)
def pop(self, count, hub_percent=50):
"""按策略取出URL(优先枢纽URL和热门主机)
参数:
count (int): 需取出的URL数量
hub_percent (int): 枢纽URL占比(%)
返回:
dict: {URL: 类型标识},类型1=枢纽URL,0=普通URL
"""
print(f"\n\tmax of host: {self.max_hosts}")
url_attr_url = 0 # 普通URL标识
url_attr_hub = 1 # 枢纽URL标识# 1. 优先取出枢纽URL
hubs = {}
hub_count = count * hub_percent // 100 # 计算枢纽URL数量
for hub in list(self.hub_pool.keys()): # 使用list()避免迭代时修改字典
span = time.time() - self.hub_pool[hub]
if span >= self.hub_refresh_span: # 超过刷新间隔则取出
hubs[hub] = url_attr_hub
self.hub_pool[hub] = time.time() # 更新最后访问时间
if len(hubs) >= hub_count:
break # 达到数量上限则停止# 2. 取出普通URL(按主机分组优先,热门主机优先)
left_count = count - len(hubs)
urls = {}
for host in list(self.waiting.keys()): # 使用list()避免迭代时修改字典
if not self.waiting[host]:
continue # 空分组跳过
# 从分组中取出一个URL
url = self.waiting[host].pop()
urls[url] = url_attr_url
self.pending[url] = time.time() # 标记为处理中
# 更新热门主机记录
if host == self.max_hosts[0]:
self.max_hosts[1] -= 1
if self.max_hosts[1] == 0:
self.max_hosts[0] = '' # 主机分组为空时重置
if len(urls) >= left_count:
break # 达到数量上限则停止
self.waiting_count -= len(urls) # 更新待下载总数
print(f'To pop:{count}, hubs: {len(hubs)}, urls: {len(urls)}, hosts:{len(self.waiting)}')
urls.update(hubs) # 合并枢纽URL和普通URL
return urls
pop弹出设置
# 初始化时设置枢纽URL(如新闻网站首页)
pool.set_hubs(["https://news.example.com", "https://tech.example.com"], 86400) # 每天刷新一次# 每次取出10个URL,其中50%为枢纽URL
urls = pool.pop(10, hub_percent=50)
# 返回结果可能是:
# {
# "https://news.example.com": 1, # 枢纽URL
# "https://tech.example.com": 1, # 枢纽URL
# "https://blog.another-site.com/article1": 0, # 普通URL
# "https://forum.site.com/thread2": 0, # 普通URL
# ...
# }