当前位置: 首页 > news >正文

深入解析 Flink 批量插入 MariaDB 不生效问题

在使用 Flink 进行数据处理时,批量插入(batch insert)数据库是一种常见的优化策略,可以减少数据库压力,提高写入吞吐量。然而,近期在一个 Flink Job的升级过程 中,我们发现新的 job 老是无法实现数据插入到数据库中,  定位到设置 mariadbBatchSize = 100 时,数据并未插入数据库,而修改为 mariadbBatchSize = 1 后,数据却能立刻写入。为什么会这样呢? 又该如何优化批量写入策略?我们对此进行深入探讨。

问题描述

在 Flink 任务中,我们通过 JDBC Sink 将数据写入 MariaDB,并设置批量写入参数 mariadbBatchSize = 100。然而,在流量较小时,数据并未及时插入数据库,直到流量达到一定程度才触发插入。后来,我们将 mariadbBatchSize 改为 1,数据却能立即写入。

问题分析

1. Flink 批量写入的工作机制

Flink 在进行数据库写入时,通常会采用 批量缓冲 + 触发写入 的方式。其基本流程如下:

  1. 数据缓冲:当 mariadbBatchSize = 100 时,Flink 会将数据缓存在内存中,直到达到 100 条数据才触发批量插入。

  2. 触发条件:一般情况下,以下几种情况会触发数据库写入:

    • 数据量达到 batchSize(如 100 条数据)

    • 达到设定的超时时间(如果 Flink Sink 支持该特性)

    • 任务完成或取消时触发 flush

    • Flink Checkpoint 机制(如果开启了 exactly-once 语义)

  3. 提交数据库:Flink 通过 JDBC 连接池执行 executeBatch(),将缓存的数据批量插入 MariaDB。

2. mariadbBatchSize = 100 导致数据未及时插入的原因

  • 流量不足:当数据流量较小时,缓冲区的数据量达不到 100,导致 executeBatch() 不会被触发。

  • 未设置超时机制:部分 JDBC Sink 并未实现超时机制,即即使数据迟迟未满 100 条,也不会自动触发批量插入。

  • 无 Checkpoint 触发:Flink 可能在 Checkpoint 时才会进行 flush,如果 Checkpoint 频率较低,也可能导致数据长时间滞留在缓冲区。

3. mariadbBatchSize = 1 立即插入的原因

  • 由于 mariadbBatchSize = 1,每来一条数据就会立即执行 executeBatch(),无需等待数据积累到 100 条,从而实现了即时写入

  • 但这样做的缺点是:每一条数据都会单独进行数据库插入,可能导致数据库负载过高,写入吞吐量下降。

解决方案与优化策略

为了解决该问题,我们需要在 降低写入延迟提高数据库吞吐量 之间找到平衡点。以下是一些优化建议:

1. 降低 mariadbBatchSize,避免长时间不触发插入

mariadbBatchSize 适当降低,例如 1020,可以在保证批量写入的同时,减少数据等待的时间。

2. 使用定时 flush 机制

如果 Flink 的 JDBC Sink 支持 flushInterval 机制,可以设定一个合理的时间间隔(如 5 秒),即使数据量未满 batchSize,也会定期写入数据库。

JdbcSink.sink(
    "INSERT INTO my_table (field1, field2) VALUES (?, ?)",
    new JdbcStatementBuilder<MyType>() {
        @Override
        public void accept(PreparedStatement ps, MyType value) throws SQLException {
            ps.setString(1, value.getField1());
            ps.setInt(2, value.getField2());
        }
    },
    JdbcExecutionOptions.builder()
        .withBatchSize(100)
        .withBatchIntervalMs(5000) // 设置定时触发间隔
        .build(),
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:mariadb://localhost:3306/mydb")
        .withDriverName("org.mariadb.jdbc.Driver")
        .withUsername("user")
        .withPassword("password")
        .build()
)

3. 配合 Checkpoint 进行 flush

如果任务使用了 exactly-once 语义,可以在 Checkpoint 触发时 flush 数据,确保即使流量较低,数据也不会长期滞留。

