
AI-Driven Intelligent Crawlers: Architecture and Applications

As a technology enthusiast who loves programming, I have been exploring how to weave artificial intelligence into traditional web crawlers to make them smarter, more efficient, and more human-like. Today I would like to share what I have learned from exploring and building AI-driven intelligent crawlers.

Introduction: The Limits of Traditional Crawlers and the Opportunity for AI

In an era of exploding online information, traditional crawlers face many challenges: increasingly sophisticated anti-crawler mechanisms, frequently changing page structures, and wildly uneven data quality. As a programmer, I have felt these limitations firsthand, and I have also seen the enormous opportunity that AI technology brings.

An AI-driven crawler can not only adapt automatically to changes in page structure, but also recognize anti-crawler defenses and even understand the semantics of page content, achieving genuinely "intelligent" crawling. At the same time, the ever-evolving defenses that websites deploy have taught me that AI is no silver bullet. This article takes a deep look at the architecture, core algorithms, and practical applications of AI-powered crawlers.

Part 1: Architecture of the AI Smart Crawler

1.1 Architecture Overview

The AI smart crawler uses a layered architecture in which every layer has a clear responsibility and boundary. The code below sketches this architecture:

# Core architecture of the AI smart crawler
class AISmartCrawler:
    def __init__(self):
        # Initialize the core components
        self.nlp_engine = NLPEngine()                # natural language processing engine
        self.vision_engine = VisionEngine()          # computer vision engine
        self.behavior_engine = BehaviorEngine()      # intelligent behavior engine
        self.adaptation_engine = AdaptationEngine()  # self-adaptation engine

    def crawl(self, target_url, strategy='intelligent'):
        """Main intelligent crawling workflow."""
        try:
            # 1. Analyze the target page
            page_info = self.analyze_page(target_url)

            # 2. Select a strategy dynamically
            if strategy == 'intelligent':
                strategy = self.select_strategy(page_info)

            # 3. Execute the crawling strategy
            result = self.execute_strategy(target_url, strategy, page_info)

            # 4. Extract data intelligently
            extracted_data = self.extract_data_intelligently(result)

            # 5. Evaluate quality and optimize
            quality_score = self.evaluate_quality(extracted_data)

            return {
                'data': extracted_data,
                'quality_score': quality_score,
                'strategy_used': strategy,
                'metadata': page_info,
            }
        except Exception as e:
            self.handle_error(e)
            return None
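To make the flow concrete, here is a minimal usage sketch. It assumes the four component classes and the helper methods called above (analyze_page, select_strategy, and so on) are implemented as outlined; the URL is a placeholder.

# Hypothetical usage of the crawler facade sketched above
crawler = AISmartCrawler()
result = crawler.crawl("https://example.com/products", strategy='intelligent')

if result is not None:
    print(f"Strategy used: {result['strategy_used']}")
    print(f"Quality score: {result['quality_score']}")
    print(f"Extracted fields: {list(result['data'].keys())}")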

1.2 Core Components in Detail

1.2.1 Natural Language Processing Engine

The NLP engine is the crawler's "brain": it is responsible for understanding the semantics and structure of page content:

import re

import spacy
from transformers import pipeline


class NLPEngine:
    def __init__(self):
        # Load pretrained models
        self.nlp = spacy.load("zh_core_web_sm")
        self.sentiment_analyzer = pipeline(
            "sentiment-analysis",
            model="uer/roberta-base-finetuned-jd-binary-chinese")
        self.text_classifier = pipeline(
            "text-classification",
            model="uer/roberta-base-chinese-cluener")

    def analyze_content(self, html_content):
        """Analyze page content intelligently."""
        # Extract plain text
        text_content = self.extract_text(html_content)
        # Named-entity recognition
        entities = self.extract_entities(text_content)
        # Sentiment analysis
        sentiment = self.analyze_sentiment(text_content)
        # Topic classification
        topic = self.classify_topic(text_content)
        return {
            'text': text_content,
            'entities': entities,
            'sentiment': sentiment,
            'topic': topic,
            'structure': self.analyze_structure(html_content),
        }

    def extract_entities(self, text):
        """Extract named entities."""
        doc = self.nlp(text)
        entities = {}
        for ent in doc.ents:
            if ent.label_ not in entities:
                entities[ent.label_] = []
            entities[ent.label_].append(ent.text)
        return entities

    def analyze_structure(self, html_content):
        """Profile structural features of the HTML."""
        # Use regular expressions to profile the HTML structure
        structure_info = {
            'title_tags': len(re.findall(r'<title[^>]*>(.*?)</title>', html_content, re.I)),
            'heading_tags': len(re.findall(r'<h[1-6][^>]*>(.*?)</h[1-6]>', html_content, re.I)),
            'link_density': len(re.findall(r'<a[^>]*>', html_content)) / max(len(html_content), 1),
            'form_count': len(re.findall(r'<form[^>]*>', html_content)),
            'script_count': len(re.findall(r'<script[^>]*>', html_content)),
        }
        return structure_info
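A quick spot-check of the structural profiler. Only analyze_structure is exercised here, since extract_text, analyze_sentiment, and classify_topic are left as sketches above; the HTML string is invented, and __new__ is used to bypass the heavyweight model loading:

html = """<html><head><title>Demo</title></head>
<body><h1>Hello</h1><a href="/a">a</a><a href="/b">b</a>
<form action="/search"></form><script>var x = 1;</script></body></html>"""

engine = NLPEngine.__new__(NLPEngine)  # skip __init__: no models needed for this check
print(engine.analyze_structure(html))
# -> title/heading counts, link density, form and script counts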
1.2.2 Computer Vision Engine

The vision engine lets the crawler "see" the page and understand its visual elements and layout:

import io

import cv2
import numpy as np
import pytesseract
from PIL import Image
from selenium import webdriver
from selenium.webdriver.chrome.options import Options


class VisionEngine:
    def __init__(self):
        self.driver = self.setup_driver()

    def setup_driver(self):
        """Set up a headless browser."""
        chrome_options = Options()
        chrome_options.add_argument("--headless")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        return webdriver.Chrome(options=chrome_options)

    def capture_page_screenshot(self, url):
        """Capture a full-page screenshot."""
        try:
            self.driver.get(url)
            # Wait for the page to load
            self.driver.implicitly_wait(5)
            # Get the page dimensions
            page_width = self.driver.execute_script("return document.body.scrollWidth")
            page_height = self.driver.execute_script("return document.body.scrollHeight")
            # Resize the window to fit the page
            self.driver.set_window_size(page_width, page_height)
            # Take the screenshot
            screenshot = self.driver.get_screenshot_as_png()
            return Image.open(io.BytesIO(screenshot))
        except Exception as e:
            print(f"Screenshot failed: {e}")
            return None

    def analyze_layout(self, screenshot):
        """Analyze the page layout."""
        # Convert to OpenCV format
        img = cv2.cvtColor(np.array(screenshot), cv2.COLOR_RGB2BGR)
        # Edge detection
        edges = cv2.Canny(img, 50, 150)
        # Contour detection
        contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        # Summarize the layout structure
        layout_info = {
            'content_blocks': len(contours),
            'text_regions': self.detect_text_regions(img),
            'image_regions': self.detect_image_regions(img),
            'navigation_elements': self.detect_navigation(img),
        }
        return layout_info

    def detect_text_regions(self, img):
        """Detect text regions via OCR."""
        text_regions = []
        # Convert to grayscale
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        # Run Tesseract OCR
        try:
            text = pytesseract.image_to_data(gray, output_type=pytesseract.Output.DICT)
            for i, conf in enumerate(text['conf']):
                if int(conf) > 60:  # confidence threshold
                    x, y, w, h = (text['left'][i], text['top'][i],
                                  text['width'][i], text['height'][i])
                    text_regions.append({
                        'bbox': (x, y, w, h),
                        'text': text['text'][i],
                        'confidence': conf,
                    })
        except Exception:
            pass
        return text_regions
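A minimal driving sketch, assuming Chrome/Chromedriver and Tesseract are installed locally. It only exercises the methods fully defined above (detect_image_regions and detect_navigation are left unimplemented in the sketch), and the URL is a placeholder:

engine = VisionEngine()
shot = engine.capture_page_screenshot("https://example.com")
if shot is not None:
    img = cv2.cvtColor(np.array(shot), cv2.COLOR_RGB2BGR)
    regions = engine.detect_text_regions(img)
    print(f"Detected {len(regions)} text regions")
engine.driver.quit()  # release the browser when done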
1.2.3 Intelligent Behavior Engine

The behavior engine mimics human browsing behavior to avoid detection by anti-crawler systems:

import random
import time

from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By


class BehaviorEngine:
    def __init__(self):
        self.behavior_patterns = self.load_behavior_patterns()

    def load_behavior_patterns(self):
        """Load the behavior-pattern library."""
        return {
            'human_like': {
                'scroll_patterns': ['smooth', 'random', 'bounce'],
                'click_patterns': ['direct', 'hover_first', 'double_click'],
                'typing_patterns': ['natural', 'fast', 'slow'],
                'wait_patterns': ['random', 'fixed', 'adaptive'],
            },
            'bot_like': {
                'scroll_patterns': ['instant'],
                'click_patterns': ['instant'],
                'typing_patterns': ['instant'],
                'wait_patterns': ['minimal'],
            },
        }

    def simulate_human_behavior(self, driver, behavior_type='human_like'):
        """Simulate human browsing behavior."""
        pattern = self.behavior_patterns[behavior_type]
        # Random scrolling
        self.simulate_scrolling(driver, pattern['scroll_patterns'])
        # Random waits
        self.simulate_waiting(pattern['wait_patterns'])
        # Mouse movement
        self.simulate_mouse_movement(driver)

    def simulate_scrolling(self, driver, scroll_patterns):
        """Simulate scrolling behavior."""
        pattern = random.choice(scroll_patterns)
        if pattern == 'smooth':
            # Smooth scrolling
            total_height = driver.execute_script("return document.body.scrollHeight")
            current_position = 0
            step = 100
            while current_position < total_height:
                driver.execute_script(f"window.scrollTo(0, {current_position})")
                current_position += step
                time.sleep(random.uniform(0.1, 0.3))
        elif pattern == 'random':
            # Random scrolling
            for _ in range(random.randint(3, 8)):
                scroll_y = random.randint(100, 800)
                driver.execute_script(f"window.scrollBy(0, {scroll_y})")
                time.sleep(random.uniform(0.5, 2.0))
        elif pattern == 'bounce':
            # Bouncing back and forth
            positions = [100, 300, 200, 500, 400, 700, 600]
            for pos in positions:
                driver.execute_script(f"window.scrollTo(0, {pos})")
                time.sleep(random.uniform(0.3, 1.0))

    def simulate_mouse_movement(self, driver):
        """Simulate mouse movement."""
        try:
            # Pick a random link on the page
            elements = driver.find_elements(By.TAG_NAME, "a")[:10]
            if elements:
                element = random.choice(elements)
                # Build an action chain
                actions = ActionChains(driver)
                # Hover over the element
                actions.move_to_element(element)
                actions.perform()
                # Random pause
                time.sleep(random.uniform(0.5, 2.0))
        except Exception as e:
            print(f"Mouse-movement simulation failed: {e}")

    def simulate_waiting(self, wait_patterns):
        """Simulate waiting behavior."""
        pattern = random.choice(wait_patterns)
        if pattern == 'random':
            wait_time = random.uniform(1.0, 5.0)
        elif pattern == 'fixed':
            wait_time = 2.0
        elif pattern == 'adaptive':
            wait_time = random.uniform(0.5, 3.0)
        else:
            wait_time = 1.0
        time.sleep(wait_time)
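To see the engine in action, a hedged sketch driving it against a live page; the headless-Chrome setup mirrors VisionEngine.setup_driver above, and the URL is a placeholder:

from selenium import webdriver
from selenium.webdriver.chrome.options import Options

opts = Options()
opts.add_argument("--headless")
driver = webdriver.Chrome(options=opts)

driver.get("https://example.com")  # placeholder target
engine = BehaviorEngine()
engine.simulate_human_behavior(driver, behavior_type='human_like')
driver.quit()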

Part 2: Core Algorithms and Intelligent Strategies

2.1 Adaptive Crawling Strategies

The AI crawler can automatically choose the crawling strategy best suited to the characteristics of the target site:

class AdaptiveStrategySelector:
    def __init__(self):
        self.strategy_weights = {
            'selenium': 0.3,
            'requests': 0.4,
            'playwright': 0.2,
            'scrapy': 0.1,
        }

    def select_strategy(self, page_info, historical_data=None):
        """Select a crawling strategy intelligently."""
        # Score each strategy
        strategy_scores = {}
        strategy_scores['selenium'] = self.calculate_selenium_score(page_info)
        strategy_scores['requests'] = self.calculate_requests_score(page_info)
        strategy_scores['playwright'] = self.calculate_playwright_score(page_info)
        strategy_scores['scrapy'] = self.calculate_scrapy_score(page_info)

        # Adjust the scores with historical data if available
        if historical_data:
            strategy_scores = self.adjust_with_history(strategy_scores, historical_data)

        # Pick the highest-scoring strategy
        best_strategy = max(strategy_scores, key=strategy_scores.get)
        return best_strategy, strategy_scores

    def calculate_selenium_score(self, page_info):
        """Score the Selenium strategy."""
        score = 0
        # Heavy JavaScript dependence
        if page_info.get('javascript_heavy', False):
            score += 30
        # Dynamic content
        if page_info.get('dynamic_content', False):
            score += 25
        # Anti-crawler defenses
        if page_info.get('anti_crawler', False):
            score += 20
        # Page complexity
        complexity = page_info.get('complexity', 0)
        score += min(complexity * 2, 25)
        return score

    def calculate_requests_score(self, page_info):
        """Score the Requests strategy."""
        score = 0
        # Static content
        if page_info.get('static_content', True):
            score += 30
        # An API endpoint is available
        if page_info.get('has_api', False):
            score += 25
        # Simple structure
        if page_info.get('complexity', 0) < 5:
            score += 25
        # No anti-crawler defenses
        if not page_info.get('anti_crawler', False):
            score += 20
        return score

    def calculate_playwright_score(self, page_info):
        """Score the Playwright strategy (illustrative stand-in, mirroring the Selenium heuristics)."""
        score = 0
        if page_info.get('javascript_heavy', False):
            score += 25
        if page_info.get('dynamic_content', False):
            score += 25
        return score

    def calculate_scrapy_score(self, page_info):
        """Score the Scrapy strategy (illustrative stand-in, favoring large static crawls)."""
        score = 0
        if page_info.get('static_content', True):
            score += 25
        if page_info.get('complexity', 0) < 5:
            score += 15
        return score
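For example, feeding the selector a profile of a JavaScript-heavy page (the page_info keys follow the ones the scorers read; the values here are invented):

page_info = {
    'javascript_heavy': True,
    'dynamic_content': True,
    'anti_crawler': False,
    'static_content': False,
    'complexity': 8,
}

selector = AdaptiveStrategySelector()
best, scores = selector.select_strategy(page_info)
print(best, scores)  # selenium should win for a dynamic, JS-heavy page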

2.2 Intelligent Anti-Crawler Countermeasures

The AI crawler can learn and adapt to a wide range of anti-crawler defenses:

class AntiCrawlerDetector:
    def __init__(self):
        self.detection_patterns = self.load_detection_patterns()
        self.evasion_strategies = self.load_evasion_strategies()

    def load_detection_patterns(self):
        """Load known detection patterns."""
        return {
            'behavior_patterns': [
                'too_fast',
                'too_regular',
                'no_mouse_movement',
                'no_scroll_behavior',
            ],
            'technical_patterns': [
                'user_agent_suspicious',
                'ip_frequency',
                'request_headers',
                'javascript_execution',
            ],
            'content_patterns': [
                'honeypot_links',
                'hidden_fields',
                'javascript_challenges',
                'captcha_verification',
            ],
        }

    def load_evasion_strategies(self):
        """Load evasion strategies (placeholder; keyed by risk level in a full system)."""
        return {}

    def detect_anti_crawler(self, response, page_content):
        """Detect anti-crawler mechanisms."""
        detection_results = {
            'behavior_detected': False,
            'technical_detected': False,
            'content_detected': False,
            'risk_level': 'low',
            'recommendations': [],
        }

        # Behavioral signals
        if self.detect_behavior_patterns(response):
            detection_results['behavior_detected'] = True
            detection_results['recommendations'].append(
                'Adjust request rate and behavior patterns')

        # Technical signals
        if self.detect_technical_patterns(response):
            detection_results['technical_detected'] = True
            detection_results['recommendations'].append(
                'Tune request headers and user agent')

        # Content signals
        if self.detect_content_patterns(page_content):
            detection_results['content_detected'] = True
            detection_results['recommendations'].append(
                'Handle CAPTCHAs and JavaScript challenges')

        # Overall risk level
        detection_results['risk_level'] = self.calculate_risk_level(detection_results)
        return detection_results

    def detect_behavior_patterns(self, response):
        """Detect behavioral red flags."""
        suspicious_patterns = []
        # Abnormally fast response
        if response.elapsed.total_seconds() < 0.1:
            suspicious_patterns.append('response_too_fast')
        # Suspicious status codes
        if response.status_code in [403, 429, 503]:
            suspicious_patterns.append('status_code_suspicious')
        # Suspicious response headers
        headers = response.headers
        if 'cf-ray' in headers or 'x-powered-by' in headers:
            suspicious_patterns.append('cloudflare_detected')
        return len(suspicious_patterns) > 0

    def detect_technical_patterns(self, response):
        """Detect technical countermeasures (simplified placeholder check)."""
        # A fuller version would inspect cookies, JS fingerprinting, TLS hints, etc.
        return 'set-cookie' in response.headers and response.status_code in [403, 429]

    def detect_content_patterns(self, page_content):
        """Detect content-level traps."""
        suspicious_patterns = []
        # Honeypot links
        honeypot_patterns = [
            'style="display:none"',
            'class="hidden"',
            'visibility:hidden',
        ]
        for pattern in honeypot_patterns:
            if pattern in page_content:
                suspicious_patterns.append('honeypot_detected')
                break
        # CAPTCHAs (Chinese literals kept for Chinese-language sites)
        captcha_patterns = ['captcha', 'recaptcha', '验证码', '请输入验证码']
        for pattern in captcha_patterns:
            if pattern.lower() in page_content.lower():
                suspicious_patterns.append('captcha_detected')
                break
        # JavaScript challenges
        js_challenge_patterns = [
            'javascript:void(0)',
            'onclick="return false"',
            'function challenge()',
        ]
        for pattern in js_challenge_patterns:
            if pattern in page_content:
                suspicious_patterns.append('js_challenge_detected')
                break
        return len(suspicious_patterns) > 0

    def calculate_risk_level(self, detection_results):
        """Compute an overall risk level."""
        risk_score = 0
        if detection_results['behavior_detected']:
            risk_score += 30
        if detection_results['technical_detected']:
            risk_score += 40
        if detection_results['content_detected']:
            risk_score += 30
        if risk_score >= 80:
            return 'high'
        elif risk_score >= 50:
            return 'medium'
        else:
            return 'low'
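A hedged end-to-end check against a live response; the URL is a placeholder, and detect_technical_patterns above is a simplified stand-in for real fingerprint checks:

import requests

resp = requests.get("https://example.com", timeout=10)  # placeholder target
detector = AntiCrawlerDetector()
report = detector.detect_anti_crawler(resp, resp.text)

print(report['risk_level'])
for tip in report['recommendations']:
    print("-", tip)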

2.3 Intelligent Data Extraction

The AI crawler understands page structure and extracts the required data intelligently:

import re


class IntelligentDataExtractor:
    def __init__(self):
        self.extraction_models = self.load_extraction_models()
        self.schema_validator = SchemaValidator()

    def load_extraction_models(self):
        """Load the per-domain extraction models."""
        return {
            'product_info': ProductExtractionModel(),
            'article_content': ArticleExtractionModel(),
            'ecommerce_data': EcommerceExtractionModel(),
            'social_media': SocialMediaExtractionModel(),
        }

    def extract_data_intelligently(self, page_content, target_schema):
        """Extract data with the model that fits the page type."""
        # Identify the page type
        page_type = self.classify_page_type(page_content)

        # Pick the matching extraction model
        if page_type in self.extraction_models:
            extractor = self.extraction_models[page_type]
            extracted_data = extractor.extract(page_content, target_schema)
        else:
            # Fall back to the generic extractor (implementation not shown)
            extracted_data = self.generic_extractor.extract(page_content, target_schema)

        # Clean and validate the data
        cleaned_data = self.clean_and_validate_data(extracted_data, target_schema)
        return cleaned_data

    def classify_page_type(self, page_content):
        """Classify the page type."""
        # Feature extraction for a learned classifier
        features = self.extract_page_features(page_content)

        # Simple rule-based classification (a real project should use a trained model)
        lowered = page_content.lower()
        if 'product' in lowered or 'price' in lowered:
            return 'product_info'
        elif 'article' in lowered or 'content' in lowered:
            return 'article_content'
        elif 'shop' in lowered or 'cart' in lowered:
            return 'ecommerce_data'
        elif 'social' in lowered or 'share' in lowered:
            return 'social_media'
        else:
            return 'generic'

    def extract_page_features(self, page_content):
        """Extract page-level features."""
        features = {
            'text_length': len(page_content),
            'link_count': page_content.count('<a'),
            'image_count': page_content.count('<img'),
            'form_count': page_content.count('<form'),
            'script_count': page_content.count('<script'),
            'style_count': page_content.count('<style'),
            'div_count': page_content.count('<div'),
            'span_count': page_content.count('<span'),
        }
        return features


class ProductExtractionModel:
    """Product-information extraction model."""

    def extract(self, page_content, target_schema):
        """Extract product information."""
        extracted_data = {}

        # Product name
        product_name = self.extract_product_name(page_content)
        if product_name:
            extracted_data['product_name'] = product_name

        # Price
        price_info = self.extract_price_info(page_content)
        if price_info:
            extracted_data['price'] = price_info

        # Description
        description = self.extract_description(page_content)
        if description:
            extracted_data['description'] = description

        # Images
        images = self.extract_product_images(page_content)
        if images:
            extracted_data['images'] = images

        # Specifications
        specifications = self.extract_specifications(page_content)
        if specifications:
            extracted_data['specifications'] = specifications

        return extracted_data

    def extract_product_name(self, page_content):
        """Extract the product name, trying several strategies in turn."""
        strategies = [
            self.extract_by_h1_tag,
            self.extract_by_title_tag,
            self.extract_by_class_name,
            self.extract_by_id_name,
        ]
        for strategy in strategies:
            result = strategy(page_content)
            if result:
                return result
        return None

    def extract_by_h1_tag(self, page_content):
        """Extract from the H1 tag."""
        h1_pattern = r'<h1[^>]*>(.*?)</h1>'
        matches = re.findall(h1_pattern, page_content, re.I)
        if matches:
            # Strip any remaining HTML tags
            clean_text = re.sub(r'<[^>]+>', '', matches[0])
            return clean_text.strip()
        return None

    def extract_price_info(self, page_content):
        """Extract price information."""
        # Price patterns (the dollar sign must be escaped; a bare $ anchors the regex)
        price_patterns = [
            r'¥\s*(\d+(?:\.\d{2})?)',           # RMB symbol
            r'\$\s*(\d+(?:\.\d{2})?)',          # dollar symbol
            r'(\d+(?:\.\d{2})?)\s*元',          # Chinese yuan suffix
            r'价格[::]\s*(\d+(?:\.\d{2})?)',    # Chinese "price:" label
            r'Price[::]\s*(\d+(?:\.\d{2})?)',   # English "Price:" label
        ]
        for pattern in price_patterns:
            matches = re.findall(pattern, page_content)
            if matches:
                try:
                    price = float(matches[0])
                    return {
                        'amount': price,
                        'currency': self.detect_currency(pattern),
                        'original_text': matches[0],
                    }
                except ValueError:
                    continue
        return None

    def detect_currency(self, pattern):
        """Infer the currency from the pattern that matched."""
        if '¥' in pattern or '元' in pattern:
            return 'CNY'
        elif '$' in pattern:
            return 'USD'
        else:
            return 'Unknown'
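A quick spot-check of the price extractor on synthetic snippets (the HTML strings are invented):

model = ProductExtractionModel()

print(model.extract_price_info('<span class="price">¥ 199.00</span>'))
# -> {'amount': 199.0, 'currency': 'CNY', 'original_text': '199.00'}

print(model.extract_price_info('Price: 49.99 (today only)'))
# -> {'amount': 49.99, 'currency': 'Unknown', ...} since the label pattern carries no currency symbol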

Part 3: Practical Applications and Performance Optimization

3.1 Distributed Crawler Architecture

Handling large-scale crawling workloads calls for a distributed architecture:

import hashlib
import json
from datetime import datetime, timedelta

import redis
from celery import Celery


class DistributedCrawlerManager:
    def __init__(self):
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        self.celery_app = Celery('crawler_tasks', broker='redis://localhost:6379/0')
        self.task_queue = 'crawler_queue'

    def distribute_task(self, task_config):
        """Distribute a crawl task."""
        # Generate a task ID
        task_id = self.generate_task_id(task_config)

        # Skip if the task already exists
        if self.redis_client.exists(f"task:{task_id}"):
            return {"status": "exists", "task_id": task_id}

        # Deduplicate
        if self.is_duplicate_task(task_config):
            return {"status": "duplicate", "message": "Task already exists"}

        # Compute the task priority
        priority = self.calculate_priority(task_config)

        # Pick a suitable worker
        worker = self.select_worker(task_config)

        # Build the task record (hash values must be strings or numbers for Redis)
        task_info = {
            'id': task_id,
            'config': json.dumps(task_config),
            'priority': priority,
            'worker_id': worker['id'] if worker else '',
            'status': 'pending',
            'created_at': datetime.now().isoformat(),
            'estimated_duration': self.estimate_duration(task_config),
        }

        # Persist the task record
        self.redis_client.hset(f"task:{task_id}", mapping=task_info)

        # Enqueue it with Celery
        task = self.celery_app.send_task(
            'crawler_tasks.execute_crawl',
            args=[task_config],
            kwargs={'task_id': task_id},
            queue=self.task_queue,
            priority=priority,
        )

        return {
            "status": "distributed",
            "task_id": task_id,
            "celery_task_id": task.id,
        }

    def generate_task_id(self, task_config):
        """Generate a deterministic task ID from the config contents."""
        config_str = json.dumps(task_config, sort_keys=True)
        return hashlib.md5(config_str.encode()).hexdigest()

    def estimate_duration(self, task_config):
        """Rough duration estimate in seconds (placeholder heuristic)."""
        return 60

    def is_duplicate_task(self, task_config):
        """Check whether the same URL was queued within the last 24 hours."""
        url = task_config.get('url')
        if not url:
            return False

        yesterday = datetime.now() - timedelta(hours=24)

        # Scan all stored tasks (a production system would index by URL instead)
        task_keys = self.redis_client.keys("task:*")
        for key in task_keys:
            task_data = self.redis_client.hgetall(key)
            if task_data.get(b'config'):
                try:
                    config = json.loads(task_data[b'config'].decode())
                    if config.get('url') == url:
                        created_at = datetime.fromisoformat(task_data[b'created_at'].decode())
                        if created_at > yesterday:
                            return True
                except (ValueError, KeyError):
                    continue
        return False

    def calculate_priority(self, task_config):
        """Compute the task priority."""
        priority = 5  # default priority

        # URL importance
        url_importance = task_config.get('importance', 'normal')
        if url_importance == 'high':
            priority += 3
        elif url_importance == 'low':
            priority -= 2

        # Task type
        task_type = task_config.get('type', 'general')
        if task_type == 'urgent':
            priority += 4
        elif task_type == 'scheduled':
            priority -= 1

        # Resource demand
        resource_demand = task_config.get('resource_demand', 'normal')
        if resource_demand == 'high':
            priority -= 1  # heavy tasks get a lower priority

        # Deadline pressure
        deadline = task_config.get('deadline')
        if deadline:
            time_left = datetime.fromisoformat(deadline) - datetime.now()
            if time_left.total_seconds() < 3600:     # due within an hour
                priority += 5
            elif time_left.total_seconds() < 86400:  # due within a day
                priority += 2

        return max(1, min(10, priority))  # clamp to the 1-10 range

    def select_worker(self, task_config):
        """Pick the most suitable worker node."""
        # Get the list of available workers
        available_workers = self.get_available_workers()
        if not available_workers:
            return None

        # Score each worker against the task's needs
        best_worker = None
        best_score = -1
        for worker in available_workers:
            score = self.calculate_worker_score(worker, task_config)
            if score > best_score:
                best_score = score
                best_worker = worker
        return best_worker

    def get_available_workers(self):
        """List the available workers."""
        worker_keys = self.redis_client.keys("worker:*")
        available_workers = []
        for key in worker_keys:
            worker_data = self.redis_client.hgetall(key)
            if worker_data.get(b'status') == b'available':
                available_workers.append({
                    'id': worker_data[b'id'].decode(),
                    'capabilities': json.loads(worker_data[b'capabilities'].decode()),
                    'current_load': int(worker_data[b'current_load']),
                    'max_load': int(worker_data[b'max_load']),
                })
        return available_workers

    def calculate_worker_score(self, worker, task_config):
        """Score how well a worker fits a task."""
        score = 0

        # Load headroom
        load_ratio = worker['current_load'] / worker['max_load']
        if load_ratio < 0.5:
            score += 30
        elif load_ratio < 0.8:
            score += 20
        else:
            score += 10

        # Capability match
        required_capabilities = task_config.get('required_capabilities', [])
        worker_capabilities = worker['capabilities']
        for capability in required_capabilities:
            if capability in worker_capabilities:
                score += 20

        # Locality, when relevant
        if 'location' in task_config and 'location' in worker:
            if task_config['location'] == worker['location']:
                score += 15

        return score
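A hedged dispatch example, assuming a local Redis instance and a Celery worker registered for crawler_tasks.execute_crawl; the config fields match the ones calculate_priority reads, and the URL and deadline are invented:

from datetime import datetime, timedelta

manager = DistributedCrawlerManager()

task_config = {
    'url': 'https://example.com/catalog',
    'importance': 'high',
    'type': 'urgent',
    'required_capabilities': ['selenium'],
    'deadline': (datetime.now() + timedelta(hours=2)).isoformat(),
}

result = manager.distribute_task(task_config)
print(result)  # {'status': 'distributed', 'task_id': '...', 'celery_task_id': '...'}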

3.2 Performance Monitoring and Optimization

Monitor crawler performance in real time and tune parameters automatically:

import threading
import time
from collections import deque

import psutil


class PerformanceMonitor:
    def __init__(self):
        self.metrics = {
            'requests_per_second': deque(maxlen=100),
            'success_rate': deque(maxlen=100),
            'response_time': deque(maxlen=100),
            'memory_usage': deque(maxlen=100),
            'cpu_usage': deque(maxlen=100),
            'error_rate': deque(maxlen=100),
        }
        self.optimization_rules = self.load_optimization_rules()
        self.monitoring_thread = None
        self.is_monitoring = False

    def load_optimization_rules(self):
        """Load the optimization-rule table (placeholder: rules are registered by the caller)."""
        return {}

    def start_monitoring(self):
        """Start performance monitoring."""
        if not self.is_monitoring:
            self.is_monitoring = True
            self.monitoring_thread = threading.Thread(target=self._monitor_loop)
            self.monitoring_thread.daemon = True
            self.monitoring_thread.start()

    def stop_monitoring(self):
        """Stop performance monitoring."""
        self.is_monitoring = False
        if self.monitoring_thread:
            self.monitoring_thread.join()

    def _monitor_loop(self):
        """Monitoring loop."""
        while self.is_monitoring:
            try:
                # Collect system metrics
                self.collect_system_metrics()
                # Analyze performance (optimizations are triggered from there)
                self.analyze_performance()
                # Wait for the next cycle
                time.sleep(5)
            except Exception as e:
                print(f"Monitoring error: {e}")
                time.sleep(10)

    def collect_system_metrics(self):
        """Collect system metrics."""
        # CPU usage
        cpu_percent = psutil.cpu_percent(interval=1)
        self.metrics['cpu_usage'].append({
            'timestamp': time.time(),
            'value': cpu_percent,
        })
        # Memory usage
        memory = psutil.virtual_memory()
        self.metrics['memory_usage'].append({
            'timestamp': time.time(),
            'value': memory.percent,
        })
        # Network and disk counters (collected for future use)
        network = psutil.net_io_counters()
        disk = psutil.disk_io_counters()

    def analyze_performance(self):
        """Analyze the performance metrics."""
        # Average response time
        if self.metrics['response_time']:
            avg_response_time = (sum(item['value'] for item in self.metrics['response_time'])
                                 / len(self.metrics['response_time']))
            # Trend analysis on recent samples
            recent_times = [item['value'] for item in list(self.metrics['response_time'])[-10:]]
            if len(recent_times) >= 2:
                trend = (recent_times[-1] - recent_times[0]) / len(recent_times)
                if trend > 0.1:  # response time is rising
                    self.trigger_optimization('response_time_increasing', {
                        'current_avg': avg_response_time,
                        'trend': trend,
                    })

        # Success-rate analysis
        if self.metrics['success_rate']:
            recent_success_rate = [item['value'] for item in list(self.metrics['success_rate'])[-20:]]
            avg_success_rate = sum(recent_success_rate) / len(recent_success_rate)
            if avg_success_rate < 0.8:  # below 80%
                self.trigger_optimization('low_success_rate', {
                    'current_rate': avg_success_rate,
                    'threshold': 0.8,
                })

        # Error-rate analysis
        if self.metrics['error_rate']:
            recent_error_rate = [item['value'] for item in list(self.metrics['error_rate'])[-20:]]
            avg_error_rate = sum(recent_error_rate) / len(recent_error_rate)
            if avg_error_rate > 0.2:  # above 20%
                self.trigger_optimization('high_error_rate', {
                    'current_rate': avg_error_rate,
                    'threshold': 0.2,
                })

    def trigger_optimization(self, issue_type, context):
        """Trigger an optimization."""
        print(f"Performance issue detected: {issue_type}, context: {context}")
        # Look up the matching optimization rule
        if issue_type in self.optimization_rules:
            rule = self.optimization_rules[issue_type]
            # Check the execution conditions
            if self.should_execute_optimization(rule, context):
                self.execute_optimization(rule, context)

    def should_execute_optimization(self, rule, context):
        """Decide whether an optimization should run."""
        # Respect the cooldown window
        last_execution = rule.get('last_execution', 0)
        current_time = time.time()
        if current_time - last_execution < rule.get('cooldown', 300):  # default: 5 minutes
            return False
        # Evaluate the trigger conditions
        conditions = rule.get('conditions', [])
        for condition in conditions:
            if not self.evaluate_condition(condition, context):
                return False
        return True

    def evaluate_condition(self, condition, context):
        """Evaluate a single rule condition (placeholder: accept everything)."""
        return True

    def execute_optimization(self, rule, context):
        """Execute an optimization rule."""
        try:
            print(f"Running optimization: {rule['name']}")
            # Run each optimization action
            actions = rule.get('actions', [])
            for action in actions:
                self.execute_action(action, context)
            # Record the execution time
            rule['last_execution'] = time.time()
            # Log the optimization
            self.log_optimization(rule, context)
        except Exception as e:
            print(f"Optimization failed: {e}")

    def log_optimization(self, rule, context):
        """Record the optimization in a log (placeholder)."""
        print(f"[log] {rule.get('name')} applied with context {context}")

    def execute_action(self, action, context):
        """Dispatch a concrete optimization action."""
        action_type = action['type']
        if action_type == 'adjust_request_rate':
            self.adjust_request_rate(action['params'])
        elif action_type == 'change_user_agent':
            self.change_user_agent(action['params'])
        elif action_type == 'enable_proxy':
            self.enable_proxy(action['params'])
        elif action_type == 'adjust_timeout':
            self.adjust_timeout(action['params'])
        else:
            print(f"Unknown optimization action type: {action_type}")

    def adjust_request_rate(self, params):
        """Adjust the request rate."""
        new_rate = params.get('new_rate')
        if new_rate:
            # In a full system this would notify the crawler engine
            print(f"Adjusting request rate to {new_rate} requests/second")

    def change_user_agent(self, params):
        """Rotate the user agent."""
        new_user_agent = params.get('user_agent')
        if new_user_agent:
            # In a full system this would notify the crawler engine
            print(f"Switching user agent to: {new_user_agent}")

    def enable_proxy(self, params):
        """Enable a proxy."""
        proxy_config = params.get('proxy_config')
        if proxy_config:
            # In a full system this would notify the crawler engine
            print(f"Enabling proxy: {proxy_config}")

    def adjust_timeout(self, params):
        """Adjust request timeouts (placeholder)."""
        print(f"Adjusting timeout: {params}")
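A hedged wiring example showing the rule shape the monitor expects (name, cooldown, conditions, actions); the rate and user-agent values are arbitrary:

monitor = PerformanceMonitor()

# Register a rule for the 'low_success_rate' issue the analyzer can raise
monitor.optimization_rules['low_success_rate'] = {
    'name': 'throttle-on-low-success',
    'cooldown': 300,       # at most once every 5 minutes
    'conditions': [],      # always eligible in this sketch
    'actions': [
        {'type': 'adjust_request_rate', 'params': {'new_rate': 0.5}},
        {'type': 'change_user_agent', 'params': {'user_agent': 'Mozilla/5.0 ...'}},
    ],
}

monitor.start_monitoring()
# ... the crawler runs, feeding monitor.metrics['success_rate'] and friends ...
monitor.stop_monitoring()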