当前位置: 首页 > news >正文

Airflow 入门案例教程

Airflow 入门案例教程

1. 安装 Airflow 环境

1.1安装Airflow环境

环境信息: Python 版本: 3.10.18

  • 环境名称: crawl_py310 (下面所有涉及环境名称命令,改成相应的个人环境名称)
# 环境名称: crawl_py310
# 环境路径: `/Library/anaconda3/envs/crawl_py310`
# 安装命令: `constraints-3.10.txt` 使用约束文件确保所有依赖包的版本兼容性
python -m pip install "apache-airflow==2.7.3" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.3/constraints-3.10.txt"

1.2检查Airflow环境

# 检查 Airflow 版本
python -c "import airflow; print(airflow.__version__)"# 检查安装的包
pip list | grep airflow
airflow version

1.3常见安装问题

问题1: 版本冲突

现象: 安装时出现版本冲突错误
原因: 依赖包版本不兼容
解决: 使用约束文件,或清理环境重新安装

问题2: 网络超时

现象: 下载包时网络超时
原因: 网络连接不稳定或防火墙限制
解决: 使用国内镜像源或重试安装

问题3: 权限不足

现象: 安装时提示权限错误
原因: 没有写入权限
解决: 使用 --user 参数或检查环境权限

2. Airflow 项目环境配置

2.1 环境配置流程图

flowchart TDsubgraph ENV ["1. 环境准备阶段"]A[激活 Python 环境<br/>conda activate crawl_py310]B[设置环境变量<br/>AIRFLOW_HOME=~/airflow]C[创建目录结构<br/>mkdir -p ~/airflow/dags/logs/plugins]endsubgraph DB ["2. 数据库初始化阶段"]D[初始化数据库<br/>airflow db init]E[验证数据库连接<br/>airflow db check]F[创建管理员用户<br/>airflow users create]endsubgraph SERVICE ["3. 服务启动阶段"]G[启动 Web 服务器<br/>airflow webserver]H[启动调度器<br/>airflow scheduler]endENV --> DBDB --> SERVICEstyle ENV fill:#e1f5fe,stroke:#01579b,stroke-width:3pxstyle DB fill:#f3e5f5,stroke:#4a148c,stroke-width:3pxstyle SERVICE fill:#e8f5e8,stroke:#1b5e20,stroke-width:3pxstyle A fill:#ffffff,stroke:#01579b,stroke-width:1pxstyle B fill:#ffffff,stroke:#01579b,stroke-width:1pxstyle C fill:#ffffff,stroke:#01579b,stroke-width:1pxstyle D fill:#ffffff,stroke:#4a148c,stroke-width:1pxstyle E fill:#ffffff,stroke:#4a148c,stroke-width:1pxstyle F fill:#ffffff,stroke:#4a148c,stroke-width:1pxstyle G fill:#ffffff,stroke:#1b5e20,stroke-width:1pxstyle H fill:#ffffff,stroke:#1b5e20,stroke-width:1px

流程说明:

  1. 一次性设置阶段(蓝色节点)

    • 激活 Python 环境:确保使用正确的 Python 版本和依赖包
    • 设置环境变量:定义 Airflow 工作目录和配置
    • 创建目录结构:建立必要的文件夹结构
  2. 数据库初始化阶段(紫色节点)

    • 初始化数据库:创建 SQLite 数据库和表结构
    • 验证数据库连接:确认数据库可以正常访问
    • 创建管理员用户:设置登录账户
  3. 服务启动阶段(绿色节点)

    • 启动 Web 服务器:提供 Web UI 界面
    • 启动调度器:执行任务调度和监控

重要说明:

  • 前6个步骤为一次性设置,完成后无需重复
  • 最后2个步骤为每次启动 Airflow 时需要执行
  • 所有步骤必须按顺序执行,确保依赖关系正确

2.2 环境准备阶段

激活 Python 环境

# 切换到 Airflow 环境
conda activate crawl_py310# 验证环境
which python
python --version

设置环境变量

# 设置 Airflow 主目录
export AIRFLOW_HOME=~/airflow# 设置时区(可选,默认为 UTC)
export AIRFLOW__CORE__DEFAULT_TIMEZONE=Asia/Shanghai# 验证环境变量
echo $AIRFLOW_HOME

创建目录结构

# 创建 Airflow 必要的目录结构
mkdir -p ~/airflow/dags ~/airflow/logs ~/airflow/plugins# 验证目录创建
ls -la ~/airflow/

2.3数据库初始化阶段

