Flink 1.20 flink-config.yml 配置详解
Apache Flink 1.20 flink-config.yml 配置详解
本文档详细解释了Apache Flink 1.20版本中flink-config.yml配置文件的各项参数,帮助用户正确配置和优化Flink集群。
目录
- 核心配置参数
- JobManager配置
- TaskManager配置
- 状态后端配置
- 检查点配置
- 内存管理配置
- 网络配置
- 高可用性配置
- 安全配置
- 其他重要配置
- 完整配置示例
核心配置参数
execution.target
execution.target: local
说明: 执行目标环境,可选值包括:
local
: 本地执行remote
: 远程集群执行yarn
: YARN集群执行kubernetes
: Kubernetes集群执行
parallelism.default
parallelism.default: 1
说明: 默认并行度,决定了所有算子的默认并行实例数。
pipeline.auto-watermark-interval
pipeline.auto-watermark-interval: 200ms
说明: 自动水印生成间隔,用于事件时间处理。
JobManager配置
jobmanager.rpc.address
jobmanager.rpc.address: localhost
说明: JobManager的RPC地址,其他组件通过此地址连接JobManager。
jobmanager.rpc.port
jobmanager.rpc.port: 6123
说明: JobManager的RPC端口。
jobmanager.memory.process.size
jobmanager.memory.process.size: 1600m
说明: JobManager进程的总内存大小。
jobmanager.memory.flink.size
jobmanager.memory.flink.size: 1280m
说明: 用于Flink框架的内存大小(不包括JVM Metaspace和直接内存)。
jobmanager.memory.heap.size
jobmanager.memory.heap.size: 1280m
说明: JobManager的JVM堆内存大小。
jobmanager.archive.fs.dir
jobmanager.archive.fs.dir: hdfs:///completed-jobs/
说明: 存储已完成作业信息的目录。
jobmanager.execution.failover-strategy
jobmanager.execution.failover-strategy: region
说明: 作业失败恢复策略:
full
: 全作业重启region
: 基于region的局部重启
TaskManager配置
taskmanager.numberOfTaskSlots
taskmanager.numberOfTaskSlots: 1
说明: 每个TaskManager的slot数量,决定了TaskManager可以并行执行的任务数。
taskmanager.memory.process.size
taskmanager.memory.process.size: 1728m
说明: TaskManager进程的总内存大小。
taskmanager.memory.flink.size
taskmanager.memory.flink.size: 1280m
说明: 用于Flink框架的内存大小。
taskmanager.memory.framework.heap.size
taskmanager.memory.framework.heap.size: 128m
说明: TaskManager框架使用的堆内存大小。
taskmanager.memory.managed.consumer-weights
taskmanager.memory.managed.consumer-weights: operator:70,managed-memory:30
说明: 管理内存消费者权重分配。
taskmanager.memory.network.min
taskmanager.memory.network.min: 64mb
说明: 网络内存的最小值。
taskmanager.memory.network.max
taskmanager.memory.network.max: 1gb
说明: 网络内存的最大值。
taskmanager.memory.network.fraction
taskmanager.memory.network.fraction: 0.1
说明: 网络内存占总内存的比例。
状态后端配置
state.backend
state.backend: hashmap
说明: 状态后端类型:
hashmap
: 内存状态后端(默认)rocksdb
: RocksDB状态后端filesystem
: 文件系统状态后端
state.backend.rocksdb.timer-service.factory
state.backend.rocksdb.timer-service.factory: ROCKSDB
说明: RocksDB定时器服务工厂:
HEAP
: 堆内存储ROCKSDB
: RocksDB存储
state.checkpoints.dir
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
说明: 检查点数据存储目录。
state.savepoints.dir
state.savepoints.dir: hdfs://namenode:9000/flink/savepoints
说明: 保存点数据存储目录。
state.backend.incremental
state.backend.incremental: true
说明: 是否启用增量检查点(仅适用于RocksDB状态后端)。
state.backend.fs.memory-threshold
state.backend.fs.memory-threshold: 1024
说明: 状态数据在内存中的阈值(字节),超过此值将写入文件。
检查点配置
state.checkpointing.interval
state.checkpointing.interval: 3min
说明: 检查点间隔时间。
state.checkpointing.timeout
state.checkpointing.timeout: 10min
说明: 检查点超时时间。
state.checkpointing.min-pause
state.checkpointing.min-pause: 500ms
说明: 连续检查点之间的最小暂停时间。
state.checkpointing.max-concurrent-checkpoints
state.checkpointing.max-concurrent-checkpoints: 1
说明: 最大并发检查点数量。
state.checkpointing.mode
state.checkpointing.mode: EXACTLY_ONCE
说明: 检查点模式:
EXACTLY_ONCE
: 精确一次语义AT_LEAST_ONCE
: 至少一次语义
state.checkpointing.alignment.timeout
state.checkpointing.alignment.timeout: 100ms
说明: 检查点对齐超时时间。
state.checkpointing.unaligned
state.checkpointing.unaligned: false
说明: 是否启用非对齐检查点。
state.checkpointing.tolerable-failed-checkpoints
state.checkpointing.tolerable-failed-checkpoints: 3
说明: 可容忍的连续检查点失败次数。
内存管理配置
taskmanager.memory.managed.size
taskmanager.memory.managed.size: 512mb
说明: 托管内存大小,用于排序、缓存中间结果等。
taskmanager.memory.managed.fraction
taskmanager.memory.managed.fraction: 0.4
说明: 托管内存占总内存的比例。
taskmanager.memory.jvm-metaspace.size
taskmanager.memory.jvm-metaspace.size: 256mb
说明: JVM Metaspace内存大小。
taskmanager.memory.jvm-overhead.min
taskmanager.memory.jvm-overhead.min: 192mb
说明: JVM开销内存最小值。
taskmanager.memory.jvm-overhead.max
taskmanager.memory.jvm-overhead.max: 1gb
说明: JVM开销内存最大值。
taskmanager.memory.jvm-overhead.fraction
taskmanager.memory.jvm-overhead.fraction: 0.1
说明: JVM开销内存占总内存的比例。
网络配置
taskmanager.network.memory.fraction
taskmanager.network.memory.fraction: 0.1
说明: 网络内存占总内存的比例。
taskmanager.network.memory.min
taskmanager.network.memory.min: 64mb
说明: 网络内存最小值。
taskmanager.network.memory.max
taskmanager.network.memory.max: 1gb
说明: 网络内存最大值。
taskmanager.network.request-backoff.initial
taskmanager.network.request-backoff.initial: 100ms
说明: 网络请求初始退避时间。
taskmanager.network.request-backoff.max
taskmanager.network.request-backoff.max: 10000ms
说明: 网络请求最大退避时间。
taskmanager.network.detailed-metrics
taskmanager.network.detailed-metrics: false
说明: 是否启用详细的网络指标。
高可用性配置
high-availability
high-availability: zookeeper
说明: 高可用性模式:
NONE
: 不启用HA(默认)zookeeper
: 使用ZooKeeper实现HA
high-availability.storageDir
high-availability.storageDir: hdfs:///flink/ha/
说明: HA元数据存储目录。
high-availability.zookeeper.quorum
high-availability.zookeeper.quorum: localhost:2181
说明: ZooKeeper集群地址列表。
high-availability.zookeeper.path.root
high-availability.zookeeper.path.root: /flink
说明: ZooKeeper中Flink根路径。
high-availability.cluster-id
high-availability.cluster-id: /default_ns
说明: HA集群ID。
high-availability.jobmanager.port
high-availability.jobmanager.port: 6123
说明: HA模式下JobManager端口。
安全配置
security.ssl.enabled
security.ssl.enabled: false
说明: 是否启用SSL/TLS加密。
security.ssl.keystore
security.ssl.keystore: /path/to/keystore.jks
说明: SSL密钥库路径。
security.ssl.keystore-password
security.ssl.keystore-password: password
说明: SSL密钥库密码。
security.ssl.key-password
security.ssl.key-password: password
说明: SSL密钥密码。
security.ssl.truststore
security.ssl.truststore: /path/to/truststore.jks
说明: SSL信任库路径。
security.ssl.truststore-password
security.ssl.truststore-password: password
说明: SSL信任库密码。
security.ssl.algorithm
security.ssl.algorithm: SunX509
说明: SSL算法。
security.ssl.protocol
security.ssl.protocol: TLSv1.2
说明: SSL协议版本。
其他重要配置
rest.port
rest.port: 8081
说明: REST API端口。
rest.address
rest.address: localhost
说明: REST API绑定地址。
rest.bind-port
rest.bind-port: 8081
说明: REST API绑定端口。
rest.bind-address
rest.bind-address: 0.0.0.0
说明: REST API绑定地址。
web.upload.dir
web.upload.dir: /tmp/flink-web-upload
说明: Web界面文件上传目录。
web.tmp.dir
web.tmp.dir: /tmp
说明: Web界面临时文件目录。
metrics.reporter
metrics.reporter: prometheus
说明: 指标报告器类型。
metrics.reporter.prom.class
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
说明: Prometheus指标报告器类。
metrics.reporter.prom.port
metrics.reporter.prom.port: 9249
说明: Prometheus报告器端口。
classloader.resolve-order
classloader.resolve-order: parent-first
说明: 类加载器解析顺序:
child-first
: 子类加载器优先parent-first
: 父类加载器优先
classloader.check-leaked-classloader
classloader.check-leaked-classloader: true
说明: 是否检查类加载器泄漏。
fs.default-scheme
fs.default-scheme: file:///
说明: 默认文件系统方案。
fs.hdfs.hadoopconf
fs.hdfs.hadoopconf: /path/to/hadoop/conf/
说明: Hadoop配置目录路径。
完整配置示例
以下是一个完整的Flink 1.20配置示例,适用于生产环境:
# ===================================================================
# 核心配置
# ===================================================================
execution.target: remote
parallelism.default: 4
pipeline.auto-watermark-interval: 200ms# ===================================================================
# JobManager 配置
# ===================================================================
jobmanager.rpc.address: jobmanager-host
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 4g
jobmanager.memory.flink.size: 3g
jobmanager.memory.heap.size: 3g
jobmanager.archive.fs.dir: hdfs:///flink/completed-jobs/
jobmanager.execution.failover-strategy: region# ===================================================================
# TaskManager 配置
# ===================================================================
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.process.size: 8g
taskmanager.memory.flink.size: 6g
taskmanager.memory.framework.heap.size: 128m
taskmanager.memory.managed.consumer-weights: operator:70,managed-memory:30
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb
taskmanager.memory.network.fraction: 0.1# ===================================================================
# 状态后端配置
# ===================================================================
state.backend: rocksdb
state.backend.rocksdb.timer-service.factory: ROCKSDB
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
state.savepoints.dir: hdfs://namenode:9000/flink/savepoints
state.backend.incremental: true
state.backend.fs.memory-threshold: 1024# ===================================================================
# 检查点配置
# ===================================================================
state.checkpointing.interval: 5min
state.checkpointing.timeout: 10min
state.checkpointing.min-pause: 500ms
state.checkpointing.max-concurrent-checkpoints: 1
state.checkpointing.mode: EXACTLY_ONCE
state.checkpointing.alignment.timeout: 100ms
state.checkpointing.unaligned: false
state.checkpointing.tolerable-failed-checkpoints: 3# ===================================================================
# 内存管理配置
# ===================================================================
taskmanager.memory.managed.size: 2g
taskmanager.memory.managed.fraction: 0.4
taskmanager.memory.jvm-metaspace.size: 256mb
taskmanager.memory.jvm-overhead.min: 192mb
taskmanager.memory.jvm-overhead.max: 1gb
taskmanager.memory.jvm-overhead.fraction: 0.1# ===================================================================
# 网络配置
# ===================================================================
taskmanager.network.memory.fraction: 0.1
taskmanager.network.memory.min: 64mb
taskmanager.network.memory.max: 1gb
taskmanager.network.request-backoff.initial: 100ms
taskmanager.network.request-backoff.max: 10000ms
taskmanager.network.detailed-metrics: false# ===================================================================
# 高可用性配置
# ===================================================================
high-availability: zookeeper
high-availability.storageDir: hdfs:///flink/ha/
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /default_ns
high-availability.jobmanager.port: 6123# ===================================================================
# 安全配置
# ===================================================================
security.ssl.enabled: false
# 如果启用SSL,请取消注释以下配置
# security.ssl.keystore: /path/to/keystore.jks
# security.ssl.keystore-password: password
# security.ssl.key-password: password
# security.ssl.truststore: /path/to/truststore.jks
# security.ssl.truststore-password: password
# security.ssl.algorithm: SunX509
# security.ssl.protocol: TLSv1.2# ===================================================================
# Web和REST配置
# ===================================================================
rest.port: 8081
rest.address: localhost
rest.bind-port: 8081
rest.bind-address: 0.0.0.0
web.upload.dir: /tmp/flink-web-upload
web.tmp.dir: /tmp# ===================================================================
# 指标配置
# ===================================================================
metrics.reporter: prometheus
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9249# ===================================================================
# 类加载器配置
# ===================================================================
classloader.resolve-order: parent-first
classloader.check-leaked-classloader: true# ===================================================================
# 文件系统配置
# ===================================================================
fs.default-scheme: hdfs://namenode:9000
fs.hdfs.hadoopconf: /path/to/hadoop/conf/# ===================================================================
# 其他配置
# ===================================================================
env.java.opts: -XX:+UseG1GC -XX:+PrintGC
env.java.opts.jobmanager: -XX:+UseG1GC -XX:+PrintGC
env.java.opts.taskmanager: -XX:+UseG1GC -XX:+PrintGC
配置最佳实践
-
内存配置:
- 根据实际数据量和处理需求调整内存分配
- 对于大状态作业,增加托管内存和网络内存
- 监控JVM GC情况,必要时调整GC参数
-
检查点配置:
- 根据数据重要性选择检查点模式(EXACTLY_ONCE或AT_LEAST_ONCE)
- 合理设置检查点间隔,平衡性能和容错能力
- 启用增量检查点以减少存储开销(适用于RocksDB)
-
并行度配置:
- 根据数据量和计算复杂度设置合适的并行度
- 考虑集群资源和网络带宽限制
- 不同算子可设置不同的并行度
-
高可用性配置:
- 生产环境建议启用HA模式
- 使用可靠的ZooKeeper集群
- 确保元数据存储的可靠性
-
监控配置:
- 启用指标报告器(如Prometheus)
- 配置适当的日志级别
- 设置告警阈值
通过合理配置这些参数,可以充分发挥Flink的性能优势,确保作业的稳定运行。