当前位置: 首页 > news >正文

如何优雅调整Doris key顺序

前言

当前Doris不支持直接修改已有表的Key顺序。在实际业务中,数据模型往往需要迭代优化,当需要对旧表Key顺序进行调整时,只能通过新建表结构并迁移数据实现。然而,TB级以上的数据迁移操作复杂且耗时,成为实际工作中的常见痛点。本文将针对该问题提供高效解决方案,帮助开发者快速完成此类数据迁移任务。

数据迁移痛点

在数据量较大的场景下,直接使用Doris原生的INSERT INTO tbl1 WITH LABEL进行全表迁移存在显著风险。虽然优先考虑采用Flink/Spark进行迁移,但全表操作会带来以下问题:

  1. 资源消耗‌:Spark/Flink任务需要极高计算资源;‌
  2. 集群稳定性‌:可能冲击Doris集群负载。

理论上可通过分区粒度迁移缓解(如单分区分批处理),但在Doris 2.X实际使用中发现:
Spark/Flink的Doris connector通过FE接口获取SQL查询计划(接口示例:/api/_query_plan),该机制导致分区过滤条件无法下推,最终仍会全表扫描
其flink的核心代码(spark实现原理都相同)截取如下:

public static List<PartitionDefinition> findPartitions(DorisOptions options, DorisReadOptions readOptions, Logger logger)throws DorisException {String[] tableIdentifiers = parseIdentifier(options.getTableIdentifier(), logger);String readFields =StringUtils.isBlank(readOptions.getReadFields())? "*": readOptions.getReadFields();String sql ="select "+ readFields+ " from `"+ tableIdentifiers[0]+ "`.`"+ tableIdentifiers[1]+ "`";if (!StringUtils.isEmpty(readOptions.getFilterQuery())) {sql += " where " + readOptions.getFilterQuery();}logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql);HttpPost httpPost = new HttpPost(getUriStr(options, logger) + QUERY_PLAN);String entity = "{\"sql\": \"" + sql + "\"}";logger.debug("Post body Sending to Doris FE is: '{}'.", entity);StringEntity stringEntity = new StringEntity(entity, StandardCharsets.UTF_8);stringEntity.setContentEncoding("UTF-8");stringEntity.setContentType("application/json");httpPost.setEntity(stringEntity);String resStr = send(options, readOptions, httpPost, logger);logger.debug("Find partition response is '{}'.", resStr);QueryPlan queryPlan = getQueryPlan(resStr, logger);Map<String, List<Long>> be2Tablets = selectBeForTablet(queryPlan, logger);return tabletsMapToPartition(options,readOptions,be2Tablets,queryPlan.getOpaquedQueryPlan(),tableIdentifiers[0],tableIdentifiers[1],logger);}

在Doris官网中发现其read connector可以通过参数透传到Doris直接在Doris侧就可以对数据进行,详细如下:
在这里插入图片描述
在此场景效果不佳,还是无法避免上述问题,实测了几次都不行,一启动迁移job整个Doris集群资源消耗过高,似乎该问题似乎无解,没有很好的办法解决该问题。

解决思路

其实该问题棘手的原因主要是因数据量过大的问题才导致上述的问题,本质可以通过分治的思路处理该问题,根据doris存储特点,其最小的单位为tablet,我们可以采取使用原生的Doris的insert into方式进行迁移:

insert into with lablel select * from tbl  tablet(tablet_id)

