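Contents

Background

I. Installing PyFlink

II. Writing the demo program

III. Preparing to submit to YARN

IV. Submitting the job

V. Pitfalls

1. Syntax error on the client when submitting the job

2. lzma package not found on the client when submitting the job

3. "org.apache.flink.streaming.api.utils.PythonTypeUtils.getCollectionInputFormat does not exist in the JVM" on the client when submitting the job

4. "No module named pyflink" on the client when submitting the job

5. Permission denied on the TaskManager when submitting the job

VI. Summary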


Background

A recent project required me to learn how to develop Flink jobs in Python. These are my notes.

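I. Installing PyFlink

Run the following commands locally. PyFlink requires Python 3.6 or later; my local version is 3.12. The Flink 1.14 documentation lists Python 3.6, 3.7, and 3.8 as supported, so a newer interpreter may fail to install this release; the server-side traces later in this post come from Python 3.7.

Create a virtual environment

python -m venv venv

Activate the virtual environment

source venv/bin/activate

Run the install command

pip install apache-flink==1.14

A successful installation produces output like the following: [screenshot omitted]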

II. Writing the demo program

Here I simply copy the word count example from the official tutorial.

import argparse
import logging
import sys

from pyflink.common import Row
from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,
                           DataTypes, FormatDescriptor)
from pyflink.table.expressions import lit, col
from pyflink.table.udf import udtf

word_count_data = ["To be, or not to be,--that is the question:--",
                   "Whether 'tis nobler in the mind to suffer",
                   "The slings and arrows of outrageous fortune",
                   "Or to take arms against a sea of troubles,",
                   "And by opposing end them?--To die,--to sleep,--",
                   "No more; and by a sleep to say we end",
                   "The heartache, and the thousand natural shocks",
                   "That flesh is heir to,--'tis a consummation",
                   "Devoutly to be wish'd. To die,--to sleep;--",
                   "To sleep! perchance to dream:--ay, there's the rub;",
                   "For in that sleep of death what dreams may come,",
                   "When we have shuffled off this mortal coil,",
                   "Must give us pause: there's the respect",
                   "That makes calamity of so long life;",
                   "For who would bear the whips and scorns of time,",
                   "The oppressor's wrong, the proud man's contumely,",
                   "The pangs of despis'd love, the law's delay,",
                   "The insolence of office, and the spurns",
                   "That patient merit of the unworthy takes,",
                   "When he himself might his quietus make",
                   "With a bare bodkin? who would these fardels bear,",
                   "To grunt and sweat under a weary life,",
                   "But that the dread of something after death,--",
                   "The undiscover'd country, from whose bourn",
                   "No traveller returns,--puzzles the will,",
                   "And makes us rather bear those ills we have",
                   "Than fly to others that we know not of?",
                   "Thus conscience does make cowards of us all;",
                   "And thus the native hue of resolution",
                   "Is sicklied o'er with the pale cast of thought;",
                   "And enterprises of great pith and moment,",
                   "With this regard, their currents turn awry,",
                   "And lose the name of action.--Soft you now!",
                   "The fair Ophelia!--Nymph, in thy orisons",
                   "Be all my sins remember'd."]


def word_count(input_path, output_path):
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    # write all the data to one file
    t_env.get_config().get_configuration().set_string("parallelism.default", "1")

    # define the source
    if input_path is not None:
        t_env.create_temporary_table(
            'source',
            TableDescriptor.for_connector('filesystem')
                           .schema(Schema.new_builder()
                                   .column('word', DataTypes.STRING())
                                   .build())
                           .option('path', input_path)
                           .format('csv')
                           .build())
        tab = t_env.from_path('source')
    else:
        print("Executing word_count example with default input data set.")
        print("Use --input to specify file input.")
        tab = t_env.from_elements(map(lambda i: (i,), word_count_data),
                                  DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())]))

    # define the sink
    if output_path is not None:
        t_env.create_temporary_table(
            'sink',
            TableDescriptor.for_connector('filesystem')
                           .schema(Schema.new_builder()
                                   .column('word', DataTypes.STRING())
                                   .column('count', DataTypes.BIGINT())
                                   .build())
                           .option('path', output_path)
                           .format(FormatDescriptor.for_format('canal-json')
                                   .build())
                           .build())
    else:
        print("Printing result to stdout. Use --output to specify output path.")
        t_env.create_temporary_table(
            'sink',
            TableDescriptor.for_connector('print')
                           .schema(Schema.new_builder()
                                   .column('word', DataTypes.STRING())
                                   .column('count', DataTypes.BIGINT())
                                   .build())
                           .build())

    @udtf(result_types=[DataTypes.STRING()])
    def split(line: Row):
        for s in line[0].split():
            yield Row(s)

    # compute word count
    tab.flat_map(split).alias('word') \
       .group_by(col('word')) \
       .select(col('word'), lit(1).count) \
       .execute_insert('sink') \
       .wait()
    # remove .wait if submitting to a remote cluster, refer to
    # https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster
    # for more details


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)

    word_count(known_args.input, known_args.output)

Running the script locally prints the following output: [screenshot omitted]
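As a rough cross-check of what the job should count, the same tokenization can be reproduced in plain Python. This is only a sketch and not part of the Flink tutorial: expected_counts is a hypothetical helper that mirrors the demo's split function (whitespace splitting, case-sensitive, punctuation left attached) using collections.Counter. In streaming mode the print sink most likely emits a changelog, so the last update per word is the value to compare against.

```python
from collections import Counter


def expected_counts(lines):
    """Count whitespace-separated tokens the same way the demo's split() does."""
    counter = Counter()
    for line in lines:
        # str.split() with no arguments splits on any run of whitespace,
        # so punctuation stays attached to the token ("be," is not "be").
        counter.update(line.split())
    return counter


if __name__ == "__main__":
    # Hypothetical sample input; swap in word_count_data from the demo.
    sample = ["to be or not to be", "to sleep"]
    for word, count in sorted(expected_counts(sample).items()):
        print(word, count)
```

Passing word_count_data from the demo instead of the sample list gives the totals to compare with the job's final rows.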

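III. Preparing to submit to YARN

Prepare a Linux server with the Flink client installed. I use Flink 1.14.2; installing the client is not covered here, since it only requires downloading and unpacking the release package.

Set up a PyFlink runtime environment on the server, following Section I.

Upload the demo program to the server.

Here env is the Python virtual environment directory, and py_env.zip is the zip archive created from env.

IV. Submitting the job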

/home/master/flink-1.14.2/bin/flink run -pyarch py_env.zip -m yarn-cluster -py /home/zhubao/demo.py -pyexec py_env.zip/env/bin/python
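When the submission is scripted, building the command as an argument list keeps each flag next to its value. The sketch below only reassembles the command shown above. build_submit_command is a hypothetical helper, and the paths are the ones used in this post.

```python
import shlex


def build_submit_command(flink_bin, script, archive, python_in_archive):
    """Return the `flink run` argument list used in this post."""
    return [
        flink_bin, "run",
        "-pyarch", archive,            # archive shipped along with the job
        "-m", "yarn-cluster",          # submit to YARN
        "-py", script,                 # entry-point Python script
        "-pyexec", python_in_archive,  # interpreter path inside the archive
    ]


if __name__ == "__main__":
    cmd = build_submit_command(
        "/home/master/flink-1.14.2/bin/flink",
        "/home/zhubao/demo.py",
        "py_env.zip",
        "py_env.zip/env/bin/python",
    )
    # Print a shell-safe rendering of the command for inspection.
    print(" ".join(shlex.quote(part) for part in cmd))
    # To actually submit, pass `cmd` to subprocess.run(cmd, check=True).
```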

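The terminal prints a log like the following: [screenshot omitted]

Open the web UI of the YARN cluster. Once the job appears under RUNNING, it has been submitted to the YARN cluster.

View the job details: [screenshot omitted]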

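V. Pitfalls

1. Syntax error on the client when submitting the job

SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
  File "main.py", line 55
    ds.print()
    ^
SyntaxError: invalid syntax

Solution: Investigation traced this to a Flink version mismatch. The demo code and the Flink client must target the same version, otherwise puzzling errors like this one appear. Align every component, including PyFlink and the Flink client, on a single Flink version. Note that ds.print() is also a SyntaxError under Python 2, where print is a reserved word, so it is worth confirming which interpreter the client launches.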
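Since version alignment is the fix, a quick comparison of the installed PyFlink version against the client version may save a failed submission. The following is a minimal sketch under stated assumptions: same_flink_release and installed_pyflink_version are hypothetical helper names, the client version is typed in by hand (here 1.14.2, taken from the client directory name in this post), and the lookup relies on importlib.metadata, which needs Python 3.8 or later.

```python
def same_flink_release(pyflink_version, client_version):
    """True when both version strings share major.minor.patch."""
    def normalize(version):
        parts = version.strip().split(".")[:3]
        # Pad "1.14" to ["1", "14", "0"]; pip treats 1.14 as 1.14.0.
        return parts + ["0"] * (3 - len(parts))
    return normalize(pyflink_version) == normalize(client_version)


def installed_pyflink_version():
    """Best-effort lookup of the installed apache-flink package version."""
    try:
        from importlib.metadata import PackageNotFoundError, version  # Python 3.8+
    except ImportError:
        return None
    try:
        return version("apache-flink")
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    client_version = "1.14.2"  # assumption: read off the client directory name
    found = installed_pyflink_version()
    if found is None:
        print("apache-flink not found (or Python older than 3.8)")
    else:
        print(found, "matches client:", same_flink_release(found, client_version))
```

Under this comparison, a PyFlink installed as 1.14 (that is, 1.14.0) does not match a 1.14.2 client.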

2. lzma package not found on the client when submitting the job

Traceback (most recent call last):
  File "/home/zhubao/env/lib/python3.7/site-packages/fastavro/read.py", line 2, in <module>
    from . import _read
  File "fastavro/_read.pyx", line 11, in init fastavro._read
  File "/home/master/python3/lib/python3.7/lzma.py", line 27, in <module>
    from _lzma import *
ModuleNotFoundError: No module named '_lzma'

Solution: The error means the Python installation lacks _lzma, the C extension module behind the standard library's lzma module, which handles .xz compression. It has to be installed.

# Run the following command as root
yum install -y xz-devel python-backports-lzma
# Run the lzma package install as a regular user
pip install backports.lzma -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host pypi.tuna.tsinghua.edu.cn

After installation, lzma.py has to be edited. The file is usually under $PYTHON_HOME/lib/python3.7 (adjust for your version). The change is to wrap the _lzma imports in try/except.

Save the file, exit, and rerun the submission; the error is resolved.

3. "org.apache.flink.streaming.api.utils.PythonTypeUtils.getCollectionInputFormat does not exist in the JVM" on the client when submitting the job

Traceback (most recent call last):
  File "main.py", line 99, in <module>
    word_count(known_args.input, known_args.output)
  File "main.py", line 39, in word_count
    ds = env.from_collection(word_count_data)
  File "/home/zhubao/env/lib/python3.7/site-packages/pyflink/datastream/stream_execution_environment.py", line 958, in from_collection
    return self._from_collection(collection, type_info)
  File "/home/zhubao/env/lib/python3.7/site-packages/pyflink/datastream/stream_execution_environment.py", line 981, in _from_collection
    j_input_format = PythonTypeUtils.getCollectionInputFormat(
  File "/home/zhubao/env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1550, in __getattr__
    "{0}.{1} does not exist in the JVM".format(self._fqn, name))
py4j.protocol.Py4JError: org.apache.flink.streaming.api.utils.PythonTypeUtils.getCollectionInputFormat does not exist in the JVM

Solution: This is again a version mismatch. Make sure the PyFlink version matches the Flink client version.

4. "No module named pyflink" on the client when submitting the job

Caused by: java.io.IOException: Failed to execute the command: python -c import pyflink;import os;print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)), 'bin'))
output: Traceback (most recent call last):
  File "<string>", line 1, in <module>
ImportError: No module named pyflink

Solution: Set the Python interpreter in the Flink configuration. Open flink-conf.yaml, usually under $FLINK_HOME/conf, and append the Python executable setting at the end of the file:

python.client.executable: /home/zhubao/env/bin/python
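Before resubmitting, it may help to confirm that the interpreter named in python.client.executable can actually import pyflink. The check below is a sketch, not part of Flink: can_import is a hypothetical helper that runs a one-line import in a child process, much like the command shown in the error message.

```python
import subprocess
import sys


def can_import(executable, module):
    """Return True if `executable -c "import module"` exits with status 0."""
    try:
        result = subprocess.run(
            [executable, "-c", "import " + module],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except OSError:
        # The interpreter path does not exist or cannot be executed.
        return False
    return result.returncode == 0


if __name__ == "__main__":
    # Replace sys.executable with the configured interpreter,
    # e.g. /home/zhubao/env/bin/python in this post.
    print("pyflink importable:", can_import(sys.executable, "pyflink"))
```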

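5. Permission denied on the TaskManager when submitting the job

Caused by: java.io.IOException: Cannot run program "/home/zhubao/env/bin/python": error=13, Permission denied

Solution: I tried several approaches. What finally worked was packaging the whole Python environment and shipping it to YARN, as follows.

Pack the Python environment into a zip archive. Run zip from the parent directory so that env is the archive's top-level folder, which the -pyexec path py_env.zip/env/bin/python relies on:

cd /home/zhubao && zip -r py_env.zip env/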
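The layout inside the archive matters because the -pyexec value py_env.zip/env/bin/python is a path relative to the archive root. As a sketch of one way to build and verify such an archive from Python, the hypothetical helpers below (build_env_archive and top_level_entries are names introduced here) use shutil.make_archive with root_dir and base_dir so that env becomes the single top-level folder:

```python
import os
import shutil
import zipfile


def build_env_archive(env_dir, archive_base):
    """Zip env_dir so that its own name is the single top-level folder.

    archive_base is the target path without the .zip suffix.
    """
    env_dir = os.path.abspath(env_dir)
    parent, name = os.path.split(env_dir)
    # root_dir is where archiving starts; base_dir is what gets stored,
    # so entries look like "env/bin/python" rather than "home/.../env/...".
    return shutil.make_archive(archive_base, "zip", root_dir=parent, base_dir=name)


def top_level_entries(archive_path):
    """Return the set of first path components stored in the archive."""
    with zipfile.ZipFile(archive_path) as zf:
        return {entry.split("/")[0] for entry in zf.namelist()}


if __name__ == "__main__":
    # Paths from this post; adjust to your own layout.
    archive = build_env_archive("/home/zhubao/env", "/home/zhubao/py_env")
    print(archive, top_level_entries(archive))
```

If top_level_entries reports anything other than the environment directory name, the -pyexec path will not resolve inside the extracted archive.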

Add the environment archive and the interpreter path inside it to the submit command

/home/master/flink-1.14.2/bin/flink run -pyarch py_env.zip -m yarn-cluster -py /home/zhubao/demo.py -pyexec py_env.zip/env/bin/python
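The post does not identify why the original interpreter path was denied. One possible cause, offered purely as an assumption, is that the user running the YARN containers cannot traverse a parent directory such as another user's home. The sketch below uses a hypothetical helper, first_non_executable, to report the first path component that the current user cannot search or execute; to be meaningful it would have to run on a NodeManager host as the user that runs the containers.

```python
import os


def first_non_executable(path):
    """Return the first component of `path` lacking execute/search permission.

    Checks every component from the filesystem root down to `path` and
    returns None when all directories can be traversed and the final
    entry is executable by the current user.
    """
    current = os.path.abspath(path)
    components = []
    while True:
        components.append(current)
        parent = os.path.dirname(current)
        if parent == current:  # reached the filesystem root
            break
        current = parent
    for component in reversed(components):
        if not os.access(component, os.X_OK):
            return component
    return None


if __name__ == "__main__":
    # Interpreter path from the error message in this post.
    print(first_non_executable("/home/zhubao/env/bin/python"))
```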

VI. Summary

This post covered developing a Flink job with PyFlink and submitting it to a YARN cluster.
