谷歌云+Apache Airflow,数据处理自动化的强力武器
文章目录
- Airflow 是个啥?
- 在谷歌云上搭建 Airflow
- 创建第一个数据处理 DAG
- Airflow 中的常用操作符
- 1\. GCSToGCSOperator - 复制或移动 GCS 文件
- 2\. GCSFileTransformOperator - 在 GCS 文件上运行转换
- 3\. DataflowTemplateOperator - 启动 Dataflow 作业
- 4\. BigQueryCheckOperator - 检查 BigQuery 查询结果
- 动态生成 DAG
- 监控和调试 DAG
- 通过 Web UI 监控
- 日志查看
- 设置告警
- 生产环境最佳实践
- 1\. 避免把敏感信息写在 DAG 代码里
- 2\. 合理设置重试策略
- 3\. 使用 Pools 限制并发
- 4\. 使用 SubDAGs 组织复杂流程
- 高级技巧:自定义组件
- 自定义 Operator
- 自定义 Sensor
- XComs:任务间通信的桥梁
- 定时任务的那些坑
- 关于 start_date
- 关于 catchup
- 故障排除:常见问题
- 1\. DAG 没有按时触发
- 2\. 任务执行很慢
- 3\. 内存溢出错误
- 4\. XCom 值丢失
- 写在最后
Apache Airflow 这工具是真的香,我在谷歌云平台上用了好几年了。可以肯定地说,它彻底改变了我处理数据流程的方式。以前手动跑的那些 ETL 任务,现在全都交给 Airflow 自动执行了,省心不少。那我今儿就跟大家唠唠,怎么在谷歌云上搭建 Airflow,然后让你的数据处理任务全自动化起来。
Airflow 是个啥?
简单来讲,Apache Airflow 是个工作流管理平台,让你可以编程式地创建、调度和监控复杂的工作流。
它的核心理念就是把工作流定义为 有向无环图 (DAG),图里每个节点是一个任务,边则表示任务之间的依赖关系。举个栗子,你可以设定任务 B 只有在任务 A 成功完成后才会执行,这样就构成了 A→B 的依赖关系。
# 一个简单的DAG定义示例from airflow import DAGfrom airflow.operators.python import PythonOperatorfrom datetime import datetimedef hello_world():print("Hello, Airflow!")dag = DAG('hello_world_dag',start_date=datetime(2023, 1, 1),schedule_interval='@daily')task = PythonOperator(task_id='hello_task',python_callable=hello_world,dag=dag)
这段代码定义了个每天执行一次的 DAG,里面就一个打印 “Hello, Airflow!” 的简单任务。
温馨提示: DAG 不是狗,是 Directed Acyclic Graph (有向无环图) 的缩写!记住这点,不然面试容易露馅…
在谷歌云上搭建 Airflow
谷歌云提供了 Cloud Composer 服务,这玩意儿其实就是托管版的 Airflow。用它的好处是不用操心基础设施的维护,直接上手写工作流就行。
搭建步骤大致如下:
1. 登录谷歌云控制台
2. 找到并启用 Cloud Composer API
3. 创建一个新的 Composer 环境
gcloud composer environments create my-airflow-env \--location us-central1 \--zone us-central1-a \--machine-type n1-standard-2 \--python-version 3 \--airflow-version 2.3.3
这个命令会创建一个名为 “my-airflow-env” 的 Composer 环境,用的是 Airflow 2.3.3 版本。整个过程大概要 20-30 分钟,别急,去泡杯咖啡歇会儿。
温馨提示: Composer 环境创建完后,谷歌会自动生成一个网址,通过这个网址可以访问 Airflow 的 Web UI。记得把这个链接保存下来,后面要经常用到。
云服务嘛,当然不是白给的。Composer 环境会根据你选的配置产生费用,包括计算资源、存储和网络流量等。要是预算紧张,可以先用小规格的机器,比如 n1-standard-1
,后面需要了再升级。
创建第一个数据处理 DAG
好了,环境搭好了,接下来就要写我们的第一个正经 DAG 了。假设我们要处理谷歌云存储(GCS)上的一些 CSV 文件,然后把处理后的数据存入 BigQuery。
from airflow import DAGfrom airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperatorfrom airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperatorfrom datetime import datetime, timedeltadefault_args = {'owner': '我是数据工程师','depends_on_past': False,'email_on_failure': True,'email': 'your-email@example.com','retries': 3,'retry_delay': timedelta(minutes=5)}with DAG('gcs_to_bigquery_pipeline',default_args=default_args,description='从GCS加载数据到BigQuery并转换',schedule_interval='0 2 * * *', # 每天凌晨2点执行start_date=datetime(2023, 1, 1),catchup=False) as dag:# 任务1: 从GCS导入数据到BigQueryload_data = GCSToBigQueryOperator(task_id='load_csv_to_bq',bucket='my-data-bucket',source_objects=['data/sales_*.csv'],destination_project_dataset_table='my_project.sales_data.raw_sales',schema_fields=[{'name': 'id', 'type': 'INTEGER'},{'name': 'date', 'type': 'DATE'},{'name': 'amount', 'type': 'FLOAT'},{'name': 'customer_id', 'type': 'STRING'}],write_disposition='WRITE_TRUNCATE',skip_leading_rows=1,allow_quoted_newlines=True)# 任务2: 在BigQuery中转换数据transform_data = BigQueryExecuteQueryOperator(task_id='transform_data',sql="""SELECT date,SUM(amount) as daily_sales,COUNT(DISTINCT customer_id) as customer_countFROM `my_project.sales_data.raw_sales`GROUP BY dateORDER BY date DESC""",destination_dataset_table='my_project.sales_data.daily_summary',write_disposition='WRITE_TRUNCATE',use_legacy_sql=False)# 设置任务依赖关系load_data >> transform_data
这个 DAG 做了啥呢?
1. 第一个任务用 `GCSToBigQueryOperator` 把 GCS 里的 CSV 文件加载到 BigQuery 表中
2. 第二个任务用 `BigQueryExecuteQueryOperator` 执行 SQL 查询,计算日销售总额和客户数,结果写入另一个表
3. 最后一行 `load_data >> transform_data` 表示第二个任务依赖于第一个任务
看到那个 >>
操作符没?这就是 Airflow 里设置任务依赖关系的方式,超直观的!你还可以用 <<
,意思一样,就是写法不同:transform_data << load_data
。
温馨提示: 在 Airflow 2.0 以上版本,推荐使用 with DAG(...) as dag:
的上下文管理器写法,这样不用给每个任务都指定 dag 参数,代码看起来更整洁。
Airflow 中的常用操作符
Airflow 提供了一大堆操作符(Operators),针对不同的任务类型。在谷歌云环境中,最常用的有这些:
1. GCSToGCSOperator - 复制或移动 GCS 文件
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperatorcopy_files = GCSToGCSOperator(task_id='copy_files',source_bucket='source-bucket',source_objects=['path/to/file.csv'],destination_bucket='destination-bucket',destination_object='path/to/destination/file.csv')
2. GCSFileTransformOperator - 在 GCS 文件上运行转换
from airflow.providers.google.cloud.operators.gcs import GCSFileTransformOperatortransform_file = GCSFileTransformOperator(task_id='transform_file',source_bucket='my-bucket',source_object='raw/data.json',destination_bucket='my-bucket',destination_object='processed/data.json',transform_script=['python', '/path/to/transformation_script.py'])
3. DataflowTemplateOperator - 启动 Dataflow 作业
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplateOperatorrun_dataflow = DataflowTemplateOperator(task_id='dataflow_job',template='gs://dataflow-templates/latest/Word_Count',parameters={'inputFile': 'gs://my-bucket/input/my_file.txt','output': 'gs://my-bucket/output/word_count'},location='us-central1',project_id='my-project-id')
4. BigQueryCheckOperator - 检查 BigQuery 查询结果
from airflow.providers.google.cloud.operators.bigquery import BigQueryCheckOperatorcheck_data_quality = BigQueryCheckOperator(task_id='check_data_quality',sql="""SELECT COUNT(*)FROM `my_project.sales_data.daily_summary`WHERE daily_sales < 0""",use_legacy_sql=False)
这个操作符会执行 SQL 查询,如果结果不为零或者 False,则任务失败。超适合做数据质量检查!
温馨提示: 别小看这些操作符,它们虽然简单,但组合起来威力很大。我见过有人用 100 多个操作符组成的 DAG 处理 TB 级数据,跑起来稳得一批!
动态生成 DAG
有时候我们需要根据不同的配置生成多个类似的 DAG,比如对不同的数据源执行相同的处理逻辑。手动复制粘贴?太 low 了!看看下面这个例子:
from airflow import DAGfrom airflow.operators.python import PythonOperatorfrom datetime import datetimeimport yamlimport os# 读取配置文件config_path = os.path.join(os.path.dirname(__file__), 'configs')for config_file in os.listdir(config_path):if not config_file.endswith('.yaml'):continuewith open(os.path.join(config_path, config_file)) as f:config = yaml.safe_load(f)# 为每个配置创建一个DAGdag_id = f"process_{config['source_name']}"globals()[dag_id] = DAG(dag_id,schedule_interval=config['schedule'],start_date=datetime.fromisoformat(config['start_date']),catchup=False)def process_data(config, **kwargs):print(f"处理来自 {config['source_name']} 的数据")# 实际处理逻辑# 创建处理任务task = PythonOperator(task_id='process_data',python_callable=process_data,op_kwargs={'config': config},dag=globals()[dag_id])
这段代码会读取 configs 目录下的所有 YAML 配置文件,为每个配置创建一个单独的 DAG。
配置文件例子 (configs/sales.yaml):
source_name: salesschedule: "0 4 * * *"start_date: "2023-01-01T00:00:00"
温馨提示: 当你有很多类似的 DAG 时,这种动态生成的方法能大大减少代码重复,提高可维护性。但别过度使用,如果生成了太多 DAG (比如上千个),可能会给 Airflow 调度器带来性能压力。
监控和调试 DAG
DAG 写完了,跑起来了,但总会遇到各种问题。别慌,Airflow 提供了全面的监控和调试工具。
通过 Web UI 监控
Cloud Composer 环境创建好后,就能访问 Airflow 的 Web UI。里面可以看到:
1. DAG 列表和它们的运行状态
2. 任务执行历史和日志
3. 可视化的 DAG 图表
界面虽然不像那些商业软件那么花哨,但功能是真的全。特别是那个 Graph View,能直观地展示任务依赖关系,对排查问题很有帮助。
日志查看
执行失败的任务,点进去就能看到详细日志。不过有时候日志不够详细,这时候可以修改你的代码,加点日志输出:
def my_task(**context):logging.info("开始执行任务...")try:# 任务逻辑logging.info(f"处理的数据量: {len(data)}")except Exception as e:logging.error(f"出错了!错误信息: {str(e)}")raiselogging.info("任务执行完成")
多打些日志,出了问题容易定位多了。
设置告警
生产环境中,你肯定不想一直盯着 UI 看。设置告警,让系统自动通知你:
default_args = {# 其他参数...'email': ['your-email@example.com'],'email_on_failure': True,'email_on_retry': False,'retries': 3,'retry_delay': timedelta(minutes=5)}
除了邮件通知,还可以通过 Cloud Monitoring 设置更复杂的告警,比如连续失败超过 3 次发送短信啥的。
温馨提示: 千万别把告警阈值设太低,不然手机会被告警信息炸到没电!我之前就犯过这错,结果大半夜被短信轰炸醒好几次…
生产环境最佳实践
在生产环境使用 Airflow,有些坑是必须注意的:
1. 避免把敏感信息写在 DAG 代码里
永远不要把密码、API 密钥之类的敏感信息硬编码在 DAG 里。正确的做法是用 Airflow 的 Connections 和 Variables:
from airflow.models import Variablefrom airflow.hooks.base import BaseHook# 使用Variable存储的值config = Variable.get("my_config", deserialize_json=True)# 使用Connection存储的连接信息conn = BaseHook.get_connection("my_gcp_connection")
在 Composer 环境中,这些值会被加密存储。
2. 合理设置重试策略
网络抽风、依赖服务偶尔挂掉这种情况很常见,所以设置合理的重试很重要:
default_args = {# 其他参数...'retries': 3, # 最多重试3次'retry_delay': timedelta(minutes=5), # 每次重试间隔5分钟'retry_exponential_backoff': True, # 使用指数退避策略'max_retry_delay': timedelta(hours=1) # 最大重试间隔1小时}
指数退避策略(exponential backoff)意思是每次重试的间隔时间会越来越长,避免对外部服务造成过大压力。
3. 使用 Pools 限制并发
默认情况下,Airflow 会尽可能并行执行不互相依赖的任务。但有些外部系统可能扛不住太多并发请求,这时候就需要用 Pools 来限制:
task = BigQueryExecuteQueryOperator(# 其他参数...pool='bigquery_pool', # 使用名为bigquery_pool的资源池pool_slots=2 # 占用2个槽位)
在 Web UI 中创建名为 bigquery_pool
的资源池,设置槽位总数,比如 10。这样即使有 100 个查询任务,同时执行的也不会超过 10 个(或者 5 个,如果每个任务占 2 个槽位)。
4. 使用 SubDAGs 组织复杂流程
当 DAG 变得特别复杂时,可以用 SubDAGs 把相关任务组织在一起:
from airflow.operators.subdag import SubDagOperatordef create_processing_subdag(parent_dag_id, child_dag_id, start_date, schedule_interval):with DAG(f"{parent_dag_id}.{child_dag_id}",start_date=start_date,schedule_interval=schedule_interval) as dag:# 子DAG的任务定义# ...return dagprocessing_tasks = SubDagOperator(task_id='processing_tasks',subdag=create_processing_subdag('main_dag', 'processing_tasks', start_date, schedule_interval),dag=main_dag)
不过,SubDAG 用起来有点复杂,如果不是很必要,可以考虑用 TaskGroups 代替,效果类似但更轻量:
from airflow.utils.task_group import TaskGroupwith TaskGroup(group_id='processing_tasks') as processing_group:# 定义一组相关任务task1 = PythonOperator(...)task2 = PythonOperator(...)task1 >> task2
温馨提示: SubDAGs 在内部实际上是作为单独的 DAG 运行的,可能导致调度问题。大多数情况下,TaskGroups 是更好的选择,它只是在 UI 上对任务进行分组,不影响实际执行逻辑。
高级技巧:自定义组件
用了一段时间,你可能会发现标准的操作符满足不了你的特殊需求。没关系,Airflow 允许你自定义各种组件。
自定义 Operator
from airflow.models.baseoperator import BaseOperatorfrom airflow.utils.decorators import apply_defaultsclass MyCustomOperator(BaseOperator):@apply_defaultsdef __init__(self, my_parameter, *args, **kwargs):super().__init__(*args, **kwargs)self.my_parameter = my_parameterdef execute(self, context):# 实现你的操作逻辑self.log.info(f"使用参数 {self.my_parameter} 执行自定义操作")# 可以返回值,会被推送到XComreturn "操作结果"
自定义 Sensor
Sensor 是一种特殊的操作符,会周期性地检查条件,直到条件满足才继续执行:
from airflow.sensors.base import BaseSensorOperatorclass MyCustomSensor(BaseSensorOperator):def __init__(self, my_parameter, *args, **kwargs):super().__init__(*args, **kwargs)self.my_parameter = my_parameterdef poke(self, context):# 返回True表示条件满足,返回False表示需要继续等待# 这个方法会被周期性调用self.log.info(f"检查条件是否满足,参数: {self.my_parameter}")# 实现你的检查逻辑return condition_is_met # 返回布尔值
这些自定义组件可以打包成插件,放在 Composer 环境的 plugins 目录下,就能在所有 DAG 中使用了。
XComs:任务间通信的桥梁
有时候,一个任务的输出需要传递给另一个任务使用。Airflow 提供了 XComs(Cross-Communication)机制来实现这一点:
def task1(**context):# 向XCom推送值value = "这是任务1生成的值"context['task_instance'].xcom_push(key='my_value', value=value)def task2(**context):# 从XCom拉取值value = context['task_instance'].xcom_pull(task_ids='task1', key='my_value')print(f"从任务1获取的值: {value}")task1 = PythonOperator(task_id='task1', python_callable=task1, dag=dag)task2 = PythonOperator(task_id='task2', python_callable=task2, dag=dag)task1 >> task2
默认情况下,Operator 的返回值也会自动推送到 XCom:
def task1():return {"status": "success", "count": 42}# 在task2中可以通过xcom_pull(task_ids='task1')获取到这个字典
温馨提示: XCom 是存储在元数据数据库中的,不适合传递大量数据。如果需要在任务间传递大文件,应该使用外部存储如 GCS。
定时任务的那些坑
Airflow 的调度器很强大,但也有些容易掉进去的坑:
关于 start_date
start_date
参数指定的是 DAG 的首次执行时间,但这并不意味着 DAG 在那个时间点立即执行。调度器会在 start_date + schedule_interval
的时间点触发首次执行。
举个例子,如果 start_date
是 2023-01-01 01:00:00,schedule_interval
是 ‘@daily’,那么第一次执行会在 2023-01-02 01:00:00 触发,处理的是 2023-01-01 的数据。
这个逻辑乍一看有点反直觉,但想想也合理:定时任务通常是处理过去一段时间内的数据,而不是未来的。
关于 catchup
默认情况下,如果 Airflow 发现从 start_date
到当前有遗漏的调度周期,它会尝试"追赶"这些错过的执行。这可能导致一大堆历史任务突然被触发。
通过设置 catchup=False
,可以禁用这种行为:
dag = DAG('my_dag',start_date=datetime(2023, 1, 1),schedule_interval='@daily',catchup=False # 不追赶错过的执行)
温馨提示: 在生产环境中,除非确实需要处理历史数据,否则推荐将 catchup
设置为 False
,避免意外触发大量历史任务造成资源紧张。
故障排除:常见问题
使用 Airflow 时,特别是刚开始,可能会遇到各种奇怪的问题。这儿列几个我踩过的坑:
1. DAG 没有按时触发
检查一下是不是调度器出问题了。在 Composer 环境中,可以查看调度器的日志,通常能找到线索。
另外,确认你的 DAG 文件有没有语法错误。即使一个小错误,整个文件都会被忽略处理。
2. 任务执行很慢
首先检查资源是否足够。Composer 环境的 worker 节点如果太小,处理大量任务会很吃力。
其次,看看是不是任务本身有优化空间,比如 BigQuery 查询是否需要处理大量数据,能否通过分区表或聚集表优化。
3. 内存溢出错误
Airflow 默认的 worker 内存限制比较小,如果处理大量数据可能会溢出。解决方法有:
- 增加 worker 的内存配置
- 优化任务代码,减少内存使用
- 使用 KubernetesPodOperator 在单独的 Pod 中执行内存密集型任务
4. XCom 值丢失
XCom 只能存储相对较小的值(默认 48KB)。如果发现 XCom 值丢失,很可能是数据太大了。
改用外部存储如 GCS 来传递大数据集,XCom 只存储引用。
实际开发中遇到问题别急着怀疑 Airflow 本身有 bug,大多数情况下是配置或代码逻辑的问题。Airflow 是生产级软件,功能稳定性还是有保证的。
写在最后
Apache Airflow 搭配谷歌云,就是数据工程师的得力助手。从简单的 ETL 到复杂的机器学习流水线,它都能轻松应对。
学会了 Airflow,我一个人能干五个人的活儿(而且不用加班)。数据处理任务一旦实现自动化,就能腾出时间做更有价值的事情,比如设计更高级的数据分析系统,或者…喝咖啡摸鱼(咳咳,开个玩笑)。
这篇文章只是冰山一角,Airflow 的功能远不止于此。建议实践中多尝试,多看文档,慢慢你就会发现它的更多强大功能。那些看起来复杂的概念,用着用着就熟悉了。
代码写得好,周末早点跑;流程自动化,老板夸你棒!