当前位置: 首页 > news >正文

谷歌云+Apache Airflow,数据处理自动化的强力武器

在这里插入图片描述

文章目录

    • Airflow 是个啥?
    • 在谷歌云上搭建 Airflow
    • 创建第一个数据处理 DAG
    • Airflow 中的常用操作符
      • 1\. GCSToGCSOperator - 复制或移动 GCS 文件
      • 2\. GCSFileTransformOperator - 在 GCS 文件上运行转换
      • 3\. DataflowTemplateOperator - 启动 Dataflow 作业
      • 4\. BigQueryCheckOperator - 检查 BigQuery 查询结果
    • 动态生成 DAG
    • 监控和调试 DAG
      • 通过 Web UI 监控
      • 日志查看
      • 设置告警
    • 生产环境最佳实践
      • 1\. 避免把敏感信息写在 DAG 代码里
      • 2\. 合理设置重试策略
      • 3\. 使用 Pools 限制并发
      • 4\. 使用 SubDAGs 组织复杂流程
    • 高级技巧:自定义组件
      • 自定义 Operator
      • 自定义 Sensor
    • XComs:任务间通信的桥梁
    • 定时任务的那些坑
      • 关于 start_date
      • 关于 catchup
    • 故障排除:常见问题
      • 1\. DAG 没有按时触发
      • 2\. 任务执行很慢
      • 3\. 内存溢出错误
      • 4\. XCom 值丢失
    • 写在最后

Apache Airflow 这工具是真的香,我在谷歌云平台上用了好几年了。可以肯定地说,它彻底改变了我处理数据流程的方式。以前手动跑的那些 ETL 任务,现在全都交给 Airflow 自动执行了,省心不少。那我今儿就跟大家唠唠,怎么在谷歌云上搭建 Airflow,然后让你的数据处理任务全自动化起来。

Airflow 是个啥?

简单来讲,Apache Airflow 是个工作流管理平台,让你可以编程式地创建、调度和监控复杂的工作流。

它的核心理念就是把工作流定义为 有向无环图 (DAG),图里每个节点是一个任务,边则表示任务之间的依赖关系。举个栗子,你可以设定任务 B 只有在任务 A 成功完成后才会执行,这样就构成了 A→B 的依赖关系。

    # 一个简单的DAG定义示例from airflow import DAGfrom airflow.operators.python import PythonOperatorfrom datetime import datetimedef hello_world():print("Hello, Airflow!")dag = DAG('hello_world_dag',start_date=datetime(2023, 1, 1),schedule_interval='@daily')task = PythonOperator(task_id='hello_task',python_callable=hello_world,dag=dag)

这段代码定义了个每天执行一次的 DAG,里面就一个打印 “Hello, Airflow!” 的简单任务。

温馨提示: DAG 不是狗,是 Directed Acyclic Graph (有向无环图) 的缩写!记住这点,不然面试容易露馅…

在谷歌云上搭建 Airflow

谷歌云提供了 Cloud Composer 服务,这玩意儿其实就是托管版的 Airflow。用它的好处是不用操心基础设施的维护,直接上手写工作流就行。

搭建步骤大致如下:

1. 登录谷歌云控制台
2. 找到并启用 Cloud Composer API
3. 创建一个新的 Composer 环境
    gcloud composer environments create my-airflow-env \--location us-central1 \--zone us-central1-a \--machine-type n1-standard-2 \--python-version 3 \--airflow-version 2.3.3

这个命令会创建一个名为 “my-airflow-env” 的 Composer 环境,用的是 Airflow 2.3.3 版本。整个过程大概要 20-30 分钟,别急,去泡杯咖啡歇会儿。

温馨提示: Composer 环境创建完后,谷歌会自动生成一个网址,通过这个网址可以访问 Airflow 的 Web UI。记得把这个链接保存下来,后面要经常用到。

云服务嘛,当然不是白给的。Composer 环境会根据你选的配置产生费用,包括计算资源、存储和网络流量等。要是预算紧张,可以先用小规格的机器,比如 n1-standard-1,后面需要了再升级。

创建第一个数据处理 DAG

