当前位置: 首页 > news >正文

Python爬虫第10课:分布式爬虫架构与Scrapy-Redis

目录

    • 课程目标
    • 1. 分布式爬虫基础概念
      • 1.1 什么是分布式爬虫
      • 1.2 分布式爬虫架构
      • 1.3 分布式爬虫的优势与挑战
    • 2. Redis在分布式爬虫中的应用
      • 2.1 Redis基础配置
      • 2.2 Redis集群配置
    • 3. Scrapy-Redis框架详解
      • 3.1 Scrapy-Redis安装和配置
      • 3.2 Scrapy-Redis基本配置
      • 3.3 创建分布式爬虫
      • 3.4 自定义管道处理
    • 4. 分布式任务调度
      • 4.1 任务调度器
      • 4.2 工作节点管理
    • 5. 数据去重和存储
      • 5.1 高级去重策略
      • 5.2 分布式存储策略
    • 6. 监控和管理
      • 6.1 分布式爬虫监控系统
    • 7. 实战案例:构建大规模新闻爬虫系统
      • 7.1 系统架构设计
    • 8. 练习与作业
      • 8.1 基础练习
      • 8.2 进阶练习
      • 8.3 实战项目
    • 9. 常见问题与解决方案
      • 9.1 Redis连接问题
      • 9.2 任务调度问题
      • 9.3 性能优化
    • 10. 下节预告

专栏导读
  • 🌸 欢迎来到Python办公自动化专栏—Python处理办公问题,解放您的双手
  • 🏳️‍🌈 个人博客主页:请点击——> 个人的博客主页 求收藏
  • 🏳️‍🌈 Github主页:请点击——> Github主页 求Star⭐
  • 🏳️‍🌈 知乎主页:请点击——> 知乎主页 求关注
  • 🏳️‍🌈 CSDN博客主页:请点击——> CSDN的博客主页 求关注
  • 👍 该系列文章专栏:请点击——>Python办公自动化专栏 求订阅
  • 🕷 此外还有爬虫专栏:请点击——>Python爬虫基础专栏 求订阅
  • 📕 此外还有python基础专栏:请点击——>Python基础学习专栏 求订阅
  • 文章作者技术和水平有限,如果文中出现错误,希望大家能指正🙏
  • ❤️ 欢迎各位佬关注! ❤️

课程目标

  • 理解分布式爬虫的基本概念和架构
  • 掌握Scrapy-Redis框架的使用
  • 学会构建可扩展的分布式爬虫系统
  • 实现任务调度和数据去重
  • 掌握分布式爬虫的监控和管理

1. 分布式爬虫基础概念

1.1 什么是分布式爬虫

分布式爬虫是将爬虫任务分散到多台机器上并行执行的技术,主要解决以下问题:

  • 性能瓶颈:单机爬虫处理能力有限
  • 可扩展性:需要根据任务量动态调整资源
  • 容错性:单点故障不影响整体系统
  • 负载均衡:合理分配爬取任务

1.2 分布式爬虫架构

"""
分布式爬虫基本架构:┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Master Node   │    │   Worker Node   │    │   Worker Node   │
│                 │    │                 │    │                 │
│  ┌───────────┐  │    │  ┌───────────┐  │    │  ┌───────────┐  │
│  │ Scheduler │  │    │  │  Spider   │  │    │  │  Spider   │  │
│  └───────────┘  │    │  └───────────┘  │    │  └───────────┘  │
│  ┌───────────┐  │    │  ┌───────────┐  │    │  ┌───────────┐  │
│  │ Monitor   │  │    │  │ Downloader│  │    │  │ Downloader│  │
│  └───────────┘  │    │  └───────────┘  │    │  └───────────┘  │
└─────────────────┘    └─────────────────┘    └─────────────────┘│                       │                       │└───────────────────────┼───────────────────────┘│┌─────────────────┐│   Redis Cluster ││                 ││  ┌───────────┐  ││  │   Queue   │  ││  └───────────┘  ││  ┌───────────┐  ││  │   Set     │  ││  └───────────┘  │└─────────────────┘
"""class DistributedCrawlerArchitecture:"""分布式爬虫架构示例"""def __init__(self):self.components = {'scheduler': 'Redis队列管理任务调度','workers': '多个爬虫节点并行工作','storage': 'Redis存储去重和缓存','monitor': '监控系统状态和性能','load_balancer': '负载均衡和故障转移'}def describe_architecture(self):"""描述架构组件"""for component, description in self.components.items():print(f"{component}: {description}")

1.3 分布式爬虫的优势与挑战

优势:

  • 高并发处理能力
  • 水平扩展性
  • 容错和高可用
  • 资源利用率高

挑战:

  • 任务调度复杂
  • 数据一致性
  • 网络通信开销
  • 系统复杂度增加

2. Redis在分布式爬虫中的应用

2.1 Redis基础配置

import redis
import json
import time
from typing import List, Dict, Anyclass RedisManager:"""Redis管理器"""def __init__(self, host='localhost', port=6379, db=0, password=None):self.redis_client = redis.Redis(host=host,port=port,db=db,password=password,decode_responses=True)# 测试连接try:self.redis_client.ping()print("Redis连接成功")except redis.ConnectionError:print("Redis连接失败")raisedef push_request(self, queue_name: str, request_data: Dict[str, Any]):"""推送请求到队列"""request_json = json.dumps(request_data)return self.redis_client.lpush(queue_name, request_json)def pop_request(self, queue_name: str, timeout: int = 0):"""从队列弹出请求"""result = self.redis_client.brpop(queue_name, timeout=timeout)if result:queue, request_json = resultreturn json.loads(request_json)return Nonedef add_to_set(self, set_name: str, value: str):"""添加到集合(用于去重)"""return self.redis_client.sadd(set_name, value)def is_in_set(self, set_name: str, value: str):"""检查是否在集合中"""return self.redis_client.sismember(set_name, value)def get_queue_length(self, queue_name: str):"""获取队列长度"""return self.redis_client.llen(queue_name)def get_set_size(self, set_name: str):"""获取集合大小"""return self.redis_client.scard(set_name)def clear_queue(self, queue_name: str):"""清空队列"""return self.redis_client.delete(queue_name)def clear_set(self, set_name: str):"""清空集合"""return self.redis_client.delete(set_name)# 使用示例
redis_manager = RedisManager()# 推送请求
request_data = {'url': 'https://example.com/page1','method': 'GET','headers': {'User-Agent': 'MySpider'},'meta': {'page': 1}
}redis_manager.push_request('spider:requests', request_data)# 弹出请求
request = redis_manager.pop_request('spider:requests')
print(f"获取到请求:{request}")# 去重检查
url_hash = hash(request_data['url'])
if not redis_manager.is_in_set('spider:dupefilter', str(url_hash)):redis_manager.add_to_set('spider:dupefilter', str(url_hash))print("新URL,添加到去重集合")
else:print("重复URL,跳过")

2.2 Redis集群配置

import redis.sentinel
from rediscluster import RedisClusterclass RedisClusterManager:"""Redis集群管理器"""def __init__(self, cluster_nodes=None, sentinel_hosts=None):if cluster_nodes:# Redis Cluster模式self.redis_client = RedisCluster(startup_nodes=cluster_nodes,decode_responses=True,skip_full_coverage_check=True)elif sentinel_hosts:# Redis Sentinel模式sentinel = redis.sentinel.Sentinel(sentinel_hosts)self.redis_client = sentinel.master_for('mymaster',decode_responses=True)else:raise ValueError("需要提供cluster_nodes或sentinel_hosts")def get_client(self):"""获取Redis客户端"""return self.redis_client# Redis Cluster配置示例
cluster_nodes = [{"host": "127.0.0.1", "port": "7000"},{"host": "127.0.0.1", "port": "7001"},{"host": "127.0.0.1", "port": "7002"},
]# Redis Sentinel配置示例
sentinel_hosts = [('127.0.0.1', 26379),('127.0.0.1', 26380),('127.0.0.1', 26381),
]# 使用集群
cluster_manager = RedisClusterManager(cluster_nodes=cluster_nodes)
redis_client = cluster_manager.get_client()

