Python爬虫框架 - 实际项目(拿到可以直接用)
项目目标:
设计一个功能完善的爬虫框架,能够实现以下功能:
- 任务管理:支持多任务并发抓取
 - 请求发送:支持HTTP请求发送和代理管理
 - 页面解析:支持多种网页解析方式
 - 数据存储:支持多种数据存储方式
 - 结果展示:支持多种结果展示方式
 
框架设计思路:
- 模块化设计:将爬虫框架拆分为多个功能模块,每个模块负责不同的任务。
 - 可扩展性:框架设计支持功能模块的扩展和替换。
 - 异常处理:在每个模块中加入异常处理机制,确保爬虫稳定运行。
 - 日志记录:记录爬虫运行过程中的关键信息,便于排查问题。
 
代码分析:
我们将分为以下几个模块进行讲解:
- 任务管理模块
 - 请求发送模块
 - 页面解析模块
 - 数据存储模块
 - 结果展示模块
 
案例项目实现
1. 任务管理模块 (task_manager.py)
 
from queue import Queue
import threading
class TaskQueue:
    def __init__(self):
        self.queue = Queue()
        self.lock = threading.Lock()
        self.tasks = []
    def add_task(self, task):
        """添加任务"""
        with self.lock:
            self.tasks.append(task)
            self.queue.put(task)
    def get_task(self):
        """获取任务"""
        return self.queue.get()
    def task_done(self):
        """标记任务完成"""
        self.queue.task_done()
    def get_remaining_tasks(self):
        """获取剩余任务数"""
        return self.queue.qsize()
    def get_total_tasks(self):
        """获取总任务数"""
        return len(self.tasks)
    def get_completed_tasks(self):
        """获取已完成任务数"""
        return self.get_total_tasks() - self.get_remaining_tasks()
 
代码分析:
- 使用Python自带的
queue.Queue实现任务队列。 - 使用
threading.Lock实现线程安全。 - 提供了任务的添加、获取、完成以及统计功能。
 
2. 请求发送模块 (request_sender.py)
 
import requests
from requests.exceptions import RequestException
import time
class RequestSender:
    def __init__(self, headers=None, proxies=None):
        self.session = requests.Session()
        self.headers = headers
        self.proxies = proxies
    def send_request(self, url, method='GET', timeout=10):
        """发送HTTP请求"""
        try:
            response = self.session.request(
                method=method,
                url=url,
                headers=self.headers,
                proxies=self.proxies,
                timeout=timeout
            )
            response.raise_for_status()
            return response
        except RequestException as e:
            print(f"请求失败: {e}")
            return None
    def set_headers(self, headers):
        """设置请求头"""
        self.headers = headers
    def set_proxies(self, proxies):
        """设置代理"""
        self.proxies = proxies
    def random_delay(self, min_delay=1, max_delay=3):
        """随机延迟"""
        delay = (max_delay - min_delay) * random.random() + min_delay
        time.sleep(delay)
 
代码分析:
- 使用
requests.Session()保持会话状态。 - 提供了请求头和代理设置功能。
 - 集成了随机延迟功能,防止被目标网站封IP。
 
3. 页面解析模块 (page_parser.py)
 
from bs4 import BeautifulSoup
import re
class PageParser:
    def __init__(self, content, parser='html.parser'):
        self.soup = BeautifulSoup(content, parser)
    def extract_links(self, base_url=None):
        """提取页面中的所有链接"""
        links = []
        for link in self.soup.find_all('a', href=True):
            url = link['href']
            if base_url and not url.startswith('http'):
                url = base_url + url
            links.append(url)
        return links
    def extract_content(self, selector):
        """提取指定内容"""
        elements = self.soup.select(selector)
        return [element.get_text() for element in elements]
    def extract_images(self):
        """提取页面中的所有图片链接"""
        images = []
        for img in self.soup.find_all('img', src=True):
            images.append(img['src'])
        return images
    def find_all(self, name=None, attrs={}, recursive=True, text=None):
        """查找所有匹配的标签"""
        return self.soup.find_all(name, attrs, recursive, text)
 
代码分析:
- 使用
BeautifulSoup进行HTML解析。 - 提供了提取链接、内容、图片等功能。
 - 支持自定义选择器进行内容提取。
 
4. 数据存储模块 (data_storage.py)
 
