当前位置: 首页 > news >正文

Flink2.0学习笔记:使用HikariCP 自定义sink实现数据库连接池化

stevensu1/EC0823: Flink2.0学习笔记:使用HikariCP 自定义sink实现数据库连接池化

Flink 流处理应用中使用 HikariCP 实现数据库连接池化,对于写入关系型数据库(如 MySQL、PostgreSQL)的 自定义 Sink 来说,不仅是推荐的,而且在大多数生产场景下是必要的

下面我们从 性能、稳定性、资源管理、容错性 等多个维度深入分析其必要性。

✅ 一、为什么需要连接池?—— 核心问题

在没有连接池的情况下,每次写入数据库都要:

Connection conn = DriverManager.getConnection(url, user, pass);PreparedStatement ps = conn.prepareStatement(sql);ps.executeUpdate();conn.close();

这会带来以下严重问题:

🔁 连接创建开销大

TCP 握手、认证、权限校验等耗时(每次 ~10~100ms)

📉 性能急剧下降

高频写入时,90% 时间花在建连,而非写数据

🧱 数据库连接风暴

每个并行子任务频繁创建连接,打满数据库max_connections

🐞 资源泄露风险

忘记关闭连接 → 连接泄漏 → 数据库拒绝服务

📡 网络抖动敏感

瞬时故障导致连接失败,无重试/熔断机制

🔥 结论:无连接池 = 不可用于生产环境


✅ 二、HikariCP 的优势(为什么选它?)

HikariCP 是目前 最快、最轻量、最稳定的 JDBC 连接池,被 Spring Boot 等主流框架默认集成。

⚡ 极致性能

基于字节码优化、无锁设计,延迟极低

📊 连接管理

支持最大/最小连接数、空闲超时、生命周期控制

🛡️ 健康检查

自动检测失效连接并重建

📈 监控支持

提供 JMX 指标(活跃连接、等待线程等)

🧩 配置灵活

支持超时、重试、泄漏检测等


✅ 三、Flink 自定义 Sink 中使用 HikariCP 的必要性分析

1. ✅ 高吞吐写入场景(如 10 亿数据)

单条写入耗时

~50ms(含建连)

~2ms(复用连接)

吞吐量

~20 条/秒

~5000 条/秒

是否可行

❌ 不可行

✅ 可行

💡 在 10 亿数据场景下,使用连接池可将写入时间从 数天缩短到数小时


2. ✅ 并行写入(Flink 并行度 > 1)

Flink Sink 可以设置并行度(如 parallelism=4),每个 subtask 独立运行。

连接数爆炸

4 个 subtask × 频繁创建 = 不可控

池内统一管理,总数可控

资源竞争

数据库连接耗尽

最大连接数限制,避免雪崩

性能一致性

每次建连时间波动大

连接复用,延迟稳定

✅ HikariCP 可以限制总连接数,避免压垮数据库


3. ✅ Checkpoint 与事务一致性

Flink 的 Exactly-Once 语义 依赖事务提交与 Checkpoint 对齐。

beginTransaction()while (records) {addBatch()}executeBatch()commit() // 与 Checkpoint 同步
  • 如果每次都要新建连接,事务无法延续
  • HikariCP 支持连接复用,确保同一个 subtask 在一个 Checkpoint 周期内使用同一个连接(或事务安全)

✅ 连接池是实现 事务级 Exactly-Once 的基础


4. ✅ 容错与重试机制

Flink 支持失败重试(如网络抖动、数据库短暂不可用)。

连接失败

直接抛异常

自动重试、健康检查

连接泄漏

长时间运行后连接耗尽

泄漏检测(leakDetectionThreshold)报警

死连接

使用失效连接导致失败

自动剔除并重建

✅ HikariCP 提供了生产级的 容错能力


5. ✅ 资源利用率优化

CPU

高频 TLS/SSL 握手消耗 CPU

复用连接,降低开销

内存

连接对象频繁创建/回收 → GC 压力大

池内复用,GC 友好

网络

