
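Contents

Background

I. Installing PyFlink

II. Writing the demo program

III. Preparing for YARN submission

IV. Submitting the job

V. Pitfalls

1. Syntax error on the client when submitting the job

2. lzma package not found on the client when submitting the job

3. Client error "org.apache.flink.streaming.api.utils.PythonTypeUtils.getCollectionInputFormat does not exist in the JVM" when submitting the job

4. Client error "ImportError: No module named pyflink" when submitting the job

5. TaskManager reports "Permission denied" after submitting the job

VI. Summary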


Background

        A recent project required me to learn how to develop Flink jobs in Python, so these are my notes.

I. Installing PyFlink

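        Run the following commands locally. PyFlink requires Python 3.6 or later (my local machine runs 3.12). Each PyFlink release also supports only a limited range of Python versions: the Flink 1.14 documentation lists 3.6, 3.7 and 3.8, and the server used later in this post runs Python 3.7.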

Create a virtual environment:

python -m venv venv

Activate the virtual environment:

source venv/bin/activate

Install PyFlink:

pip install apache-flink==1.14

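When the installation succeeds, pip prints a "Successfully installed ..." line listing apache-flink and its dependencies. Pin the version that exactly matches your Flink client (the server below uses a 1.14.2 client); mismatched versions cause several of the errors described in Section V.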
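As a quick pre-install check, the sketch below tests whether an interpreter version falls in the range PyFlink 1.14 supports. The set {3.6, 3.7, 3.8} is taken from the Flink 1.14 installation docs and is an assumption to re-check for other releases; is_supported_for_pyflink_114 is a hypothetical helper, not part of PyFlink.

```python
import sys

# Assumption: supported (major, minor) versions for PyFlink 1.14, per the Flink 1.14 docs.
# Other PyFlink releases support different ranges; adjust this set for your version.
PYFLINK_114_SUPPORTED = {(3, 6), (3, 7), (3, 8)}


def is_supported_for_pyflink_114(version_info=sys.version_info):
    """Return True if the (major, minor) part of version_info is supported by PyFlink 1.14."""
    return tuple(version_info[:2]) in PYFLINK_114_SUPPORTED


if __name__ == "__main__":
    major, minor = sys.version_info[:2]
    if is_supported_for_pyflink_114():
        print(f"Python {major}.{minor} is supported by PyFlink 1.14")
    else:
        print(f"Python {major}.{minor} is NOT in the PyFlink 1.14 supported range")
```

Run it with the interpreter inside the virtual environment before calling pip install.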

II. Writing the demo program

        Here I simply copied the example from the official tutorial:

import argparse
import logging
import sys

from pyflink.common import Row
from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,
                           DataTypes, FormatDescriptor)
from pyflink.table.expressions import lit, col
from pyflink.table.udf import udtf

word_count_data = ["To be, or not to be,--that is the question:--",
                   "Whether 'tis nobler in the mind to suffer",
                   "The slings and arrows of outrageous fortune",
                   "Or to take arms against a sea of troubles,",
                   "And by opposing end them?--To die,--to sleep,--",
                   "No more; and by a sleep to say we end",
                   "The heartache, and the thousand natural shocks",
                   "That flesh is heir to,--'tis a consummation",
                   "Devoutly to be wish'd. To die,--to sleep;--",
                   "To sleep! perchance to dream:--ay, there's the rub;",
                   "For in that sleep of death what dreams may come,",
                   "When we have shuffled off this mortal coil,",
                   "Must give us pause: there's the respect",
                   "That makes calamity of so long life;",
                   "For who would bear the whips and scorns of time,",
                   "The oppressor's wrong, the proud man's contumely,",
                   "The pangs of despis'd love, the law's delay,",
                   "The insolence of office, and the spurns",
                   "That patient merit of the unworthy takes,",
                   "When he himself might his quietus make",
                   "With a bare bodkin? who would these fardels bear,",
                   "To grunt and sweat under a weary life,",
                   "But that the dread of something after death,--",
                   "The undiscover'd country, from whose bourn",
                   "No traveller returns,--puzzles the will,",
                   "And makes us rather bear those ills we have",
                   "Than fly to others that we know not of?",
                   "Thus conscience does make cowards of us all;",
                   "And thus the native hue of resolution",
                   "Is sicklied o'er with the pale cast of thought;",
                   "And enterprises of great pith and moment,",
                   "With this regard, their currents turn awry,",
                   "And lose the name of action.--Soft you now!",
                   "The fair Ophelia!--Nymph, in thy orisons",
                   "Be all my sins remember'd."]


def word_count(input_path, output_path):
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    # write all the data to one file
    t_env.get_config().get_configuration().set_string("parallelism.default", "1")

    # define the source
    if input_path is not None:
        t_env.create_temporary_table(
            'source',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('word', DataTypes.STRING())
                        .build())
                .option('path', input_path)
                .format('csv')
                .build())
        tab = t_env.from_path('source')
    else:
        print("Executing word_count example with default input data set.")
        print("Use --input to specify file input.")
        tab = t_env.from_elements(map(lambda i: (i,), word_count_data),
                                  DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())]))

    # define the sink
    if output_path is not None:
        t_env.create_temporary_table(
            'sink',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('word', DataTypes.STRING())
                        .column('count', DataTypes.BIGINT())
                        .build())
                .option('path', output_path)
                .format(FormatDescriptor.for_format('canal-json')
                        .build())
                .build())
    else:
        print("Printing result to stdout. Use --output to specify output path.")
        t_env.create_temporary_table(
            'sink',
            TableDescriptor.for_connector('print')
                .schema(Schema.new_builder()
                        .column('word', DataTypes.STRING())
                        .column('count', DataTypes.BIGINT())
                        .build())
                .build())

    @udtf(result_types=[DataTypes.STRING()])
    def split(line: Row):
        for s in line[0].split():
            yield Row(s)

    # compute word count
    tab.flat_map(split).alias('word') \
        .group_by(col('word')) \
        .select(col('word'), lit(1).count) \
        .execute_insert('sink') \
        .wait()
    # remove .wait if submitting to a remote cluster, refer to
    # https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster
    # for more details


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)

    word_count(known_args.input, known_args.output)

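Run the script locally with python. Without --input and --output, it uses the built-in text and prints the word counts to stdout.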
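To sanity-check the counts, the sketch below reproduces the job's logic in plain Python: split each line on whitespace, as the split UDTF does, and count each word. It is a local cross-check, not part of the official example, and count_words is a hypothetical helper. In streaming mode the print sink emits changelog rows (+I, -U, +U) as counts are updated, so compare against the last update printed for each word.

```python
from collections import Counter


def count_words(lines):
    """Split each line on whitespace (like the demo's split UDTF) and count every word."""
    counts = Counter()
    for line in lines:
        counts.update(line.split())
    return counts


# A short sample from the demo's word_count_data.
sample = ["To be, or not to be,--that is the question:--",
          "Whether 'tis nobler in the mind to suffer"]
for word, n in sorted(count_words(sample).items()):
    print(word, n)
```

Like the UDTF, this splits on whitespace only, so punctuation stays attached (for example "be," and "be,--that" are distinct words) and "To" and "to" are counted separately.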

III. Preparing for YARN submission

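Prepare a Linux server with the Flink client installed (I used Flink 1.14.2; installing the client is not covered here, just download the package and extract it).

Set up the PyFlink runtime environment on the server as described in Section I.

Upload the demo program to the server.

On the server, env is the Python virtual environment directory, and py_env.zip is the env directory compressed with zip.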

IV. Submitting the job

/home/master/flink-1.14.2/bin/flink run -pyarch py_env.zip -m yarn-cluster -py /home/zhubao/demo.py -pyexec py_env.zip/env/bin/python
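The same command assembled in Python, with one comment per option: -pyarch ships the archive so it is extracted in the working directory of the Python UDF workers, -m yarn-cluster submits to YARN in per-job mode, -py names the entry script, and -pyexec points the workers at the interpreter inside the extracted archive. Note that -pyexec only affects the Python workers on the cluster; the interpreter used by the client is configured separately (python.client.executable). This is a sketch; build_flink_run_cmd is a hypothetical helper and the paths are the ones from this post.

```python
import shlex


def build_flink_run_cmd(flink_home, entry_script, archive, interpreter_in_archive):
    """Return the argv list for 'flink run' submitting a PyFlink job to YARN."""
    return [
        f"{flink_home}/bin/flink", "run",
        # Ship the zipped environment; it is extracted in the Python workers' working dir.
        "-pyarch", archive,
        # Submit to YARN in per-job mode.
        "-m", "yarn-cluster",
        # Entry-point script of the PyFlink job.
        "-py", entry_script,
        # Interpreter for the Python UDF workers, relative to the extracted archive.
        "-pyexec", f"{archive}/{interpreter_in_archive}",
    ]


cmd = build_flink_run_cmd("/home/master/flink-1.14.2", "/home/zhubao/demo.py",
                          "py_env.zip", "env/bin/python")
# shlex.quote keeps the printed command copy-paste safe (shlex.join needs Python 3.8+).
print(" ".join(shlex.quote(arg) for arg in cmd))
# To actually submit from Python: subprocess.run(cmd, check=True)
```

Passing a list (instead of a single shell string) to subprocess avoids quoting problems when paths contain spaces.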

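The terminal then prints the job submission log.

Open the YARN cluster's web UI; once the job appears under RUNNING, it has been submitted to the YARN cluster.

Check the job details.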
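Instead of the web UI, running applications can also be listed through the YARN ResourceManager REST API (GET /ws/v1/cluster/apps?states=RUNNING). A minimal sketch: the ResourceManager address is a placeholder, and running_app_names and fetch_running_apps are hypothetical helpers.

```python
import json
from urllib.request import urlopen


def running_app_names(payload):
    """Return the names of RUNNING applications from a /ws/v1/cluster/apps response."""
    # "apps" may be null when no application matches, hence the "or" fallbacks.
    apps = (payload.get("apps") or {}).get("app") or []
    return [app["name"] for app in apps if app.get("state") == "RUNNING"]


def fetch_running_apps(rm_url):
    """Query the ResourceManager REST API; rm_url is a placeholder like http://rm-host:8088."""
    with urlopen(f"{rm_url}/ws/v1/cluster/apps?states=RUNNING") as resp:
        return running_app_names(json.load(resp))


# Usage (placeholder address): print(fetch_running_apps("http://rm-host:8088"))
```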

V. Pitfalls

1. Syntax error on the client when submitting the job

SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
  File "main.py", line 55
    ds.print()
    ^
SyntaxError: invalid syntax

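Solution: investigation showed this was caused by a Flink client version mismatch. The demo code must be written against the same Flink version as the client, otherwise you get confusing errors, so align every Flink version involved, including PyFlink and the Flink client. Note also that ds.print() is only a syntax error under Python 2, where print is a keyword, so it is worth checking which interpreter the client actually runs (see the "No module named pyflink" issue below).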
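One way to catch a mismatch before submitting is to compare the PyFlink package version with the client version. The sketch assumes that flink --version prints a line like "Version: 1.14.2, Commit ID: ..." and that pyflink.version.__version__ holds the package version; both are assumptions to verify on your installation, and parse_flink_cli_version and check_versions are hypothetical helpers.

```python
import re
import subprocess


def parse_flink_cli_version(output):
    """Extract e.g. '1.14.2' from output like 'Version: 1.14.2, Commit ID: ...' (assumed format)."""
    match = re.search(r"Version:\s*([0-9][0-9A-Za-z.\-]*)", output)
    return match.group(1) if match else None


def check_versions(flink_bin):
    """Raise if the Flink client version differs from the installed PyFlink version."""
    out = subprocess.run([flink_bin, "--version"], stdout=subprocess.PIPE,
                         universal_newlines=True, check=True).stdout
    client_version = parse_flink_cli_version(out)
    # Assumption: PyFlink exposes its package version in pyflink/version.py.
    from pyflink.version import __version__ as pyflink_version
    if client_version != pyflink_version:
        raise RuntimeError(
            f"PyFlink {pyflink_version} does not match Flink client {client_version}")
    return client_version


# Usage (path from this post): check_versions("/home/master/flink-1.14.2/bin/flink")
```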

2. lzma package not found on the client when submitting the job

Traceback (most recent call last):
  File "/home/zhubao/env/lib/python3.7/site-packages/fastavro/read.py", line 2, in <module>
    from . import _read
  File "fastavro/_read.pyx", line 11, in init fastavro._read
  File "/home/master/python3/lib/python3.7/lzma.py", line 27, in <module>
    from _lzma import *
ModuleNotFoundError: No module named '_lzma'

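Solution: the error means this Python installation lacks _lzma, the low-level C extension behind the standard-library lzma module (which handles .xz compression). This typically happens when Python is built from source without the xz development headers. Install the missing dependencies:

# Run as root
yum install -y xz-devel python-backports-lzma
# Install the backports.lzma package as a regular user
pip install backports.lzma -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host pypi.tuna.tsinghua.edu.cn

After installation, edit lzma.py, usually located in $PYTHON_HOME/lib/python3.7 (adjust for your Python version). The change wraps the _lzma imports in a try/except block.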

Save the file and resubmit the job; the error is resolved.

3. Client error "org.apache.flink.streaming.api.utils.PythonTypeUtils.getCollectionInputFormat does not exist in the JVM" when submitting the job

Traceback (most recent call last):
  File "main.py", line 99, in <module>
    word_count(known_args.input, known_args.output)
  File "main.py", line 39, in word_count
    ds = env.from_collection(word_count_data)
  File "/home/zhubao/env/lib/python3.7/site-packages/pyflink/datastream/stream_execution_environment.py", line 958, in from_collection
    return self._from_collection(collection, type_info)
  File "/home/zhubao/env/lib/python3.7/site-packages/pyflink/datastream/stream_execution_environment.py", line 981, in _from_collection
    j_input_format = PythonTypeUtils.getCollectionInputFormat(
  File "/home/zhubao/env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1550, in __getattr__
    "{0}.{1} does not exist in the JVM".format(self._fqn, name))
py4j.protocol.Py4JError: org.apache.flink.streaming.api.utils.PythonTypeUtils.getCollectionInputFormat does not exist in the JVM

Solution: this is again a version mismatch. Make sure the PyFlink version is identical to the Flink client version.

4. Client error "ImportError: No module named pyflink" when submitting the job

Caused by: java.io.IOException: Failed to execute the command: python -c import pyflink;import os;print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)), 'bin'))
output: Traceback (most recent call last):
  File "<string>", line 1, in <module>
ImportError: No module named pyflink

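Solution: the client ran the default python command, which could not import PyFlink. The unquoted module name in "ImportError: No module named pyflink" is the Python 2 message format, so this python is most likely a system Python 2. Specify the client's Python interpreter in the Flink configuration: open flink-conf.yaml, usually under $FLINK_HOME/conf, and append the interpreter path at the end: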

python.client.executable: /home/zhubao/env/bin/python
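If several client machines need the same setting, the edit can be scripted. This sketch is a naive line-based edit that assumes flink-conf.yaml holds flat key: value lines (it is not a YAML parser); set_flink_conf is a hypothetical helper.

```python
from pathlib import Path


def set_flink_conf(conf_path, key, value):
    """Set 'key: value' in flink-conf.yaml, replacing an existing entry or appending one.

    Naive line-based edit: assumes flat 'key: value' lines, not arbitrary YAML.
    Commented-out lines ('# key: ...') are left untouched.
    """
    path = Path(conf_path)
    lines = path.read_text().splitlines() if path.exists() else []
    new_line = f"{key}: {value}"
    for i, line in enumerate(lines):
        if line.split(":", 1)[0].strip() == key:
            lines[i] = new_line
            break
    else:
        lines.append(new_line)
    path.write_text("\n".join(lines) + "\n")


# Usage (paths from this post; adjust to your installation):
# set_flink_conf("/home/master/flink-1.14.2/conf/flink-conf.yaml",
#                "python.client.executable", "/home/zhubao/env/bin/python")
```

Replacing an existing entry instead of always appending keeps the file free of duplicate keys when the script runs more than once.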

5. TaskManager reports "Permission denied" after submitting the job

Caused by: java.io.IOException: Cannot run program "/home/zhubao/env/bin/python": error=13, Permission denied

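Solution: I tried several approaches, but what finally worked was packaging the entire Python environment and shipping it to YARN along with the job. (error=13 means the user running the TaskManager may not execute that file or access one of its parent directories; shipping the environment with the job avoids depending on /home/zhubao on the cluster nodes.) Steps: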

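Package the Python environment as a zip file, running zip from the parent directory so that the archive entries start with env/ (matching the env/bin/python path used below):

cd /home/zhubao && zip -r py_env.zip env/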
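A Python equivalent, for machines without the zip tool: it stores each file relative to the environment's parent directory, so entries start with env/ as the -pyexec path py_env.zip/env/bin/python expects. zip_env is a hypothetical helper; like zip -r, zipfile.write follows symlinked files and stores their contents.

```python
import os
import zipfile


def zip_env(env_dir, zip_path):
    """Zip env_dir so entries are stored as <dirname>/..., e.g. env/bin/python."""
    env_dir = os.path.abspath(env_dir)  # also strips a trailing slash
    parent = os.path.dirname(env_dir)
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, _dirs, files in os.walk(env_dir):
            for name in files:
                full_path = os.path.join(root, name)
                # Store relative to the parent directory so the archive root is "env/".
                zf.write(full_path, os.path.relpath(full_path, parent))


# Usage (paths from this post): zip_env("/home/zhubao/env", "py_env.zip")
```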

Add the archive and interpreter options to the submit command:

/home/master/flink-1.14.2/bin/flink run -pyarch py_env.zip -m yarn-cluster -py /home/zhubao/demo.py -pyexec py_env.zip/env/bin/python

VI. Summary

        That covers developing Flink jobs with PyFlink and submitting them to a YARN cluster.


文章转载自:

http://UNaEQCkk.pctqL.cn
http://tweev6xn.pctqL.cn
http://zxqG12GI.pctqL.cn
http://LqsCdeaY.pctqL.cn
http://aR4mCrL0.pctqL.cn
http://fUhiYMiU.pctqL.cn
http://phFqPBme.pctqL.cn
http://ISkrUwGO.pctqL.cn
http://P2GVK8ie.pctqL.cn
http://xf5cZYh2.pctqL.cn
http://wp21jI8J.pctqL.cn
http://8h7fLmR0.pctqL.cn
http://kouLjtIo.pctqL.cn
http://eqrN47NJ.pctqL.cn
http://XKqFzKsO.pctqL.cn
http://EDyUKf4m.pctqL.cn
http://n15OkEBT.pctqL.cn
http://jcRL1xq2.pctqL.cn
http://4HvCBNdA.pctqL.cn
http://VX8vfN9z.pctqL.cn
http://bMr1BMYV.pctqL.cn
http://aA49LQGq.pctqL.cn
http://hkQemvlI.pctqL.cn
http://2EFa27lL.pctqL.cn
http://r4z0rqSo.pctqL.cn
http://hpFJ1JWk.pctqL.cn
http://evgJxieM.pctqL.cn
http://YPhpU9Ji.pctqL.cn
http://8aEsVcq8.pctqL.cn
http://XLMv8HOf.pctqL.cn
http://www.dtcms.com/wzjs/763892.html

相关文章:

  • 品牌宣传型企业网站兰州做网站公司
  • 网站备案地区名小程序软件开发
  • 免费外贸网站源码大连自己的网站
  • 湖北平台网站建设哪家好福田蒙派克油耗是多少
  • 不会编程 做网站茂名建设网站
  • 网站免费诊断云主机搭建多个网站
  • 响应式网站方案网络服务商英文
  • 999免费网站传奇哪里做网站比较号
  • 卓越亚马逊网站建设目的wordpress ip改成域名
  • wordpress企业主题二次开发下载优化软件
  • 广西旅游网站建设检查网站的死链接
  • 我和你99谁做的网站做网站的必要性
  • 网络调查问卷在哪个网站做我的网站模板下载 迅雷下载 迅雷下载
  • 做美食网站的模板外包公司名单
  • 做外贸仿牌网站seo网站优化课程
  • 网站和域名区别吗Wordpress is文章展示
  • 北京网站设计制作教程个人网站设计策划
  • 在国外做盗版网站关键词排名优化
  • 闵行手机网站建设微建站平台
  • 内蒙古网站建站php企业公司网站源码
  • 长宁做网站价格蓝色 宽屏 网站 模板下载
  • 宁波网站优化公司推荐win7优化设置
  • 国家建设部网站倪虹公司快速建站
  • 2015年做那个网站致富米拓网站建设教程
  • 做交网站建设部一建注册公示网站
  • 企业网站案例分析做网站怎么做
  • 网站失败的原因手机app软件开发机构
  • 深圳建站公司专业公司广告优化师
  • 网站建设情况说明总结小橡皮私人定制app软件
  • 青岛李沧建设局网站网站做好了前端 后端怎么做