当前位置: 首页 > news >正文

Flink 高可用集群部署指南

一、部署架构设计

1. 集群架构

graph TDClient([客户端]) --> JM1[JobManager 1]Client --> JM2[JobManager 2]Client --> JM3[JobManager 3]subgraph ZooKeeper集群ZK1[ZooKeeper 1]ZK2[ZooKeeper 2]ZK3[ZooKeeper 3]endsubgraph TaskManager集群TM1[TaskManager 1]TM2[TaskManager 2]TM3[TaskManager 3]endJM1 --> ZK1JM2 --> ZK2JM3 --> ZK3JM1 --> TM1JM1 --> TM2JM1 --> TM3

2. 节点规划

节点主机名IP 地址角色分配硬件配置
节点1flink-jm110.0.0.101JobManager + ZooKeeper8核16GB
节点2flink-jm210.0.0.102JobManager + ZooKeeper8核16GB
节点3flink-jm310.0.0.103JobManager + ZooKeeper8核16GB
节点4flink-tm110.0.0.104TaskManager16核32GB
节点5flink-tm210.0.0.105TaskManager16核32GB
节点6flink-tm310.0.0.106TaskManager16核32GB

二、环境准备

1. 系统要求

  • 操作系统: CentOS 7.9 或 Ubuntu 20.04 LTS
  • Java版本: OpenJDK 11 (建议使用 Azul Zulu 11)
  • 防火墙: 开放以下端口
    • JobManager: 6123, 6124, 8081, 8082
    • TaskManager: 6121, 6122, 6125
    • ZooKeeper: 2181, 2888, 3888

2. 基础配置(所有节点)

# 创建专用用户
sudo useradd -m -s /bin/bash flink
sudo passwd flink# 配置主机名解析(所有节点)
sudo tee -a /etc/hosts <<EOF
10.0.0.101 flink-jm1
10.0.0.102 flink-jm2
10.0.0.103 flink-jm3
10.0.0.104 flink-tm1
10.0.0.105 flink-tm2
10.0.0.106 flink-tm3
EOF# 配置SSH免密登录(JobManager节点间)
sudo -u flink ssh-keygen -t rsa -P ''
sudo -u flink ssh-copy-id flink@flink-jm1
sudo -u flink ssh-copy-id flink@flink-jm2
sudo -u flink ssh-copy-id flink@flink-jm3# 安装Java
sudo apt install -y openjdk-11-jdk
echo 'export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64' | sudo tee /etc/profile.d/java.sh
source /etc/profile

三、ZooKeeper集群部署

1. 安装配置(所有ZK节点执行)

# 下载解压
cd /opt
sudo wget https://downloads.apache.org/zookeeper/zookeeper-3.8.1/apache-zookeeper-3.8.1-bin.tar.gz
sudo tar -xzf apache-zookeeper-3.8.1-bin.tar.gz
sudo mv apache-zookeeper-3.8.1-bin zookeeper
sudo chown -R flink:flink /opt/zookeeper# 创建数据目录
sudo mkdir /data/zookeeper
sudo chown flink:flink /data/zookeeper# 配置zoo.cfg
sudo -u flink tee /opt/zookeeper/conf/zoo.cfg <<EOF
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
clientPort=2181
maxClientCnxns=100
admin.enableServer=false
server.1=flink-jm1:2888:3888
server.2=flink-jm2:2888:3888
server.3=flink-jm3:2888:3888
EOF# 创建myid文件(每个节点不同)
# flink-jm1:
echo "1" | sudo -u flink tee /data/zookeeper/myid
# flink-jm2:
echo "2" | sudo -u flink tee /data/zookeeper/myid
# flink-jm3:
echo "3" | sudo -u flink tee /data/zookeeper/myid

2. 启动与验证

# 所有ZK节点启动服务
sudo -u flink /opt/zookeeper/bin/zkServer.sh start# 检查集群状态
sudo -u flink /opt/zookeeper/bin/zkCli.sh -server flink-jm1:2181
[zk: flink-jm1:2181(CONNECTED) 0] srvr

四、Flink集群部署

1. 安装Flink(所有节点)

cd /opt
sudo wget https://dlcdn.apache.org/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz
sudo tar -xzf flink-1.17.1-bin-scala_2.12.tgz
sudo mv flink-1.17.1 flink
sudo chown -R flink:flink /opt/flink# 设置环境变量
echo 'export FLINK_HOME=/opt/flink' | sudo tee /etc/profile.d/flink.sh
echo 'export PATH=$PATH:$FLINK_HOME/bin' | sudo tee /etc/profile.d/flink.sh
source /etc/profile

2. 高可用配置(JobManager节点)