JdbcExecutionOptions.builder()
    .withBatchSize(100)
    .withBatchIntervalMs(10000) // 每 10 秒 flush 一次
    .withMaxRetries(3) // 避免批量失败
    .build()

4. 优化数据库连接池

如果 batchSize 较小,单条写入的数据库连接开销较高,可以使用连接池优化,如 HikariCP:

new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
    .withUrl("jdbc:mariadb://localhost:3306/mydb")
    .withDriverName("org.mariadb.jdbc.Driver")
    .withUsername("user")
    .withPassword("password")
    .withConnectionCheckTimeoutSeconds(30) // 避免连接过期
    .build()

5. 监控数据库写入情况

可以通过 Flink 的 metrics 监控批量插入的行为,确保数据写入是符合预期的。

env.getMetricGroup().counter("mariadb_insert_count");

最常见的方案配合 Checkpoint 进行 flush
在开启 exactly-once 语义时,Flink 任务通常依赖 Checkpoint 来保证数据一致性,同时触发 flush,确保未满 batchSize 的数据也能及时写入数据库。这是 生产环境 中最常见的做法,尤其适用于数据一致性要求高的应用。

最好的方案(综合考虑)定时 flush 机制 + Checkpoint 触发
单独依赖 Checkpoint 可能导致写入延迟过高,因此 结合定时 flush(如 5 秒)和 Checkpoint 触发 是最佳实践。这样可以确保:

  1. 流量大时batchSize 触发批量插入,提高吞吐量。
  2. 流量小时flushInterval 触发写入,避免数据长时间积压。
  3. 支持 Checkpoint,保证 exactly-once 语义,避免数据丢失或重复写入。

优化数据库连接池 主要是为了减少数据库连接开销,适用于所有方案,但并不是解决 batchSize 触发问题的核心方法,而是性能优化的一个补充。

综述:

  • 推荐组合:Checkpoint + 定时 flush
  • 最常见方案:Checkpoint 触发 flush
  • 优化连接池:作为辅助优化,提高数据库性能

结论

在 Flink 中使用 JDBC Sink 批量写入 MariaDB 时,需要权衡 mariadbBatchSize 的大小。如果 batchSize 过大且流量较低,可能导致数据长时间滞留,无法及时插入;如果 batchSize 过小,则可能降低数据库写入吞吐量。最佳实践是结合 定时 flush、Checkpoint 触发、优化连接池 等方式,实现高效稳定的数据写入。

相关文章:

  • Linux - 网络基础(网络层,数据链路层)
  • [Python入门学习记录(小甲鱼)]第4章 分支与循环
  • 详细讲解ecovadis奖牌分类
  • 美股表格数据:如何获取和分析历史高频分钟回测数据
  • IDEA与Maven使用-学习记录(持续补充...)
  • EasyBCD,多系统名称或启动顺序的修改,用来配置与调整启动配置数据(BCD)
  • 算法进阶——双指针
  • HarmonyOS Next 属性动画和转场动画
  • 若依前后端分离版使用Electron打包前端Vue为Exe文件
  • GStreamer —— 2.7、Windows下Qt加载GStreamer库后运行 - “教程7:多线程和 Pad 可用性“(附:完整源码)
  • 深入解析Java MDC:日志链路追踪的利器
  • 4.2 使用VNote写作手册页面(CHM助手)
  • Python 第二章:夯实基础,掌握核心
  • 玉米籽粒品质相关性状的GWAS和Meta-QTL分析
  • 【Unity】改变游戏运行时Window的窗口标题
  • DeepSeek + 飞书多维表格搭建你的高效工作流
  • React基础之项目创建
  • 在 CentOS 上,常用几种方法来确保 Python 脚本在断开终端后继续运行
  • Cursor如何调试.Net Core控制台程序
  • (接“使用js去复制网页内容的方法”)js中的execCommand怎么复制富文本内容解析
  • 陕西网站建设设计/全国培训机构排名前十
  • 山西省网站建设/广告留电话号的网站
  • 响应式网站是做列表/618网络营销策划方案
  • 网站开发款计入什么科目/百度产品大全首页
  • 成都网站建设app开发/seo对网站优化
  • 深圳画册设计印刷/网站seo主要是做什么的