Airflow调度爬虫任务:从零搭建高效定时采集系统
目录
一、为什么选择Airflow调度爬虫?
二、Airflow核心概念速解
1. DAG(有向无环图)
2. Operator类型选择
3. 调度参数详解
三、爬虫任务集成实战
1. 基础爬虫封装
2. 动态参数传递
3. 依赖管理技巧
四、高阶功能实现
1. 分布式爬取架构
2. 失败自动处理
3. 数据质量校验
五、监控与优化
1. 关键指标看板
2. 性能优化技巧
六、常见问题Q&A
七、总结与建议
一、为什么选择Airflow调度爬虫?
传统爬虫调度常面临两个痛点:要么用crontab这种简单工具,但缺乏任务依赖管理;要么用Jenkins等CI工具,却不够灵活。Airflow的出现解决了这些矛盾——它用有向无环图(DAG)管理任务依赖,支持分钟级调度,还能通过Web界面监控任务状态。

举个真实案例:某电商公司需要每天采集竞品价格,涉及3个爬虫(列表页→详情页→价格校验)。用crontab时,详情页爬虫常因列表页未完成而报错。改用Airflow后,通过设置depends_on_past=True和wait_for_downstream=True,任务自动按顺序执行,错误率下降90%。
二、Airflow核心概念速解
1. DAG(有向无环图)
想象把爬虫任务拆解成乐高积木:每个积木块是一个Task,用箭头连接表示执行顺序。比如:
with DAG('ecommerce_spider', schedule_interval='0 8 * * *', # 每天8点执行catchup=False) as dag:task1 = PythonOperator(task_id='fetch_list', python_callable=spider_list)task2 = PythonOperator(task_id='fetch_detail', python_callable=spider_detail)task3 = PythonOperator(task_id='validate_price', python_callable=validate_price)task1 >> task2 >> task3 # 定义执行顺序
2. Operator类型选择
- PythonOperator:最常用,直接调用爬虫函数
- BashOperator:适合调用shell命令(如启动Scrapy)
- DockerOperator:当需要隔离环境时使用
- HttpOperator:触发API接口(如通知爬虫结果)
3. 调度参数详解
| 参数 | 作用 | 示例 |
|---|---|---|
schedule_interval | 执行频率 | '@daily' 或 '0 */6 * * *'(每6小时) |
start_date | 首次执行时间 | datetime(2023,1,1) |
retries | 失败重试次数 | retries=3 |
retry_delay | 重试间隔 | retry_delay=timedelta(minutes=5) |
三、爬虫任务集成实战
1. 基础爬虫封装
将Scrapy/Requests爬虫封装成可调用函数:
def spider_list(ds, **kwargs):# ds是执行日期参数,可用于动态构造URLurl = f"https://example.com/products?date={ds}"response = requests.get(url, proxies=get_proxy()) # 使用代理save_to_db(response.json())
2. 动态参数传递
通过template_fields实现动态参数:
class DynamicSpiderOperator(PythonOperator):template_fields = ('url', 'date') # 这些字段会被渲染def execute(self, context):url = self.url.format(date=context['ds'])# 执行爬取...
3. 依赖管理技巧
场景1:详情页必须等列表页完成
解决方案:在详情页Task中设置trigger_rule='all_done'
detail_task = PythonOperator(task_id='fetch_detail',trigger_rule='all_done', # 即使上游失败也执行python_callable=spider_detail
)
场景2:周末不执行校验任务
解决方案:用TimeSensor或自定义BranchPythonOperator
def should_run(**context):return context['ds'].weekday() < 5 # 周一到周五branch_task = BranchPythonOperator(task_id='check_weekday',python_callable=should_run,trigger_rule='all_success'
)
四、高阶功能实现
1. 分布式爬取架构
当单节点性能不足时,可采用:
- CeleryExecutor:将任务分发到Worker集群
- KubernetesExecutor:动态创建Pod执行任务
- RemoteExecutor:配合AWS/GCP等云服务
配置示例(airflow.cfg):
[core]
executor = CeleryExecutor[celery]
broker_url = redis://localhost:6379/0
result_backend = redis://localhost:6379/0
2. 失败自动处理
通过on_failure_callback实现邮件报警:
def send_failure_email(context):task_id = context['task_instance'].task_iderror = context['exception']send_mail(subject=f"Airflow任务失败: {task_id}",body=str(error),to_emails=["admin@example.com"])task = PythonOperator(task_id='critical_spider',on_failure_callback=send_failure_email,# ...其他参数
)
3. 数据质量校验
在爬取后添加校验Task:
def validate_data(ds, **kwargs):df = pd.read_sql("SELECT * FROM products WHERE date=?", params=[ds])if len(df) < 100: # 低于阈值报警raise ValueError("数据量不足")
五、监控与优化
1. 关键指标看板
通过Prometheus+Grafana监控:
- 任务成功率:
airflow_task_instance_success - 执行耗时:
airflow_task_instance_duration - 队列积压:
airflow_scheduler_heartbeat
2. 性能优化技巧
-
并行度调整:
[core] parallelism = 32 # 默认32,可根据CPU核心数调整 -
结果持久化:
task = PythonOperator(task_id='save_results',python_callable=save_data,provide_context=True,output_encoding='utf-8' # 避免编码问题 ) -
日志分级:
import logging logging.getLogger("airflow.task").setLevel(logging.WARNING) # 减少日志量
六、常见问题Q&A
Q1:被网站封IP怎么办?
A:立即启用备用代理池,建议使用住宅代理(如站大爷IP代理),配合每请求更换IP策略。代码示例:
import randomPROXY_POOL = ["http://1.1.1.1:8080","http://2.2.2.2:8080",# ...更多代理
]def get_proxy():return {"http": random.choice(PROXY_POOL)}
Q2:如何避免重复爬取?
A:使用execution_date作为唯一标识,结合数据库去重:
def spider_with_dedup(ds, **kwargs):if db.exists(url=f"https://example.com/item/{ds}"):return # 已爬取则跳过# 执行爬取...
Q3:Airflow和Scrapy如何配合?
A:两种方式:
- 封装Scrapy为命令行:
BashOperator(task_id='run_scrapy',bash_command='scrapy crawl myspider -a date={ds}' ) - 直接调用Scrapy API:
from scrapy.crawler import CrawlerProcess from myproject.spiders import MySpiderdef run_scrapy(ds):process = CrawlerProcess()process.crawl(MySpider, start_date=ds)process.start()
Q4:任务卡住不执行怎么办?
A:按以下步骤排查:
- 检查
airflow-scheduler日志 - 确认Worker是否注册(
airflow workers) - 查看DAG文件是否被加载(Web界面→Browse→DAGs)
- 检查数据库连接(默认使用SQLite,生产环境建议改用PostgreSQL)
Q5:如何实现补数(回填历史数据)?
A:修改DAG的catchup参数并指定start_date:
with DAG('historical_spider',schedule_interval='@daily',start_date=datetime(2023,1,1),catchup=True) as dag: # catchup=True会生成所有未执行的任务# ...任务定义
七、总结与建议
- 小规模试用:先用LocalExecutor+SQLite验证流程
- 渐进式扩展:数据量增大后切换到CeleryExecutor+PostgreSQL
- 监控先行:部署前规划好告警策略
- 文档规范:每个DAG添加
doc_md注释说明业务逻辑
Airflow不是银弹,但它是目前最平衡的爬虫调度解决方案。通过合理设计DAG和参数,可以构建出既稳定又灵活的定时采集系统。实际部署时建议先在测试环境运行一周,观察任务成功率、执行时间分布等指标后再上线生产。
