当前位置: 首页 > news >正文

Apache Airflow (十二) :PythonOperator

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客

 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。

 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频


PythonOperator可以调用Python函数,由于Python基本可以调用任何类型的任务,如果实在找不到合适的Operator,将任务转为Python函数,使用PythonOperator即可。

关于PythonOperator常用参数如下,更多参数可以查看官网:airflow.operators.python — Airflow Documentation

python_callable(python callable):调用的python函数

op_kwargs(dict):调用python函数对应的 **args 参数,dict格式,使用参照案例。

op_args(list):调用python函数对应的 *args 参数,多个封装到一个tuple中,list格式,使用参照案例。

PythonOperator调度案例

import random
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

# python中 *  关键字参数允许你传入0个或任意个参数,这些可变参数在函数调用时自动组装为一个tuple。
# python中 ** 关键字参数允许你传入0个或任意个含参数名的参数,这些关键字参数在函数内部自动组装为一个dict。
def print__hello1(*a,**b):
    print(a)
    print(b)
    print("hello airflow1")

# 返回的值只会打印到日志中
    return{"sss1":"xxx1"}

def print__hello2(random_base):
    print(random_base)
    print("hello airflow2")

# 返回的值只会打印到日志中
    return{"sss2":"xxx2"}

default_args = {
    'owner':'maliu',
    'start_date':datetime(2021, 10, 1),
    'retries': 1,  # 失败重试次数
    'retry_delay': timedelta(minutes=5) # 失败重试间隔
}

dag = DAG(
    dag_id = 'execute_pythoncode',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1)
)

first=PythonOperator(
    task_id='first',
    #填写  print__hello1 方法时,不要加上“()”
    python_callable=print__hello1,
    # op_args 对应 print_hello1 方法中的a参数
    op_args=[1,2,3,"hello","world"],
    # op_kwargs 对应 print__hello1 方法中的b参数
    op_kwargs={"id":"1","name":"zs","age":18},
    dag = dag
)

second=PythonOperator(
    task_id='second',
    #填写  print__hello2 方法时,不要加上“()”
    python_callable=print__hello2,
    # random_base 参数对应 print_hello2 方法中参数“random_base”
    op_kwargs={"random_base":random.randint(0,9)},
    dag=dag
)

first >> second

相关文章:

  • nginx学习(4)Nginx 负载均衡
  • 【鸿蒙最新全套教程】<HarmonyOS第一课>1、运行Hello World
  • 傅里叶级数@正弦级数和余弦级数@奇偶延拓和周期延拓
  • VBA如何快速识别Excel单元格中的文本数字
  • 【每日刷题——语音信号篇】
  • AIGC ChatGPT4对Gbase数据库进行总结
  • 网络安全涉及哪些方面?
  • Python大数据之linux学习总结——day11_ZooKeeper
  • Python-----PyInstaller的简单使用
  • wpf devexpress在未束缚模式中生成Tree
  • Python-正则表达式使用
  • Docker命令 常用中间件运维部署,方便构建自己服务
  • 多态语法详解
  • “移动机器人课程群实践创新的困境与突围”素材
  • ArkTS - HarmonyOS服务卡片(创建)
  • 精密云工程:智能激活业务速率 ——华为云11.11联合大促倒计时 仅剩3日
  • 2023年中职“网络安全“—Web 渗透测试②
  • mac上配置maven
  • 计算机网络的性能指标
  • OpenAI内斗剧情反转!微软力保ChatGPT之父回归?
  • 佩斯科夫:俄方代表团15日将在伊斯坦布尔等候乌克兰代表团
  • 财政部党组召开2025年巡视工作会议暨第一轮巡视动员部署会
  • 日本广岛大学一处拆迁工地发现疑似未爆弹
  • 母亲节书单|关于生育自由的未来
  • 人民日报刊文:加快解放和发展新质战斗力
  • 道指跌逾100点,特斯拉涨近5%