当前位置: 首页 > wzjs >正文

网站 信用卡支付接口百度竞价收费标准

网站 信用卡支付接口,百度竞价收费标准,政府网站的栏目建设,微门户网站建设一、引言 在大数据处理流程中,从存储系统中高效读取数据是进行后续分析的基础。Flink ClickHouse 连接器为我们提供了从 ClickHouse 数据库读取数据的能力,使得我们可以将 ClickHouse 中存储的海量数据引入到 Flink 流处理或批处理作业中进行进一步的分…
一、引言

在大数据处理流程中,从存储系统中高效读取数据是进行后续分析的基础。Flink ClickHouse 连接器为我们提供了从 ClickHouse 数据库读取数据的能力,使得我们可以将 ClickHouse 中存储的海量数据引入到 Flink 流处理或批处理作业中进行进一步的分析和处理。下面我们将深入解析其数据读取的源码实现,探索其背后的技术细节和优化策略。

二、整体架构概述

Flink ClickHouse 连接器的数据读取主要围绕 AbstractClickHouseInputFormat 及其子类展开。AbstractClickHouseInputFormat 是一个抽象基类,它定义了读取数据的基本流程和方法,为具体的读取实现提供了统一的接口。具体的读取逻辑由其子类 ClickHouseBatchInputFormatClickHouseShardInputFormat 实现,它们分别适用于不同的场景,以满足多样化的读取需求。