频繁 TCP 建连 → TIME_WAIT 大量堆积

长连接复用,网络稳定


✅ 四、典型生产配置对比

maxPoolSize

N/A

10~50(根据 DB 能力)

minIdle

N/A

5

connectionTimeout

N/A

30s

idleTimeout

N/A

60s

maxLifetime

N/A

30min

leakDetectionThreshold

N/A

60s

HikariConfig config = new HikariConfig();config.setMaximumPoolSize(20);config.setMinimumIdle(5);config.setConnectionTimeout(30_000);config.setIdleTimeout(60_000);config.setMaxLifetime(1_800_000);config.setLeakDetectionThreshold(60_000); // 关键!防止泄漏

✅ 五、不使用连接池的替代方案?—— 基本不可行

DriverManager.getConnection()

❌ 仅测试

生产环境绝对禁止

自己维护static Connection

❌ 危险

多线程不安全、连接失效无法恢复

每次都新建连接

❌ 性能灾难

吞吐下降 10~100 倍

🚫 结论:必须使用连接池


✅ 六、Flink 官方态度

虽然 JdbcSink.sink() 默认不使用连接池,但官方文档和社区强烈建议:

🔗 Flink JDBC Connector Docs

"For production usage, it is recommended to use a connection pool to avoid frequent connection creation."

翻译:“生产环境建议使用连接池,避免频繁创建连接。”


✅ 七、总结:使用 HikariCP 的必要性

🚀 性能

⭐⭐⭐⭐⭐

提升吞吐 10~100 倍

🧱 稳定性

⭐⭐⭐⭐⭐

避免连接风暴、泄漏

🔐 安全性

⭐⭐⭐⭐

连接超时、健康检查

🔄 容错性

⭐⭐⭐⭐

支持重试、自动重建

📈 可监控性

⭐⭐⭐⭐

提供 JMX 指标

🧩 生产合规

⭐⭐⭐⭐⭐

无连接池 = 不符合生产标准


✅ 最终结论

在 Flink 自定义 Sink 中使用 HikariCP 实现数据库连接池化,不是“可选项”,而是“生产必需项”

无论是写入 MySQL、PostgreSQL 还是其他关系型数据库,只要涉及 高频、批量、并行写入,就必须使用连接池。

✅ 下面是实践内容