2.3.1 数据库初始化
# 切换到 Airflow 环境
conda activate crawl_py310# 检查数据库连接状态
airflow db check# 初始化 Airflow 元数据库(默认使用 SQLite)
airflow db init# 验证数据库初始化
airflow db check# 数据库重新初始化
airflow dbt reset
# 重新初始化数据库(删除所有数据并重新创建)
airflow db reset --yes

关闭数据库服务

本项目是采用 SQLite 数据库,不需要单独的服务进程,关闭 Airflow 服务时,SQLite 数据库连接会自动关闭。

2.3.2 数据库连接排障

如果 airflow db check 返回错误,可能的原因和解决方案:

错误1: 数据库文件不存在

# 错误信息
[ERROR] Connection failed.
[ERROR] No such file or directory: '/path/to/airflow.db'# 解决方案
# 1. 检查 AIRFLOW_HOME 环境变量
echo $AIRFLOW_HOME# 2. 创建 Airflow 目录
mkdir -p ~/airflow/dags ~/airflow/logs ~/airflow/plugins# 3. 设置环境变量
export AIRFLOW_HOME=~/airflow# 4. 初始化数据库
airflow db init

错误2: 数据库未初始化

# 错误信息
[ERROR] Connection failed.
[ERROR] no such table: ab_permission# 解决方案
# 直接初始化数据库
airflow db init

错误3: 权限问题

# 错误信息
[ERROR] Connection failed.
[ERROR] permission denied: '/path/to/airflow.db'# 解决方案
# 1. 检查文件权限
ls -la ~/airflow/airflow.db# 2. 修改权限
chmod 644 ~/airflow/airflow.db# 3. 或者重新创建数据库
rm ~/airflow/airflow.db
airflow db init

错误4: 配置错误

# 错误信息
[ERROR] Connection failed.
[ERROR] Invalid database URL format# 解决方案
# 1. 检查数据库配置
airflow config get-value database sql_alchemy_conn# 2. 重置为默认配置
unset AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
export AIRFLOW_HOME=~/airflow# 3. 重新初始化
airflow db init

使用排障脚本:

# 运行自动排障脚本
./troubleshoot_db.sh# 脚本会自动检查并修复常见问题
2.3.3 创建和管理用户

查看用户信息

# 列出所有用户
airflow users list# 查看用户详细信息(通过用户名)
airflow users list | grep <username># 查看用户详细信息(通过邮箱)
airflow users list | grep <email>

管理用户

角色权限范围适用场景
Public仅能访问登录页面,无实际操作权限临时访客(基本无用)
Viewer查看 DAG、任务日志、任务状态,但不能修改或触发监控人员/只读审计
User查看 + 编辑 DAG 代码(可上传/修改),但不能触发任务或修改变量开发人员(编写DAG)
Op查看 + 触发任务、清除任务状态、标记成功/失败,但不能修改 DAG 代码运维人员(日常操作)
Admin完全控制(包括用户管理、变量/连接编辑、所有 DAG 操作)系统管理员
# 创建管理员用户
airflow users create --username admin --password admin123 --role Admin --email xxx@xxx.com --firstname xx --lastname xx# 创建普通用户, 邮箱冲突:Airflow 不允许重复的邮箱地址,使用不同邮箱解决
airflow users create --username xx --password xxx123 --firstname xx --lastname he --role Public --email xxxx@xxx.com
# 授权
airflow users add-role --username xx --role User
airflow users add-role --username xx --role Viewer# 验证用户创建
airflow users list

删除用户

# 通过用户名删除用户
airflow users delete --username test_user# 通过邮箱删除用户
airflow users delete --email <email>

更改用户

# 为用户添加角色
airflow users add-role --username <username> --role <role># 移除用户角色
airflow users remove-role --username <username> --role <role>

2.4 服务启动阶段

2.4.1 查看 Airflow 服务

查看Airflow进程

# 查看所有 Airflow 相关进程
ps aux | grep airflow# 查看 8080 端口占用(Airflow Web 服务器默认端口)
lsof -i :8080# 查看特定 Airflow 进程(更精确的过滤)
ps aux | grep -E "(airflow webserver|airflow scheduler|airflow worker)"# 查看进程树(显示父子关系)
pstree -p | grep airflow# 查看进程详细信息
pgrep -f airflow | xargs ps -o pid,ppid,cmd,etime

查看端口占用情况

# 查看 8080 端口占用(Airflow Web 服务器默认端口)
lsof -i :8080# 查看所有监听端口
lsof -i -P -n | grep LISTEN# 查看所有 Airflow 相关端口
lsof -i | grep airflow# 使用 netstat 查看端口
netstat -an | grep :8080# 使用 ss 命令查看端口(更现代的方式)
ss -tuln | grep :8080

