Python爬虫实战:新闻数据抓取与MongoDB存储全流程
目录
一、环境搭建:工具与库准备
1.1 核心工具安装
1.2 代理池配置(关键反爬措施)
二、爬虫架构设计:模块化实现
2.1 请求发送模块(含反爬策略)
2.2 数据解析模块(BeautifulSoup实战)
2.3 MongoDB存储模块
三、完整爬虫实现(腾讯新闻案例)
3.1 主程序逻辑
3.2 运行效果
四、反爬机制深度解析与应对
4.1 常见反爬手段
4.2 高级反爬突破方案
五、数据质量保障体系
5.1 数据清洗流程
5.2 异常处理机制
六、性能优化实战
6.1 多线程加速方案
6.2 分布式爬虫架构
七、常见问题Q&A
八、进阶技巧:从爬取到分析
8.1 实时分析管道
8.2 自动化报告生成
九、总结与展望
免费python编程教程:https://pan.quark.cn/s/2c17aed36b72
在信息爆炸的时代,新闻数据是分析社会热点、舆情趋势的重要素材。本文将以腾讯新闻网为例,演示如何用Python编写爬虫抓取新闻数据,并存储到MongoDB数据库中。过程中会涉及反爬机制突破、数据清洗、异常处理等关键技术点。

