
Doris Series 11: Data Loading Overview

1. Data Loading Overview

1.1 Classification of Loading Methods

Doris loading methods fall into four families:

• Real-time writes: JDBC INSERT, Stream Load, Group Commit
• Streaming synchronization: Flink Connector, Routine Load, Kafka Connector, Flink CDC
• Batch loading: Broker Load, INSERT INTO SELECT, Streamloader, MySQL Load
• External data source integration: Catalog external tables, INSERT INTO SELECT, asynchronous JOBs

1.2 Comparison of Loading Methods

| Loading method | Use case | Supported formats | Mode | Data source | Characteristics |
|---|---|---|---|---|---|
| Stream Load | Local files / application writes | CSV, JSON, Parquet, ORC | Synchronous | Local files, HTTP streams | High performance, real-time response |
| Broker Load | Loading from object storage / HDFS | CSV, JSON, Parquet, ORC | Asynchronous | S3, HDFS, OSS | Large data volumes, runs in the background |
| Routine Load | Real-time Kafka streams | CSV, JSON | Asynchronous | Kafka | Continuous consumption, Exactly-Once |
| Flink Connector | Flink stream processing | Any Flink format | Synchronous/Asynchronous | Flink DataStream | Stream-processing integration |
| MySQL Load | Local CSV files | CSV | Synchronous | Local files | MySQL protocol compatible |
| INSERT INTO SELECT | External tables / files | SQL | Synchronous | External tables, object storage | Flexible query-based loading |
| Group Commit | High-frequency small batches | Multiple formats | Synchronous | Applications | Optimized for high concurrency |
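
The JDBC INSERT path listed in section 1.1 has no dedicated example later in this article; it is simply a regular INSERT sent over the MySQL protocol from a JDBC connection or any MySQL client. A minimal sketch, assuming a hypothetical table testdb.tbl(user_id, name, age):

-- Plain INSERT over the MySQL protocol; combine with Group Commit (section 6.2.3)
-- for high-frequency small batches. Table and values are illustrative.
INSERT INTO testdb.tbl (user_id, name, age)
VALUES (1, 'Emily', 25), (2, 'Benjamin', 35);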

2. Loading Local Files

2.1 Stream Load

-- 1. Create the target table
CREATE TABLE testdb.test_streamload
(
    user_id BIGINT NOT NULL COMMENT "user id",
    name VARCHAR(20) COMMENT "user name",
    age INT COMMENT "user age"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;

# 2. Run Stream Load with curl
curl --location-trusted -u username:password \
    -H "column_separator:," \
    -H "columns:user_id,name,age" \
    -T streamload_example.csv \
    -XPUT http://fe_host:fe_http_port/api/testdb/test_streamload/_stream_load

# 3. Example response
{
    "TxnId": 3,
    "Label": "123",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 10,
    "NumberLoadedRows": 10,
    "NumberFilteredRows": 0,
    "LoadBytes": 118,
    "LoadTimeMs": 173
}

2.2 Streamloader Tool (Concurrent Multi-file Loading)

# Use the doris-streamloader tool (built on Stream Load)
doris-streamloader \
    --source_file="streamloader_example.csv" \
    --url="http://localhost:8030" \
    --header="column_separator:," \
    --db="testdb" \
    --table="test_streamloader"

# Result
Load Result: {
    "Status": "Success",
    "TotalRows": 10,
    "LoadedRows": 10,
    "FilteredRows": 0,
    "LoadBytes": 118,
    "LoadTimeMs": 623
}

2.3 MySQL Load (MySQL Protocol Compatible)

# 1. Connect with the MySQL client (--local-infile is required)
mysql --local-infile -h fe_host -P fe_query_port -u root -D testdb

-- 2. Create the target table
CREATE TABLE testdb.t1
(
    pk INT,
    v1 INT SUM
)
AGGREGATE KEY (pk)
DISTRIBUTED BY HASH (pk);

-- 3. Run MySQL Load
LOAD DATA LOCAL
INFILE 'client_local.csv'
INTO TABLE testdb.t1
COLUMNS TERMINATED BY ','
LINES TERMINATED BY '\n';

-- 4. Result
Query OK, 6 rows affected (0.17 sec)
Records: 6  Deleted: 0  Skipped: 0  Warnings: 0
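
MySQL Load also accepts the usual LOAD DATA modifiers together with Doris load properties. A hedged sketch, assuming the CSV has a header row to skip (the IGNORE clause and the max_filter_ratio value are illustrative):

-- Sketch: skip one header line and tolerate up to 10% filtered rows
LOAD DATA LOCAL
INFILE 'client_local.csv'
INTO TABLE testdb.t1
COLUMNS TERMINATED BY ','
LINES TERMINATED BY '\n'
IGNORE 1 LINES
PROPERTIES ("max_filter_ratio" = "0.1");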

3. Loading from HDFS

3.1 HDFS Load (Asynchronous)

-- 1. Create the target table
CREATE TABLE test_hdfsload
(
    user_id BIGINT NOT NULL COMMENT "user id",
    name VARCHAR(20) COMMENT "name",
    age INT COMMENT "age"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;

-- 2. Run the HDFS Load
LOAD LABEL hdfs_load_2022_04_01
(
    DATA INFILE("hdfs://127.0.0.1:8020/tmp/hdfsload_example.csv")
    INTO TABLE test_hdfsload
    COLUMNS TERMINATED BY ","
    FORMAT AS "CSV"
    (user_id, name, age)
)
WITH HDFS
(
    "fs.defaultFS" = "hdfs://127.0.0.1:8020",
    "hadoop.username" = "user"
)
PROPERTIES
(
    "timeout" = "3600"
);

-- 3. Verify the loaded data
SELECT * FROM test_hdfsload;
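
Because the LOAD statement returns as soon as the job is submitted, the SELECT above only shows data once the background job has finished. A minimal sketch of checking the asynchronous job by its label:

-- The job state progresses through PENDING / LOADING to FINISHED (or CANCELLED)
SHOW LOAD WHERE LABEL = "hdfs_load_2022_04_01";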

3.2 TVF Approach (Synchronous)

-- Synchronous load using a Table-Valued Function (TVF)
INSERT INTO test_hdfsload
SELECT * FROM hdfs
(
    "uri" = "hdfs://127.0.0.1:8020/tmp/hdfsload_example.csv",
    "fs.defaultFS" = "hdfs://127.0.0.1:8020",
    "hadoop.username" = "doris",
    "format" = "csv",
    "csv_schema" = "user_id:int;name:string;age:int"
);
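
A convenient property of the TVF route is that the same hdfs() call can be queried directly, so the file can be inspected before anything is written. A minimal sketch with the same parameters as above:

-- Preview the first rows of the file without loading
SELECT * FROM hdfs
(
    "uri" = "hdfs://127.0.0.1:8020/tmp/hdfsload_example.csv",
    "fs.defaultFS" = "hdfs://127.0.0.1:8020",
    "hadoop.username" = "doris",
    "format" = "csv",
    "csv_schema" = "user_id:int;name:string;age:int"
)
LIMIT 10;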

4. Real-time Loading from Kafka

4.1 Routine Load (Recommended)

Flow: Kafka Topic → Routine Load Job → Doris Table, with continuous monitoring, automatic recovery, and the loaded data directly queryable.
4.1.1 Single-table Loading
-- 1. Create the target table
CREATE TABLE testdb.test_routineload_tbl
(
    user_id BIGINT NOT NULL COMMENT "user_id",
    name VARCHAR(20) COMMENT "name",
    age INT COMMENT "age"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;

-- 2. Create the Routine Load job
CREATE ROUTINE LOAD testdb.example_routine_load_csv ON test_routineload_tbl
COLUMNS TERMINATED BY ","
COLUMNS(user_id, name, age)
FROM KAFKA
(
    "kafka_broker_list" = "192.168.88.62:9092",
    "kafka_topic" = "test-routine-load-csv",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

-- 3. Verify the loaded data
SELECT * FROM test_routineload_tbl;
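
A Routine Load job keeps consuming until it is explicitly stopped. A minimal sketch of the lifecycle commands for the job created above:

-- Inspect, pause, resume, or stop the job
SHOW ROUTINE LOAD FOR testdb.example_routine_load_csv;
PAUSE ROUTINE LOAD FOR testdb.example_routine_load_csv;
RESUME ROUTINE LOAD FOR testdb.example_routine_load_csv;
STOP ROUTINE LOAD FOR testdb.example_routine_load_csv;   -- irreversible; a stopped job cannot be resumed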
4.1.2 Multi-table Loading
-- 1. Create multiple target tables
CREATE TABLE test_multi_table_load1
(
    user_id BIGINT NOT NULL COMMENT "user id",
    name VARCHAR(20) COMMENT "user name",
    age INT COMMENT "user age"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;

CREATE TABLE test_multi_table_load2
(
    user_id BIGINT NOT NULL COMMENT "user id",
    name VARCHAR(20) COMMENT "user name",
    age INT COMMENT "user age"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;

-- 2. Create a multi-table Routine Load job (no ON clause)
CREATE ROUTINE LOAD example_multi_table_load
COLUMNS TERMINATED BY ","
FROM KAFKA
(
    "kafka_broker_list" = "192.168.88.62:9092",
    "kafka_topic" = "test-multi-table-load",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

-- Required Kafka message format: table_name|val1,val2,val3
-- Sample messages:
-- test_multi_table_load1|1,Emily,25
-- test_multi_table_load2|2,Benjamin,35

4.2 Kafka Connector (Advanced Formats)

# 1. Start Kafka Connect in distributed mode
$KAFKA_HOME/bin/connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties

# 2. Create the Doris Kafka Connector task
curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
  "name": "test-doris-sink-cluster",
  "config": {
    "connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
    "tasks.max": "10",
    "topics": "test-data-topic",
    "doris.topic2table.map": "test-data-topic:test_kafka_connector_tbl",
    "buffer.count.records": "10000",
    "buffer.flush.time": "120",
    "buffer.size.bytes": "5000000",
    "doris.urls": "10.10.10.1",
    "doris.user": "root",
    "doris.password": "",
    "doris.http.port": "8030",
    "doris.database": "test_db",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}'

# 3. Manage the connector state
curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/status -X GET
curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/pause -X PUT
curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/resume -X PUT
4.2.1 Loading Debezium CDC Data
# Load MySQL CDC data captured by Debezium
curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
  "name": "test-debezium-doris-sink",
  "config": {
    "connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
    "tasks.max": "10",
    "topics": "mysql_debezium.test.test_user",
    "doris.topic2table.map": "mysql_debezium.test.test_user:test_user",
    "buffer.count.records": "10000",
    "buffer.flush.time": "120",
    "buffer.size.bytes": "5000000",
    "doris.urls": "10.10.10.1",
    "doris.user": "root",
    "doris.password": "",
    "doris.http.port": "8030",
    "doris.database": "test_db",
    "converter.mode": "debezium_ingestion",
    "enable.delete": "true",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}'
4.2.2 Loading Avro Data
# Load Avro-serialized data
curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
  "name": "doris-avro-test",
  "config": {
    "connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
    "topics": "avro_topic",
    "tasks.max": "10",
    "doris.topic2table.map": "avro_topic:avro_tab",
    "buffer.count.records": "100000",
    "buffer.flush.time": "120",
    "buffer.size.bytes": "10000000",
    "doris.urls": "10.10.10.1",
    "doris.user": "root",
    "doris.password": "",
    "doris.http.port": "8030",
    "doris.database": "test",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://127.0.0.1:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://127.0.0.1:8081"
  }
}'

5. Flink Streaming Load

5.1 Flink Doris Connector

-- 1. Create the target table in Doris
CREATE TABLE `students` (
    `id` INT NULL,
    `name` VARCHAR(256) NULL,
    `age` INT NULL
) ENGINE=OLAP
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES ("replication_allocation" = "tag.location.default: 1");

-- 2. Create the sink table in Flink SQL
CREATE TABLE student_sink (
    id INT,
    name STRING,
    age INT
) WITH (
    'connector' = 'doris',
    'fenodes' = '10.16.10.6:8030',
    'table.identifier' = 'test.students',
    'username' = 'root',
    'password' = '',
    'sink.label-prefix' = 'doris_label'
);

-- 3. Insert data into Doris
INSERT INTO student_sink VALUES (1, 'zhangsan', 123);

-- 4. Verify the data in Doris
SELECT * FROM test.students;
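
The same sink table also serves continuous pipelines. A minimal Flink SQL sketch, assuming a hypothetical datagen source table (student_source and its generator options are illustrative, not part of the original example):

-- Hypothetical streaming source feeding the Doris sink defined above
CREATE TABLE student_source (
    id INT,
    name STRING,
    age INT
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.id.kind' = 'sequence',
    'fields.id.start' = '1',
    'fields.id.end' = '1000',
    'fields.name.length' = '8',
    'fields.age.min' = '18',
    'fields.age.max' = '60'
);

-- Continuous job: every generated row is streamed into Doris
INSERT INTO student_sink
SELECT id, name, age FROM student_source;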

6. Loading Strategy Selection Guide

6.1 Scenario-based Loading Choices

Analyze the loading requirement starting from the data source type, then narrow down by the deciding factor for that source:

• Local files (decide by file size and format):
    • Small files: Stream Load (real-time response)
    • Large files: Streamloader (optimized for concurrent uploads)
    • CSV files: MySQL Load
• Message queues (decide by message format):
    • CSV/JSON: Routine Load (Exactly-Once)
    • Avro/Protobuf: Kafka Connector (rich format support)
    • CDC data: Debezium + Kafka Connector
• Stream processing engines:
    • Flink: Flink Connector (stream-processing integration)
• External storage systems:
    • HDFS: Broker Load or TVF (efficient for large batches)
    • Object storage: Broker Load
    • External tables: INSERT INTO SELECT

6.2 Performance Tuning Recommendations

6.2.1 Stream Load Optimization
# Increase throughput with parallel Stream Loads
# Example: upload several files at once against different FE nodes
curl -T file1.csv http://fe1:8030/api/db/tbl/_stream_load &
curl -T file2.csv http://fe2:8030/api/db/tbl/_stream_load &
curl -T file3.csv http://fe3:8030/api/db/tbl/_stream_load &

# Wait for all uploads to finish
wait
6.2.2 Routine Load Configuration Tuning
-- Tune Routine Load parameters
CREATE ROUTINE LOAD example_optimized ON tbl
PROPERTIES
(
    "max_batch_interval" = "10",        -- maximum interval: 10 seconds
    "max_batch_rows" = "200000",        -- maximum rows per batch
    "max_batch_size" = "100000000",     -- maximum bytes per batch
    "exec_mem_limit" = "2147483648"     -- memory limit: 2 GB
)
FROM KAFKA(...);
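
These properties can also be changed on an existing job without recreating it; the job has to be paused first. A minimal sketch for the job above (the new value is illustrative):

-- Pause, adjust a batching property, then resume
PAUSE ROUTINE LOAD FOR example_optimized;
ALTER ROUTINE LOAD FOR example_optimized
PROPERTIES ("max_batch_rows" = "300000");
RESUME ROUTINE LOAD FOR example_optimized;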
6.2.3 Group Commit for High-concurrency Writes
-- Enable Group Commit to optimize high-concurrency INSERTs
SET enable_group_commit = true;

-- Frequent small-batch INSERT workload
INSERT INTO tbl VALUES (1, 'A'), (2, 'B'), (3, 'C');
INSERT INTO tbl VALUES (4, 'D'), (5, 'E'), (6, 'F');

6.3 Error Handling and Monitoring

6.3.1 Load Status Monitoring
-- View Stream Load jobs
SHOW STREAM LOAD;

-- View Routine Load jobs
SHOW ROUTINE LOAD;
SHOW ROUTINE LOAD TASK;

-- View Broker Load jobs
SHOW LOAD;

-- View load error details
SHOW LOAD WARNINGS FROM `label`;
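
On a busy cluster these listings get long; SHOW LOAD accepts filters, ordering, and limits. A sketch (the database name is illustrative):

-- Most recent Broker Load jobs still running in a database
SHOW LOAD FROM testdb WHERE STATE = "LOADING" ORDER BY CreateTime DESC LIMIT 10;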
6.3.2 Data Quality Assurance
-- 1. Set the maximum tolerated error ratio
LOAD LABEL example_label
(...)
PROPERTIES
(
    "max_filter_ratio" = "0.1"   -- tolerate at most 10% bad rows
);

-- 2. Strict mode (zero tolerance for errors)
PROPERTIES
(
    "strict_mode" = "true"
);

-- 3. Set a timeout
PROPERTIES
(
    "timeout" = "3600"   -- 1-hour timeout
);
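
In practice these properties usually sit on a single job rather than three separate statements. A sketch that combines them, reusing the HDFS Load from section 3.1 (the label is illustrative):

-- Combined sketch: error tolerance, strict mode, and timeout on one job
LOAD LABEL hdfs_load_with_quality_checks
(
    DATA INFILE("hdfs://127.0.0.1:8020/tmp/hdfsload_example.csv")
    INTO TABLE test_hdfsload
    COLUMNS TERMINATED BY ","
    FORMAT AS "CSV"
    (user_id, name, age)
)
WITH HDFS
(
    "fs.defaultFS" = "hdfs://127.0.0.1:8020",
    "hadoop.username" = "user"
)
PROPERTIES
(
    "max_filter_ratio" = "0.1",
    "strict_mode" = "true",
    "timeout" = "3600"
);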

7. Best Practices Summary

7.1 Loading Method Selection Matrix

| Scenario | Recommended | Alternative | Notes |
|---|---|---|---|
| Real-time application writes | Stream Load | JDBC INSERT + Group Commit | Limit concurrency to avoid pressure on the FE |
| Real-time Kafka streams | Routine Load | Kafka Connector | Choose based on the data format |
| Flink stream processing | Flink Connector | - | Native integration, best performance |
| Large local files | Streamloader | Stream Load | Concurrent multi-file upload |
| HDFS batch data | Broker Load | TVF | Suited to TB-scale data |
| CDC data synchronization | Debezium + Kafka Connector | Flink CDC | Supports DELETE operations |
| External data sources | INSERT INTO SELECT + Catalog | - | Flexible query-based filtering |

7.2 Performance Tuning Checklist

  1. Resource planning

    • Enough FE nodes for the required load concurrency
    • Sufficient BE storage and compute resources
    • Network bandwidth that meets the data throughput requirements
  2. Parameter tuning

    • Set reasonable batch sizes and intervals
    • Adjust memory limits to the characteristics of the data
    • Configure appropriate timeouts
  3. Monitoring and alerting

    • Monitor load job status and progress
    • Alert on error-rate thresholds
    • Check storage space regularly
  4. Fault tolerance

    • Implement retry logic for failed loads
    • Add data-validation checkpoints
    • Prepare a data rollback plan