flink-conf.yaml 关键配置:
# flink-jm1、flink-jm2、flink-jm3节点配置
# /opt/flink/conf/flink-conf.yaml# 高可用配置
high-availability: zookeeper
high-availability.storageDir: hdfs:///flink/ha
high-availability.zookeeper.quorum: flink-jm1:2181,flink-jm2:2181,flink-jm3:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /flink-cluster# 状态后端配置(需HDFS支持)
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/savepoints
state.backend.rocksdb.checkpoint.transfer.thread.num: 4
state.backend.rocksdb.localdir: /data/rocksdb# JobManager配置
jobmanager.rpc.address: flink-jm1
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 4096m
jobmanager.scheduler: adaptive# TaskManager配置
taskmanager.memory.process.size: 24576m  # 24GB
taskmanager.memory.managed.size: 8192m   # 8GB 堆外内存
taskmanager.numberOfTaskSlots: 8
taskmanager.memory.network.min: 512m
taskmanager.memory.network.max: 1024m# 网络与通信
taskmanager.network.bind-policy: ip
akka.ask.timeout: 60s# Web UI
rest.address: 0.0.0.0
rest.port: 8081# 检查点配置
execution.checkpointing.interval: 5min
execution.checkpointing.timeout: 10min
execution.checkpointing.mode: EXACTLY_ONCE
masters配置:
# /opt/flink/conf/masters(所有JobManager节点相同)
flink-jm1:8081
flink-jm2:8081
flink-jm3:8081
workers配置:
# /opt/flink/conf/workers(所有节点相同)
flink-tm1
flink-tm2
flink-tm3

3. TaskManager节点配置

# /opt/flink/conf/flink-conf.yaml(所有TaskManager节点)# 覆盖JobManager地址配置
jobmanager.rpc.address: flink-jm1# TaskManager专用配置
taskmanager.memory.process.size: 24576m  
taskmanager.memory.managed.size: 8192m
taskmanager.numberOfTaskSlots: 8

4. 配置HDFS支持(可选)