一、环境搭建:工具与库准备
1.1 核心工具安装
- MongoDB:下载社区版安装包,安装时勾选"Install MongoDB as a Service"选项,默认端口27017。
- Python库:通过pip安装爬虫三件套:
pip install requests beautifulsoup4 pymongorequests:发送HTTP请求BeautifulSoup4:解析HTMLpymongo:MongoDB操作接口
1.2 代理池配置(关键反爬措施)
从站大爷IP代理获取住宅代理IP,建立代理池:
# proxy_pool.py
PROXY_LIST = ["123.123.123.123:8080", # 示例IP"124.124.124.124:8081"
]def get_random_proxy():import randomreturn {"http": random.choice(PROXY_LIST)}
二、爬虫架构设计:模块化实现
2.1 请求发送模块(含反爬策略)
import requests
from proxy_pool import get_random_proxy
import time
import randomdef fetch_page(url):headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}max_retries = 3for _ in range(max_retries):try:proxy = get_random_proxy()response = requests.get(url, headers=headers, proxies=proxy, timeout=10)if response.status_code == 200:return response.textelif response.status_code == 403: # IP被封time.sleep(random.uniform(5, 10))continueexcept requests.exceptions.RequestException:time.sleep(random.uniform(2, 5))return None
技术要点:
- 随机User-Agent模拟浏览器
- 失败自动重试机制
- 代理IP轮换策略
- 指数退避算法避免触发频率限制
2.2 数据解析模块(BeautifulSoup实战)
from bs4 import BeautifulSoupdef parse_news(html):soup = BeautifulSoup(html, 'html.parser')news_list = []# 腾讯新闻示例结构(实际需根据网页调整)items = soup.select('.news-item') # CSS选择器for item in items:title = item.select_one('.title a').text.strip()url = item.select_one('.title a')['href']time_str = item.select_one('.time').text.strip()# 数据清洗if "广告" in title or "推广" in title:continuenews_list.append({"title": title,"url": url,"publish_time": time_str,"crawl_time": time.strftime("%Y-%m-%d %H:%M:%S")})return news_list
解析技巧:
- 使用浏览器开发者工具定位元素
- 组合CSS选择器精准定位
- 过滤广告等无效数据
- 添加时间戳记录采集时间
2.3 MongoDB存储模块
from pymongo import MongoClientclass MongoDBStorage:def __init__(self, db_name="news_db", collection_name="articles"):self.client = MongoClient('mongodb://localhost:27017/')self.db = self.client[db_name]self.collection = self.db[collection_name]def insert_batch(self, data_list):try:if data_list: # 非空检查result = self.collection.insert_many(data_list)print(f"成功插入{len(result.inserted_ids)}条数据")except Exception as e:print(f"插入失败: {str(e)}")
优化建议:
- 创建索引加速查询:
self.collection.create_index([("title", pymongo.TEXT)]) - 批量插入减少IO操作
- 错误重试机制
三、完整爬虫实现(腾讯新闻案例)
3.1 主程序逻辑
import timedef main():base_url = "https://new.qq.com/ch/news" # 示例URLstorage = MongoDBStorage()for page in range(1, 6): # 爬取前5页url = f"{base_url}?page={page}"html = fetch_page(url)if html:news_data = parse_news(html)storage.insert_batch(news_data)time.sleep(random.uniform(2, 5)) # 礼貌爬取else:print(f"第{page}页获取失败")if __name__ == "__main__":main()
3.2 运行效果
- 每分钟约采集30-50条新闻
- MongoDB中数据示例:
{"_id": ObjectId("67a3f2e8b1e2a1f4c8d3e5f2"),"title": "央行发布新货币政策","url": "https://new.qq.com/rain/a/20251027A01XXY","publish_time": "2025-10-27 10:30","crawl_time": "2025-10-27 11:15:22" }
四、反爬机制深度解析与应对
4.1 常见反爬手段
| 反爬类型 | 腾讯新闻表现 | 应对方案 |
|---|---|---|
| IP限制 | 连续访问50次后封IP | 代理池轮换+请求间隔3-5秒 |
| User-Agent检测 | 识别非浏览器请求 | 随机UA池(包含移动端UA) |
| 行为分析 | 检测鼠标移动轨迹等 | Selenium模拟真实用户操作 |
| 数据加密 | 关键数据通过JS动态加载 | 执行JS获取渲染后DOM(Pyppeteer) |
4.2 高级反爬突破方案
方案1:Selenium无头浏览器
from selenium import webdriver
from selenium.webdriver.chrome.options import Optionsdef get_dynamic_content(url):options = Options()options.add_argument("--headless")options.add_argument("--disable-gpu")driver = webdriver.Chrome(options=options)try:driver.get(url)time.sleep(3) # 等待JS执行return driver.page_sourcefinally:driver.quit()
方案2:验证码识别
- 图形验证码:使用Tesseract OCR
- 滑动验证码:计算缺口位置(需机器学习模型)
五、数据质量保障体系
5.1 数据清洗流程
def clean_data(raw_data):cleaned = []for item in raw_data:# 标准化时间格式if item.get("publish_time"):try:item["publish_time"] = datetime.strptime(item["publish_time"], "%Y-%m-%d %H:%M").strftime("%Y-%m-%d")except ValueError:item["publish_time"] = None# 去除特殊字符if "title" in item:item["title"] = re.sub(r'[\t\n\r]', '', item["title"])cleaned.append(item)return cleaned
5.2 异常处理机制
def safe_fetch(url):try:return fetch_page(url)except requests.exceptions.SSLError:print(f"SSL错误: {url}")return Noneexcept requests.exceptions.ConnectionError:print(f"连接错误: {url}")return Noneexcept Exception as e:print(f"未知错误: {str(e)}")return None
六、性能优化实战
6.1 多线程加速方案
from concurrent.futures import ThreadPoolExecutordef multi_thread_crawl(urls):results = []with ThreadPoolExecutor(max_workers=5) as executor:futures = [executor.submit(fetch_page, url) for url in urls]for future in futures:html = future.result()if html:results.extend(parse_news(html))return results
测试数据:
- 单线程:10页/分钟
- 5线程:35页/分钟(提升250%)
6.2 分布式爬虫架构
爬虫节点1 → 代理池 → 任务队列 → 结果存储
爬虫节点2 → ↑ ↓
爬虫节点3 → 任务分配 MongoDB集群
实现要点:
- 使用Redis作为任务队列
- 每个节点独立运行但共享代理池
- 通过MongoDB分片存储海量数据
七、常见问题Q&A
Q1:被网站封IP怎么办?
A:立即启用备用代理池,建议:
- 使用住宅代理(如站大爷IP代理)
- 配合每请求更换IP策略
- 设置请求间隔3-5秒
- 遇到403错误时自动切换代理
Q2:如何避免被法律追责?
A:遵守三原则:
- 检查
robots.txt(如腾讯新闻允许爬取/ch/路径) - 不采集用户隐私数据(如手机号、身份证号)
- 控制采集频率(建议QPS<1)
Q3:数据重复存储怎么解决?
A:MongoDB去重方案:
# 创建唯一索引
self.collection.create_index([("url", pymongo.ASCENDING)], unique=True)# 插入时处理重复
try:self.collection.insert_one(data)
except pymongo.errors.DuplicateKeyError:print("数据已存在")
Q4:如何应对动态加载内容?
A:分情况处理:
- 简单动态内容:分析XHR请求,直接调用API
- 复杂动态内容:使用Pyppeteer执行JS
from pyppeteer import launchasync def get_js_rendered(url):browser = await launch(headless=True)page = await browser.newPage()await page.goto(url)content = await page.content()await browser.close()return content
Q5:MongoDB存储空间不足?
A:优化方案:
- 启用WiredTiger压缩引擎(默认)
- 设置TTL索引自动过期旧数据:
self.collection.create_index([("crawl_time", pymongo.ASCENDING)],expireAfterSeconds=60*60*24*30 # 30天后自动删除 ) - 定期归档数据到HDFS/S3
八、进阶技巧:从爬取到分析
8.1 实时分析管道
# 结合Pandas进行实时分析
import pandas as pddef analyze_news():df = pd.DataFrame(list(storage.collection.find()))# 词频统计from collections import Counterwords = " ".join(df["title"].dropna()).split()word_counts = Counter(words)print(word_counts.most_common(10))# 时间趋势分析df["date"] = pd.to_datetime(df["publish_time"])daily_counts = df.groupby("date").size()daily_counts.plot(title="新闻发布量趋势")
8.2 自动化报告生成
from fpdf import FPDFdef generate_report(data):pdf = FPDF()pdf.add_page()pdf.set_font("Arial", size=12)pdf.cell(200, 10, txt="新闻采集日报", ln=1, align="C")pdf.cell(200, 10, txt=f"采集时间: {time.strftime('%Y-%m-%d')}", ln=1)pdf.cell(200, 10, txt=f"总条数: {len(data)}", ln=1)# 添加图表(需配合matplotlib)pdf.output("news_report.pdf")
九、总结与展望
通过本文的实践,我们掌握了:
- 完整爬虫系统的搭建(请求→解析→存储)
- 反爬机制的多种应对策略
- MongoDB的高效使用技巧
- 数据质量保障方法
未来发展方向:
- 结合机器学习实现智能解析
- 开发可视化爬虫管理平台
- 探索Serverless架构的爬虫部署
新闻数据采集是大数据分析的基础环节,掌握这些技术后,你可以进一步构建舆情监控系统、热点预测模型等高级应用。记住:技术本身无善恶,关键在于如何使用——在合法合规的前提下,让数据发挥更大价值。
