当前位置: 首页 > news >正文

Flink提交pyflink任务

1.官方文档:
flink1.14:https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/#submitting-pyflink-jobs
flink1.18:https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/cli/#submitting-pyflink-jobs

2.提交PyFlink作业 - Submitting PyFlink Jobs #

(1)环境检查

Currently, users are able to submit a PyFlink job via the CLI. It does not require to specify the JAR file path or the entry main class, which is different from the Java job submission.
官方翻译:当前用户可以通过命令行提交PyFlink作业。不要指定 jar 文件路径或者主类入口,跟Java作业提交不一样。

When submitting Python job via flink run, Flink will run the command “python”. Please run the following command to confirm that the python executable in current environment points to a supported Python version of 3.6+.
官方翻译:当使用 flink run 提交Python作业时,Flink会运行命令 python。请运行下面的命令确保Python可执行程序在当前环境中,并指向Python 3.6+ 版本。

$ python --version

the version printed here must be 3.6+

(2)运行PyFlink作业 - Run a PyFlink job

The following commands show different PyFlink job submission use-cases:
官方翻译:后续的命令展示了不同的PyFlink作业提交用例:

示例1:$ ./bin/flink run --python examples/python/table/word_count.py

Run a PyFlink job with additional source and resource files. Files specified in --pyFiles will be added to the PYTHONPATH and, therefore, available in the Python code.
官方翻译:使用额外的源和资源文件运行PyFlink作业。在 --pyFiles 指定的文件都会被加入到 PYTHONPATH 中,因此就在python代码中可用。

示例2:$ ./bin/flink run \
      --python examples/python/table/word_count.py \
      --pyFiles file:///user.txt,hdfs:///$namenode_address/username.txt

Run a PyFlink job which will reference Java UDF or external connectors. JAR file specified in --jarfile will be uploaded to the cluster.
官方翻译:运行引用了Java自定义函数或者外部连接器的PyFlink作业。在 --jarfile 后指定的 jar 文件将会被上传到集群。

示例3:$ ./bin/flink run \
      --python examples/python/table/word_count.py \
      --jarfile <jarFile>

Run a PyFlink job with pyFiles and the main entry module specified in --pyModule:
官方翻译:使用 pyFiles 选项运行PyFlink作业需要使用 --pyModule 参数指定主模块入口:

示例4:$ ./bin/flink run \
      --pyModule table.word_count \
      --pyFiles examples/python/table

Submit a PyFlink job on a specific JobManager running on host (adapt the command accordingly):
官方翻译:将PyFlink作业提交到指定的 JVM 上运行:

示例5:$ ./bin/flink run \
      --jobmanager <jobmanagerHost>:8081 \
      --python examples/python/table/word_count.py

Run a PyFlink job using a YARN cluster in Per-Job Mode:
官方翻译:使用以每作业模式的 YARN 集群运行PyFlink作业:

示例6:$ ./bin/flink run \
      --target yarn-per-job
      --python examples/python/table/word_count.py

Run a PyFlink application on a native Kubernetes cluster having the cluster ID , it requires a docker image with PyFlink installed, please refer to Enabling PyFlink in docker:
官方翻译:在指定集群标识的 Kubernetes 原生集群上运行PyFlink应用,需要一个PyFlink的容器镜像,请参考在容器里启用PyFlink:

示例7:$ ./bin/flink run-application \
      --target kubernetes-application \
      --parallelism 8 \
      -Dkubernetes.cluster-id=<ClusterId> \
      -Dtaskmanager.memory.process.size=4096m \
      -Dkubernetes.taskmanager.cpu=2 \
      -Dtaskmanager.numberOfTaskSlots=4 \
      -Dkubernetes.container.image=<PyFlinkImageName> \
      --pyModule word_count \
      --pyFiles /opt/flink/examples/python/table/word_count.py

相关文章:

  • pytest asyncio 支持插件 pytest-asyncio
  • 基于51单片机的的鸡笼补光和恒温系统的设计与实现(源程序+Protues仿真+电路图+元件清单+器件手册)
  • python opencv基础使用总结
  • 41.日常算法
  • 介绍两本学习智谱大模型的入门图书
  • Java小白入门基础知识(二)
  • 晶闸管主要参数分析与损耗计算
  • JavaScript 内置对象-日期对象
  • Ubuntu 系统 LVM 逻辑卷扩容教程
  • Hive之分区表
  • 【大模型系列】Windows系统上运行大语言模型方式
  • SpringBoot+微信小程序+数据可视化的宠物到家喂宠服务(程序+论文+讲解+安装+调试+售后等)
  • HCIA项目实践(网络)---NAT地址转化技术
  • frp-tool,客户端frp命令行工具
  • 【苍穹外卖】学习
  • LeetCode 1299.将每个元素替换为右侧最大元素:倒序遍历,维护最大值,原地修改
  • WPS的AI助手进化跟踪(灵犀+插件)
  • 【NLP251】BertTokenizer 的全部 API 及 使用案例
  • shell脚本备份mysql数据库和库下表
  • 算法刷题--哈希表--字母异位词和两个数组的交集
  • 马上评丨又见酒店坐地起价,“老毛病”不能惯着
  • 船只深夜撞上海上风机后沉没1死1失踪,调查报告公布
  • 关键词看中国经济“一季报”:稳,开局良好看信心
  • 视频丨伊朗阿巴斯港一处油罐发生高强度爆炸:造成大面积破坏,伤亡不明
  • 合同约定拿850万保底利润?重庆市一中院:约定无效,发回重审
  • 药企销售的茶碱层层流转后部分被用于制毒,销售人员一审被判15年