当前位置: 首页 > wzjs >正文

美发营销型网站一级a做爰片视频免费观看网站

美发营销型网站,一级a做爰片视频免费观看网站,秦皇岛市妇幼保健院,做汽车配件外贸用什么网站一、引言 在大数据处理流程中,从存储系统中高效读取数据是进行后续分析的基础。Flink ClickHouse 连接器为我们提供了从 ClickHouse 数据库读取数据的能力,使得我们可以将 ClickHouse 中存储的海量数据引入到 Flink 流处理或批处理作业中进行进一步的分…
一、引言

在大数据处理流程中,从存储系统中高效读取数据是进行后续分析的基础。Flink ClickHouse 连接器为我们提供了从 ClickHouse 数据库读取数据的能力,使得我们可以将 ClickHouse 中存储的海量数据引入到 Flink 流处理或批处理作业中进行进一步的分析和处理。下面我们将深入解析其数据读取的源码实现,探索其背后的技术细节和优化策略。

二、整体架构概述

Flink ClickHouse 连接器的数据读取主要围绕 AbstractClickHouseInputFormat 及其子类展开。AbstractClickHouseInputFormat 是一个抽象基类,它定义了读取数据的基本流程和方法,为具体的读取实现提供了统一的接口。具体的读取逻辑由其子类 ClickHouseBatchInputFormatClickHouseShardInputFormat 实现,它们分别适用于不同的场景,以满足多样化的读取需求。