# 所有节点
sudo tee -a /opt/flink/conf/flink-conf.yaml <<EOF
fs.hdfs.hadoopconf: /etc/hadoop/conf
fs.hdfs.hdfsdefault: hdfs-default.xml
fs.hdfs.hdfssite: hdfs-site.xml
EOF# 复制Hadoop配置文件到Flink目录
sudo cp /etc/hadoop/conf/*-site.xml /opt/flink/conf/

五、启动集群

1. 启动JobManager集群

# 在每个JobManager节点执行
sudo -u flink $FLINK_HOME/bin/jobmanager.sh start# 检查启动状态
sudo -u flink $FLINK_HOME/bin/jobmanager.sh status

2. 启动TaskManager集群

# 在每个TaskManager节点执行
sudo -u flink $FLINK_HOME/bin/taskmanager.sh start# 检查启动状态
sudo -u flink $FLINK_HOME/bin/taskmanager.sh status

3. 查看集群状态

# 查看JobManager列表
sudo -u flink $FLINK_HOME/bin/jobmanager.sh list# 查看Web UI
http://flink-jm1:8081
http://flink-jm2:8081
http://flink-jm3:8081

六、高可用验证测试

1. 提交示例作业

$FLINK_HOME/bin/flink run -m flink-jm1:8081 \$FLINK_HOME/examples/streaming/StateMachineExample.jar

2. 故障转移测试

# 查找主JobManager PID
ps aux | grep '[j]obmanager'# 模拟故障,杀死主JobManager
kill -9 <JM_PID># 观察日志(约10-30秒后自动恢复)
tail -f /opt/flink/log/flink-flink-jobmanager-*.log

3. 检查点验证

# 查看检查点状态
hdfs dfs -ls /flink/checkpoints# 列出正在运行的作业
$FLINK_HOME/bin/flink list -m flink-jm2:8081

七、运维管理脚本

1. 集群启动/停止脚本

#!/bin/bash
# flink-cluster.shcase $1 in
start)for jm in flink-jm1 flink-jm2 flink-jm3; dossh flink@$jm "$FLINK_HOME/bin/jobmanager.sh start"donefor tm in flink-tm1 flink-tm2 flink-tm3; dossh flink@$tm "$FLINK_HOME/bin/taskmanager.sh start"done;;
stop)for tm in flink-tm1 flink-tm2 flink-tm3; dossh flink@$tm "$FLINK_HOME/bin/taskmanager.sh stop"donefor jm in flink-jm1 flink-jm2 flink-jm3; dossh flink@$jm "$FLINK_HOME/bin/jobmanager.sh stop"done;;
restart)$0 stopsleep 5$0 start;;
status)for jm in flink-jm1 flink-jm2 flink-jm3; doecho "=== $jm ==="ssh flink@$jm "$FLINK_HOME/bin/jobmanager.sh status"donefor tm in flink-tm1 flink-tm2 flink-tm3; doecho "=== $tm ==="ssh flink@$tm "$FLINK_HOME/bin/taskmanager.sh status"done;;
*)echo "Usage: $0 {start|stop|restart|status}"exit 1;;
esac

2. 日志监控脚本

#!/bin/bash
# monitor-flink-logs.shtail -f /opt/flink/log/flink-flink-*.log \| awk '/ERROR/ {print "\033[31m" $0 "\033[39m"; next}/WARN/ {print "\033[33m" $0 "\033[39m"; next}/Transition.+MASTER/ {print "\033[32m" $0 "\033[39m"; next}{print}'

八、常见问题解决

1. JobManager无法选举

​症状​​:日志中出现No leader available错误
​解决方案​​:

# 检查ZooKeeper连接
$FLINK_HOME/bin/flink list -m zookeeper# 清空临时状态(谨慎操作)
hdfs dfs -rm -r /flink/ha/*

2. TaskManager无法注册

​症状​​:Web UI中不显示TaskManager
​解决方案​​:

# 检查网络连通性
telnet flink-jm1 6123# 检查防火墙
sudo ufw status# 增加网络超时(flink-conf.yaml)
taskmanager.registration.timeout: 5min

3. 检查点失败

​症状​​:作业因检查点超时失败
​解决方案​​:

# 优化配置(flink-conf.yaml)
execution.checkpointing.interval: 2min
execution.checkpointing.timeout: 5min
state.backend.rocksdb.localdir: /data/rocksdb

九、备份与恢复

1. Savepoint操作

# 手动创建Savepoint
flink savepoint <job-id> hdfs:///flink/savepoints# 从Savepoint恢复
flink run -s hdfs:///flink/savepoints/savepoint-... \-m flink-jm1:8081 /path/to/job.jar

2. 配置备份

# 备份关键配置
tar -czvf flink-conf-backup.tar.gz /opt/flink/conf# 备份作业JAR包
hdfs dfs -copyFromLocal /opt/flink/jobs /flink/job-backups

十、安全增强建议

1. 启用Kerberos认证

# flink-conf.yaml
security.kerberos.login.keytab: /etc/security/keytabs/flink.service.keytab
security.kerberos.login.principal: flink/_HOST@REALM
security.kerberos.login.contexts: Client

2. SSL加密通信

# flink-conf.yaml
security.ssl.enabled: true
security.ssl.keystore: /etc/ssl/flink.keystore
security.ssl.truststore: /etc/ssl/flink.truststore
security.ssl.keystore-password: changeme
security.ssl.truststore-password: changeme

3. 访问控制

# Web UI访问限制
rest.address: 127.0.0.1
# 或使用代理+Nginx基础认证

完成上述部署后,您将获得一个高可用的 Flink 集群,能够承受节点故障并保证作业持续运行。建议首次部署完成后进行完整的故障转移测试,确保高可用功能按预期工作。

十一、关联知识

【分布式技术】中间件-分布式协调服务zookeeper

相关文章:

  • 【Algorithm】Union-Find简单介绍
  • Filebeat收集nginx日志到elasticsearch,最终在kibana做展示(二)
  • JAVA之 Lambda
  • 算法训练第九天
  • docker快速部署OS web中间件 数据库 编程应用
  • 第14节 Node.js 全局对象
  • 【推荐算法】WideDeep推荐模型:融合记忆与泛化的智能推荐引擎
  • 37.第二阶段x64游戏实战-封包-寻找Socket套接字
  • Oracle杀进程注意事项
  • oracle数据恢复—oracle数据库执行truncate命令后的怎么恢复数据?
  • Java并发编程实战 Day 9:锁优化技术
  • C语言 — 编译和链接
  • 【杂谈】-吉卜力化(Ghiblified ) AI 图像:艺术与隐私的交织
  • PDF 转 HTML5 —— HTML5 填充图形不支持 Even-Odd 奇偶规则?(第二部分)
  • PyCharm中运行.py脚本程序
  • chrome使用手机调试触屏web
  • 大模型学习
  • ROS2中实现导航仿真
  • Hive SQL常见操作
  • 云服务器宕机或自动重启怎么办
  • 做商城型网站/网站seo分析工具
  • 网络互动公司排名/优化seo
  • 国外免费做网站软件/电脑编程培训学校哪家好
  • 成都市那里有网站建设制作公司/设计本网站
  • 做设计找图片的网站/网站新站整站排名
  • 石家庄网站建设石家庄/网络推广外包想手机蛙软件