3. Scrapy-Redis框架详解

3.1 Scrapy-Redis安装和配置

# 安装Scrapy-Redis
pip install scrapy-redis# 安装Redis
pip install redis

3.2 Scrapy-Redis基本配置

# settings.py
# Scrapy-Redis配置# 启用Scrapy-Redis组件
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"# 不清理Redis队列,允许暂停/恢复爬虫
SCHEDULER_PERSIST = True# 请求序列化器
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'# Redis连接配置
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0
# REDIS_PASSWORD = 'your_password'# Redis连接池配置
REDIS_PARAMS = {'socket_connect_timeout': 30,'socket_timeout': 30,'retry_on_timeout': True,'health_check_interval': 30,
}# 启用管道
ITEM_PIPELINES = {'scrapy_redis.pipelines.RedisPipeline': 300,'myproject.pipelines.CustomPipeline': 400,
}# 并发设置
CONCURRENT_REQUESTS = 32
CONCURRENT_REQUESTS_PER_DOMAIN = 8# 下载延迟
DOWNLOAD_DELAY = 1
RANDOMIZE_DOWNLOAD_DELAY = 0.5# 自动限速
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 1
AUTOTHROTTLE_MAX_DELAY = 60
AUTOTHROTTLE_TARGET_CONCURRENCY = 2.0# 日志配置
LOG_LEVEL = 'INFO'

3.3 创建分布式爬虫

# spiders/distributed_spider.py
import scrapy
from scrapy_redis.spiders import RedisSpider
import jsonclass DistributedNewsSpider(RedisSpider):"""分布式新闻爬虫"""name = 'distributed_news'redis_key = 'distributed_news:start_urls'# 自定义设置custom_settings = {'CONCURRENT_REQUESTS': 16,'DOWNLOAD_DELAY': 0.5,}def parse(self, response):"""解析新闻列表页"""# 提取新闻链接news_links = response.css('.news-item a::attr(href)').getall()for link in news_links:if link:yield response.follow(link,callback=self.parse_news,meta={'page_url': response.url})# 提取下一页链接next_page = response.css('.pagination .next::attr(href)').get()if next_page:yield response.follow(next_page, callback=self.parse)def parse_news(self, response):"""解析新闻详情页"""yield {'title': response.css('h1::text').get(),'content': ' '.join(response.css('.content p::text').getall()),'author': response.css('.author::text').get(),'publish_time': response.css('.publish-time::text').get(),'url': response.url,'source_page': response.meta.get('page_url'),'crawl_time': scrapy.utils.misc.load_object('time.time')()}class DistributedEcommerceSpider(RedisSpider):"""分布式电商爬虫"""name = 'distributed_ecommerce'redis_key = 'distributed_ecommerce:start_urls'def parse(self, response):"""解析商品列表页"""# 提取商品链接product_links = response.css('.product-item a::attr(href)').getall()for link in product_links:if link:yield response.follow(link,callback=self.parse_product,meta={'category': response.meta.get('category', ''),'page_num': response.meta.get('page_num', 1)})# 处理分页current_page = response.meta.get('page_num', 1)max_pages = response.meta.get('max_pages', 100)if current_page < max_pages:next_page_url = response.urljoin(f'?page={current_page + 1}')yield scrapy.Request(next_page_url,callback=self.parse,meta={'category': response.meta.get('category', ''),'page_num': current_page + 1,'max_pages': max_pages})def parse_product(self, response):"""解析商品详情页"""# 提取商品信息product_data = {'name': response.css('h1.product-title::text').get(),'price': response.css('.price .current::text').re_first(r'[\d.]+'),'original_price': response.css('.price .original::text').re_first(r'[\d.]+'),'description': ' '.join(response.css('.description p::text').getall()),'brand': response.css('.brand::text').get(),'category': response.meta.get('category', ''),'images': response.css('.product-images img::attr(src)').getall(),'specifications': self.extract_specifications(response),'reviews_count': response.css('.reviews-count::text').re_first(r'\d+'),'rating': response.css('.rating::attr(data-rating)').get(),'url': response.url,'crawl_time': scrapy.utils.misc.load_object('time.time')()}yield product_data# 提取评论链接reviews_url = response.css('.reviews-link::attr(href)').get()if reviews_url:yield response.follow(reviews_url,callback=self.parse_reviews,meta={'product_id': product_data.get('name', '')})def extract_specifications(self, response):"""提取商品规格"""specs = {}spec_items = response.css('.specifications .spec-item')for item in spec_items:key = item.css('.spec-key::text').get()value = item.css('.spec-value::text').get()if key and value:specs[key.strip()] = value.strip()return specsdef parse_reviews(self, response):"""解析商品评论"""reviews = response.css('.review-item')for review in reviews:yield {'product_id': response.meta.get('product_id', ''),'reviewer': review.css('.reviewer-name::text').get(),'rating': review.css('.review-rating::attr(data-rating)').get(),'content': review.css('.review-content::text').get(),'review_time': review.css('.review-time::text').get(),'helpful_count': review.css('.helpful-count::text').re_first(r'\d+'),'type': 'review'}# 处理评论分页next_reviews = response.css('.reviews-pagination .next::attr(href)').get()if next_reviews:yield response.follow(next_reviews,callback=self.parse_reviews,meta={'product_id': response.meta.get('product_id', '')})

3.4 自定义管道处理

# pipelines.py
import json
import redis
import pymongo
from scrapy.exceptions import DropItem
import loggingclass RedisValidationPipeline:"""Redis验证管道"""def __init__(self, redis_host, redis_port, redis_db):self.redis_host = redis_hostself.redis_port = redis_portself.redis_db = redis_dbself.redis_client = None@classmethoddef from_crawler(cls, crawler):return cls(redis_host=crawler.settings.get('REDIS_HOST'),redis_port=crawler.settings.get('REDIS_PORT'),redis_db=crawler.settings.get('REDIS_DB'),)def open_spider(self, spider):self.redis_client = redis.Redis(host=self.redis_host,port=self.redis_port,db=self.redis_db,decode_responses=True)def process_item(self, item, spider):# 验证必要字段required_fields = ['title', 'url']for field in required_fields:if not item.get(field):raise DropItem(f"缺少必要字段: {field}")# 检查重复item_hash = hash(str(sorted(item.items())))if self.redis_client.sismember(f"{spider.name}:items", item_hash):raise DropItem("重复的item")# 添加到去重集合self.redis_client.sadd(f"{spider.name}:items", item_hash)return itemclass MongoDBPipeline:"""MongoDB存储管道"""def __init__(self, mongo_uri, mongo_db):self.mongo_uri = mongo_uriself.mongo_db = mongo_dbself.client = Noneself.db = None@classmethoddef from_crawler(cls, crawler):return cls(mongo_uri=crawler.settings.get('MONGO_URI'),mongo_db=crawler.settings.get('MONGO_DATABASE'),)def open_spider(self, spider):self.client = pymongo.MongoClient(self.mongo_uri)self.db = self.client[self.mongo_db]def close_spider(self, spider):self.client.close()def process_item(self, item, spider):collection_name = spider.nameself.db[collection_name].insert_one(dict(item))logging.info(f"Item保存到MongoDB: {item.get('title', 'Unknown')}")return itemclass DataCleaningPipeline:"""数据清洗管道"""def process_item(self, item, spider):# 清洗文本字段text_fields = ['title', 'content', 'description']for field in text_fields:if item.get(field):# 去除多余空白item[field] = ' '.join(item[field].split())# 去除特殊字符item[field] = item[field].replace('\u00a0', ' ')# 清洗价格字段if item.get('price'):try:item['price'] = float(item['price'])except (ValueError, TypeError):item['price'] = None# 清洗URL字段if item.get('url'):item['url'] = item['url'].strip()return itemclass StatisticsPipeline:"""统计管道"""def __init__(self, redis_host, redis_port, redis_db):self.redis_host = redis_hostself.redis_port = redis_portself.redis_db = redis_dbself.redis_client = None@classmethoddef from_crawler(cls, crawler):return cls(redis_host=crawler.settings.get('REDIS_HOST'),redis_port=crawler.settings.get('REDIS_PORT'),redis_db=crawler.settings.get('REDIS_DB'),)def open_spider(self, spider):self.redis_client = redis.Redis(host=self.redis_host,port=self.redis_port,db=self.redis_db,decode_responses=True)def process_item(self, item, spider):# 更新统计信息stats_key = f"{spider.name}:stats"# 总item数量self.redis_client.hincrby(stats_key, 'total_items', 1)# 按类型统计item_type = item.get('type', 'unknown')self.redis_client.hincrby(stats_key, f'items_{item_type}', 1)# 按时间统计(小时)import datetimecurrent_hour = datetime.datetime.now().strftime('%Y-%m-%d-%H')self.redis_client.hincrby(stats_key, f'hour_{current_hour}', 1)return item

