当前位置: 首页 > wzjs >正文

杭州设计公司网站排名株洲最新通知今天

杭州设计公司网站排名,株洲最新通知今天,免费主题软件app,网站页面设计 颜色 背景 要求一、引言 在大数据处理的实际应用场景中,数据的高效存储与处理至关重要。Flink 作为一款强大的流式计算框架,能够对海量数据进行实时处理;而 ClickHouse 作为高性能的列式数据库,擅长处理大规模数据分析任务。Flink ClickHouse 连…
一、引言

在大数据处理的实际应用场景中,数据的高效存储与处理至关重要。Flink 作为一款强大的流式计算框架,能够对海量数据进行实时处理;而 ClickHouse 作为高性能的列式数据库,擅长处理大规模数据分析任务。Flink ClickHouse 连接器则将二者的优势结合起来,允许用户将 Flink 处理后的数据高效地写入 ClickHouse 数据库。下面我们将深入剖析其数据写入的源码实现,探究其背后的工作原理和设计思路。

二、整体架构概述

Flink ClickHouse 连接器的数据写入主要围绕 AbstractClickHouseOutputFormat 及其子类展开。AbstractClickHouseOutputFormat 作为抽象基类,定义了写入数据的基本流程和核心方法,为后续的具体实现提供了统一的框架。具体的写入逻辑由其子类 ClickHouseBatchOutputFormatClickHouseShardOutputFormat 实现,它们分别适用于不同的场景,以满足多样化的需求。

三、核心类及方法详细解析
1. ClickHouseConnectionOptions
// For testing.
@VisibleForTestingpublic ClickHouseConnectionOptions(String url) {this(url, null, null, null, null);}

这个构造函数是专门为测试目的而设计的。在测试环境中,为了简化测试用例的编写,我们可能只需要关注 URL 参数,而不需要设置其他复杂的配置。通过这个构造函数,我们可以方便地创建一个仅包含 URL 的 ClickHouseConnectionOptions 对象,从而更专注于对特定功能的测试。

2. AbstractClickHouseOutputFormat.Builder

AbstractClickHouseOutputFormat.Builder 类采用了建造者模式,用于构建 AbstractClickHouseOutputFormat 的实例。它提供了一系列的 withXXX 方法,允许用户通过链式调用的方式设置各种配置参数,最后通过 build 方法创建具体的输出格式实例。这种设计模式使得代码更加简洁易读,同时也提高了代码的可维护性。

public Builder withOptions(ClickHouseDmlOptions options) {this.options = options;return this;
}public Builder withConnectionProperties(Properties connectionProperties) {this.connectionProperties = connectionProperties;return this;
}

这些 withXXX 方法通过将传入的参数赋值给 Builder 类的成员变量,并返回 this 指针,实现了链式调用的效果。例如,用户可以这样使用:

AbstractClickHouseOutputFormat.Builder builder = new AbstractClickHouseOutputFormat.Builder();
builder.withOptions(options).withConnectionProperties(connectionProperties);
public AbstractClickHouseOutputFormat build() {Preconditions.checkNotNull(options);Preconditions.checkNotNull(fieldNames);Preconditions.checkNotNull(fieldTypes);Preconditions.checkNotNull(primaryKeys);Preconditions.checkNotNull(partitionKeys);if (primaryKeys.length > 0) {LOG.warn("If primary key is specified, connector will be in UPSERT mode.");LOG.warn("The data will be updated / deleted by the primary key, you will have significant performance loss.");}ClickHouseConnectionProvider connectionProvider = null;try {connectionProvider =new ClickHouseConnectionProvider(options, connectionProperties);DistributedEngineFull engineFullSchema =getDistributedEngineFull(connectionProvider.getOrCreateConnection(),options.getDatabaseName(),options.getTableName());boolean isDistributed = engineFullSchema != null;return isDistributed && options.isUseLocal()? createShardOutputFormat(connectionProvider.getOrCreateConnection(), engineFullSchema): createBatchOutputFormat();} catch (Exception exception) {throw new RuntimeException("Build ClickHouse output format failed.", exception);} finally {if (connectionProvider != null) {connectionProvider.closeConnections();}}
}

