Flink ClickHouse 连接器数据读取源码深度解析
一、引言
在大数据处理流程中,从存储系统中高效读取数据是进行后续分析的基础。Flink ClickHouse 连接器为我们提供了从 ClickHouse 数据库读取数据的能力,使得我们可以将 ClickHouse 中存储的海量数据引入到 Flink 流处理或批处理作业中进行进一步的分析和处理。下面我们将深入解析其数据读取的源码实现,探索其背后的技术细节和优化策略。
二、整体架构概述
Flink ClickHouse 连接器的数据读取主要围绕 AbstractClickHouseInputFormat
及其子类展开。AbstractClickHouseInputFormat
是一个抽象基类,它定义了读取数据的基本流程和方法,为具体的读取实现提供了统一的接口。具体的读取逻辑由其子类 ClickHouseBatchInputFormat
和 ClickHouseShardInputFormat
实现,它们分别适用于不同的场景,以满足多样化的读取需求。
三、核心类及方法详细解析
1. AbstractClickHouseInputFormat
public abstract class AbstractClickHouseInputFormat extends RichInputFormat<RowData, InputSplit>implements ResultTypeQueryable<RowData> {protected final String[] fieldNames;protected final TypeInformation<RowData> rowDataTypeInfo;protected final Object[][] parameterValues;protected final String parameterClause;protected final String filterClause;protected final long limit;protected AbstractClickHouseInputFormat(String[] fieldNames,TypeInformation<RowData> rowDataTypeInfo,Object[][] parameterValues,String parameterClause,String filterClause,long limit) {this.fieldNames = fieldNames;this.rowDataTypeInfo = rowDataTypeInfo;this.parameterValues = parameterValues;this.parameterClause = parameterClause;this.filterClause = filterClause;this.limit = limit;}
AbstractClickHouseInputFormat
继承自 RichInputFormat
并实现了 ResultTypeQueryable
接口。它包含了一些重要的属性,如字段名、行数据类型信息、参数值、过滤条件和限制条件等。这些属性将在数据读取过程中发挥重要作用,用于指定读取的数据范围和格式。构造函数用于初始化这些属性,确保在创建 AbstractClickHouseInputFormat
实例时,所有必要的信息都已正确设置。
2. AbstractClickHouseInputFormat.Builder
AbstractClickHouseInputFormat.Builder
类同样采用了建造者模式,用于构建 AbstractClickHouseInputFormat
的实例。它提供了一系列的 withXXX
方法,允许用户通过链式调用的方式设置各种配置参数,最后通过 build
方法创建具体的输入格式实例。
public Builder withOptions(ClickHouseReadOptions readOptions) {this.readOptions = readOptions;return this;
}public Builder withConnectionProperties(Properties connectionProperties) {this.connectionProperties = connectionProperties;return this;
}
这些 withXXX
方法通过将传入的参数赋值给 Builder
类的成员变量,并返回 this
指针,实现了链式调用的效果。例如,用户可以这样使用:
AbstractClickHouseInputFormat.Builder builder = new AbstractClickHouseInputFormat.Builder();
builder.withOptions(readOptions).withConnectionProperties(connectionProperties);
public AbstractClickHouseInputFormat build() {Preconditions.checkNotNull(readOptions);Preconditions.checkNotNull(connectionProperties);Preconditions.checkNotNull(fieldNames);Preconditions.checkNotNull(fieldTypes);Preconditions.checkNotNull(rowDataTypeInfo);ClickHouseConnectionProvider connectionProvider = null;try {connectionProvider =new ClickHouseConnectionProvider(readOptions, connectionProperties);DistributedEngineFull engineFullSchema =getDistributedEngineFull(connectionProvider.getOrCreateConnection(),readOptions.getDatabaseName(),readOptions.getTableName());boolean isDistributed = engineFullSchema != null;if (isDistributed && readOptions.isUseLocal()) {initShardInfo(connectionProvider, engineFullSchema);initPartitionInfo();} else if (readOptions.getPartitionColumn() != null) {initPartitionInfo();}LogicalType[] logicalTypes =Arrays.stream(fieldTypes).map(DataType::getLogicalType).toArray(LogicalType[]::new);return isDistributed && readOptions.isUseLocal()? createShardInputFormat(logicalTypes, engineFullSchema): createBatchInputFormat(logicalTypes);} catch (Exception e) {throw new RuntimeException("Build ClickHouse input format failed.", e);} finally {if (connectionProvider != null) {connectionProvider.closeConnections();}}
}
在 build
方法中,首先会对必要的参数进行非空检查,确保所有必需的配置都已正确设置。接着,会创建 ClickHouseConnectionProvider
对象,用于管理与 ClickHouse 数据库的连接。然后,尝试获取分布式引擎的完整信息,判断当前表是否为分布式表。如果是分布式表且使用本地表,会初始化分片信息和分区信息;如果指定了分区列,也会初始化分区信息。最后,根据是否为分布式表以及是否使用本地表,选择创建 ClickHouseShardInputFormat
或 ClickHouseBatchInputFormat
实例。无论创建过程是否成功,都会关闭 ClickHouseConnectionProvider
以释放连接资源。
3. ClickHouseBatchInputFormat 和 ClickHouseShardInputFormat
ClickHouseBatchInputFormat
用于批量读取数据,它会一次性从 ClickHouse 数据库中读取多条记录,减少了与数据库的交互次数,提高了读取性能。而 ClickHouseShardInputFormat
用于分片读取数据,适用于分布式表。在分布式环境中,数据会被分散存储在多个分片上,ClickHouseShardInputFormat
会根据分片信息从各个分片上并行读取数据,从而提高读取效率。
private AbstractClickHouseInputFormat createShardInputFormat(LogicalType[] logicalTypes, DistributedEngineFull engineFullSchema) {return new ClickHouseShardInputFormat(new ClickHouseConnectionProvider(readOptions, connectionProperties),new ClickHouseRowConverter(RowType.of(logicalTypes)),readOptions,engineFullSchema,shardMap,shardValues,fieldNames,rowDataTypeInfo,parameterValues,parameterClause,filterClause,limit);
}private AbstractClickHouseInputFormat createBatchInputFormat(LogicalType[] logicalTypes) {return new ClickHouseBatchInputFormat(new ClickHouseConnectionProvider(readOptions, connectionProperties),new ClickHouseRowConverter(RowType.of(logicalTypes)),readOptions,fieldNames,rowDataTypeInfo,parameterValues,parameterClause,filterClause,limit);
}
这两个方法分别用于创建 ClickHouseShardInputFormat
和 ClickHouseBatchInputFormat
实例,会传入必要的参数,如连接提供者、行转换器、读取选项、字段名等。行转换器 ClickHouseRowConverter
用于将从 ClickHouse 数据库中读取的原始数据转换为 Flink 可以处理的 RowData
格式。
4. FilterPushDownHelper
public class FilterPushDownHelper {private static final Map<FunctionDefinition, SqlClause> FILTERS = new HashMap<>();static {FILTERS.put(BuiltInFunctionDefinitions.EQUALS, EQ);FILTERS.put(BuiltInFunctionDefinitions.NOT_EQUALS, NOT_EQ);FILTERS.put(BuiltInFunctionDefinitions.GREATER_THAN, GT);FILTERS.put(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, GT_EQ);FILTERS.put(BuiltInFunctionDefinitions.LESS_THAN, LT);FILTERS.put(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, LT_EQ);FILTERS.put(BuiltInFunctionDefinitions.IS_NULL, IS_NULL);FILTERS.put(BuiltInFunctionDefinitions.IS_NOT_NULL, IS_NOT_NULL);FILTERS.put(BuiltInFunctionDefinitions.AND, AND);FILTERS.put(BuiltInFunctionDefinitions.OR, OR);}public static String convert(List<ResolvedExpression> filters) {int filterSize = filters.size();return filters.stream().map(expression -> FilterPushDownHelper.convertExpression(expression, filterSize)).filter(Optional::isPresent).map(Optional::get).collect(joining(" AND "));}
FilterPushDownHelper
类用于将 Flink 的过滤表达式转换为 ClickHouse 可以理解的 SQL 过滤条件。通过静态初始化块,将 Flink 的内置函数定义映射到相应的 SQL 子句。convert
方法会将多个过滤表达式转换为一个 SQL 过滤条件字符串,从而实现过滤条件的下推。过滤条件下推可以减少从 ClickHouse 数据库中读取的数据量,提高读取效率。
四、读取流程总结
- 配置参数:使用
AbstractClickHouseInputFormat.Builder
的withXXX
方法设置读取选项、连接属性、字段信息等参数。这些参数将决定数据读取的范围、格式和方式。 - 构建输入格式:调用
build
方法,根据是否为分布式表以及是否使用本地表,选择创建ClickHouseBatchInputFormat
或ClickHouseShardInputFormat
实例。这个过程中会进行参数检查、连接创建、分片信息和分区信息的初始化等操作。 - 数据读取:通过创建的输入格式实例,从 ClickHouse 数据库批量或分片读取数据。在读取过程中,可以使用
FilterPushDownHelper
进行过滤条件的下推,减少不必要的数据传输,提高读取效率。 - 资源管理:在读取完成后,关闭
ClickHouseConnectionProvider
以释放连接资源,避免资源泄漏。
五、优化建议
- 合理使用过滤条件下推:尽量使用
FilterPushDownHelper
提供的功能,将过滤条件下推到 ClickHouse 数据库端进行处理,减少从数据库中读取的数据量。 - 并行读取数据:对于分布式表,可以使用
ClickHouseShardInputFormat
进行分片读取,并行从各个分片上读取数据,提高读取效率。 - 调整批量大小:根据实际的业务场景和硬件资源,合理调整批量大小,以平衡读取性能和内存使用。
六、结论
通过对 Flink ClickHouse 连接器数据读取源码的深入分析,我们了解了其核心类和方法的实现细节,以及数据读取的整体流程。这有助于我们在实际应用中更好地配置和优化数据读取过程,提高读取性能和准确性。同时,我们也可以根据具体的业务需求对源码进行扩展和定制,以满足更多复杂的场景。