4. 分布式任务调度

4.1 任务调度器

# scheduler.py
import redis
import json
import time
import logging
from typing import List, Dict, Any
from dataclasses import dataclass
from enum import Enumclass TaskStatus(Enum):PENDING = "pending"RUNNING = "running"COMPLETED = "completed"FAILED = "failed"RETRY = "retry"@dataclass
class Task:id: strspider_name: strstart_urls: List[str]settings: Dict[str, Any]priority: int = 0status: TaskStatus = TaskStatus.PENDINGcreated_time: float = Nonestarted_time: float = Nonecompleted_time: float = Noneretry_count: int = 0max_retries: int = 3def __post_init__(self):if self.created_time is None:self.created_time = time.time()class DistributedScheduler:"""分布式任务调度器"""def __init__(self, redis_host='localhost', redis_port=6379, redis_db=0):self.redis_client = redis.Redis(host=redis_host,port=redis_port,db=redis_db,decode_responses=True)self.logger = logging.getLogger(__name__)def submit_task(self, task: Task) -> bool:"""提交任务"""try:# 将任务序列化task_data = {'id': task.id,'spider_name': task.spider_name,'start_urls': task.start_urls,'settings': task.settings,'priority': task.priority,'status': task.status.value,'created_time': task.created_time,'retry_count': task.retry_count,'max_retries': task.max_retries}# 存储任务详情self.redis_client.hset(f"tasks:{task.id}",mapping=task_data)# 添加到优先级队列self.redis_client.zadd("task_queue",{task.id: task.priority})self.logger.info(f"任务提交成功: {task.id}")return Trueexcept Exception as e:self.logger.error(f"任务提交失败: {e}")return Falsedef get_next_task(self, worker_id: str) -> Task:"""获取下一个任务"""try:# 从优先级队列获取任务result = self.redis_client.zpopmax("task_queue")if not result:return Nonetask_id, priority = result[0]# 获取任务详情task_data = self.redis_client.hgetall(f"tasks:{task_id}")if not task_data:return None# 更新任务状态task_data['status'] = TaskStatus.RUNNING.valuetask_data['started_time'] = time.time()task_data['worker_id'] = worker_idself.redis_client.hset(f"tasks:{task_id}",mapping=task_data)# 创建Task对象task = Task(id=task_data['id'],spider_name=task_data['spider_name'],start_urls=json.loads(task_data['start_urls']),settings=json.loads(task_data['settings']),priority=int(task_data['priority']),status=TaskStatus(task_data['status']),created_time=float(task_data['created_time']),started_time=float(task_data['started_time']),retry_count=int(task_data['retry_count']),max_retries=int(task_data['max_retries']))self.logger.info(f"分配任务给worker {worker_id}: {task_id}")return taskexcept Exception as e:self.logger.error(f"获取任务失败: {e}")return Nonedef complete_task(self, task_id: str, success: bool = True) -> bool:"""完成任务"""try:task_data = self.redis_client.hgetall(f"tasks:{task_id}")if not task_data:return Falseif success:task_data['status'] = TaskStatus.COMPLETED.valuetask_data['completed_time'] = time.time()else:retry_count = int(task_data.get('retry_count', 0))max_retries = int(task_data.get('max_retries', 3))if retry_count < max_retries:# 重试任务task_data['status'] = TaskStatus.RETRY.valuetask_data['retry_count'] = retry_count + 1# 重新加入队列priority = int(task_data.get('priority', 0))self.redis_client.zadd("task_queue", {task_id: priority})else:# 标记为失败task_data['status'] = TaskStatus.FAILED.valuetask_data['completed_time'] = time.time()# 更新任务状态self.redis_client.hset(f"tasks:{task_id}", mapping=task_data)self.logger.info(f"任务完成: {task_id}, 成功: {success}")return Trueexcept Exception as e:self.logger.error(f"完成任务失败: {e}")return Falsedef get_task_status(self, task_id: str) -> Dict[str, Any]:"""获取任务状态"""task_data = self.redis_client.hgetall(f"tasks:{task_id}")return task_data if task_data else {}def get_queue_stats(self) -> Dict[str, int]:"""获取队列统计"""return {'pending_tasks': self.redis_client.zcard("task_queue"),'total_tasks': len(self.redis_client.keys("tasks:*"))}def cleanup_completed_tasks(self, days: int = 7):"""清理已完成的任务"""cutoff_time = time.time() - (days * 24 * 60 * 60)task_keys = self.redis_client.keys("tasks:*")cleaned_count = 0for key in task_keys:task_data = self.redis_client.hgetall(key)if (task_data.get('status') in [TaskStatus.COMPLETED.value, TaskStatus.FAILED.value] andfloat(task_data.get('completed_time', 0)) < cutoff_time):self.redis_client.delete(key)cleaned_count += 1self.logger.info(f"清理了 {cleaned_count} 个过期任务")return cleaned_count# 使用示例
scheduler = DistributedScheduler()# 创建任务
task = Task(id="news_crawl_001",spider_name="distributed_news",start_urls=["https://news.example.com"],settings={"CONCURRENT_REQUESTS": 16,"DOWNLOAD_DELAY": 0.5},priority=10
)# 提交任务
scheduler.submit_task(task)# 获取任务(模拟worker)
worker_task = scheduler.get_next_task("worker_001")
if worker_task:print(f"获取到任务: {worker_task.id}")# 模拟任务执行time.sleep(2)# 完成任务scheduler.complete_task(worker_task.id, success=True)

4.2 工作节点管理