import sqlite3
class DataStorage:
    def __init__(self, db_name='spider.db'):
        self.conn = sqlite3.connect(db_name)
        self.cursor = self.conn.cursor()
    def create_table(self, table_name, columns):
        """创建数据表"""
        columns_str = ', '.join([f'{col} {typ}' for col, typ in columns.items()])
        self.cursor.execute(f'CREATE TABLE IF NOT EXISTS {table_name} ({columns_str})')
        self.conn.commit()
    def insert_data(self, table_name, data):
        """插入数据"""
        placeholders = ', '.join(['?'] * len(data))
        columns = ', '.join(data.keys())
        values = tuple(data.values())
        self.cursor.execute(f'INSERT INTO {table_name} ({columns}) VALUES ({placeholders})', values)
        self.conn.commit()
    def query_data(self, table_name, columns='*', condition=None):
        """查询数据"""
        query = f'SELECT {columns} FROM {table_name}'
        if condition:
            query += f' WHERE {condition}'
        self.cursor.execute(query)
        return self.cursor.fetchall()
    def close(self):
        """关闭数据库连接"""
        self.conn.close()
 
代码分析:
- 使用
sqlite3进行数据存储。 - 提供了创建数据表、插入数据、查询数据功能。
 - 支持自定义表结构和查询条件。
 
5. 结果展示模块 (result_viewer.py)
 
from prettytable import PrettyTable
class ResultViewer:
    def __init__(self):
        self.table = PrettyTable()
    def display_data(self, data, fields=None):
        """显示数据"""
        if fields:
            self.table.field_names = fields
        for row in data:
            self.table.add_row(row)
        print(self.table)
    def display_statistics(self, stats):
        """显示统计信息"""
        print("\n爬虫统计信息:")
        for key, value in stats.items():
            print(f"{key}: {value}")
 
代码分析:
- 使用
PrettyTable库以表格形式展示数据。 - 提供了展示数据和统计信息的功能。
 
案例项目主程序 (main.py)
 
import random
from task_manager import TaskQueue
from request_sender import RequestSender
from page_parser import PageParser
from data_storage import DataStorage
from result_viewer import ResultViewer
class NewsSpider:
    def __init__(self, base_url, max_threads=5):
        self.base_url = base_url
        self.task_queue = TaskQueue()
        self.request_sender = RequestSender()
        self.data_storage = DataStorage()
        self.result_viewer = ResultViewer()
        self.max_threads = max_threads
        self.running_threads = 0
        self.total_tasks = 0
        self.completed_tasks = 0
    def start(self):
        """启动爬虫"""
        self.init_db()
        self.seed_tasks()
        self.start_threads()
    def init_db(self):
        """初始化数据库"""
        columns = {
            'id': 'INTEGER PRIMARY KEY AUTOINCREMENT',
            'title': 'TEXT',
            'content': 'TEXT',
            'publish_date': 'TEXT',
            'url': 'TEXT'
        }
        self.data_storage.create_table('news', columns)
    def seed_tasks(self):
        """初始化任务"""
        self.task_queue.add_task(self.base_url)
        self.total_tasks = self.task_queue.get_total_tasks()
    def start_threads(self):
        """启动线程"""
        for _ in range(self.max_threads):
            thread = threading.Thread(target=self.run)
            thread.start()
    def run(self):
        """爬虫执行逻辑"""
        while not self.task_queue.empty():
            self.running_threads += 1
            url = self.task_queue.get_task()
            self.crawl(url)
            self.task_queue.task_done()
            self.running_threads -= 1
    def crawl(self, url):
        """抓取逻辑"""
        self.request_sender.random_delay()
        response = self.request_sender.send_request(url)
        if not response:
            return
        parser = PageParser(response.text)
        news_list = parser.extract_content('.news-item')
        for news in news_list:
            data = {
                'title': news['title'],
                'content': news['content'],
                'publish_date': news['publish_date'],
                'url': url
            }
            self.data_storage.insert_data('news', data)
        links = parser.extract_links(self.base_url)
        for link in links:
            self.task_queue.add_task(link)
    def get_statistics(self):
        """获取爬虫统计信息"""
        return {
            '总任务数': self.task_queue.get_total_tasks(),
            '剩余任务数': self.task_queue.get_remaining_tasks(),
            '已完成任务数': self.task_queue.get_completed_tasks(),
            '运行线程数': self.running_threads
        }
    def display_results(self):
        """展示结果"""
        stats = self.get_statistics()
        self.result_viewer.display_statistics(stats)
        data = self.data_storage.query_data('news', '*', 'id <= 10')
        self.result_viewer.display_data(data, ['标题', '内容', '发布时间', 'URL'])
        self.data_storage.close()
if __name__ == '__main__':
    base_url = 'http://example.com/news'
    spider = NewsSpider(base_url, max_threads=5)
    spider.start()
    spider.display_results()
 
代码分析:
- 实现了一个完整的新闻爬虫逻辑。
 - 支持多线程并发抓取。
 - 数据存储在SQLite数据库中。
 - 结果以表格形式展示。
 
