当前位置: 首页 > news >正文

Scrapy:任务队列底层设计详解

Scrapy 中队列设计详解

在这里插入图片描述

1. 概述

Scrapy 的队列系统是其调度器(Scheduler)的核心组件之一,负责存储和管理待抓取的请求。Scrapy 实现了两种类型的队列:

  • 内存队列:请求存储在内存中,重启后数据丢失
  • 磁盘队列:请求序列化到磁盘,支持持久化存储
    文件路径:/scrapy/squeues.py
    设计模式:装饰器模式与Python中的装饰器

2. 队列的基本架构

2.1 队列类型

Scrapy 在 squeues.py 中定义了以下几种队列:

  1. 磁盘队列(使用 pickle 序列化):

    • PickleFifoDiskQueue: 先进先出磁盘队列
    • PickleLifoDiskQueue: 后进先出磁盘队列(默认的磁盘队列类)
  2. 磁盘队列(使用 marshal 序列化):

    • MarshalFifoDiskQueue: 先进先出磁盘队列
    • MarshalLifoDiskQueue: 后进先出磁盘队列
  3. 内存队列:

    • FifoMemoryQueue: 先进先出内存队列
    • LifoMemoryQueue: 后进先出内存队列(默认的内存队列类)

2.2 核心设计模式

Scrapy 的队列设计采用了装饰器模式,主要包含以下几个层次:

  1. 基础队列装饰:
def _with_mkdir(queue_class):
    class DirectoriesCreated(queue_class):
        def __init__(self, path, *args, **kwargs):
            dirname = Path(path).parent
            if not dirname.exists():
                dirname.mkdir(parents=True, exist_ok=True)
            super().__init__(path, *args, **kwargs)
    return DirectoriesCreated
  1. 序列化装饰:
def _serializable_queue(queue_class, serialize, deserialize):
    class SerializableQueue(queue_class):
        def push(self, obj):
            s = serialize(obj)
            super().push(s)

        def pop(self):
            s = super().pop()
            if s:
                return deserialize(s)
            return None

        def peek(self):
            try:
                s = super().peek()
            except AttributeError as ex:
                raise NotImplementedError(
                    "The underlying queue class does not implement 'peek'"
                ) from ex
            if s:
                return deserialize(s)
            return None
    return SerializableQueue

3. 序列化实现

3.1 Pickle 序列化队列

Scrapy 使用 pickle 作为默认的序列化方案,实现如下:

def _pickle_serialize(obj):
    try:
        return pickle.dumps(obj, protocol=4)
    except (pickle.PicklingError, AttributeError, TypeError) as e:
        raise ValueError(str(e)) from e

# 创建 pickle 序列化的磁盘队列
_PickleFifoSerializationDiskQueue = _serializable_queue(
    _with_mkdir(queue.FifoDiskQueue), 
    _pickle_serialize, 
    pickle.loads
)

_PickleLifoSerializationDiskQueue = _serializable_queue(
    _with_mkdir(queue.LifoDiskQueue), 
    _pickle_serialize, 
    pickle.loads
)

3.2 Marshal 序列化队列

Marshal 作为备选的序列化方案,实现如下:

# 创建 marshal 序列化的磁盘队列
_MarshalFifoSerializationDiskQueue = _serializable_queue(
    _with_mkdir(queue.FifoDiskQueue), 
    marshal.dumps, 
    marshal.loads
)

_MarshalLifoSerializationDiskQueue = _serializable_queue(
    _with_mkdir(queue.LifoDiskQueue), 
    marshal.dumps, 
    marshal.loads
)

4. 序列化方案对比

4.1 Pickle vs Marshal 在 Scrapy 中的应用

  1. Pickle 的优势:

    • 支持更多的 Python 数据类型
    • 可以序列化自定义类和对象
    • 支持对象引用和循环引用
    • 版本兼容性更好
  2. Marshal 的优势:

    • 序列化速度更快
    • 内存占用更少
    • 实现更简单,安全性较高

4.2 为什么提供两种序列化方案?

  1. 性能考虑:

    • Marshal 序列化速度快,适合简单数据结构
    • Pickle 功能更强大,适合复杂对象
  2. 兼容性考虑:

    • Pickle 支持自定义类的序列化
    • Marshal 只支持基本数据类型
  3. 安全性考虑:

    • Marshal 更安全,不支持代码执行
    • Pickle 需要注意安全风险

