Airflow 入门案例教程
Airflow 入门案例教程
1. 安装 Airflow 环境
1.1安装Airflow环境
环境信息: Python 版本: 3.10.18
- 环境名称: crawl_py310 (下面所有涉及环境名称命令,改成相应的个人环境名称)
# 环境名称: crawl_py310
# 环境路径: `/Library/anaconda3/envs/crawl_py310`
# 安装命令: `constraints-3.10.txt` 使用约束文件确保所有依赖包的版本兼容性
python -m pip install "apache-airflow==2.7.3" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.3/constraints-3.10.txt"
1.2检查Airflow环境
# 检查 Airflow 版本
python -c "import airflow; print(airflow.__version__)"# 检查安装的包
pip list | grep airflow
airflow version
1.3常见安装问题
问题1: 版本冲突
现象: 安装时出现版本冲突错误
原因: 依赖包版本不兼容
解决: 使用约束文件,或清理环境重新安装
问题2: 网络超时
现象: 下载包时网络超时
原因: 网络连接不稳定或防火墙限制
解决: 使用国内镜像源或重试安装
问题3: 权限不足
现象: 安装时提示权限错误
原因: 没有写入权限
解决: 使用 --user
参数或检查环境权限
2. Airflow 项目环境配置
2.1 环境配置流程图
flowchart TDsubgraph ENV ["1. 环境准备阶段"]A[激活 Python 环境<br/>conda activate crawl_py310]B[设置环境变量<br/>AIRFLOW_HOME=~/airflow]C[创建目录结构<br/>mkdir -p ~/airflow/dags/logs/plugins]endsubgraph DB ["2. 数据库初始化阶段"]D[初始化数据库<br/>airflow db init]E[验证数据库连接<br/>airflow db check]F[创建管理员用户<br/>airflow users create]endsubgraph SERVICE ["3. 服务启动阶段"]G[启动 Web 服务器<br/>airflow webserver]H[启动调度器<br/>airflow scheduler]endENV --> DBDB --> SERVICEstyle ENV fill:#e1f5fe,stroke:#01579b,stroke-width:3pxstyle DB fill:#f3e5f5,stroke:#4a148c,stroke-width:3pxstyle SERVICE fill:#e8f5e8,stroke:#1b5e20,stroke-width:3pxstyle A fill:#ffffff,stroke:#01579b,stroke-width:1pxstyle B fill:#ffffff,stroke:#01579b,stroke-width:1pxstyle C fill:#ffffff,stroke:#01579b,stroke-width:1pxstyle D fill:#ffffff,stroke:#4a148c,stroke-width:1pxstyle E fill:#ffffff,stroke:#4a148c,stroke-width:1pxstyle F fill:#ffffff,stroke:#4a148c,stroke-width:1pxstyle G fill:#ffffff,stroke:#1b5e20,stroke-width:1pxstyle H fill:#ffffff,stroke:#1b5e20,stroke-width:1px
流程说明:
-
一次性设置阶段(蓝色节点):
- 激活 Python 环境:确保使用正确的 Python 版本和依赖包
- 设置环境变量:定义 Airflow 工作目录和配置
- 创建目录结构:建立必要的文件夹结构
-
数据库初始化阶段(紫色节点):
- 初始化数据库:创建 SQLite 数据库和表结构
- 验证数据库连接:确认数据库可以正常访问
- 创建管理员用户:设置登录账户
-
服务启动阶段(绿色节点):
- 启动 Web 服务器:提供 Web UI 界面
- 启动调度器:执行任务调度和监控
重要说明:
- 前6个步骤为一次性设置,完成后无需重复
- 最后2个步骤为每次启动 Airflow 时需要执行
- 所有步骤必须按顺序执行,确保依赖关系正确
2.2 环境准备阶段
激活 Python 环境
# 切换到 Airflow 环境
conda activate crawl_py310# 验证环境
which python
python --version
设置环境变量
# 设置 Airflow 主目录
export AIRFLOW_HOME=~/airflow# 设置时区(可选,默认为 UTC)
export AIRFLOW__CORE__DEFAULT_TIMEZONE=Asia/Shanghai# 验证环境变量
echo $AIRFLOW_HOME
创建目录结构
# 创建 Airflow 必要的目录结构
mkdir -p ~/airflow/dags ~/airflow/logs ~/airflow/plugins# 验证目录创建
ls -la ~/airflow/
2.3数据库初始化阶段
2.3.1 数据库初始化
# 切换到 Airflow 环境
conda activate crawl_py310# 检查数据库连接状态
airflow db check# 初始化 Airflow 元数据库(默认使用 SQLite)
airflow db init# 验证数据库初始化
airflow db check# 数据库重新初始化
airflow dbt reset
# 重新初始化数据库(删除所有数据并重新创建)
airflow db reset --yes
关闭数据库服务
本项目是采用 SQLite 数据库,不需要单独的服务进程,关闭 Airflow 服务时,SQLite 数据库连接会自动关闭。
2.3.2 数据库连接排障
如果 airflow db check
返回错误,可能的原因和解决方案:
错误1: 数据库文件不存在
# 错误信息
[ERROR] Connection failed.
[ERROR] No such file or directory: '/path/to/airflow.db'# 解决方案
# 1. 检查 AIRFLOW_HOME 环境变量
echo $AIRFLOW_HOME# 2. 创建 Airflow 目录
mkdir -p ~/airflow/dags ~/airflow/logs ~/airflow/plugins# 3. 设置环境变量
export AIRFLOW_HOME=~/airflow# 4. 初始化数据库
airflow db init
错误2: 数据库未初始化
# 错误信息
[ERROR] Connection failed.
[ERROR] no such table: ab_permission# 解决方案
# 直接初始化数据库
airflow db init
错误3: 权限问题
# 错误信息
[ERROR] Connection failed.
[ERROR] permission denied: '/path/to/airflow.db'# 解决方案
# 1. 检查文件权限
ls -la ~/airflow/airflow.db# 2. 修改权限
chmod 644 ~/airflow/airflow.db# 3. 或者重新创建数据库
rm ~/airflow/airflow.db
airflow db init
错误4: 配置错误
# 错误信息
[ERROR] Connection failed.
[ERROR] Invalid database URL format# 解决方案
# 1. 检查数据库配置
airflow config get-value database sql_alchemy_conn# 2. 重置为默认配置
unset AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
export AIRFLOW_HOME=~/airflow# 3. 重新初始化
airflow db init
使用排障脚本:
# 运行自动排障脚本
./troubleshoot_db.sh# 脚本会自动检查并修复常见问题
2.3.3 创建和管理用户
查看用户信息
# 列出所有用户
airflow users list# 查看用户详细信息(通过用户名)
airflow users list | grep <username># 查看用户详细信息(通过邮箱)
airflow users list | grep <email>
管理用户
角色 | 权限范围 | 适用场景 |
---|---|---|
Public | 仅能访问登录页面,无实际操作权限 | 临时访客(基本无用) |
Viewer | 查看 DAG、任务日志、任务状态,但不能修改或触发 | 监控人员/只读审计 |
User | 查看 + 编辑 DAG 代码(可上传/修改),但不能触发任务或修改变量 | 开发人员(编写DAG) |
Op | 查看 + 触发任务、清除任务状态、标记成功/失败,但不能修改 DAG 代码 | 运维人员(日常操作) |
Admin | 完全控制(包括用户管理、变量/连接编辑、所有 DAG 操作) | 系统管理员 |
# 创建管理员用户
airflow users create --username admin --password admin123 --role Admin --email xxx@xxx.com --firstname xx --lastname xx# 创建普通用户, 邮箱冲突:Airflow 不允许重复的邮箱地址,使用不同邮箱解决
airflow users create --username xx --password xxx123 --firstname xx --lastname he --role Public --email xxxx@xxx.com
# 授权
airflow users add-role --username xx --role User
airflow users add-role --username xx --role Viewer# 验证用户创建
airflow users list
删除用户
# 通过用户名删除用户
airflow users delete --username test_user# 通过邮箱删除用户
airflow users delete --email <email>
更改用户
# 为用户添加角色
airflow users add-role --username <username> --role <role># 移除用户角色
airflow users remove-role --username <username> --role <role>
2.4 服务启动阶段
2.4.1 查看 Airflow 服务
查看Airflow进程
# 查看所有 Airflow 相关进程
ps aux | grep airflow# 查看 8080 端口占用(Airflow Web 服务器默认端口)
lsof -i :8080# 查看特定 Airflow 进程(更精确的过滤)
ps aux | grep -E "(airflow webserver|airflow scheduler|airflow worker)"# 查看进程树(显示父子关系)
pstree -p | grep airflow# 查看进程详细信息
pgrep -f airflow | xargs ps -o pid,ppid,cmd,etime
查看端口占用情况
# 查看 8080 端口占用(Airflow Web 服务器默认端口)
lsof -i :8080# 查看所有监听端口
lsof -i -P -n | grep LISTEN# 查看所有 Airflow 相关端口
lsof -i | grep airflow# 使用 netstat 查看端口
netstat -an | grep :8080# 使用 ss 命令查看端口(更现代的方式)
ss -tuln | grep :8080
查看 Airflow 服务状态
# 检查 Airflow 数据库连接状态
airflow db check# 检查 Airflow 配置
airflow config list# 检查 DAG 列表
airflow dags list# 检查任务状态
airflow tasks list
2.4.2 启动 Airflow 服务
启动 Web 服务器
# 前台启动 Web 服务器(推荐用于调试)
airflow webserver --port 8080# 后台启动 Web 服务器(推荐用于生产环境)
nohup airflow webserver --port 8080 > ~/Downloads/airflow_webserver.log 2>&1 &# 使用其他端口启动(如果 8080 被占用)
airflow webserver --port 8081# 指定主机地址启动
airflow webserver --host 0.0.0.0 --port 8080# 检查airflow web是否启动
ps aux | grep airflow-webserver
ps aux | grep airflow
# 检查端口
lsof -i :8080
# 检查 Airflow Web ip
curl -I http://localhost:8080
启动调度器
调度器(Scheduler) 类似 linux crontab,没有 crontab 则不能自动调度任务,但仍可以手动调度
# 前台启动调度器(推荐用于调试)
airflow scheduler# 后台启动调度器(推荐用于生产环境)
nohup airflow scheduler > ~/Downloads/airflow_scheduler.log 2>&1 &# 指定配置文件启动
airflow scheduler --config /path/to/airflow.cfg# 启动调度器并指定日志级别
airflow scheduler --log-level DEBUG# 检查scheduler启动情况
ps aux | grep -E "(airflow scheduler|airflow worker)"
服务启动验证
# 检查进程是否启动
ps aux | grep airflow# 检查端口是否监听
lsof -i :8080# 检查日志文件
tail -f airflow_webserver.log
tail -f airflow_scheduler.log# 检查服务健康状态
curl -I http://localhost:8080
2.4.3 验证服务功能
Web UI 访问验证
# 检查 Web 服务器是否可访问
curl -I http://localhost:8080# 检查 API 端点
curl http://localhost:8080/api/v1/health# 检查版本信息
curl http://localhost:8080/api/v1/version
登录验证
- Web UI 地址: http://localhost:8080
- 管理员账户:
- 用户名: admin
- 密码: admin123
功能验证命令
# 检查 DAG 列表
airflow dags list# 检查任务列表
airflow tasks list# 检查连接配置
airflow connections list# 检查变量配置
airflow variables list# 检查池配置
airflow pools list# 检查插件
airflow plugins
服务状态监控
# 查看调度器状态
airflow jobs check# 查看任务实例
airflow tasks list <dag_id># 查看 DAG 运行历史
airflow dags list-runs# 查看任务运行历史
airflow tasks list-runs <dag_id> <task_id>
日志查看
# 查看 Web 服务器日志
tail -f ~/airflow/logs/webserver/webserver.log# 查看调度器日志
tail -f ~/airflow/logs/scheduler/latest/*.log# 查看 DAG 日志
airflow tasks logs <dag_id> <task_id> <execution_date># 查看最新日志
find ~/airflow/logs -name "*.log" -type f -exec ls -lt {} + | head -10
性能检查
# 检查数据库连接
airflow db check# 检查配置
airflow config list# 检查版本信息
airflow version# 检查环境信息
airflow info
常见问题排查
# 如果 Web UI 无法访问
curl -v http://localhost:8080# 如果调度器不工作
airflow scheduler --dry-run# 检查端口占用
lsof -i :8080# 检查进程状态
pgrep -f airflow | xargs ps -o pid,ppid,cmd,etime
3. DAG 开发与实践
3.1 第一个实战 DAG
3.1.1 创建 Hello World DAG
创建 DAG 文件
# 创建 DAG 目录(如果不存在)
mkdir -p ~/airflow/dags# 创建第一个 DAG 文件
touch ~/airflow/dags/hello_world_dag.py
编写 DAG 代码
# ~/airflow/dags/hello_world_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator# 定义默认参数
default_args = {'owner': 'airflow','depends_on_past': False,'email_on_failure': False,'email_on_retry': False,'retries': 1,'retry_delay': timedelta(minutes=5),
}# 定义 Python 函数
def print_hello():"""打印 Hello World"""print("Hello World from Python!")return "Hello World"def print_date(**context):"""打印执行日期"""execution_date = context['execution_date']print(f"Execution date: {execution_date}")return f"Executed on {execution_date}"# 创建 DAG
with DAG('hello_world_dag',default_args=default_args,description='A simple Hello World DAG',schedule_interval=timedelta(minutes=5), # 每5分钟执行一次start_date=datetime(2023, 1, 1),catchup=False,tags=['example', 'hello-world'],
) as dag:# 任务1:使用 BashOperator 打印 Hellotask_hello = BashOperator(task_id='print_hello_bash',bash_command='echo "Hello World from Bash!"',)# 任务2:使用 PythonOperator 打印 Hellotask_hello_python = PythonOperator(task_id='print_hello_python',python_callable=print_hello,)# 任务3:打印执行日期task_date = PythonOperator(task_id='print_date',python_callable=print_date,)# 设置任务依赖关系task_hello >> task_hello_python >> task_date
验证 DAG 创建
# 检查 DAG 是否被正确加载
airflow dags list | grep hello_world_dag# 检查 DAG 语法
airflow dags test hello_world_dag 2023-01-01# 查看 DAG 任务列表
airflow tasks list hello_world_dag# 查看任务依赖关系
airflow tasks list hello_world_dag --tree
3.1.2 手动触发 DAG
# 手动触发 DAG 执行
airflow dags trigger hello_world_dag# 指定执行日期触发
airflow dags trigger hello_world_dag --conf '{"execution_date": "2023-01-01T00:00:00"}'# 查看 DAG 运行状态
airflow dags list-runs --dag-id hello_world_dag
3.1.3 Web UI 验证
- 访问 Web UI: http://localhost:8080
- 登录: 使用 admin/admin123 账户
- 查看 DAG: 在 DAGs 列表中找到
hello_world_dag
- 手动触发: 点击 “Trigger DAG” 按钮
- 监控执行: 在 Graph View 中查看任务执行状态
3.2 任务依赖关系实践
3.2.1 线性依赖
# ~/airflow/dags/linear_dependencies_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperatorwith DAG('linear_dependencies_dag',start_date=datetime(2023, 1, 1),schedule_interval=timedelta(hours=1),catchup=False,
) as dag:# 创建多个任务task_a = BashOperator(task_id='task_a',bash_command='echo "Task A completed"',)task_b = BashOperator(task_id='task_b',bash_command='echo "Task B completed"',)task_c = BashOperator(task_id='task_c',bash_command='echo "Task C completed"',)task_d = BashOperator(task_id='task_d',bash_command='echo "Task D completed"',)# 设置线性依赖:A -> B -> C -> Dtask_a >> task_b >> task_c >> task_d
3.2.2 并行依赖
# ~/airflow/dags/parallel_dependencies_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperatorwith DAG('parallel_dependencies_dag',start_date=datetime(2023, 1, 1),schedule_interval=timedelta(hours=1),catchup=False,
) as dag:# 起始任务start_task = BashOperator(task_id='start_task',bash_command='echo "Starting parallel tasks"',)# 并行任务task_1 = BashOperator(task_id='task_1',bash_command='sleep 10 && echo "Task 1 completed"',)task_2 = BashOperator(task_id='task_2',bash_command='sleep 15 && echo "Task 2 completed"',)task_3 = BashOperator(task_id='task_3',bash_command='sleep 20 && echo "Task 3 completed"',)# 结束任务end_task = BashOperator(task_id='end_task',bash_command='echo "All parallel tasks completed"',)# 设置依赖:start -> [task_1, task_2, task_3] -> endstart_task >> [task_1, task_2, task_3] >> end_task
3.2.3 复杂依赖关系
# ~/airflow/dags/complex_dependencies_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperatorwith DAG('complex_dependencies_dag',start_date=datetime(2023, 1, 1),schedule_interval=timedelta(hours=1),catchup=False,
) as dag:# 起始任务start = EmptyOperator(task_id='start')# 第一组并行任务task_a1 = BashOperator(task_id='task_a1',bash_command='echo "Task A1 completed"',)task_a2 = BashOperator(task_id='task_a2',bash_command='echo "Task A2 completed"',)# 中间任务middle = BashOperator(task_id='middle',bash_command='echo "Middle task completed"',)# 第二组并行任务task_b1 = BashOperator(task_id='task_b1',bash_command='echo "Task B1 completed"',)task_b2 = BashOperator(task_id='task_b2',bash_command='echo "Task B2 completed"',)task_b3 = BashOperator(task_id='task_b3',bash_command='echo "Task B3 completed"',)# 结束任务end = EmptyOperator(task_id='end')# 设置复杂依赖关系start >> [task_a1, task_a2] >> middle >> [task_b1, task_b2, task_b3] >> end
3.3 常用 Operator 实战
3.3.1 BashOperator 详解
# ~/airflow/dags/bash_operator_demo.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperatorwith DAG('bash_operator_demo',start_date=datetime(2023, 1, 1),schedule_interval=timedelta(days=1),catchup=False,
) as dag:# 基本命令执行basic_cmd = BashOperator(task_id='basic_cmd',bash_command='echo "Current date: $(date)"',)# 使用模板变量template_cmd = BashOperator(task_id='template_cmd',bash_command='echo "Execution date: {{ ds }}"',)# 执行脚本文件script_cmd = BashOperator(task_id='script_cmd',bash_command='''set -eecho "Creating directory..."mkdir -p /tmp/airflow_testecho "Writing file..."echo "Hello from Airflow" > /tmp/airflow_test/test.txtecho "File created successfully"''',)# 使用环境变量env_cmd = BashOperator(task_id='env_cmd',bash_command='echo "User: $USER, Home: $HOME"',env={'CUSTOM_VAR': 'custom_value'},)# 设置依赖basic_cmd >> template_cmd >> script_cmd >> env_cmd
3.3.2 PythonOperator 详解
# ~/airflow/dags/python_operator_demo.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperatordef simple_function():"""简单函数"""print("Hello from Python function!")return "Success"def function_with_args(name, age):"""带参数的函数"""print(f"Name: {name}, Age: {age}")return f"Processed {name}"def function_with_context(**context):"""使用 Airflow 上下文的函数"""execution_date = context['execution_date']task_instance = context['task_instance']dag = context['dag']print(f"Execution date: {execution_date}")print(f"DAG ID: {dag.dag_id}")print(f"Task ID: {task_instance.task_id}")return "Context processed"def data_processing_function(**context):"""数据处理函数"""# 模拟数据处理data = [1, 2, 3, 4, 5]result = sum(data)print(f"Data processing result: {result}")# 将结果推送到 XComcontext['task_instance'].xcom_push(key='sum_result', value=result)return resultwith DAG('python_operator_demo',start_date=datetime(2023, 1, 1),schedule_interval=timedelta(days=1),catchup=False,
) as dag:# 简单函数调用simple_task = PythonOperator(task_id='simple_task',python_callable=simple_function,)# 带参数的函数调用args_task = PythonOperator(task_id='args_task',python_callable=function_with_args,op_args=['Alice', 25], # 位置参数)# 使用上下文的函数context_task = PythonOperator(task_id='context_task',python_callable=function_with_context,provide_context=True,)# 数据处理函数data_task = PythonOperator(task_id='data_task',python_callable=data_processing_function,provide_context=True,)# 设置依赖simple_task >> args_task >> context_task >> data_task
3.3.3 EmailOperator 使用
# ~/airflow/dags/email_operator_demo.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.email import EmailOperator
from airflow.operators.python import PythonOperatordef generate_report():"""生成报告内容"""return """<h2>Daily Report</h2><p>This is a daily report generated by Airflow.</p><ul><li>Task 1: Completed</li><li>Task 2: Completed</li><li>Task 3: Completed</li></ul><p>Generated on: {{ ds }}</p>"""with DAG('email_operator_demo',start_date=datetime(2023, 1, 1),schedule_interval=timedelta(days=1),catchup=False,
) as dag:# 生成报告generate_report_task = PythonOperator(task_id='generate_report',python_callable=generate_report,)# 发送邮件send_email_task = EmailOperator(task_id='send_email',to=['recipient@example.com'],subject='Daily Report - {{ ds }}',html_content="{{ task_instance.xcom_pull(task_ids='generate_report') }}",)# 设置依赖generate_report_task >> send_email_task
3.4 变量和连接实战
3.4.1 使用 Variables
设置变量
# 通过命令行设置变量
airflow variables set "data_path" "/opt/data"
airflow variables set "api_url" "https://api.example.com"
airflow variables set "email_recipients" '["user1@example.com", "user2@example.com"]'# 查看变量
airflow variables list
airflow variables get data_path
在 DAG 中使用变量
# ~/airflow/dags/variables_demo.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variabledef use_variables(**context):"""使用 Airflow 变量"""# 获取变量data_path = Variable.get("data_path")api_url = Variable.get("api_url")email_recipients = Variable.get("email_recipients", deserialize_json=True)print(f"Data path: {data_path}")print(f"API URL: {api_url}")print(f"Email recipients: {email_recipients}")return "Variables processed"def use_template_variables(**context):"""使用模板变量"""# 在函数中访问模板变量execution_date = context['ds']dag_id = context['dag'].dag_idprint(f"Execution date: {execution_date}")print(f"DAG ID: {dag_id}")return "Template variables processed"with DAG('variables_demo',start_date=datetime(2023, 1, 1),schedule_interval=timedelta(days=1),catchup=False,
) as dag:# 使用变量的任务variables_task = PythonOperator(task_id='use_variables',python_callable=use_variables,provide_context=True,)# 使用模板变量的任务template_task = PythonOperator(task_id='use_template_variables',python_callable=use_template_variables,provide_context=True,)# 设置依赖variables_task >> template_task
3.4.2 使用 Connections
设置 MySQL 连接
# 通过命令行设置连接
airflow connections add 'mysql_default' \--conn-type 'mysql' \--conn-host 'localhost' \--conn-login 'root' \--conn-password 'password' \--conn-port '3306' \--conn-schema 'test_db'# 查看连接
airflow connections list
airflow connections get mysql_default
在 DAG 中使用连接
# ~/airflow/dags/connections_demo.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.operators.python import PythonOperator
from airflow.hooks.mysql_hook import MySqlHookdef query_mysql_data(**context):"""查询 MySQL 数据"""# 使用 MySQL Hookmysql_hook = MySqlHook(mysql_conn_id='mysql_default')# 执行查询sql = "SELECT COUNT(*) as count FROM users"result = mysql_hook.get_first(sql)print(f"User count: {result[0]}")# 推送到 XComcontext['task_instance'].xcom_push(key='user_count', value=result[0])return result[0]with DAG('connections_demo',start_date=datetime(2023, 1, 1),schedule_interval=timedelta(days=1),catchup=False,
) as dag:# 使用 MySqlOperatormysql_task = MySqlOperator(task_id='mysql_query',mysql_conn_id='mysql_default',sql='SELECT * FROM users LIMIT 5',)# 使用 Python 函数查询python_mysql_task = PythonOperator(task_id='python_mysql_query',python_callable=query_mysql_data,provide_context=True,)# 设置依赖mysql_task >> python_mysql_task
3.5 实际项目案例
3.5.1 数据采集 DAG
# ~/airflow/dags/data_collection_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
import requests
import json
import osdef fetch_api_data(**context):"""从 API 获取数据"""api_url = "https://jsonplaceholder.typicode.com/posts"try:response = requests.get(api_url)response.raise_for_status()data = response.json()# 保存数据到文件output_path = "/tmp/api_data.json"with open(output_path, 'w') as f:json.dump(data, f, indent=2)print(f"Data fetched successfully, saved to {output_path}")print(f"Total records: {len(data)}")# 推送到 XComcontext['task_instance'].xcom_push(key='record_count', value=len(data))return len(data)except Exception as e:print(f"Error fetching data: {e}")raisedef process_data(**context):"""处理数据"""input_path = "/tmp/api_data.json"if not os.path.exists(input_path):raise FileNotFoundError(f"Input file not found: {input_path}")with open(input_path, 'r') as f:data = json.load(f)# 简单的数据处理:提取标题titles = [item['title'] for item in data]# 保存处理结果output_path = "/tmp/processed_data.txt"with open(output_path, 'w') as f:for title in titles:f.write(f"{title}\n")print(f"Data processed, titles saved to {output_path}")return len(titles)def cleanup_files(**context):"""清理临时文件"""files_to_clean = ["/tmp/api_data.json", "/tmp/processed_data.txt"]for file_path in files_to_clean:if os.path.exists(file_path):os.remove(file_path)print(f"Cleaned up: {file_path}")return "Cleanup completed"with DAG('data_collection_dag',start_date=datetime(2023, 1, 1),schedule_interval=timedelta(hours=6), # 每6小时执行一次catchup=False,tags=['data-collection', 'api'],
) as dag:# 获取数据fetch_task = PythonOperator(task_id='fetch_api_data',python_callable=fetch_api_data,provide_context=True,)# 处理数据process_task = PythonOperator(task_id='process_data',python_callable=process_data,provide_context=True,)# 清理文件cleanup_task = PythonOperator(task_id='cleanup_files',python_callable=cleanup_files,provide_context=True,)# 设置依赖fetch_task >> process_task >> cleanup_task
3.5.2 数据处理管道
# ~/airflow/dags/data_pipeline_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
import pandas as pd
import numpy as npdef extract_data(**context):"""提取数据"""# 模拟数据提取data = {'id': range(1, 101),'name': [f'User_{i}' for i in range(1, 101)],'age': np.random.randint(18, 65, 100),'salary': np.random.randint(30000, 100000, 100),'department': np.random.choice(['IT', 'HR', 'Finance', 'Marketing'], 100)}df = pd.DataFrame(data)# 保存原始数据df.to_csv('/tmp/raw_data.csv', index=False)print(f"Extracted {len(df)} records")return len(df)def transform_data(**context):"""转换数据"""# 读取原始数据df = pd.read_csv('/tmp/raw_data.csv')# 数据清洗和转换# 1. 处理缺失值df = df.dropna()# 2. 添加计算字段df['salary_category'] = df['salary'].apply(lambda x: 'High' if x > 70000 else 'Medium' if x > 50000 else 'Low')# 3. 按部门统计dept_stats = df.groupby('department').agg({'salary': ['mean', 'count'],'age': 'mean'}).round(2)# 保存转换后的数据df.to_csv('/tmp/transformed_data.csv', index=False)dept_stats.to_csv('/tmp/department_stats.csv')print(f"Transformed {len(df)} records")print("Department statistics:")print(dept_stats)return len(df)def load_data(**context):"""加载数据"""# 模拟数据加载到数据库df = pd.read_csv('/tmp/transformed_data.csv')# 这里可以添加实际的数据库插入逻辑# 例如:使用 SQLAlchemy 或其他数据库连接器print(f"Loaded {len(df)} records to database")# 生成报告report = f"""Data Pipeline Report - {context['ds']}======================================Total records processed: {len(df)}Records by salary category:{df['salary_category'].value_counts().to_dict()}"""with open('/tmp/pipeline_report.txt', 'w') as f:f.write(report)print("Pipeline report generated")return len(df)with DAG('data_pipeline_dag',start_date=datetime(2023, 1, 1),schedule_interval=timedelta(days=1), # 每天执行一次catchup=False,tags=['data-pipeline', 'etl'],
) as dag:# 提取数据extract_task = PythonOperator(task_id='extract_data',python_callable=extract_data,provide_context=True,)# 转换数据transform_task = PythonOperator(task_id='transform_data',python_callable=transform_data,provide_context=True,)# 加载数据load_task = PythonOperator(task_id='load_data',python_callable=load_data,provide_context=True,)# 设置依赖:ETL 管道extract_task >> transform_task >> load_task
3.6 错误处理和监控
3.6.1 任务重试机制
# ~/airflow/dags/retry_demo_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import randomdef task_with_retry(**context):"""可能失败的任务,演示重试机制"""# 模拟随机失败if random.random() < 0.7: # 70% 概率失败raise Exception("Random failure for demonstration")print("Task completed successfully!")return "Success"def task_with_custom_retry(**context):"""自定义重试逻辑的任务"""try:# 模拟业务逻辑result = 10 / 0 # 故意制造错误return resultexcept ZeroDivisionError:print("Caught ZeroDivisionError, will retry...")raisewith DAG('retry_demo_dag',start_date=datetime(2023, 1, 1),schedule_interval=timedelta(hours=1),catchup=False,default_args={'retries': 3, # 最多重试3次'retry_delay': timedelta(minutes=1), # 重试间隔1分钟'retry_exponential_backoff': True, # 指数退避'max_retry_delay': timedelta(minutes=10), # 最大重试间隔},
) as dag:# 基本重试任务retry_task = PythonOperator(task_id='task_with_retry',python_callable=task_with_retry,provide_context=True,)# 自定义重试任务custom_retry_task = PythonOperator(task_id='task_with_custom_retry',python_callable=task_with_custom_retry,provide_context=True,retries=5, # 覆盖默认重试次数retry_delay=timedelta(seconds=30), # 覆盖默认重试间隔)# 设置依赖retry_task >> custom_retry_task
3.6.2 监控和告警
# ~/airflow/dags/monitoring_demo_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperatordef critical_task(**context):"""关键任务,失败时需要告警"""# 模拟关键业务逻辑print("Executing critical business logic...")# 模拟偶尔失败import randomif random.random() < 0.3: # 30% 概率失败raise Exception("Critical task failed!")print("Critical task completed successfully")return "Success"def generate_monitoring_report(**context):"""生成监控报告"""execution_date = context['ds']report = f"""<h2>DAG Monitoring Report</h2><p><strong>Execution Date:</strong> {execution_date}</p><p><strong>DAG ID:</strong> {context['dag'].dag_id}</p><p><strong>Status:</strong> Running</p><h3>Task Summary</h3><ul><li>Critical Task: Completed</li><li>Report Generation: Completed</li></ul><p>This is an automated report generated by Airflow.</p>"""# 推送到 XComcontext['task_instance'].xcom_push(key='monitoring_report', value=report)return "Report generated"with DAG('monitoring_demo_dag',start_date=datetime(2023, 1, 1),schedule_interval=timedelta(hours=2),catchup=False,default_args={'email_on_failure': True, # 失败时发送邮件'email_on_retry': False,'retries': 2,'retry_delay': timedelta(minutes=5),},
) as dag:# 关键任务critical_task_op = PythonOperator(task_id='critical_task',python_callable=critical_task,provide_context=True,)# 生成监控报告report_task = PythonOperator(task_id='generate_monitoring_report',python_callable=generate_monitoring_report,provide_context=True,)# 发送监控邮件email_task = EmailOperator(task_id='send_monitoring_email',to=['admin@example.com'],subject='DAG Monitoring Report - {{ ds }}',html_content="{{ task_instance.xcom_pull(task_ids='generate_monitoring_report', key='monitoring_report') }}",)# 设置依赖critical_task_op >> report_task >> email_task
3.7 最佳实践总结
3.7.1 DAG 开发最佳实践
1. 文件组织
- 每个 DAG 文件只包含一个 DAG
- 使用描述性的文件名和 DAG ID
- 按业务模块组织 DAG 文件
2. 代码质量
- 添加适当的注释和文档
- 使用有意义的任务 ID
- 避免在 DAG 文件中执行耗时操作
3. 错误处理
- 设置合理的重试策略
- 使用 try-catch 处理异常
- 配置适当的告警机制
4. 性能优化
- 避免不必要的任务依赖
- 合理设置调度间隔
- 使用适当的资源限制
3.7.2 测试和验证
# 测试 DAG 语法
airflow dags test <dag_id> <execution_date># 验证任务依赖
airflow tasks list <dag_id> --tree# 检查 DAG 导入错误
airflow dags report# 手动触发测试
airflow dags trigger <dag_id>
3.7.3 监控和维护
1. 定期检查
- 监控 DAG 执行状态
- 检查任务失败率
- 查看执行时间趋势
2. 日志管理
- 定期清理日志文件
- 配置日志轮转
- 监控日志大小
3. 性能调优
- 优化任务依赖关系
- 调整调度参数
- 监控资源使用情况
通过以上实战练习,您应该能够:
- 创建和配置各种类型的 DAG
- 理解和使用不同的 Operator
- 管理任务依赖关系
- 使用变量和连接
- 处理错误和监控任务执行
- 开发实际的数据处理管道