三、核心类及方法详细解析
1. AbstractClickHouseInputFormat
public abstract class AbstractClickHouseInputFormat extends RichInputFormat<RowData, InputSplit>implements ResultTypeQueryable<RowData> {protected final String[] fieldNames;protected final TypeInformation<RowData> rowDataTypeInfo;protected final Object[][] parameterValues;protected final String parameterClause;protected final String filterClause;protected final long limit;protected AbstractClickHouseInputFormat(String[] fieldNames,TypeInformation<RowData> rowDataTypeInfo,Object[][] parameterValues,String parameterClause,String filterClause,long limit) {this.fieldNames = fieldNames;this.rowDataTypeInfo = rowDataTypeInfo;this.parameterValues = parameterValues;this.parameterClause = parameterClause;this.filterClause = filterClause;this.limit = limit;}

AbstractClickHouseInputFormat 继承自 RichInputFormat 并实现了 ResultTypeQueryable 接口。它包含了一些重要的属性,如字段名、行数据类型信息、参数值、过滤条件和限制条件等。这些属性将在数据读取过程中发挥重要作用,用于指定读取的数据范围和格式。构造函数用于初始化这些属性,确保在创建 AbstractClickHouseInputFormat 实例时,所有必要的信息都已正确设置。

2. AbstractClickHouseInputFormat.Builder

AbstractClickHouseInputFormat.Builder 类同样采用了建造者模式,用于构建 AbstractClickHouseInputFormat 的实例。它提供了一系列的 withXXX 方法,允许用户通过链式调用的方式设置各种配置参数,最后通过 build 方法创建具体的输入格式实例。

public Builder withOptions(ClickHouseReadOptions readOptions) {this.readOptions = readOptions;return this;
}public Builder withConnectionProperties(Properties connectionProperties) {this.connectionProperties = connectionProperties;return this;
}

这些 withXXX 方法通过将传入的参数赋值给 Builder 类的成员变量,并返回 this 指针,实现了链式调用的效果。例如,用户可以这样使用:

AbstractClickHouseInputFormat.Builder builder = new AbstractClickHouseInputFormat.Builder();
builder.withOptions(readOptions).withConnectionProperties(connectionProperties);
public AbstractClickHouseInputFormat build() {Preconditions.checkNotNull(readOptions);Preconditions.checkNotNull(connectionProperties);Preconditions.checkNotNull(fieldNames);Preconditions.checkNotNull(fieldTypes);Preconditions.checkNotNull(rowDataTypeInfo);ClickHouseConnectionProvider connectionProvider = null;try {connectionProvider =new ClickHouseConnectionProvider(readOptions, connectionProperties);DistributedEngineFull engineFullSchema =getDistributedEngineFull(connectionProvider.getOrCreateConnection(),readOptions.getDatabaseName(),readOptions.getTableName());boolean isDistributed = engineFullSchema != null;if (isDistributed && readOptions.isUseLocal()) {initShardInfo(connectionProvider, engineFullSchema);initPartitionInfo();} else if (readOptions.getPartitionColumn() != null) {initPartitionInfo();}LogicalType[] logicalTypes =Arrays.stream(fieldTypes).map(DataType::getLogicalType).toArray(LogicalType[]::new);return isDistributed && readOptions.isUseLocal()? createShardInputFormat(logicalTypes, engineFullSchema): createBatchInputFormat(logicalTypes);} catch (Exception e) {throw new RuntimeException("Build ClickHouse input format failed.", e);} finally {if (connectionProvider != null) {connectionProvider.closeConnections();}}
}

build 方法中,首先会对必要的参数进行非空检查,确保所有必需的配置都已正确设置。接着,会创建 ClickHouseConnectionProvider 对象,用于管理与 ClickHouse 数据库的连接。然后,尝试获取分布式引擎的完整信息,判断当前表是否为分布式表。如果是分布式表且使用本地表,会初始化分片信息和分区信息;如果指定了分区列,也会初始化分区信息。最后,根据是否为分布式表以及是否使用本地表,选择创建 ClickHouseShardInputFormatClickHouseBatchInputFormat 实例。无论创建过程是否成功,都会关闭 ClickHouseConnectionProvider 以释放连接资源。

3. ClickHouseBatchInputFormat 和 ClickHouseShardInputFormat

ClickHouseBatchInputFormat 用于批量读取数据,它会一次性从 ClickHouse 数据库中读取多条记录,减少了与数据库的交互次数,提高了读取性能。而 ClickHouseShardInputFormat 用于分片读取数据,适用于分布式表。在分布式环境中,数据会被分散存储在多个分片上,ClickHouseShardInputFormat 会根据分片信息从各个分片上并行读取数据,从而提高读取效率。

private AbstractClickHouseInputFormat createShardInputFormat(LogicalType[] logicalTypes, DistributedEngineFull engineFullSchema) {return new ClickHouseShardInputFormat(new ClickHouseConnectionProvider(readOptions, connectionProperties),new ClickHouseRowConverter(RowType.of(logicalTypes)),readOptions,engineFullSchema,shardMap,shardValues,fieldNames,rowDataTypeInfo,parameterValues,parameterClause,filterClause,limit);
}private AbstractClickHouseInputFormat createBatchInputFormat(LogicalType[] logicalTypes) {return new ClickHouseBatchInputFormat(new ClickHouseConnectionProvider(readOptions, connectionProperties),new ClickHouseRowConverter(RowType.of(logicalTypes)),readOptions,fieldNames,rowDataTypeInfo,parameterValues,parameterClause,filterClause,limit);
}

这两个方法分别用于创建 ClickHouseShardInputFormatClickHouseBatchInputFormat 实例,会传入必要的参数,如连接提供者、行转换器、读取选项、字段名等。行转换器 ClickHouseRowConverter 用于将从 ClickHouse 数据库中读取的原始数据转换为 Flink 可以处理的 RowData 格式。

4. FilterPushDownHelper
public class FilterPushDownHelper {private static final Map<FunctionDefinition, SqlClause> FILTERS = new HashMap<>();static {FILTERS.put(BuiltInFunctionDefinitions.EQUALS, EQ);FILTERS.put(BuiltInFunctionDefinitions.NOT_EQUALS, NOT_EQ);FILTERS.put(BuiltInFunctionDefinitions.GREATER_THAN, GT);FILTERS.put(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, GT_EQ);FILTERS.put(BuiltInFunctionDefinitions.LESS_THAN, LT);FILTERS.put(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, LT_EQ);FILTERS.put(BuiltInFunctionDefinitions.IS_NULL, IS_NULL);FILTERS.put(BuiltInFunctionDefinitions.IS_NOT_NULL, IS_NOT_NULL);FILTERS.put(BuiltInFunctionDefinitions.AND, AND);FILTERS.put(BuiltInFunctionDefinitions.OR, OR);}public static String convert(List<ResolvedExpression> filters) {int filterSize = filters.size();return filters.stream().map(expression -> FilterPushDownHelper.convertExpression(expression, filterSize)).filter(Optional::isPresent).map(Optional::get).collect(joining(" AND "));}

FilterPushDownHelper 类用于将 Flink 的过滤表达式转换为 ClickHouse 可以理解的 SQL 过滤条件。通过静态初始化块,将 Flink 的内置函数定义映射到相应的 SQL 子句。convert 方法会将多个过滤表达式转换为一个 SQL 过滤条件字符串,从而实现过滤条件的下推。过滤条件下推可以减少从 ClickHouse 数据库中读取的数据量,提高读取效率。

四、读取流程总结
  1. 配置参数:使用 AbstractClickHouseInputFormat.BuilderwithXXX 方法设置读取选项、连接属性、字段信息等参数。这些参数将决定数据读取的范围、格式和方式。
  2. 构建输入格式:调用 build 方法,根据是否为分布式表以及是否使用本地表,选择创建 ClickHouseBatchInputFormatClickHouseShardInputFormat 实例。这个过程中会进行参数检查、连接创建、分片信息和分区信息的初始化等操作。
  3. 数据读取:通过创建的输入格式实例,从 ClickHouse 数据库批量或分片读取数据。在读取过程中,可以使用 FilterPushDownHelper 进行过滤条件的下推,减少不必要的数据传输,提高读取效率。
  4. 资源管理:在读取完成后,关闭 ClickHouseConnectionProvider 以释放连接资源,避免资源泄漏。
五、优化建议
  1. 合理使用过滤条件下推:尽量使用 FilterPushDownHelper 提供的功能,将过滤条件下推到 ClickHouse 数据库端进行处理,减少从数据库中读取的数据量。
  2. 并行读取数据:对于分布式表,可以使用 ClickHouseShardInputFormat 进行分片读取,并行从各个分片上读取数据,提高读取效率。
  3. 调整批量大小:根据实际的业务场景和硬件资源,合理调整批量大小,以平衡读取性能和内存使用。
六、结论

通过对 Flink ClickHouse 连接器数据读取源码的深入分析,我们了解了其核心类和方法的实现细节,以及数据读取的整体流程。这有助于我们在实际应用中更好地配置和优化数据读取过程,提高读取性能和准确性。同时,我们也可以根据具体的业务需求对源码进行扩展和定制,以满足更多复杂的场景。


文章转载自:

http://746CJ8bT.pznqt.cn
http://qQfZZm0I.pznqt.cn
http://QYHOOfeJ.pznqt.cn
http://KHpwileU.pznqt.cn
http://9ZilNNDL.pznqt.cn
http://tMMSua6o.pznqt.cn
http://hdkP24c6.pznqt.cn
http://VNJ21ksi.pznqt.cn
http://RycNqNyy.pznqt.cn
http://HYFWbLZ6.pznqt.cn
http://3z5rCaGH.pznqt.cn
http://vTTohNtF.pznqt.cn
http://VyWlxaVr.pznqt.cn
http://3BM3qP2p.pznqt.cn
http://sXPfFg6U.pznqt.cn
http://3JuaorEG.pznqt.cn
http://RaYpiFOX.pznqt.cn
http://ypnzrQ2A.pznqt.cn
http://4hSyNWDx.pznqt.cn
http://RAJdK3IB.pznqt.cn
http://pxJUAlRc.pznqt.cn
http://47GBzs8A.pznqt.cn
http://BPH813lp.pznqt.cn
http://ICeitUQ6.pznqt.cn
http://izSp7QcB.pznqt.cn
http://Kyr7JsVq.pznqt.cn
http://P2wIfae1.pznqt.cn
http://GUkgveyz.pznqt.cn
http://FB2902Ft.pznqt.cn
http://F8lmRzfu.pznqt.cn
http://www.dtcms.com/wzjs/628440.html

相关文章:

