Clickhouse 迁移到 Doris 的最佳实践
一、引言
在将数据从 Clickhouse 迁移到 Apache Doris / SelectDB Cloud 的过程中,涉及表结构迁移、查询语句迁移以及数据迁移等多个关键环节。每个环节都有其复杂性和需要注意的细节,本文将详细介绍这些内容及对应的最佳实践方法。
二、表结构迁移
(二)表结构创建要点
了解表引擎与数据模型对应关系
对于普通表引擎迁移且不包含特殊数据类型(如枚举、嵌套数据类型)时,需要清楚 Clickhouse 不同的表引擎与 Doris 的数据模型的对应关系。例如,Clickhouse 的 MergeTree 表引擎,在 Doris 中可根据业务场景选择合适的数据模型,如用于原始日志、操作记录等分析场景时,可对应 Doris 的 Duplicate 模型。
合理设置分桶相关参数
分桶数:不要使用 auto 分桶,应根据表的数据量通过计算公式得出合理的分桶数。例如,可以先预估表的未来数据增长规模,结合 Doris 集群的节点数量和性能,通过一定的测试和计算来确定分桶数,以确保数据在集群中能够均匀分布,提升查询性能。
分桶字段:充分考虑业务场景,选择经常在查询条件中出现的字段作为分桶字段,这样可以在查询时快速定位到相关数据,避免后期对表结构进行二次优化(分桶确定后,后期不可更改。除非是新建分区的分桶)。
处理特殊数据类型
Clickhouse 中有许多 Doris 不具备的数据类型,如无符号整型数据类型、嵌套数据类型等。在替换时,优先选择 Doris 中数据类型范围比 Clickhouse 原始字段范围更大的数据类型。例如,Clickhouse 中的 UInt32 类型,在 Doris 中可转换为 bigint 类型。
如果实在没有与之对应的 Doris 数据类型,可以考虑使用 Doris 的 text 数据类型,但这可能会对查询性能产生一定影响,需要谨慎使用。
(三)获取 CK 表结构
可以通过脚本一次性批量获取 Clickhouse 的表结构,核心 SQL 如下:
select create_table_query from system.tables where database = 'default' or database='tpch';
或者直接通过脚本的方式通过 client 遍历执行show create table xxx
来获取 Clickhouse 的表结构。
三、查询语句迁移
(一)工具辅助
查询语句迁移目前有sql-glot工具提供帮助,将 Clickhouse 的查询语句从 Clickhouse 的日志中全部获取出来后,将SQL过一遍sql-glot,就能自动转换为Doris的查询语句,sql - glot 使用网站。 但可能存在一些不兼容的问题,导致转换失败。故还需要人为手动参与进去做一些修改:
(二)人工参与
梳理函数列表
梳理 Clickhouse 使用了哪些查询函数。
梳理 Doris 这边与之对应的函数列表。对于有对应的 Doris 函数,直接进行替换;对于不存在相应函数的情况,及时联系Doris官方人员做支持。
四、数据迁移
(一)迁移方式选择准则
从已有的用户迁移实践来看,应尽可能选用 Doris 能提供的原生导入方式来进行数据导入。现阶段,从 Clickhouse 迁移到 Doris/Cloud 有多种方式,如:
- 在 Clickhouse 上导出为文件,然后调用 stream load 或者 S3 load 的方式
- Spark connector
- datax
(二)各工具可能存在的问题
Datax:其 Clickhouse reader 支持的 Clickhouse 的数据类型比较少,很多数据类型不支持,使用时需要慎重考虑。
Spark connector 和 Flink connector:如果直接读取 CK,会存在一些特殊数据类型不支持的问题(如 BITMAP)。
(三)推荐迁移方式
优先考虑将 Clickhouse 的数据导出成文件(如 parquet)到 HDFS 或者对象存储,然后使用 S3 LOAD 等方式把数据导入 Doris。具体操作如下:
环境配置
搭建 Doris 集群,确保集群环境稳定且各项配置正确。
准备好 HDFS / 对象存储,确保存储环境可用且权限设置正确。
检查环境中是否安装有 python3,部分导出工具可能依赖 python3 环境。
在 Doris 建表
建表时需将字段与 Clickhouse 对应,表模型对应可参考 Apache Doris 与 Clickhouse 字段及表模型对应关系。
注意 ORC 文件迁移要求 Clickhouse 和 Doris 表字段大小写相同。例如,Clickhouse 表的 DDL 如下:
CREATE TABLE ssb.lineorder
(`LO_ORDERKEY` UInt32,`LO_LINENUMBER` UInt8,`LO_CUSTKEY` UInt32,`LO_PARTKEY` UInt32,`LO_SUPPKEY` UInt32,`LO_ORDERDATE` Date,`LO_ORDERPRIORITY` LowCardinality(String),`LO_SHIPPRIORITY` UInt8,`LO_QUANTITY` UInt8,`LO_EXTENDEDPRICE` UInt32,`LO_ORDTOTALPRICE` UInt32,`LO_DISCOUNT` UInt8,`LO_REVENUE` UInt32,`LO_SUPPLYCOST` UInt32,`LO_TAX` UInt8,`LO_COMMITDATE` Date,`LO_SHIPMODE` LowCardinality(String)
)
ENGINE = MergeTree
PARTITION BY toYear(LO_ORDERDATE)
ORDER BY (LO_ORDERDATE, LO_ORDERKEY)
SETTINGS index_granularity = 8192;
在 Doris 中的 DDL 如下:
CREATE TABLE `lineorder` (`LO_ORDERKEY` bigint NOT NULL COMMENT "",`LO_LINENUMBER` SMALLINT NOT NULL COMMENT "",`LO_CUSTKEY` bigint NOT NULL COMMENT "",`LO_PARTKEY` bigint NOT NULL COMMENT "",`LO_SUPPKEY` bigint NOT NULL COMMENT "",`LO_ORDERDATE` date NOT NULL COMMENT "",`LO_ORDERPRIORITY` varchar(16) NOT NULL COMMENT "",`LO_SHIPPRIORITY` SMALLINT NOT NULL COMMENT "",`LO_QUANTITY` SMALLINT NOT NULL COMMENT "",`LO_EXTENDEDPRICE` bigint NOT NULL COMMENT "",`LO_ORDTOTALPRICE` bigint NOT NULL COMMENT "",`LO_DISCOUNT` SMALLINT NOT NULL COMMENT "",`LO_REVENUE` bigint NOT NULL COMMENT "",`LO_SUPPLYCOST` bigint NOT NULL COMMENT "",`LO_TAX` SMALLINT NOT NULL COMMENT "",`LO_COMMITDATE` date NOT NULL COMMENT "",`LO_SHIPMODE` varchar(11) NOT NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`LO_ORDERKEY`)
PARTITION BY RANGE(`LO_ORDERDATE`)
(PARTITION p1 VALUES [('0000-01-01'), ('1993-01-01')),
PARTITION p2 VALUES [('1993-01-01'), ('1994-01-01')),
PARTITION p3 VALUES [('1994-01-01'), ('1995-01-01')),
PARTITION p4 VALUES [('1995-01-01'), ('1996-01-01')),
PARTITION p5 VALUES [('1996-01-01'), ('1997-01-01')),
PARTITION p6 VALUES [('1997-01-01'), ('1998-01-01')),
PARTITION p7 VALUES [('1998-01-01'), ('1999-01-01')))
DISTRIBUTED BY HASH(`LO_ORDERKEY`) BUCKETS 48
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"in_memory" = "false",
"storage_format" = "V2"
);
配置导出工具
此方案为分布式导出方案,如果单表数据量小于两亿条,可直接采用 clickhouse - client 导出 orc 文件。直接导出语法参考如下:
clickhouse - client --password yourpassword --query="select * from ssb.lineorder FORMAT ORC" > lineorder.ORC
分布式导出工具配置如下:
下载 ClickHouse 导出工具:
git clone https://github.com/LOVEGISER/clickhouse_export.git
进入目录:
cd clickhouse_export/clickhouse_python_sink/
打开配置文件:
vim config.py
在配置文件中进行如下配置:
# -*- encoding=utf8 -*-"""
-------------------------------------------------
@author: "xxxx"
@file: config.py
@time: xxxx
@desc: clickhouse_python_sink Server Config
-------------------------------------------------
"""
from log_utils import logger
#1. which table should want to been export
export_table_list = [{"db":"ssb", #指定数据库"table":"lineorder", #指定表"format":"ORC", #ORC/Parquet/CSVWithNames #指定导出格式"filenameExtension":"ORC", #ORC/Parquet/csv #指定导出文件后缀名"mode":"partition",#导出模式,可选按照partition分区导出或者all全量导出"partition_expr":"toYear(LO_ORDERDATE)",#指定的分区字段"upper_condition":"toYear(LO_ORDERDATE)<=999912",#结束分区"lower_condition":"toYear(LO_ORDERDATE)>=000001",#开始分区"partition_split_filed": "LO_TAX",#对数据量过大的partition按照split_filed再次切分"partition_split_filed_model": "continuous",#continuous:连续型(数据按照字段线性增长),discrete:离散型"partition_split_filed_type": "long" #datetime/long/date}# 可以同时指定多个表# ,{# "db":"default",# "table":"trips_np",# "format": "Parquet",# "mode":"all",# "partition_expr":"",# "upper_condition":"",# "lower_condition":"",# "partition_split_filed": "",# "partition_split_filed_model": "",# "partition_split_filed_type": ""# }
]#例子'''
example:
export_table_list = [{"db":"default","table":"trips","mode":"all/partition","partition_expr":"toYYYYMM(pickup_date)","upper_condition":"toYYYYMM(pickup_date)<=201508","lower_condition":"toYYYYMM(pickup_date)>=201506",}
]'''
#example :./clickhouse-client --host=<host> --port=<port> --user=<user> --password=<password>;
clickhouse_connect_command = "clickhouse-client --host=*.*.*.* --port=9000"
#2.thread number use
process_number = 10
sub_partition_max_size=200000
#指定clickhouse的userfiles路径,导出的文件在此
user_files_path = "/mnt/data/clickhouse/user_files"
运行导出
执行以下命令运行导出:
python3 scheduler.py
工具会自动监测导出线程状态以及导出数据条数准确性。
- 数据上传 HDFS / 对象存储
上传对象存储:以 OSS 为例,先找到 ClickHouse 的 user_file 目录或者在 python 脚本中自定义的数据文件目录,然后用 ossutil64 上传整个目录到存储桶。例如:
./ossutil64 cp -r /mnt/data/clickhouse/user_files/ssb/lineorder oss://bucket/ssb/lineorder
腾讯 COS 和阿里 OSS 等对象存储的上传操作可参考对应官方文档,如
腾讯 COS:https://cloud.tencent.com/document/product/436/10976;
阿里 OSS:https://help.aliyun.com/document_detail/50451.html。
上传 HDFS:使用 HDFS 命令上传,例如:
hdfs dfs -put lineorder /lineorder
Doris Load 数据
S3 Load:针对 ClickHouse 导出的数据存到对象存储上,参考: s3 load
LOAD LABEL ssb.lineorder_oss(DATA INFILE("s3://bucket/filer/*")INTO TABLE lineorderFORMAT AS "orc")
WITH S3("AWS_ENDPOINT" = "*","AWS_ACCESS_KEY" = "*","AWS_SECRET_KEY"="*","AWS_REGION" = "*")
PROPERTIES("timeout" = "3600" -- 如果数据量过大请合理配置超时时间);
数据验证
可以通过运行count(*)
或者常用分析 SQL 对比 ClickHouse 验证结果,确保迁移的数据准确无误。
五、其他数据迁移方式补充
(一)ClickHouse->Apache Doris (Stream Load File)
此方案仅适用于数据量小的维度表(不建议超过 200M)且表数据不复杂的场景(需注意 csv 文件的分隔符与双引号问题),具体操作步骤如下:
环境配置:搭建好 Doris 环境,确保其正常运行。
在 Doris 建表:把字段与 ClickHouse 对应,保证表模型匹配,ORC 文件迁移要求 ClickHouse 和 Doris 表字段大小写相同。
导出数据
CSV 格式:运行clickhouse-client --password yourpassword --format_csv_delimiter="|" --query="select * from ssb.customer FORMAT CSV" > customer.csv
导出数据。由于 ClickHouse 导出的 csv 文件中 string 和 date 等类型字段会带双引号,需使用sed -i 's/"//g' customer.csv
命令处理。
JSON 格式:执行clickhouse-client --password yourpassword --query="select * from ssb.customer FORMAT JSON" > customer.json
导出数据。因 clickhouse 导出的 json 文件带元数据信息,仅需 data 部分,可通过yum install -y jq
安装 jq 工具,再执行cat customer.json | jq ".data" > customer_f.json
处理。
Stream Load 导入 Doris
CSV 导入:执行curl --location-trusted -u root:****** -T customer.csv -H "column_separator:|" -H "label:customer_csv" http://host:port/api/ssb/customer/_stream_load
,根据返回结果确认导入是否成功。
JSON 导入:Doris 默认支持最大导入 100MB 的 Json 文件,可修改相关参数增大限制。执行curl --location-trusted -u root:****** -H "format: json" -H "strip_outer_array: true" -T customer_f.json http://host:port/api/ssb/customer/_stream_load
完成导入。
数据验证:通过运行count(*)
或者常用分析 SQL 对比 ClickHouse 验证数据准确性。
(二)ClickHouse->Apache Doris (Spark-SQL)
Spark-SQL 迁移方案分为 http/grpc 两种,适用于不同的 ClickHouse 版本,ClickHouse 21.1.2.15 及以上版本使用 grpc 协议效率更高,具体流程如下:
环境准备:搭建 Doris;下载 Spark(版本需为 3.2 以上),并配置好相关环境变量,如SPARK_WORKER_CORES
、SPARK_WORKER_MEMORY
、SPARK_MASTER_HOST
、SPARK_MASTER_PORT
、JAVA_HOME
等。
在 Doris 建表:将字段与 ClickHouse 对应,构建合适的表结构。
Spark 配置
下载 ClicHouse 相关 jar 包(根据 http 或 grpc 版本选择),放入 spark/jars/ 目录。
下载 spark-doris-connector,放入 spark/jars/ 目录。
编辑$SPARK_HOME/conf/spark-defaults.conf
,添加 ClickHouse 相关配置;编辑$SPARK_HOME/conf/``spark-env.sh
,配置 Spark 集群相关参数。
配置 Clickhouse:若使用 grpc 协议做迁移,需在 ClickHouse 配置文件/etc/clickhouse-server/config.xml
中打开 grpc 协议相关配置,并重启 ClickHouse 服务。
启动 Spark-SQL:先启动 Spark 集群(本次使用 standalone 模式),执行$SPARK_HOME/sbin/``start-all.sh
;再启动 spark-sql,执行$SPARK_HOME/bin/spark-sql --executor-memory 24G --executor-cores 4
,可通过执行select * from clickhouse.ssb.lineorder limit 10;
测试连接。
Spark 中建 Doris 映射表:参考相关文档,执行CREATE TEMPORARY VIEW lineorder_sink USING doris OPTIONS("table.identifier"="ssb.lineorder","fenodes"="*.*.*.*:8030","user"="root","password"="*","sink.batch.size" = "100000");
创建映射表。
迁移数据:根据 CK 数据分区情况,分批次迁移数据,如insert into lineorder_sink SELECT * FROM clickhouse.ssb.lineorder where LO_ORDERDATE >= '1998-01-01' and LO_ORDERDATE <= '1998-12-31';
或 insert into lineorder_sink SELECT * FROM clickhouse.ssb.lineorder;
,注意若迁移失败,需先在 Doris 删除对应数据再重新导入。
数据验证:通过运行count(*)
或者常用分析 SQL 对比 ClickHouse 验证结果。
六、迁移后优化与注意事项
(一)性能优化
索引优化:根据业务查询需求,在 Doris 中合理创建索引,如前缀索引、Bloom Filter 索引、Bitmap 倒排索引等,提高查询效率。例如,对于经常用于过滤条件的字段,可创建相应的索引。
物化视图:对于频繁查询的固定维度或聚合结果,创建物化视图。Doris 会自动维护物化视图的数据,查询时可直接从物化视图中读取数据,大幅提升查询性能。
数据分区与分桶调整:如果发现数据分布不均匀或查询性能未达预期,可根据实际数据情况和查询模式,重新调整数据分区和分桶策略,确保数据在集群中均匀分布,减少数据扫描范围。
(二)数据一致性保障
定期校验:迁移完成后,定期对 ClickHouse 和 Doris 中的数据进行一致性校验,可通过对比关键数据的统计信息(如记录数、聚合结果等),确保数据在迁移后及后续使用过程中保持一致。
增量同步:如果业务存在数据持续更新的情况,建立增量同步机制,确保 ClickHouse 中的新增或更新数据能够及时、准确地同步到 Doris 中,保证数据的实时性和一致性。
(三)监控与维护
集群监控:搭建完善的监控体系,对 Doris 集群的资源使用情况(CPU、内存、磁盘等)、查询性能、任务执行状态等进行实时监控,及时发现并解决潜在问题。
日志管理:合理管理 Doris 和相关工具的日志,通过分析日志信息,快速定位和排查迁移过程中或后续使用过程中出现的问题。
这份文档已涵盖多种迁移方式及后续优化等内容。若还想对某个迁移方式补充更多细节,或增加其他方面的内容,欢迎讨论。