当前位置: 首页 > news >正文

Flink 1.20 flink-config.yml 配置详解

Apache Flink 1.20 flink-config.yml 配置详解

本文档详细解释了Apache Flink 1.20版本中flink-config.yml配置文件的各项参数,帮助用户正确配置和优化Flink集群。

目录

  1. 核心配置参数
  2. JobManager配置
  3. TaskManager配置
  4. 状态后端配置
  5. 检查点配置
  6. 内存管理配置
  7. 网络配置
  8. 高可用性配置
  9. 安全配置
  10. 其他重要配置
  11. 完整配置示例

核心配置参数

execution.target

execution.target: local

说明: 执行目标环境,可选值包括:

  • local: 本地执行
  • remote: 远程集群执行
  • yarn: YARN集群执行
  • kubernetes: Kubernetes集群执行

parallelism.default

parallelism.default: 1

说明: 默认并行度,决定了所有算子的默认并行实例数。

pipeline.auto-watermark-interval

pipeline.auto-watermark-interval: 200ms

说明: 自动水印生成间隔,用于事件时间处理。

JobManager配置

jobmanager.rpc.address

jobmanager.rpc.address: localhost

说明: JobManager的RPC地址,其他组件通过此地址连接JobManager。

jobmanager.rpc.port

jobmanager.rpc.port: 6123

说明: JobManager的RPC端口。

jobmanager.memory.process.size

jobmanager.memory.process.size: 1600m

说明: JobManager进程的总内存大小。

jobmanager.memory.flink.size

jobmanager.memory.flink.size: 1280m

说明: 用于Flink框架的内存大小(不包括JVM Metaspace和直接内存)。

jobmanager.memory.heap.size

jobmanager.memory.heap.size: 1280m

说明: JobManager的JVM堆内存大小。

jobmanager.archive.fs.dir

jobmanager.archive.fs.dir: hdfs:///completed-jobs/

说明: 存储已完成作业信息的目录。

jobmanager.execution.failover-strategy

jobmanager.execution.failover-strategy: region

说明: 作业失败恢复策略:

  • full: 全作业重启
  • region: 基于region的局部重启

TaskManager配置

taskmanager.numberOfTaskSlots

taskmanager.numberOfTaskSlots: 1

说明: 每个TaskManager的slot数量,决定了TaskManager可以并行执行的任务数。

taskmanager.memory.process.size

taskmanager.memory.process.size: 1728m

说明: TaskManager进程的总内存大小。

taskmanager.memory.flink.size

taskmanager.memory.flink.size: 1280m

说明: 用于Flink框架的内存大小。

taskmanager.memory.framework.heap.size

taskmanager.memory.framework.heap.size: 128m

说明: TaskManager框架使用的堆内存大小。

taskmanager.memory.managed.consumer-weights

taskmanager.memory.managed.consumer-weights: operator:70,managed-memory:30

说明: 管理内存消费者权重分配。

taskmanager.memory.network.min

taskmanager.memory.network.min: 64mb

说明: 网络内存的最小值。

taskmanager.memory.network.max

taskmanager.memory.network.max: 1gb

说明: 网络内存的最大值。

taskmanager.memory.network.fraction

taskmanager.memory.network.fraction: 0.1

说明: 网络内存占总内存的比例。

状态后端配置

state.backend

state.backend: hashmap

说明: 状态后端类型:

  • hashmap: 内存状态后端(默认)
  • rocksdb: RocksDB状态后端
  • filesystem: 文件系统状态后端

state.backend.rocksdb.timer-service.factory

state.backend.rocksdb.timer-service.factory: ROCKSDB

说明: RocksDB定时器服务工厂:

  • HEAP: 堆内存储
  • ROCKSDB: RocksDB存储

state.checkpoints.dir

state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints

说明: 检查点数据存储目录。

state.savepoints.dir

state.savepoints.dir: hdfs://namenode:9000/flink/savepoints

说明: 保存点数据存储目录。

state.backend.incremental

state.backend.incremental: true

说明: 是否启用增量检查点(仅适用于RocksDB状态后端)。

state.backend.fs.memory-threshold

state.backend.fs.memory-threshold: 1024

说明: 状态数据在内存中的阈值(字节),超过此值将写入文件。

检查点配置

state.checkpointing.interval