# worker.py
import os
import time
import signal
import logging
import subprocess
from threading import Thread, Event
from typing import Dict, Anyclass WorkerNode:"""工作节点"""def __init__(self, worker_id: str, scheduler: DistributedScheduler):self.worker_id = worker_idself.scheduler = schedulerself.running = Falseself.current_task = Noneself.stop_event = Event()self.logger = logging.getLogger(__name__)# 注册信号处理signal.signal(signal.SIGINT, self._signal_handler)signal.signal(signal.SIGTERM, self._signal_handler)def _signal_handler(self, signum, frame):"""信号处理器"""self.logger.info(f"收到信号 {signum},准备停止worker")self.stop()def start(self):"""启动worker"""self.running = Trueself.logger.info(f"Worker {self.worker_id} 启动")while self.running and not self.stop_event.is_set():try:# 获取任务task = self.scheduler.get_next_task(self.worker_id)if task:self.current_task = taskself.logger.info(f"开始执行任务: {task.id}")# 执行任务success = self._execute_task(task)# 完成任务self.scheduler.complete_task(task.id, success)self.current_task = Noneelse:# 没有任务,等待self.logger.debug("没有可用任务,等待中...")time.sleep(5)except Exception as e:self.logger.error(f"Worker执行出错: {e}")if self.current_task:self.scheduler.complete_task(self.current_task.id, False)self.current_task = Nonetime.sleep(10)  # 错误后等待self.logger.info(f"Worker {self.worker_id} 停止")def _execute_task(self, task: Task) -> bool:"""执行任务"""try:# 构建Scrapy命令cmd = ['scrapy', 'crawl', task.spider_name,'-s', f'JOBDIR=jobs/{task.id}',  # 支持暂停/恢复]# 添加自定义设置for key, value in task.settings.items():cmd.extend(['-s', f'{key}={value}'])# 设置起始URLfor url in task.start_urls:# 将URL推送到Redisself.scheduler.redis_client.lpush(f"{task.spider_name}:start_urls",url)# 执行爬虫self.logger.info(f"执行命令: {' '.join(cmd)}")process = subprocess.Popen(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE,text=True)# 等待完成stdout, stderr = process.communicate()if process.returncode == 0:self.logger.info(f"任务 {task.id} 执行成功")return Trueelse:self.logger.error(f"任务 {task.id} 执行失败: {stderr}")return Falseexcept Exception as e:self.logger.error(f"执行任务 {task.id} 时出错: {e}")return Falsedef stop(self):"""停止worker"""self.running = Falseself.stop_event.set()# 如果有正在执行的任务,标记为重试if self.current_task:self.scheduler.complete_task(self.current_task.id, False)def get_status(self) -> Dict[str, Any]:"""获取worker状态"""return {'worker_id': self.worker_id,'running': self.running,'current_task': self.current_task.id if self.current_task else None,'uptime': time.time() - self.start_time if hasattr(self, 'start_time') else 0}class WorkerManager:"""工作节点管理器"""def __init__(self, scheduler: DistributedScheduler):self.scheduler = schedulerself.workers = {}self.logger = logging.getLogger(__name__)def start_worker(self, worker_id: str) -> bool:"""启动worker"""if worker_id in self.workers:self.logger.warning(f"Worker {worker_id} 已存在")return Falsetry:worker = WorkerNode(worker_id, self.scheduler)# 在新线程中启动workerworker_thread = Thread(target=worker.start, daemon=True)worker_thread.start()self.workers[worker_id] = {'worker': worker,'thread': worker_thread,'start_time': time.time()}self.logger.info(f"Worker {worker_id} 启动成功")return Trueexcept Exception as e:self.logger.error(f"启动worker {worker_id} 失败: {e}")return Falsedef stop_worker(self, worker_id: str) -> bool:"""停止worker"""if worker_id not in self.workers:self.logger.warning(f"Worker {worker_id} 不存在")return Falsetry:worker_info = self.workers[worker_id]worker_info['worker'].stop()# 等待线程结束worker_info['thread'].join(timeout=30)del self.workers[worker_id]self.logger.info(f"Worker {worker_id} 停止成功")return Trueexcept Exception as e:self.logger.error(f"停止worker {worker_id} 失败: {e}")return Falsedef get_workers_status(self) -> Dict[str, Dict[str, Any]]:"""获取所有worker状态"""status = {}for worker_id, worker_info in self.workers.items():status[worker_id] = worker_info['worker'].get_status()status[worker_id]['uptime'] = time.time() - worker_info['start_time']return statusdef scale_workers(self, target_count: int):"""扩缩容worker"""current_count = len(self.workers)if target_count > current_count:# 扩容for i in range(current_count, target_count):worker_id = f"worker_{i:03d}"self.start_worker(worker_id)elif target_count < current_count:# 缩容workers_to_stop = list(self.workers.keys())[target_count:]for worker_id in workers_to_stop:self.stop_worker(worker_id)self.logger.info(f"Worker数量调整为: {target_count}")# 使用示例
if __name__ == "__main__":# 配置日志logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')# 创建调度器和管理器scheduler = DistributedScheduler()manager = WorkerManager(scheduler)# 启动多个workermanager.scale_workers(3)try:# 保持运行while True:time.sleep(10)# 打印状态status = manager.get_workers_status()print(f"Workers状态: {status}")queue_stats = scheduler.get_queue_stats()print(f"队列状态: {queue_stats}")except KeyboardInterrupt:print("收到停止信号,正在关闭...")# 停止所有workerfor worker_id in list(manager.workers.keys()):manager.stop_worker(worker_id)

5. 数据去重和存储

5.1 高级去重策略

