Doris Topic 11: Data Import Overview
1. Data Import Overview
1.1 Import Method Categories
1.2 Import Method Comparison
Import Method | Use Case | Supported Formats | Mode | Data Sources | Characteristics |
---|---|---|---|---|---|
Stream Load | Local files / application writes | CSV, JSON, Parquet, ORC | Synchronous | Local files, HTTP stream | High performance, real-time response |
Broker Load | Object storage / HDFS import | CSV, JSON, Parquet, ORC | Asynchronous | S3, HDFS, OSS | Large data volumes, runs in the background |
Routine Load | Kafka real-time streams | CSV, JSON | Asynchronous | Kafka | Continuous consumption, Exactly-Once |
Flink Connector | Flink stream processing | Any Flink format | Sync/Async | Flink DataStream | Stream-processing integration |
MySQL Load | Local CSV files | CSV | Synchronous | Local files | MySQL protocol compatible |
INSERT INTO SELECT | External tables / files | SQL | Synchronous | External tables, object storage | Flexible query-based import (see the sketch below) |
Group Commit | High-frequency small batches | Multiple formats | Synchronous | Applications | Optimized for high concurrency |
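The INSERT INTO SELECT route, for example, can pull a file straight out of object storage through a table-valued function. A minimal sketch, assuming a testdb.target_tbl target table and an illustrative S3 endpoint, bucket, and credentials:

-- Query-based import from object storage via the S3 TVF (placeholder bucket and credentials)
INSERT INTO testdb.target_tbl
SELECT user_id, name, age
FROM s3
(
    "uri" = "s3://example-bucket/path/example.csv",
    "s3.endpoint" = "s3.example.com",
    "s3.region" = "us-east-1",
    "s3.access_key" = "<ak>",
    "s3.secret_key" = "<sk>",
    "format" = "csv",
    "csv_schema" = "user_id:int;name:string;age:int"
);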
2. Local File Import
2.1 Stream Load
-- 1. Create the target table
CREATE TABLE testdb.test_streamload
(
    user_id BIGINT NOT NULL COMMENT "user id",
    name VARCHAR(20) COMMENT "user name",
    age INT COMMENT "user age"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;

-- 2. Run Stream Load with curl
curl --location-trusted -u username:password \
    -H "column_separator:," \
    -H "columns:user_id,name,age" \
    -T streamload_example.csv \
    -XPUT http://fe_host:fe_http_port/api/testdb/test_streamload/_stream_load

-- 3. Example response
{
    "TxnId": 3, "Label": "123", "Status": "Success", "Message": "OK",
    "NumberTotalRows": 10, "NumberLoadedRows": 10, "NumberFilteredRows": 0,
    "LoadBytes": 118, "LoadTimeMs": 173
}
2.2 Streamloader Tool (concurrent multi-file upload)
# Use the doris-streamloader tool (built on Stream Load)
doris-streamloader \
    --source_file="streamloader_example.csv" \
    --url="http://localhost:8030" \
    --header="column_separator:," \
    --db="testdb" \
    --table="test_streamloader"

# Result
Load Result: {
    "Status": "Success", "TotalRows": 10, "LoadedRows": 10,
    "FilteredRows": 0, "LoadBytes": 118, "LoadTimeMs": 623
}
2.3 MySQL Load (MySQL protocol compatible)
-- 1. Connect with the MySQL client (--local-infile is required)
mysql --local-infile -h fe_host -P fe_query_port -u root -D testdb

-- 2. Create the target table
CREATE TABLE testdb.t1
(
    pk INT,
    v1 INT SUM
)
AGGREGATE KEY (pk)
DISTRIBUTED BY HASH (pk);

-- 3. Run MySQL Load
LOAD DATA LOCAL
INFILE 'client_local.csv'
INTO TABLE testdb.t1
COLUMNS TERMINATED BY ','
LINES TERMINATED BY '\n';

-- 4. Result
Query OK, 6 rows affected (0.17 sec)
Records: 6  Deleted: 0  Skipped: 0  Warnings: 0
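When some dirty rows need to be tolerated, MySQL Load also accepts the common load properties. A minimal sketch, reusing the table and file above (the ratio is illustrative):

LOAD DATA LOCAL
INFILE 'client_local.csv'
INTO TABLE testdb.t1
COLUMNS TERMINATED BY ','
LINES TERMINATED BY '\n'
PROPERTIES ("max_filter_ratio" = "0.1");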
3. HDFS Import
3.1 HDFS Load (asynchronous)
-- 1. Create the target table
CREATE TABLE test_hdfsload
(
    user_id BIGINT NOT NULL COMMENT "user id",
    name VARCHAR(20) COMMENT "name",
    age INT COMMENT "age"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;

-- 2. Submit the HDFS Load job
LOAD LABEL hdfs_load_2022_04_01
(
    DATA INFILE("hdfs://127.0.0.1:8020/tmp/hdfsload_example.csv")
    INTO TABLE test_hdfsload
    COLUMNS TERMINATED BY ","
    FORMAT AS "CSV"
    (user_id, name, age)
)
WITH HDFS
(
    "fs.defaultFS" = "hdfs://127.0.0.1:8020",
    "hadoop.username" = "user"
)
PROPERTIES
(
    "timeout" = "3600"
);

-- 3. Check the imported data
SELECT * FROM test_hdfsload;
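Because HDFS Load (a form of Broker Load) runs asynchronously, completion should be confirmed by checking the job state for the label before relying on the table contents:

-- Check the asynchronous job state by label
SHOW LOAD WHERE LABEL = "hdfs_load_2022_04_01";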
3.2 TVF Approach (synchronous)
-- Synchronous import using a table-valued function (TVF)
INSERT INTO test_hdfsload
SELECT * FROM hdfs
(
    "uri" = "hdfs://127.0.0.1:8020/tmp/hdfsload_example.csv",
    "fs.defaultFS" = "hdfs://127.0.0.1:8020",
    "hadoop.username" = "doris",
    "format" = "csv",
    "csv_schema" = "user_id:int;name:string;age:int"
);
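The same TVF can also be queried on its own, which is a convenient way to preview the file and validate the declared schema before running the INSERT:

-- Preview the file through the TVF before importing
SELECT * FROM hdfs
(
    "uri" = "hdfs://127.0.0.1:8020/tmp/hdfsload_example.csv",
    "fs.defaultFS" = "hdfs://127.0.0.1:8020",
    "hadoop.username" = "doris",
    "format" = "csv",
    "csv_schema" = "user_id:int;name:string;age:int"
)
LIMIT 10;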
4. Real-Time Kafka Import
4.1 Routine Load (recommended)
4.1.1 Single-Table Import
-- 1. Create the target table
CREATE TABLE testdb.test_routineload_tbl
(
    user_id BIGINT NOT NULL COMMENT "user_id",
    name VARCHAR(20) COMMENT "name",
    age INT COMMENT "age"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;

-- 2. Create the Routine Load job
CREATE ROUTINE LOAD testdb.example_routine_load_csv ON test_routineload_tbl
COLUMNS TERMINATED BY ","
COLUMNS(user_id, name, age)
FROM KAFKA
(
    "kafka_broker_list" = "192.168.88.62:9092",
    "kafka_topic" = "test-routine-load-csv",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

-- 3. Check the imported data
SELECT * FROM test_routineload_tbl;
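A Routine Load job keeps running in the background, so day-to-day operation is mostly job management. For the job created above, the common statements are:

-- Check job status, consumed offsets, and error rows
SHOW ROUTINE LOAD FOR testdb.example_routine_load_csv;

-- Pause, resume, or permanently stop the job
PAUSE ROUTINE LOAD FOR testdb.example_routine_load_csv;
RESUME ROUTINE LOAD FOR testdb.example_routine_load_csv;
STOP ROUTINE LOAD FOR testdb.example_routine_load_csv;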
4.1.2 Multi-Table Import
-- 1. Create multiple target tables
CREATE TABLE test_multi_table_load1
(
    user_id BIGINT NOT NULL COMMENT "user id",
    name VARCHAR(20) COMMENT "user name",
    age INT COMMENT "user age"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;

CREATE TABLE test_multi_table_load2
(
    user_id BIGINT NOT NULL COMMENT "user id",
    name VARCHAR(20) COMMENT "user name",
    age INT COMMENT "user age"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;

-- 2. Create a multi-table Routine Load job (note: no ON clause)
CREATE ROUTINE LOAD example_multi_table_load
COLUMNS TERMINATED BY ","
FROM KAFKA
(
    "kafka_broker_list" = "192.168.88.62:9092",
    "kafka_topic" = "test-multi-table-load",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

-- Required Kafka message format: table_name|val1,val2,val3
-- Sample messages:
-- test_multi_table_load1|1,Emily,25
-- test_multi_table_load2|2,Benjamin,35
4.2 Kafka Connector (advanced formats)
# 1. Start Kafka Connect in distributed mode
$KAFKA_HOME/bin/connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties

# 2. Create the Doris Kafka Connector task
curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
  "name": "test-doris-sink-cluster",
  "config": {
    "connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
    "tasks.max": "10",
    "topics": "test-data-topic",
    "doris.topic2table.map": "test-data-topic:test_kafka_connector_tbl",
    "buffer.count.records": "10000",
    "buffer.flush.time": "120",
    "buffer.size.bytes": "5000000",
    "doris.urls": "10.10.10.1",
    "doris.user": "root",
    "doris.password": "",
    "doris.http.port": "8030",
    "doris.database": "test_db",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}'

# 3. Manage the connector
curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/status -X GET
curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/pause -X PUT
curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/resume -X PUT
4.2.1 Debezium CDC Data Import
# Import MySQL CDC data captured by Debezium
curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
  "name": "test-debezium-doris-sink",
  "config": {
    "connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
    "tasks.max": "10",
    "topics": "mysql_debezium.test.test_user",
    "doris.topic2table.map": "mysql_debezium.test.test_user:test_user",
    "buffer.count.records": "10000",
    "buffer.flush.time": "120",
    "buffer.size.bytes": "5000000",
    "doris.urls": "10.10.10.1",
    "doris.user": "root",
    "doris.password": "",
    "doris.http.port": "8030",
    "doris.database": "test_db",
    "converter.mode": "debezium_ingestion",
    "enable.delete": "true",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}'
4.2.2 Avro Data Import
# Import Avro-serialized data
curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
  "name": "doris-avro-test",
  "config": {
    "connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
    "topics": "avro_topic",
    "tasks.max": "10",
    "doris.topic2table.map": "avro_topic:avro_tab",
    "buffer.count.records": "100000",
    "buffer.flush.time": "120",
    "buffer.size.bytes": "10000000",
    "doris.urls": "10.10.10.1",
    "doris.user": "root",
    "doris.password": "",
    "doris.http.port": "8030",
    "doris.database": "test",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://127.0.0.1:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://127.0.0.1:8081"
  }
}'
5. Flink Streaming Import
5.1 Flink Doris Connector
-- 1. Create the target table in Doris
CREATE TABLE `students`
(
    `id` INT NULL,
    `name` VARCHAR(256) NULL,
    `age` INT NULL
) ENGINE=OLAP
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES ("replication_allocation" = "tag.location.default: 1");

-- 2. Create the sink table in Flink SQL
CREATE TABLE student_sink (
    id INT,
    name STRING,
    age INT
)
WITH (
    'connector' = 'doris',
    'fenodes' = '10.16.10.6:8030',
    'table.identifier' = 'test.students',
    'username' = 'root',
    'password' = '',
    'sink.label-prefix' = 'doris_label'
);

-- 3. Insert data into Doris
INSERT INTO student_sink VALUES (1, 'zhangsan', 123);

-- 4. Verify the data in Doris
SELECT * FROM test.students;
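In a real pipeline the sink is normally fed continuously from another Flink table rather than by a single INSERT ... VALUES. A sketch using Flink's built-in datagen connector as a stand-in source (the rate and id range are illustrative):

-- Stand-in streaming source; in practice this would be Kafka, CDC, etc.
CREATE TABLE student_source (
    id INT,
    name STRING,
    age INT
)
WITH (
    'connector' = 'datagen',
    'rows-per-second' = '10',
    'fields.id.kind' = 'sequence',
    'fields.id.start' = '1',
    'fields.id.end' = '1000'
);

-- Continuously stream rows into the Doris sink defined above
INSERT INTO student_sink
SELECT id, name, age FROM student_source;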
6. Import Strategy Selection Guide
6.1 Scenario-Based Import Plans
6.2 Performance Tuning Recommendations
6.2.1 Stream Load Optimization
# Increase throughput with parallel Stream Loads
# Example: upload several files at the same time
curl -T file1.csv http://fe1:8030/api/db/tbl/_stream_load &
curl -T file2.csv http://fe2:8030/api/db/tbl/_stream_load &
curl -T file3.csv http://fe3:8030/api/db/tbl/_stream_load &

# Wait for all uploads to finish
wait
6.2.2 Routine Load Configuration Tuning
-- Tune Routine Load parameters
CREATE ROUTINE LOAD example_optimized ON tbl
PROPERTIES
(
    "max_batch_interval" = "10",      -- maximum batch interval: 10 seconds
    "max_batch_rows" = "200000",      -- maximum rows per batch
    "max_batch_size" = "100000000",   -- maximum bytes per batch
    "exec_mem_limit" = "2147483648"   -- memory limit: 2 GB
)
FROM KAFKA(...);
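These properties can also be adjusted on an existing job; a rough sketch of the flow (the job must be paused before it can be altered):

PAUSE ROUTINE LOAD FOR example_optimized;

ALTER ROUTINE LOAD FOR example_optimized
PROPERTIES ("max_batch_rows" = "300000");

RESUME ROUTINE LOAD FOR example_optimized;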
6.2.3 Group Commit for High-Concurrency Writes
-- Enable Group Commit to optimize high-concurrency INSERTs
SET group_commit = async_mode;

-- Frequent small-batch INSERT workload
INSERT INTO tbl VALUES (1, 'A'), (2, 'B'), (3, 'C');
INSERT INTO tbl VALUES (4, 'D'), (5, 'E'), (6, 'F');
6.3 Error Handling and Monitoring
6.3.1 Import Status Monitoring
-- Check Stream Load records
SHOW STREAM LOAD;

-- Check Routine Load jobs and their tasks
SHOW ROUTINE LOAD;
SHOW ROUTINE LOAD TASK WHERE JobName = "job_name";

-- Check Broker Load jobs
SHOW LOAD;

-- Check import error details for a given label
SHOW LOAD WARNINGS WHERE LABEL = "label";
6.3.2 Data Quality Safeguards
-- 1. Set the maximum tolerated error ratio
LOAD LABEL example_label
(...)
PROPERTIES
(
    "max_filter_ratio" = "0.1"   -- tolerate at most 10% error rows
);

-- 2. Strict mode (no tolerance for conversion errors)
PROPERTIES
(
    "strict_mode" = "true"
);

-- 3. Set the timeout
PROPERTIES
(
    "timeout" = "3600"   -- 1-hour timeout
);
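Combined in a single job, the three safeguards would look roughly like the sketch below, which reuses the HDFS Load example from section 3.1 (label, path, and credentials are placeholders):

LOAD LABEL example_quality_label
(
    DATA INFILE("hdfs://127.0.0.1:8020/tmp/hdfsload_example.csv")
    INTO TABLE test_hdfsload
    COLUMNS TERMINATED BY ","
    FORMAT AS "CSV"
    (user_id, name, age)
)
WITH HDFS
(
    "fs.defaultFS" = "hdfs://127.0.0.1:8020",
    "hadoop.username" = "user"
)
PROPERTIES
(
    "max_filter_ratio" = "0.1",
    "strict_mode" = "true",
    "timeout" = "3600"
);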
7. Best Practices Summary
7.1 Import Method Selection Matrix
Scenario | Recommended | Alternative | Notes |
---|---|---|---|
Real-time application writes | Stream Load | JDBC INSERT + Group Commit | Limit concurrency to avoid pressure on the FE |
Kafka real-time streams | Routine Load | Kafka Connector | Choose based on the data format |
Flink stream processing | Flink Connector | - | Native integration, best performance |
Large local files | Streamloader | Stream Load | Concurrent multi-file upload |
HDFS batch data | Broker Load | TVF approach | Suited to TB-scale data |
CDC data synchronization | Debezium + Kafka Connector | Flink CDC | Supports DELETE operations |
External data sources | INSERT INTO SELECT + Catalog | - | Flexible query-based filtering (see the sketch below) |
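For the external data source row, a rough sketch of the Catalog approach, assuming a Hive Metastore at a placeholder address and illustrative database and table names:

-- Hypothetical Hive catalog pointing at an external metastore
CREATE CATALOG hive_catalog PROPERTIES
(
    "type" = "hms",
    "hive.metastore.uris" = "thrift://127.0.0.1:9083"
);

-- Query-based import with filtering from the external table
INSERT INTO internal.testdb.target_tbl
SELECT user_id, name, age
FROM hive_catalog.ods.source_tbl
WHERE age > 18;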
7.2 Performance Tuning Checklist
- Resource planning
  - Enough FE nodes for the expected concurrent imports
  - Sufficient BE storage and compute resources
  - Network bandwidth that meets the data throughput requirements
- Parameter tuning
  - Set reasonable batch sizes and intervals
  - Adjust memory limits to the data characteristics
  - Configure appropriate timeouts
- Monitoring and alerting
  - Monitor import job status and progress
  - Set alert thresholds on error ratios
  - Check storage space regularly
- Fault tolerance
  - Implement retry logic for failed imports
  - Set up data validation checkpoints
  - Prepare a data rollback plan