state.checkpointing.interval: 3min

说明: 检查点间隔时间。

state.checkpointing.timeout

state.checkpointing.timeout: 10min

说明: 检查点超时时间。

state.checkpointing.min-pause

state.checkpointing.min-pause: 500ms

说明: 连续检查点之间的最小暂停时间。

state.checkpointing.max-concurrent-checkpoints

state.checkpointing.max-concurrent-checkpoints: 1

说明: 最大并发检查点数量。

state.checkpointing.mode

state.checkpointing.mode: EXACTLY_ONCE

说明: 检查点模式:

  • EXACTLY_ONCE: 精确一次语义
  • AT_LEAST_ONCE: 至少一次语义

state.checkpointing.alignment.timeout

state.checkpointing.alignment.timeout: 100ms

说明: 检查点对齐超时时间。

state.checkpointing.unaligned

state.checkpointing.unaligned: false

说明: 是否启用非对齐检查点。

state.checkpointing.tolerable-failed-checkpoints

state.checkpointing.tolerable-failed-checkpoints: 3

说明: 可容忍的连续检查点失败次数。

内存管理配置

taskmanager.memory.managed.size

taskmanager.memory.managed.size: 512mb

说明: 托管内存大小,用于排序、缓存中间结果等。

taskmanager.memory.managed.fraction

taskmanager.memory.managed.fraction: 0.4

说明: 托管内存占总内存的比例。

taskmanager.memory.jvm-metaspace.size

taskmanager.memory.jvm-metaspace.size: 256mb

说明: JVM Metaspace内存大小。

taskmanager.memory.jvm-overhead.min

taskmanager.memory.jvm-overhead.min: 192mb

说明: JVM开销内存最小值。

taskmanager.memory.jvm-overhead.max

taskmanager.memory.jvm-overhead.max: 1gb

说明: JVM开销内存最大值。

taskmanager.memory.jvm-overhead.fraction

taskmanager.memory.jvm-overhead.fraction: 0.1

说明: JVM开销内存占总内存的比例。

网络配置

taskmanager.network.memory.fraction

taskmanager.network.memory.fraction: 0.1

说明: 网络内存占总内存的比例。

taskmanager.network.memory.min

taskmanager.network.memory.min: 64mb

说明: 网络内存最小值。

taskmanager.network.memory.max

taskmanager.network.memory.max: 1gb

说明: 网络内存最大值。

taskmanager.network.request-backoff.initial

taskmanager.network.request-backoff.initial: 100ms

说明: 网络请求初始退避时间。

taskmanager.network.request-backoff.max

taskmanager.network.request-backoff.max: 10000ms

说明: 网络请求最大退避时间。

taskmanager.network.detailed-metrics

taskmanager.network.detailed-metrics: false

说明: 是否启用详细的网络指标。

高可用性配置

high-availability

high-availability: zookeeper

说明: 高可用性模式:

  • NONE: 不启用HA(默认)
  • zookeeper: 使用ZooKeeper实现HA

high-availability.storageDir

high-availability.storageDir: hdfs:///flink/ha/

说明: HA元数据存储目录。

high-availability.zookeeper.quorum

high-availability.zookeeper.quorum: localhost:2181

说明: ZooKeeper集群地址列表。

high-availability.zookeeper.path.root

high-availability.zookeeper.path.root: /flink

说明: ZooKeeper中Flink根路径。

high-availability.cluster-id

high-availability.cluster-id: /default_ns

说明: HA集群ID。

high-availability.jobmanager.port

high-availability.jobmanager.port: 6123

说明: HA模式下JobManager端口。

安全配置

security.ssl.enabled

security.ssl.enabled: false

说明: 是否启用SSL/TLS加密。

security.ssl.keystore

security.ssl.keystore: /path/to/keystore.jks

说明: SSL密钥库路径。

security.ssl.keystore-password

security.ssl.keystore-password: password

说明: SSL密钥库密码。

security.ssl.key-password

security.ssl.key-password: password

说明: SSL密钥密码。

security.ssl.truststore

security.ssl.truststore: /path/to/truststore.jks

说明: SSL信任库路径。

security.ssl.truststore-password

security.ssl.truststore-password: password

说明: SSL信任库密码。

security.ssl.algorithm

security.ssl.algorithm: SunX509