# deduplication.py
import hashlib
import redis
import json
import time
from typing import Any, Dict, List, Optional
from dataclasses import dataclass
from abc import ABC, abstractmethod@dataclass
class DupeItem:"""去重项"""key: strdata: Dict[str, Any]timestamp: floatfingerprint: strclass DupeFilter(ABC):"""去重过滤器基类"""@abstractmethoddef is_duplicate(self, item: DupeItem) -> bool:"""检查是否重复"""pass@abstractmethoddef add_item(self, item: DupeItem) -> bool:"""添加项目"""passclass RedisDupeFilter(DupeFilter):"""Redis去重过滤器"""def __init__(self, redis_client: redis.Redis, key_prefix: str = "dupefilter"):self.redis_client = redis_clientself.key_prefix = key_prefixdef _get_key(self, spider_name: str) -> str:"""获取Redis键"""return f"{self.key_prefix}:{spider_name}"def is_duplicate(self, item: DupeItem) -> bool:"""检查是否重复"""key = self._get_key(item.key)return self.redis_client.sismember(key, item.fingerprint)def add_item(self, item: DupeItem) -> bool:"""添加项目"""key = self._get_key(item.key)return self.redis_client.sadd(key, item.fingerprint) > 0class BloomFilterDupeFilter(DupeFilter):"""布隆过滤器去重"""def __init__(self, redis_client: redis.Redis, key_prefix: str = "bloom", capacity: int = 1000000, error_rate: float = 0.001):self.redis_client = redis_clientself.key_prefix = key_prefixself.capacity = capacityself.error_rate = error_rate# 计算布隆过滤器参数import mathself.bit_size = int(-capacity * math.log(error_rate) / (math.log(2) ** 2))self.hash_count = int(self.bit_size * math.log(2) / capacity)def _get_key(self, spider_name: str) -> str:"""获取Redis键"""return f"{self.key_prefix}:{spider_name}"def _hash_functions(self, data: str) -> List[int]:"""生成多个哈希值"""hashes = []for i in range(self.hash_count):hash_obj = hashlib.md5(f"{data}:{i}".encode())hash_val = int(hash_obj.hexdigest(), 16) % self.bit_sizehashes.append(hash_val)return hashesdef is_duplicate(self, item: DupeItem) -> bool:"""检查是否重复"""key = self._get_key(item.key)hashes = self._hash_functions(item.fingerprint)# 检查所有位是否都被设置for hash_val in hashes:if not self.redis_client.getbit(key, hash_val):return Falsereturn Truedef add_item(self, item: DupeItem) -> bool:"""添加项目"""key = self._get_key(item.key)hashes = self._hash_functions(item.fingerprint)# 设置所有对应的位for hash_val in hashes:self.redis_client.setbit(key, hash_val, 1)return Trueclass TimestampDupeFilter(DupeFilter):"""带时间戳的去重过滤器"""def __init__(self, redis_client: redis.Redis, key_prefix: str = "timestamp_dupefilter",ttl: int = 86400):  # 24小时TTLself.redis_client = redis_clientself.key_prefix = key_prefixself.ttl = ttldef _get_key(self, spider_name: str) -> str:"""获取Redis键"""return f"{self.key_prefix}:{spider_name}"def is_duplicate(self, item: DupeItem) -> bool:"""检查是否重复"""key = self._get_key(item.key)score = self.redis_client.zscore(key, item.fingerprint)if score is None:return False# 检查是否在TTL时间内return (time.time() - score) < self.ttldef add_item(self, item: DupeItem) -> bool:"""添加项目"""key = self._get_key(item.key)# 使用有序集合存储,分数为时间戳self.redis_client.zadd(key, {item.fingerprint: item.timestamp})# 清理过期数据cutoff_time = time.time() - self.ttlself.redis_client.zremrangebyscore(key, 0, cutoff_time)return Trueclass AdvancedDupeManager:"""高级去重管理器"""def __init__(self, redis_client: redis.Redis):self.redis_client = redis_clientself.filters = {}def register_filter(self, name: str, filter_instance: DupeFilter):"""注册去重过滤器"""self.filters[name] = filter_instancedef create_fingerprint(self, data: Dict[str, Any], fields: Optional[List[str]] = None) -> str:"""创建数据指纹"""if fields:# 只使用指定字段filtered_data = {k: v for k, v in data.items() if k in fields}else:filtered_data = data# 排序并序列化sorted_data = json.dumps(filtered_data, sort_keys=True, ensure_ascii=False)# 生成MD5哈希return hashlib.md5(sorted_data.encode()).hexdigest()def is_duplicate(self, spider_name: str, data: Dict[str, Any],filter_name: str = "default", fingerprint_fields: Optional[List[str]] = None) -> bool:"""检查是否重复"""if filter_name not in self.filters:return Falsefingerprint = self.create_fingerprint(data, fingerprint_fields)item = DupeItem(key=spider_name,data=data,timestamp=time.time(),fingerprint=fingerprint)return self.filters[filter_name].is_duplicate(item)def add_item(self, spider_name: str, data: Dict[str, Any],filter_name: str = "default",fingerprint_fields: Optional[List[str]] = None) -> bool:"""添加项目到去重过滤器"""if filter_name not in self.filters:return Falsefingerprint = self.create_fingerprint(data, fingerprint_fields)item = DupeItem(key=spider_name,data=data,timestamp=time.time(),fingerprint=fingerprint)return self.filters[filter_name].add_item(item)def get_stats(self, spider_name: str) -> Dict[str, Any]:"""获取去重统计"""stats = {}for filter_name, filter_instance in self.filters.items():if isinstance(filter_instance, RedisDupeFilter):key = filter_instance._get_key(spider_name)stats[filter_name] = {'type': 'redis_set','count': self.redis_client.scard(key)}elif isinstance(filter_instance, TimestampDupeFilter):key = filter_instance._get_key(spider_name)stats[filter_name] = {'type': 'timestamp_zset','count': self.redis_client.zcard(key)}elif isinstance(filter_instance, BloomFilterDupeFilter):stats[filter_name] = {'type': 'bloom_filter','capacity': filter_instance.capacity,'error_rate': filter_instance.error_rate}return stats# 使用示例
redis_client = redis.Redis(decode_responses=True)
dupe_manager = AdvancedDupeManager(redis_client)# 注册不同类型的去重过滤器
dupe_manager.register_filter("default", RedisDupeFilter(redis_client))
dupe_manager.register_filter("bloom", BloomFilterDupeFilter(redis_client))
dupe_manager.register_filter("timestamp", TimestampDupeFilter(redis_client))# 测试去重
test_data = {'title': '测试新闻标题','url': 'https://example.com/news/1','content': '这是新闻内容...','publish_time': '2024-01-01'
}# 检查是否重复(使用URL字段作为指纹)
is_dup = dupe_manager.is_duplicate(spider_name="news_spider",data=test_data,filter_name="default",fingerprint_fields=['url']
)if not is_dup:# 添加到去重过滤器dupe_manager.add_item(spider_name="news_spider",data=test_data,filter_name="default",fingerprint_fields=['url'])print("新数据,已添加到去重过滤器")
else:print("重复数据,跳过")# 获取统计信息
stats = dupe_manager.get_stats("news_spider")
print(f"去重统计: {stats}")

5.2 分布式存储策略

# storage.py
import pymongo
import redis
import json
import time
import logging
from typing import Dict, Any, List, Optional
from abc import ABC, abstractmethod
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
import threading@dataclass
class StorageItem:"""存储项"""collection: strdata: Dict[str, Any]timestamp: floatspider_name: stritem_id: Optional[str] = Noneclass StorageBackend(ABC):"""存储后端基类"""@abstractmethoddef save_item(self, item: StorageItem) -> bool:"""保存项目"""pass@abstractmethoddef save_batch(self, items: List[StorageItem]) -> int:"""批量保存"""pass@abstractmethoddef get_stats(self) -> Dict[str, Any]:"""获取统计信息"""passclass MongoStorageBackend(StorageBackend):"""MongoDB存储后端"""def __init__(self, mongo_uri: str, database: str):self.client = pymongo.MongoClient(mongo_uri)self.db = self.client[database]self.logger = logging.getLogger(__name__)def save_item(self, item: StorageItem) -> bool:"""保存单个项目"""try:collection = self.db[item.collection]# 添加元数据document = item.data.copy()document.update({'_spider_name': item.spider_name,'_crawl_time': item.timestamp,'_item_id': item.item_id})result = collection.insert_one(document)return result.inserted_id is not Noneexcept Exception as e:self.logger.error(f"保存到MongoDB失败: {e}")return Falsedef save_batch(self, items: List[StorageItem]) -> int:"""批量保存"""if not items:return 0try:# 按collection分组collections_data = {}for item in items:if item.collection not in collections_data:collections_data[item.collection] = []document = item.data.copy()document.update({'_spider_name': item.spider_name,'_crawl_time': item.timestamp,'_item_id': item.item_id})collections_data[item.collection].append(document)# 批量插入每个collectiontotal_saved = 0for collection_name, documents in collections_data.items():collection = self.db[collection_name]result = collection.insert_many(documents)total_saved += len(result.inserted_ids)return total_savedexcept Exception as e:self.logger.error(f"批量保存到MongoDB失败: {e}")return 0def get_stats(self) -> Dict[str, Any]:"""获取统计信息"""try:stats = {}# 获取所有collection的统计for collection_name in self.db.list_collection_names():collection = self.db[collection_name]stats[collection_name] = {'count': collection.count_documents({}),'size': self.db.command("collStats", collection_name).get('size', 0)}return statsexcept Exception as e:self.logger.error(f"获取MongoDB统计失败: {e}")return {}class RedisStorageBackend(StorageBackend):"""Redis存储后端"""def __init__(self, redis_client: redis.Redis, key_prefix: str = "items"):self.redis_client = redis_clientself.key_prefix = key_prefixself.logger = logging.getLogger(__name__)def _get_key(self, collection: str) -> str:"""获取Redis键"""return f"{self.key_prefix}:{collection}"def save_item(self, item: StorageItem) -> bool:"""保存单个项目"""try:key = self._get_key(item.collection)# 添加元数据document = item.data.copy()document.update({'_spider_name': item.spider_name,'_crawl_time': item.timestamp,'_item_id': item.item_id})# 使用列表存储self.redis_client.lpush(key, json.dumps(document, ensure_ascii=False))return Trueexcept Exception as e:self.logger.error(f"保存到Redis失败: {e}")return Falsedef save_batch(self, items: List[StorageItem]) -> int:"""批量保存"""if not items:return 0try:# 使用pipeline提高性能pipe = self.redis_client.pipeline()for item in items:key = self._get_key(item.collection)document = item.data.copy()document.update({'_spider_name': item.spider_name,'_crawl_time': item.timestamp,'_item_id': item.item_id})pipe.lpush(key, json.dumps(document, ensure_ascii=False))results = pipe.execute()return len([r for r in results if r])except Exception as e:self.logger.error(f"批量保存到Redis失败: {e}")return 0def get_stats(self) -> Dict[str, Any]:"""获取统计信息"""try:stats = {}# 获取所有相关键的统计pattern = f"{self.key_prefix}:*"keys = self.redis_client.keys(pattern)for key in keys:collection_name = key.replace(f"{self.key_prefix}:", "")stats[collection_name] = {'count': self.redis_client.llen(key),'memory_usage': self.redis_client.memory_usage(key) or 0}return statsexcept Exception as e:self.logger.error(f"获取Redis统计失败: {e}")return {}class DistributedStorageManager:"""分布式存储管理器"""def __init__(self, batch_size: int = 100, flush_interval: int = 30):self.backends = {}self.batch_size = batch_sizeself.flush_interval = flush_interval# 批量缓存self.batch_cache = []self.cache_lock = threading.Lock()# 线程池self.executor = ThreadPoolExecutor(max_workers=4)# 定时刷新self.flush_timer = Noneself._start_flush_timer()self.logger = logging.getLogger(__name__)def register_backend(self, name: str, backend: StorageBackend):"""注册存储后端"""self.backends[name] = backendself.logger.info(f"注册存储后端: {name}")def save_item(self, item: StorageItem, backend_names: Optional[List[str]] = None):"""保存项目"""if backend_names is None:backend_names = list(self.backends.keys())# 添加到批量缓存with self.cache_lock:self.batch_cache.append((item, backend_names))# 检查是否需要立即刷新if len(self.batch_cache) >= self.batch_size:self._flush_cache()def _flush_cache(self):"""刷新缓存"""if not self.batch_cache:return# 获取当前缓存current_cache = self.batch_cache.copy()self.batch_cache.clear()# 按后端分组backend_items = {}for item, backend_names in current_cache:for backend_name in backend_names:if backend_name not in backend_items:backend_items[backend_name] = []backend_items[backend_name].append(item)# 并行保存到各个后端futures = []for backend_name, items in backend_items.items():if backend_name in self.backends:future = self.executor.submit(self._save_to_backend,backend_name,items)futures.append(future)# 等待所有任务完成for future in futures:try:future.result(timeout=30)except Exception as e:self.logger.error(f"保存任务失败: {e}")def _save_to_backend(self, backend_name: str, items: List[StorageItem]) -> int:"""保存到指定后端"""try:backend = self.backends[backend_name]saved_count = backend.save_batch(items)self.logger.info(f"保存到 {backend_name}: {saved_count}/{len(items)}")return saved_countexcept Exception as e:self.logger.error(f"保存到 {backend_name} 失败: {e}")return 0def _start_flush_timer(self):"""启动定时刷新"""def flush_periodically():with self.cache_lock:if self.batch_cache:self._flush_cache()# 重新设置定时器self.flush_timer = threading.Timer(self.flush_interval, flush_periodically)self.flush_timer.start()def force_flush(self):"""强制刷新缓存"""with self.cache_lock:self._flush_cache()def shutdown(self):"""关闭存储管理器"""# 停止定时器if self.flush_timer:self.flush_timer.cancel()# 刷新剩余缓存self.force_flush()# 关闭线程池self.executor.shutdown(wait=True)self.logger.info("存储管理器已关闭")def get_all_stats(self) -> Dict[str, Dict[str, Any]]:"""获取所有后端统计"""all_stats = {}for backend_name, backend in self.backends.items():try:all_stats[backend_name] = backend.get_stats()except Exception as e:self.logger.error(f"获取 {backend_name} 统计失败: {e}")all_stats[backend_name] = {}return all_stats# 使用示例
if __name__ == "__main__":# 创建存储管理器storage_manager = DistributedStorageManager(batch_size=50, flush_interval=10)# 注册MongoDB后端mongo_backend = MongoStorageBackend(mongo_uri="mongodb://localhost:27017/",database="crawler_data")storage_manager.register_backend("mongodb", mongo_backend)# 注册Redis后端redis_client = redis.Redis(decode_responses=True)redis_backend = RedisStorageBackend(redis_client)storage_manager.register_backend("redis", redis_backend)# 保存测试数据test_item = StorageItem(collection="news",data={'title': '测试新闻','content': '这是测试内容','url': 'https://example.com/news/1'},timestamp=time.time(),spider_name="test_spider",item_id="news_001")# 保存到所有后端storage_manager.save_item(test_item)# 获取统计信息stats = storage_manager.get_all_stats()print(f"存储统计: {stats}")# 关闭storage_manager.shutdown()