三、核心类及方法详细解析
1. AbstractClickHouseInputFormat
public abstract class AbstractClickHouseInputFormat extends RichInputFormat<RowData, InputSplit>implements ResultTypeQueryable<RowData> {protected final String[] fieldNames;protected final TypeInformation<RowData> rowDataTypeInfo;protected final Object[][] parameterValues;protected final String parameterClause;protected final String filterClause;protected final long limit;protected AbstractClickHouseInputFormat(String[] fieldNames,TypeInformation<RowData> rowDataTypeInfo,Object[][] parameterValues,String parameterClause,String filterClause,long limit) {this.fieldNames = fieldNames;this.rowDataTypeInfo = rowDataTypeInfo;this.parameterValues = parameterValues;this.parameterClause = parameterClause;this.filterClause = filterClause;this.limit = limit;}

AbstractClickHouseInputFormat 继承自 RichInputFormat 并实现了 ResultTypeQueryable 接口。它包含了一些重要的属性,如字段名、行数据类型信息、参数值、过滤条件和限制条件等。这些属性将在数据读取过程中发挥重要作用,用于指定读取的数据范围和格式。构造函数用于初始化这些属性,确保在创建 AbstractClickHouseInputFormat 实例时,所有必要的信息都已正确设置。

2. AbstractClickHouseInputFormat.Builder

AbstractClickHouseInputFormat.Builder 类同样采用了建造者模式,用于构建 AbstractClickHouseInputFormat 的实例。它提供了一系列的 withXXX 方法,允许用户通过链式调用的方式设置各种配置参数,最后通过 build 方法创建具体的输入格式实例。

public Builder withOptions(ClickHouseReadOptions readOptions) {this.readOptions = readOptions;return this;
}public Builder withConnectionProperties(Properties connectionProperties) {this.connectionProperties = connectionProperties;return this;
}

这些 withXXX 方法通过将传入的参数赋值给 Builder 类的成员变量,并返回 this 指针,实现了链式调用的效果。例如,用户可以这样使用:

AbstractClickHouseInputFormat.Builder builder = new AbstractClickHouseInputFormat.Builder();
builder.withOptions(readOptions).withConnectionProperties(connectionProperties);
public AbstractClickHouseInputFormat build() {Preconditions.checkNotNull(readOptions);Preconditions.checkNotNull(connectionProperties);Preconditions.checkNotNull(fieldNames);Preconditions.checkNotNull(fieldTypes);Preconditions.checkNotNull(rowDataTypeInfo);ClickHouseConnectionProvider connectionProvider = null;try {connectionProvider =new ClickHouseConnectionProvider(readOptions, connectionProperties);DistributedEngineFull engineFullSchema =getDistributedEngineFull(connectionProvider.getOrCreateConnection(),readOptions.getDatabaseName(),readOptions.getTableName());boolean isDistributed = engineFullSchema != null;if (isDistributed && readOptions.isUseLocal()) {initShardInfo(connectionProvider, engineFullSchema);initPartitionInfo();} else if (readOptions.getPartitionColumn() != null) {initPartitionInfo();}LogicalType[] logicalTypes =Arrays.stream(fieldTypes).map(DataType::getLogicalType).toArray(LogicalType[]::new);return isDistributed && readOptions.isUseLocal()? createShardInputFormat(logicalTypes, engineFullSchema): createBatchInputFormat(logicalTypes);} catch (Exception e) {throw new RuntimeException("Build ClickHouse input format failed.", e);} finally {if (connectionProvider != null) {connectionProvider.closeConnections();}}
}

build 方法中,首先会对必要的参数进行非空检查,确保所有必需的配置都已正确设置。接着,会创建 ClickHouseConnectionProvider 对象,用于管理与 ClickHouse 数据库的连接。然后,尝试获取分布式引擎的完整信息,判断当前表是否为分布式表。如果是分布式表且使用本地表,会初始化分片信息和分区信息;如果指定了分区列,也会初始化分区信息。最后,根据是否为分布式表以及是否使用本地表,选择创建 ClickHouseShardInputFormatClickHouseBatchInputFormat 实例。无论创建过程是否成功,都会关闭 ClickHouseConnectionProvider 以释放连接资源。

3. ClickHouseBatchInputFormat 和 ClickHouseShardInputFormat

ClickHouseBatchInputFormat 用于批量读取数据,它会一次性从 ClickHouse 数据库中读取多条记录,减少了与数据库的交互次数,提高了读取性能。而 ClickHouseShardInputFormat 用于分片读取数据,适用于分布式表。在分布式环境中,数据会被分散存储在多个分片上,ClickHouseShardInputFormat 会根据分片信息从各个分片上并行读取数据,从而提高读取效率。

private AbstractClickHouseInputFormat createShardInputFormat(LogicalType[] logicalTypes, DistributedEngineFull engineFullSchema) {return new ClickHouseShardInputFormat(new ClickHouseConnectionProvider(readOptions, connectionProperties),new ClickHouseRowConverter(RowType.of(logicalTypes)),readOptions,engineFullSchema,shardMap,shardValues,fieldNames,rowDataTypeInfo,parameterValues,parameterClause,filterClause,limit);
}private AbstractClickHouseInputFormat createBatchInputFormat(LogicalType[] logicalTypes) {return new ClickHouseBatchInputFormat(new ClickHouseConnectionProvider(readOptions, connectionProperties),new ClickHouseRowConverter(RowType.of(logicalTypes)),readOptions,fieldNames,rowDataTypeInfo,parameterValues,parameterClause,filterClause,limit);
}

这两个方法分别用于创建 ClickHouseShardInputFormatClickHouseBatchInputFormat 实例,会传入必要的参数,如连接提供者、行转换器、读取选项、字段名等。行转换器 ClickHouseRowConverter 用于将从 ClickHouse 数据库中读取的原始数据转换为 Flink 可以处理的 RowData 格式。

4. FilterPushDownHelper
public class FilterPushDownHelper {private static final Map<FunctionDefinition, SqlClause> FILTERS = new HashMap<>();static {FILTERS.put(BuiltInFunctionDefinitions.EQUALS, EQ);FILTERS.put(BuiltInFunctionDefinitions.NOT_EQUALS, NOT_EQ);FILTERS.put(BuiltInFunctionDefinitions.GREATER_THAN, GT);FILTERS.put(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, GT_EQ);FILTERS.put(BuiltInFunctionDefinitions.LESS_THAN, LT);FILTERS.put(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, LT_EQ);FILTERS.put(BuiltInFunctionDefinitions.IS_NULL, IS_NULL);FILTERS.put(BuiltInFunctionDefinitions.IS_NOT_NULL, IS_NOT_NULL);FILTERS.put(BuiltInFunctionDefinitions.AND, AND);FILTERS.put(BuiltInFunctionDefinitions.OR, OR);}public static String convert(List<ResolvedExpression> filters) {int filterSize = filters.size();return filters.stream().map(expression -> FilterPushDownHelper.convertExpression(expression, filterSize)).filter(Optional::isPresent).map(Optional::get).collect(joining(" AND "));}

FilterPushDownHelper 类用于将 Flink 的过滤表达式转换为 ClickHouse 可以理解的 SQL 过滤条件。通过静态初始化块,将 Flink 的内置函数定义映射到相应的 SQL 子句。convert 方法会将多个过滤表达式转换为一个 SQL 过滤条件字符串,从而实现过滤条件的下推。过滤条件下推可以减少从 ClickHouse 数据库中读取的数据量,提高读取效率。

四、读取流程总结
  1. 配置参数:使用 AbstractClickHouseInputFormat.BuilderwithXXX 方法设置读取选项、连接属性、字段信息等参数。这些参数将决定数据读取的范围、格式和方式。
  2. 构建输入格式:调用 build 方法,根据是否为分布式表以及是否使用本地表,选择创建 ClickHouseBatchInputFormatClickHouseShardInputFormat 实例。这个过程中会进行参数检查、连接创建、分片信息和分区信息的初始化等操作。
  3. 数据读取:通过创建的输入格式实例,从 ClickHouse 数据库批量或分片读取数据。在读取过程中,可以使用 FilterPushDownHelper 进行过滤条件的下推,减少不必要的数据传输,提高读取效率。
  4. 资源管理:在读取完成后,关闭 ClickHouseConnectionProvider 以释放连接资源,避免资源泄漏。
五、优化建议
  1. 合理使用过滤条件下推:尽量使用 FilterPushDownHelper 提供的功能,将过滤条件下推到 ClickHouse 数据库端进行处理,减少从数据库中读取的数据量。
  2. 并行读取数据:对于分布式表,可以使用 ClickHouseShardInputFormat 进行分片读取,并行从各个分片上读取数据,提高读取效率。
  3. 调整批量大小:根据实际的业务场景和硬件资源,合理调整批量大小,以平衡读取性能和内存使用。
六、结论

通过对 Flink ClickHouse 连接器数据读取源码的深入分析,我们了解了其核心类和方法的实现细节,以及数据读取的整体流程。这有助于我们在实际应用中更好地配置和优化数据读取过程,提高读取性能和准确性。同时,我们也可以根据具体的业务需求对源码进行扩展和定制,以满足更多复杂的场景。

http://www.dtcms.com/wzjs/356586.html

相关文章:

