当前位置: 首页 > news >正文

Flink ClickHouse 连接器数据读取源码深度解析

一、引言

在大数据处理流程中,从存储系统中高效读取数据是进行后续分析的基础。Flink ClickHouse 连接器为我们提供了从 ClickHouse 数据库读取数据的能力,使得我们可以将 ClickHouse 中存储的海量数据引入到 Flink 流处理或批处理作业中进行进一步的分析和处理。下面我们将深入解析其数据读取的源码实现,探索其背后的技术细节和优化策略。

二、整体架构概述

Flink ClickHouse 连接器的数据读取主要围绕 AbstractClickHouseInputFormat 及其子类展开。AbstractClickHouseInputFormat 是一个抽象基类,它定义了读取数据的基本流程和方法,为具体的读取实现提供了统一的接口。具体的读取逻辑由其子类 ClickHouseBatchInputFormatClickHouseShardInputFormat 实现,它们分别适用于不同的场景,以满足多样化的读取需求。

三、核心类及方法详细解析
1. AbstractClickHouseInputFormat
public abstract class AbstractClickHouseInputFormat extends RichInputFormat<RowData, InputSplit>implements ResultTypeQueryable<RowData> {protected final String[] fieldNames;protected final TypeInformation<RowData> rowDataTypeInfo;protected final Object[][] parameterValues;protected final String parameterClause;protected final String filterClause;protected final long limit;protected AbstractClickHouseInputFormat(String[] fieldNames,TypeInformation<RowData> rowDataTypeInfo,Object[][] parameterValues,String parameterClause,String filterClause,long limit) {this.fieldNames = fieldNames;this.rowDataTypeInfo = rowDataTypeInfo;this.parameterValues = parameterValues;this.parameterClause = parameterClause;this.filterClause = filterClause;this.limit = limit;}

AbstractClickHouseInputFormat 继承自 RichInputFormat 并实现了 ResultTypeQueryable 接口。它包含了一些重要的属性,如字段名、行数据类型信息、参数值、过滤条件和限制条件等。这些属性将在数据读取过程中发挥重要作用,用于指定读取的数据范围和格式。构造函数用于初始化这些属性,确保在创建 AbstractClickHouseInputFormat 实例时,所有必要的信息都已正确设置。

2. AbstractClickHouseInputFormat.Builder

AbstractClickHouseInputFormat.Builder 类同样采用了建造者模式,用于构建 AbstractClickHouseInputFormat 的实例。它提供了一系列的 withXXX 方法,允许用户通过链式调用的方式设置各种配置参数,最后通过 build 方法创建具体的输入格式实例。

public Builder withOptions(ClickHouseReadOptions readOptions) {this.readOptions = readOptions;return this;
}public Builder withConnectionProperties(Properties connectionProperties) {this.connectionProperties = connectionProperties;return this;
}

这些 withXXX 方法通过将传入的参数赋值给 Builder 类的成员变量,并返回 this 指针,实现了链式调用的效果。例如,用户可以这样使用:

AbstractClickHouseInputFormat.Builder builder = new AbstractClickHouseInputFormat.Builder();
builder.withOptions(readOptions).withConnectionProperties(connectionProperties);
public AbstractClickHouseInputFormat build() {Preconditions.checkNotNull(readOptions);Preconditions.checkNotNull(connectionProperties);Preconditions.checkNotNull(fieldNames);Preconditions.checkNotNull(fieldTypes);Preconditions.checkNotNull(rowDataTypeInfo);ClickHouseConnectionProvider connectionProvider = null;try {connectionProvider =new ClickHouseConnectionProvider(readOptions, connectionProperties);DistributedEngineFull engineFullSchema =getDistributedEngineFull(connectionProvider.getOrCreateConnection(),readOptions.getDatabaseName(),readOptions.getTableName());boolean isDistributed = engineFullSchema != null;if (isDistributed && readOptions.isUseLocal()) {initShardInfo(connectionProvider, engineFullSchema);initPartitionInfo();} else if (readOptions.getPartitionColumn() != null) {initPartitionInfo();}LogicalType[] logicalTypes =Arrays.stream(fieldTypes).map(DataType::getLogicalType).toArray(LogicalType[]::new);return isDistributed && readOptions.isUseLocal()? createShardInputFormat(logicalTypes, engineFullSchema): createBatchInputFormat(logicalTypes);} catch (Exception e) {throw new RuntimeException("Build ClickHouse input format failed.", e);} finally {if (connectionProvider != null) {connectionProvider.closeConnections();}}
}

build 方法中,首先会对必要的参数进行非空检查,确保所有必需的配置都已正确设置。接着,会创建 ClickHouseConnectionProvider 对象,用于管理与 ClickHouse 数据库的连接。然后,尝试获取分布式引擎的完整信息,判断当前表是否为分布式表。如果是分布式表且使用本地表,会初始化分片信息和分区信息;如果指定了分区列,也会初始化分区信息。最后,根据是否为分布式表以及是否使用本地表,选择创建 ClickHouseShardInputFormatClickHouseBatchInputFormat 实例。无论创建过程是否成功,都会关闭 ClickHouseConnectionProvider 以释放连接资源。

3. ClickHouseBatchInputFormat 和 ClickHouseShardInputFormat

ClickHouseBatchInputFormat 用于批量读取数据,它会一次性从 ClickHouse 数据库中读取多条记录,减少了与数据库的交互次数,提高了读取性能。而 ClickHouseShardInputFormat 用于分片读取数据,适用于分布式表。在分布式环境中,数据会被分散存储在多个分片上,ClickHouseShardInputFormat 会根据分片信息从各个分片上并行读取数据,从而提高读取效率。

private AbstractClickHouseInputFormat createShardInputFormat(LogicalType[] logicalTypes, DistributedEngineFull engineFullSchema) {return new ClickHouseShardInputFormat(new ClickHouseConnectionProvider(readOptions, connectionProperties),new ClickHouseRowConverter(RowType.of(logicalTypes)),readOptions,engineFullSchema,shardMap,shardValues,fieldNames,rowDataTypeInfo,parameterValues,parameterClause,filterClause,limit);
}private AbstractClickHouseInputFormat createBatchInputFormat(LogicalType[] logicalTypes) {return new ClickHouseBatchInputFormat(new ClickHouseConnectionProvider(readOptions, connectionProperties),new ClickHouseRowConverter(RowType.of(logicalTypes)),readOptions,fieldNames,rowDataTypeInfo,parameterValues,parameterClause,filterClause,limit);
}

这两个方法分别用于创建 ClickHouseShardInputFormatClickHouseBatchInputFormat 实例,会传入必要的参数,如连接提供者、行转换器、读取选项、字段名等。行转换器 ClickHouseRowConverter 用于将从 ClickHouse 数据库中读取的原始数据转换为 Flink 可以处理的 RowData 格式。

4. FilterPushDownHelper
public class FilterPushDownHelper {private static final Map<FunctionDefinition, SqlClause> FILTERS = new HashMap<>();static {FILTERS.put(BuiltInFunctionDefinitions.EQUALS, EQ);FILTERS.put(BuiltInFunctionDefinitions.NOT_EQUALS, NOT_EQ);FILTERS.put(BuiltInFunctionDefinitions.GREATER_THAN, GT);FILTERS.put(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, GT_EQ);FILTERS.put(BuiltInFunctionDefinitions.LESS_THAN, LT);FILTERS.put(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, LT_EQ);FILTERS.put(BuiltInFunctionDefinitions.IS_NULL, IS_NULL);FILTERS.put(BuiltInFunctionDefinitions.IS_NOT_NULL, IS_NOT_NULL);FILTERS.put(BuiltInFunctionDefinitions.AND, AND);FILTERS.put(BuiltInFunctionDefinitions.OR, OR);}public static String convert(List<ResolvedExpression> filters) {int filterSize = filters.size();return filters.stream().map(expression -> FilterPushDownHelper.convertExpression(expression, filterSize)).filter(Optional::isPresent).map(Optional::get).collect(joining(" AND "));}

FilterPushDownHelper 类用于将 Flink 的过滤表达式转换为 ClickHouse 可以理解的 SQL 过滤条件。通过静态初始化块,将 Flink 的内置函数定义映射到相应的 SQL 子句。convert 方法会将多个过滤表达式转换为一个 SQL 过滤条件字符串,从而实现过滤条件的下推。过滤条件下推可以减少从 ClickHouse 数据库中读取的数据量,提高读取效率。

四、读取流程总结
  1. 配置参数:使用 AbstractClickHouseInputFormat.BuilderwithXXX 方法设置读取选项、连接属性、字段信息等参数。这些参数将决定数据读取的范围、格式和方式。
  2. 构建输入格式:调用 build 方法,根据是否为分布式表以及是否使用本地表,选择创建 ClickHouseBatchInputFormatClickHouseShardInputFormat 实例。这个过程中会进行参数检查、连接创建、分片信息和分区信息的初始化等操作。
  3. 数据读取:通过创建的输入格式实例,从 ClickHouse 数据库批量或分片读取数据。在读取过程中,可以使用 FilterPushDownHelper 进行过滤条件的下推,减少不必要的数据传输,提高读取效率。
  4. 资源管理:在读取完成后,关闭 ClickHouseConnectionProvider 以释放连接资源,避免资源泄漏。
五、优化建议
  1. 合理使用过滤条件下推:尽量使用 FilterPushDownHelper 提供的功能,将过滤条件下推到 ClickHouse 数据库端进行处理,减少从数据库中读取的数据量。
  2. 并行读取数据:对于分布式表,可以使用 ClickHouseShardInputFormat 进行分片读取,并行从各个分片上读取数据,提高读取效率。
  3. 调整批量大小:根据实际的业务场景和硬件资源,合理调整批量大小,以平衡读取性能和内存使用。
六、结论

通过对 Flink ClickHouse 连接器数据读取源码的深入分析,我们了解了其核心类和方法的实现细节,以及数据读取的整体流程。这有助于我们在实际应用中更好地配置和优化数据读取过程,提高读取性能和准确性。同时,我们也可以根据具体的业务需求对源码进行扩展和定制,以满足更多复杂的场景。

http://www.dtcms.com/a/267894.html

相关文章:

