当前位置: 首页 > news >正文

Python - 爬虫;Scrapy框架之items,Pipeline管道持久化存储(二)

阅读本文前先参考

https://blog.csdn.net/MinggeQingchun/article/details/145904572

1、基于管道的持久化存储,主要依靠scrapy框架的item.pypipelines.py文件

item.py:数据结构模板文件,定义数据属性

pipelines.py:管道文件,接收数据(item),进行持久化操作

2、爬虫文件、items文件、pipelines文件三者关系:

(1)首先,spider 爬取了数据,过滤后 写入到items中
(item是scrapy中,连结spider和pipeline的桥梁,即items是 定义数据结构 )
(2)其次,再通过yield返回爬取数据给核心引擎并 交付给pipeline
(3)最后,pipeline通过 调用items获取数据,再 写入指定文件或通过数据库连接存入数据库

3、整个流程:

(1)爬虫文件爬取到数据后,把数据赋给item对象

(2)使用yield关键字将item对象提交给pipelines管道

(3)在管道文件中的process_item方法接收item对象,然后把item对象存储

(4)在setting中开启管道

官网文档:Item Pipeline — Scrapy 2.12.0 documentation

以爬虫迁木网 专注留学DIY-只做精细内容-迁木网 为例

1、 items文件(定义数据结构)定义数据属性,在item.py文件中编写数据的属性

