当前位置: 首页 > news >正文

Flink ClickHouse 连接器维表源码深度解析

在 Flink ClickHouse Connector 中,维表(Lookup Table)功能允许在流处理过程中实时地从外部数据库(这里是 ClickHouse)中查询数据,用于丰富流中的记录。下面我们将深入分析 ClickHouse 维表相关的源码。

1. 维表支持概述

Flink 提供了 LookupTableSource 接口来支持维表功能,ClickHouseDynamicTableSource 实现了这个接口,从而使得 ClickHouse 可以作为维表使用。

2. ClickHouseDynamicTableSource 类分析

2.1 类定义和接口实现
public class ClickHouseDynamicTableSourceimplements ScanTableSource,LookupTableSource,SupportsProjectionPushDown,SupportsLimitPushDown,SupportsFilterPushDown {

ClickHouseDynamicTableSource 实现了多个接口,其中 LookupTableSource 是实现维表功能的关键接口。

2.2 构造函数
public ClickHouseDynamicTableSource(ClickHouseReadOptions readOptions,int lookupMaxRetryTimes,@Nullable LookupCache cache,Properties properties,DataType physicalRowDataType) {this.readOptions = readOptions;this.connectionProperties = properties;this.lookupMaxRetryTimes = lookupMaxRetryTimes;this.cache = cache;this.physicalRowDataType = physicalRowDataType;
}

构造函数接收多个参数,包括读取选项、最大重试次数、缓存对象、连接属性和物理行数据类型。

2.3 getLookupRuntimeProvider 方法
@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {// ClickHouse only support non-nested look up keysString[] keyNames = new String[context.getKeys().length];for (int i = 0; i < keyNames.length; i++) {int[] innerKeyArr = context.getKeys()[i];Preconditions.checkArgument(innerKeyArr.length == 1, "ClickHouse only support non-nested look up keys");keyNames[i] = DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);}final RowType rowType = (RowType) physicalRowDataType.getLogicalType();ClickHouseRowDataLookupFunction lookupFunction =new ClickHouseRowDataLookupFunction(readOptions,lookupMaxRetryTimes,DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]),keyNames,rowType);if (cache != null) {return PartialCachingLookupProvider.of(lookupFunction, cache);} else {return LookupFunctionProvider.of(lookupFunction);}
}
  • 获取维表查询键:通过 LookupContext 获取维表查询的键,并确保这些键是非嵌套的。
  • 创建 ClickHouseRowDataLookupFunction:该函数用于实际执行从 ClickHouse 中查询维表数据的操作。
  • 缓存支持:如果配置了缓存(cache 不为 null),则使用 PartialCachingLookupProvider 来提供带有缓存功能的维表查询;否则,直接使用 LookupFunctionProvider 提供基本的维表查询。
2.4 ClickHouseRowDataLookupFunction

这个类是实现维表查询的核心类,它继承自 RichFunction,用于在运行时执行维表查询。虽然代码中没有直接给出该类的详细实现,但可以推测它会使用 ClickHouseReadOptions 中的配置信息,通过 JDBC 连接到 ClickHouse 数据库,并根据查询键执行 SQL 查询。

3. 维表查询的流程

  1. 定义维表:在 Flink SQL 中,使用 CREATE TABLE 语句定义一个 ClickHouse 表作为维表。
CREATE TABLE clickhouse_lookup_table (id INT,name STRING,-- 其他字段PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'clickhouse','url' = 'jdbc:ch://127.0.0.1:8123','database-name' = 'tutorial','table-name' = 'lookup_table',-- 其他配置
);
  1. 流表与维表关联:在 SQL 查询中,使用 JOIN 操作将流表与维表关联起来。
SELECT s.id, l.name
FROM stream_table s
JOIN clickhouse_lookup_table FOR SYSTEM_TIME AS OF s.proctime() l
ON s.id = l.id;
  1. 运行时查询:在流处理过程中,Flink 会根据 ClickHouseRowDataLookupFunction 从 ClickHouse 中查询维表数据,并将结果与流数据进行关联。

4. 缓存支持

为了提高维表查询的性能,Flink 支持对维表数据进行缓存。通过配置 LookupOptions 中的相关参数,可以启用部分缓存(PARTIAL)。

options.add(LookupOptions.CACHE_TYPE, LookupOptions.LookupCacheType.PARTIAL);
options.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS, Duration.ofMinutes(5));
options.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE, Duration.ofMinutes(10));
options.add(LookupOptions.PARTIAL_CACHE_MAX_ROWS, 10000L);

getLookupRuntimeProvider 方法中,如果配置了缓存,会使用 PartialCachingLookupProvider 来管理缓存和查询操作。

5. 总结

Flink ClickHouse Connector 的维表支持通过 ClickHouseDynamicTableSource 类和 ClickHouseRowDataLookupFunction 类实现。它允许在流处理过程中实时地从 ClickHouse 中查询数据,并支持缓存功能以提高性能。在使用时,需要在 SQL 中定义维表并进行关联,Flink 会自动处理维表查询和数据关联的逻辑。

http://www.dtcms.com/a/267857.html

相关文章:

  • 【Note】《Kafka: The Definitive Guide》第四章:Kafka 消费者全面解析:如何从 Kafka 高效读取消息
  • 深入理解Kafka幂等性:原理、边界与最佳实践
  • Neo4j 综合练习作业
  • Android 应用开发 | 一种限制拷贝速率解决因 IO 过高导致系统卡顿的方法
  • java ThreadLocal源码分析
  • 深度学习6(多分类+交叉熵损失原理+手写数字识别案例TensorFlow)
  • 高效处理大体积Excel文件的Java技术方案解析
  • 安卓之service
  • QT 菜单栏设计使用方法
  • 基于AndServer的RPC架构:Android原生SO文件远程调用实战指南
  • Python 机器学习核心入门与实战进阶 Day 4 - 支持向量机(SVM)原理与分类实战
  • 深度学习图像分类数据集—蘑菇识别分类
  • plantuml用法总结
  • Java设计模式之行为型模式(策略模式)介绍与说明
  • 利用低空无人机影像进行树种实例分割
  • 深入解析Vue中v-model的双向绑定实现原理
  • 牛客周赛99
  • 关于 栈帧变化完整流程图(函数嵌套)
  • 大模型面试:RAG与Agent相关
  • 《Redis》集群
  • 【Note】《Kafka: The Definitive Guide》 第二章 Installing Kafka:Kafka 安装与运行
  • Redis--主从复制详解
  • 【Docker基础】Docker容器挂载方式深度解析:--volume与--mount参数对比
  • QT6 源(155)模型视图架构里的列表视图 QListView:接着学习成员函数,信号函数,附上本类的源代码带注释。
  • HCIA-网络地址转换(NAT)
  • CppCon 2018 学习:Woes of Scope Guards and Unique_Resource
  • 抖音小游戏(IAA)巨量引擎投放指南
  • [shadPS4] 内存管理 | 权限管理 |文件系统 | 挂载和句柄
  • 【BTC】数据结构
  • 7,TCP服务器