Python Web Scraping, Lesson 8: Proxy Pools and Anti-Anti-Scraping Techniques
Table of Contents
- Course Objectives
- 1. Proxy Fundamentals
- 1.1 What Is a Proxy?
- 1.2 Proxy Types
- 1.3 Proxy Validation
- 2. Building a Proxy Pool
- 2.1 Basic Proxy Pool
- 2.2 Advanced Proxy Pool
- 3. Getting Free Proxies
- 3.1 Scraping Proxy Listing Sites
- 3.2 Proxy API Integration
- 4. Anti-Anti-Scraping Techniques
- 4.1 User-Agent Rotation
- 4.2 Request Header Optimization
- 4.3 Session Management and Cookie Handling
- 5. Hands-On Case Study: An E-commerce Product Spider
- 6. Exercises
- 6.1 Basic Exercises
- 6.2 Advanced Exercises
- 6.3 Hands-On Project
- 7. FAQ
- Q1: How can I tell whether a proxy has been detected?
- Q2: What if free proxies are unstable?
- Q3: How do I avoid getting my IP banned?
- 8. Next Lesson Preview
Course Objectives
- Understand how proxies work and the main proxy types
- Master building and managing a proxy pool
- Learn IP rotation and anti-anti-scraping techniques
- Understand user-agent spoofing and request header optimization
1. Proxy Fundamentals
1.1 What Is a Proxy?
A proxy server is an intermediary between the client and the target server; it hides the client's real IP address from the site being visited.
import requests

# Request without a proxy
response = requests.get('https://httpbin.org/ip')
print("Real IP:", response.json())

# Request through a proxy
proxies = {
    'http': 'http://proxy-server:port',
    'https': 'https://proxy-server:port'
}
response = requests.get('https://httpbin.org/ip', proxies=proxies)
print("Proxy IP:", response.json())
1.2 Proxy Types
# HTTP proxy
http_proxy = {
    'http': 'http://username:password@proxy-server:port',
    'https': 'http://username:password@proxy-server:port'
}

# HTTPS proxy
https_proxy = {
    'http': 'https://username:password@proxy-server:port',
    'https': 'https://username:password@proxy-server:port'
}

# SOCKS proxy
socks_proxy = {
    'http': 'socks5://username:password@proxy-server:port',
    'https': 'socks5://username:password@proxy-server:port'
}
1.3 Proxy Validation
import requests
import time
from urllib.parse import urlparse


class ProxyValidator:
    def __init__(self, timeout=10):
        self.timeout = timeout
        self.test_urls = [
            'https://httpbin.org/ip',
            'https://httpbin.org/get',
            'https://www.google.com'
        ]

    def validate_proxy(self, proxy_url):
        """Check whether a proxy is usable."""
        proxies = {'http': proxy_url, 'https': proxy_url}
        try:
            start_time = time.time()
            response = requests.get(self.test_urls[0], proxies=proxies, timeout=self.timeout)
            end_time = time.time()
            if response.status_code == 200:
                response_data = response.json()
                return {
                    'proxy': proxy_url,
                    'status': 'valid',
                    'ip': response_data.get('origin', ''),
                    'response_time': end_time - start_time,
                    'anonymity': self.check_anonymity(response_data)
                }
            else:
                return {
                    'proxy': proxy_url,
                    'status': 'invalid',
                    'error': f'HTTP {response.status_code}'
                }
        except Exception as e:
            return {'proxy': proxy_url, 'status': 'invalid', 'error': str(e)}

    def check_anonymity(self, response_data):
        """Classify the proxy's anonymity level."""
        headers = response_data.get('headers', {})
        # Headers that leak the real client IP
        forwarded_headers = ['X-Forwarded-For', 'X-Real-Ip', 'X-Originating-Ip', 'Client-Ip']
        for header in forwarded_headers:
            if header in headers:
                return 'transparent'  # transparent proxy
        # Headers that reveal the presence of a proxy
        proxy_headers = ['Via', 'X-Proxy-Id', 'Proxy-Connection']
        for header in proxy_headers:
            if header in headers:
                return 'anonymous'  # anonymous proxy
        return 'elite'  # elite (high-anonymity) proxy


# Usage example
validator = ProxyValidator()
result = validator.validate_proxy('http://proxy-server:port')
print(result)
2. Building a Proxy Pool
2.1 Basic Proxy Pool
import requests
import random
import time
import threading
from queue import Queue, Empty
import json


class ProxyPool:
    def __init__(self, max_size=100):
        self.max_size = max_size
        self.proxies = Queue(maxsize=max_size)
        self.bad_proxies = set()
        self.lock = threading.Lock()
        # Statistics
        self.stats = {
            'total_proxies': 0,
            'valid_proxies': 0,
            'failed_proxies': 0,
            'requests_made': 0
        }

    def add_proxy(self, proxy_url):
        """Add a proxy to the pool."""
        if proxy_url not in self.bad_proxies:
            try:
                self.proxies.put_nowait(proxy_url)
                with self.lock:
                    self.stats['total_proxies'] += 1
                return True
            except Exception:
                return False
        return False

    def get_proxy(self):
        """Take a proxy from the pool."""
        try:
            return self.proxies.get_nowait()
        except Empty:
            return None

    def return_proxy(self, proxy_url, success=True):
        """Return a proxy to the pool, or blacklist it on failure."""
        if success:
            self.add_proxy(proxy_url)
            with self.lock:
                self.stats['valid_proxies'] += 1
        else:
            self.bad_proxies.add(proxy_url)
            with self.lock:
                self.stats['failed_proxies'] += 1

    def make_request(self, url, **kwargs):
        """Make a request through the proxy pool."""
        max_retries = 3
        for attempt in range(max_retries):
            proxy = self.get_proxy()
            if not proxy:
                raise Exception("Proxy pool is empty")
            proxies = {'http': proxy, 'https': proxy}
            try:
                response = requests.get(url, proxies=proxies, **kwargs)
                self.return_proxy(proxy, success=True)
                with self.lock:
                    self.stats['requests_made'] += 1
                return response
            except Exception as e:
                self.return_proxy(proxy, success=False)
                if attempt == max_retries - 1:
                    raise e
                continue

    def get_stats(self):
        """Return a copy of the statistics."""
        with self.lock:
            return self.stats.copy()

    def size(self):
        """Current pool size."""
        return self.proxies.qsize()


# Usage example
pool = ProxyPool()

# Add proxies
proxy_list = [
    'http://proxy1:port',
    'http://proxy2:port',
    'http://proxy3:port'
]
for proxy in proxy_list:
    pool.add_proxy(proxy)

# Make a request through the pool
try:
    response = pool.make_request('https://httpbin.org/ip', timeout=10)
    print(response.json())
except Exception as e:
    print(f"Request failed: {e}")

print("Pool stats:", pool.get_stats())
2.2 Advanced Proxy Pool
import asyncio
import aiohttp
import time
import random
from dataclasses import dataclass
from typing import List, Optional
import logging


@dataclass
class ProxyInfo:
    url: str
    ip: str = ""
    port: int = 0
    protocol: str = "http"
    anonymity: str = "unknown"
    response_time: float = 0.0
    success_count: int = 0
    fail_count: int = 0
    last_used: float = 0.0
    last_checked: float = 0.0

    @property
    def success_rate(self):
        total = self.success_count + self.fail_count
        return self.success_count / total if total > 0 else 0.0

    @property
    def score(self):
        """Proxy score: shorter response time and higher success rate give a higher score."""
        if self.response_time == 0:
            return 0
        time_score = 1.0 / (self.response_time + 0.1)
        rate_score = self.success_rate
        return time_score * rate_score


class AdvancedProxyPool:
    def __init__(self, max_size=200, check_interval=300):
        self.max_size = max_size
        self.check_interval = check_interval
        self.proxies = {}  # url -> ProxyInfo
        self.lock = asyncio.Lock()
        # Logging
        self.logger = logging.getLogger(__name__)
        # Background check task
        self.check_task = None

    async def add_proxy(self, proxy_url):
        """Add a proxy."""
        async with self.lock:
            if proxy_url not in self.proxies:
                proxy_info = ProxyInfo(url=proxy_url)
                # Parse the proxy URL
                if '://' in proxy_url:
                    protocol, address = proxy_url.split('://', 1)
                    proxy_info.protocol = protocol
                    if '@' in address:
                        auth, address = address.split('@', 1)
                    if ':' in address:
                        ip, port = address.split(':', 1)
                        proxy_info.ip = ip
                        proxy_info.port = int(port)
                self.proxies[proxy_url] = proxy_info
                self.logger.info(f"Added proxy: {proxy_url}")
                return True
            return False

    async def validate_proxy(self, proxy_info: ProxyInfo):
        """Validate a single proxy."""
        test_url = 'https://httpbin.org/ip'
        try:
            connector = aiohttp.TCPConnector(limit=10, ttl_dns_cache=300, use_dns_cache=True)
            timeout = aiohttp.ClientTimeout(total=15)
            async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
                start_time = time.time()
                async with session.get(test_url, proxy=proxy_info.url) as response:
                    end_time = time.time()
                    proxy_info.response_time = end_time - start_time
                    if response.status == 200:
                        data = await response.json()
                        proxy_info.success_count += 1
                        proxy_info.last_checked = time.time()
                        # Check anonymity
                        headers = data.get('headers', {})
                        proxy_info.anonymity = self.check_anonymity(headers)
                        return True
                    else:
                        proxy_info.fail_count += 1
                        return False
        except Exception as e:
            proxy_info.fail_count += 1
            self.logger.debug(f"Proxy validation failed: {proxy_info.url} - {e}")
            return False

    def check_anonymity(self, headers):
        """Classify proxy anonymity."""
        forwarded_headers = ['X-Forwarded-For', 'X-Real-Ip', 'X-Originating-Ip', 'Client-Ip']
        proxy_headers = ['Via', 'X-Proxy-Id', 'Proxy-Connection']
        for header in forwarded_headers:
            if header in headers:
                return 'transparent'
        for header in proxy_headers:
            if header in headers:
                return 'anonymous'
        return 'elite'

    async def get_best_proxy(self, min_score=0.1):
        """Pick the best available proxy."""
        async with self.lock:
            valid_proxies = [
                proxy for proxy in self.proxies.values()
                if proxy.success_rate > 0.5 and proxy.score >= min_score
            ]
            if not valid_proxies:
                return None
            # Sort by score, best first
            valid_proxies.sort(key=lambda x: x.score, reverse=True)
            # Pick randomly among the top 10% to avoid always reusing the same proxy
            top_count = max(1, len(valid_proxies) // 10)
            selected = random.choice(valid_proxies[:top_count])
            selected.last_used = time.time()
            return selected

    async def make_request(self, url, **kwargs):
        """Make a request through a proxy."""
        max_retries = 3
        for attempt in range(max_retries):
            proxy_info = await self.get_best_proxy()
            if not proxy_info:
                raise Exception("No proxy available")
            try:
                connector = aiohttp.TCPConnector(limit=50, ttl_dns_cache=300, use_dns_cache=True)
                timeout = aiohttp.ClientTimeout(total=30)
                async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
                    async with session.get(url, proxy=proxy_info.url, **kwargs) as response:
                        proxy_info.success_count += 1
                        return response
            except Exception as e:
                proxy_info.fail_count += 1
                self.logger.warning(f"Request failed: {proxy_info.url} - {e}")
                if attempt == max_retries - 1:
                    raise e
                continue

    async def check_all_proxies(self):
        """Re-check every proxy in the pool."""
        self.logger.info("Checking all proxies...")
        async with self.lock:
            proxy_list = list(self.proxies.values())

        # Check proxies concurrently
        semaphore = asyncio.Semaphore(20)

        async def check_proxy(proxy_info):
            async with semaphore:
                return await self.validate_proxy(proxy_info)

        tasks = [check_proxy(proxy) for proxy in proxy_list]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Remove dead proxies
        async with self.lock:
            to_remove = []
            for proxy_info in proxy_list:
                if proxy_info.success_rate < 0.1 and proxy_info.fail_count > 5:
                    to_remove.append(proxy_info.url)
            for url in to_remove:
                del self.proxies[url]
                self.logger.info(f"Removed dead proxy: {url}")

        valid_count = sum(1 for r in results if r is True)
        self.logger.info(f"Proxy check finished, valid proxies: {valid_count}/{len(proxy_list)}")

    async def start_background_check(self):
        """Start the background check task."""
        async def check_loop():
            while True:
                try:
                    await self.check_all_proxies()
                    await asyncio.sleep(self.check_interval)
                except Exception as e:
                    self.logger.error(f"Background check error: {e}")
                    await asyncio.sleep(60)

        self.check_task = asyncio.create_task(check_loop())

    async def stop_background_check(self):
        """Stop the background check task."""
        if self.check_task:
            self.check_task.cancel()
            try:
                await self.check_task
            except asyncio.CancelledError:
                pass

    async def get_stats(self):
        """Return pool statistics."""
        async with self.lock:
            total = len(self.proxies)
            valid = sum(1 for p in self.proxies.values() if p.success_rate > 0.5)
            avg_response_time = 0
            if total > 0:
                total_time = sum(p.response_time for p in self.proxies.values())
                avg_response_time = total_time / total
            return {
                'total_proxies': total,
                'valid_proxies': valid,
                'avg_response_time': avg_response_time,
                'anonymity_distribution': self.get_anonymity_stats()
            }

    def get_anonymity_stats(self):
        """Anonymity distribution across the pool."""
        stats = {'elite': 0, 'anonymous': 0, 'transparent': 0, 'unknown': 0}
        for proxy in self.proxies.values():
            stats[proxy.anonymity] += 1
        return stats


# Usage example
async def main():
    # Configure logging
    logging.basicConfig(level=logging.INFO)
    pool = AdvancedProxyPool()

    # Add proxies
    proxy_list = [
        'http://proxy1:port',
        'http://proxy2:port',
        'http://proxy3:port'
    ]
    for proxy in proxy_list:
        await pool.add_proxy(proxy)

    # Start the background check
    await pool.start_background_check()
    try:
        # Use the pool
        response = await pool.make_request('https://httpbin.org/ip')
        print(await response.json())
        # Statistics
        stats = await pool.get_stats()
        print("Pool stats:", stats)
    finally:
        await pool.stop_background_check()

# Run the example
# asyncio.run(main())
3. Getting Free Proxies
3.1 Scraping Proxy Listing Sites
import requests
from bs4 import BeautifulSoup
import re
import time
import random


class FreeProxyCollector:
    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        self.proxy_sources = [
            {
                'name': 'free-proxy-list',
                'url': 'https://free-proxy-list.net/',
                'parser': self.parse_free_proxy_list
            },
            {
                'name': 'proxy-list',
                'url': 'https://www.proxy-list.download/api/v1/get?type=http',
                'parser': self.parse_proxy_list_download
            }
        ]

    def parse_free_proxy_list(self, html):
        """Parse free-proxy-list.net."""
        soup = BeautifulSoup(html, 'html.parser')
        proxies = []
        table = soup.find('table', {'id': 'proxylisttable'})
        if table:
            rows = table.find_all('tr')[1:]  # skip the header row
            for row in rows:
                cols = row.find_all('td')
                if len(cols) >= 7:
                    ip = cols[0].text.strip()
                    port = cols[1].text.strip()
                    country = cols[2].text.strip()
                    anonymity = cols[4].text.strip()
                    https = cols[6].text.strip()
                    protocol = 'https' if https == 'yes' else 'http'
                    proxy_url = f"{protocol}://{ip}:{port}"
                    proxies.append({
                        'url': proxy_url,
                        'ip': ip,
                        'port': int(port),
                        'country': country,
                        'anonymity': anonymity.lower(),
                        'protocol': protocol
                    })
        return proxies

    def parse_proxy_list_download(self, content):
        """Parse the proxy-list.download API response."""
        proxies = []
        lines = content.strip().split('\n')
        for line in lines:
            if ':' in line:
                ip, port = line.strip().split(':', 1)
                proxy_url = f"http://{ip}:{port}"
                proxies.append({
                    'url': proxy_url,
                    'ip': ip,
                    'port': int(port),
                    'country': 'unknown',
                    'anonymity': 'unknown',
                    'protocol': 'http'
                })
        return proxies

    def collect_from_source(self, source):
        """Collect proxies from a single source."""
        try:
            print(f"Collecting proxies from {source['name']}")
            response = self.session.get(source['url'], timeout=15)
            response.raise_for_status()
            proxies = source['parser'](response.text)
            print(f"Collected {len(proxies)} proxies from {source['name']}")
            return proxies
        except Exception as e:
            print(f"Failed to collect proxies: {source['name']} - {e}")
            return []

    def collect_all_proxies(self):
        """Collect proxies from every source."""
        all_proxies = []
        for source in self.proxy_sources:
            proxies = self.collect_from_source(source)
            all_proxies.extend(proxies)
            # Random delay to avoid getting blocked
            time.sleep(random.uniform(1, 3))
        # Deduplicate
        unique_proxies = {}
        for proxy in all_proxies:
            unique_proxies[proxy['url']] = proxy
        result = list(unique_proxies.values())
        print(f"Collected {len(result)} unique proxies in total")
        return result

    def validate_proxies(self, proxies, max_workers=20):
        """Validate a list of proxies."""
        from concurrent.futures import ThreadPoolExecutor, as_completed

        validator = ProxyValidator(timeout=10)
        valid_proxies = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_proxy = {
                executor.submit(validator.validate_proxy, proxy['url']): proxy
                for proxy in proxies
            }
            for future in as_completed(future_to_proxy):
                proxy = future_to_proxy[future]
                try:
                    result = future.result()
                    if result['status'] == 'valid':
                        valid_proxies.append(result)
                        print(f"✓ Valid proxy: {result['proxy']} - {result['response_time']:.2f}s")
                    else:
                        print(f"✗ Invalid proxy: {result['proxy']}")
                except Exception as e:
                    print(f"✗ Validation error: {proxy['url']} - {e}")
        return valid_proxies


# Usage example
collector = FreeProxyCollector()

# Collect proxies
proxies = collector.collect_all_proxies()

# Validate proxies (only the first 50 here)
valid_proxies = collector.validate_proxies(proxies[:50])
print(f"Validation finished, valid proxies: {len(valid_proxies)}")

# Save the valid proxies
import json

with open('valid_proxies.json', 'w') as f:
    json.dump(valid_proxies, f, indent=2)
3.2 Proxy API Integration
import requests
import json
import time


class ProxyAPI:
    def __init__(self):
        self.apis = {
            'proxylist': {
                'url': 'https://www.proxy-list.download/api/v1/get',
                'params': {'type': 'http'},
                'parser': self.parse_text_list
            },
            'gimmeproxy': {
                'url': 'https://gimmeproxy.com/api/getProxy',
                'params': {'format': 'json', 'protocol': 'http'},
                'parser': self.parse_gimmeproxy
            }
        }

    def parse_text_list(self, content):
        """Parse a plain-text proxy list."""
        proxies = []
        lines = content.strip().split('\n')
        for line in lines:
            if ':' in line:
                ip, port = line.strip().split(':', 1)
                proxies.append(f"http://{ip}:{port}")
        return proxies

    def parse_gimmeproxy(self, content):
        """Parse a gimmeproxy API response."""
        try:
            data = json.loads(content)
            proxy_url = f"{data['protocol']}://{data['ip']}:{data['port']}"
            return [proxy_url]
        except Exception:
            return []

    def get_proxies_from_api(self, api_name, count=10):
        """Fetch proxies from a single API."""
        if api_name not in self.apis:
            raise ValueError(f"Unknown API: {api_name}")
        api_config = self.apis[api_name]
        proxies = []
        for i in range(count):
            try:
                response = requests.get(
                    api_config['url'],
                    params=api_config['params'],
                    timeout=10
                )
                if response.status_code == 200:
                    new_proxies = api_config['parser'](response.text)
                    proxies.extend(new_proxies)
                if api_name == 'gimmeproxy':
                    time.sleep(1)  # gimmeproxy is rate-limited
            except Exception as e:
                print(f"API request failed: {api_name} - {e}")
                break
        return list(set(proxies))  # deduplicate

    def get_all_proxies(self, count_per_api=20):
        """Fetch proxies from every configured API."""
        all_proxies = []
        for api_name in self.apis:
            print(f"Fetching proxies from {api_name}...")
            proxies = self.get_proxies_from_api(api_name, count_per_api)
            all_proxies.extend(proxies)
            print(f"Got {len(proxies)} proxies from {api_name}")
        return list(set(all_proxies))


# Usage example
api = ProxyAPI()
proxies = api.get_all_proxies(count_per_api=10)
print(f"Got {len(proxies)} proxies in total")
4. Anti-Anti-Scraping Techniques
4.1 User-Agent Rotation
import random
import requests
import time


class UserAgentRotator:
    def __init__(self):
        self.user_agents = [
            # Chrome
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            # Firefox
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:89.0) Gecko/20100101 Firefox/89.0',
            'Mozilla/5.0 (X11; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0',
            # Safari
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.1.2 Safari/605.1.15',
            # Edge
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36 Edg/91.0.864.59',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36 Edg/90.0.818.66'
        ]
        self.mobile_agents = [
            # Mobile browsers
            'Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1',
            'Mozilla/5.0 (Android 11; Mobile; rv:68.0) Gecko/68.0 Firefox/88.0',
            'Mozilla/5.0 (Linux; Android 11; SM-G991B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.120 Mobile Safari/537.36'
        ]

    def get_random_desktop_ua(self):
        """Random desktop user agent."""
        return random.choice(self.user_agents)

    def get_random_mobile_ua(self):
        """Random mobile user agent."""
        return random.choice(self.mobile_agents)

    def get_random_ua(self, mobile_ratio=0.3):
        """Random user agent with a configurable mobile share."""
        if random.random() < mobile_ratio:
            return self.get_random_mobile_ua()
        return self.get_random_desktop_ua()


class AntiDetectionSpider:
    def __init__(self, proxy_pool=None):
        self.ua_rotator = UserAgentRotator()
        self.proxy_pool = proxy_pool
        self.session = requests.Session()
        # Request spacing
        self.min_delay = 1
        self.max_delay = 3
        self.last_request_time = 0

    def get_headers(self, url=None):
        """Build randomized request headers."""
        headers = {
            'User-Agent': self.ua_rotator.get_random_ua(),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': random.choice([
                'en-US,en;q=0.9',
                'zh-CN,zh;q=0.9,en;q=0.8',
                'ja-JP,ja;q=0.9,en;q=0.8'
            ]),
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }
        # Optional headers, added at random
        optional_headers = {
            'Cache-Control': random.choice(['no-cache', 'max-age=0']),
            'Pragma': 'no-cache',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Sec-Fetch-User': '?1'
        }
        for key, value in optional_headers.items():
            if random.random() > 0.5:
                headers[key] = value
        # Occasionally add a Referer
        if url and random.random() > 0.3:
            headers['Referer'] = url
        return headers

    def wait_between_requests(self):
        """Throttle requests with a randomized interval."""
        current_time = time.time()
        elapsed = current_time - self.last_request_time
        min_interval = random.uniform(self.min_delay, self.max_delay)
        if elapsed < min_interval:
            time.sleep(min_interval - elapsed)
        self.last_request_time = time.time()

    def make_request(self, url, **kwargs):
        """Make a request."""
        self.wait_between_requests()
        # Headers
        headers = self.get_headers(url)
        kwargs.setdefault('headers', {}).update(headers)
        # Proxy
        if self.proxy_pool:
            proxy = self.proxy_pool.get_proxy()
            if proxy:
                kwargs['proxies'] = {'http': proxy, 'https': proxy}
        # Timeout
        kwargs.setdefault('timeout', 15)
        try:
            response = self.session.get(url, **kwargs)
            # Check whether the response looks blocked
            if self.is_blocked(response):
                raise Exception("Request blocked, anti-scraping measures may have been triggered")
            return response
        except Exception as e:
            # If a proxy was used and the request failed, mark the proxy as bad
            if self.proxy_pool and 'proxies' in kwargs:
                self.proxy_pool.return_proxy(kwargs['proxies']['http'], success=False)
            raise e

    def is_blocked(self, response):
        """Detect whether the response indicates blocking."""
        # Status code
        if response.status_code in [403, 429, 503]:
            return True
        # Response content
        content = response.text.lower()
        blocked_keywords = [
            'access denied', 'blocked', 'captcha', 'verification',
            'robot', 'bot detection',
            '访问被拒绝', '验证码', '机器人检测'
        ]
        for keyword in blocked_keywords:
            if keyword in content:
                return True
        # Unusually small responses may be block pages
        if len(content) < 100:
            return True
        return False


# Usage example
spider = AntiDetectionSpider()
try:
    response = spider.make_request('https://httpbin.org/headers')
    print("Request succeeded")
    print("Response headers:", response.json())
except Exception as e:
    print(f"Request failed: {e}")
4.2 Request Header Optimization
import random
import time
from urllib.parse import urlparse


class HeaderOptimizer:
    def __init__(self):
        self.browser_versions = {
            'chrome': ['91.0.4472.124', '90.0.4430.212', '89.0.4389.128'],
            'firefox': ['89.0', '88.0', '87.0'],
            'safari': ['14.1.1', '14.0.3', '13.1.2'],
            'edge': ['91.0.864.59', '90.0.818.66', '89.0.774.68']
        }
        self.os_versions = {
            'windows': ['Windows NT 10.0; Win64; x64', 'Windows NT 6.1; Win64; x64'],
            'macos': ['Macintosh; Intel Mac OS X 10_15_7', 'Macintosh; Intel Mac OS X 10_14_6'],
            'linux': ['X11; Linux x86_64', 'X11; Ubuntu; Linux x86_64']
        }

    def generate_realistic_headers(self, target_url=None):
        """Generate realistic browser request headers."""
        # Pick a browser and operating system
        browser = random.choice(['chrome', 'firefox', 'safari', 'edge'])
        os_type = random.choice(['windows', 'macos', 'linux'])
        # User-Agent
        user_agent = self.generate_user_agent(browser, os_type)
        # Base headers
        headers = {
            'User-Agent': user_agent,
            'Accept': self.get_accept_header(browser),
            'Accept-Language': self.get_accept_language(),
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Sec-Fetch-User': '?1'
        }
        # Browser-specific headers
        if browser == 'chrome':
            major = self.browser_versions[browser][0].split(".")[0]
            headers.update({
                'sec-ch-ua': f'"Chromium";v="{major}", "Google Chrome";v="{major}", ";Not A Brand";v="99"',
                'sec-ch-ua-mobile': '?0',
                'sec-ch-ua-platform': f'"{os_type.title()}"'
            })
        # Cache control
        if random.random() > 0.5:
            headers['Cache-Control'] = random.choice(['no-cache', 'max-age=0'])
            headers['Pragma'] = 'no-cache'
        # DNT header
        if random.random() > 0.3:
            headers['DNT'] = '1'
        # Headers derived from the target URL
        if target_url:
            parsed = urlparse(target_url)
            # Host header
            headers['Host'] = parsed.netloc
            # Random Referer
            if random.random() > 0.4:
                referers = [
                    'https://www.google.com/',
                    'https://www.bing.com/',
                    'https://www.baidu.com/',
                    f'https://{parsed.netloc}/'
                ]
                headers['Referer'] = random.choice(referers)
        return headers

    def generate_user_agent(self, browser, os_type):
        """Build a user-agent string."""
        browser_version = random.choice(self.browser_versions[browser])
        os_version = random.choice(self.os_versions[os_type])
        if browser == 'chrome':
            webkit_version = '537.36'
            return f'Mozilla/5.0 ({os_version}) AppleWebKit/{webkit_version} (KHTML, like Gecko) Chrome/{browser_version} Safari/{webkit_version}'
        elif browser == 'firefox':
            return f'Mozilla/5.0 ({os_version}; rv:{browser_version}) Gecko/20100101 Firefox/{browser_version}'
        elif browser == 'safari':
            webkit_version = '605.1.15'
            return f'Mozilla/5.0 ({os_version}) AppleWebKit/{webkit_version} (KHTML, like Gecko) Version/{browser_version} Safari/{webkit_version}'
        elif browser == 'edge':
            webkit_version = '537.36'
            chrome_version = '91.0.4472.124'  # Edge is Chromium-based
            return f'Mozilla/5.0 ({os_version}) AppleWebKit/{webkit_version} (KHTML, like Gecko) Chrome/{chrome_version} Safari/{webkit_version} Edg/{browser_version}'

    def get_accept_header(self, browser):
        """Accept header per browser."""
        accept_headers = {
            'chrome': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
            'firefox': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'safari': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'edge': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9'
        }
        return accept_headers.get(browser, accept_headers['chrome'])

    def get_accept_language(self):
        """Accept-Language header."""
        languages = [
            'en-US,en;q=0.9',
            'zh-CN,zh;q=0.9,en;q=0.8',
            'ja-JP,ja;q=0.9,en;q=0.8',
            'ko-KR,ko;q=0.9,en;q=0.8',
            'de-DE,de;q=0.9,en;q=0.8',
            'fr-FR,fr;q=0.9,en;q=0.8'
        ]
        return random.choice(languages)


# Usage example
optimizer = HeaderOptimizer()

# Generate headers for a specific URL
headers = optimizer.generate_realistic_headers('https://example.com')
print("Generated headers:")
for key, value in headers.items():
    print(f"{key}: {value}")

# Use the generated headers in a request
import requests

response = requests.get('https://httpbin.org/headers', headers=headers)
print("\nHeaders received by the server:")
print(response.json())
4.3 Session Management and Cookie Handling
import requests
import json
import time
import random
from http.cookiejar import LWPCookieJar


class SessionManager:
    def __init__(self, cookie_file=None):
        self.session = requests.Session()
        self.cookie_file = cookie_file
        # Cookie jar
        if cookie_file:
            self.session.cookies = LWPCookieJar(cookie_file)
            try:
                self.session.cookies.load(ignore_discard=True)
            except FileNotFoundError:
                pass
        # Session defaults
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        # Request history
        self.request_history = []

    def save_cookies(self):
        """Save cookies to disk."""
        if self.cookie_file:
            self.session.cookies.save(ignore_discard=True)

    def load_cookies(self):
        """Load cookies from disk."""
        if self.cookie_file:
            try:
                self.session.cookies.load(ignore_discard=True)
                return True
            except FileNotFoundError:
                return False
        return False

    def clear_cookies(self):
        """Remove all cookies."""
        self.session.cookies.clear()

    def get_cookies_dict(self):
        """Cookies as a dict."""
        return dict(self.session.cookies)

    def set_cookies(self, cookies_dict):
        """Set cookies from a dict."""
        for name, value in cookies_dict.items():
            self.session.cookies.set(name, value)

    def simulate_browser_behavior(self, url):
        """Imitate how a real browser navigates to a page."""
        # 1. Visit the site's home page first
        domain = '/'.join(url.split('/')[:3])
        try:
            # Home page
            response = self.session.get(domain, timeout=10)
            time.sleep(random.uniform(1, 3))
            # Occasionally browse a few common pages
            if random.random() > 0.5:
                common_pages = ['/about', '/contact', '/help', '/sitemap']
                page = random.choice(common_pages)
                try:
                    self.session.get(f"{domain}{page}", timeout=10)
                    time.sleep(random.uniform(0.5, 2))
                except Exception:
                    pass
            # Finally visit the target page
            response = self.session.get(url, timeout=10)
            # Record the request
            self.request_history.append({
                'url': url,
                'timestamp': time.time(),
                'status_code': response.status_code
            })
            return response
        except Exception as e:
            print(f"Browser-behavior simulation failed: {e}")
            return None

    def handle_login(self, login_url, username, password,
                     username_field='username', password_field='password'):
        """Perform a login."""
        try:
            # Fetch the login page
            response = self.session.get(login_url)
            # The page could be parsed here for a CSRF token, e.g.:
            # soup = BeautifulSoup(response.text, 'html.parser')
            # csrf_token = soup.find('input', {'name': 'csrf_token'})['value']

            # Prepare login data
            login_data = {
                username_field: username,
                password_field: password
            }
            # Submit the login form
            response = self.session.post(login_url, data=login_data)
            # Check whether the login succeeded
            if self.is_login_successful(response):
                print("Login succeeded")
                self.save_cookies()
                return True
            else:
                print("Login failed")
                return False
        except Exception as e:
            print(f"Login error: {e}")
            return False

    def is_login_successful(self, response):
        """Heuristically check whether a login succeeded."""
        # Status code
        if response.status_code != 200:
            return False
        # Redirected back to the login page?
        if 'login' in response.url.lower():
            return False
        # Page content
        content = response.text.lower()
        success_indicators = ['dashboard', 'profile', 'logout', '退出', '个人中心']
        failure_indicators = ['error', 'invalid', 'incorrect', '错误', '失败']
        has_success = any(indicator in content for indicator in success_indicators)
        has_failure = any(indicator in content for indicator in failure_indicators)
        return has_success and not has_failure

    def get_session_info(self):
        """Summarize the current session."""
        return {
            'cookies_count': len(self.session.cookies),
            'cookies': dict(self.session.cookies),
            'headers': dict(self.session.headers),
            'request_count': len(self.request_history)
        }


# Usage example
session_manager = SessionManager(cookie_file='session_cookies.txt')

# Visit a site while imitating browser behavior
response = session_manager.simulate_browser_behavior('https://example.com/target-page')
if response:
    print(f"Visit succeeded, status code: {response.status_code}")
    # Session info
    info = session_manager.get_session_info()
    print(f"Cookie count: {info['cookies_count']}")
    print(f"Request count: {info['request_count']}")
    # Save cookies
    session_manager.save_cookies()
5. Hands-On Case Study: An E-commerce Product Spider
import asyncio
import aiohttp
import json
import time
import random
from dataclasses import dataclass
from typing import List, Optional
import logging


@dataclass
class Product:
    title: str
    price: str
    url: str
    image_url: str = ""
    rating: str = ""
    reviews_count: str = ""
    seller: str = ""


class EcommerceSpider:
    def __init__(self, proxy_pool=None, max_concurrent=10):
        self.proxy_pool = proxy_pool
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        # User-agent rotation
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0'
        ]
        # Statistics
        self.stats = {
            'pages_crawled': 0,
            'products_found': 0,
            'errors': 0,
            'proxy_failures': 0
        }
        # Logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    async def get_session(self):
        """Create an HTTP session."""
        connector = aiohttp.TCPConnector(
            limit=100,
            limit_per_host=self.max_concurrent,
            ttl_dns_cache=300,
            use_dns_cache=True
        )
        timeout = aiohttp.ClientTimeout(total=30)
        return aiohttp.ClientSession(connector=connector, timeout=timeout)

    def get_headers(self):
        """Random request headers."""
        return {
            'User-Agent': random.choice(self.user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.9,zh-CN,zh;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        }

    async def fetch_with_retry(self, session, url, max_retries=3):
        """Fetch a URL with retries."""
        async with self.semaphore:
            for attempt in range(max_retries):
                proxy = None
                proxy_info = None
                # Get a proxy
                if self.proxy_pool:
                    proxy_info = await self.proxy_pool.get_best_proxy()
                    proxy = proxy_info.url if proxy_info else None
                try:
                    # Random delay
                    await asyncio.sleep(random.uniform(1, 3))
                    headers = self.get_headers()
                    async with session.get(url, headers=headers, proxy=proxy) as response:
                        if response.status == 200:
                            content = await response.text()
                            self.stats['pages_crawled'] += 1
                            # Mark the proxy as successful
                            if proxy and self.proxy_pool:
                                proxy_info.success_count += 1
                            return content
                        elif response.status in [403, 429, 503]:
                            # Probably blocked by anti-scraping measures
                            self.logger.warning(f"Request blocked: {url} - status: {response.status}")
                            if proxy and self.proxy_pool:
                                proxy_info.fail_count += 1
                                self.stats['proxy_failures'] += 1
                            if attempt < max_retries - 1:
                                await asyncio.sleep(random.uniform(5, 10))
                                continue
                        else:
                            self.logger.error(f"HTTP error: {url} - status: {response.status}")
                except Exception as e:
                    self.logger.error(f"Request failed: {url} - {e}")
                    if proxy and self.proxy_pool:
                        proxy_info.fail_count += 1
                        self.stats['proxy_failures'] += 1
                    if attempt < max_retries - 1:
                        await asyncio.sleep(random.uniform(2, 5))
                        continue
            self.stats['errors'] += 1
            return None

    def parse_product_list(self, html, base_url):
        """Parse a product-listing page."""
        from bs4 import BeautifulSoup

        soup = BeautifulSoup(html, 'html.parser')
        products = []
        # Adjust the selectors to the actual site structure
        product_items = soup.select('.product-item')  # example selector
        for item in product_items:
            try:
                # Title
                title_elem = item.select_one('.product-title')
                title = title_elem.get_text(strip=True) if title_elem else ""
                # Price
                price_elem = item.select_one('.product-price')
                price = price_elem.get_text(strip=True) if price_elem else ""
                # Link
                link_elem = item.select_one('a')
                url = link_elem.get('href') if link_elem else ""
                if url and not url.startswith('http'):
                    url = base_url + url
                # Image
                img_elem = item.select_one('img')
                image_url = img_elem.get('src') if img_elem else ""
                # Rating
                rating_elem = item.select_one('.rating')
                rating = rating_elem.get_text(strip=True) if rating_elem else ""
                # Review count
                reviews_elem = item.select_one('.reviews-count')
                reviews_count = reviews_elem.get_text(strip=True) if reviews_elem else ""

                if title and price and url:
                    product = Product(
                        title=title,
                        price=price,
                        url=url,
                        image_url=image_url,
                        rating=rating,
                        reviews_count=reviews_count
                    )
                    products.append(product)
            except Exception as e:
                self.logger.error(f"Failed to parse product: {e}")
                continue
        self.stats['products_found'] += len(products)
        return products

    async def crawl_search_results(self, keyword, max_pages=5):
        """Crawl search results."""
        all_products = []
        base_url = "https://example-shop.com"  # replace with the real site

        async with await self.get_session() as session:
            tasks = []
            # One task per results page
            for page in range(1, max_pages + 1):
                search_url = f"{base_url}/search?q={keyword}&page={page}"
                task = self.crawl_single_page(session, search_url, base_url)
                tasks.append(task)
            # Run all tasks concurrently
            results = await asyncio.gather(*tasks, return_exceptions=True)
            # Collect results
            for result in results:
                if isinstance(result, list):
                    all_products.extend(result)
                elif isinstance(result, Exception):
                    self.logger.error(f"Page crawl failed: {result}")
        return all_products

    async def crawl_single_page(self, session, url, base_url):
        """Crawl a single page."""
        html = await self.fetch_with_retry(session, url)
        if html:
            products = self.parse_product_list(html, base_url)
            self.logger.info(f"Found {len(products)} products on {url}")
            return products
        else:
            self.logger.error(f"Could not fetch page: {url}")
            return []

    async def save_products(self, products, filename='products.json'):
        """Save product data."""
        data = []
        for product in products:
            data.append({
                'title': product.title,
                'price': product.price,
                'url': product.url,
                'image_url': product.image_url,
                'rating': product.rating,
                'reviews_count': product.reviews_count,
                'seller': product.seller
            })
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        self.logger.info(f"Saved {len(products)} products to {filename}")

    def print_stats(self):
        """Print crawl statistics."""
        self.logger.info("=== Crawl statistics ===")
        self.logger.info(f"Pages crawled: {self.stats['pages_crawled']}")
        self.logger.info(f"Products found: {self.stats['products_found']}")
        self.logger.info(f"Errors: {self.stats['errors']}")
        self.logger.info(f"Proxy failures: {self.stats['proxy_failures']}")


# Usage example
async def main():
    # Create a proxy pool (optional)
    proxy_pool = AdvancedProxyPool()

    # Add some proxies
    proxy_list = [
        'http://proxy1:port',
        'http://proxy2:port',
        'http://proxy3:port'
    ]
    for proxy in proxy_list:
        await proxy_pool.add_proxy(proxy)

    # Start the background check
    await proxy_pool.start_background_check()
    try:
        # Create the spider
        spider = EcommerceSpider(proxy_pool=proxy_pool, max_concurrent=5)
        # Crawl products
        products = await spider.crawl_search_results('手机', max_pages=3)
        # Save the results
        await spider.save_products(products)
        # Statistics
        spider.print_stats()
    finally:
        await proxy_pool.stop_background_check()

# Run the spider
# asyncio.run(main())
6. Exercises
6.1 Basic Exercises
- Proxy validator: write a proxy validator that checks a proxy's availability, anonymity, and response time
- User-agent rotation: implement a user-agent rotator that supports both desktop and mobile UAs
- Header optimization: create a header generator that produces realistic browser request headers
6.2 Advanced Exercises
- Proxy pool management: build a complete proxy pool system, including proxy acquisition, validation, rotation, and failure handling
- Anti-anti-scraping system: design a comprehensive anti-anti-scraping system that combines proxies, UA rotation, request spacing, and more
- Session management: implement a smart session manager that handles login, cookie management, and session persistence
6.3 Hands-On Project
Pick an e-commerce site and implement the following:
- Crawl product data through a proxy pool
- Detect and respond to anti-scraping measures
- Handle dynamically loaded content
- Clean and store the data
7. FAQ
Q1: How can I tell whether a proxy has been detected?
A: You can check in the following ways (see the sketch after this list):
- Check the response status code (403, 429, 503, and so on)
- Inspect the response body for CAPTCHAs or block messages
- Watch for abnormal response times
- Verify that the IP address seen by the server is actually the proxy's IP, not yours
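As a rough illustration of the status-code, content, and IP checks, here is a minimal sketch. The function names looks_blocked and proxy_is_leaking are made up for this example, and it assumes the proxy is specified by IP address and that https://httpbin.org/ip is reachable through it:

import requests

def looks_blocked(response):
    """Heuristic block check: status code plus a few common block-page keywords."""
    if response.status_code in (403, 429, 503):
        return True
    body = response.text.lower()
    return any(kw in body for kw in ('captcha', 'access denied', 'unusual traffic'))

def proxy_is_leaking(proxy_url):
    """Return True if the exit IP reported by httpbin does not match the proxy's address."""
    proxies = {'http': proxy_url, 'https': proxy_url}
    resp = requests.get('https://httpbin.org/ip', proxies=proxies, timeout=10)
    exit_ip = resp.json().get('origin', '')
    # Crude extraction of the host part; only meaningful when the proxy is given as an IP
    proxy_host = proxy_url.split('://', 1)[-1].split('@')[-1].split(':')[0]
    return proxy_host not in exit_ip

# Example:
# r = requests.get('https://example.com',
#                  proxies={'http': 'http://proxy:port', 'https': 'http://proxy:port'})
# print(looks_blocked(r), proxy_is_leaking('http://proxy:port'))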
Q2: What if free proxies are unstable?
A: Suggestions (a small sketch follows this list):
- Keep a large proxy pool so you can rotate quickly
- Check proxy availability in real time
- Use multiple proxy sources
- Consider a paid proxy service
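For example, a simple refresh routine can re-check candidates every few minutes and keep only the ones that still respond quickly. This is a sketch that reuses the ProxyValidator class from section 1.3; the refresh_pool name and the 5-second cutoff are arbitrary choices for illustration:

from concurrent.futures import ThreadPoolExecutor

def refresh_pool(candidate_urls, max_response_time=5.0):
    """Re-validate candidate proxies and keep only fast, working ones."""
    validator = ProxyValidator(timeout=10)
    with ThreadPoolExecutor(max_workers=20) as executor:
        results = list(executor.map(validator.validate_proxy, candidate_urls))
    return [
        r['proxy'] for r in results
        if r['status'] == 'valid' and r['response_time'] <= max_response_time
    ]

# Run this periodically and feed the survivors back into your ProxyPool:
# good = refresh_pool(['http://proxy1:port', 'http://proxy2:port'])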
Q3: How do I avoid getting my IP banned?
A: Take the following measures (see the sketch after this list):
- Control the request rate
- Rotate proxies
- Imitate real user behavior
- Randomize request parameters
- Handle CAPTCHAs and logins
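The first two points can be combined into a tiny request wrapper. The sketch below is illustrative rather than a fixed recipe (polite_get and its delay values are my own choices): it paces requests with random jitter and backs off when the server starts returning 403/429/503. For proxy rotation, plug in the ProxyPool from section 2.1 and pass its proxies into session.get.

import random
import time
import requests

def polite_get(url, session=None, base_delay=2.0, max_retries=3):
    """GET with a jittered delay before each attempt and exponential backoff on 403/429/503."""
    session = session or requests.Session()
    for attempt in range(max_retries):
        time.sleep(base_delay + random.uniform(0, base_delay))  # jittered pacing
        response = session.get(url, timeout=15)
        if response.status_code in (403, 429, 503):
            time.sleep((2 ** attempt) * base_delay)  # back off before retrying
            continue
        return response
    return None

# Usage:
# page = polite_get('https://httpbin.org/get')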
8. Next Lesson Preview
The next lesson, "Python Web Scraping, Lesson 9: CAPTCHA Recognition and Automation", will cover:
- CAPTCHA types and recognition techniques
- OCR text recognition
- Image CAPTCHA handling
- Slider CAPTCHA bypassing
- Third-party CAPTCHA-solving services
With this lesson you have learned the core techniques of building proxy pools and countering anti-scraping measures. These skills will help you handle a wide range of anti-scraping challenges and improve your spider's stability and success rate.