查看 Airflow 服务状态

# 检查 Airflow 数据库连接状态
airflow db check# 检查 Airflow 配置
airflow config list# 检查 DAG 列表
airflow dags list# 检查任务状态
airflow tasks list
2.4.2 启动 Airflow 服务

启动 Web 服务器

# 前台启动 Web 服务器(推荐用于调试)
airflow webserver --port 8080# 后台启动 Web 服务器(推荐用于生产环境)
nohup airflow webserver --port 8080 > ~/Downloads/airflow_webserver.log 2>&1 &# 使用其他端口启动(如果 8080 被占用)
airflow webserver --port 8081# 指定主机地址启动
airflow webserver --host 0.0.0.0 --port 8080# 检查airflow web是否启动
ps aux | grep airflow-webserver
ps aux | grep airflow
# 检查端口
lsof -i :8080
# 检查 Airflow Web ip
curl -I http://localhost:8080

启动调度器

调度器(Scheduler) 类似 linux crontab,没有 crontab 则不能自动调度任务,但仍可以手动调度

# 前台启动调度器(推荐用于调试)
airflow scheduler# 后台启动调度器(推荐用于生产环境)
nohup airflow scheduler > ~/Downloads/airflow_scheduler.log 2>&1 &# 指定配置文件启动
airflow scheduler --config /path/to/airflow.cfg# 启动调度器并指定日志级别
airflow scheduler --log-level DEBUG# 检查scheduler启动情况
ps aux | grep -E "(airflow scheduler|airflow worker)"

服务启动验证

# 检查进程是否启动
ps aux | grep airflow# 检查端口是否监听
lsof -i :8080# 检查日志文件
tail -f airflow_webserver.log
tail -f airflow_scheduler.log# 检查服务健康状态
curl -I http://localhost:8080
2.4.3 验证服务功能

Web UI 访问验证

# 检查 Web 服务器是否可访问
curl -I http://localhost:8080# 检查 API 端点
curl http://localhost:8080/api/v1/health# 检查版本信息
curl http://localhost:8080/api/v1/version

登录验证

  • Web UI 地址: http://localhost:8080
  • 管理员账户:
    • 用户名: admin
    • 密码: admin123

功能验证命令

# 检查 DAG 列表
airflow dags list# 检查任务列表
airflow tasks list# 检查连接配置
airflow connections list# 检查变量配置
airflow variables list# 检查池配置
airflow pools list# 检查插件
airflow plugins

服务状态监控

# 查看调度器状态
airflow jobs check# 查看任务实例
airflow tasks list <dag_id># 查看 DAG 运行历史
airflow dags list-runs# 查看任务运行历史
airflow tasks list-runs <dag_id> <task_id>

日志查看

