当前位置: 首页 > news >正文

Flink SQL 性能优化实战

最近我们组在大规模上线Flink SQL作业。首先,在进行跑批量初始化完历史数据后,剩下的就是消费Kafka历史数据进行追数了。但是发现某些作业的追数过程十分缓慢,要运行一晚上甚至三四天才能追上最新数据。由于是实时数仓指标计算上线初期,经常验证作业如果有问题就得重蹈覆辙重新追数,效率很低,于是我开始分析Flink SQL的优化。

问题


insert into tableB
select a, max(b), max(c), sum(d) ...
from tableA
group by a

上面这个作业的简化版SQL,主要就是做一个分组聚合:

  1. 从tableA分组聚合出结果插入tableB
  2. tableA的联合主键是:a,b(但是a的离散度已经很高了)
  3. tableA的Flink表类型为upset-kafka
  4. tableB的Flink表类型为HBase

初步分析


这个作业跑在集群上的job graph如下:

可以看到有三个vertex:

  1. 第一个是TableSourceScan
  2. 第二个是ChangelogNormalize
  3. 第三个是GroupAggregate

TableSourceScan接入tableA表的upsert-kafka流;

ChangelogNormalize对upset-kafka进行撤回语义的解析;

GroupAggregate对撤回流进行分组聚合,然后写入tableB的HBase;

优化思路1:local/global agg


agg分类:

  • group agg
select count(a) from t group by b
  • over agg
select count(a) over (partition by b order by c) from t
  • window agg
select count(a) from t group by tumble(ts, interval '10' seconds), b

local/global agg:

核心思想与hadoop的combiner是一致的,就是在mapreduce的过程中,在map阶段就做一个预聚合,即combine操作。

[图片上传失败…(image-c0ad24-1650075387085)]

带来的收益是:减少网络shuffle数据,提升计算引擎的性能。

前提条件:

  1. agg的所有agg function都是mergeable(实现merge方法)
  2. table.optimizer.agg-phase-strategy为AUTO或TWO_PHASE
  3. Stream下,minibatch开启;Batch下,AUTO会根据cost选择

解释说明:

mergeable其实就是能用分治法解决的计算问题,例如sum、count等,而avg就不能用分治法先计算部分元素的avg,再计算最终avg了,结果有时候会出错。

table.optimizer.agg-phase-strategy:默认为AUTO,意思是引擎尽量做预聚合;TWO_PHASE表示所有聚合操作都做预聚合;ONE_PHASE表示所有聚合都不做预聚合。

minibatch:即开启微批模式。主要有三个参数:

table.exec.mini-batch.enabled:是否开启,默认不开启
table.exec.mini-batch.size:微批的record buffer大小
table.exec.mini-batch.allow-latency:微批的time buffer大小

minibatch的本质就是平衡实时性和吞吐量的刻度尺。

所以,local/global agg一共需要三个参数控制。

验证


经过对比验证,在这个SQL场景下的效率提升很小。

local/global agg降低了第二个vertex即ChangelogNormalize的sent records的数据量,而并没有使得第一个vertex的数据处理效率有显著提升。

所以,这个作业的瓶颈并不在vertex间, 而在于第一个vertex的处理数据效率。

优化思路二:调大并行度


这个思路的关键在于source upsert-kafka的分区数,这是制约吞吐量的瓶颈。因为在upsert-kafka中,每个partition最多被一个Flink线程读取。

增加了10倍的并行度,source分区也增加10倍后,作业周转时间缩短了将近一半。

优化思路三:RocksDB性能调优


仔细分析这个SQL作业,是对一个联合主键的字段做group by,那么state一定会非常大。

经过在对这个表在数仓中的数据进行分析,发现这个字段的离散度几乎接近于主键的离散度。

而进行group by必然要根据每一条upsert kafka的数据去查验在flink statebackend中物化的source table中该字段值的分布情况,这应该是才是瓶颈所在!

沿着这个思路,开始分析Flink的statebackend机制。

这里我们简单回顾一下Flink statebackend(后面再做专题总结):

由 Flink 管理的 keyed state 是一种分片的键/值存储,每个 keyed state 的工作副本都保存在负责该键的 taskmanager 本地中。另外,Operator state 也保存在机器节点本地。Flink 定期获取所有状态的快照,并将这些快照复制到持久化的位置,例如分布式文件系统。

如果发生故障,Flink 可以恢复应用程序的完整状态并继续处理,就如同没有出现过异常。

Flink 管理的状态存储在 state backend 中。Flink 有两种 state backend 的实现 – 一种基于 RocksDB 内嵌 key/value 存储将其工作状态保存在磁盘上的,另一种基于堆的 state backend,将其工作状态保存在 Java 的堆内存中。这种基于堆的 state backend 有两种类型:FsStateBackend,将其状态快照持久化到分布式文件系统;MemoryStateBackend,它使用 JobManager 的堆保存状态快照。

当使用基于堆的 state backend 保存状态时,访问和更新涉及在堆上读写对象。但是对于保存在 RocksDBStateBackend 中的对象,访问和更新涉及序列化和反序列化,所以会有更大的开销。但 RocksDB 的状态量仅受本地磁盘大小的限制。还要注意,只有 RocksDBStateBackend 能够进行增量快照,这对于具有大量变化缓慢状态的应用程序来说是大有裨益的。

所有这些 state backends 都能够异步执行快照,这意味着它们可以在不妨碍正在进行的流处理的情况下执行快照。