其实现的细节如下:

  1. 查询doris待迁移表中所有分区(SHOW PARTITIONS FROM tbl;
  2. 根据查询到分区名称去查询分区下所有tablet( SHOW TABLETS FROM tbl PARTITIONS(p_20250418) order by TabletId),需要特别注意的是该查语句会把所有tablet都会查询出来(包含副本的talet),如果不进行去重直接将查询出的tablets进行迁移最终导致大量的数据重复问题,为此需要根据tablet_id进行去重仅选择一个有效的tablet进行迁移即可。
  3. 根据1、2步逐步对数据进行迁移,可以有效的节约资源,迁移的过程也不会导致Doris集群资源消耗多大的问题,迁移的过程中可以若某一个分区出现问题直接重建该分区,然后对该分区再次迁移即可,不会因为某一个分区迁移有问题导致,重新drop 然后在建的繁琐的过程,需要注意的是对有问题的分区重建的时候需要关闭自动创建分区的功能,将有问题分区快熟重建然后及时恢复自动创建分区属性,避免导致部分分区缺失的问题,相关语句如下:

ALTER TABLE tbl SET ("dynamic_partition.enable" = "false");
ALTER TABLE tbl DROP PARTITION p_20250527;
ALTER TABLE tbl
ADD PARTITION p_20250527 VALUES [("2025-05-27 00:00:00"), ("2025-05-28 00:00:00"));
ALTER TABLE tbl SET ("dynamic_partition.enable" = "true");

具体实现细节

经过上述的验证其方案是可行的,但是如果需要我们手工一个个的迁移进行迁移操作,也是比较的痛苦,为此我们将其实现通过python代码实现上述的实现,为了提高Doris的集群的问题,我们对自动迁移的程序迁移tablet 和partiton后均进行相应的sleep操作,同时为了减少迁移的工作直接将其迁移过程的元数据保存到mysql中,我们仅需要关注mysql的中的迁移过程即可,其mysql的迁移元数据表ddl如下:

CREATE TABLE `tbl_olap_migrate_info` (`db_name` varchar(100) DEFAULT NULL COMMENT '表名称',`tbl_name` varchar(100) DEFAULT NULL COMMENT '表名称',`p_name` varchar(100) DEFAULT NULL COMMENT '分区id',`tablet` int(11) DEFAULT NULL COMMENT '迁移的tablet主副本id',`task_code` varchar(11000) DEFAULT NULL COMMENT '具体执行的code信息',`consume_time` float DEFAULT NULL COMMENT '任务执行消耗时间',`error_msg` varchar(1000) DEFAULT NULL COMMENT '任务失败具体信息',`task_status` int(11) DEFAULT 0 COMMENT '0: 成功 1:失败',`source_tablet_recoder_total` int(11) DEFAULT NULL COMMENT '源目标的tablet总数',`sink_tablet_recoder_total` int(11) DEFAULT NULL COMMENT '目标表的tablet迁移总数',UNIQUE KEY `tbl_olap_migrate_info_un` (`db_name`,`tbl_name`,`p_name`,`tablet`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='doris迁移任务相关元数据迁移信息记录';

其详细的python代码实现如下:

import pymysql
import time
from typing import Dict, List, Optional, Tuple
import datetime warehouse_config = {'host': 'xxxxx','port': xxx,  # 修正:端口应为整数类型'user': 'xxxx','password': 'xxxxxxxxx','database': 'xxxxxxxx','connect_timeout': 50
}
sync_config = {'source_table': 'xxxxx','target_table': 'xxxxxx','label_prefix': 'xxxxxxx','batch_size': 20   # 添加:分批次数量
}
meta_config = {'host': 'xxxxxx','port': xxxx,  # 修正:端口应为整数类型'user': 'xxxxxx','password': 'xxxxxxx','database': 'xxxxxx','connect_timeout': 50
}dsn = f"mysql://{warehouse_config['user']}:{warehouse_config['password']}@{warehouse_config['host']}:{warehouse_config['port']}/{warehouse_config['database']}?connect_timeout={warehouse_config['connect_timeout']}"def get_table_count(cursor, table, partition):"""获取指定表的记录数"""query = f"SELECT COUNT(*) FROM {table} PARTITION ({partition}) "cursor.execute(query)count = cursor.fetchone()[0]return count
def get_table_count_by_tablet(cursor, table, tablet):"""获取指定表的记录数"""query = f"SELECT COUNT(*) FROM {table} tablet ({tablet}) "cursor.execute(query)count = cursor.fetchone()[0]return countdef get_db_connection(warehouse_config):"""Obtain the database connection object:param warehouse_config: Dictionary, including database connection configuration:return: pymysql.Connection: Object"""try:conn = pymysql.connect(host=warehouse_config['host'],port=warehouse_config['port'],user=warehouse_config['user'],password=warehouse_config['password'],database=warehouse_config['database'],connect_timeout=warehouse_config['connect_timeout'],read_timeout=1800,              charset='utf8mb4')print("Database connection successful.")return connexcept pymysql.MySQLError as e:print(f"Error connecting to the database: {e}")return Nonedef get_doris_table_partitions(cursor,table):"""获取指定表的分区名称"""query = (f"SHOW PARTITIONS FROM  {table}")print("query table partition:", query)cursor.execute(query)partitions = [row[1] for row in cursor.fetchall()]print(f"Number of partitions: {len(partitions)}")return partitionsdef get_doris_tablets(cursor,table, partition):"""获取指定表的tabletID"""query_tablet = (f"SHOW TABLETS FROM {table} PARTITIONS({partition}) order by TabletId")print("query table tablets:", query_tablet)cursor.execute(query_tablet)tablets = [row[0] for row in cursor.fetchall()]print(f"Number of partitions: {len(tablets)}")seen = set()unique_tablets = []for tablet in tablets:if tablet not in seen:seen.add(tablet)unique_tablets.append(tablet)print("===============================")print(f"Number of tablets: {len(tablets)}")print(f"去重后的 Tablet 数量: {len(unique_tablets)}")return unique_tabletsdef generate_select_sql(cursor, source_table, target_table, database_name, sort_key=None):"""生成指定表的 SELECT 列名 SQL 语句(替换 SELECT *),并支持按主键排序参数:cursor: 数据库游标source_table: 源表名target_table: 目标表名database_name: 数据库名sort_key: 用于排序的主键列名,None表示不排序返回:生成的 SELECT 语句(字符串),失败时返回错误信息"""try:# Query all column names of the table (sorted in the order of the table structure)query = """SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s   ORDER BY ORDINAL_POSITION"""query_key_col = """SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s  AND  (COLUMN_KEY IS NOT NULL and COLUMN_KEY != '' and LENGTH(COLUMN_KEY) > 0)ORDER BY ORDINAL_POSITION"""# Use parametric queries to prevent SQL injectioncursor.execute(query, (database_name, target_table))columns = [row[0] for row in cursor.fetchall()]if not columns:return f"-- 错误:表 {target_table} 不存在或无列信息"# Spliced column names (Wrap column names in backticks to prevent keyword conflicts)columns_str = "\n   , ".join([f"`{col}`" for col in columns])select_sql = f"SELECT {columns_str} FROM `{source_table}`"print("generate_select_sql:" + select_sql) return select_sqlexcept Exception as e:return f"-- 数据库错误:{str(e)}"def insert_data_from_partition(cursor, meta_cursor, source_table, target_table, label_prefix, partition,sub_sql,database_name,meta_conn):"""Insert data from the specified partition of the source table into the target table"""if not (partition.startswith('p_') and len(partition) == 10):print(f"⚠️ partition: {partition} The format does not conform. p_YYYYMMDD,Skip")returntoday = datetime.date.today()date_str = partition[2:]try:partition_date = datetime.datetime.strptime(date_str, '%Y%m%d').date()if partition_date >= today:returnexcept ValueError:returntablets  = get_doris_tablets(cursor, source_table, partition)insert_meta_sql = """INSERT  INTO secloud.tbl_olap_migrate_info (db_name, tbl_name, p_name, tablet, task_code, consume_time, error_msg, task_status,source_tablet_recoder_total,sink_tablet_recoder_total)VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""for tablet in tablets:pre_source_total = get_table_count(cursor, source_table, partition)pre_target_total = get_table_count(cursor, target_table, partition)if pre_source_total == pre_target_total:print(f"source table count equal target table count, count val {pre_source_total}, partition: {partition}")returnelif pre_target_total > 100:print("The partition data is inconsistent. Do not re-import. Mark the partition and handle it separately later. The partition name is:" + partition)elif pre_source_total == 0:print("The source partition is empty. Skip this partition and do not perform subsequent migration work . The partition name is:" + partition)returnstart_time = time.time()error_msg = Nonetask_status = 0  # 默认为成功source_tablet_recoder_total = 0  # 初始化源表计数sink_tablet_recoder_total = 0    # 初始化目标表计数try:insert_query = (f"INSERT INTO {target_table} WITH LABEL {label_prefix + partition + '_' + tablet}  \n"f"{sub_sql} tablet ({tablet})")sql = (f"Submit the migration task. The execution code is:{insert_query}")print(sql)cursor.execute(insert_query)source_tablet_recoder_total = get_table_count_by_tablet(cursor, source_table, tablet)sink_tablet_recoder_total = get_table_count(cursor, target_table, partition)end_time = time.time()execution_time = end_time - start_timeprint(f"consume time: {execution_time:.2f} seconds") print(f"Processed partition: {partition}")except Exception as sql_ex:error_msg = str(sql_ex)task_status = 1  # 异常时状态改为失败print(f"❌ Failed to insert partition {partition}, error: {str(sql_ex)}")print(f"❌ Executed SQL:\n{insert_query}")finally:# 计算耗时check_sql = """SELECT 1 FROM secloud.tbl_olap_migrate_info WHERE `db_name` = %s AND `tbl_name` = %s AND `p_name` = %s AND `tablet` = %s"""meta_cursor.execute(check_sql, (database_name, target_table, partition, tablet))exists = meta_cursor.fetchone() is not Noneif exists:print(f"⚠️ 元数据记录已存在,跳过:{database_name}.{target_table} {partition} Tablet {tablet}")continue  # 跳过插入consume_time = time.time() - start_time# -------------------------- 新增:插入元数据记录 --------------------------meta_params = (database_name,          # 数据库名target_table,         # 表名partition,        # 分区名tablet,           # Tablet ID(转为整数,匹配表结构)insert_query,     # 执行的SQL语句round(consume_time, 2),  # 耗时(保留2位小数)error_msg,        # 错误信息(成功为None)task_status,       # 任务状态(0成功,1失败)source_tablet_recoder_total, # 源目标的tablet总数sink_tablet_recoder_total # 目标表的tablet迁移总数)try:meta_cursor.execute(insert_meta_sql, meta_params)meta_conn.commit()  # 提交事务(关键!否则记录不会写入)print(f"✅ 元数据记录插入成功")except Exception as e:meta_conn.rollback()  # 插入失败时回滚print(f"❌ 元数据插入失败:{str(e)}(参数:{meta_params})")time.sleep(30)def main():print("doris data warehouse migration by partition, task begins")print("Prepare to link the data warehouse. The link information is: ",warehouse_config)print("The migration configuration is",sync_config)try:print("Data warehouse Server start!")olap_conn = get_db_connection(warehouse_config)meta_conn = get_db_connection(meta_config)cursor = olap_conn.cursor()meta_cursor = meta_conn.cursor()print("Data warehouse Server complete!")except Exception as err:print(f"Database connection failed:{err}")returntry:s_tbl = sync_config['source_table']t_tbl = sync_config['target_table']database_name = warehouse_config['database']# 获取所有分区partitions = get_doris_table_partitions(cursor, s_tbl)sub_sql = generate_select_sql(cursor, s_tbl, t_tbl, database_name)for partition in partitions[:1]:start_time = time.time()try:insert_data_from_partition(cursor, meta_cursor, s_tbl,t_tbl,sync_config['label_prefix'], partition,sub_sql,database_name,meta_conn)time.sleep(60)except Exception as e:print(f"Exception when handling partition {partition}:{e}")e.with_tracebackbreakfinally:consume_time = time.time() - start_timeoperation_info = (f"migrate data info: table: {s_tbl}  partition:{partition} consume_time:{consume_time}")print(operation_info)except Exception as e:print(f"An exception occurred during the execution of the script:{str(e)}")import tracebacktraceback.print_exc()finally:print("doris data warehouse migration by partition, task  complete!")try:if cursor:cursor.close()if olap_conn:olap_conn.close()if meta_conn:meta_conn.close()if meta_cursor:meta_cursor.close()except Exception:passif __name__ == "__main__":print("doris data warehouse migration by partition, task preparation!")main()

实测分区平均数据大小为300GB,以tablet数据块的方式迁移,平均每个数据块为一分多钟,效果十分显著,其Doris集群的资源消耗也是趋于平稳,不会对线上的业务有什么影响

http://www.dtcms.com/a/288284.html

相关文章:

  • linux--------------------BlockQueue的生产者消费模型
  • 【Docker基础】深入解析Docker-compose核心配置:Services服务配置详解
  • Gitee 提交信息的规范
  • 算法基础知识总结
  • GoC 图片指令
  • BeanFactory 和 FactoryBean 的区别
  • 架构探索笔记【1】
  • 如何快速学习一门新技术
  • 实用的文件和文件夹批量重命名工具
  • 手撕Spring底层系列之:注解驱动的魔力与实现内幕
  • 【Linux】重生之从零开始学习运维之Nginx
  • 【服务器与部署 14】消息队列部署:RabbitMQ、Kafka生产环境搭建指南
  • Linux中添加重定向(Redirection)功能到minishell
  • 中小机构如何低成本搭建教育培训平台?源码开发+私有化部署攻略
  • 什么是帕累托最优,帕累托最优如何运用在组相联映像中
  • AspectJ 表达式中常见符号说明
  • GoogleBenchmark用法
  • 环形区域拉普拉斯方程傅里叶级数解
  • 电阻耐压参数学习总结
  • 再谈进程-控制
  • 敏感词 v0.27.0 新特性之词库独立拆分
  • 5-大语言模型—理论基础:注意力机制优化
  • 关于个人博客系统的测试报告
  • Typecho评论系统集成Markdown编辑器完整教程
  • Windows事件查看器完整指南
  • 最少标记点问题:贪心算法解析
  • 深入了解 find_element 方法:Web 自动化定位元素的核心​
  • Linux某个进程CPU占用率高原因定位手段
  • Vue基础(前端教程①-路由)
  • 从 C# 转 Python 第三天:文件操作、异常处理与错误日志实践