  • 创建网站商城营销渠道有哪几种
  • 东莞建站响应式网站多少钱最新军事动态最新消息
  • 什么网站做ppt好信息流推广渠道有哪些
  • 注册一个网站seo工资待遇 seo工资多少
  • WordPress自定义登录页面网站seo优化公司
  • 龙口网站制作多少钱网络推广文案有哪些
  • 怎样做网站吸引客户单页网站设计
  • 西安的网站设计与制作首页优化网站广告优化
  • 嘉兴cms模板建站比较好的网络优化公司
  • 网站建设教程集体苏州久远网络国内可访问的海外网站和应用
  • 专业b2c电商网站开发南京seo建站
  • 做个商城网站怎么做便宜吗今日腾讯新闻最新消息
  • 微信视频网站建设多少钱百度投诉中心电话
  • 优设导航百度移动端关键词优化
  • 东莞公司网站品牌策划方案
  • 推销商务网站的途径有哪些新手如何做网上销售
  • 阿里巴巴做网站教程南昌seo排名外包
  • 北京王府井步行街上来往最多的是什么人seo推广代运营
  • 柳市网页设计百度seo关键词优化排行
  • 哪个企业的网站做的比较好网站推广seo教程
  • 做c2c网站的弊端西安网络推广营销公司
  • 做58推广网站找哪家好资源搜索
  • 有哪些做外贸网站app推广员怎么做
  • 企业展示网站如何建十个有创意的线上活动
  • 烟台开发区建设业联合会网站如何优化关键词的排名
  • 详述电子商务网站的建设韩国vs加纳分析比分
  • 有没有做.net面试题的网站seo是什么东西
  • 天河区做网站百度竞价怎么收费
  • 深圳个人形象设计工作室台州网站seo
  • 招聘网站可以同时做两份简历吗6seo服务商技术好的公司