5. 队列的实际应用

5.1 在调度器中的使用

# 在 settings.py 中配置队列类型
SCHEDULER_DISK_QUEUE = "scrapy.squeues.PickleLifoDiskQueue"
SCHEDULER_MEMORY_QUEUE = "scrapy.squeues.LifoMemoryQueue"

5.2 队列选择策略

  1. 请求入队逻辑:
def enqueue_request(self, request):
    if not request.dont_filter and self.df.request_seen(request):
        return False
    dqok = self._dqpush(request)  # 优先尝试放入磁盘队列
    if dqok:
        self.stats.inc_value("scheduler/enqueued/disk")
    else:
        self._mqpush(request)  # 磁盘队列失败则放入内存队列
        self.stats.inc_value("scheduler/enqueued/memory")
    return True
  1. 请求出队逻辑:
def next_request(self):
    request = self.mqs.pop()  # 优先从内存队列获取
    if request:
        self.stats.inc_value("scheduler/dequeued/memory")
    else:
        request = self._dqpop()  # 内存队列为空则从磁盘队列获取
        if request:
            self.stats.inc_value("scheduler/dequeued/disk")
    return request

6. 最佳实践

  1. 队列类型选择:

    • 小规模爬虫:使用内存队列即可
    • 大规模爬虫:使用磁盘队列保证可靠性
    • 需要暂停恢复:必须使用磁盘队列
  2. 序列化方案选择:

    • 简单请求:可以使用 Marshal 序列化
    • 复杂请求:建议使用 Pickle 序列化
    • 注重性能:选择 Marshal
    • 注重功能:选择 Pickle
  3. 性能优化:

    • 合理设置并发数
    • 适当调整队列类型
    • 根据数据复杂度选择序列化方案

7. 注意事项

  1. 安全性:

    • 使用 Pickle 时注意反序列化安全
    • 不要处理不信任来源的序列化数据
  2. 性能:

    • 磁盘队列会带来 I/O 开销
    • 序列化/反序列化会消耗 CPU
  3. 可靠性:

    • 定期备份磁盘队列数据
    • 处理序列化异常
    • 考虑磁盘空间限制

相关文章:

  • JSON入门
  • 配置mysql8.0主从同步,并使用PXC实现高可用
  • 【Linux】Socket编程—TCP
  • OpenEuler学习笔记(三十一):在OpenEuler上搭建仓颉语言开发环境
  • 探索后端开发中的异步API:基于Resilience4j与Reactive Programming的高性能设计
  • eval 内置函数用法
  • 三角拓扑聚合优化器TTAO-Transformer-BiLSTM多变量回归预测(Maltab)
  • SQL-leetcode—1581. 进店却未进行过交易的顾客
  • 怎么才能DeepSeek批量写作和内容导出?
  • WebSocket 握手过程
  • 【实战篇】室内设计师如何利用 DeepSeek 做设计:从灵感到落地的全方位指南
  • 软件可靠性基础知识
  • 机器学习 - 贪心算法、前向搜索、后向搜索
  • NPDP学习笔记 -产品经理(第二版)-第七章 产品创新管理
  • openGauss 3.0 数据库在线实训课程12: 学习逻辑结构:模式管理
  • 架构设计系列(二):CI/CD
  • 五、AIGC大模型_02大模型学习重点
  • 基于若依开发的工程项目管系统开源免费,用于工程项目投标、进度及成本管理的OA 办公开源系统,非常出色!
  • 基于 PyTorch 的树叶分类任务:从数据准备到模型训练与测试
  • Linux 内核 IPoIB 驱动中 sysfs 属性冲突问题的分析与解决
  • 魔都眼|买买买,老铺黄金新店开业被挤爆:有人排队5小时
  • 美国季度GDP时隔三年再现负增长,特朗普政府关税政策对美国经济负面影响或将持续
  • 空调+零食助顶级赛马备战,上海环球马术冠军赛将焕新登场
  • 航海王亚洲巡展、工厂店直销……上海多区推出“五五购物节”活动
  • 龚惠民已任江西省司法厅党组书记
  • 中行一季度净赚超543亿降2.9%,利息净收入降逾4%