我们的线上一般采用的是RocksDB作为状态后端,checkpoint dir采用hdfs文件系统。其实我个人觉得这个应该根据作业的特性进行选择,根据我个人的经验以及知识沉淀,选择的主要因素是作业的state大小及对处理数据性能的要求:

  • RocksDBStateBackend可以突破内存的限制,rocksDB的数据逻辑结构和redis相似,但是数据的物理存储结构又和hbase相似,继承自levelDB的LSM树思想,缺点是性能太低
  • 而FsStateBackend是在做snapshot的时候才将内存的state持久化到远端,速度接近于内存状态
  • MemoryStateBackend是纯内存的,一般只用做调试。

但是由于这个大状态作业追数速度实在太慢,我甚至想过:

在追数的时候用FsStateBackend,并配置大内存,且把managed memory调成0,同时将ck的周期设置的很大,基本上不做ck,追上后savepoint。再把状态后端换成RocksDB,并且从FSSatebackend的savepoint处恢复,但是发现1.13才支持savepoint切换statebackend类型。

只剩下调优RocksDB一条路了。根据之前对HBase的LSM原理的理解,进行知识迁移,马上对RocksDB有了一定的认识。在HBase中调优效果最明显无乎:

blockcache读缓存、memStore写缓存、增加布隆过滤器、提升compact效率

沿着这个思路,再查阅了一番RocksDB资料后,决定先对如下参数进行调优:

  • state.backend.rocksdb.block.cache-size
state.backend.rocksdb.block.blocksize

Block 块是 RocksDB 保存在磁盘中的 SST 文件的基本单位,它包含了一系列列有序的 Key 和 Value 集合,可以设置固定的大小。

但是,通过增加 Block Size,会显著增加读放大(Read Amplification)效应,令读取数据时,吞吐量下降。原因是 Block Size增加以后,如果 Block Cache 的大小没有变,就会⼤大减少 Cache 中可存放的 Block 数。如果 Cache 中还存处理索引和过滤器等内容,那么可放置的数据块数目就会更少,可能需要更多的磁盘 IO 操作,找到数据就更更慢了,此时读取性能会大幅下降。反之,如果减小BlockSize,会让读的性能有不少提升,但是写性能会下降,⽽而且对 SSD 寿命也不利。

因此我的调优经验是,如果需要增加 Block Size 的大小来提升读写性能,请务必一并增加 Block Cache Size 的大小,这样才可以取得比较好的读写性能。Block Cache,缓存清除算法⽤用的是 LRU(Least Recently Used)。

验证


测试对比后发现,原本半天左右完成的作业只需要一到两个小时即可追上数据!

感悟


性能调优就如同把脉治病,关键在于对症下药。

前期,要分析当前场景下真正制约性能的瓶颈所在,后期,在症结处用效果最明显的方式处理症结。


文章转载自:
http://chaffingly.wkuuf.cn
http://chokecherry.wkuuf.cn
http://afghanistani.wkuuf.cn
http://bardlet.wkuuf.cn
http://aonb.wkuuf.cn
http://adze.wkuuf.cn
http://calculagraph.wkuuf.cn
http://bedrabble.wkuuf.cn
http://antimechanized.wkuuf.cn
http://belletristic.wkuuf.cn
http://antisepticise.wkuuf.cn
http://anaesthetization.wkuuf.cn
http://baldicoot.wkuuf.cn
http://begohm.wkuuf.cn
http://britches.wkuuf.cn
http://acarpelous.wkuuf.cn
http://antidiphtheritic.wkuuf.cn
http://asbestotic.wkuuf.cn
http://biotoxicology.wkuuf.cn
http://bonkers.wkuuf.cn
http://brainwash.wkuuf.cn
http://buccolingual.wkuuf.cn
http://bokhara.wkuuf.cn
http://brawler.wkuuf.cn
http://aplacental.wkuuf.cn
http://bauble.wkuuf.cn
http://amphora.wkuuf.cn
http://barquisimeto.wkuuf.cn
http://beguiler.wkuuf.cn
http://aubrietia.wkuuf.cn
http://www.dtcms.com/a/280445.html

相关文章:

  • 使用Dify+fastmcp 实现mcp服务,内含详细步骤与源码
  • Windows远程FX的编解码器性能优化
  • 算法在前端框架中的集成
  • 三十二、【核心功能改造】数据驱动:重构仪表盘与关键指标可视化
  • 原型继承(prototypal inheritance)的工作原理
  • Java实现word、pdf转html保留格式
  • 19.如何将 Python 字符串转换为 Slug
  • 全面安装指南:在Linux、Windows和macOS上部署Apache Cassandra
  • 基于STM32与中航ZH-E3L字符卡通信在LED屏显示数据
  • 华为敏态开发流程敏捷开发费用估算敏态IT财务分析模板
  • 进程探秘:从 PCB 到 fork 的核心原理之旅
  • Lang3
  • Spring Ioc Bean 到底是什么
  • 朝鲜升级供应链恶意软件XORIndex,再次瞄准npm生态系统
  • 从springcloud-gateway了解同步和异步,webflux webMvc、共享变量
  • 四种高效搭建SpringBoot项目的方式详解
  • 基于UDP/IP网络游戏加速高级拥塞控制算法(示意:一)
  • SpringBoot 实现 Redis读写分离
  • 【PTA数据结构 | C语言版】根据前序序列重构二叉树
  • npm install failed如何办?
  • 【10】MFC入门到精通——MFC 创建向导对话框、属性页类、属性表类、代码
  • centos 安装java 环境
  • FreeRTOS学习笔记——总览
  • 【Docker基础】Dockerfile构建与运行流程完全指南:从原理到实践优化
  • CentOS 8-BClinux8.2更换为阿里云镜像源:保姆级教程
  • 【第二章自定义功能菜单_MenuItemAttribute_顶部菜单栏(本章进度1/7)】
  • Rust基础-part5-引用
  • 【jvm|基本原理】第四天
  • 游戏行业中的恶梦:不断升级的DDoS攻击
  • 深入理解C++11 std::iota:从原理到实践