build 方法中,首先会对必要的参数进行非空检查,确保所有必需的配置都已正确设置。如果指定了主键,会发出警告,因为使用主键会使连接器进入 UPSERT 模式,这可能会导致性能下降。接着,会创建 ClickHouseConnectionProvider 对象,用于管理与 ClickHouse 数据库的连接。然后,尝试获取分布式引擎的完整信息,判断当前表是否为分布式表。根据是否为分布式表以及是否使用本地表,选择创建 ClickHouseShardOutputFormatClickHouseBatchOutputFormat 实例。最后,无论创建过程是否成功,都会关闭 ClickHouseConnectionProvider 以释放连接资源。

3. ClickHouseBatchOutputFormat 和 ClickHouseShardOutputFormat

ClickHouseBatchOutputFormat 用于批量写入数据,它将多条记录打包成一个批次,一次性发送到 ClickHouse 数据库,从而减少了与数据库的交互次数,提高了写入性能。而 ClickHouseShardOutputFormat 用于分片写入数据,适用于分布式表。在分布式环境中,数据会被分散存储在多个分片上,ClickHouseShardOutputFormat 会根据分片策略将数据正确地分发到相应的分片上。

private ClickHouseBatchOutputFormat createBatchOutputFormat() {return new ClickHouseBatchOutputFormat(new ClickHouseConnectionProvider(options, connectionProperties),fieldNames,primaryKeys,partitionKeys,logicalTypes,options);
}private ClickHouseShardOutputFormat createShardOutputFormat(ClickHouseConnection connection, DistributedEngineFull engineFullSchema)throws SQLException {SinkShardingStrategy shardingStrategy;List<FieldGetter> fieldGetters = null;if (options.isShardingUseTableDef()) {Expression shardingKey = engineFullSchema.getShardingKey();if (shardingKey instanceof FieldExpr) {shardingStrategy = SinkShardingStrategy.VALUE;FieldGetter fieldGetter =getFieldGetterOfShardingKey(((FieldExpr) shardingKey).getColumnName());fieldGetters = singletonList(fieldGetter);} else if (shardingKey instanceof FunctionExpr&& "rand()".equals(shardingKey.explain())) {shardingStrategy = SinkShardingStrategy.SHUFFLE;fieldGetters = emptyList();} else if (shardingKey instanceof FunctionExpr&& "javaHash".equals(((FunctionExpr) shardingKey).getFunctionName())&& ((FunctionExpr) shardingKey).getArguments().stream().allMatch(expression -> expression instanceof FieldExpr)) {shardingStrategy = SinkShardingStrategy.HASH;fieldGetters = parseFieldGetters((FunctionExpr) shardingKey);} else {throw new RuntimeException("Unsupported sharding key: " + shardingKey.explain());}} else {shardingStrategy = options.getShardingStrategy();if (shardingStrategy.shardingKeyNeeded) {fieldGetters =options.getShardingKey().stream().map(this::getFieldGetterOfShardingKey).collect(toList());}}ClusterSpec clusterSpec = getClusterSpec(connection, engineFullSchema.getCluster());return new ClickHouseShardOutputFormat(new ClickHouseConnectionProvider(options, connectionProperties),clusterSpec,engineFullSchema,fieldNames,primaryKeys,partitionKeys,logicalTypes,shardingStrategy.provider.apply(fieldGetters),options);
}

createShardOutputFormat 方法中,会根据配置选择不同的分片策略,如 VALUESHUFFLEHASH。对于不同的分片策略,会解析相应的分片键,并创建 FieldGetter 列表。例如,如果分片策略为 VALUE,会根据分片键的字段名创建一个 FieldGetter;如果为 SHUFFLE,则不需要 FieldGetter;如果为 HASH,会解析函数表达式中的字段名并创建相应的 FieldGetter 列表。最后,会获取集群信息,并创建 ClickHouseShardOutputFormat 实例。

四、写入流程总结
  1. 配置参数:使用 AbstractClickHouseOutputFormat.BuilderwithXXX 方法设置写入选项、连接属性、字段信息等参数。这些参数将决定数据写入的行为和方式。
  2. 构建输出格式:调用 build 方法,根据是否为分布式表以及是否使用本地表,选择创建 ClickHouseBatchOutputFormatClickHouseShardOutputFormat 实例。这个过程中会进行参数检查、连接创建和分片策略解析等操作。
  3. 数据写入:通过创建的输出格式实例,将数据批量或分片写入 ClickHouse 数据库。在写入过程中,会根据配置的批量大小和刷新间隔进行数据的缓存和批量提交,以提高写入性能。
  4. 资源管理:在写入完成后,关闭 ClickHouseConnectionProvider 以释放连接资源,避免资源泄漏。