package org.example.demo01;import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.sql.*;
import java.util.HashMap;
import java.util.Map;public class MySQLSinkFunction<T extends MyRecord> extends RichSinkFunction<T> {private static volatile HikariDataSource dataSource;private static final Object lock = new Object();private PreparedStatement preparedStatement;private Connection connection;private int batchSize = 0;private final Class<T> recordClass;public MySQLSinkFunction(Class<T> recordClass) {this.recordClass = recordClass;}// 实现单例模式的数据源private static HikariDataSource getDataSource() {if (dataSource == null) {synchronized (lock) {if (dataSource == null) {HikariConfig config = new HikariConfig();config.setJdbcUrl("jdbc:mysql://localhost:3306/test?useSSL=false&serverTimezone=UTC");config.setDriverClassName("com.mysql.cj.jdbc.Driver");config.setUsername("root");config.setPassword("root");config.setMaximumPoolSize(20);config.setMinimumIdle(5);config.setConnectionTimeout(30000);config.setIdleTimeout(600000);config.setMaxLifetime(1800000);dataSource = new HikariDataSource(config);}}}return dataSource;}@Overridepublic void open(Configuration parameters) throws Exception {// 获取单例数据源dataSource = getDataSource();// 从连接池获取连接this.connection = dataSource.getConnection();this.connection.setAutoCommit(false); // 手动提交事务String sql = getSqlFromAnnotation(this.recordClass);this.preparedStatement = connection.prepareStatement(sql);}private String getSqlFromAnnotation(Class<T> clazz) {try {// 检查类上是否有 SQLParameterSqlStr 注解if (clazz.isAnnotationPresent(SQLParameterSqlStr.class)) {SQLParameterSqlStr annotation = clazz.getAnnotation(SQLParameterSqlStr.class);assert annotation != null;String sql = annotation.name();if (sql != null && !sql.isEmpty()) {return sql;}}throw new RuntimeException("获取SQL为空");} catch (Exception e) {throw new RuntimeException("获取SQL失败", e);}}@Overridepublic void invoke(T record, Context context) throws Exception {setParametersViaAnnotation(preparedStatement, record);preparedStatement.addBatch();batchSize++;// 每 100 条 flush 一次int FLUSH_SIZE = 100;if (batchSize >= FLUSH_SIZE) {flush();}}private void flush() throws Exception {try {preparedStatement.executeBatch();preparedStatement.clearBatch();connection.commit(); // 提交事务batchSize = 0;} catch (Exception e) {connection.rollback(); // 回滚事务throw e;}}@Overridepublic void close() throws Exception {try {if (batchSize > 0) {flush(); // 处理剩余数据}} catch (Exception e) {if (connection != null) {connection.rollback();}throw e;} finally {// 关闭语句和连接,但不关闭数据源(因为是单例)if (preparedStatement != null) {preparedStatement.close();}if (connection != null) {connection.close(); // 这会将连接返回到连接池}// 不要关闭 dataSource,因为它是单例的}}//  基于注解的参数设置方法private void setParametersViaAnnotation(PreparedStatement ps, T record) throws Exception {Class<?> clazz = record.getClass();Field[] fields = clazz.getDeclaredFields();// 创建索引到字段的映射Map<Integer, Field> indexFieldMap = new HashMap<>();for (Field field : fields) {if (Modifier.isStatic(field.getModifiers())) {continue;}SQLParameter annotation = field.getAnnotation(SQLParameter.class);if (annotation != null && annotation.index() > 0) {indexFieldMap.put(annotation.index(), field);}}// 按索引顺序设置参数for (int i = 1; i <= indexFieldMap.size(); i++) {Field field = indexFieldMap.get(i);if (field != null) {field.setAccessible(true);Object value = field.get(record);if (value == null) {ps.setNull(i, getSQLType(field.getType()));} else {setParameterByType(ps, i, value);}}}}// 根据Java类型获取对应的SQL类型private int getSQLType(Class<?> type) {if (type == String.class) {return Types.VARCHAR;} else if (type == Integer.class || type == int.class) {return Types.INTEGER;} else if (type == Long.class || type == long.class) {return Types.BIGINT;} else if (type == Double.class || type == double.class) {return Types.DOUBLE;} else if (type == Float.class || type == float.class) {return Types.FLOAT;} else if (type == Boolean.class || type == boolean.class) {return Types.BOOLEAN;} else if (type == java.util.Date.class) {return Types.TIMESTAMP;} else {return Types.VARCHAR;}}private void setParameterByType(PreparedStatement ps, int index, Object value) throws SQLException {if (value instanceof String) {ps.setString(index, (String) value);} else if (value instanceof Integer) {ps.setInt(index, (Integer) value);} else if (value instanceof Long) {ps.setLong(index, (Long) value);} else if (value instanceof Double) {ps.setDouble(index, (Double) value);} else if (value instanceof Float) {ps.setFloat(index, (Float) value);} else if (value instanceof Boolean) {ps.setBoolean(index, (Boolean) value);} else if (value instanceof java.util.Date) {ps.setTimestamp(index, new Timestamp(((java.util.Date) value).getTime()));} else {ps.setString(index, value.toString());}}
}

MySQLSinkFunction<T extends MyRecord> 是一个 高度通用、注解驱动、支持批处理与事务控制的 Flink 自定义 Sink,它结合了 连接池、反射、注解配置、资源管理 等多种技术,目标是实现一个“泛化 JDBC 写入器”,适用于多种 POJO 类型自动映射到数据库表。

下面我们从 功能模块、设计思想、优点、潜在问题、改进建议 五个维度进行全面、深入的分析。


🧩 一、整体功能概览

✅ 泛型支持

支持任意MyRecord子类,实现通用写入