好了,环境搭好了,接下来就要写我们的第一个正经 DAG 了。假设我们要处理谷歌云存储(GCS)上的一些 CSV 文件,然后把处理后的数据存入 BigQuery。

    from airflow import DAGfrom airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperatorfrom airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperatorfrom datetime import datetime, timedeltadefault_args = {'owner': '我是数据工程师','depends_on_past': False,'email_on_failure': True,'email': 'your-email@example.com','retries': 3,'retry_delay': timedelta(minutes=5)}with DAG('gcs_to_bigquery_pipeline',default_args=default_args,description='从GCS加载数据到BigQuery并转换',schedule_interval='0 2 * * *',  # 每天凌晨2点执行start_date=datetime(2023, 1, 1),catchup=False) as dag:# 任务1: 从GCS导入数据到BigQueryload_data = GCSToBigQueryOperator(task_id='load_csv_to_bq',bucket='my-data-bucket',source_objects=['data/sales_*.csv'],destination_project_dataset_table='my_project.sales_data.raw_sales',schema_fields=[{'name': 'id', 'type': 'INTEGER'},{'name': 'date', 'type': 'DATE'},{'name': 'amount', 'type': 'FLOAT'},{'name': 'customer_id', 'type': 'STRING'}],write_disposition='WRITE_TRUNCATE',skip_leading_rows=1,allow_quoted_newlines=True)# 任务2: 在BigQuery中转换数据transform_data = BigQueryExecuteQueryOperator(task_id='transform_data',sql="""SELECT date,SUM(amount) as daily_sales,COUNT(DISTINCT customer_id) as customer_countFROM `my_project.sales_data.raw_sales`GROUP BY dateORDER BY date DESC""",destination_dataset_table='my_project.sales_data.daily_summary',write_disposition='WRITE_TRUNCATE',use_legacy_sql=False)# 设置任务依赖关系load_data >> transform_data

这个 DAG 做了啥呢?

1. 第一个任务用 `GCSToBigQueryOperator` 把 GCS 里的 CSV 文件加载到 BigQuery 表中
2. 第二个任务用 `BigQueryExecuteQueryOperator` 执行 SQL 查询,计算日销售总额和客户数,结果写入另一个表
3. 最后一行 `load_data >> transform_data` 表示第二个任务依赖于第一个任务

看到那个 >> 操作符没?这就是 Airflow 里设置任务依赖关系的方式,超直观的!你还可以用 <<,意思一样,就是写法不同:transform_data << load_data

温馨提示: 在 Airflow 2.0 以上版本,推荐使用 with DAG(...) as dag: 的上下文管理器写法,这样不用给每个任务都指定 dag 参数,代码看起来更整洁。

Airflow 中的常用操作符

Airflow 提供了一大堆操作符(Operators),针对不同的任务类型。在谷歌云环境中,最常用的有这些:

1. GCSToGCSOperator - 复制或移动 GCS 文件

    from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperatorcopy_files = GCSToGCSOperator(task_id='copy_files',source_bucket='source-bucket',source_objects=['path/to/file.csv'],destination_bucket='destination-bucket',destination_object='path/to/destination/file.csv')

2. GCSFileTransformOperator - 在 GCS 文件上运行转换

    from airflow.providers.google.cloud.operators.gcs import GCSFileTransformOperatortransform_file = GCSFileTransformOperator(task_id='transform_file',source_bucket='my-bucket',source_object='raw/data.json',destination_bucket='my-bucket',destination_object='processed/data.json',transform_script=['python', '/path/to/transformation_script.py'])

3. DataflowTemplateOperator - 启动 Dataflow 作业

    from airflow.providers.google.cloud.operators.dataflow import DataflowTemplateOperatorrun_dataflow = DataflowTemplateOperator(task_id='dataflow_job',template='gs://dataflow-templates/latest/Word_Count',parameters={'inputFile': 'gs://my-bucket/input/my_file.txt','output': 'gs://my-bucket/output/word_count'},location='us-central1',project_id='my-project-id')

4. BigQueryCheckOperator - 检查 BigQuery 查询结果

    from airflow.providers.google.cloud.operators.bigquery import BigQueryCheckOperatorcheck_data_quality = BigQueryCheckOperator(task_id='check_data_quality',sql="""SELECT COUNT(*)FROM `my_project.sales_data.daily_summary`WHERE daily_sales < 0""",use_legacy_sql=False)

这个操作符会执行 SQL 查询,如果结果不为零或者 False,则任务失败。超适合做数据质量检查!

温馨提示: 别小看这些操作符,它们虽然简单,但组合起来威力很大。我见过有人用 100 多个操作符组成的 DAG 处理 TB 级数据,跑起来稳得一批!

动态生成 DAG

有时候我们需要根据不同的配置生成多个类似的 DAG,比如对不同的数据源执行相同的处理逻辑。手动复制粘贴?太 low 了!看看下面这个例子:

    from airflow import DAGfrom airflow.operators.python import PythonOperatorfrom datetime import datetimeimport yamlimport os# 读取配置文件config_path = os.path.join(os.path.dirname(__file__), 'configs')for config_file in os.listdir(config_path):if not config_file.endswith('.yaml'):continuewith open(os.path.join(config_path, config_file)) as f:config = yaml.safe_load(f)# 为每个配置创建一个DAGdag_id = f"process_{config['source_name']}"globals()[dag_id] = DAG(dag_id,schedule_interval=config['schedule'],start_date=datetime.fromisoformat(config['start_date']),catchup=False)def process_data(config, **kwargs):print(f"处理来自 {config['source_name']} 的数据")# 实际处理逻辑# 创建处理任务task = PythonOperator(task_id='process_data',python_callable=process_data,op_kwargs={'config': config},dag=globals()[dag_id])

这段代码会读取 configs 目录下的所有 YAML 配置文件,为每个配置创建一个单独的 DAG。

配置文件例子 (configs/sales.yaml):

    source_name: salesschedule: "0 4 * * *"start_date: "2023-01-01T00:00:00"

温馨提示: 当你有很多类似的 DAG 时,这种动态生成的方法能大大减少代码重复,提高可维护性。但别过度使用,如果生成了太多 DAG (比如上千个),可能会给 Airflow 调度器带来性能压力。

监控和调试 DAG

DAG 写完了,跑起来了,但总会遇到各种问题。别慌,Airflow 提供了全面的监控和调试工具。

通过 Web UI 监控

Cloud Composer 环境创建好后,就能访问 Airflow 的 Web UI。里面可以看到:

1. DAG 列表和它们的运行状态
2. 任务执行历史和日志
3. 可视化的 DAG 图表

界面虽然不像那些商业软件那么花哨,但功能是真的全。特别是那个 Graph View,能直观地展示任务依赖关系,对排查问题很有帮助。

日志查看

执行失败的任务,点进去就能看到详细日志。不过有时候日志不够详细,这时候可以修改你的代码,加点日志输出:

    def my_task(**context):logging.info("开始执行任务...")try:# 任务逻辑logging.info(f"处理的数据量: {len(data)}")except Exception as e:logging.error(f"出错了!错误信息: {str(e)}")raiselogging.info("任务执行完成")

多打些日志,出了问题容易定位多了。

设置告警

生产环境中,你肯定不想一直盯着 UI 看。设置告警,让系统自动通知你:

    default_args = {# 其他参数...'email': ['your-email@example.com'],'email_on_failure': True,'email_on_retry': False,'retries': 3,'retry_delay': timedelta(minutes=5)}

除了邮件通知,还可以通过 Cloud Monitoring 设置更复杂的告警,比如连续失败超过 3 次发送短信啥的。

温馨提示: 千万别把告警阈值设太低,不然手机会被告警信息炸到没电!我之前就犯过这错,结果大半夜被短信轰炸醒好几次…

生产环境最佳实践

在生产环境使用 Airflow,有些坑是必须注意的:

1. 避免把敏感信息写在 DAG 代码里

永远不要把密码、API 密钥之类的敏感信息硬编码在 DAG 里。正确的做法是用 Airflow 的 Connections 和 Variables:

    from airflow.models import Variablefrom airflow.hooks.base import BaseHook# 使用Variable存储的值config = Variable.get("my_config", deserialize_json=True)# 使用Connection存储的连接信息conn = BaseHook.get_connection("my_gcp_connection")

在 Composer 环境中,这些值会被加密存储。

2. 合理设置重试策略

网络抽风、依赖服务偶尔挂掉这种情况很常见,所以设置合理的重试很重要:

    default_args = {# 其他参数...'retries': 3,  # 最多重试3次'retry_delay': timedelta(minutes=5),  # 每次重试间隔5分钟'retry_exponential_backoff': True,  # 使用指数退避策略'max_retry_delay': timedelta(hours=1)  # 最大重试间隔1小时}

指数退避策略(exponential backoff)意思是每次重试的间隔时间会越来越长,避免对外部服务造成过大压力。

3. 使用 Pools 限制并发

默认情况下,Airflow 会尽可能并行执行不互相依赖的任务。但有些外部系统可能扛不住太多并发请求,这时候就需要用 Pools 来限制:

    task = BigQueryExecuteQueryOperator(# 其他参数...pool='bigquery_pool',  # 使用名为bigquery_pool的资源池pool_slots=2  # 占用2个槽位)

在 Web UI 中创建名为 bigquery_pool 的资源池,设置槽位总数,比如 10。这样即使有 100 个查询任务,同时执行的也不会超过 10 个(或者 5 个,如果每个任务占 2 个槽位)。

4. 使用 SubDAGs 组织复杂流程

当 DAG 变得特别复杂时,可以用 SubDAGs 把相关任务组织在一起:

    from airflow.operators.subdag import SubDagOperatordef create_processing_subdag(parent_dag_id, child_dag_id, start_date, schedule_interval):with DAG(f"{parent_dag_id}.{child_dag_id}",start_date=start_date,schedule_interval=schedule_interval) as dag:# 子DAG的任务定义# ...return dagprocessing_tasks = SubDagOperator(task_id='processing_tasks',subdag=create_processing_subdag('main_dag', 'processing_tasks', start_date, schedule_interval),dag=main_dag)

不过,SubDAG 用起来有点复杂,如果不是很必要,可以考虑用 TaskGroups 代替,效果类似但更轻量:

    from airflow.utils.task_group import TaskGroupwith TaskGroup(group_id='processing_tasks') as processing_group:# 定义一组相关任务task1 = PythonOperator(...)task2 = PythonOperator(...)task1 >> task2

温馨提示: SubDAGs 在内部实际上是作为单独的 DAG 运行的,可能导致调度问题。大多数情况下,TaskGroups 是更好的选择,它只是在 UI 上对任务进行分组,不影响实际执行逻辑。

高级技巧:自定义组件

用了一段时间,你可能会发现标准的操作符满足不了你的特殊需求。没关系,Airflow 允许你自定义各种组件。

自定义 Operator

    from airflow.models.baseoperator import BaseOperatorfrom airflow.utils.decorators import apply_defaultsclass MyCustomOperator(BaseOperator):@apply_defaultsdef __init__(self, my_parameter, *args, **kwargs):super().__init__(*args, **kwargs)self.my_parameter = my_parameterdef execute(self, context):# 实现你的操作逻辑self.log.info(f"使用参数 {self.my_parameter} 执行自定义操作")# 可以返回值,会被推送到XComreturn "操作结果"

自定义 Sensor

Sensor 是一种特殊的操作符,会周期性地检查条件,直到条件满足才继续执行:

    from airflow.sensors.base import BaseSensorOperatorclass MyCustomSensor(BaseSensorOperator):def __init__(self, my_parameter, *args, **kwargs):super().__init__(*args, **kwargs)self.my_parameter = my_parameterdef poke(self, context):# 返回True表示条件满足,返回False表示需要继续等待# 这个方法会被周期性调用self.log.info(f"检查条件是否满足,参数: {self.my_parameter}")# 实现你的检查逻辑return condition_is_met  # 返回布尔值

这些自定义组件可以打包成插件,放在 Composer 环境的 plugins 目录下,就能在所有 DAG 中使用了。

XComs:任务间通信的桥梁

有时候,一个任务的输出需要传递给另一个任务使用。Airflow 提供了 XComs(Cross-Communication)机制来实现这一点:

    def task1(**context):# 向XCom推送值value = "这是任务1生成的值"context['task_instance'].xcom_push(key='my_value', value=value)def task2(**context):# 从XCom拉取值value = context['task_instance'].xcom_pull(task_ids='task1', key='my_value')print(f"从任务1获取的值: {value}")task1 = PythonOperator(task_id='task1', python_callable=task1, dag=dag)task2 = PythonOperator(task_id='task2', python_callable=task2, dag=dag)task1 >> task2

默认情况下,Operator 的返回值也会自动推送到 XCom:

    def task1():return {"status": "success", "count": 42}# 在task2中可以通过xcom_pull(task_ids='task1')获取到这个字典

温馨提示: XCom 是存储在元数据数据库中的,不适合传递大量数据。如果需要在任务间传递大文件,应该使用外部存储如 GCS。

定时任务的那些坑

Airflow 的调度器很强大,但也有些容易掉进去的坑:

关于 start_date

start_date 参数指定的是 DAG 的首次执行时间,但这并不意味着 DAG 在那个时间点立即执行。调度器会在 start_date + schedule_interval 的时间点触发首次执行。

举个例子,如果 start_date 是 2023-01-01 01:00:00,schedule_interval 是 ‘@daily’,那么第一次执行会在 2023-01-02 01:00:00 触发,处理的是 2023-01-01 的数据。

这个逻辑乍一看有点反直觉,但想想也合理:定时任务通常是处理过去一段时间内的数据,而不是未来的。

关于 catchup

默认情况下,如果 Airflow 发现从 start_date 到当前有遗漏的调度周期,它会尝试"追赶"这些错过的执行。这可能导致一大堆历史任务突然被触发。

通过设置 catchup=False,可以禁用这种行为:

    dag = DAG('my_dag',start_date=datetime(2023, 1, 1),schedule_interval='@daily',catchup=False  # 不追赶错过的执行)

温馨提示: 在生产环境中,除非确实需要处理历史数据,否则推荐将 catchup 设置为 False,避免意外触发大量历史任务造成资源紧张。

故障排除:常见问题

使用 Airflow 时,特别是刚开始,可能会遇到各种奇怪的问题。这儿列几个我踩过的坑:

1. DAG 没有按时触发

检查一下是不是调度器出问题了。在 Composer 环境中,可以查看调度器的日志,通常能找到线索。

另外,确认你的 DAG 文件有没有语法错误。即使一个小错误,整个文件都会被忽略处理。

2. 任务执行很慢

首先检查资源是否足够。Composer 环境的 worker 节点如果太小,处理大量任务会很吃力。

其次,看看是不是任务本身有优化空间,比如 BigQuery 查询是否需要处理大量数据,能否通过分区表或聚集表优化。

3. 内存溢出错误

Airflow 默认的 worker 内存限制比较小,如果处理大量数据可能会溢出。解决方法有:

  • 增加 worker 的内存配置
  • 优化任务代码,减少内存使用
  • 使用 KubernetesPodOperator 在单独的 Pod 中执行内存密集型任务

4. XCom 值丢失

XCom 只能存储相对较小的值(默认 48KB)。如果发现 XCom 值丢失,很可能是数据太大了。

改用外部存储如 GCS 来传递大数据集,XCom 只存储引用。

实际开发中遇到问题别急着怀疑 Airflow 本身有 bug,大多数情况下是配置或代码逻辑的问题。Airflow 是生产级软件,功能稳定性还是有保证的。

写在最后

Apache Airflow 搭配谷歌云,就是数据工程师的得力助手。从简单的 ETL 到复杂的机器学习流水线,它都能轻松应对。

学会了 Airflow,我一个人能干五个人的活儿(而且不用加班)。数据处理任务一旦实现自动化,就能腾出时间做更有价值的事情,比如设计更高级的数据分析系统,或者…喝咖啡摸鱼(咳咳,开个玩笑)。

这篇文章只是冰山一角,Airflow 的功能远不止于此。建议实践中多尝试,多看文档,慢慢你就会发现它的更多强大功能。那些看起来复杂的概念,用着用着就熟悉了。

代码写得好,周末早点跑;流程自动化,老板夸你棒!

http://www.dtcms.com/a/438017.html

相关文章:

  • 小红书自动化运营:智能体+RPA自动化+MCP实现采集仿写和自动发布
  • 网站域名和网站网址建筑培训网 江苏
  • 定制开发开源AI智能名片S2B2C商城小程序的会员制运营研究——以“老铁用户”培养为核心目标
  • 【aigc】chrome-devtools-mcp怎么玩?
  • 从《Life of A Pixel》来看Chrome的渲染机制
  • 【项目实战 Day9】springboot + vue 苍穹外卖系统(用户端订单模块 + 商家端订单管理模块 完结)
  • Mac 安装Neo4j教程
  • blender 解决shift快捷键和中英切换重复的问题
  • 网站动态图怎么做阳明拍卖公司网站
  • 01_Docker 部署 Ollama 模型(支持 NVIDIA GPU)
  • 苏州新区网站制作wordpress视频格式
  • 一位Android用户的科技漫游手记
  • android中调用相册
  • 安卓基础组件031-Retrofit 网络请求框架
  • Redis 黑马点评-商户查询缓存
  • Android geckoview 集成,JS交互,官方demo
  • 【APK安全】Android 权限校验核心风险与防御指南
  • 单调队列与单调栈
  • 设计与优化Java API:构建高效、可维护的接口
  • Locality Sensitive Hashing (LSH) 详解:高效检测语言语句重复的利器
  • 阿里云网站开发零起步如何做设计师
  • 后端开发基础概念MVC以及Entity,DAO,DO,DTO,VO等概念
  • 七大排序算法的基本原理
  • Gateway-过滤器
  • 科普:Python 中,字典的“动态创建键”特性
  • Java 21 或 JavaFX 打包 exe 之 GraalVM Native Image 方案
  • 1.2.3 MCP(Model Context Protocol)
  • dede网站栏目管理网络科技是做什么的
  • 《Gdb 调试实战指南:不同风格于VS下的一种调试模式》
  • lua虚拟机的垃圾回收机制