A simple Python scraping framework, with a complete Baidu search walkthrough
WebScraper utility class usage notes: a flexible, easy-to-use scraping framework
Preface:
Install the required packages (webdriver-manager takes care of the browser driver):
pip install selenium webdriver-manager
1. Class: WebScraper
This utility class wraps the core browser-control, page-interaction and data-extraction functionality, aiming to provide a flexible, easy-to-use scraping framework.
2. Initialization
__init__(browser_type="chrome", headless=True, user_agent=None, proxy=None, timeout=30, debug=False)
- Function: initialize the scraper instance and configure the browser and developer tools
- Parameters:
  - browser_type: browser type; one of "chrome", "firefox", "edge"
  - headless: whether to run the browser in headless mode
  - user_agent: custom User-Agent string
  - proxy: proxy configuration, in the form {"http": "http://proxy.example.com:8080", "https": "http://proxy.example.com:8080"}
  - timeout: operation timeout in seconds
  - debug: whether to enable debug mode
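Below is a minimal sketch of how the constructor is meant to be called; the proxy address and User-Agent string are placeholders, not real values:
python
from WebScraper import WebScraper

scraper = WebScraper(
    browser_type="chrome",        # or "firefox" / "edge"
    headless=True,                # run without a visible browser window
    user_agent="Mozilla/5.0 (placeholder UA)",
    proxy={"http": "http://proxy.example.com:8080",
           "https": "http://proxy.example.com:8080"},
    timeout=30,
    debug=True,                   # print [DEBUG] messages for every action
)
scraper.close()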
3. Browser control methods
open_url(url)
- Function: open the given URL
- Parameters:
  - url: target URL
- Returns: whether the page finished loading
close()
- Function: close the browser instance
- Parameters: none
refresh()
- Function: refresh the current page
- Parameters: none
go_back()
- Function: go back to the previous page
- Parameters: none
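A short sketch of the navigation methods used together; example.com and example.org simply stand in for whatever pages you are visiting:
python
from WebScraper import WebScraper

scraper = WebScraper(headless=True, debug=True)
if scraper.open_url("https://example.com"):   # True once the document reports readyState "complete"
    scraper.refresh()                          # reload the current page
    scraper.open_url("https://example.org")
    scraper.go_back()                          # back to example.com
scraper.close()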
4. Element location and interaction methods
find_element(selector, by="css", timeout=None)
- Function: find a single element
- Parameters:
  - selector: selector string
  - by: selector type; one of "css", "xpath", "id", "class", "name", "link_text", "partial_link_text", "tag_name"
  - timeout: how long to wait for the element to appear, in seconds
- Returns: the element object found, or None
find_elements(selector, by="css", timeout=None)
- Function: find multiple elements
- Parameters: same as find_element
- Returns: the list of elements found
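A sketch of the two lookup methods against the Baidu home page; the "#kw" id is the search box used in the case study below, and any other page and selectors would work the same way:
python
from WebScraper import WebScraper

scraper = WebScraper(headless=True)
scraper.open_url("https://www.baidu.com")
search_box = scraper.find_element("#kw", by="css", timeout=10)   # a single element, or None
links = scraper.find_elements("a", by="tag_name")                # a list, possibly empty
print(f"search box found: {search_box is not None}, links on page: {len(links)}")
scraper.close()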
click(element=None, selector=None, by="css", timeout=None)
- Function: click an element
- Parameters:
  - element: element object (takes precedence)
  - selector: selector string (used when element is None)
  - by: selector type
  - timeout: how long to wait for the element to appear
- Returns: whether the operation succeeded
type_text(text, element=None, selector=None, by="css", timeout=None, clear_first=True)
- Function: type text into an input field
- Parameters:
  - text: the text to type
  - element: element object (takes precedence)
  - selector: selector string (used when element is None)
  - by: selector type
  - timeout: how long to wait for the element to appear
  - clear_first: whether to clear the field first
- Returns: whether the operation succeeded
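A minimal sketch of typing and clicking, again using Baidu's search box (#kw) and search button (#su) from the case study:
python
from WebScraper import WebScraper

scraper = WebScraper(headless=True)
scraper.open_url("https://www.baidu.com")
# Pass a selector and let the class locate and wait for the element itself
scraper.type_text("selenium tutorial", selector="#kw", clear_first=True)
scraper.click(selector="#su")
# Or locate the element first and pass it in (element takes precedence over selector)
box = scraper.find_element("#kw")
scraper.type_text("another query", element=box)
scraper.close()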
5. Scrolling methods
scroll(direction="down", amount=None, element=None, smooth=True, duration=0.5)
- Function: scroll the page or an element
- Parameters:
  - direction: scroll direction; one of "up", "down", "left", "right"
  - amount: scroll distance in pixels; defaults to 50% of the page height/width
  - element: the element to scroll; defaults to the whole page
  - smooth: whether to scroll smoothly
  - duration: scroll duration in seconds
- Returns: whether the operation succeeded
scroll_to_element(element=None, selector=None, by="css", timeout=None, align="center")
- Function: scroll to a given element
- Parameters:
  - element: element object (takes precedence)
  - selector: selector string (used when element is None)
  - by: selector type
  - timeout: how long to wait for the element to appear
  - align: element alignment; one of "top", "center", "bottom"
- Returns: whether the operation succeeded
scroll_to_bottom(element=None, steps=10, delay=0.5)
- Function: scroll to the bottom of the page or of an element
- Parameters:
  - element: the element to scroll; defaults to the whole page
  - steps: number of scroll steps
  - delay: delay between steps, in seconds
- Returns: whether the operation succeeded
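A sketch of the three scrolling helpers; the "#footer" selector is hypothetical and only illustrates scroll_to_element:
python
from WebScraper import WebScraper

scraper = WebScraper(headless=True)
scraper.open_url("https://example.com")
scraper.scroll(direction="down", amount=800, smooth=True)       # scroll 800 px down the page
scraper.scroll_to_element(selector="#footer", align="bottom")   # hypothetical footer element
scraper.scroll_to_bottom(steps=10, delay=0.5)                   # stepwise scroll, useful for lazy-loaded content
scraper.close()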
6. Pagination methods
next_page(selector=None, method="click", url_template=None, page_param="page", next_page_func=None)
- Function: move to the next page
- Parameters:
  - selector: selector of the next-page button (used when method is "click")
  - method: pagination method; one of "click", "url", "function"
  - url_template: URL template (used when method is "url")
  - page_param: name of the page-number parameter (used when method is "url")
  - next_page_func: custom pagination function (used when method is "function")
- Returns: whether the pagination succeeded
has_next_page(selector=None, check_func=None)
- Function: check whether there is a next page
- Parameters:
  - selector: selector of the next-page button
  - check_func: custom check function
- Returns: boolean indicating whether a next page exists
set_page(page_num, url_template=None, page_param="page")
- Function: jump to a specific page number
- Parameters:
  - page_num: target page number
  - url_template: URL template
  - page_param: name of the page-number parameter
- Returns: whether the operation succeeded
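A sketch of both pagination styles; the listing URL and the ".next" button selector are hypothetical stand-ins for your target site:
python
from WebScraper import WebScraper

scraper = WebScraper(headless=True)
# Style 1: URL-template pagination -- "{page}" in the template is replaced by the page number
scraper.set_page(1, url_template="https://example.com/list?page={page}")
scraper.next_page(method="url", url_template="https://example.com/list?page={page}", page_param="page")
# Style 2: click-based pagination, guarded by has_next_page()
for _ in range(3):                                   # demo: at most three extra pages
    if not scraper.has_next_page(selector=".next"):
        break
    scraper.next_page(selector=".next", method="click")
scraper.close()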
7. Data extraction methods
get_text(element=None, selector=None, by="css", timeout=None)
- Function: get an element's text content
- Parameters: same as find_element
- Returns: the text content, or None
get_attribute(attribute, element=None, selector=None, by="css", timeout=None)
- Function: get the value of an element's attribute
- Parameters:
  - attribute: attribute name
  - the remaining parameters are the same as for find_element
- Returns: the attribute value, or None
extract_data(template)
- Function: extract page data according to a template
- Parameters:
  - template: data-extraction template, a dict whose keys are field names and whose values are selectors or extraction functions
- Returns: the extracted data
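A sketch of an extraction template; the selectors are illustrative, and the three rule types are a CSS selector string (text), a (selector, attribute) tuple, and a callable that receives the scraper instance:
python
from WebScraper import WebScraper

scraper = WebScraper(headless=True)
scraper.open_url("https://example.com")
template = {
    "heading": "h1",                                      # string rule   -> get_text()
    "canonical": ("link[rel='canonical']", "href"),       # tuple rule    -> get_attribute()
    "link_count": lambda s: len(s.find_elements("a")),    # callable rule -> any custom logic
}
print(scraper.extract_data(template))
scraper.close()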
8. DevTools methods
start_capturing_network()
- Function: start capturing network requests
- Parameters: none
stop_capturing_network()
- Function: stop capturing network requests
- Parameters: none
get_captured_requests(filter_type=None, url_pattern=None)
- Function: get the captured network requests
- Parameters:
  - filter_type: filter by request type; e.g. "xhr", "fetch", "script", "image", "stylesheet", etc.
  - url_pattern: filter by URL pattern; regular expressions are supported
- Returns: the list of matching requests
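A sketch of the capture workflow. In the reference implementation below, capture is based on the browser's JavaScript Performance API (window.performance resource entries), so it returns a snapshot of loaded resources rather than a live DevTools stream; the "/api/" pattern is an illustrative regex:
python
from WebScraper import WebScraper

scraper = WebScraper(headless=True)
scraper.start_capturing_network()          # clears previously collected entries
scraper.open_url("https://example.com")
xhr_requests = scraper.get_captured_requests(filter_type="xhr", url_pattern=r"/api/")
for req in xhr_requests:
    print(req.get("name"), req.get("initiatorType"))
scraper.stop_capturing_network()
scraper.close()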
add_request_interceptor(pattern, handler_func)
- Function: add a request interceptor
- Parameters:
  - pattern: URL matching pattern
  - handler_func: handler function; receives the request object and may modify the request or return a custom response
- Returns: the interceptor ID
- Note: true request interception is not possible with standard Selenium; the reference implementation below only registers a placeholder and suggests SeleniumWire for real interception.
9. Auxiliary methods
wait_for_element(selector, by="css", timeout=None, condition="visible")
- Function: wait until an element meets a specific condition
- Parameters:
  - selector: selector string
  - by: selector type
  - timeout: timeout in seconds
  - condition: the condition to wait for; one of "visible", "present", "clickable", "invisible", "not_present"
- Returns: the element object, or None
execute_script(script, *args)
- Function: execute JavaScript code
- Parameters:
  - script: the JavaScript code
  - *args: arguments passed to the JavaScript
- Returns: the result of the JavaScript execution
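A sketch of explicit waiting plus a small script call; "#kw" is Baidu's search box again, and the script simply reads the page title:
python
from WebScraper import WebScraper

scraper = WebScraper(headless=True)
scraper.open_url("https://www.baidu.com")
box = scraper.wait_for_element("#kw", by="css", timeout=10, condition="clickable")
if box:
    title = scraper.execute_script("return document.title;")
    print("page title:", title)
scraper.close()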
set_delay(min_delay, max_delay=None)
- Function: set a random delay between operations
- Parameters:
  - min_delay: minimum delay in seconds
  - max_delay: maximum delay in seconds; if None, the delay is fixed at min_delay
- Returns: none
take_screenshot(path=None)
- Function: take a screenshot of the current page
- Parameters:
  - path: save path; if None, the image data is returned instead
- Returns: the binary image data if path is None, otherwise the result of saving the file
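A sketch of throttling and screenshots; the output file name is arbitrary:
python
from WebScraper import WebScraper

scraper = WebScraper(headless=True)
scraper.set_delay(1.0, 3.0)                 # wait a random 1-3 s between actions
scraper.open_url("https://example.com")
scraper.take_screenshot("example.png")      # save to disk
png_bytes = scraper.take_screenshot()       # or get the raw PNG bytes back
print(f"screenshot size: {len(png_bytes)} bytes")
scraper.close()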
10. Implementation code
python
import time
import random
import json
import re
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException, WebDriverException
from selenium.webdriver.common.action_chains import ActionChains

# Optional: For easier driver management
try:
    from webdriver_manager.chrome import ChromeDriverManager
    from webdriver_manager.firefox import GeckoDriverManager
    from webdriver_manager.microsoft import EdgeChromiumDriverManager
    WEBDRIVER_MANAGER_AVAILABLE = True
except ImportError:WEBDRIVER_MANAGER_AVAILABLE = Falseprint("Consider installing webdriver-manager for easier driver setup: pip install webdriver-manager")class WebScraper:_BY_MAP = {"css": By.CSS_SELECTOR,"xpath": By.XPATH,"id": By.ID,"class": By.CLASS_NAME, # Note: find by class name only works for a single class"name": By.NAME,"link_text": By.LINK_TEXT,"partial_link_text": By.PARTIAL_LINK_TEXT,"tag_name": By.TAG_NAME,}def __init__(self, browser_type="chrome", headless=True, user_agent=None, proxy=None, timeout=30, debug=False):self.browser_type = browser_type.lower()self.headless = headlessself.user_agent = user_agentself.proxy = proxyself.timeout = timeoutself.debug = debugself.driver = Noneself.current_page_num = 1 # For URL-based paginationself._min_delay = 0.5self._max_delay = 1.5self._network_requests_raw = [] # To store JS collected network entriesself._setup_driver()def _print_debug(self, message):if self.debug:print(f"[DEBUG] {message}")def _setup_driver(self):self._print_debug(f"Setting up {self.browser_type} browser...")options = Noneservice = Noneif self.browser_type == "chrome":options = webdriver.ChromeOptions()if self.user_agent:options.add_argument(f"user-agent={self.user_agent}")if self.headless:options.add_argument("--headless")options.add_argument("--window-size=1920x1080") # Often needed for headlessif self.proxy:if "http" in self.proxy: # Basic proxy, for more auth use selenium-wireoptions.add_argument(f"--proxy-server={self.proxy['http']}")elif "https" in self.proxy: # Selenium typically uses one proxy for alloptions.add_argument(f"--proxy-server={self.proxy['https']}")options.add_argument("--disable-gpu")options.add_argument("--no-sandbox")options.add_argument("--disable-dev-shm-usage") # Overcome limited resource problemsif WEBDRIVER_MANAGER_AVAILABLE:try:service = webdriver.chrome.service.Service(ChromeDriverManager().install())self.driver = webdriver.Chrome(service=service, options=options)except Exception as e:self._print_debug(f"WebDriverManager for Chrome failed: {e}. Falling back to default PATH.")self.driver = webdriver.Chrome(options=options) # Fallback to PATHelse:self.driver = webdriver.Chrome(options=options)elif self.browser_type == "firefox":options = webdriver.FirefoxOptions()if self.user_agent:options.set_preference("general.useragent.override", self.user_agent)if self.headless:options.add_argument("--headless")if self.proxy:# Firefox proxy setup is more involved via preferencesif "http" in self.proxy:host, port = self.proxy['http'].replace('http://', '').split(':')options.set_preference("network.proxy.type", 1)options.set_preference("network.proxy.http", host)options.set_preference("network.proxy.http_port", int(port))if "https" in self.proxy: # Assuming same proxy for httpshost, port = self.proxy['https'].replace('https://', '').split(':')options.set_preference("network.proxy.ssl", host)options.set_preference("network.proxy.ssl_port", int(port))# options.set_preference("network.proxy.share_proxy_settings", True) # if one proxy for allif WEBDRIVER_MANAGER_AVAILABLE:try:service = webdriver.firefox.service.Service(GeckoDriverManager().install())self.driver = webdriver.Firefox(service=service, options=options)except Exception as e:self._print_debug(f"WebDriverManager for Firefox failed: {e}. 
Falling back to default PATH.")self.driver = webdriver.Firefox(options=options)else:self.driver = webdriver.Firefox(options=options)elif self.browser_type == "edge":options = webdriver.EdgeOptions()if self.user_agent:options.add_argument(f"user-agent={self.user_agent}")if self.headless:options.add_argument("--headless") # Edge uses Chromium engineoptions.add_argument("--window-size=1920x1080")if self.proxy and "http" in self.proxy: # Basic proxyoptions.add_argument(f"--proxy-server={self.proxy['http']}")options.add_argument("--disable-gpu")if WEBDRIVER_MANAGER_AVAILABLE:try:service = webdriver.edge.service.Service(EdgeChromiumDriverManager().install())self.driver = webdriver.Edge(service=service, options=options)except Exception as e:self._print_debug(f"WebDriverManager for Edge failed: {e}. Falling back to default PATH.")self.driver = webdriver.Edge(options=options)else:self.driver = webdriver.Edge(options=options)else:raise ValueError(f"Unsupported browser: {self.browser_type}")self.driver.implicitly_wait(self.timeout / 2) # Implicit wait for elementsself.driver.set_page_load_timeout(self.timeout)self._print_debug(f"{self.browser_type} browser setup complete.")def _get_selenium_by(self, by_string):by_string = by_string.lower()if by_string not in self._BY_MAP:raise ValueError(f"Invalid selector type: {by_string}. Supported: {list(self._BY_MAP.keys())}")return self._BY_MAP[by_string]def _perform_delay(self):time.sleep(random.uniform(self._min_delay, self._max_delay))# --- Browser Control ---def open_url(self, url):self._print_debug(f"Opening URL: {url}")try:self.driver.get(url)self._perform_delay()# A simple check, for true "loaded" status, might need to wait for specific elementreturn self.driver.execute_script("return document.readyState") == "complete"except WebDriverException as e:self._print_debug(f"Error opening URL {url}: {e}")return Falsedef close(self):if self.driver:self._print_debug("Closing browser.")self.driver.quit()self.driver = Nonedef refresh(self):self._print_debug("Refreshing page.")self.driver.refresh()self._perform_delay()def go_back(self):self._print_debug("Going back to previous page.")self.driver.back()self._perform_delay()# --- Element Location & Interaction ---def find_element(self, selector, by="css", timeout=None):wait_timeout = timeout if timeout is not None else self.timeoutself._print_debug(f"Finding element by {by}: '{selector}' with timeout {wait_timeout}s")try:wait = WebDriverWait(self.driver, wait_timeout)element = wait.until(EC.presence_of_element_located((self._get_selenium_by(by), selector)))return elementexcept TimeoutException:self._print_debug(f"Element not found by {by}: '{selector}' within {wait_timeout}s.")return Noneexcept Exception as e:self._print_debug(f"Error finding element by {by}: '{selector}': {e}")return Nonedef find_elements(self, selector, by="css", timeout=None):wait_timeout = timeout if timeout is not None else self.timeoutself._print_debug(f"Finding elements by {by}: '{selector}' with timeout {wait_timeout}s")try:# Wait for at least one element to be present to ensure page readinessWebDriverWait(self.driver, wait_timeout).until(EC.presence_of_all_elements_located((self._get_selenium_by(by), selector)))# Then find all elements without further explicit wait beyond implicitreturn self.driver.find_elements(self._get_selenium_by(by), selector)except TimeoutException:self._print_debug(f"No elements found by {by}: '{selector}' within {wait_timeout}s.")return []except Exception as e:self._print_debug(f"Error finding elements by {by}: 
'{selector}': {e}")return []def click(self, element=None, selector=None, by="css", timeout=None):if not element and selector:element = self.wait_for_element(selector, by, timeout, condition="clickable")if element:try:self._print_debug(f"Clicking element: {element.tag_name} (selector: {selector})")# Try JavaScript click if standard click is interceptedtry:element.click()except WebDriverException: # e.g. ElementClickInterceptedExceptionself._print_debug("Standard click failed, trying JavaScript click.")self.driver.execute_script("arguments[0].click();", element)self._perform_delay()return Trueexcept Exception as e:self._print_debug(f"Error clicking element: {e}")return Falseself._print_debug("Element not provided or not found for click.")return Falsedef type_text(self, text, element=None, selector=None, by="css", timeout=None, clear_first=True):if not element and selector:element = self.wait_for_element(selector, by, timeout, condition="visible")if element:try:self._print_debug(f"Typing text '{text}' into element: {element.tag_name} (selector: {selector})")if clear_first:element.clear()element.send_keys(text)self._perform_delay()return Trueexcept Exception as e:self._print_debug(f"Error typing text: {e}")return Falseself._print_debug("Element not provided or not found for typing.")return False# --- Scrolling Methods ---def scroll(self, direction="down", amount=None, element=None, smooth=True, duration=0.5):self._print_debug(f"Scrolling {direction}...")script = ""target = "window"if element:target = "arguments[0]" # Element will be passed as arguments[0]behavior = "smooth" if smooth else "auto"if direction == "down":scroll_val = amount if amount is not None else f"{target}.innerHeight / 2" if element else "window.innerHeight / 2"script = f"{target}.scrollBy({{ top: {scroll_val}, left: 0, behavior: '{behavior}' }});"elif direction == "up":scroll_val = amount if amount is not None else f"{target}.innerHeight / 2" if element else "window.innerHeight / 2"script = f"{target}.scrollBy({{ top: -{scroll_val}, left: 0, behavior: '{behavior}' }});"elif direction == "left":scroll_val = amount if amount is not None else f"{target}.innerWidth / 2" if element else "window.innerWidth / 2"script = f"{target}.scrollBy({{ top: 0, left: -{scroll_val}, behavior: '{behavior}' }});"elif direction == "right":scroll_val = amount if amount is not None else f"{target}.innerWidth / 2" if element else "window.innerWidth / 2"script = f"{target}.scrollBy({{ top: 0, left: {scroll_val}, behavior: '{behavior}' }});"else:self._print_debug(f"Invalid scroll direction: {direction}")return Falsetry:if element:self.driver.execute_script(script, element)else:self.driver.execute_script(script)time.sleep(duration) # Allow time for smooth scroll to completereturn Trueexcept Exception as e:self._print_debug(f"Error during scroll: {e}")return Falsedef scroll_to_element(self, element=None, selector=None, by="css", timeout=None, align="center"):if not element and selector:element = self.find_element(selector, by, timeout)if element:self._print_debug(f"Scrolling to element (selector: {selector}) with align: {align}")try:# 'block' can be 'start', 'center', 'end', or 'nearest'.# 'inline' is similar for horizontal.# For simplicity, map to 'block' options.align_js = "{ behavior: 'smooth', block: 'center', inline: 'nearest' }"if align == "top":align_js = "{ behavior: 'smooth', block: 'start', inline: 'nearest' }"elif align == "bottom":align_js = "{ behavior: 'smooth', block: 'end', inline: 'nearest' 
}"self.driver.execute_script(f"arguments[0].scrollIntoView({align_js});", element)self._perform_delay() # Give it a moment to scrollreturn Trueexcept Exception as e:self._print_debug(f"Error scrolling to element: {e}")return Falseself._print_debug("Element not provided or not found for scroll_to_element.")return Falsedef scroll_to_bottom(self, element=None, steps=10, delay=0.5):self._print_debug("Scrolling to bottom...")target = "document.body"target_el_for_js = Noneif element:target = "arguments[0]"target_el_for_js = elementtry:last_height_script = f"return {target}.scrollHeight"scroll_script = f"{target}.scrollTop = {target}.scrollHeight;"for _ in range(steps):if target_el_for_js:last_height = self.driver.execute_script(last_height_script, target_el_for_js)self.driver.execute_script(scroll_script, target_el_for_js)else:last_height = self.driver.execute_script(last_height_script)self.driver.execute_script(scroll_script)time.sleep(delay)if target_el_for_js:new_height = self.driver.execute_script(last_height_script, target_el_for_js)else:new_height = self.driver.execute_script(last_height_script)if new_height == last_height: # Reached bottom or no more content loadedbreakself._print_debug("Scrolled to bottom (or no more content loaded).")return Trueexcept Exception as e:self._print_debug(f"Error scrolling to bottom: {e}")return False# --- Pagination Methods ---def next_page(self, selector=None, method="click", url_template=None, page_param="page", next_page_func=None):self._print_debug(f"Attempting to go to next page using method: {method}")if method == "click":if not selector:self._print_debug("Selector for next page button is required for 'click' method.")return Falsenext_button = self.wait_for_element(selector, condition="clickable")if next_button:return self.click(element=next_button)else:self._print_debug("Next page button not found or not clickable.")return Falseelif method == "url":if not url_template:self._print_debug("URL template is required for 'url' method.")return Falseself.current_page_num += 1next_url = url_template.replace(f"{{{page_param}}}", str(self.current_page_num))return self.open_url(next_url)elif method == "function":if not callable(next_page_func):self._print_debug("A callable function is required for 'function' method.")return Falsetry:return next_page_func(self) # Pass scraper instance to the custom functionexcept Exception as e:self._print_debug(f"Custom next_page_func failed: {e}")return Falseelse:self._print_debug(f"Invalid pagination method: {method}")return Falsedef has_next_page(self, selector=None, check_func=None):self._print_debug("Checking for next page...")if callable(check_func):try:return check_func(self)except Exception as e:self._print_debug(f"Custom check_func for has_next_page failed: {e}")return Falseelif selector:# Check if element is present and often, if it's not disabledelement = self.find_element(selector)if element:is_disabled = element.get_attribute("disabled")class_attr = element.get_attribute("class")# Common patterns for disabled buttonsif is_disabled or (class_attr and ("disabled" in class_attr or "inactive" in class_attr)):self._print_debug("Next page element found but appears disabled.")return Falsereturn Truereturn Falseself._print_debug("No selector or check_func provided for has_next_page.")return False # Default to no next page if insufficient infodef set_page(self, page_num, url_template=None, page_param="page"):if not url_template:self._print_debug("URL template is required for set_page.")return Falseself._print_debug(f"Setting 
page to: {page_num}")self.current_page_num = page_numtarget_url = url_template.replace(f"{{{page_param}}}", str(page_num))return self.open_url(target_url)# --- Data Extraction Methods ---def get_text(self, element=None, selector=None, by="css", timeout=None):if not element and selector:element = self.find_element(selector, by, timeout)if element:try:text = element.textself._print_debug(f"Extracted text: '{text[:50]}...' from element (selector: {selector})")return textexcept Exception as e:self._print_debug(f"Error getting text: {e}")return Noneself._print_debug("Element not provided or not found for get_text.")return Nonedef get_attribute(self, attribute, element=None, selector=None, by="css", timeout=None):if not element and selector:element = self.find_element(selector, by, timeout)if element:try:value = element.get_attribute(attribute)self._print_debug(f"Extracted attribute '{attribute}': '{value}' from element (selector: {selector})")return valueexcept Exception as e:self._print_debug(f"Error getting attribute '{attribute}': {e}")return Noneself._print_debug("Element not provided or not found for get_attribute.")return Nonedef extract_data(self, template):"""Extracts data based on a template.Template format: {"field_name": "css_selector" or ("css_selector", "attribute_name") or callable}If callable, it receives the scraper instance (self) and the parent_element (if any).To extract multiple items (e.g., a list), the selector should point to the parent of those items,and the callable should handle finding and processing sub-elements.Or, the template value can be a list of sub-templates for structured data.For simplicity here, we assume template values are selectors for single items,or callables for custom logic."""self._print_debug(f"Extracting data with template: {template}")extracted_data = {}for field_name, rule in template.items():value = Nonetry:if isinstance(rule, str): # Simple CSS selector for textvalue = self.get_text(selector=rule)elif isinstance(rule, tuple) and len(rule) == 2: # (selector, attribute)value = self.get_attribute(selector=rule[0], attribute=rule[1])elif callable(rule): # Custom extraction functionvalue = rule(self) # Pass scraper instanceelse:self._print_debug(f"Invalid rule for field '{field_name}': {rule}")extracted_data[field_name] = valueexcept Exception as e:self._print_debug(f"Error extracting field '{field_name}' with rule '{rule}': {e}")extracted_data[field_name] = Nonereturn extracted_data# --- DevTools Methods (Limited by standard Selenium) ---def start_capturing_network(self):"""Clears previously captured network requests (from JS).Actual continuous network capture requires selenium-wire or browser's DevTools Protocol."""self._print_debug("Starting network capture (clearing previous JS logs).")self._network_requests_raw = []# Note: This doesn't actively "start" a capture process in the browser's network panel.# It just prepares our internal list for new entries gathered by get_captured_requests.def stop_capturing_network(self):"""Conceptually stops. With JS method, it means new calls to get_captured_requestswill include data up to this point, but nothing explicitly 'stops' in the browser."""self._print_debug("Stopping network capture (conceptual for JS method).")# No direct action for JS based capture, it's always available.def get_captured_requests(self, filter_type=None, url_pattern=None):"""Gets network requests using JavaScript performance API. 
This is a snapshot.filter_type: e.g., 'script', 'img', 'css', 'xmlhttprequest', 'fetch'url_pattern: Regex string to filter URLs."""self._print_debug("Getting captured network requests via JavaScript Performance API.")try:# Get all resource timing entriescurrent_entries = self.driver.execute_script("return window.performance.getEntriesByType('resource');")if isinstance(current_entries, list):self._network_requests_raw.extend(current_entries) # Append new ones# Deduplicate based on 'name' (URL) and 'startTime' to keep it somewhat manageableseen = set()deduplicated_requests = []for entry in sorted(self._network_requests_raw, key=lambda x: x.get('startTime', 0)):identifier = (entry.get('name'), entry.get('startTime'))if identifier not in seen:deduplicated_requests.append(entry)seen.add(identifier)self._network_requests_raw = deduplicated_requestsfiltered_requests = []for req in self._network_requests_raw:# req is a dictionary like:# {'name': url, 'entryType': 'resource', 'startTime': 123.45, 'duration': 67.89,# 'initiatorType': 'script'/'img'/'css'/'link'/'xmlhttprequest', etc.}if filter_type:# initiatorType is more reliable for filtering than entryType (always 'resource')initiator = req.get('initiatorType', '').lower()if filter_type.lower() == "xhr": # Common aliasif initiator != 'xmlhttprequest': continueelif filter_type.lower() not in initiator:continueif url_pattern:if not re.search(url_pattern, req.get('name', '')):continuefiltered_requests.append(req)self._print_debug(f"Found {len(filtered_requests)} filtered network requests.")return filtered_requestsexcept WebDriverException as e:self._print_debug(f"Error getting network requests via JS: {e}")return []def add_request_interceptor(self, pattern, handler_func):"""NOTE: True request interception is NOT reliably possible with standard Selenium.This requires tools like SeleniumWire or direct DevTools Protocol interaction,which are more complex to set up and manage.This method is a placeholder to acknowledge the design spec."""self._print_debug("WARNING: add_request_interceptor is not implemented with standard Selenium. ""Consider using SeleniumWire for this functionality.")# To make it "runnable" without error, return a dummy IDreturn f"dummy_interceptor_id_{pattern}"# --- Auxiliary Methods ---def wait_for_element(self, selector, by="css", timeout=None, condition="visible"):wait_timeout = timeout if timeout is not None else self.timeoutself._print_debug(f"Waiting for element by {by}: '{selector}' to be {condition} (timeout: {wait_timeout}s)")try:wait = WebDriverWait(self.driver, wait_timeout)sel_by = self._get_selenium_by(by)if condition == "visible":element = wait.until(EC.visibility_of_element_located((sel_by, selector)))elif condition == "present":element = wait.until(EC.presence_of_element_located((sel_by, selector)))elif condition == "clickable":element = wait.until(EC.element_to_be_clickable((sel_by, selector)))elif condition == "invisible":# Returns True if invisible, or an element if it becomes invisible (less common use)# For our purpose, we want the element if it exists and is invisible, or None if it becomes visible/not found# This is tricky. 
A simpler approach is to check if it's NOT visible.# Let's wait for presence, then check visibility.present_element = wait.until(EC.presence_of_element_located((sel_by, selector)))if not present_element.is_displayed():element = present_elementelse: # Element is present AND visible, so condition "invisible" is falseraise TimeoutException(f"Element '{selector}' was visible, not invisible.")elif condition == "not_present":# Returns True if element is not present, or raises TimeoutException# This doesn't return the element. We signal success by returning a dummy True# or failure by returning None after timeout.if wait.until(EC.invisibility_of_element_located((sel_by, selector))): # Waits for staleness or non-presenceself._print_debug(f"Element by {by}: '{selector}' confirmed not present or invisible.")return True # Indicates success for this condition, though no element is returnedelse: # Should not happen if invisibility_of_element_located works as expectedreturn Noneelse:raise ValueError(f"Unsupported condition: {condition}")return elementexcept TimeoutException:self._print_debug(f"Element by {by}: '{selector}' did not meet condition '{condition}' within {wait_timeout}s.")return Noneexcept Exception as e:self._print_debug(f"Error waiting for element '{selector}' condition '{condition}': {e}")return Nonedef execute_script(self, script, *args):self._print_debug(f"Executing script: {script[:100]}...")try:return self.driver.execute_script(script, *args)except WebDriverException as e:self._print_debug(f"Error executing script: {e}")return Nonedef set_delay(self, min_delay, max_delay=None):self._print_debug(f"Setting delay: min={min_delay}, max={max_delay}")self._min_delay = min_delayself._max_delay = max_delay if max_delay is not None else min_delaydef take_screenshot(self, path=None):self._print_debug(f"Taking screenshot. Path: {path if path else 'Return as PNG data'}")try:if path:return self.driver.save_screenshot(path) # Returns True on successelse:return self.driver.get_screenshot_as_png() # Returns binary dataexcept WebDriverException as e:self._print_debug(f"Error taking screenshot: {e}")return None if path is None else Falsedef __enter__(self):return selfdef __exit__(self, exc_type, exc_val, exc_tb):self.close()
11. Example application (Baidu search)
python
from WebScraper import WebScraper
import time
import json


def main():
    # Initialize a WebScraper instance (headed mode so you can watch it work)
    scraper = WebScraper(
        browser_type="chrome",
        headless=False,
        timeout=15,
        debug=True
    )
    try:
        # 1. Open the Baidu home page
        baidu_url = "https://www.baidu.com"
        print("Opening the Baidu home page...")
        if not scraper.open_url(baidu_url):
            print("Failed to open the Baidu home page!")
            return

        # 2. Type the search keyword and run the search
        search_keyword = "AI development trends"  # change to any keyword you like
        print(f"Searching for: {search_keyword}")
        # Locate the search box and type the keyword
        search_input = scraper.find_element(selector="#kw", by="css")
        if not search_input:
            print("Search box element not found!")
            return
        scraper.type_text(text=search_keyword, element=search_input)
        # Click the search button
        if not scraper.click(selector="#su", by="css"):
            print("Failed to click the search button!")
            return
        # Wait for the search results to load
        time.sleep(2)
        print("Loading search results...")

        # 3. Scroll to the bottom of the page
        print("Scrolling to the bottom of the page...")
        if scraper.scroll_to_bottom(steps=10, delay=0.1):
            print("Reached the bottom of the page")
        else:
            print("Failed to scroll to the bottom of the page")

        # 4. Extract the result titles on the current page
        print("Extracting search results from the current page...")
        result_titles = scraper.find_elements(selector="h3.t a", by="css")
        if result_titles:
            print(f"Found {len(result_titles)} result titles:")
            for i, title in enumerate(result_titles, 1):
                title_text = title.text
                print(f"{i}. {title_text}")
        else:
            print("No result titles found")

        # 5. Page through more results (pages 2-3 as a demo)
        for page in range(2, 4):
            print(f"\nMoving to page {page}...")
            # Option 1: click the next-page button with next_page()
            next_button_selector = ".n"  # CSS selector of Baidu's next-page button
            if scraper.next_page(selector=next_button_selector, method="click"):
                print(f"Now on page {page}, waiting for it to load...")
                time.sleep(2)
                # Scroll to the bottom of the new page
                scraper.scroll_to_bottom(steps=10, delay=0.3)
                time.sleep(1)
                # Extract the results on the new page
                result_titles = scraper.find_elements(selector="h3.t a", by="css")
                if result_titles:
                    print(f"Found {len(result_titles)} result titles on page {page}:")
                    for i, title in enumerate(result_titles, 1):
                        title_text = title.text
                        print(f"{i}. {title_text}")
                else:
                    print(f"No result titles found on page {page}")
            else:
                print(f"Failed to move to page {page}; this may already be the last page")
                break

        # 6. Extract structured data with extract_data()
        print("\nExtracting structured data with extract_data():")
        data_template = {
            # Constants and driver properties must be wrapped in a callable;
            # plain string rules are treated as CSS selectors
            "keyword": lambda scraper: search_keyword,
            "timestamp": lambda scraper: time.strftime("%Y-%m-%d %H:%M:%S"),
            "page_title": "title",
            "current_url": lambda scraper: scraper.driver.current_url,
            "result_count": lambda scraper: len(scraper.find_elements("h3.t a"))
        }
        extracted_data = scraper.extract_data(data_template)
        for key, value in extracted_data.items():
            print(f"{key}: {value}")

        # 7. Save the data to a JSON file
        with open("baidu_search_results.json", "w", encoding="utf-8") as f:
            json.dump(extracted_data, f, ensure_ascii=False, indent=2)
        print("\nData saved to baidu_search_results.json")

    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        # 8. Close the browser
        scraper.close()
        print("Browser closed")


if __name__ == "__main__":
    main()