说明: SSL算法。

security.ssl.protocol

security.ssl.protocol: TLSv1.2

说明: SSL协议版本。

其他重要配置

rest.port

rest.port: 8081

说明: REST API端口。

rest.address

rest.address: localhost

说明: REST API绑定地址。

rest.bind-port

rest.bind-port: 8081

说明: REST API绑定端口。

rest.bind-address

rest.bind-address: 0.0.0.0

说明: REST API绑定地址。

web.upload.dir

web.upload.dir: /tmp/flink-web-upload

说明: Web界面文件上传目录。

web.tmp.dir

web.tmp.dir: /tmp

说明: Web界面临时文件目录。

metrics.reporter

metrics.reporter: prometheus

说明: 指标报告器类型。

metrics.reporter.prom.class

metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter

说明: Prometheus指标报告器类。

metrics.reporter.prom.port

metrics.reporter.prom.port: 9249

说明: Prometheus报告器端口。

classloader.resolve-order

classloader.resolve-order: parent-first

说明: 类加载器解析顺序:

  • child-first: 子类加载器优先
  • parent-first: 父类加载器优先

classloader.check-leaked-classloader

classloader.check-leaked-classloader: true

说明: 是否检查类加载器泄漏。

fs.default-scheme

fs.default-scheme: file:///

说明: 默认文件系统方案。

fs.hdfs.hadoopconf

fs.hdfs.hadoopconf: /path/to/hadoop/conf/

说明: Hadoop配置目录路径。

完整配置示例

以下是一个完整的Flink 1.20配置示例,适用于生产环境:

# ===================================================================
# 核心配置
# ===================================================================
execution.target: remote
parallelism.default: 4
pipeline.auto-watermark-interval: 200ms# ===================================================================
# JobManager 配置
# ===================================================================
jobmanager.rpc.address: jobmanager-host
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 4g
jobmanager.memory.flink.size: 3g
jobmanager.memory.heap.size: 3g
jobmanager.archive.fs.dir: hdfs:///flink/completed-jobs/
jobmanager.execution.failover-strategy: region# ===================================================================
# TaskManager 配置
# ===================================================================
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.process.size: 8g
taskmanager.memory.flink.size: 6g
taskmanager.memory.framework.heap.size: 128m
taskmanager.memory.managed.consumer-weights: operator:70,managed-memory:30
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb
taskmanager.memory.network.fraction: 0.1# ===================================================================
# 状态后端配置
# ===================================================================
state.backend: rocksdb
state.backend.rocksdb.timer-service.factory: ROCKSDB
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
state.savepoints.dir: hdfs://namenode:9000/flink/savepoints
state.backend.incremental: true
state.backend.fs.memory-threshold: 1024# ===================================================================
# 检查点配置
# ===================================================================
state.checkpointing.interval: 5min
state.checkpointing.timeout: 10min
state.checkpointing.min-pause: 500ms
state.checkpointing.max-concurrent-checkpoints: 1
state.checkpointing.mode: EXACTLY_ONCE
state.checkpointing.alignment.timeout: 100ms
state.checkpointing.unaligned: false
state.checkpointing.tolerable-failed-checkpoints: 3# ===================================================================
# 内存管理配置
# ===================================================================
taskmanager.memory.managed.size: 2g
taskmanager.memory.managed.fraction: 0.4
taskmanager.memory.jvm-metaspace.size: 256mb
taskmanager.memory.jvm-overhead.min: 192mb
taskmanager.memory.jvm-overhead.max: 1gb
taskmanager.memory.jvm-overhead.fraction: 0.1# ===================================================================
# 网络配置
# ===================================================================
taskmanager.network.memory.fraction: 0.1
taskmanager.network.memory.min: 64mb
taskmanager.network.memory.max: 1gb
taskmanager.network.request-backoff.initial: 100ms
taskmanager.network.request-backoff.max: 10000ms
taskmanager.network.detailed-metrics: false# ===================================================================
# 高可用性配置
# ===================================================================
high-availability: zookeeper
high-availability.storageDir: hdfs:///flink/ha/
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /default_ns
high-availability.jobmanager.port: 6123# ===================================================================
# 安全配置
# ===================================================================
security.ssl.enabled: false
# 如果启用SSL,请取消注释以下配置
# security.ssl.keystore: /path/to/keystore.jks
# security.ssl.keystore-password: password
# security.ssl.key-password: password
# security.ssl.truststore: /path/to/truststore.jks
# security.ssl.truststore-password: password
# security.ssl.algorithm: SunX509
# security.ssl.protocol: TLSv1.2# ===================================================================
# Web和REST配置
# ===================================================================
rest.port: 8081
rest.address: localhost
rest.bind-port: 8081
rest.bind-address: 0.0.0.0
web.upload.dir: /tmp/flink-web-upload
web.tmp.dir: /tmp# ===================================================================
# 指标配置
# ===================================================================
metrics.reporter: prometheus
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9249# ===================================================================
# 类加载器配置
# ===================================================================
classloader.resolve-order: parent-first
classloader.check-leaked-classloader: true# ===================================================================
# 文件系统配置
# ===================================================================
fs.default-scheme: hdfs://namenode:9000
fs.hdfs.hadoopconf: /path/to/hadoop/conf/# ===================================================================
# 其他配置
# ===================================================================
env.java.opts: -XX:+UseG1GC -XX:+PrintGC
env.java.opts.jobmanager: -XX:+UseG1GC -XX:+PrintGC
env.java.opts.taskmanager: -XX:+UseG1GC -XX:+PrintGC

