当前位置: 首页 > news >正文

Flink实战教程从入门到精通(基础篇)(三)Flink集群部署

一、集群角色

        Flink提交作业和执行任务,需要几个关键组件:

        客户端(Client):代码由客户端获取并做转换,之后提交给JobManger

        JobManager就是Flink集群里的“管事人”,对作业进行中央调度管理;而它获取到要执行的作业后,会进一步处理转换,然后分发任务给众多的TaskManager。

        TaskManager,就是真正“干活的人”,数据的处理操作都是它们来做的。

        注意:Flink是一个非常灵活的处理框架,它支持多种不同的部署场景,还可以和不同的资源管理平台方便地集成。所以接下来我们会先做一个简单的介绍:让大家有一个初步的认识,之后再展开讲述不同情形下的Flink部署。 

二、集群搭建

1、集群规划

节点服务器

node-1node-2node-3

角色

JobManager

TaskManager

TaskManager

TaskManager

具体安装部署如下:

2、下载并减压安装包

2.1下载并解压安装包 

        (1)下载安装包 flink-1.17.0-bin-scala 2.12.tg2,将该jar包上传到 node-1节点服务器的/opt/software路径上。

        (2)在/opt/software 路径上解压 flink-1.17.0-bin-scala 2.12.tgz 到/opt/module 路径上。

可以使用命令 tar -zxvf flink-1.17.0-bin-scala 2.12.tgz -C /opt/module,从而解压到/opt/module路径上。

2.2 修改集群配置

        (1)进入conf路径,修改flink-confyaml文件,指定node-1节点服务器为JobManager 

修改内容如下:

# JobManager节点地址.
jobmanager.rpc.address: node-1
jobmanager.bind-host: 0.0.0.0
rest.address: node-1
rest.bind-address: 0.0.0.0
# TaskManager节点地址.需要配置为当前机器名
taskmanager.bind-host: 0.0.0.0
taskmanager.host: node-1

(2)修改workers文件,指定node-1、node-2和node-3为TaskManager

修改内容如下:

修改如下内容:
node-1
node-2
node-3

 (3)修改masters文件

root@node-1:/opt/module/flink-1.17.0/conf# vim masters

修改内容如下:

node-1:8081

 (4)另外,在flink-conf.yaml文件中还可以对集群中的JobManager和TaskManager组件进行优化配置,主要配置项如下:

        jobmanager.memory.process.size:对JobManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1600M,可以根据集群规模进行适当调整。

        taskmanager.memory.process.size:对TaskManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1728M,可以根据集群规模进行适当调整。

        taskmanager.numberOfTaskSlots:对每个TaskManager能够分配的Slot数量进行配置,默认为1,可根据TaskManager所在的机器能够提供给Flink的CPU数量决定。所谓Slot就是TaskManager中具体运行一个任务所分配的计算资源。

        parallelism.default:Flink任务执行的并行度,默认为1。优先级低于代码中进行的并行度配置和任务提交时使用参数指定的并行度数量。

关于Slot和并行度的概念,我们会在下一章做详细讲解。 

3、分发安装目录 

(1)配置修改完毕后,将Flink安装目录发给另外两个节点服务器。

(2)修改node-2节点的conf/flink-conf.yaml中的 taskmanager.host 

修改内容如下:

# TaskManager节点地址.需要配置为当前机器名
taskmanager.host: node-2

 (3)修改node-3节点的conf/flink-conf.yaml中的 taskmanager.host 

修改内容如下:

# TaskManager节点地址.需要配置为当前机器名
taskmanager.host: node-3

 4、启动集群

(1)在node-1节点服务器上执行start-cluster.sh启动Flink集群:

root@node-1:/opt/module/flink-1.17.0/bin# ./start-cluster.sh 

 (2)启动成功:

root@node-1:/opt/module/flink-1.17.0/bin# ./start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host node-1.
Starting taskexecutor daemon on host node-1.
Starting taskexecutor daemon on host node-2.
Starting taskexecutor daemon on host node-3.
root@node-1:/opt/module/flink-1.17.0/bin# 
注意:

1、连接名:如果设置连接名为node-1、node-2、node-3,需要在三台服务器的/etc/hosts 文件中修改配置:三台服务器都需要配置

root@node-2:/opt/module/flink-1.17.0/conf# cat /etc/hosts
127.0.0.1 localhost

192.168.50.55 node-1
192.168.50.35 node-2
192.168.50.75 node-3


# The following lines are desirable for IPv6 capable hosts
::1     ip6-localhost ip6-loopback
fe00::0 ip6-localnet
ff00::0 ip6-mcastprefix
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters
root@node-2:/opt/module/flink-1.17.0/conf# 

2、脚本权限:需要对/opt/module/flink-1.17.0/bin下面的所有脚本设置为最高权限,防止集群启动脚本权限不足,无法启动。三台服务器都需要配置。

5、访问Web UI

启动成功后,同样可以访问http://hadoop102:8081http://192.168.50.55:8081http://hadoop102:8081对flink集群和任务进行监控管理。

        这里可以明显看到,当前集群的TaskManager数量为3;由于默认每个TaskManager的Slot数量为1,所以总Slot数和可用Slot数都为3。 