6. 监控和管理

6.1 分布式爬虫监控系统

# monitoring.py
import time
import json
import redis
import psutil
import logging
from typing import Dict, Any, List
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from flask import Flask, jsonify, render_template
import threading@dataclass
class NodeMetrics:"""节点指标"""node_id: strcpu_percent: floatmemory_percent: floatdisk_usage: floatnetwork_io: Dict[str, int]active_spiders: intcompleted_tasks: intfailed_tasks: inttimestamp: float@dataclass
class SpiderMetrics:"""爬虫指标"""spider_name: strnode_id: strstatus: str  # running, stopped, erroritems_scraped: intrequests_made: intresponse_received: interrors_count: intstart_time: floatlast_activity: floatclass MetricsCollector:"""指标收集器"""def __init__(self, redis_client: redis.Redis, node_id: str):self.redis_client = redis_clientself.node_id = node_idself.logger = logging.getLogger(__name__)# 收集间隔self.collection_interval = 30  # 30秒self.running = Falseself.collection_thread = Nonedef start_collection(self):"""开始收集指标"""self.running = Trueself.collection_thread = threading.Thread(target=self._collect_loop, daemon=True)self.collection_thread.start()self.logger.info(f"节点 {self.node_id} 开始收集指标")def stop_collection(self):"""停止收集指标"""self.running = Falseif self.collection_thread:self.collection_thread.join()self.logger.info(f"节点 {self.node_id} 停止收集指标")def _collect_loop(self):"""收集循环"""while self.running:try:# 收集节点指标node_metrics = self._collect_node_metrics()self._store_node_metrics(node_metrics)# 收集爬虫指标spider_metrics = self._collect_spider_metrics()for metrics in spider_metrics:self._store_spider_metrics(metrics)time.sleep(self.collection_interval)except Exception as e:self.logger.error(f"收集指标时出错: {e}")time.sleep(5)def _collect_node_metrics(self) -> NodeMetrics:"""收集节点指标"""# CPU使用率cpu_percent = psutil.cpu_percent(interval=1)# 内存使用率memory = psutil.virtual_memory()memory_percent = memory.percent# 磁盘使用率disk = psutil.disk_usage('/')disk_usage = (disk.used / disk.total) * 100# 网络IOnetwork = psutil.net_io_counters()network_io = {'bytes_sent': network.bytes_sent,'bytes_recv': network.bytes_recv,'packets_sent': network.packets_sent,'packets_recv': network.packets_recv}# 活跃爬虫数量(从Redis获取)active_spiders = len(self.redis_client.keys(f"spider:{self.node_id}:*"))# 任务统计completed_tasks = int(self.redis_client.get(f"stats:{self.node_id}:completed") or 0)failed_tasks = int(self.redis_client.get(f"stats:{self.node_id}:failed") or 0)return NodeMetrics(node_id=self.node_id,cpu_percent=cpu_percent,memory_percent=memory_percent,disk_usage=disk_usage,network_io=network_io,active_spiders=active_spiders,completed_tasks=completed_tasks,failed_tasks=failed_tasks,timestamp=time.time())def _collect_spider_metrics(self) -> List[SpiderMetrics]:"""收集爬虫指标"""metrics_list = []# 获取所有活跃爬虫spider_keys = self.redis_client.keys(f"spider:{self.node_id}:*")for key in spider_keys:try:spider_data = self.redis_client.hgetall(key)if spider_data:spider_name = spider_data.get('name', 'unknown')metrics = SpiderMetrics(spider_name=spider_name,node_id=self.node_id,status=spider_data.get('status', 'unknown'),items_scraped=int(spider_data.get('items_scraped', 0)),requests_made=int(spider_data.get('requests_made', 0)),response_received=int(spider_data.get('response_received', 0)),errors_count=int(spider_data.get('errors_count', 0)),start_time=float(spider_data.get('start_time', 0)),last_activity=float(spider_data.get('last_activity', 0)))metrics_list.append(metrics)except Exception as e:self.logger.error(f"收集爬虫指标失败 {key}: {e}")return metrics_listdef _store_node_metrics(self, metrics: NodeMetrics):"""存储节点指标"""try:# 存储到时间序列key = f"metrics:node:{self.node_id}"timestamp = int(metrics.timestamp)# 使用有序集合存储时间序列数据self.redis_client.zadd(key,{json.dumps(asdict(metrics)): timestamp})# 保留最近24小时的数据cutoff_time = timestamp - 86400self.redis_client.zremrangebyscore(key, 0, cutoff_time)# 存储最新指标self.redis_client.hset(f"latest:node:{self.node_id}",mapping=asdict(metrics))except Exception as e:self.logger.error(f"存储节点指标失败: {e}")def _store_spider_metrics(self, metrics: SpiderMetrics):"""存储爬虫指标"""try:# 存储到时间序列key = f"metrics:spider:{metrics.spider_name}:{self.node_id}"timestamp = int(time.time())self.redis_client.zadd(key,{json.dumps(asdict(metrics)): timestamp})# 保留最近24小时的数据cutoff_time = timestamp - 86400self.redis_client.zremrangebyscore(key, 0, cutoff_time)# 存储最新指标self.redis_client.hset(f"latest:spider:{metrics.spider_name}:{self.node_id}",mapping=asdict(metrics))except Exception as e:self.logger.error(f"存储爬虫指标失败: {e}")class MonitoringDashboard:"""监控仪表板"""def __init__(self, redis_client: redis.Redis):self.redis_client = redis_clientself.app = Flask(__name__)self.setup_routes()def setup_routes(self):"""设置路由"""@self.app.route('/')def dashboard():"""主仪表板"""return render_template('dashboard.html')@self.app.route('/api/nodes')def get_nodes():"""获取所有节点信息"""nodes = []# 获取所有节点的最新指标node_keys = self.redis_client.keys("latest:node:*")for key in node_keys:node_data = self.redis_client.hgetall(key)if node_data:# 转换数据类型for k, v in node_data.items():if k in ['cpu_percent', 'memory_percent', 'disk_usage', 'timestamp']:node_data[k] = float(v)elif k in ['active_spiders', 'completed_tasks', 'failed_tasks']:node_data[k] = int(v)elif k == 'network_io':node_data[k] = json.loads(v)nodes.append(node_data)return jsonify(nodes)@self.app.route('/api/spiders')def get_spiders():"""获取所有爬虫信息"""spiders = []# 获取所有爬虫的最新指标spider_keys = self.redis_client.keys("latest:spider:*")for key in spider_keys:spider_data = self.redis_client.hgetall(key)if spider_data:# 转换数据类型for k, v in spider_data.items():if k in ['items_scraped', 'requests_made', 'response_received', 'errors_count']:spider_data[k] = int(v)elif k in ['start_time', 'last_activity']:spider_data[k] = float(v)spiders.append(spider_data)return jsonify(spiders)@self.app.route('/api/metrics/node/<node_id>')def get_node_metrics(node_id):"""获取节点历史指标"""key = f"metrics:node:{node_id}"# 获取最近1小时的数据end_time = time.time()start_time = end_time - 3600metrics_data = self.redis_client.zrangebyscore(key, start_time, end_time, withscores=True)metrics = []for data, timestamp in metrics_data:metric = json.loads(data)metrics.append(metric)return jsonify(metrics)@self.app.route('/api/stats/overview')def get_overview_stats():"""获取总览统计"""stats = {'total_nodes': len(self.redis_client.keys("latest:node:*")),'total_spiders': len(self.redis_client.keys("latest:spider:*")),'total_tasks_completed': 0,'total_tasks_failed': 0,'total_items_scraped': 0}# 统计所有节点的任务数node_keys = self.redis_client.keys("latest:node:*")for key in node_keys:node_data = self.redis_client.hgetall(key)stats['total_tasks_completed'] += int(node_data.get('completed_tasks', 0))stats['total_tasks_failed'] += int(node_data.get('failed_tasks', 0))# 统计所有爬虫的抓取数spider_keys = self.redis_client.keys("latest:spider:*")for key in spider_keys:spider_data = self.redis_client.hgetall(key)stats['total_items_scraped'] += int(spider_data.get('items_scraped', 0))return jsonify(stats)def run(self, host='0.0.0.0', port=5000, debug=False):"""运行仪表板"""self.app.run(host=host, port=port, debug=debug)# 使用示例
if __name__ == "__main__":# 配置日志logging.basicConfig(level=logging.INFO)# 创建Redis客户端redis_client = redis.Redis(decode_responses=True)# 启动指标收集器collector = MetricsCollector(redis_client, "node_001")collector.start_collection()try:# 启动监控仪表板dashboard = MonitoringDashboard(redis_client)dashboard.run(debug=True)except KeyboardInterrupt:print("停止监控系统...")collector.stop_collection()