  • 常州青竹网络做网站3d建模软件有哪些
  • 嘉兴电子商务网站建设瑞安网站建设优化
  • 石家庄个人建站网站策划建站制作企业
  • 凡科建站电话咨询软装设计用什么软件
  • 服务器网站建设建站教程pdf
  • 电影网站带采集哪个建站系统适合外贸网站建设
  • 中文个人网站欣赏怎么用域名建网站
  • 网站设计版式什么是企业营销型网站?
  • php做网站完整视频学设计的基础是什么
  • 沈阳求做商城 网站做网站运营工资是不是很低
  • 做网站江西商务网站设计实训报告
  • 网站建设与设计意义网站关键词描述
  • 网站名称 域名wordpress个人站主题
  • 如何避免网站被攻击安庆做网站电话
  • 企业网站seo托管怎么做与做机器人有关的网站
  • 杭州建设项目审批网站微信小程序开发流程文档
  • 建材公司网站建设案例为什么四川省建设厅网站打不开
  • 长沙建设网站的公司做网站用语言
  • 网站设计团队有哪些职业电商网站设计系列
  • 网站建设服务器技术有哪些汉南公司网站建设
  • 襄城县住房和城乡建设局网站seo推广优化外包公司
  • 怎做网站转app申请域名后如何发布网站
  • 河南省能源规划建设局网站天津seo网站设计
  • 360网站提交收录入口如何做企业网站建设
  • 网站规划与网页设计总结网页出现网站维护
  • 哪个网站能上传自己做的简历哈尔滨网站建设2017
  • 网站平台建设专家意见深圳网站建设推广优化
  • 揭阳做网站的佛山专业网站设计方案
  • 网站建设培训视频工程机械网
  • wordpress+视频站模版wordpress live chat