配置最佳实践

  1. 内存配置

    • 根据实际数据量和处理需求调整内存分配
    • 对于大状态作业,增加托管内存和网络内存
    • 监控JVM GC情况,必要时调整GC参数
  2. 检查点配置

    • 根据数据重要性选择检查点模式(EXACTLY_ONCE或AT_LEAST_ONCE)
    • 合理设置检查点间隔,平衡性能和容错能力
    • 启用增量检查点以减少存储开销(适用于RocksDB)
  3. 并行度配置

    • 根据数据量和计算复杂度设置合适的并行度
    • 考虑集群资源和网络带宽限制
    • 不同算子可设置不同的并行度
  4. 高可用性配置

    • 生产环境建议启用HA模式
    • 使用可靠的ZooKeeper集群
    • 确保元数据存储的可靠性
  5. 监控配置

    • 启用指标报告器(如Prometheus)
    • 配置适当的日志级别
    • 设置告警阈值

通过合理配置这些参数,可以充分发挥Flink的性能优势,确保作业的稳定运行。

http://www.dtcms.com/a/499609.html

相关文章:

  • 湖州网站集约化平台南京做网站哪家公司好
  • 【深度学习新浪潮】2025全球机器学习技术大会:Agent技术突破与产业落地全景解析
  • 长沙整站优化梅河口信息网
  • Linux入门:动静态库的理解与加载
  • 云上救火指南:AWS常见服务告警的快速恢复与最小影响方案
  • 九号线香网站建设淘客网站后台怎么做
  • FPGA 入门 3 个月学习计划表
  • 专业的做网站网站做外链好嘛
  • ios android 小程序 蓝牙 CRC16_MODBUS
  • 沧州网站建设优化案例怎么创建一个网站
  • 【小沐杂货铺】基于Three.js渲染三维风力发电机(WebGL、vue、react、WindTurbine)
  • Socket 网络编程
  • 哪里可以做网站网站兼容性怎么解决
  • 网站备案流程实名认证哪个平台做网站好
  • 最版网站建设案例中国建设银行开户行查询
  • 衡水网站制作多少钱世界著名产品设计作品
  • 我们如何更好地相处和协作?
  • Vlanif的作用
  • 62.【.NET8 实战--孢子记账--从单体到微服务--转向微服务】--新增功能--自训练ML模型
  • 北京住房城乡建设网站秦皇岛手机网站制作公司
  • 【SpringBoot从初学者到专家的成长18】SpringBoot中的数据持久化:JPA与Hibernate的结合
  • Ubuntu服务器已下载Nginx安装包的安装指南
  • 高可用Prometheus问题集锦
  • wap建站模板物流网站怎么开
  • 【Leetcode hot 100】763.划分字母区间
  • Agent向量存储中的记忆衰退与记忆过载解决方案
  • php网站跟随导航扁平化配色方案网站
  • 降噪算法的效果分析
  • FreeSWITCH RTP 自动调整花费时间太久
  • 怎么在一个网站做编辑一流的镇江网站优化