7. 实战案例:构建大规模新闻爬虫系统

7.1 系统架构设计

# news_crawler_system.py
import scrapy
from scrapy_redis.spiders import RedisSpider
import json
import time
from typing import Dict, Any, Listclass DistributedNewsSpider(RedisSpider):"""分布式新闻爬虫"""name = 'distributed_news_crawler'redis_key = 'news_crawler:start_urls'# 自定义设置custom_settings = {'CONCURRENT_REQUESTS': 32,'CONCURRENT_REQUESTS_PER_DOMAIN': 8,'DOWNLOAD_DELAY': 0.5,'RANDOMIZE_DOWNLOAD_DELAY': 0.5,# 启用自动限速'AUTOTHROTTLE_ENABLED': True,'AUTOTHROTTLE_START_DELAY': 1,'AUTOTHROTTLE_MAX_DELAY': 10,'AUTOTHROTTLE_TARGET_CONCURRENCY': 2.0,# 管道配置'ITEM_PIPELINES': {'news_crawler.pipelines.ValidationPipeline': 300,'news_crawler.pipelines.DuplicationPipeline': 400,'news_crawler.pipelines.DataCleaningPipeline': 500,'news_crawler.pipelines.StoragePipeline': 600,},# 中间件配置'DOWNLOADER_MIDDLEWARES': {'news_crawler.middlewares.ProxyMiddleware': 350,'news_crawler.middlewares.UserAgentMiddleware': 400,'news_crawler.middlewares.RetryMiddleware': 500,}}def parse(self, response):"""解析新闻列表页"""# 提取新闻链接news_links = response.css('.news-list .news-item a::attr(href)').getall()for link in news_links:if link:yield response.follow(link,callback=self.parse_news,meta={'category': response.meta.get('category', ''),'source_site': response.meta.get('source_site', ''),'list_page': response.url})# 处理分页next_page = response.css('.pagination .next::attr(href)').get()if next_page:yield response.follow(next_page,callback=self.parse,meta=response.meta)def parse_news(self, response):"""解析新闻详情页"""# 提取新闻内容title = response.css('h1.article-title::text').get()content_paragraphs = response.css('.article-content p::text').getall()content = '\n'.join(content_paragraphs) if content_paragraphs else ''# 提取元数据author = response.css('.article-meta .author::text').get()publish_time = response.css('.article-meta .publish-time::attr(datetime)').get()tags = response.css('.article-tags .tag::text').getall()# 提取图片images = response.css('.article-content img::attr(src)').getall()# 构建新闻项news_item = {'title': title.strip() if title else '','content': content.strip(),'author': author.strip() if author else '','publish_time': publish_time,'tags': tags,'images': images,'url': response.url,'category': response.meta.get('category', ''),'source_site': response.meta.get('source_site', ''),'list_page': response.meta.get('list_page', ''),'crawl_time': time.time(),'spider_name': self.name}yield news_item# 提取相关新闻链接related_links = response.css('.related-news a::attr(href)').getall()for link in related_links[:5]:  # 限制相关新闻数量if link:yield response.follow(link,callback=self.parse_news,meta=response.meta)# 启动脚本
class NewsSystemLauncher:"""新闻系统启动器"""def __init__(self):self.redis_client = redis.Redis(decode_responses=True)self.scheduler = DistributedScheduler(redis_client=self.redis_client)self.worker_manager = WorkerManager(self.scheduler)self.storage_manager = DistributedStorageManager()# 配置存储后端self._setup_storage()# 配置监控self.metrics_collector = MetricsCollector(self.redis_client, "master_node")self.dashboard = MonitoringDashboard(self.redis_client)def _setup_storage(self):"""设置存储后端"""# MongoDB存储mongo_backend = MongoStorageBackend(mongo_uri="mongodb://localhost:27017/",database="news_crawler")self.storage_manager.register_backend("mongodb", mongo_backend)# Redis缓存redis_backend = RedisStorageBackend(self.redis_client)self.storage_manager.register_backend("redis", redis_backend)def add_news_sources(self, sources: List[Dict[str, Any]]):"""添加新闻源"""for source in sources:task = Task(id=f"news_task_{int(time.time())}_{source['name']}",spider_name="distributed_news_crawler",start_urls=source['urls'],settings={'CONCURRENT_REQUESTS': source.get('concurrent_requests', 16),'DOWNLOAD_DELAY': source.get('download_delay', 1.0),},priority=source.get('priority', 5))self.scheduler.submit_task(task)print(f"添加新闻源任务: {source['name']}")def start_system(self, worker_count: int = 4):"""启动系统"""print("启动分布式新闻爬虫系统...")# 启动指标收集self.metrics_collector.start_collection()# 启动工作节点self.worker_manager.scale_workers(worker_count)# 启动监控仪表板dashboard_thread = threading.Thread(target=self.dashboard.run,kwargs={'host': '0.0.0.0', 'port': 5000},daemon=True)dashboard_thread.start()print(f"系统启动完成,监控地址: http://localhost:5000")print(f"工作节点数量: {worker_count}")def stop_system(self):"""停止系统"""print("停止分布式新闻爬虫系统...")# 停止工作节点for worker_id in list(self.worker_manager.workers.keys()):self.worker_manager.stop_worker(worker_id)# 停止指标收集self.metrics_collector.stop_collection()# 关闭存储管理器self.storage_manager.shutdown()print("系统已停止")# 使用示例
if __name__ == "__main__":# 创建系统启动器launcher = NewsSystemLauncher()# 定义新闻源news_sources = [{'name': 'tech_news','urls': ['https://techcrunch.com', 'https://arstechnica.com'],'priority': 10,'concurrent_requests': 16,'download_delay': 0.5},{'name': 'business_news','urls': ['https://bloomberg.com', 'https://reuters.com'],'priority': 8,'concurrent_requests': 12,'download_delay': 1.0},{'name': 'general_news','urls': ['https://cnn.com', 'https://bbc.com'],'priority': 5,'concurrent_requests': 8,'download_delay': 1.5}]try:# 启动系统launcher.start_system(worker_count=6)# 添加新闻源launcher.add_news_sources(news_sources)# 保持运行while True:time.sleep(60)# 打印系统状态stats = launcher.scheduler.get_queue_stats()print(f"队列状态: {stats}")worker_status = launcher.worker_manager.get_workers_status()print(f"工作节点状态: {len(worker_status)} 个节点运行中")except KeyboardInterrupt:launcher.stop_system()