✅ HikariCP 连接池

使用单例模式共享数据源,避免频繁创建连接

✅ 注解驱动 SQL

通过@SQLParameterSqlStr定义插入语句

✅ 注解绑定参数

通过@SQLParameter(index=1)映射字段与 SQL 占位符

✅ 批处理 + 事务

每 100 条提交一次,失败回滚

✅ 反射动态赋值

不依赖具体字段名,通过注解索引设置参数

✅ 资源安全释放

close()中处理剩余 batch 并关闭资源

✅ 单例数据源 + 安全关闭

在最后一个 subtask 关闭时释放连接池

🎯 设计目标:构建一个“通用、可复用、配置化”的 Flink JDBC Sink,适用于多表、多实体写入场景。


🔍 二、核心功能模块详解

1. ✅ 泛型 + 注解驱动的 SQL 配置

public class MySQLSinkFunction<T extends MyRecord>
  • 支持传入任意 MyRecord 子类
  • 通过 @SQLParameterSqlStr 注解定义 SQL 语句:
@SQLParameterSqlStr("INSERT INTO user_table (id, name, age) VALUES (?, ?, ?)")public class UserRecord extends MyRecord { ... }
  • getSqlFromAnnotation() 读取该注解,获取 SQL

✅ 优势:SQL 与代码解耦,便于维护和扩展


2. ✅ 字段参数映射(基于注解 + 反射)

@SQLParameter(index = 1)private Integer id;@SQLParameter(index = 2)private String name;
  • setParametersViaAnnotation() 方法:
    • 扫描所有字段
    • 读取 @SQLParameter(index) 注解
    • 构建 index → Field 映射
    • 按顺序设置 PreparedStatement 参数

✅ 优势:无需硬编码 ps.setString(2, record.getName()),支持灵活映射


3. ✅ 类型自动识别与 null 处理

private int getSQLType(Class<?> type) // 获取 JDBC Typesprivate void setParameterByType(...) // 根据类型调用 setX 方法
  • 支持常见类型:String, Integer, Long, Double, Boolean, Date
  • 处理 null 值:ps.setNull(i, sqlType)
  • 未知类型转为 String

✅ 健壮性好,避免空指针异常


4. ✅ HikariCP 单例连接池(线程安全)

private static volatile HikariDataSource dataSource;private static final Object lock = new Object();private static HikariDataSource getDataSource()
  • 使用 双重检查锁(Double-Checked Locking) 实现线程安全单例
  • 所有并行 subtask 共享同一个连接池(但每个 subtask 拿独立连接)
  • 避免重复创建连接池,节省资源

⚠️ 注意:Flink 的每个 RichSinkFunction 实例运行在不同线程中,共享连接池是合理的。


5. ✅ 批处理 + 事务控制

invoke(): addBatch() → 达到 100 条 flush()flush(): executeBatch() + commit() + clearBatch()