# 查看 Web 服务器日志
tail -f ~/airflow/logs/webserver/webserver.log# 查看调度器日志
tail -f ~/airflow/logs/scheduler/latest/*.log# 查看 DAG 日志
airflow tasks logs <dag_id> <task_id> <execution_date># 查看最新日志
find ~/airflow/logs -name "*.log" -type f -exec ls -lt {} + | head -10

性能检查

# 检查数据库连接
airflow db check# 检查配置
airflow config list# 检查版本信息
airflow version# 检查环境信息
airflow info

常见问题排查

# 如果 Web UI 无法访问
curl -v http://localhost:8080# 如果调度器不工作
airflow scheduler --dry-run# 检查端口占用
lsof -i :8080# 检查进程状态
pgrep -f airflow | xargs ps -o pid,ppid,cmd,etime

3. DAG 开发与实践

3.1 第一个实战 DAG

3.1.1 创建 Hello World DAG

创建 DAG 文件

# 创建 DAG 目录(如果不存在)
mkdir -p ~/airflow/dags# 创建第一个 DAG 文件
touch ~/airflow/dags/hello_world_dag.py

编写 DAG 代码

# ~/airflow/dags/hello_world_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator# 定义默认参数
default_args = {'owner': 'airflow','depends_on_past': False,'email_on_failure': False,'email_on_retry': False,'retries': 1,'retry_delay': timedelta(minutes=5),
}# 定义 Python 函数
def print_hello():"""打印 Hello World"""print("Hello World from Python!")return "Hello World"def print_date(**context):"""打印执行日期"""execution_date = context['execution_date']print(f"Execution date: {execution_date}")return f"Executed on {execution_date}"# 创建 DAG
with DAG('hello_world_dag',default_args=default_args,description='A simple Hello World DAG',schedule_interval=timedelta(minutes=5),  # 每5分钟执行一次start_date=datetime(2023, 1, 1),catchup=False,tags=['example', 'hello-world'],
) as dag:# 任务1:使用 BashOperator 打印 Hellotask_hello = BashOperator(task_id='print_hello_bash',bash_command='echo "Hello World from Bash!"',)# 任务2:使用 PythonOperator 打印 Hellotask_hello_python = PythonOperator(task_id='print_hello_python',python_callable=print_hello,)# 任务3:打印执行日期task_date = PythonOperator(task_id='print_date',python_callable=print_date,)# 设置任务依赖关系task_hello >> task_hello_python >> task_date

验证 DAG 创建

# 检查 DAG 是否被正确加载
airflow dags list | grep hello_world_dag# 检查 DAG 语法
airflow dags test hello_world_dag 2023-01-01# 查看 DAG 任务列表
airflow tasks list hello_world_dag# 查看任务依赖关系
airflow tasks list hello_world_dag --tree
3.1.2 手动触发 DAG
# 手动触发 DAG 执行
airflow dags trigger hello_world_dag# 指定执行日期触发
airflow dags trigger hello_world_dag --conf '{"execution_date": "2023-01-01T00:00:00"}'# 查看 DAG 运行状态
airflow dags list-runs --dag-id hello_world_dag
3.1.3 Web UI 验证
  1. 访问 Web UI: http://localhost:8080
  2. 登录: 使用 admin/admin123 账户
  3. 查看 DAG: 在 DAGs 列表中找到 hello_world_dag
  4. 手动触发: 点击 “Trigger DAG” 按钮
  5. 监控执行: 在 Graph View 中查看任务执行状态

3.2 任务依赖关系实践

3.2.1 线性依赖
# ~/airflow/dags/linear_dependencies_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperatorwith DAG('linear_dependencies_dag',start_date=datetime(2023, 1, 1),schedule_interval=timedelta(hours=1),catchup=False,
) as dag:# 创建多个任务task_a = BashOperator(task_id='task_a',bash_command='echo "Task A completed"',)task_b = BashOperator(task_id='task_b',bash_command='echo "Task B completed"',)task_c = BashOperator(task_id='task_c',bash_command='echo "Task C completed"',)task_d = BashOperator(task_id='task_d',bash_command='echo "Task D completed"',)# 设置线性依赖:A -> B -> C -> Dtask_a >> task_b >> task_c >> task_d
3.2.2 并行依赖
# ~/airflow/dags/parallel_dependencies_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperatorwith DAG('parallel_dependencies_dag',start_date=datetime(2023, 1, 1),schedule_interval=timedelta(hours=1),catchup=False,
) as dag:# 起始任务start_task = BashOperator(task_id='start_task',bash_command='echo "Starting parallel tasks"',)# 并行任务task_1 = BashOperator(task_id='task_1',bash_command='sleep 10 && echo "Task 1 completed"',)task_2 = BashOperator(task_id='task_2',bash_command='sleep 15 && echo "Task 2 completed"',)task_3 = BashOperator(task_id='task_3',bash_command='sleep 20 && echo "Task 3 completed"',)# 结束任务end_task = BashOperator(task_id='end_task',bash_command='echo "All parallel tasks completed"',)# 设置依赖:start -> [task_1, task_2, task_3] -> endstart_task >> [task_1, task_2, task_3] >> end_task
3.2.3 复杂依赖关系
# ~/airflow/dags/complex_dependencies_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperatorwith DAG('complex_dependencies_dag',start_date=datetime(2023, 1, 1),schedule_interval=timedelta(hours=1),catchup=False,
) as dag:# 起始任务start = EmptyOperator(task_id='start')# 第一组并行任务task_a1 = BashOperator(task_id='task_a1',bash_command='echo "Task A1 completed"',)task_a2 = BashOperator(task_id='task_a2',bash_command='echo "Task A2 completed"',)# 中间任务middle = BashOperator(task_id='middle',bash_command='echo "Middle task completed"',)# 第二组并行任务task_b1 = BashOperator(task_id='task_b1',bash_command='echo "Task B1 completed"',)task_b2 = BashOperator(task_id='task_b2',bash_command='echo "Task B2 completed"',)task_b3 = BashOperator(task_id='task_b3',bash_command='echo "Task B3 completed"',)# 结束任务end = EmptyOperator(task_id='end')# 设置复杂依赖关系start >> [task_a1, task_a2] >> middle >> [task_b1, task_b2, task_b3] >> end

3.3 常用 Operator 实战

3.3.1 BashOperator 详解
# ~/airflow/dags/bash_operator_demo.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperatorwith DAG('bash_operator_demo',start_date=datetime(2023, 1, 1),schedule_interval=timedelta(days=1),catchup=False,
) as dag:# 基本命令执行basic_cmd = BashOperator(task_id='basic_cmd',bash_command='echo "Current date: $(date)"',)# 使用模板变量template_cmd = BashOperator(task_id='template_cmd',bash_command='echo "Execution date: {{ ds }}"',)# 执行脚本文件script_cmd = BashOperator(task_id='script_cmd',bash_command='''set -eecho "Creating directory..."mkdir -p /tmp/airflow_testecho "Writing file..."echo "Hello from Airflow" > /tmp/airflow_test/test.txtecho "File created successfully"''',)# 使用环境变量env_cmd = BashOperator(task_id='env_cmd',bash_command='echo "User: $USER, Home: $HOME"',env={'CUSTOM_VAR': 'custom_value'},)# 设置依赖basic_cmd >> template_cmd >> script_cmd >> env_cmd
3.3.2 PythonOperator 详解
# ~/airflow/dags/python_operator_demo.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperatordef simple_function():"""简单函数"""print("Hello from Python function!")return "Success"def function_with_args(name, age):"""带参数的函数"""print(f"Name: {name}, Age: {age}")return f"Processed {name}"def function_with_context(**context):"""使用 Airflow 上下文的函数"""execution_date = context['execution_date']task_instance = context['task_instance']dag = context['dag']print(f"Execution date: {execution_date}")print(f"DAG ID: {dag.dag_id}")print(f"Task ID: {task_instance.task_id}")return "Context processed"def data_processing_function(**context):"""数据处理函数"""# 模拟数据处理data = [1, 2, 3, 4, 5]result = sum(data)print(f"Data processing result: {result}")# 将结果推送到 XComcontext['task_instance'].xcom_push(key='sum_result', value=result)return resultwith DAG('python_operator_demo',start_date=datetime(2023, 1, 1),schedule_interval=timedelta(days=1),catchup=False,
) as dag:# 简单函数调用simple_task = PythonOperator(task_id='simple_task',python_callable=simple_function,)# 带参数的函数调用args_task = PythonOperator(task_id='args_task',python_callable=function_with_args,op_args=['Alice', 25],  # 位置参数)# 使用上下文的函数context_task = PythonOperator(task_id='context_task',python_callable=function_with_context,provide_context=True,)# 数据处理函数data_task = PythonOperator(task_id='data_task',python_callable=data_processing_function,provide_context=True,)# 设置依赖simple_task >> args_task >> context_task >> data_task
3.3.3 EmailOperator 使用
# ~/airflow/dags/email_operator_demo.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.email import EmailOperator
from airflow.operators.python import PythonOperatordef generate_report():"""生成报告内容"""return """<h2>Daily Report</h2><p>This is a daily report generated by Airflow.</p><ul><li>Task 1: Completed</li><li>Task 2: Completed</li><li>Task 3: Completed</li></ul><p>Generated on: {{ ds }}</p>"""with DAG('email_operator_demo',start_date=datetime(2023, 1, 1),schedule_interval=timedelta(days=1),catchup=False,
) as dag:# 生成报告generate_report_task = PythonOperator(task_id='generate_report',python_callable=generate_report,)# 发送邮件send_email_task = EmailOperator(task_id='send_email',to=['recipient@example.com'],subject='Daily Report - {{ ds }}',html_content="{{ task_instance.xcom_pull(task_ids='generate_report') }}",)# 设置依赖generate_report_task >> send_email_task

3.4 变量和连接实战

3.4.1 使用 Variables

设置变量

# 通过命令行设置变量
airflow variables set "data_path" "/opt/data"
airflow variables set "api_url" "https://api.example.com"
airflow variables set "email_recipients" '["user1@example.com", "user2@example.com"]'# 查看变量
airflow variables list
airflow variables get data_path

在 DAG 中使用变量

# ~/airflow/dags/variables_demo.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variabledef use_variables(**context):"""使用 Airflow 变量"""# 获取变量data_path = Variable.get("data_path")api_url = Variable.get("api_url")email_recipients = Variable.get("email_recipients", deserialize_json=True)print(f"Data path: {data_path}")print(f"API URL: {api_url}")print(f"Email recipients: {email_recipients}")return "Variables processed"def use_template_variables(**context):"""使用模板变量"""# 在函数中访问模板变量execution_date = context['ds']dag_id = context['dag'].dag_idprint(f"Execution date: {execution_date}")print(f"DAG ID: {dag_id}")return "Template variables processed"with DAG('variables_demo',start_date=datetime(2023, 1, 1),schedule_interval=timedelta(days=1),catchup=False,
) as dag:# 使用变量的任务variables_task = PythonOperator(task_id='use_variables',python_callable=use_variables,provide_context=True,)# 使用模板变量的任务template_task = PythonOperator(task_id='use_template_variables',python_callable=use_template_variables,provide_context=True,)# 设置依赖variables_task >> template_task
3.4.2 使用 Connections

设置 MySQL 连接

# 通过命令行设置连接
airflow connections add 'mysql_default' \--conn-type 'mysql' \--conn-host 'localhost' \--conn-login 'root' \--conn-password 'password' \--conn-port '3306' \--conn-schema 'test_db'# 查看连接
airflow connections list
airflow connections get mysql_default

在 DAG 中使用连接

# ~/airflow/dags/connections_demo.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.operators.python import PythonOperator
from airflow.hooks.mysql_hook import MySqlHookdef query_mysql_data(**context):"""查询 MySQL 数据"""# 使用 MySQL Hookmysql_hook = MySqlHook(mysql_conn_id='mysql_default')# 执行查询sql = "SELECT COUNT(*) as count FROM users"result = mysql_hook.get_first(sql)print(f"User count: {result[0]}")# 推送到 XComcontext['task_instance'].xcom_push(key='user_count', value=result[0])return result[0]with DAG('connections_demo',start_date=datetime(2023, 1, 1),schedule_interval=timedelta(days=1),catchup=False,
) as dag:# 使用 MySqlOperatormysql_task = MySqlOperator(task_id='mysql_query',mysql_conn_id='mysql_default',sql='SELECT * FROM users LIMIT 5',)# 使用 Python 函数查询python_mysql_task = PythonOperator(task_id='python_mysql_query',python_callable=query_mysql_data,provide_context=True,)# 设置依赖mysql_task >> python_mysql_task

3.5 实际项目案例

3.5.1 数据采集 DAG
# ~/airflow/dags/data_collection_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
import requests
import json
import osdef fetch_api_data(**context):"""从 API 获取数据"""api_url = "https://jsonplaceholder.typicode.com/posts"try:response = requests.get(api_url)response.raise_for_status()data = response.json()# 保存数据到文件output_path = "/tmp/api_data.json"with open(output_path, 'w') as f:json.dump(data, f, indent=2)print(f"Data fetched successfully, saved to {output_path}")print(f"Total records: {len(data)}")# 推送到 XComcontext['task_instance'].xcom_push(key='record_count', value=len(data))return len(data)except Exception as e:print(f"Error fetching data: {e}")raisedef process_data(**context):"""处理数据"""input_path = "/tmp/api_data.json"if not os.path.exists(input_path):raise FileNotFoundError(f"Input file not found: {input_path}")with open(input_path, 'r') as f:data = json.load(f)# 简单的数据处理:提取标题titles = [item['title'] for item in data]# 保存处理结果output_path = "/tmp/processed_data.txt"with open(output_path, 'w') as f:for title in titles:f.write(f"{title}\n")print(f"Data processed, titles saved to {output_path}")return len(titles)def cleanup_files(**context):"""清理临时文件"""files_to_clean = ["/tmp/api_data.json", "/tmp/processed_data.txt"]for file_path in files_to_clean:if os.path.exists(file_path):os.remove(file_path)print(f"Cleaned up: {file_path}")return "Cleanup completed"with DAG('data_collection_dag',start_date=datetime(2023, 1, 1),schedule_interval=timedelta(hours=6),  # 每6小时执行一次catchup=False,tags=['data-collection', 'api'],
) as dag:# 获取数据fetch_task = PythonOperator(task_id='fetch_api_data',python_callable=fetch_api_data,provide_context=True,)# 处理数据process_task = PythonOperator(task_id='process_data',python_callable=process_data,provide_context=True,)# 清理文件cleanup_task = PythonOperator(task_id='cleanup_files',python_callable=cleanup_files,provide_context=True,)# 设置依赖fetch_task >> process_task >> cleanup_task
3.5.2 数据处理管道
# ~/airflow/dags/data_pipeline_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
import pandas as pd
import numpy as npdef extract_data(**context):"""提取数据"""# 模拟数据提取data = {'id': range(1, 101),'name': [f'User_{i}' for i in range(1, 101)],'age': np.random.randint(18, 65, 100),'salary': np.random.randint(30000, 100000, 100),'department': np.random.choice(['IT', 'HR', 'Finance', 'Marketing'], 100)}df = pd.DataFrame(data)# 保存原始数据df.to_csv('/tmp/raw_data.csv', index=False)print(f"Extracted {len(df)} records")return len(df)def transform_data(**context):"""转换数据"""# 读取原始数据df = pd.read_csv('/tmp/raw_data.csv')# 数据清洗和转换# 1. 处理缺失值df = df.dropna()# 2. 添加计算字段df['salary_category'] = df['salary'].apply(lambda x: 'High' if x > 70000 else 'Medium' if x > 50000 else 'Low')# 3. 按部门统计dept_stats = df.groupby('department').agg({'salary': ['mean', 'count'],'age': 'mean'}).round(2)# 保存转换后的数据df.to_csv('/tmp/transformed_data.csv', index=False)dept_stats.to_csv('/tmp/department_stats.csv')print(f"Transformed {len(df)} records")print("Department statistics:")print(dept_stats)return len(df)def load_data(**context):"""加载数据"""# 模拟数据加载到数据库df = pd.read_csv('/tmp/transformed_data.csv')# 这里可以添加实际的数据库插入逻辑# 例如:使用 SQLAlchemy 或其他数据库连接器print(f"Loaded {len(df)} records to database")# 生成报告report = f"""Data Pipeline Report - {context['ds']}======================================Total records processed: {len(df)}Records by salary category:{df['salary_category'].value_counts().to_dict()}"""with open('/tmp/pipeline_report.txt', 'w') as f:f.write(report)print("Pipeline report generated")return len(df)with DAG('data_pipeline_dag',start_date=datetime(2023, 1, 1),schedule_interval=timedelta(days=1),  # 每天执行一次catchup=False,tags=['data-pipeline', 'etl'],
) as dag:# 提取数据extract_task = PythonOperator(task_id='extract_data',python_callable=extract_data,provide_context=True,)# 转换数据transform_task = PythonOperator(task_id='transform_data',python_callable=transform_data,provide_context=True,)# 加载数据load_task = PythonOperator(task_id='load_data',python_callable=load_data,provide_context=True,)# 设置依赖:ETL 管道extract_task >> transform_task >> load_task

3.6 错误处理和监控

3.6.1 任务重试机制
# ~/airflow/dags/retry_demo_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import randomdef task_with_retry(**context):"""可能失败的任务,演示重试机制"""# 模拟随机失败if random.random() < 0.7:  # 70% 概率失败raise Exception("Random failure for demonstration")print("Task completed successfully!")return "Success"def task_with_custom_retry(**context):"""自定义重试逻辑的任务"""try:# 模拟业务逻辑result = 10 / 0  # 故意制造错误return resultexcept ZeroDivisionError:print("Caught ZeroDivisionError, will retry...")raisewith DAG('retry_demo_dag',start_date=datetime(2023, 1, 1),schedule_interval=timedelta(hours=1),catchup=False,default_args={'retries': 3,  # 最多重试3次'retry_delay': timedelta(minutes=1),  # 重试间隔1分钟'retry_exponential_backoff': True,  # 指数退避'max_retry_delay': timedelta(minutes=10),  # 最大重试间隔},
) as dag:# 基本重试任务retry_task = PythonOperator(task_id='task_with_retry',python_callable=task_with_retry,provide_context=True,)# 自定义重试任务custom_retry_task = PythonOperator(task_id='task_with_custom_retry',python_callable=task_with_custom_retry,provide_context=True,retries=5,  # 覆盖默认重试次数retry_delay=timedelta(seconds=30),  # 覆盖默认重试间隔)# 设置依赖retry_task >> custom_retry_task
3.6.2 监控和告警
# ~/airflow/dags/monitoring_demo_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperatordef critical_task(**context):"""关键任务,失败时需要告警"""# 模拟关键业务逻辑print("Executing critical business logic...")# 模拟偶尔失败import randomif random.random() < 0.3:  # 30% 概率失败raise Exception("Critical task failed!")print("Critical task completed successfully")return "Success"def generate_monitoring_report(**context):"""生成监控报告"""execution_date = context['ds']report = f"""<h2>DAG Monitoring Report</h2><p><strong>Execution Date:</strong> {execution_date}</p><p><strong>DAG ID:</strong> {context['dag'].dag_id}</p><p><strong>Status:</strong> Running</p><h3>Task Summary</h3><ul><li>Critical Task: Completed</li><li>Report Generation: Completed</li></ul><p>This is an automated report generated by Airflow.</p>"""# 推送到 XComcontext['task_instance'].xcom_push(key='monitoring_report', value=report)return "Report generated"with DAG('monitoring_demo_dag',start_date=datetime(2023, 1, 1),schedule_interval=timedelta(hours=2),catchup=False,default_args={'email_on_failure': True,  # 失败时发送邮件'email_on_retry': False,'retries': 2,'retry_delay': timedelta(minutes=5),},
) as dag:# 关键任务critical_task_op = PythonOperator(task_id='critical_task',python_callable=critical_task,provide_context=True,)# 生成监控报告report_task = PythonOperator(task_id='generate_monitoring_report',python_callable=generate_monitoring_report,provide_context=True,)# 发送监控邮件email_task = EmailOperator(task_id='send_monitoring_email',to=['admin@example.com'],subject='DAG Monitoring Report - {{ ds }}',html_content="{{ task_instance.xcom_pull(task_ids='generate_monitoring_report', key='monitoring_report') }}",)# 设置依赖critical_task_op >> report_task >> email_task

3.7 最佳实践总结

3.7.1 DAG 开发最佳实践

1. 文件组织

  • 每个 DAG 文件只包含一个 DAG
  • 使用描述性的文件名和 DAG ID
  • 按业务模块组织 DAG 文件

2. 代码质量

  • 添加适当的注释和文档
  • 使用有意义的任务 ID
  • 避免在 DAG 文件中执行耗时操作

3. 错误处理

  • 设置合理的重试策略
  • 使用 try-catch 处理异常
  • 配置适当的告警机制

4. 性能优化

  • 避免不必要的任务依赖
  • 合理设置调度间隔
  • 使用适当的资源限制
3.7.2 测试和验证
# 测试 DAG 语法
airflow dags test <dag_id> <execution_date># 验证任务依赖
airflow tasks list <dag_id> --tree# 检查 DAG 导入错误
airflow dags report# 手动触发测试
airflow dags trigger <dag_id>
3.7.3 监控和维护

1. 定期检查

  • 监控 DAG 执行状态
  • 检查任务失败率
  • 查看执行时间趋势

2. 日志管理

  • 定期清理日志文件
  • 配置日志轮转
  • 监控日志大小

3. 性能调优

  • 优化任务依赖关系
  • 调整调度参数
  • 监控资源使用情况

通过以上实战练习,您应该能够:

  • 创建和配置各种类型的 DAG
  • 理解和使用不同的 Operator
  • 管理任务依赖关系
  • 使用变量和连接
  • 处理错误和监控任务执行
  • 开发实际的数据处理管道
http://www.dtcms.com/a/320254.html

相关文章:

  • 前端性能优化:从请求到资源的精细调控
  • 【第9话:感知算法基础1】深度学习神经网络模型基础知识概念入门简介
  • 批量获取亚马逊商品SKU商品规格调用流程
  • 【实时Linux实战系列】基于实时Linux的高频交易系统构建
  • Python 常用内置高阶函数
  • RabbitMQ面试精讲 Day 15:RabbitMQ故障转移与数据恢复
  • C++ min循环超超超详细指南
  • WFP DNS 域名解析
  • 深入理解C++模板进阶:非类型参数、特化与分离编译
  • Linux节点创建API与路径对应关系
  • AI日报0807 | GPT-5或今晚1点来袭:四大版本全曝光
  • 什么是 TDengine IDMP?
  • Disruptor 消费者核心:BatchEventProcessor解析
  • 告别复杂配置!cpolar让Prometheus监控突破网络限制
  • 【42】【OpenCV C++】 计算图像某一列像素方差 或 某一行像素的方差;
  • 嵌入式开发硬件——单片机
  • 【列出指定时间段内所有的下单产品】
  • 数据结构(循环顺序队列)
  • RAGAS:检索增强生成系统的无参考评估框架与技术解析
  • 2025年华数杯C题超详细解题思路
  • 哈希表原理与实现全解析
  • 天道20金句
  • Moses工具的配置和小语种平行语料训练SMT完整实现
  • 大模型 Transformer模型(上)
  • Java集合的遍历方式(全解析)
  • 力扣经典算法篇-46-阶乘后的零(正向步长遍历,逆向步长遍历)
  • BGP笔记整理
  • Maven高级:继承与聚合实战指南
  • RS485转Profibus网关在QDNA钠离子分析仪与300PLC通信中的应用解析
  • 【OCCT+ImGUI系列】013-碰撞检测-包围盒Bnd_Box