8. 练习与作业

8.1 基础练习

  1. Redis队列操作:实现一个简单的任务队列系统
  2. Scrapy-Redis配置:配置一个基本的分布式爬虫
  3. 数据去重:实现基于Redis的URL去重功能

8.2 进阶练习

  1. 负载均衡:实现智能任务分配算法
  2. 故障恢复:设计节点故障自动恢复机制
  3. 性能优化:优化分布式爬虫的并发性能

8.3 实战项目

  1. 电商价格监控系统:构建分布式电商价格爬虫
  2. 社交媒体分析平台:爬取多平台社交媒体数据
  3. 新闻聚合系统:实现实时新闻聚合和分析

9. 常见问题与解决方案

9.1 Redis连接问题

  • 连接超时:调整连接池参数和超时设置
  • 内存不足:优化数据结构和清理策略
  • 集群配置:正确配置Redis集群和哨兵

9.2 任务调度问题

  • 任务丢失:实现任务持久化和恢复机制
  • 重复执行:完善去重和状态管理
  • 负载不均:优化任务分配算法

9.3 性能优化

  • 网络延迟:使用本地Redis和优化网络配置
  • 序列化开销:选择高效的序列化方案
  • 内存使用:监控和优化内存使用

10. 下节预告

下一课我们将学习**《Python爬虫第11课:大规模数据处理与存储优化》**,内容包括:

  • 大数据存储方案选择
  • 数据库性能优化
  • 数据清洗和ETL流程
  • 实时数据处理
  • 数据可视化和分析

通过本课的学习,你已经掌握了分布式爬虫的核心技术,能够构建可扩展的大规模爬虫系统。下一课将重点关注数据处理和存储优化,帮助你构建完整的数据采集和处理流水线。interval, flush_periodically)
self.flush_timer.start()

    self.flush_timer = threading.Timer(self.flush_

结尾
  • 希望对初学者有帮助;致力于办公自动化的小小程序员一枚
  • 希望能得到大家的【❤️一个免费关注❤️】感谢!
  • 求个 🤞 关注 🤞 +❤️ 喜欢 ❤️ +👍 收藏 👍
  • 此外还有办公自动化专栏,欢迎大家订阅:Python办公自动化专栏
  • 此外还有爬虫专栏,欢迎大家订阅:Python爬虫基础专栏
  • 此外还有Python基础专栏,欢迎大家订阅:Python基础学习专栏

http://www.dtcms.com/a/519877.html

相关文章:

  • 2025年运维部网络安全工作小结1025
  • 基于 Python 的坦克大战小程序,使用 Pygame 库开发
  • 做网站前期需求分析收费么互联网营销是做什么
  • 在 MacOS 中安装 MySQL 8
  • 宿迁网站建设宿迁网站域名的组成
  • Gartner发布AI安全创新指南:用集成的模块化AI安全平台赢得AI安全之战
  • FastGateway 核心技术原理拆解手册
  • vue3中实现渐变三层柱状图
  • 7.IXM6U系统时钟
  • 算子相关通用概念整理
  • Java 操作 PDF 图像:轻松驾驭 PDF 文档中的图片
  • OS_2 进程与线程(进程管理)
  • 网站规划 评价谷歌三件套一键安装
  • 腾讯云服务器如何建设网站百度关键词排名突然没了
  • 【论文笔记】LTX-Video极致速度的视频生成模型
  • 安科瑞防逆流解决方案:物联网技术赋能光伏能源高效管理
  • 如何根据不同的场景选择YOLO相应的基座模型
  • 【OJ】二叉树的经典OJ题
  • Excel 重磅更新 AI技术走进公式
  • div嵌套影响网站收录建设公司需要网站吗
  • VBA技术资料MF383:处理Excel中存储为文本的数据
  • 注册网站的公司名字网站项目建设流程图
  • 大数据存储组件分别位于数据仓库的哪一层
  • Dubbo应用开发之RPC直连开发
  • 坦电容做电源滤波,放在陶瓷电容的前面还是后面好
  • 北京城建亚泰建设集团有限公司网站首页wordpress中文教程 下载
  • 虚幻引擎5 GAS开发俯视角RPG游戏 P06-13 属性菜单 - 边框值
  • Bash 括号:()、{}、[]、$()、$(() )、${}、[[]] 到底有什么区别?
  • bash执行脚本 CondaError: Run ‘conda init‘ before ‘conda activate‘
  • 虚幻引擎5 GAS开发俯视角RPG游戏 P06-11 初始化生命值和法力值属性