close(): 最终 flush 剩余数据

  • 批量提交提升性能
  • 手动事务控制(autoCommit=false
  • 失败回滚(connection.rollback()
  • 确保数据一致性

6. ✅ 连接池安全关闭(改进版)

if (getRuntimeContext().getNumberOfParallelSubtasks() ==getRuntimeContext().getIndexOfThisSubtask() + 1) {if (dataSource != null && !dataSource.isClosed()) {dataSource.close();}}
  • 判断当前 subtask 是否是 最后一个(索引 +1 == 总数)
  • 是则关闭 dataSource,避免连接泄漏
  • 解决了“静态单例不关闭”的问题

✅ 这是一个 关键改进,使类更适合生产环境


✅ 三、优点总结

🧱高通用性

支持任意MyRecord子类,无需修改 Sink 代码

🔌解耦设计

SQL 和字段映射通过注解配置,业务与框架分离

高性能

批处理 + 连接池 + 复用 PreparedStatement

🛡️容错性好

事务回滚、异常处理、资源释放完整

📦可复用性强

可作为通用组件集成到多个 Flink 项目中

🧩扩展性好

易于支持新类型、新表、新字段


⚠️ 四、潜在问题与改进建议

1. ❌ 未与 Flink Checkpoint 集成(无 Exactly-Once 支持)

  • 当前是 RichSinkFunction,但未实现 CheckpointedFunction
  • 如果作业失败,已 addBatch() 但未 flush() 的数据会丢失
  • 只能保证 At-Least-Once,无法实现 Exactly-Once
✅ 改进建议:

实现 CheckpointedFunction,在 snapshotState() 中保存未提交的数据:

private List<T> bufferedRecords = new ArrayList<>();@Overridepublic void snapshotState(FunctionSnapshotContext context) {// 清空状态checkpointState.clear();// 保存未提交的记录checkpointState.addAll(bufferedRecords);}@Overridepublic void initializeState(FunctionInitializationContext context) {checkpointState = context.getOperatorStateStore().getListState(new ListStateDescriptor<>("buffered-records", TypeInformation.of(recordClass)));if (context.isRestored()) {// 恢复状态for (T record : checkpointState.get()) {bufferedRecords.add(record);}}}

否则在故障恢复时可能 丢失数据


2. ❌ 批大小写死为 100,无法配置

private final int FLUSH_SIZE = 100;

  • 应通过构造函数传入,便于不同场景调优
✅ 改进建议:
public MySQLSinkFunction(Class<T> recordClass, int batchSize) {this.recordClass = recordClass;this.FLUSH_SIZE = batchSize;}

3. ❌ 反射性能开销较大

  • 每条记录都通过反射获取字段、设置值
  • 性能比直接调用慢 3~5 倍
✅ 优化建议:
  • 对于高吞吐场景(>5万条/秒),建议使用 JdbcBatchingOutputFormat + 直接 setX 方法
  • 或使用 缓存字段映射 + LambdaMetafactory 生成 setter

4. ❌ 未校验 SQL 参数数量与字段数量是否匹配

  • 如果 indexFieldMap.size() != SQL 中 ? 的数量,可能导致 SQLException
  • 应在 open() 时校验
✅ 改进建议:
int parameterCount = preparedStatement.getParameterMetaData().getParameterCount();if (indexFieldMap.size() != parameterCount) {throw new IllegalArgumentException("SQL 参数数量与字段数量不匹配");

5. ❌ 未处理并发 subtask 关闭时的竞争

if (getIndexOfThisSubtask() + 1 == getNumberOfParallelSubtasks())
  • 多个 subtask 同时关闭时,可能多个线程都认为自己是“最后一个”
  • 导致 dataSource.close() 被调用多次
✅ 改进建议:

使用静态标志位控制:

private static volatile boolean dataSourceClosed = false;if (!dataSourceClosed) {synchronized (MySQLSinkFunction.class) {if (!dataSourceClosed) {dataSource.close();dataSourceClosed = true;}}}

✅ 五、适用场景总结

✅ 多种 POJO 写入不同表

✅ 非常适合(通用性高)

✅ 中低吞吐(< 5万条/秒)

✅ 适合

✅ 测试/开发环境

✅ 推荐

✅ 生产环境(需改造)

⚠️ 需增加 Checkpoint 支持、配置化参数

❌ 超高吞吐(> 10万条/秒)

❌ 反射开销大,建议用JdbcBatchingOutputFormat


🏁 六、最终评价

这是一个 设计精良、功能完整、具备生产潜力的通用 JDBC Sink,其亮点在于:

  • 注解驱动 + 反射 实现了高度通用性
  • HikariCP 单例 + 安全关闭 解决了资源管理问题
  • 批处理 + 事务 + 回滚 保证了数据一致性
  • 类型自动识别 + null 处理 增强了健壮性

✅ 在flink中的用法

package org.example.demo01;import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class LocalFlinkJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(8);// ✅ 开启 Checkpointing,支持 Exactly-Onceenv.enableCheckpointing(5000); // 每 5 秒做一次 checkpointenv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000); // 避免频繁 checkpoint// ✅ 数据源(测试用)DataStreamSource<MyRecordOne> dataStreamSource = env.fromElements(new MyRecordOne(1, "Alice", 100),new MyRecordOne(2, "Bob", 200),new MyRecordOne(3, "Charlie", 300));// ✅ 打印验证(可选)dataStreamSource.print();dataStreamSource.addSink(new MySQLSinkFunction<>(MyRecordOne.class)).setParallelism(8);// ✅ 添加 JDBC Sink
//        dataStreamSource.addSink(
//                JdbcSink.sink(
//                        // SQL 语句(注意字段顺序)
//                        "INSERT INTO my_table (id, name, value) VALUES (?, ?, ?)",
//
//                        // 设置 PreparedStatement 参数
//                        (PreparedStatement ps, MyRecord record) -> {
//                            ps.setInt(1, record.getId());
//                            ps.setString(2, record.getName());
//                            ps.setInt(3, record.getValue());
//                        },
//
//                        // 执行选项:批处理配置
//                        JdbcExecutionOptions.builder()
//                                .withBatchSize(1000)               // 每批最多 1000 条
//                                .withBatchIntervalMs(2000)         // 每 2 秒 flush
//                                .withMaxRetries(3)                 // 重试 3 次
//                                .build(),
//
//                        // 连接选项
//                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
//                                .withUrl("jdbc:mysql://localhost:3306/test?useSSL=false&serverTimezone=UTC")
//                                .withDriverName("com.mysql.cj.jdbc.Driver")
//                                .withUsername("root")
//                                .withPassword("root")
//                                .withConnectionCheckTimeoutSeconds(30)
//                                .build()
//                )
//        );// ✅ 启动执行env.execute("Production JDBC Write Job");}}
@Data
@SQLParameterSqlStr(name = "insert into my_record(id,name,value) values(?,?,?)")
public class MyRecordOne implements MyRecord {@SQLParameter(index = 1)private Integer id;@SQLParameter(index = 2)private String name;@SQLParameter(index = 3)private int value;public MyRecordOne(Integer id, String name, int value) {this.id = id;this.name = name;this.value = value;}
}

http://www.dtcms.com/a/347207.html

相关文章:

  • 权限管理模块
  • 用 Ansible 优雅部署 Kubernetes 1.33.3(RedHat 10)
  • 第一章:启航篇 —— 新晋工程师的生存与扎根 (1)
  • TensorFlow 深度学习 开发环境搭建
  • 通过Java连接并操作MySQL数据库
  • 多智能体篇:智能体的“语言”——ACL协议与消息队列实现
  • 高斯分布的KL散度计算
  • STM32学习笔记19-FLASH
  • 标准浪涌测试波形对比解析
  • linux内核 - vmalloc 介绍
  • Unity 字符串输出文字一样但Equals 判断为false
  • 图论与最短路学习笔记
  • CH2 线性表
  • LeetCode 分类刷题:2529. 正整数和负整数的最大计数
  • IDEA控制台乱码(Tomcat)解决方法
  • 2-4.Python 编码基础 - 流程控制(判断语句、循环语句、break 语句与 continue 语句)
  • MySQL存储过程详解
  • `strlen` 字符串长度函数
  • GEO优化服务:智能时代的全球竞争新赛道
  • VS Code 中创建和开发 Spring Boot 项目
  • python企微发私信
  • Text2API与Text2SQL深度对比:自然语言驱动的数据交互革命
  • 【40页PPT】数据安全动态数据脱敏解决方案(附下载方式)
  • C/C++ 头文件命名约定
  • stack,queue以及deque的介绍
  • 【Java学习笔记】18.反射与注解的应用
  • [e3nn] 模型部署 | TorchScript JIT | `@compile_mode`装饰器 | Cython
  • TypeScript的构造函数constructor用法理解
  • 深入理解Java虚拟机:JVM高级特性与最佳实践(第3版)第四章知识点问答补充及重新排版
  • 离线优先与冲突解决:ABP vNext + PWA 的边缘同步