Flink ClickHouse 连接器维表源码深度解析
在 Flink ClickHouse Connector 中,维表(Lookup Table)功能允许在流处理过程中实时地从外部数据库(这里是 ClickHouse)中查询数据,用于丰富流中的记录。下面我们将深入分析 ClickHouse 维表相关的源码。
1. 维表支持概述
Flink 提供了 LookupTableSource
接口来支持维表功能,ClickHouseDynamicTableSource
实现了这个接口,从而使得 ClickHouse 可以作为维表使用。
2. ClickHouseDynamicTableSource
类分析
2.1 类定义和接口实现
public class ClickHouseDynamicTableSourceimplements ScanTableSource,LookupTableSource,SupportsProjectionPushDown,SupportsLimitPushDown,SupportsFilterPushDown {
ClickHouseDynamicTableSource
实现了多个接口,其中 LookupTableSource
是实现维表功能的关键接口。
2.2 构造函数
public ClickHouseDynamicTableSource(ClickHouseReadOptions readOptions,int lookupMaxRetryTimes,@Nullable LookupCache cache,Properties properties,DataType physicalRowDataType) {this.readOptions = readOptions;this.connectionProperties = properties;this.lookupMaxRetryTimes = lookupMaxRetryTimes;this.cache = cache;this.physicalRowDataType = physicalRowDataType;
}
构造函数接收多个参数,包括读取选项、最大重试次数、缓存对象、连接属性和物理行数据类型。
2.3 getLookupRuntimeProvider
方法
@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {// ClickHouse only support non-nested look up keysString[] keyNames = new String[context.getKeys().length];for (int i = 0; i < keyNames.length; i++) {int[] innerKeyArr = context.getKeys()[i];Preconditions.checkArgument(innerKeyArr.length == 1, "ClickHouse only support non-nested look up keys");keyNames[i] = DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);}final RowType rowType = (RowType) physicalRowDataType.getLogicalType();ClickHouseRowDataLookupFunction lookupFunction =new ClickHouseRowDataLookupFunction(readOptions,lookupMaxRetryTimes,DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]),keyNames,rowType);if (cache != null) {return PartialCachingLookupProvider.of(lookupFunction, cache);} else {return LookupFunctionProvider.of(lookupFunction);}
}
- 获取维表查询键:通过
LookupContext
获取维表查询的键,并确保这些键是非嵌套的。 - 创建
ClickHouseRowDataLookupFunction
:该函数用于实际执行从 ClickHouse 中查询维表数据的操作。 - 缓存支持:如果配置了缓存(
cache
不为null
),则使用PartialCachingLookupProvider
来提供带有缓存功能的维表查询;否则,直接使用LookupFunctionProvider
提供基本的维表查询。
2.4 ClickHouseRowDataLookupFunction
类
这个类是实现维表查询的核心类,它继承自 RichFunction
,用于在运行时执行维表查询。虽然代码中没有直接给出该类的详细实现,但可以推测它会使用 ClickHouseReadOptions
中的配置信息,通过 JDBC 连接到 ClickHouse 数据库,并根据查询键执行 SQL 查询。
3. 维表查询的流程
- 定义维表:在 Flink SQL 中,使用
CREATE TABLE
语句定义一个 ClickHouse 表作为维表。
CREATE TABLE clickhouse_lookup_table (id INT,name STRING,-- 其他字段PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'clickhouse','url' = 'jdbc:ch://127.0.0.1:8123','database-name' = 'tutorial','table-name' = 'lookup_table',-- 其他配置
);
- 流表与维表关联:在 SQL 查询中,使用
JOIN
操作将流表与维表关联起来。
SELECT s.id, l.name
FROM stream_table s
JOIN clickhouse_lookup_table FOR SYSTEM_TIME AS OF s.proctime() l
ON s.id = l.id;
- 运行时查询:在流处理过程中,Flink 会根据
ClickHouseRowDataLookupFunction
从 ClickHouse 中查询维表数据,并将结果与流数据进行关联。
4. 缓存支持
为了提高维表查询的性能,Flink 支持对维表数据进行缓存。通过配置 LookupOptions
中的相关参数,可以启用部分缓存(PARTIAL
)。
options.add(LookupOptions.CACHE_TYPE, LookupOptions.LookupCacheType.PARTIAL);
options.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS, Duration.ofMinutes(5));
options.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE, Duration.ofMinutes(10));
options.add(LookupOptions.PARTIAL_CACHE_MAX_ROWS, 10000L);
在 getLookupRuntimeProvider
方法中,如果配置了缓存,会使用 PartialCachingLookupProvider
来管理缓存和查询操作。
5. 总结
Flink ClickHouse Connector 的维表支持通过 ClickHouseDynamicTableSource
类和 ClickHouseRowDataLookupFunction
类实现。它允许在流处理过程中实时地从 ClickHouse 中查询数据,并支持缓存功能以提高性能。在使用时,需要在 SQL 中定义维表并进行关联,Flink 会自动处理维表查询和数据关联的逻辑。