ClickHouse 迁移 StarRocks 脚本工具
使用方法:执行 sh process_inspect_event_dist.sh
#!/bin/bash
# process_inspect_event_dist.sh — entry point for the ClickHouse -> StarRocks migration.
# Runs the export step first, then the import step.
# NOTE: in the original, the trailing comments were fused onto the command lines
# (e.g. "202509# 表名"), which would have passed the comment text as extra arguments.

# export.sh arguments: <table> <partition_expr> <partition_value>
sh export.sh m_event "toYYYYMM(operation_time)" 202509

# import.sh arguments: <table> <partition_value> <comma-separated column list>
sh import.sh m_event 202509 batch_id,business_type,tenant_id,create_time
第一步,通过 export.sh 将 ClickHouse 数据按分区导出为 CSV 文件:
#!/bin/bash
# export.sh — export one partition of a ClickHouse table to local CSV files,
# one file per ClickHouse node.
# Usage: sh export.sh <table> <partition_expr> <partition_value>
#   e.g. sh export.sh m_event "toYYYYMM(operation_time)" 202509

# Table name array (single table for now; array form kept for easy extension).
tables=("$1")
partition="$2"
dt="$3"

# ClickHouse node IP list.
# FIX: the original wrote "198.0.0.1""198.0.0.2" with no separator, which bash
# concatenates into ONE array element "198.0.0.1198.0.0.2" — only a single,
# invalid host would ever be queried.
ips=(
  "198.0.0.1"
  "198.0.0.2"
)

# Validate arguments BEFORE any destructive cleanup, so a bad invocation
# cannot wipe previously exported files.
if [ "$1" = "" ]; then
  echo "错误: table name变量未设置"
  exit 1
fi
if [ "$2" = "" ]; then
  echo "错误: partition变量未设置"
  exit 1
fi
if [ "$dt" = "" ]; then
  echo "错误: dt变量未设置"
  exit 1
fi

# Remove the previous run's output files.
# NOTE(review): cleanup targets /data1/export but the query below writes to the
# current directory — presumably the script is always run from /data1/export; confirm.
rm -rf /data1/export/out*

echo "Export for dt $dt:"

# Loop over every table, then over every node.
for table in "${tables[@]}"; do
  echo "Processing table: $table"
  for i in "${!ips[@]}"; do
    ip=${ips[$i]}
    echo "Executing query on IP: $ip"
    # Output file name: out-<table>-<partition_value>-<node_index>.csv
    output_file="out-${table}-$dt-$((i+1)).csv"
    # Run the query and save the result.
    # NOTE(review): $partition/$dt are spliced into the SQL string — SQL
    # injection risk; only use with trusted, operator-supplied arguments.
    # Password on the command line is visible in `ps`; prefer a config file.
    if clickhouse-client -h "$ip" -u admin --password 123456 -m \
        --query "SELECT * FROM test.$table WHERE $partition = '$dt' ;" > "$output_file"; then
      echo "Query executed successfully. Output saved to $output_file"
    else
      echo "Failed to execute query on IP: $ip"
    fi
  done
  echo "Finished processing table: $table"
  echo "----------------------------------------"
done
echo "All queries have been executed."
第二步,通过 import.sh 以 Stream Load 方式将导出文件导入 StarRocks:
#!/bin/bash
# import.sh — load the files produced by export.sh into StarRocks via the
# Stream Load HTTP API, one file at a time, logging each response.
# Usage: sh import.sh <table> <partition_value> <comma-separated column list>

# StarRocks Stream Load connection settings.
# NOTE(review): password is hardcoded and passed on the curl command line
# (visible in `ps`); consider curl --config / a credentials file.
USER="root"
PASSWORD="123456"
HOST="192.168.0.3"
PORT="8030"
DATABASE="test"
TABLE="$1"
dt="$2"
# Column mapping for the load job.
COLUMNS="$3"

# Log directory for Stream Load responses.
LOG_DIR="/data1/export"

# Validate arguments before doing any work.
if [ "$1" = "" ]; then
  echo "错误: table name变量未设置"
  exit 1
fi
if [ "$dt" = "" ]; then
  echo "错误: dt变量未设置"
  exit 1
fi
if [ "$3" = "" ]; then
  echo "错误: columns变量未设置"
  exit 1
fi

# FIX: export.sh writes one file per ClickHouse node (out-<table>-<dt>-1.csv,
# -2.csv, …) but the original list hardcoded only "-1.csv", silently dropping
# the other nodes' data. Glob all of them; an unmatched glob leaves the literal
# pattern, which the -f check below skips.
FILES=( "out-$TABLE-$dt-"*.csv )

# Process every file with Stream Load.
for FILE in "${FILES[@]}"; do
  # Skip missing files (including an unmatched glob pattern).
  if [ ! -f "$FILE" ]; then
    echo "文件 $FILE 不存在,跳过..."
    continue
  fi

  # Unique label per attempt (timestamped) so retries are not rejected as duplicates.
  TIMESTAMP=$(date +%Y%m%d%H%M%S)
  LABEL="load_${FILE%.*}_$TIMESTAMP"

  echo "正在处理文件: $FILE (Label: $LABEL)"
  echo "文件大小: $(du -h "$FILE" | cut -f1)"

  # Run the Stream Load request.
  # FIX: clickhouse-client's default output format is TabSeparated, but the
  # original header declared a (mangled) space separator. Declare TAB explicitly
  # using the \x09 hex form StarRocks documents for invisible characters.
  START_TIME=$(date +%s)
  curl --location-trusted -u "$USER:$PASSWORD" \
    -H "label:$LABEL" \
    -H "column_separator:\x09" \
    -H "columns:$COLUMNS" \
    -T "$FILE" -XPUT \
    "http://$HOST:$PORT/api/$DATABASE/$TABLE/_stream_load" > "$LOG_DIR/${LABEL}.log" 2>&1
  CURL_EXIT_CODE=$?
  END_TIME=$(date +%s)
  DURATION=$((END_TIME - START_TIME))

  if [ $CURL_EXIT_CODE -eq 0 ]; then
    # curl succeeded at the HTTP level; the load result is in the JSON body.
    if grep -q '"Status": "Success"' "$LOG_DIR/${LABEL}.log"; then
      echo "成功导入 $FILE (耗时: ${DURATION}秒)"
      # Optional: move or delete the file after a successful load.
      # mv "$FILE" "processed/$FILE"
    else
      echo "导入 $FILE 失败,请查看日志: $LOG_DIR/${LABEL}.log"
    fi
  else
    echo "执行curl命令失败 (退出码: $CURL_EXIT_CODE),文件: $FILE"
  fi

  # Throttle between requests (tune as needed).
  sleep 2
done
echo "所有文件处理完成。日志保存在 $LOG_DIR 目录中。"
