爬虫疑难问题解决方案整理
一、反爬机制应对
1、IP封禁
问题:频繁请求触发目标网站的IP限制。
解决方案:
1)放慢爬取速度 (`Sleep`/`Delay`):在请求之间加入随机时间间隔(如`time.sleep(random.uniform(1, 3))`),降低请求频率。
2)使用代理IP池(如ScraperAPI、Bright Data)轮换IP。
免费代理:来源不稳定、质量差、速度慢,仅用于测试或低要求场景。
付费代理:质量高、稳定、有售后,是商业项目的首选。提供API按量提取IP。
自建代理池:技术门槛较高,需要自己抓取、验证、维护一批代理服务器。
常用开源项目:`IPProxyPool`。
结合Tor网络或云服务商的弹性IP。
import time
import random
import requests# 代理IP池
proxies = [{'http': 'http://user:pass@ip1:port'},{'http': 'http://ip2:port'},# 可动态从ScraperAPI等获取代理
]headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}def scrape_with_proxy(url, max_retries=3):for _ in range(max_retries):try:# 随机选择代理proxy = random.choice(proxies)# 随机延时(1-3秒)delay = random.uniform(1, 3)time.sleep(delay)response = requests.get(url,headers=headers,proxies=proxy,timeout=10 # 请求超时时间)if response.status_code == 200:print(f"成功获取数据,使用代理: {proxy}")return response.textelse:print(f"请求失败,状态码: {response.status_code}")except Exception as e:print(f"代理异常: {e}, 切换代理重试...")raise ConnectionError("所有代理尝试失败")target_url = "https://example.com/api/data"
try:data = scrape_with_proxy(target_url)# 处理返回的数据
except ConnectionError as e:print(f"最终失败: {e}")
2、User-Agent检测
问题:网站通过User-Agent识别爬虫。
解决方案:
随机更换User-Agent(如使用fake_useragent库),配合代理IP使用,准备一个庞大的UA池随机切换,避免因UA相同被识别。
模拟浏览器行为(如携带Cookie、Referer等头部信息)。
(1)随机User-Agent更换(fake_useragent库)
from telnetlib import ECfrom fake_useragent import UserAgent
import randomfrom selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait# 创建UA对象并生成随机UA池
ua = UserAgent()
# 生成10个随机UA
user_agents = [ua.random for _ in range(10)]# 示例:随机选择UA并添加到请求头
headers = {"User-Agent": random.choice(user_agents),"Referer": "https://www.google.com", # 模拟来自Google的流量"Accept-Language": "zh-CN,zh;q=0.9", # 模拟中文用户"Cookie": "session_id=12345; user_preference=dark_mode" # 模拟已登录状态
}
(2)代理IP+随机UA组合请求
import requests
from time import sleep# 代理IP池(可动态从ScraperAPI等获取)
proxies = {"http": "http://user:pass@ip1:port","https": "http://ip2:port"
}def safe_request(url, max_retries=3):for _ in range(max_retries):try:# 动态组合代理+随机UAproxy = random.choice(list(proxies.values()))response = requests.get(url,headers=headers,proxies={"http": proxy, "https": proxy},timeout=10)# 模拟浏览器行为:检查状态码并处理重定向if response.status_code in [200, 301, 302]:return response.text# Too Many Requestselif response.status_code == 429:# 动态延时后重试sleep(random.uniform(5, 15))except Exception as e:print(f"请求异常: {e}, 切换代理重试...")# 请求失败后随机延时sleep(random.uniform(1, 3))raise ConnectionError("所有尝试失败")target_url = "https://example.com/protected-data"
try:data = safe_request(target_url)print("成功获取数据")
except ConnectionError as e:print(f"最终失败: {e}")
动态UA池:通过fake_useragent生成1000+真实浏览器UA,避免固定模式
智能延时:失败时采用指数退避算法(5s→15s→45s),避免触发速率限制
代理健康检查:可添加代理验证逻辑,定期清理失效IP
浏览器指纹伪装:
禁用自动化特征(disable-blink-features)
添加WebGL指纹干扰(通过navigator对象注入)
模拟鼠标移动轨迹(使用ActionChains类)
会话保持:通过持久化Cookie实现登录状态维持
进阶方案:
指纹浏览器:使用Undetectable Chromium等工具实现浏览器指纹伪装
请求链模拟:构建完整的浏览路径(搜索→点击→滚动)
3、验证码(CAPTCHA)拦截
问题:出现图形验证码、滑块验证码或行为验证(如Google reCAPTCHA)。
解决方案:
1)人工打码:适用于少量请求,遇到验证码时暂停爬虫,将验证码图片交给人工处理,输入后再继续,效率极低。
2)避免触发:通过控制请求频率和模拟真实用户行为降低触发概率,优化爬虫行为(用代理、控制频率、模拟真人行为),尽量避免触发验证码。
3)编码平台:使用第三方打码平台(如超级鹰、图鉴等),通过API发送图片,获取识别结果。需要支付费用,识别率因平台和验证码类型而异。
4)机器学习/深度学习:使用OCR工具(如Tesseract)或训练CNN模型识别简单验证码,或第三方服务(如2Captcha、DeathByCaptcha),对于复杂的滑动、点选等验证码,破解成本非常高。
5)专业工具:对于高级验证码(如Google reCAPTCHA v2/v3),可能需要使用类似`selenium-wire`配合指纹浏览器甚至更专业的绕过服务,但这通常涉及灰色地带。
(1)图形验证码处理(生成+简单识别)
import numpy as np
from PIL import Image, ImageDraw, ImageFontdef generate_captcha():img = Image.new('RGB', (200, 100), (255, 255, 255))d = ImageDraw.Draw(img)captcha_text = ''.join(str(np.random.randint(0, 9)) for _ in range(4))font = ImageFont.load_default()d.text((20, 30), captcha_text, font=font, fill=(0, 0, 0))img.save('./generated_captcha.png')return captcha_textdef simple_recognition():real_text = generate_captcha()print('模拟识别结果:', real_text)return real_textsimple_recognition()
(2)滑块验证码模拟
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChainsdef handle_slider_captcha():driver = webdriver.Chrome()# 替换为实际滑块页面driver.get('https://example.com/login')# 定位滑块和背景图元素slider = driver.find_element('id', 'slider')background = driver.find_element('id', 'captcha_bg')# 模拟拖动操作actions = ActionChains(driver)actions.click_and_hold(slider).perform()# 水平移动200像素actions.move_by_offset(200, 0).perform()actions.release().perform()print('滑块验证码模拟拖动完成')driver.quit()
# 注意:需结合图像识别定位缺口位置
(3)弧形滑块验证码示例
import time
import random
import math
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import Bydef generate_arc_trajectory(start_x, start_y, end_x, end_y, radius=50):"""生成弧形轨迹的坐标点"""# 计算圆弧参数(这里以半圆为例)center_x = (start_x + end_x) / 2# 上弧线center_y = start_y - radiustrajectory = []steps = 100# 半圆弧度angle_step = math.pi / stepsfor i in range(steps):# 圆弧方程计算x = center_x + radius * math.cos(i * angle_step)y = center_y + radius * math.sin(i * angle_step)# 加入随机抖动模拟人类操作x += random.randint(-3, 3)y += random.randint(-3, 3)# 转换为相对位移trajectory.append((x - start_x, y - start_y))return trajectorydef solve_slider_captcha(driver):"""处理滑块验证码的主函数"""# 定位元素(需根据实际页面调整选择器)slider = driver.find_element(By.CSS_SELECTOR, '.geetest_slider_button')track_bg = driver.find_element(By.CSS_SELECTOR, '.geetest_track')# 获取初始位置start_x = slider.location['x']start_y = slider.location['y']# 计算目标位置(这里假设滑块需要移动到背景最右侧)target_x = track_bg.size['width'] - slider.size['width']# 生成弧形轨迹trajectory = generate_arc_trajectory(start_x, start_y, target_x, start_y)# 使用ActionChains模拟拖拽action = ActionChains(driver)action.click_and_hold(slider).perform()# 执行轨迹移动for x_offset, y_offset in trajectory:action.move_by_offset(x_offset, y_offset).perform()# 模拟人类操作间隔time.sleep(0.02)# 释放鼠标action.release().perform()# 等待验证结果time.sleep(2)# 初始化浏览器
options = webdriver.ChromeOptions()
options.add_argument('--disable-blink-features=AutomationControlled')
driver = webdriver.Chrome(options=options)
try:# 替换为实际验证页面driver.get('目标网站URL')solve_slider_captcha(driver)# 后续操作...
finally:driver.quit()
反检测措施:
options.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36...')
options.add_experimental_option('excludeSwitches', ['enable-automation'])
(4)reCAPTCHA v2/v3处理建议
人工打码:暂停爬虫,人工输入验证码
专业工具:使用undetected-chromedriver配合指纹浏览器
商业服务:调用2Captcha等平台API(需付费)
4、动态生成内容(JavaScript渲染)
问题:动态加载内容(如React/Vue应用),使用Ajax/JS动态加载数据,初始HTML文档中不包含有效数据,传统爬虫(如`requests`)无法通过直接请求获取。
解决方案:
1)分析API网络请求接口:打开浏览器开发者工具(F12)的“网络(Network)”面板,查找数据来源的Ajax请求数据接口(通常是XHR类型)。直接模拟这个请求获取JSON数据,效率最高。
2)使用无头浏览器:当数据无法通过简单接口获取,或接口参数加密复杂时,使用`Selenium`、`Playwright`或`Puppeteer`等工具模拟真实浏览器行为,等待JS执行完毕后再解析页面。缺点是速度慢、资源消耗大。
(1)直接调用API接口
优点:效率高(直接获取结构化JSON),资源消耗低,反爬风险相对较低
缺点:需要手动分析网络请求,可能遇到接口参数加密/签名
import requests
# 电商网站商品列表API
url = "https://api.example.com/products"
params = {"page": 1,"limit": 20,"sort": "popularity"
}
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36","Authorization": "Bearer your_token_here" # 如果需要认证
}
try:response = requests.get(url, params=params, headers=headers)response.raise_for_status() # 检查HTTP错误data = response.json()# 解析JSON数据for item in data["products"]:print(f"商品ID: {item['id']}, 名称: {item['name']}, 价格: {item['price']}")
except requests.exceptions.RequestException as e:print(f"请求失败: {e}")
(2)使用Selenium模拟浏览器
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
import random
import time# 配置Chrome选项模拟真实浏览器
user_agents = ["Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36","Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36"
]
chrome_options = Options()
chrome_options.add_argument("--headless") # 启用无头模式
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument(f"user-agent={random.choice(user_agents)}")
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])try:# 启动无头浏览器driver = webdriver.Chrome(options=chrome_options,executable_path="/path/to/chromedriver")# 模拟登录操作driver.get("https://example.com/login")WebDriverWait(driver, 15).until(EC.presence_of_element_located((By.ID, "username"))).send_keys("test_user")driver.find_element(By.ID, "password").send_keys("password123")driver.find_element(By.ID, "submit-btn").click()# 验证登录成功并等待仪表盘加载WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, "dashboard")))# 导航到动态内容页面driver.get("https://www.example.com/dynamic-page")# 抓取动态加载的产品数据WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.CSS_SELECTOR, ".product-list")))# 滚动加载更多内容(模拟用户滚动行为)last_height = driver.execute_script("return document.body.scrollHeight")while True:driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")time.sleep(1.5) # 等待页面加载new_height = driver.execute_script("return document.body.scrollHeight")if new_height == last_height:breaklast_height = new_height# 提取并解析产品信息products = driver.find_elements(By.CSS_SELECTOR, ".product-item")for idx, product in enumerate(products, 1):name = product.find_element(By.CSS_SELECTOR, ".name").textprice = product.find_element(By.CSS_SELECTOR, ".price").textrating = product.find_element(By.CSS_SELECTOR, ".rating").text \if product.find_elements(By.CSS_SELECTOR, ".rating") else "N/A"print(f"{idx}. 商品名称: {name} | 价格: {price} | 评分: {rating}")except Exception as e:print(f"操作失败: {str(e)}")# 可添加错误日志记录或重试机制
finally:# 确保浏览器关闭释放资源driver.quit()print("浏览器已关闭")
优点:通用性强(可处理复杂交互),无需分析API细节,模拟真实用户行为
缺点:执行速度慢(需启动浏览器),资源消耗大(内存/CPU),可能触发网站反爬机制
使用建议:
根据目标网站结构调整选择器(.product-item等)
复杂反爬,添加:
chrome_options.add_argument("--disable-javascript") # 仅对非JS依赖页面
chrome_options.add_argument("--proxy-server=http://user:pass@ip:port") # 代理设置
大规模爬取,添加:
from fake_useragent import UserAgent # 动态UA库
chrome_options.add_argument(f"user-agent={UserAgent().random}")
(3)Playwright
from playwright.sync_api import sync_playwrightwith sync_playwright() as p:browser = p.chromium.launch(headless=True)page = browser.new_page()page.goto("https://example.com/react-app")# 等待选择器加载page.wait_for_selector(".dynamic-content")# 提取数据content = page.content() # 获取完整HTML# 或使用更高效的locator提取products = page.locator(".product").all()for product in products:print(product.inner_text())browser.close()
建议:
1)优先尝试API方案:通过Chrome开发者工具分析网络请求(Network面板 -> XHR)
2)处理反爬措施:
添加请求头模拟浏览器
使用代理IP轮换
添加请求延迟(避免高频请求)
3)动态渲染优化:
使用requests-html库(支持JS渲染)
配置Selenium时禁用图片加载(提升速度)
4)数据解析:
推荐使用lxml或BeautifulSoup解析HTML
对于JSON数据,直接使用json模块
5、行为分析
问题:网站通过鼠标轨迹、点击频率等行为识别爬虫。
解决方案:
使用Selenium模拟真实操作(如滚动、点击)。
结合自动化测试框架(如Cypress)编写更自然的交互脚本。
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
import time
import random# 配置浏览器选项
chrome_options = Options()
# 最大化窗口模拟真实用户
chrome_options.add_argument("--start-maximized")
# 隐藏自动化特征
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
# 初始化WebDriver
driver = webdriver.Chrome(options=chrome_options)try:# 访问目标网站driver.get("https://example.com")print("页面加载完成")# 模拟自然滚动行为for _ in range(5):# 随机滚动距离(模拟用户浏览行为)scroll_distance = random.randint(500, 1500)driver.execute_script(f"window.scrollBy(0, {scroll_distance})")time.sleep(random.uniform(1.0, 3.0)) # 随机等待时间# 模拟点击操作buttons = driver.find_elements(By.TAG_NAME, "button")if buttons:# 随机选择按钮target_button = random.choice(buttons)print(f"模拟点击按钮: {target_button.text}")# 滚动到元素可见driver.execute_script("arguments[0].scrollIntoView(true);", target_button)time.sleep(random.uniform(0.5, 1.5))target_button.click()time.sleep(random.uniform(2.0, 4.0))# 模拟鼠标移动轨迹(需要额外实现)# 可使用ActionChains模拟更复杂的鼠标移动from selenium.webdriver import ActionChainsactions = ActionChains(driver)link = driver.find_element(By.LINK_TEXT, "了解更多")actions.move_to_element(link).pause(random.uniform(0.2, 0.8)).click().perform()print("模拟复杂鼠标操作完成")# 随机间隔后关闭浏览器final_wait = random.randint(5, 15)print(f"等待{final_wait}秒后结束会话")time.sleep(final_wait)except Exception as e:print(f"发生错误: {e}")# 错误时保存截图driver.save_screenshot("error_screenshot.png")
finally:driver.quit()print("浏览器已关闭")
反爬虫对抗策略:
1)行为随机化
滚动距离/速度随机化:random.randint(500, 1500)
操作间隔随机化:random.uniform(1.0, 3.0)
元素选择随机化:random.choice(buttons)
2)真实用户特征模拟
窗口最大化:--start-maximized
隐藏自动化特征:--disable-blink-features=AutomationControlled
渐进式滚动(模拟人类阅读节奏)
3)复杂交互模拟
使用ActionChains模拟鼠标移动轨迹
滚动到元素可见后再操作:scrollIntoView
操作前添加随机暂停
4)错误处理机制
异常时自动保存截图
finally保证浏览器关
5)结合Cypress框架
// Cypress示例(需单独安装)
cy.visit('https://example.com')
cy.scrollTo('bottom', { duration: 3000 }) // 3秒平滑滚动
cy.get('button').contains('Submit').click({ force: true })
6)指纹干扰
修改navigator.webdriver属性
使用代理IP轮换
修改User-Agent字符串
7)行为模式多样化
混合使用键盘操作(Tab键切换、Enter提交)
模拟表单填写时的输入停顿
结合鼠标移动轨迹记录插件
6、API接口参数加密
App或网页端的Ajax请求经常带有加密参数(如`token`、`sign`、`timestamp`等),直接复制请求链接无法使用。
解决方案:
1)前端代码分析:使用浏览器开发者工具的“调试器(Debugger)”面板,JS反混淆工具,追踪加密参数的生成逻辑,然后用Python代码复现此逻辑。这是最核心的逆向工程技术。
2)执行JS代码:使用`PyExecJS`、`Js2Py`库或Node.js环境,直接执行关键的JS函数来获取参数。
3)自动化浏览器上下文:使用`Playwright`或`Selenium`在浏览器环境中获取到数据后直接提取,避开加密分析,但效率较低。
方案1:前端代码分析+Python复现加密逻辑
import hashlib
import time
# 假设通过JS分析发现加密逻辑为:md5(appid + timestamp + secret)
def generate_token(appid, secret):timestamp = str(int(time.time() * 1000))raw = f"{appid}{timestamp}{secret}"return hashlib.md5(raw.encode()).hexdigest()# 使用示例
appid = "test_app"
secret = "my_secret"
token = generate_token(appid, secret)
print(f"请求参数:token={token}×tamp={int(time.time()*1000)}")
方案2:使用PyExecJS执行JS加密函数
from datetime import time
import execjs
import requests# 从前端JS提取的加密函数
js_code = """
function getSign(params) {const keys = Object.keys(params).sort();let signStr = '';keys.forEach(key => {signStr += `${key}=${params[key]}&`;});return md5(signStr + 'secret_key');
}
function md5(str) {// 实际需要完整的md5实现,此处示意return CryptoJS.MD5(str).toString();
}
"""
# 编译JS环境
ctx = execjs.compile(js_code)
# 构造请求参数
params = {"userid": "123","timestamp": int(time.time() * 1000)
}
# 执行JS获取签名
sign = ctx.call("getSign", params)
# 发送请求
url = "https://api.example.com/data"
response = requests.get(url, params={**params, "sign": sign})
print(response.text)
方案3:Playwright自动化浏览器获取加密参数
from playwright.sync_api import sync_playwrightdef get_encrypted_params():with sync_playwright() as p:# 启动无头浏览器browser = p.chromium.launch(headless=True)page = browser.new_page()# 访问目标页面page.goto("https://example.com/login")# 等待加密参数生成(通过CSS选择器定位)page.wait_for_selector("#token-input")# 提取加密参数token = page.evaluate('''() => {const token = document.getElementById("token-input").value;const timestamp = Date.now();return {token, timestamp};}''')browser.close()return tokenparams = get_encrypted_params()
print(f"获取加密参数:token={params['token']}, timestamp={params['timestamp']}")
方案 | 优点 | 缺点 | 适用场景 |
前端分析复现 | 效率高,无依赖 | 需要较强的逆向能力 | 加密逻辑简单,可明确追踪 |
JS代码执行 | 精准复现JS逻辑 | 需要配置JS环境 | 加密函数可提取且无DOM依赖 |
自动化浏览器 | 无需逆向分析 | 资源消耗大,速度慢 | 加密参数依赖浏览器环境生成 |
二、数据抓取与解析
1、结构化数据提取
问题:HTML结构复杂或频繁变化导致解析失败。
解决方案:
使用CSS选择器或XPath(如lxml、Parsel)灵活定位元素。
采用机器学习库(如Scrapy-Splash结合深度学习模型)处理非结构化数据。
from lxml import html
from scrapy import Selector
from scrapy_splash import SplashRequest
from transformers import pipeline# HTML结构(模拟复杂/动态变化的页面)
sample_html = """
<div class="product-card"><h3 class="title">智能手机</h3><span class="price">¥2999</span><div class="description"><p>6.7英寸AMOLED屏,12GB+256GB存储</p></div>
</div>
"""# 方案1:使用CSS选择器灵活定位
def extract_with_css(html_content):selector = Selector(text=html_content)try:# 使用属性选择器和层级关系提高容错性title = selector.css('.product-card .title::text').get()price = selector.css('.product-card .price::text').re_first(r'\d+\.?\d*')desc = selector.css('.product-card .description p::text').get()return {'title': title, 'price': float(price) if price else None, 'desc': desc}except AttributeError:# 结构变化时的降级处理return {'error': 'CSS selector failed'}# 方案2:使用XPath容错提取
def extract_with_xpath(html_content):tree = html.fromstring(html_content)try:# 使用contains()和层级关系适应类名变化title = tree.xpath('//div[contains(@class,"card")]//h3/text()')[0]price = tree.xpath('//span[contains(@class,"price")]/text()')[0].replace('¥', '')desc = tree.xpath('//div[contains(@class,"description")]/p/text()')[0]return {'title': title, 'price': float(price), 'desc': desc}except IndexError:return {'error': 'XPath selector failed'}# 方案3:机器学习模型处理动态页面(需安装transformers库)
def extract_with_ml(html_content):# 使用预训练的BERT模型进行信息抽取nlp = pipeline("ner", model="bert-base-chinese")doc = html.fromstring(html_content).text_content()# 自定义规则+模型输出后处理entities = nlp(doc)price_entity = next((e for e in entities if e['entity'] == 'B-MONEY'), None)title_entity = next((e for e in entities if e['entity'] == 'B-PRODUCT'), None)return {'title': title_entity['word'] if title_entity else None,'price': float(price_entity['word'].replace('¥', '')) if price_entity else None}# 方案4:Scrapy-Splash处理JavaScript渲染
def scrape_with_splash(url):# 在Scrapy的Spider中配置SplashRequestyield SplashRequest(url=url,endpoint='render.html',args={'wait': 2},callback=parse_splash)
def parse_splash(response):# 结合CSS/XPath和机器学习处理渲染后的页面selector = Selector(response)# ...(使用方案1-3中的提取逻辑)if __name__ == "__main__":css_result = extract_with_css(sample_html)xpath_result = extract_with_xpath(sample_html)ml_result = extract_with_ml(sample_html)print("CSS Selector Result:", css_result)print("XPath Result:", xpath_result)print("ML Model Result:", ml_result)
容错机制设计:
多个选择器/模型并行提取,异常捕获和降级处理,正则表达式验证数据格式,模型输出后处理规则,动态等待机制(Splash)
适用场景:
电商网站价格/商品信息提取,新闻网站结构化内容抓取,动态加载页面数据采集,反爬机制复杂的网站
扩展:
结合Scrapy的Item Pipeline进行数据清洗
使用Selenium/Playwright处理更复杂的交互
训练自定义NER模型适应特定网站
实现增量爬虫和断点续爬功能
2、分页与滚动加载
问题:无限滚动或异步分页难以抓取全部数据。
解决方案:
模拟滚动到底部(如Selenium的execute_script("window.scrollTo(0, document.body.scrollHeight);"))。
分析分页参数,构造URL循环请求(如page=1&size=20)。
方案一:无限滚动加载(Selenium模拟滚动)
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as ECdriver = webdriver.Chrome()
driver.get("https://example.com/infinite-scroll")
last_height = driver.execute_script("return document.body.scrollHeight")
scroll_count = 0
# 设置最大滚动次数
max_scrolls = 10
while scroll_count < max_scrolls:# 滚动到底部driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")# 等待新元素加载(以商品列表为例)try:WebDriverWait(driver, 5).until(EC.presence_of_element_located((By.CSS_SELECTOR, ".product-item")))except:break# 检测页面高度变化new_height = driver.execute_script("return document.body.scrollHeight")if new_height == last_height:breaklast_height = new_heightscroll_count += 1
# 提取数据
products = driver.find_elements(By.CSS_SELECTOR, ".product-item")
for product in products:print(product.text)
driver.quit()
方案二:分页参数构造(Requests循环请求)
import requests
from bs4 import BeautifulSoup
import timebase_url = "https://example.com/products"
page = 1
all_data = []
while True:# 分页参数params = {"page": page, "size": 20}response = requests.get(base_url, params=params)if response.status_code != 200:breaksoup = BeautifulSoup(response.text, 'html.parser')items = soup.select(".product-item")# 当返回空数据时退出if not items:breakfor item in items:# 解析每个商品数据title = item.select_one(".title").textprice = item.select_one(".price").textall_data.append({"title": title, "price": price})print(f"已采集第{page}页数据,共{len(items)}条")page += 1# 请求间隔避免被封time.sleep(1)# 保存结果
with open("products.json", "w", encoding="utf-8") as f:import jsonjson.dump(all_data, f, ensure_ascii=False, indent=2)
3、登录与会话保持
问题:需要登录才能访问的数据。
解决方案:
使用requests.Session()保持会话,手动处理Cookie。
通过Selenium自动化登录流程(需处理验证码)。
使用OAuth或API密钥(如Twitter API)替代网页登录。
(1)使用requests.Session保持会话
自动管理cookies,适合无验证码的简单登录
import requests
# 创建会话对象
session = requests.Session()
# 登录URL和表单数据
login_url = "https://example.com/login"
login_data = {"username": "your_username","password": "your_password"
}
# 发送登录请求
response = session.post(login_url, data=login_data)
if response.status_code == 200 and "登录成功" in response.text:print("登录成功,会话已保持")
else:print("登录失败")exit()
# 使用同一会话访问需要登录的页面
protected_url = "https://example.com/protected_data"
response = session.get(protected_url)
print("受保护数据:", response.text)
(2)使用Selenium处理验证码登录
适合需要模拟浏览器行为的场景(如验证码处理)
from selenium import webdriver
from selenium.webdriver.common.by import By
import time
# 初始化浏览器
driver = webdriver.Chrome()
driver.get("https://example.com/login")
# 输入用户名密码
driver.find_element(By.ID, "username").send_keys("your_username")
driver.find_element(By.ID, "password").send_keys("your_password")
# 手动处理验证码(或使用OCR)
captcha = input("请输入验证码: ")
driver.find_element(By.ID, "captcha").send_keys(captcha)
# 提交登录表单
driver.find_element(By.ID, "submit-btn").click()
# 等待页面加载
time.sleep(3)
# 验证登录状态
if "欢迎" in driver.page_source:print("登录成功,会话已保持")# 访问受保护页面driver.get("https://example.com/protected_data")print("受保护数据:", driver.page_source)
else:print("登录失败")
driver.quit()
(3)使用Twitter API通过OAuth认证
避免直接解析网页,直接使用平台提供的API接口
import tweepy
# API认证信息
consumer_key = "YOUR_CONSUMER_KEY"
consumer_secret = "YOUR_CONSUMER_SECRET"
access_token = "YOUR_ACCESS_TOKEN"
access_token_secret = "YOUR_ACCESS_TOKEN_SECRET"
# 创建认证对象
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
# 创建API对象
api = tweepy.API(auth)
# 获取用户时间线
public_tweets = api.user_timeline()
for tweet in public_tweets:print(f"用户 {tweet.user.name} 发布: {tweet.text}")
三、性能与稳定性优化
1、请求效率低下
问题:单线程爬取速度慢。
解决方案:
多线程/异步请求(如aiohttp、Scrapy的并发设置)。
分布式爬取(如Scrapy-Redis、Celery)分散请求压力。
(1)多线程方案(使用concurrent.futures)
import concurrent.futures
import requests
# 测试用重复URL
urls = ['https://example.com'] * 100
def fetch_url(url):try:response = requests.get(url, timeout=5)return len(response.text)except Exception as e:return str(e)
# 创建线程池(建议根据CPU核心数调整workers数量)
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:# 提交所有任务futures = [executor.submit(fetch_url, url) for url in urls]# 获取结果for future in concurrent.futures.as_completed(futures):result = future.result()print(f"页面大小: {result} bytes")
(2)异步IO方案(使用aiohttp)
import aiohttp
import asyncio
async def fetch(session, url):try:async with session.get(url) as response:return await response.text()except Exception as e:return str(e)
async def main():urls = ['https://example.com'] * 100# 创建会话(自动管理连接池)async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session:tasks = [fetch(session, url) for url in urls]results = await asyncio.gather(*tasks)for i, result in enumerate(results):print(f"任务{i}返回: {len(result)} bytes")
# 运行事件循环
asyncio.run(main())
(3)分布式方案(Scrapy-Redis)
# settings.py 配置
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
# 分布式节点共享的Redis地址
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
# spider.py
import scrapy
class DistributedSpider(scrapy.Spider):name = "distributed"start_urls = ['https://example.com'] * 100def parse(self, response):yield {'url': response.url,'length': len(response.text)}
建议:
500以下请求:多线程足够,线程数需合理控制,避免资源竞争
千级以上请求:异步IO,需要异步编程思维,调试复杂
跨机器部署:分布式架构,需要维护Redis等中间件,部署复杂
2、网络波动与重试
问题:请求失败导致数据丢失。
解决方案:
实现自动重试机制(如requests.adapters.HTTPAdapter的max_retries)。
使用消息队列(如RabbitMQ)存储失败请求,后续重试。
(1)自动重试机制(基于requests)
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry# 配置重试策略
retry_strategy = Retry(# 最大重试次数total=3,# 遇到这些状态码时重试status_forcelist=[429, 500, 502, 503, 504],# 允许重试的HTTP方法allowed_methods=["HEAD", "GET", "PUT", "OPTIONS"],# 退避因子(指数退避)backoff_factor=1
)# 创建带重试机制的Session
adapter = HTTPAdapter(max_retries=retry_strategy)
session = requests.Session()
session.mount("https://", adapter)
session.mount("http://", adapter)def safe_request(url, **kwargs):try:response = session.get(url, timeout=5, **kwargs)# 检查HTTP错误状态码response.raise_for_status()return responseexcept requests.exceptions.RequestException as e:print(f"Request failed: {e}")# 此处可将失败请求推送到消息队列return Noneif __name__ == "__main__":response = safe_request("https://httpbin.org/status/500")if response:print("Request succeeded:", response.text)
(2)RabbitMQ消息队列集成
需要安装pika库:pip install pika
生产者(爬虫端)
import pika
import jsondef push_to_queue(queue_name, url, params=None):connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()channel.queue_declare(queue=queue_name, durable=True)message = {"url": url, "params": params}channel.basic_publish(exchange='',routing_key=queue_name,body=json.dumps(message),# 持久化消息properties=pika.BasicProperties(delivery_mode=2))connection.close()# 在请求失败时调用
push_to_queue("failed_requests", "https://example.com/api")
消费者(重试服务)
import pika
import json
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry# 创建带重试机制的Session
retry_strategy = Retry(total=5, backoff_factor=2)
adapter = HTTPAdapter(max_retries=retry_strategy)
session = requests.Session()
session.mount("https://", adapter)
session.mount("http://", adapter)def callback(ch, method, properties, body):try:request = json.loads(body)print(f"Retrying: {request['url']}")response = session.get(request['url'], params=request.get('params'), timeout=10)if response.status_code == 200:# 处理成功数据print("Success!")ch.basic_ack(delivery_tag=method.delivery_tag)else:# 再次失败处理(如推送到死信队列)print(f"Failed after retry: {response.status_code}")ch.basic_nack(delivery_tag=method.delivery_tag)except Exception as e:print(f"Processing error: {e}")ch.basic_nack(delivery_tag=method.delivery_tag)if __name__ == "__main__":connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()channel.queue_declare(queue="failed_requests", durable=True)channel.basic_consume(queue='failed_requests', on_message_callback=callback, auto_ack=False)print("Waiting for failed requests...")channel.start_consuming()
3、请求管理与去重
问题:大规模爬取时,如何避免重复爬取同一URL?如何高效调度数百万个URL?
解决方案:
1)布隆过滤器 (Bloom Filter):一种高效的数据结构,用于判断一个元素是否在集合中。可能有极小的误判率(判断存在的不一定存在,但判断不存在的一定不存在),非常适合海量URL去重。
2)Redis集合去重
3)数据库唯一索引约束:将URL的MD5/SHA1哈希值作为数据库(如Redis, MySQL)的唯一索引,插入前先查询是否存在。
4)爬虫框架的去重组件:`Scrapy`框架内置了`dupefilters`,并支持扩展,可以基于布隆过滤器实现。
代码示例:
(1)布隆过滤器去重(使用pybloom-live库)
内存高效(100万URL仅需约1.2MB),适合纯内存去重
import hashlib
from pybloom_live import BloomFilter
# 初始化布隆过滤器(预计插入100万条,误判率0.1%)
bloom = BloomFilter(capacity=1000000, error_rate=0.1)
def add_url(url):if url not in bloom:bloom.add(url)return True # 需要爬取return False # 重复URL
# 示例使用
urls = ["https://example.com/page1"] * 10000
unique_urls = [url for url in urls if add_url(url)]
# 输出1
print(f"去重后URL数量:{len(unique_urls)}")
(2)Redis集合去重
支持分布式扩展,适合集群爬虫
import redis
# 连接Redis(默认localhost:6379)
r = redis.Redis()
def add_url(url):# 使用MD5哈希减少存储空间url_hash = hashlib.md5(url.encode()).hexdigest()if r.sadd("unique_urls", url_hash):return Truereturn False
# 批量插入示例
with r.pipeline() as pipe:for url in urls:pipe.sadd("unique_urls", hashlib.md5(url.encode()).hexdigest())pipe.execute()
(3)数据库唯一索引去重(SQLite示例)
持久化存储,适合需要历史记录的场景
import sqlite3
conn = sqlite3.connect('urls.db')
conn.execute('''CREATE TABLE IF NOT EXISTS urls(id INTEGER PRIMARY KEY,url_hash TEXT UNIQUE)''')
def add_url(url):url_hash = hashlib.md5(url.encode()).hexdigest()try:conn.execute("INSERT INTO urls (url_hash) VALUES (?)", (url_hash,))conn.commit()return Trueexcept sqlite3.IntegrityError:return False
(4)Scrapy框架去重扩展
直接利用框架生态,适合大型项目
# scrapy_bloomfilter.py
from pybloom_live import BloomFilter
from scrapy.dupefilters import RFPDupeFilter
class BloomFilterDupeFilter(RFPDupeFilter):def __init__(self, path=None):self.bloom = BloomFilter(1000000, 0.1)super().__init__(path)def request_seen(self, request):if request.url in self.bloom:return Trueself.bloom.add(request.url)return False
# settings.py配置
DUPEFILTER_CLASS = 'myproject.scrapy_bloomfilter.BloomFilterDupeFilter'
4、数据存储与效率
如何设计存储结构?如何应对海量数据存储和写入效率问题?
解决方案:
1)数据库选择:
关系型数据库 (MySQL, PostgreSQL):适合结构规整、需要复杂查询和事务的数据。
非关系型数据库 (MongoDB):适合存储半结构化或嵌套结构(如JSON)的数据,模式灵活。
键值数据库 (Redis):速度快,常用于做缓存和URL队列。
2)异步与批量写入:不要爬取一条数据就写入一次数据库。使用异步IO(如`aiomysql`)或批量写入(`bulk_save`)来大幅提升写入效率。
3)链接池与引擎优化:配置数据库链接池,避免频繁创建和关闭连接。
代码示例:
(1)异步批量写入示例(MySQL + aiomysql)
import asyncio
import aiomysql
from datetime import datetime
async def batch_insert(data_list):pool = await aiomysql.create_pool(host='127.0.0.1',port=3306,user='root',password='password',db='scrapy_db',# 连接池大小maxsize=20)async with pool.acquire() as conn:async with conn.cursor() as cur:sql = "INSERT INTO items (url, title, content, created_at) VALUES (%s, %s, %s, %s)"# 批量构造参数params = [(item['url'], item['title'], item['content'], datetime.now()) for item in data_list]await cur.executemany(sql, params)await conn.commit()
# 模拟爬取数据
async def main():data = [{'url': 'https://example.com/1', 'title': 'Title1', 'content': 'Content1'},{'url': 'https://example.com/2', 'title': 'Title2', 'content': 'Content2'},]await batch_insert(data)
if __name__ == '__main__':asyncio.run(main())
(2)MongoDB批量写入优化
from pymongo import MongoClient
from pymongo.errors import BulkWriteError
client = MongoClient('mongodb://localhost:27017/')
db = client['scrapy_db']
collection = db['items']
# 批量插入数据
def bulk_insert(data_list):try:collection.insert_many(data_list, ordered=False)except BulkWriteError as e:print(f"部分写入失败: {e.details}")
data = [{"url": "https://example.com/1", "tags": ["tech", "news"]},{"url": "https://example.com/2", "tags": ["finance", "market"]}
]
bulk_insert(data)
(3)监控与调优
# 使用Python监控数据库性能
import psutil
import mysql.connector
def monitor_db():# 监控CPU/内存使用cpu_percent = psutil.cpu_percent()mem_percent = psutil.virtual_memory().percent# 监控MySQL状态conn = mysql.connector.connect(user='monitor', password='pass', host='localhost')cursor = conn.cursor()cursor.execute("SHOW GLOBAL STATUS LIKE 'Threads_running'")threads_running = cursor.fetchone()[1]print(f"CPU: {cpu_percent}%, Memory: {mem_percent}%, Threads_running: {threads_running}")# 根据监控数据动态调整if threads_running > 100:print("警告:数据库连接过高,建议扩容或优化查询")
四、调试与维护
1、日志与错误监控
爬虫在后台长时间运行,如何知道它的状态?出错如何排查?
解决方案:
1)完善的日志系统:使用Python`logging`模块,记录不同级别(INFO, DEBUG, ERROR)的日志,包括爬取URL、状态码、异常信息等。
2)使用Sentry或ELK栈监控错误和性能指标
3)状态监控:监控爬虫的运行时长、请求速度、成功率、IP消耗速度等指标。
4)错误重试机制:为请求设置重试策略(如Scrapy的`RETRY_TIMES`),对网络错误、状态码500等进行处理。
5)断点续爬:将爬取状态(如offset,page number)定期保存,程序重启后可以从断点继续,而不是从头开始。
代码示例:
import logging
import time
import json
import os
from urllib.parse import urljoin
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry# 配置日志系统
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s',handlers=[logging.FileHandler("crawler.log"),logging.StreamHandler()]
)
logger = logging.getLogger("crawler")class ResilientCrawler:def __init__(self, base_url, start_page=1):self.base_url = base_urlself.start_page = start_pageself.current_page = start_pageself.session = self._create_session()self.state_file = "crawler_state.json"self.load_state()def _create_session(self):"""创建带有重试机制的会话"""session = requests.Session()retry_strategy = Retry(total=3,backoff_factor=1,status_forcelist=[429, 500, 502, 503, 504],allowed_methods=["GET", "POST"])adapter = HTTPAdapter(max_retries=retry_strategy)session.mount("https://", adapter)session.mount("http://", adapter)return sessiondef load_state(self):"""从文件加载爬取状态"""if os.path.exists(self.state_file):try:with open(self.state_file, 'r') as f:state = json.load(f)self.current_page = state.get('current_page', self.start_page)logger.info(f"从断点恢复爬取,当前页码:{self.current_page}")except Exception as e:logger.error(f"加载状态失败:{str(e)}")def save_state(self):"""保存当前爬取状态"""state = {'current_page': self.current_page}try:with open(self.state_file, 'w') as f:json.dump(state, f)logger.debug("状态保存成功")except Exception as e:logger.error(f"状态保存失败:{str(e)}")def fetch_page(self, page):"""带错误处理的页面获取方法"""url = urljoin(self.base_url, f"?page={page}")logger.info(f"开始爬取:{url}")try:start_time = time.time()response = self.session.get(url, timeout=5)latency = time.time() - start_time# 记录性能指标logger.info(f"请求耗时:{latency:.2f}s")logger.info(f"状态码:{response.status_code}")response.raise_for_status()# 模拟处理响应数据data = response.text[:100] + "..." # 模拟数据处理logger.debug(f"页面内容摘要:{data}")return Trueexcept requests.exceptions.HTTPError as e:logger.error(f"HTTP错误:{str(e)}")except requests.exceptions.ConnectionError as e:logger.error(f"连接错误:{str(e)}")except requests.exceptions.Timeout as e:logger.error(f"请求超时:{str(e)}")except Exception as e:logger.error(f"未知错误:{str(e)}")return Falsedef run(self, max_pages=10):"""主运行逻辑"""logger.info(f"开始爬取任务,起始页:{self.start_page}")for page in range(self.current_page, self.current_page + max_pages):try:success = self.fetch_page(page)if success:self.current_page = page + 1self.save_state() # 定期保存状态else:logger.warning(f"页面爬取失败,页码:{page}")# 模拟请求间隔time.sleep(1)except KeyboardInterrupt:logger.info("用户中断爬取,保存当前状态")self.save_state()breakexcept Exception as e:logger.exception(f"爬取过程中发生异常:{str(e)}")logger.info(f"爬取任务完成,共处理页面:{page}个")if __name__ == "__main__":# 配置示例crawler = ResilientCrawler(base_url="https://example.com/api/items",start_page=1)# 启动爬虫crawler.run(max_pages=10)
2、反爬策略更新
问题:目标网站升级反爬机制导致爬虫失效。
解决方案:
定期测试爬虫(如每日运行少量请求检测封禁情况)。
维护反爬策略库,快速适配新规则(如更新代理IP池、验证码识别方法)。
五、法律与合规风险
1、版权与隐私侵犯
问题:抓取受版权保护的数据或用户隐私信息,遵守`robots.txt`协议与法律法规`robots.txt`是网站告知爬虫哪些页面可以抓取的协议,无视它可能导致法律风险。
解决方案:
1)遵守目标网站的robots.txt规则(如User-agent: * Disallow: /private/),使用Python的`urllib.robotparser`模块来解析目标网站的`robots.txt`,并遵守其规定。
2)查看网站条款:阅读网站的`Terms of Service`,明确是否禁止爬取,确保符合《网络安全法》或GDPR等法规。
3)控制访问压力:即使允许爬取,也应将爬取速度控制在合理范围内,避免对对方服务器造成DoS攻击般的压力。
4)仅抓取公开数据,避免存储敏感信息(如邮箱、手机号)。
5)注意数据用途:爬取的数据尤其是个⼈信息,不能用于非法用途或违反《网络安全法》《个人信息保护法》等法律法规。切勿公开或出售用户隐私数据。
2、服务条款违反
问题:目标网站明确禁止爬虫。
解决方案:
优先使用官方API(如Twitter API、Google Search API)。
联系网站管理员申请数据访问权限。
六、选择合适工具
爬虫框架:Scrapy(全功能)、Playwright(浏览器自动化)、Apify(云爬虫)。
代理服务:Bright Data、Smartproxy、ScraperAPI。
验证码识别:2Captcha、Anti-Captcha、Tesseract OCR。
存储方案:MongoDB(非结构化)、PostgreSQL(结构化)、Redis(缓存)。
轻量级请求:`requests`+`BeautifulSoup`/`lxml`
大型框架:`Scrapy` (异步、高性能、插件丰富)
JS渲染:`Scrapy`+`Splash`或`Playwright`/`Selenium`