三、向集群提交作业 

        在上一章中,我们已经编写读取socket发送的单词并统计单词的个数程序案例。本节我们将以该程序为例,演示如何将任务提交到集群中进行执行。具体步骤如下。

1、环境准备

        在node-1中执行以下命令启动netcat

root@node-1:~# nc -lk 7777

2、程序打包 

(1)在我们编写的Flink入门程序的pom.xml文件中添加打包插件的配置,具体如下:

注意:cn.konne.un.WordCountUn,这个main方法包

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>cn.konne.un.WordCountUn</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>assembly</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

(2)插件配置完毕后,可以使用IDEA的Maven工具执行package命令,出现如下提示即表示打包成功。

NFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.579 s
[INFO] Finished at: 2025-03-21T21:25:26+08:00
[INFO] Final Memory: 67M/763M
[INFO] ------------------------------------------------------------------------

Process finished with exit code 0

3、在Web UI上提交作业

(1)任务打包完成后,我们打开Flink的WEB UI页面,在右侧导航栏点击“Submit New Job”,然后点击按钮“+ Add New”,选择要上传运行的JAR包,如下图所示。

jar包完成后,如下图所示:

(2)点击该JAR包,出现任务配置页面,进行相应配置。

        主要配置程序入口主类的全类名任务运行的并行度,任务运行所需的配置参数和保存点路径等,如下图所示,配置完成后,即可点击按钮“Submit”,将任务提交到集群运行。

(3)任务提交成功之后,可点击左侧导航栏的“Running Jobs”查看程序运行列表情况。 

(4)测试

 服务器node-1,执行命令nc -lk 7777,启动socket端口:

root@node-1:~# nc -lk 7777
hello

        点击print,查看输出的节点,然后点击TaskManager,可以看到是node-1执行了程序,点击More查看具体日志。如下图:

自动跳转至节点执行日志,如下图:

 

点击标准输出 stdount,就可以查看标准日志,如下图:

(5)点击该任务,可以查看任务运行的具体情况,也可以通过点击“Cancel Job”结束任务运行。

4、命令行提交作业

        除了通过WEB UI界面提交任务之外,也可以直接通过命令行来提交任务。这里为方便起见,我们可以先把jar包直接上传到目录flink-1.17.0下

(1)首先需要启动集群。


root@node-1:/opt/module/flink-1.17.0/bin# bin/start-cluster.sh

(2)在node-1中执行以下命令启动netcat。


root@node-1:~# nc -lk 7777

(3)将flink程序运行jar包上传到/opt/module/flink-1.17.0路径。

(4)进入到flink的安装路径下,在命令行使用flink run命令提交作业。

bin/flink run -m node-1:8081 -c cn.konne.un.WordCountUn ./Flink-First-Demo-1.0-SNAPSHOT.jar

这里的参数 -m指定了提交到的JobManager,-c指定了入口类。

(5)在浏览器中打开Web UI,http://node-1:8081 查看应用执行情况。

用netcat输入数据,可以在TaskManager的标准输出(Stdout)看到对应的统计结果。

 

相关文章:

  • 软考-软件设计师-面向对象
  • CSS 中@media查询的工作原理,如何利用它实现不同设备的样式适配
  • HarmonyOS Failure[MSG_ERR_INSTALL_GRANT_REQUEST_PERMISSIONS_FAILED]报错权限自查
  • ripro 主题激活 问题写入授权Token失败,可能无文件写入权限
  • 场外个股期权是什么?场外个股期权还能做吗?
  • 四.ffmpeg对yuv数据进行h264编码
  • 一道原创OI题(普及-)——ZCS的随机游走的数据生成器
  • 飞机燃油系统故障频发?数字仿真带来全新解决方案
  • 课程5. 迁移学习
  • Spring Boot集成Redis并设置密码后报错: NOAUTH Authentication required
  • 2020年全国职业院校技能大赛改革试点赛高职组“云计算”竞赛赛卷
  • 参数估计学习笔记通俗易懂版(包括点估计和区间估计(区间估包括总体均值的置信区间(总体标准差未知、总体标准差已知)和总体方差的置信区间))
  • Mybatis能执行一对一、一对多的关联查询吗?都有哪些实现方式,以及它们之间的区别?
  • 【React】useMemo、useCallback
  • 关于VSCode使用过程中的一些问题记录(持续更新)
  • kernel中外部传递参数使用方法
  • 20250321在荣品的PRO-RK3566开发板的buildroot系统下使用UART1
  • 几个JSON在AutoCAD二次开发中应用比较有优势的场景及具体案例
  • 威联通 后台可用命令查看Bash
  • <项目> 主从Reactor模型的高并发服务器
  • 七大交响乐团在沪“神仙斗法”,时代交响奏出何等时代新声
  • 中华人民共和国和俄罗斯联邦在纪念中国人民抗日战争、苏联伟大卫国战争胜利和联合国成立80周年之际关于进一步深化中俄新时代全面战略协作伙伴关系的联合声明
  • 招行:拟出资150亿元全资发起设立金融资产投资公司
  • 第1现场 | 50多年来首次!印度举行大规模民防演习
  • 中国中古史集刊高质量发展论坛暨《唐史论丛》创刊四十周年纪念会召开
  • “救护车”转运病人半路加价,从宝鸡到西安往返都要多收钱