  • G-sensor运动检测功能开源:打破技术壁垒,加速智能硬件开发!
  • Java JDBC的初步了解
  • 力扣网编程45题:跳跃游戏II之正向查找方法(中等)
  • 【深度学习新浪潮】AI在材料力学领域的研究进展一览
  • 基于51单片机智能婴儿床
  • SQL 一键生成 Go Struct!支持字段注释、类型映射、结构体命名规范
  • 从前端转go开发的学习路线
  • 3、Configuring Topics
  • I-Cache、D-Cache 和 SRAM 的区别与联系
  • 系统架构设计师论文分享-论软件体系结构的演化
  • Docker容器中安装MongoDB,导入数据
  • nvm常用指令汇总
  • Spark流水线数据质量检查组件
  • 【认知】如何在高强度工作中保持心理健康和情绪稳定?
  • WizTree v4.2.5 x86 x64 单文件版
  • 让你的asp.net网站在调试模式下也能在局域网通过ip访问
  • Java 双亲委派机制笔记
  • GitCode项目创建指南
  • 一文掌握Qt Quick数字图像处理项目开发(基于Qt 6.9 C++和QML,代码开源)
  • 【黑马点评】(二)缓存
  • PyTorch 2.7深度技术解析:新一代深度学习框架的革命性演进
  • Python作业1
  • 实现Spring MVC登录验证与拦截器保护:从原理到实战
  • Jiraph​ 简介
  • React 各颜色转换方法、颜色值换算工具HEX、RGB/RGBA、HSL/HSLA、HSV、CMYK
  • AcWing--873.欧拉函数
  • ARMv8 创建1、2、3级页表代码与注释
  • 【C++基础】内存管理四重奏:malloc/free vs new/delete - 面试高频考点与真题解析
  • Windows 11 Enterprise LTSC 转 IoT
  • C++ i386/AMD64平台汇编指令对齐长度获取实现