# Define here the models for your scraped items
#
# See documentation in:
# https://docs.scrapy.org/en/latest/topics/items.htmlimport scrapy'''
迁木网大学信息的字段
类似Java中的Model
'''
class QianmuUniversityItem(scrapy.Item):# define the fields for your item here like:name = scrapy.Field()runk = scrapy.Field()country = scrapy.Field()state = scrapy.Field()city = scrapy.Field()undergraduate_num = scrapy.Field()postgraduate_num = scrapy.Field()website = scrapy.Field()

2、爬虫文件(爬取数据),项目.spider爬虫程序

import scrapyfrom 爬虫.Scrapy框架.qianmu.qianmu.items import QianmuUniversityItem'''
settings.py中有一个HTTPCACHE_ENABLED配置
# 启用HTTP请求缓存,下次请求同一URL时不再发送远程请求
HTTPCACHE_ENABLED = True
'''class UsnewsSpider(scrapy.Spider):name = "usnews"# 允许爬虫的URL必须在此字段内;genspider时可以指定;如qianmu.com意味www.qianmu.com和http.qianmu.org下的链接都可以爬取allowed_domains = ["www.qianmu.org"]# 爬虫的入口地址,可以多个start_urls = ["http://www.qianmu.org/2023USNEWS%E4%B8%96%E7%95%8C%E5%A4%A7%E5%AD%A6%E6%8E%92%E5%90%8D"]# 框架请求start_urls成功后,会调用parse方法def parse(self, response):# 提取链接,并释放links = response.xpath('//div[@id="content"]//tr[position()>1]/td/a/@href').extract()for link in links:if not link.startswith('http://www.qianmu.org'):link = 'http://www.qianmu.org/%s' % link# 请求成功以后,异步调用callback函数yield response.follow(link, self.parse_university)def parse_university(self, response):# 去除空格等特殊字符response = response.replace(body=response.text.replace('\t','').replace('\r\n', ''))item = QianmuUniversityItem()data = {}item['name'] = response.xpath('//div[@id="wikiContent"]/h1/text()').extract_first()# data['name'] = response.xpath('//div[@id="wikiContent"]/h1/text()')[0]table = response.xpath('//div[@id="wikiContent"]//div[@class="infobox"]/table')if not table:return Nonetable = table[0]keys = table.xpath('.//td[1]/p/text()').extract()cols = table.xpath('.//td[2]')# 当我们确定解析的数据只有1个结果时,可以使用extract_first()函数values = [' '.join(col.xpath('.//text()').extract()) for col in cols]if len(keys) == len(values):data.update(zip(keys, values))item['runk'] = data.get('排名')item['country'] = data.get('国家')item['state'] = data.get('州省')item['city'] = data.get('城市')item['undergraduate_num']= data.get('本科生人数')item['postgraduate_num']= data.get('研究生人数')item['website'] = data.get('网址')# yield出去的数据会被Scrapy框架接收,进行下一步处理;如果没有任何处理,则会打印到控制台yield item

此时运行爬虫文件,可以看到数据格式输出如下 

3、Pipelines文件(保存数据); pipelines.py文件代码的实现,也就是真正的存储过程

(1)MySQL存储

建一个大学表【universities】

CREATE TABLE `universities` (
`id` int NOT NULL AUTO_INCREMENT COMMENT '主键',
`name` varchar(256) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '学校名称',
`rank` varchar(128) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '学校排名',
`country` varchar(128) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '国家',
`state` varchar(128) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '州省',
`city` varchar(128) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '城市',
`undergraduate_num` varchar(128) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '本科生人数',
`postgraduate_num` varchar(128) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '研究生人数',
`website` text COMMENT '网站地址',
PRIMARY KEY (`id`)
)ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='大学信息表'
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
import redis
# useful for handling different item types with a single interface
from itemadapter import ItemAdapterimport pymysql
from redis import Redis
from scrapy.exceptions import DropItemfrom 爬虫.抓取网页数据.urllib_test import value'''
开启settings配置文件中ITEM_PIPELINES
在Scrapy框架中,ITEM_PIPELINES配置项用于定义数据处理的顺序。来指定哪些管道(pipelines)将被启用以及它们的执行顺序。
每个管道都是一个元组(tuple),包含两个元素:管道的路径和它的优先级(整数)。优先级越高,管道执行的越早。自定义的优先级设置为300以上
ITEM_PIPELINES = {"qianmu.pipelines.MysqlPipeline": 300,
}
'''
class MysqlPipeline(object):def open_spider(self,spider):self.conn = pymysql.connect(host='localhost',port=3306,user='root',password='ming1992',database='my_db',charset='utf8')self.cursor = self.conn.cursor()def close_spider(self,spider):self.cursor.close()self.conn.close()def process_item(self, item, spider):# keys = item.keys()# values = [item[k] for k in keys]# MySQL 不接受 None 作为值,应该将其转换为 NULL 或者跳过这些字段# 处理None值:如果某些字段允许为NULL,可以在过滤时保留这些字段,并将None映射为NULL# 过滤掉值为 None 的字段,或者将 None 转换为 NULLprint('处理前:',item.items())filtered_item = {k: ('NULL' if v is None else v) for k, v in item.items()}print('处理后:', filtered_item)keys, values = zip(*filtered_item.items())print(f'处理keys:{keys};values:{values}')sql = "insert into {} ({}) values ({})".format('universities',','.join(keys),','.join(['%s'] * len(values)))print('执行SQL语句', sql)try:self.cursor.execute(sql, values)self.conn.commit()except pymysql.MySQLError as e:print(f"Error inserting data into MySQL: {e}")print('SQL语句',self.cursor._last_executed)return item

(2)Redis中存储 

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
import redis
# useful for handling different item types with a single interface
from itemadapter import ItemAdapterimport pymysql
from redis import Redis
from scrapy.exceptions import DropItemfrom 爬虫.抓取网页数据.urllib_test import value'''
开启settings配置文件中ITEM_PIPELINES
在Scrapy框架中,ITEM_PIPELINES配置项用于定义数据处理的顺序。来指定哪些管道(pipelines)将被启用以及它们的执行顺序。
每个管道都是一个元组(tuple),包含两个元素:管道的路径和它的优先级(整数)。优先级越高,管道执行的越早。自定义的优先级设置为300以上
ITEM_PIPELINES = {"qianmu.pipelines.MysqlPipeline": 300,
}
'''
class RedisPipeline(object):def open_spider(self,spider):self.redisPool = redis.ConnectionPool(host='localhost',port=6379,decode_responses=True)self.redis = redis.Redis(connection_pool=self.redisPool)def close_spider(self,spider):self.redis.close()self.redisPool.close()def process_item(self, item, spider):if self.redis.sadd(spider.name,item['name']):return itemreturn DropItemclass MysqlPipeline(object):def open_spider(self,spider):self.conn = pymysql.connect(host='localhost',port=3306,user='root',password='ming1992',database='my_db',charset='utf8')self.cursor = self.conn.cursor()def close_spider(self,spider):self.cursor.close()self.conn.close()def process_item(self, item, spider):# keys = item.keys()# values = [item[k] for k in keys]# MySQL 不接受 None 作为值,应该将其转换为 NULL 或者跳过这些字段# 处理None值:如果某些字段允许为NULL,可以在过滤时保留这些字段,并将None映射为NULL# 过滤掉值为 None 的字段,或者将 None 转换为 NULLprint('处理前:',item.items())filtered_item = {k: ('NULL' if v is None else v) for k, v in item.items()}print('处理后:', filtered_item)keys, values = zip(*filtered_item.items())print(f'处理keys:{keys};values:{values}')sql = "insert into {} ({}) values ({})".format('universities',','.join(keys),','.join(['%s'] * len(values)))print('执行SQL语句', sql)try:self.cursor.execute(sql, values)self.conn.commit()except pymysql.MySQLError as e:print(f"Error inserting data into MySQL: {e}")print('SQL语句',self.cursor._last_executed)return item

4、setting配置 

# Configure item pipelines
# See https://docs.scrapy.org/en/latest/topics/item-pipeline.html
# 在Scrapy框架中,ITEM_PIPELINES配置项用于定义数据处理的顺序。来指定哪些管道(pipelines)将被启用以及它们的执行顺序。
# 每个管道都是一个元组(tuple),包含两个元素:管道的路径和它的优先级(整数)。优先级越高,管道执行的越早。自定义的优先级设置为300以上
ITEM_PIPELINES = {"qianmu.pipelines.RedisPipeline": 300,"qianmu.pipelines.MysqlPipeline": 301,
}

相关文章:

  • 云计算与大数据进阶 | 26、解锁云架构核心:深度解析可扩展数据库的5大策略与挑战(上)
  • 主流数据库运维故障排查卡片式速查表与视觉图谱
  • 25-05-16计算机网络学习笔记Day1
  • SQLMesh 增量模型从入门到精通:5步实现高效数据处理
  • 基于Linux环境实现Oracle goldengate远程抽取MySQL同步数据到MySQL
  • OceanBase 的系统变量、配置项和用户变量有何差异
  • 捌拾伍- 量子傅里叶变换 (3)
  • 数据结构进阶:AVL树与红黑树
  • C++23:ranges::iota、ranges::shift_left和ranges::shift_right详解
  • JavaScript性能优化实战(10):前端框架性能优化深度解析
  • 嵌入式EasyRTC音视频实时通话SDK在工业制造领域的智能巡检/AR协作等应用
  • 医学影像系统性能优化与调试技术:深度剖析与实践指南
  • sqli-labs靶场29-31关(http参数污染)
  • maven和npm区别是什么
  • CVPR2025 | 首个多光谱无人机单目标跟踪大规模数据集与统一框架, 数据可直接下载
  • 中文分词与数据可视化02
  • k8s监控方案实践补充(二):使用kube-state-metrics获取资源状态指标
  • mac中加载C++动态库文件
  • 6 任务路由与负载均衡
  • Linux进程信号(一)之信号的入门
  • 大环线呼之欲出,“金三角”跑起来了
  • 埃尔多安:愿在土耳其促成俄乌领导人会晤
  • 上海高院与上海妇联签协议,建立反家暴常态化联动协作机制
  • 农行再回应客户办理业务期间离世:亲属连续三次输错密码,理解亲属悲痛,将协助做好善后
  • 工商银行杭州金融研修院原院长蒋伟被“双开”
  • 92岁上海交大退休教师捐赠百万元给学校,其父也曾设奖学金