

Flink on Native K8S Installation and Deployment

1. Have a working K8S environment

2. Download and install Flink on the K8S client machine

wget  https://dlcdn.apache.org/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz
tar -xvf flink-1.18.1-bin-scala_2.12.tgz

3. Create a namespace

kubectl create namespace test-flink-session

4. Create a secret for pulling the Flink image

kubectl create secret docker-registry flink-registry-secret \
  --docker-server=registry-vpc.cn-beijing.aliyuncs.com \
  --docker-username=xxxx \
  --docker-password=****** \
  -n test-flink-session

5. Create a service account and grant it access to K8S resources

# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n test-flink-session
# Bind the built-in edit clusterrole to the service account
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=test-flink-session:flink-service-account
# Verify permissions
kubectl auth can-i list pods --namespace test-flink-session --as system:serviceaccount:test-flink-session:flink-service-account
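
The clusterrolebinding above grants the service account edit rights across the whole cluster. To scope the grant to the namespace instead, a RoleBinding can reference the same clusterrole; a sketch:

# Namespace-scoped alternative to the clusterrolebinding above
kubectl create rolebinding flink-role-binding-flink \
  --clusterrole=edit \
  --serviceaccount=test-flink-session:flink-service-account \
  -n test-flink-session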

6. Start the flink-session in HA mode

Edit the configuration file flink-conf.yaml:

env.java.home: /opt/jdk1.8.0_202
env.java.opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${FLINK_LOG_PREFIX}.hprof
jobmanager.memory.process.size: 1536m
jobmanager.memory.jvm-metaspace.size: 420m
taskmanager.memory.process.size: 1536m
execution.checkpointing.interval: 10m
taskmanager.memory.managed.fraction: 0.2
jobstore.expiration-time: 86400
rest.flamegraph.enabled: true
state.savepoints.dir: oss://wh-bigdata/flink/prod-savepoint/

Start the flink-session cluster from the command line:

/opt/deploy/flink-1.18.1/bin/kubernetes-session.sh -Dkubernetes.cluster-id=test-flink-cluster \
-Dkubernetes.service-account=flink-service-account \
-Dkubernetes.namespace=test-flink-session \
-Dkubernetes.jobmanager.cpu.amount=0.05 \
-Dkubernetes.jobmanager.cpu.limit-factor=20 \
-Dkubernetes.taskmanager.cpu.amount=0.05 \
-Dkubernetes.taskmanager.cpu.limit-factor=20 \
-Dkubernetes.container.image.pull-secrets=flink-registry-secret \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Dfs.oss.endpoint=oss-cn-beijing.aliyuncs.com \
-Dfs.oss.accessKeyId=****** \
-Dfs.oss.accessKeySecret=****** \
-Dhigh-availability.type=kubernetes \
-Dhigh-availability.storageDir=oss://wh-bigdata/flink/recovery \
-Dkubernetes.jobmanager.replicas=2 \
-Dweb.upload.dir=/opt/flink/artifacts \
-Dkubernetes.flink.log.dir=/opt/flink/log/data-sync-center-source \
-Dstate.backend=hashmap \
-Dstate.checkpoints.dir=oss://weihai-bigdata/flink/checkpoint \
-Dkubernetes.pod-template-file.default=/opt/deploy/flink-1.18.1/flink-pod-template.yaml

These properties can also be configured in flink-conf.yaml, as sketched below. kubernetes.pod-template-file.default points to the template file used when creating the JobManager and TaskManager Pods; the template itself follows the sketch.
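
A partial flink-conf.yaml sketch covering the same options passed with -D above:

kubernetes.cluster-id: test-flink-cluster
kubernetes.namespace: test-flink-session
kubernetes.service-account: flink-service-account
kubernetes.container.image.pull-secrets: flink-registry-secret
kubernetes.rest-service.exposed.type: NodePort
high-availability.type: kubernetes
high-availability.storageDir: oss://wh-bigdata/flink/recovery
kubernetes.pod-template-file.default: /opt/deploy/flink-1.18.1/flink-pod-template.yaml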

apiVersion: v1
kind: Pod
metadata:
  name: flink-pod-template
  namespace: test-flink-session
spec:
  tolerations:
  - key: "environment"
    operator: "Equal"
    value: "flink"
    effect: "NoSchedule"
  nodeSelector:
    environment: flink
  containers:
  # Do not change the main container name
  - name: flink-main-container
    image: wh-cn-beijing.cr.volces.com/images/base-images:1.18-oss
    imagePullPolicy: Always
    env:
    - name: TZ
      value: Asia/Shanghai
    resources:
      requests:
        cpu: "0.05"
        memory: "1024Mi"
        ephemeral-storage: 1024Mi
      limits:
        cpu: "0.1"
        memory: "2048Mi"
        ephemeral-storage: 2048Mi
    volumeMounts:
    - mountPath: /opt/flink/volumes/hostpath
      name: flink-volume-hostpath
    - mountPath: /opt/flink/artifacts
      name: flink-artifact
    - mountPath: /opt/flink/log
      name: flink-logs
  # Use a sidecar container to push logs to remote storage or for other debugging tasks
  volumes:
  - name: flink-volume-hostpath
    hostPath:
      path: /tmp
      type: Directory
  - name: flink-artifact
    persistentVolumeClaim:
      claimName: nas-flink
  - name: flink-logs
    persistentVolumeClaim:
      claimName: nas-flink-log
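
The template assumes the PersistentVolumeClaims nas-flink and nas-flink-log already exist in the namespace. A minimal sketch of one of them, assuming a NAS-backed storage class (the class name is hypothetical and cluster-specific):

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: nas-flink
  namespace: test-flink-session
spec:
  accessModes:
  - ReadWriteMany            # JobManager and TaskManagers mount the volume concurrently
  storageClassName: nas      # hypothetical; substitute your cluster's NAS storage class
  resources:
    requests:
      storage: 10Gi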

7. Inspect the created resources

# List all services
kubectl get svc -n test-flink-session
# List all configmaps
kubectl get cm -n test-flink-session
# Describe all deployments
kubectl describe deploy -n test-flink-session
# Delete the deployment
kubectl delete deploy -n test-flink-session test-flink-cluster
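
A one-shot overview can also be handy; note that configmaps are not part of 'all':

# List pods, services, deployments and replicasets in the namespace at once
kubectl get all -n test-flink-session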

8. Pin the IP and service port

Check the pods and their ports:

kubectl get pod -n test-flink-session -o wide

Every time the deployment is redeployed, the rest-svc port changes. The current Flink native K8S mode offers no configuration option to pin the rest-svc port, so the nodePort on the K8S svc has to be set manually to keep it fixed.

# Force-replace the service to pin its port
kubectl replace --force -f /opt/deploy/flink-1.18.1/svc/test-flink-cluster.yaml
# Optionally, give the jobmanager pod a dedicated label so the selector below can match it
# pod_name=$(kubectl get pod -n test-flink-session | grep test-flink-cluster | awk -F ' ' '{print $1}')
# kubectl label pod ${pod_name} environment=test-flink-cluster-rest -n test-flink-session

The contents of test-flink-cluster.yaml:

apiVersion: v1
kind: Service
metadata:
  name: test-flink-cluster-rest
  namespace: test-flink-session
spec:
  type: NodePort
  #selector:
  #  environment: test-flink-cluster-rest
  ports:
  - name: rest
    protocol: TCP
    port: 8081
    targetPort: 8081
    nodePort: 32583  # pins the rest-svc port

Alternatively, run kubectl edit svc test-flink-cluster-rest -n test-flink-session and change nodePort under ports to 32583.
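
A non-interactive alternative is a strategic merge patch, which updates only the nodePort on the matching port entry; a sketch:

kubectl patch svc test-flink-cluster-rest -n test-flink-session \
  -p '{"spec":{"ports":[{"port":8081,"nodePort":32583}]}}'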

9. Submit Flink jobs

A flink-session carries a memory-leak risk: after enough job submissions the JobManager can crash with an OOM. In HA mode a new JobManager is then pulled up automatically and recovers the previously running jobs.
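
To spot such crashes, check restart counts and the last termination reason of the JobManager containers; a sketch assuming the labels Flink's native K8S integration puts on its pods (app=<cluster-id>, component=jobmanager):

kubectl get pods -n test-flink-session -l app=test-flink-cluster,component=jobmanager \
  -o custom-columns=NAME:.metadata.name,RESTARTS:.status.containerStatuses[0].restartCount,LAST_REASON:.status.containerStatuses[0].lastState.terminated.reason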

  • Submit from the command line: the user's main() runs on the submitting machine, is compiled into a StreamGraph, and the job is then submitted to the session cluster over the REST API.

    # Submit a job to the flink session cluster
    ./bin/flink run --target kubernetes-session -Dkubernetes.cluster-id=test-flink-cluster -Dkubernetes.namespace=test-flink-session -d -c com.wh.crm.DemoTask /home/appuser/wh-crm-flink-1.0.1-SNAPSHOT.jar -env test  # log config file: conf/log4j-console.properties
    # Tail the logs
    kubectl logs -n test-flink-session -f test-flink-cluster-taskmanager-1-4
    # Exec into the Pod
    kubectl exec -n test-flink-session -it test-flink-cluster-taskmanager-1-4 -- bash
    
  • Submit via the Web UI: the jar uploaded in the Web UI is stored in a temporary directory on the JobManager Pod's local disk. If uploaded jars should still be available after a session-cluster restart, mount that directory on a persistent volume.

    Note that Entry Class is required and the main class name must not be followed by a trailing space. The other three parameters are optional.

    Parallelism: the global parallelism; it is overridden by any parallelism set in code via setParallelism().

    Program Arguments: custom arguments set on the command line with - or --, parsed in Flink code with ParameterTool.fromArgs. For example, -env test.

    Savepoint Path: the savepoint location for the job, used to resume it from saved state.

    Allow Non Restored State: selectable once a Savepoint Path is set; usually it should be enabled in that case.

  • Application mode:

    The image must already contain the job jar (see the Dockerfile sketch after this list).

    /home/appuser/flink-1.18.1/bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=binlog-kafka-handle \
    -Dkubernetes.service-account=flink-service-account \
    -Dkubernetes.namespace=test-flink-session \
    -Dkubernetes.jobmanager.cpu.amount=0.05 \
    -Dkubernetes.jobmanager.cpu.limit=0.05 \
    -Dkubernetes.taskmanager.cpu.amount=0.05 \
    -Dkubernetes.taskmanager.cpu.limit=0.05 \
    -Dkubernetes.container.image.pull-secrets=flink-registry-secret \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    -Dtaskmanager.numberOfTaskSlots=1 \
    -Djobmanager.memory.process.size=1536mb \
    -Djobmanager.memory.jvm-metaspace.size=718mb \
    -Dtaskmanager.memory.process.size=1024mb \
    -Dtaskmanager.memory.managed.size=32mb \
    -Dtaskmanager.memory.jvm-metaspace.size=128mb \
    -Dfs.oss.endpoint=oss-cn-beijing-internal.aliyuncs.com \
    -Dfs.oss.accessKeyId=****** \
    -Dfs.oss.accessKeySecret=****** \
    -Dkubernetes.pod-template-file.default=/home/appuser/flink-1.18.1/flink-oss-pod-template.yaml \
    -c com.wh.binlog.CdcMysqlWashingTask \
    local:///opt/flink/artifacts/binlog_kafka_handle-out-1.1-SNAPSHOT.jar \
    -env test
    # local:/// is the jar's path inside the image; -env test is a program argument
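
    One way to get the jar to that local:// path is to bake it in at image build time; a minimal sketch, assuming the custom base image from the next step and an illustrative jar name:

    FROM wh-cn-beijing.cr.volces.com/images/base-images:1.18-oss
    # chown so the flink user, not root, owns the jar at runtime
    COPY --chown=flink:flink binlog_kafka_handle-out-1.1-SNAPSHOT.jar /opt/flink/artifacts/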
    

Sometimes a custom flink image is needed so that the jars your user code depends on are included in the image:

# Dockerfile
FROM registry.cn-beijing.aliyuncs.com/yican/flink:1.18
RUN ln -fs /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
ENV FLINK_HOME=/opt/flink
WORKDIR $FLINK_HOME
RUN mkdir ./plugins/oss-fs-hadoop && rm -rf ./lib/flink-table-planner-loader-1.18.1.jar
RUN cp ./opt/flink-oss-fs-hadoop-1.18.1.jar ./plugins/oss-fs-hadoop/ && cp ./opt/flink-table-planner_2.12-1.18.1.jar ./lib/
## The chown matters: without it the copied jars are owned by root and cannot be loaded inside the image
COPY --chown=flink:flink *.jar /opt/flink/lib/
#COPY flink-fs-hadoop-shaded-1.18-SNAPSHOT.jar flink-oss-fs-hadoop-1.18.1.jar flink-hadoop-fs-1.18.1.jar ./plugins/oss-fs-hadoop/
#RUN  sed -i '/^env\.java\.opts\.all:/d' $FLINK_HOME/conf/flink-conf.yaml; \
#     echo 'env.java.opts: "-Dfile.encoding=UTF-8"' | tee -a $FLINK_HOME/conf/flink-conf.yaml > /dev/null
## Update the package index and install required tools
#RUN apt-get update && \
#    apt-get install -y unzip
## Install ossutil
#RUN curl https://gosspublic.alicdn.com/ossutil/install.sh | bash
#COPY ossutilconfig /root
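
Building and pushing the custom image (the repository path is a placeholder):

docker build -t registry-vpc.cn-beijing.aliyuncs.com/<repo>/flink:1.18-custom .
docker push registry-vpc.cn-beijing.aliyuncs.com/<repo>/flink:1.18-custom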

10. Shut down the flink-session

echo 'stop' | ./bin/kubernetes-session.sh \
  -Dkubernetes.namespace=test-flink-session \
  -Dkubernetes.cluster-id=test-flink-cluster \
  -Dexecution.attached=true

Or clean up manually:

# Delete the deployment
kubectl delete deployment/test-flink-cluster -n test-flink-session
# Delete the svc
kubectl delete svc test-flink-cluster-rest -n test-flink-session
# Delete the configmaps, which hold the configuration, the Pod template, and the HA data
kubectl delete cm flink-config-test-flink-cluster pod-template-test-flink-cluster test-flink-cluster-cluster-config-map -n test-flink-session

11. Ship error logs to Kafka

Configure log4j-console.properties:

rootLogger.appenderRef.kafka.ref = KafkaAppender
appender.kafka.type = Kafka
appender.kafka.name = KafkaAppender
appender.kafka.syncSend = false
appender.kafka.topic = ${sys:log-topic:-flink-app-log}
appender.kafka.p.type=Property
appender.kafka.p.name=bootstrap.servers
appender.kafka.p.value=${sys:kafka-broker}
appender.kafka.layout.type = PatternLayout
appender.kafka.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.kafka.filter.threshold.type = ThresholdFilter
appender.kafka.filter.threshold.level = ERROR 
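
The topic and broker list are resolved from JVM system properties (${sys:log-topic}, ${sys:kafka-broker}), so they must be passed to every Flink JVM, for example by appending them to env.java.opts.all in flink-conf.yaml; the broker address below is hypothetical, and note that Log4j's Kafka appender additionally needs the kafka-clients jar on the classpath:

env.java.opts.all: <existing options> -Dlog-topic=flink-app-log -Dkafka-broker=kafka-broker-1:9092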