五、优化建议
  1. 合理配置批量大小和刷新间隔:根据实际的业务场景和硬件资源,合理调整 sink.batch-sizesink.flush-interval 参数,以平衡写入性能和内存使用。
  2. 避免使用主键进行 UPSERT 操作:如果不是必要情况,尽量避免指定主键,因为 UPSERT 操作会带来较大的性能开销。
  3. 选择合适的分片策略:根据数据的特点和分布情况,选择合适的分片策略,如 VALUESHUFFLEHASH,以确保数据均匀分布在各个分片上。
六、结论

通过对 Flink ClickHouse 连接器数据写入源码的深入分析,我们了解了其核心类和方法的实现细节,以及数据写入的整体流程。这有助于我们在实际应用中更好地配置和优化数据写入过程,提高写入性能和可靠性。同时,我们也可以根据具体的业务需求对源码进行扩展和定制,以满足更多复杂的场景。


文章转载自:

http://jzBCic89.Ldhyh.cn
http://66b651Uu.Ldhyh.cn
http://GbMB9B1W.Ldhyh.cn
http://D5LXnWcr.Ldhyh.cn
http://I0Fa0Xh5.Ldhyh.cn
http://e9E24IZt.Ldhyh.cn
http://TDIl5BMq.Ldhyh.cn
http://8kHYwS2A.Ldhyh.cn
http://IwH74qwc.Ldhyh.cn
http://yFuHHiXB.Ldhyh.cn
http://Cbao3thS.Ldhyh.cn
http://CbhvEFsj.Ldhyh.cn
http://j5mYboFF.Ldhyh.cn
http://7lhJTDe6.Ldhyh.cn
http://xpy1lwIX.Ldhyh.cn
http://20y3nrmf.Ldhyh.cn
http://pppQKVUL.Ldhyh.cn
http://N04F638F.Ldhyh.cn
http://Sa7HkVOh.Ldhyh.cn
http://gMXiYD0v.Ldhyh.cn
http://LtiZ3C21.Ldhyh.cn
http://nzY5KZpP.Ldhyh.cn
http://idRKMQre.Ldhyh.cn
http://Y5NFBIww.Ldhyh.cn
http://udvANAFX.Ldhyh.cn
http://5OlMJgEa.Ldhyh.cn
http://wqQ9vQ7Y.Ldhyh.cn
http://ivvWQhGO.Ldhyh.cn
http://AK1xXvSo.Ldhyh.cn
http://NrVpOgKP.Ldhyh.cn
http://www.dtcms.com/wzjs/699505.html

相关文章:

  • 网站后台管理系统哪个好如何用wd做网站设计
  • 杭州开发网站的公司哪家好灯塔seo
  • 企业网站备案要求win2003做网站
  • 东莞九江网站制作好的网站或网页推荐
  • 网站建设版面分几页合适网站开发开源程序
  • 太原网站建设与维护财务软件开发公司简介
  • 非官方网站建设综合类网站怎么做
  • 网站漏洞郫都区规划建设局网站
  • 墨刀做的网站设计dw做网站乱码
  • 怎么搜索网站内容网站备案流程审核单
  • 刷赞网站空间免费网站营销咨询顾问
  • 网站开发服务流程烟台网站建设在哪
  • 网站建设部分费用会计科目空调网站模版
  • 怎么黑网站用代码做一号店网站怎么做
  • 电子商务网站主要功能海口网站建设加q.479185700
  • 婚纱网站手机网站世界上做的最后的网站
  • 门头沟富阳网站建设wordpress 更换模板
  • 玉泉路网站建设网站建设售后服务合同
  • 南平建设集团网站全网推广软件
  • 自媒体横行还有做网站手机企业网站
  • 小白建站软件淄博英文网站建设专业
  • 用什么做网站的访问量统计公示专家的定义
  • 制作网站哪家专业实验中心网站建设
  • 怎样在谷歌做网站做的比较好的时尚网站
  • 烟台品牌网站建设校园网站建设检查自评报告
  • 宁波网站建设wordpress nginx 重写规则
  • 怀化网站建设设计网站刚做怎么做seo优化
  • 西seo优化排名专业网站优化外包
  • 整站多关键词优化电子工程师证怎么考
  • 郑州联通网站备案网站建设的一般要素