
Contents

Background

1. Installing PyFlink

2. Writing a demo program

3. Preparation before submitting to YARN

4. Submitting the job

5. Pitfalls

Pitfall 1: Syntax error on the client when submitting the job

Pitfall 2: lzma module not found on the client when submitting the job

Pitfall 3: Client reports "org.apache.flink.streaming.api.utils.PythonTypeUtils.getCollectionInputFormat does not exist in the JVM"

Pitfall 4: Client reports "ImportError: No module named pyflink"

Pitfall 5: TaskManager reports "Permission denied"

6. Summary


Background

        A recent project requires developing Flink jobs in Python, so I studied PyFlink and recorded my notes here.

1. Installing PyFlink

        Run the following commands locally. Note that PyFlink requires Python 3.6 or later; my local machine runs 3.12.
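
You can confirm the interpreter version first:

python --version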

Create a virtual environment

python -m venv venv

Activate the virtual environment

source venv/bin/activate

Run the install command

pip install apache-flink==1.14

Once the installation succeeds, you can verify it as follows.
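
A quick check that the package is installed and importable (the exact metadata printed by pip may differ):

pip show apache-flink
python -c "import pyflink; print(pyflink.__file__)"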

2. Writing a demo program

        The demo is copied directly from the official tutorial.

import argparse
import logging
import sys

from pyflink.common import Row
from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,
                           DataTypes, FormatDescriptor)
from pyflink.table.expressions import lit, col
from pyflink.table.udf import udtf

word_count_data = ["To be, or not to be,--that is the question:--",
                   "Whether 'tis nobler in the mind to suffer",
                   "The slings and arrows of outrageous fortune",
                   "Or to take arms against a sea of troubles,",
                   "And by opposing end them?--To die,--to sleep,--",
                   "No more; and by a sleep to say we end",
                   "The heartache, and the thousand natural shocks",
                   "That flesh is heir to,--'tis a consummation",
                   "Devoutly to be wish'd. To die,--to sleep;--",
                   "To sleep! perchance to dream:--ay, there's the rub;",
                   "For in that sleep of death what dreams may come,",
                   "When we have shuffled off this mortal coil,",
                   "Must give us pause: there's the respect",
                   "That makes calamity of so long life;",
                   "For who would bear the whips and scorns of time,",
                   "The oppressor's wrong, the proud man's contumely,",
                   "The pangs of despis'd love, the law's delay,",
                   "The insolence of office, and the spurns",
                   "That patient merit of the unworthy takes,",
                   "When he himself might his quietus make",
                   "With a bare bodkin? who would these fardels bear,",
                   "To grunt and sweat under a weary life,",
                   "But that the dread of something after death,--",
                   "The undiscover'd country, from whose bourn",
                   "No traveller returns,--puzzles the will,",
                   "And makes us rather bear those ills we have",
                   "Than fly to others that we know not of?",
                   "Thus conscience does make cowards of us all;",
                   "And thus the native hue of resolution",
                   "Is sicklied o'er with the pale cast of thought;",
                   "And enterprises of great pith and moment,",
                   "With this regard, their currents turn awry,",
                   "And lose the name of action.--Soft you now!",
                   "The fair Ophelia!--Nymph, in thy orisons",
                   "Be all my sins remember'd."]


def word_count(input_path, output_path):
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    # write all the data to one file
    t_env.get_config().get_configuration().set_string("parallelism.default", "1")

    # define the source
    if input_path is not None:
        t_env.create_temporary_table(
            'source',
            TableDescriptor.for_connector('filesystem')
                           .schema(Schema.new_builder()
                                   .column('word', DataTypes.STRING())
                                   .build())
                           .option('path', input_path)
                           .format('csv')
                           .build())
        tab = t_env.from_path('source')
    else:
        print("Executing word_count example with default input data set.")
        print("Use --input to specify file input.")
        tab = t_env.from_elements(map(lambda i: (i,), word_count_data),
                                  DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())]))

    # define the sink
    if output_path is not None:
        t_env.create_temporary_table(
            'sink',
            TableDescriptor.for_connector('filesystem')
                           .schema(Schema.new_builder()
                                   .column('word', DataTypes.STRING())
                                   .column('count', DataTypes.BIGINT())
                                   .build())
                           .option('path', output_path)
                           .format(FormatDescriptor.for_format('canal-json')
                                   .build())
                           .build())
    else:
        print("Printing result to stdout. Use --output to specify output path.")
        t_env.create_temporary_table(
            'sink',
            TableDescriptor.for_connector('print')
                           .schema(Schema.new_builder()
                                   .column('word', DataTypes.STRING())
                                   .column('count', DataTypes.BIGINT())
                                   .build())
                           .build())

    @udtf(result_types=[DataTypes.STRING()])
    def split(line: Row):
        for s in line[0].split():
            yield Row(s)

    # compute word count
    tab.flat_map(split).alias('word') \
       .group_by(col('word')) \
       .select(col('word'), lit(1).count) \
       .execute_insert('sink') \
       .wait()
    # remove .wait if submitting to a remote cluster, refer to
    # https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster
    # for more details


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        dest='input',
                        required=False,
                        help='Input file to process.')
    parser.add_argument('--output',
                        dest='output',
                        required=False,
                        help='Output file to write results to.')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)

    word_count(known_args.input, known_args.output)

Run the script locally; with no --output argument the demo falls back to the print connector, so the word counts are streamed to stdout.
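
A minimal local run, assuming the script above was saved as demo.py and the virtual environment from section 1 is active:

# word counts are printed to stdout
python demo.py

# optionally read words from a file and write results out instead
# python demo.py --input /path/to/words.csv --output /path/to/output_dir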

3. Preparation before submitting to YARN

Prepare a Linux server with the Flink client installed (I use Flink 1.14.2; installing the client is not covered here, just download the package and extract it).

Set up the PyFlink runtime environment on the server, following section 1.

Upload the demo program to the server.

Here env is the Python virtual environment directory, and py_env.zip is that directory packaged with zip; a sketch of these steps is shown below.
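
A rough sketch of the server-side preparation, assuming the environment lives at /home/zhubao/env as in the logs shown later in this article:

# create the virtual environment and install PyFlink (keep the version in line with the client)
python3 -m venv /home/zhubao/env
source /home/zhubao/env/bin/activate
pip install apache-flink==1.14

# package it so that env/ is the top-level directory inside the archive
cd /home/zhubao && zip -r py_env.zip env/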

4. Submitting the job

/home/master/flink-1.14.2/bin/flink run -pyarch py_env.zip -m yarn-cluster -py /home/zhubao/demo.py -pyexec py_env.zip/env/bin/python
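
For reference, the same command broken out with a note on each option:

# -m yarn-cluster : submit the job to the YARN cluster (per-job mode)
# -py             : the entry PyFlink script to execute
# -pyarch         : archive(s) shipped to the cluster, here the zipped Python environment
# -pyexec         : the Python interpreter used by the Python workers, resolved inside the extracted archive
/home/master/flink-1.14.2/bin/flink run -m yarn-cluster \
    -pyarch py_env.zip \
    -pyexec py_env.zip/env/bin/python \
    -py /home/zhubao/demo.py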

The terminal prints the submission logs.

Open the YARN cluster web UI; when the job appears under the running applications, it has been submitted to the YARN cluster.
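
Alternatively, assuming the yarn CLI is available on the submitting machine, the running applications can also be listed from the command line:

# the submitted Flink job should appear with an application ID and state RUNNING
yarn application -list -appStates RUNNING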

From there you can view the job details.

5. Pitfalls

Pitfall 1: Syntax error on the client when submitting the job

SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
  File "main.py", line 55
    ds.print()
             ^
SyntaxError: invalid syntax

Solution: this turned out to be caused by a Flink version mismatch. The demo code (PyFlink) and the Flink client must be the same version, otherwise you run into puzzling errors. Align all Flink components, including PyFlink and the Flink client, to the same version.
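
A quick sanity check on both sides, using the paths from this article:

# PyFlink version inside the virtual environment
pip show apache-flink

# Flink client version
/home/master/flink-1.14.2/bin/flink --version

# if they differ, reinstall PyFlink to match the client exactly, e.g. for a 1.14.2 client:
# pip install apache-flink==1.14.2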

Pitfall 2: lzma module not found on the client when submitting the job

Traceback (most recent call last):
  File "/home/zhubao/env/lib/python3.7/site-packages/fastavro/read.py", line 2, in <module>
    from . import _read
  File "fastavro/_read.pyx", line 11, in init fastavro._read
  File "/home/master/python3/lib/python3.7/lzma.py", line 27, in <module>
    from _lzma import *
ModuleNotFoundError: No module named '_lzma'

Solution: this error means the system is missing the dependency behind Python's LZMA compression module (_lzma), the C extension in the standard library that handles .xz compressed files; it has to be installed.

# run the following as root
yum install -y xz-devel python-backports-lzma
# then, as the regular user, install the lzma backport package
pip install backports.lzma -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host pypi.tuna.tsinghua.edu.cn

After the installation, lzma.py needs to be edited. Find lzma.py, usually under $PYTHON_HOME/lib/python3.7 (adjust for your Python version); the change is essentially wrapping the _lzma import in a try/except.
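
A sketch of that edit around line 27 of lzma.py, falling back to the freshly installed backports package when the C extension is unavailable (leave the rest of the file unchanged):

try:
    from _lzma import *
    from _lzma import _encode_filter_properties, _decode_filter_properties
except ImportError:
    # the _lzma C extension is missing; fall back to the backport
    from backports.lzma import *
    from backports.lzma import _encode_filter_properties, _decode_filter_properties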

Save the file and exit; resubmitting the job then succeeds.
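
You can also confirm locally that the module now imports cleanly before resubmitting:

python -c "import lzma; print(lzma.__file__)"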

Pitfall 3: Client reports "org.apache.flink.streaming.api.utils.PythonTypeUtils.getCollectionInputFormat does not exist in the JVM" when submitting the job

Traceback (most recent call last):File "main.py", line 99, in <module>word_count(known_args.input, known_args.output)File "main.py", line 39, in word_countds = env.from_collection(word_count_data)File "/home/zhubao/env/lib/python3.7/site-packages/pyflink/datastream/stream_execution_environment.py", line 958, in from_collectionreturn self._from_collection(collection, type_info)File "/home/zhubao/env/lib/python3.7/site-packages/pyflink/datastream/stream_execution_environment.py", line 981, in _from_collectionj_input_format = PythonTypeUtils.getCollectionInputFormat(File "/home/zhubao/env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1550, in __getattr__"{0}.{1} does not exist in the JVM".format(self._fqn, name))
py4j.protocol.Py4JError: org.apache.flink.streaming.api.utils.PythonTypeUtils.getCollectionInputFormat does not exist in the JVM

Solution: again a version mismatch; make sure PyFlink and the Flink client are the same version.

Pitfall 4: Client reports "ImportError: No module named pyflink" when submitting the job

Caused by: java.io.IOException: Failed to execute the command: python -c import pyflink;import os;print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)), 'bin'))
output: Traceback (most recent call last):
  File "<string>", line 1, in <module>
ImportError: No module named pyflink

Solution: add the Python executable to the Flink configuration. Open flink-conf.yaml, usually under $FLINK_HOME/conf, and append the Python client executable at the end:

python.client.executable: /home/zhubao/env/bin/python

Pitfall 5: TaskManager reports "Permission denied" when running the job

Caused by: java.io.IOException: Cannot run program "/home/zhubao/env/bin/python": error=13, Permission denied

Solution: I tried a few approaches, but what finally worked was packaging the entire Python environment and submitting it to YARN along with the job, as follows.

Package the Python environment into a zip archive; run zip from the parent directory so that env/ sits at the top level of the archive, matching the py_env.zip/env/... path used in the submit command below.

cd /home/zhubao && zip -r py_env.zip env/

Add the archive and its Python interpreter to the submit command:

/home/master/flink-1.14.2/bin/flink run -pyarch py_env.zip -m yarn-cluster -py /home/zhubao/demo.py -pyexec py_env.zip/env/bin/python

6. Summary

        The above covers how to develop Flink jobs with PyFlink and how to submit them to a YARN cluster.
