

A Complete Guide to Data Import in Apache Doris

Contents

  • 1. Overview of Data Import in Doris
  • 2. Stream Load (Streaming Import)
  • 3. Broker Load (Bulk Import)
  • 4. Routine Load (Continuous Import)
  • 5. Insert Into (SQL Import)
  • 6. Other Import Methods
  • 7. Comparing and Choosing Import Methods
  • 8. Performance Tuning and Best Practices
  • 9. Common Problems and Solutions
  • 10. Hands-On Cases
  • Appendix

1. Overview of Data Import in Doris

1.1 Categories of Import Methods

Apache Doris provides several ways to import data:

| Import method | Typical scenario | Data source | Mode | Data volume |
|---|---|---|---|---|
| Stream Load | Local files, real-time streams | HTTP | Synchronous | < 10 GB |
| Broker Load | Large files on HDFS / S3 | Broker | Asynchronous | > 10 GB |
| Routine Load | Kafka real-time streams | Kafka | Continuous | Streaming |
| Insert Into | SQL query results | SQL | Synchronous | < 1 GB |
| MySQL Load | Local files | MySQL | Synchronous | < 10 GB |

1.2 Import Workflow

Data source → Transform → Validate → Write → Import complete

2. Stream Load (Streaming Import)

2.1 Overview

Stream Load is the most commonly used import method; it pushes data to Doris over the HTTP protocol.

Characteristics:

  • Synchronous import; the result is returned immediately
  • Supports CSV and JSON formats
  • Recommended at most 10 GB per request

2.2 Importing CSV Data

Example 1: Basic CSV import

Create the table:

CREATE TABLE users (
    id INT,
    name VARCHAR(50),
    age INT,
    city VARCHAR(50)
)
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 10;

Data file (data.csv):

1,张三,25,北京
2,李四,30,上海
3,王五,28,广州

Import command:

curl --location-trusted -u root:password \
    -H "label:user_load_001" \
    -H "column_separator:," \
    -T data.csv \
    http://localhost:8030/api/test_db/users/_stream_load

Response:

{"TxnId": 1234,"Label": "user_load_001","Status": "Success","NumberTotalRows": 3,"NumberLoadedRows": 3,"NumberFilteredRows": 0,"LoadBytes": 65,"LoadTimeMs": 156
}
Example 2: Column mapping

curl --location-trusted -u root:password \
    -H "label:user_load_002" \
    -H "column_separator:," \
    -H "columns:name,age,city,id=age*10" \
    -T data.csv \
    http://localhost:8030/api/test_db/users/_stream_load
Example 3: Row filtering

curl --location-trusted -u root:password \
    -H "label:user_load_003" \
    -H "column_separator:," \
    -H "where:age > 18 AND city != ''" \
    -T data.csv \
    http://localhost:8030/api/test_db/users/_stream_load

2.3 Importing JSON Data

Example 1: Simple JSON

Data file (data.json):

{"id": 1, "name": "张三", "age": 25, "city": "北京"}
{"id": 2, "name": "李四", "age": 30, "city": "上海"}

Import command:

curl --location-trusted -u root:password \
    -H "label:json_load_001" \
    -H "format:json" \
    -H "read_json_by_line:true" \
    -T data.json \
    http://localhost:8030/api/test_db/users/_stream_load

Example 2: Nested JSON

Data file:

{"user": {"id": 1, "name": "张三"}, "info": {"age": 25, "city": "北京"}}

Import command:

curl --location-trusted -u root:password \
    -H "label:json_load_002" \
    -H "format:json" \
    -H "read_json_by_line:true" \
    -H "jsonpaths:[\"$.user.id\",\"$.user.name\",\"$.info.age\",\"$.info.city\"]" \
    -H "columns:id,name,age,city" \
    -T data.json \
    http://localhost:8030/api/test_db/users/_stream_load

Example 3: JSON array

Data file:

[{"id": 1, "name": "张三", "age": 25},{"id": 2, "name": "李四", "age": 30}
]

Import command:

curl --location-trusted -u root:password \
    -H "label:json_array_load" \
    -H "format:json" \
    -H "strip_outer_array:true" \
    -T data.json \
    http://localhost:8030/api/test_db/users/_stream_load

2.4 Advanced Parameters

curl --location-trusted -u root:password \
    -H "label:advanced_load" \
    -H "column_separator:," \
    -H "columns:id,name,age,city" \
    -H "where:age > 18" \
    -H "max_filter_ratio:0.1" \
    -H "timeout:3600" \
    -H "strict_mode:false" \
    -H "timezone:Asia/Shanghai" \
    -H "exec_mem_limit:2147483648" \
    -H "partial_columns:true" \
    -T data.csv \
    http://localhost:8030/api/test_db/users/_stream_load

Parameter reference:

  • label: unique identifier of the load job; used to guarantee idempotence
  • column_separator: column separator, default \t
  • columns: column mapping and transformation expressions
  • where: filter condition applied to incoming rows
  • max_filter_ratio: maximum tolerated ratio of filtered rows (0-1)
  • timeout: timeout in seconds
  • strict_mode: strict mode for column type conversion
  • timezone: time zone used by the load (e.g. Asia/Shanghai)
  • exec_mem_limit: memory limit in bytes
  • partial_columns: partial column update

2.5 Python Example

import requests

def stream_load(file_path, database, table, label):
    """Import a local file into Doris via Stream Load."""
    url = f"http://localhost:8030/api/{database}/{table}/_stream_load"
    headers = {
        "label": label,
        "column_separator": ",",
        "format": "csv"
    }
    with open(file_path, 'rb') as f:
        response = requests.put(
            url,
            headers=headers,
            auth=("root", "password"),
            data=f
        )
    result = response.json()
    if result.get("Status") == "Success":
        print(f"Loaded {result['NumberLoadedRows']} rows")
    else:
        print(f"Load failed: {result.get('Message')}")
    return result

# Usage
stream_load("data.csv", "test_db", "users", "py_load_001")

2.6 Java Example

import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.FileEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import java.io.File;
import java.util.Base64;

public class StreamLoadDemo {
    public static void streamLoad(String filePath, String db, String table) {
        String url = String.format("http://localhost:8030/api/%s/%s/_stream_load", db, table);
        try (CloseableHttpClient client = HttpClients.createDefault()) {
            HttpPut put = new HttpPut(url);
            // Basic authentication
            String auth = Base64.getEncoder().encodeToString("root:password".getBytes());
            put.setHeader(HttpHeaders.AUTHORIZATION, "Basic " + auth);
            // Stream Load headers
            put.setHeader("label", "java_load_001");
            put.setHeader("column_separator", ",");
            // Request body: the file to import
            put.setEntity(new FileEntity(new File(filePath)));
            // Execute the request
            var response = client.execute(put);
            System.out.println("Load finished");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3. Broker Load (Bulk Import)

3.1 Overview

Broker Load imports large batches of data from external storage systems such as HDFS and S3.

Characteristics:

  • Asynchronous import
  • Handles TB-scale data
  • Parallel loading

3.2 HDFS Import Examples

Example 1: Basic HDFS import

LOAD LABEL test_db.hdfs_load_001
(
    DATA INFILE("hdfs://namenode:9000/user/data/users.csv")
    INTO TABLE users
    COLUMNS TERMINATED BY ","
    (id, name, age, city)
)
WITH BROKER "broker_name"
(
    "username" = "hdfs",
    "password" = ""
)
PROPERTIES
(
    "timeout" = "3600"
);
Example 2: Importing multiple files

LOAD LABEL test_db.hdfs_load_002
(
    DATA INFILE("hdfs://namenode:9000/user/data/users_*.csv")
    INTO TABLE users
    COLUMNS TERMINATED BY ","
)
WITH BROKER "broker_name";
Example 3: Parquet format

LOAD LABEL test_db.parquet_load
(
    DATA INFILE("hdfs://namenode:9000/data/*.parquet")
    INTO TABLE users
    FORMAT AS "parquet"
    (id, name, age, city)
)
WITH BROKER "broker_name"
PROPERTIES
(
    "timeout" = "7200"
);

3.3 S3 Import Example

LOAD LABEL test_db.s3_load_001
(
    DATA INFILE("s3://bucket/data/users.csv")
    INTO TABLE users
    COLUMNS TERMINATED BY ","
)
WITH BROKER "broker_name"
(
    "fs.s3a.access.key" = "your_access_key",
    "fs.s3a.secret.key" = "your_secret_key",
    "fs.s3a.endpoint" = "s3.amazonaws.com"
);

3.4 Monitoring and Management

-- Check a specific load job
SHOW LOAD WHERE LABEL = 'hdfs_load_001';

-- List all load jobs in a database
SHOW LOAD FROM test_db;

-- Cancel a load job
CANCEL LOAD FROM test_db WHERE LABEL = 'hdfs_load_001';
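
Because Broker Load runs asynchronously, a script usually submits the job and then polls SHOW LOAD until the State column reaches FINISHED or CANCELLED. A minimal polling sketch using pymysql against the FE's MySQL-protocol port (host, port 9030, credentials, and the State / ErrorMsg column names are assumptions; verify them against your Doris version):

import time
import pymysql

# Connection details are placeholders; adjust for your cluster.
conn = pymysql.connect(host="localhost", port=9030, user="root",
                       password="password", database="test_db")

def wait_for_load(label, interval=10):
    """Block until the Broker Load job with this label finishes or is cancelled."""
    while True:
        with conn.cursor(pymysql.cursors.DictCursor) as cur:
            cur.execute("SHOW LOAD WHERE LABEL = %s", (label,))
            rows = cur.fetchall()
        if not rows:
            raise RuntimeError(f"no load job found for label {label}")
        job = rows[-1]                      # latest attempt for this label
        if job["State"] == "FINISHED":
            return job
        if job["State"] == "CANCELLED":
            raise RuntimeError(f"load cancelled: {job.get('ErrorMsg')}")
        time.sleep(interval)                # still PENDING / ETL / LOADING

wait_for_load("hdfs_load_001")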

4. Routine Load (Continuous Import)

4.1 Overview

Routine Load continuously imports data from Kafka.

Characteristics:

  • Continuous import
  • Consumes Kafka automatically
  • Exactly-once semantics
  • Automatic failure recovery

4.2 Creating a Routine Load Job

Example 1: JSON format

CREATE ROUTINE LOAD test_db.routine_load_users ON users
COLUMNS(id, name, age, city)
PROPERTIES
(
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "format" = "json"
)
FROM KAFKA
(
    "kafka_broker_list" = "localhost:9092",
    "kafka_topic" = "user_topic",
    "kafka_partitions" = "0,1,2",
    "kafka_offsets" = "OFFSET_BEGINNING"
);
Example 2: CSV format

CREATE ROUTINE LOAD test_db.routine_load_logs ON logs
COLUMNS TERMINATED BY ","
COLUMNS(timestamp, level, message)
PROPERTIES
(
    "desired_concurrent_number" = "5",
    "format" = "csv"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092",
    "kafka_topic" = "log_topic"
);
Example 3: Specifying a consumer group

CREATE ROUTINE LOAD test_db.routine_load_orders ON orders
PROPERTIES
(
    "format" = "json",
    "jsonpaths" = "[\"$.order_id\",\"$.amount\"]"
)
FROM KAFKA
(
    "kafka_broker_list" = "localhost:9092",
    "kafka_topic" = "order_topic",
    "property.group.id" = "doris_group",
    "property.client.id" = "doris_client"
);
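
For context, the job in Example 1 simply consumes whatever JSON arrives on user_topic. A sketch of a matching producer, using the kafka-python package (the client library and broker address are assumptions; any Kafka producer works):

import json
from kafka import KafkaProducer

# Produce JSON messages whose keys match the COLUMNS(id, name, age, city)
# mapping of routine_load_users above.
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode("utf-8"),
)

producer.send("user_topic", {"id": 1, "name": "张三", "age": 25, "city": "北京"})
producer.send("user_topic", {"id": 2, "name": "李四", "age": 30, "city": "上海"})
producer.flush()  # the routine load job picks the rows up within max_batch_interval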

4.3 Managing Routine Load Jobs

-- Check job status
SHOW ROUTINE LOAD FOR test_db.routine_load_users;

-- Pause the job
PAUSE ROUTINE LOAD FOR test_db.routine_load_users;

-- Resume the job
RESUME ROUTINE LOAD FOR test_db.routine_load_users;

-- Stop the job
STOP ROUTINE LOAD FOR test_db.routine_load_users;

-- Change job properties
ALTER ROUTINE LOAD FOR test_db.routine_load_users
PROPERTIES
(
    "desired_concurrent_number" = "5"
);

4.4 Kafka Security Configuration

CREATE ROUTINE LOAD test_db.secure_load ON users
PROPERTIES ("format" = "json")
FROM KAFKA
(
    "kafka_broker_list" = "broker:9092",
    "kafka_topic" = "topic",
    "property.security.protocol" = "SASL_PLAINTEXT",
    "property.sasl.mechanism" = "PLAIN",
    "property.sasl.username" = "user",
    "property.sasl.password" = "pass"
);

5. Insert Into (SQL Import)

5.1 Basic Usage

Example 1: Single-row insert
INSERT INTO users (id, name, age, city) 
VALUES (1, '张三', 25, '北京');
Example 2: Multi-row insert
INSERT INTO users VALUES 
(1, '张三', 25, '北京'),
(2, '李四', 30, '上海'),
(3, '王五', 28, '广州');
Example 3: Insert from a query
INSERT INTO users_backup
SELECT * FROM users WHERE age > 25;

5.2 Insert Overwrite

-- Overwrite the whole table
INSERT OVERWRITE TABLE users
SELECT * FROM users_temp;

-- Overwrite a single partition
INSERT OVERWRITE TABLE orders PARTITION(dt='2025-01-20')
SELECT * FROM orders_temp WHERE dt = '2025-01-20';

5.3 Batch Insert (JDBC)

String sql = "INSERT INTO users VALUES (?, ?, ?, ?)";
PreparedStatement pstmt = conn.prepareStatement(sql);

for (int i = 0; i < 10000; i++) {
    pstmt.setInt(1, i);
    pstmt.setString(2, "name" + i);
    pstmt.setInt(3, 20 + i % 50);
    pstmt.setString(4, "city" + i % 10);
    pstmt.addBatch();
    // Flush a batch every 1000 rows
    if ((i + 1) % 1000 == 0) {
        pstmt.executeBatch();
    }
}
pstmt.executeBatch();

6. Other Import Methods

6.1 MySQL Load

LOAD DATA LOCAL INFILE '/path/to/data.csv'
INTO TABLE users
COLUMNS TERMINATED BY ','
LINES TERMINATED BY '\n'
(id, name, age, city);
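
MySQL Load is issued over the MySQL protocol (the FE query port, typically 9030), so in principle any client that supports LOAD DATA LOCAL INFILE can run it. A minimal sketch with pymysql, assuming local infile is enabled; host, port, credentials, and file path are placeholders:

import pymysql

# local_infile=True lets the client ship the local file to the server.
conn = pymysql.connect(host="localhost", port=9030, user="root",
                       password="password", database="test_db",
                       local_infile=True)
with conn.cursor() as cur:
    cur.execute("""
        LOAD DATA LOCAL INFILE '/path/to/data.csv'
        INTO TABLE users
        COLUMNS TERMINATED BY ','
        LINES TERMINATED BY '\\n'
        (id, name, age, city)
    """)
conn.commit()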

6.2 S3 Load

LOAD LABEL test_db.s3_load
(
    DATA INFILE("s3://bucket/file.csv")
    INTO TABLE users
    COLUMNS TERMINATED BY ","
)
WITH S3
(
    "AWS_ENDPOINT" = "s3.amazonaws.com",
    "AWS_ACCESS_KEY" = "key",
    "AWS_SECRET_KEY" = "secret",
    "AWS_REGION" = "us-east-1"
);

7. Comparing and Choosing Import Methods

7.1 Feature Comparison

| Feature | Stream Load | Broker Load | Routine Load | Insert |
|---|---|---|---|---|
| Mode | Synchronous | Asynchronous | Continuous | Synchronous |
| Data volume | < 10 GB | > 10 GB | Streaming | < 1 GB |
| Latency | Seconds | Minutes | Seconds | Milliseconds |
| Concurrency | | | | |

7.2 Recommendations

Stream Load - suitable for:

  • Importing local files
  • Real-time data ingestion endpoints
  • Small, frequent batches

Broker Load - suitable for:

  • Large files on HDFS / S3
  • TB-scale data migration
  • Scheduled bulk imports

Routine Load - suitable for:

  • Kafka real-time streams
  • Log collection
  • CDC data synchronization

Insert Into - suitable for:

  • SQL query results
  • Table-to-table transformations
  • Inserting small amounts of data

8. Performance Tuning and Best Practices

8.1 Stream Load Tuning

1. Batch size
# Recommended: 100 MB - 1 GB per load
# Too small: frequent requests, high overhead
# Too large: timeout risk and memory pressure

curl -u root:password \
    -H "timeout:3600" \
    -H "exec_mem_limit:4294967296" \
    -T data.csv \
    http://localhost:8030/api/db/table/_stream_load
2. Parallel loading
from concurrent.futures import ThreadPoolExecutor

def load_file(file, label):
    # Import logic: reuse the stream_load helper from section 2.5
    stream_load(file, "test_db", "users", label)

files = ['f1.csv', 'f2.csv', 'f3.csv']
with ThreadPoolExecutor(max_workers=3) as executor:
    for i, f in enumerate(files):
        executor.submit(load_file, f, f"load_{i}")
3. Table schema tuning
CREATE TABLE users (
    id BIGINT,
    name VARCHAR(50),
    age INT
)
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 32
PROPERTIES (
    "replication_num" = "3",
    "compression" = "LZ4"
);

8.2 Broker Load Tuning

LOAD LABEL optimized_load
(
    DATA INFILE("hdfs://namenode/data/*.csv")
    INTO TABLE users
)
WITH BROKER "broker"
PROPERTIES
(
    "timeout" = "7200",
    "max_filter_ratio" = "0.1",
    "load_parallelism" = "10"
);

8.3 Routine Load Tuning

CREATE ROUTINE LOAD optimized_routine ON users
PROPERTIES
("desired_concurrent_number" = "5","max_batch_interval" = "10","max_batch_rows" = "500000","max_batch_size" = "104857600"
)
FROM KAFKA (...);

8.4 Best Practices

1. Choose the right data model

-- Duplicate: detail / raw data
-- Aggregate: pre-aggregated data
-- Unique: primary-key deduplication

2. Set a reasonable bucket count

-- Bucket count ≈ number of BE nodes * 5-10
DISTRIBUTED BY HASH(id) BUCKETS 32

3. Enable compression

PROPERTIES ("compression" = "LZ4")

4. Use labels to guarantee idempotence (see the sketch at the end of this section)

-H "label:unique_label_20250120"

5. Monitor load status

SHOW LOAD;
SHOW ROUTINE LOAD;
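
To illustrate practice 4, here is a sketch of an idempotent retry loop built on the stream_load helper from section 2.5: the label is derived from the table, file, and date, so a retry reuses the same label, and a "Label Already Exists" response means the data was already committed. The retry count and status handling are illustrative assumptions, not fixed rules:

import datetime

# Idempotent load: the label is deterministic, so retries cannot double-load.
def idempotent_load(file_path, database, table, retries=3):
    label = f"{table}_{file_path.replace('/', '_')}_{datetime.date.today():%Y%m%d}"
    result = {}
    for _ in range(retries):
        result = stream_load(file_path, database, table, label)
        status = result.get("Status")
        if status == "Success":
            return result
        if status == "Label Already Exists":
            # Same label, same file: the data is already (being) committed.
            return result
        # Transient failure: retry with the SAME label to stay idempotent.
    raise RuntimeError(f"load failed after {retries} attempts: {result}")

idempotent_load("data.csv", "test_db", "users")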

9. Common Problems and Solutions

9.1 Stream Load Issues

Problem 1: Load timeout

# Fix: increase the timeout
-H "timeout:7200"

Problem 2: Out of memory

# Fix: limit the memory used by the load
-H "exec_mem_limit:2147483648"

Problem 3: Duplicate label

{"Status": "Label Already Exists"
}
# 解决:更换 Label
-H "label:new_unique_label"

9.2 Broker Load Issues

Problem 1: Broker connection failure

-- Check the registered brokers
SHOW BROKER;

-- Inspect the failing job for the connection error
SHOW LOAD WHERE LABEL = 'xxx'\G

Problem 2: Wrong file path

-- Use the correct URI scheme
hdfs://namenode:9000/path
s3://bucket/path

9.3 Routine Load Issues

Problem 1: Consumer lag

-- Increase the concurrency
ALTER ROUTINE LOAD FOR db.job
PROPERTIES ("desired_concurrent_number" = "10");

Problem 2: Job paused

-- Check why the job was paused
SHOW ROUTINE LOAD FOR db.job\G

-- Resume the job
RESUME ROUTINE LOAD FOR db.job;

9.4 Performance Issues

Problem 1: Slow imports

  • Increase parallelism
  • Optimize the table schema
  • Adjust the bucket count

Problem 2: Data skew

-- Use a more evenly distributed bucketing key
DISTRIBUTED BY HASH(better_key) BUCKETS 32

10. Hands-On Cases

Case 1: E-commerce Order Import

-- Create the table
CREATE TABLE orders (
    order_id BIGINT,
    user_id BIGINT,
    amount DECIMAL(10,2),
    order_time DATETIME,
    status VARCHAR(20)
)
DUPLICATE KEY(order_id, order_time)
PARTITION BY RANGE(order_time)
(
    PARTITION p20250101 VALUES LESS THAN ("2025-01-02"),
    PARTITION p20250102 VALUES LESS THAN ("2025-01-03")
)
DISTRIBUTED BY HASH(order_id) BUCKETS 32;

-- Stream Load import
curl -u root:password \
    -H "label:order_load_001" \
    -H "column_separator:," \
    -H "columns:order_id,user_id,amount,order_time,status" \
    -T orders.csv \
    http://localhost:8030/api/ecommerce/orders/_stream_load

Case 2: Real-Time Log Collection

-- Create the table
CREATE TABLE access_logs (
    timestamp DATETIME,
    ip VARCHAR(50),
    url VARCHAR(500),
    status INT,
    response_time INT
)
DUPLICATE KEY(timestamp)
DISTRIBUTED BY HASH(ip) BUCKETS 16;

-- Routine Load
CREATE ROUTINE LOAD ecommerce.log_routine ON access_logs
COLUMNS(timestamp, ip, url, status, response_time)
PROPERTIES
(
    "desired_concurrent_number" = "3",
    "format" = "json"
)
FROM KAFKA
(
    "kafka_broker_list" = "localhost:9092",
    "kafka_topic" = "access_log"
);

Case 3: Data Warehouse ETL

-- Load the dimension table from HDFS
LOAD LABEL dw.dim_user_load
(
    DATA INFILE("hdfs://namenode:9000/dw/dim_user/*.parquet")
    INTO TABLE dim_user
    FORMAT AS "parquet"
)
WITH BROKER "hdfs_broker";

-- Aggregate from the fact table into a summary table
INSERT INTO fact_order_summary
SELECT
    DATE(order_time) AS dt,
    user_id,
    COUNT(*) AS order_count,
    SUM(amount) AS total_amount
FROM fact_order
WHERE DATE(order_time) = CURDATE()
GROUP BY DATE(order_time), user_id;

Appendix

A. Common SQL Commands

-- View load jobs
SHOW LOAD;
SHOW LOAD FROM database;
SHOW LOAD WHERE LABEL = 'xxx';

-- View Routine Load jobs
SHOW ROUTINE LOAD;
SHOW ROUTINE LOAD FOR database.job;

-- Cancel or stop jobs
CANCEL LOAD FROM database WHERE LABEL = 'xxx';
STOP ROUTINE LOAD FOR database.job;

B. Quick Reference

| Method | Command |
|---|---|
| Stream Load | curl -T file http://fe:8030/api/db/table/_stream_load |
| Broker Load | LOAD LABEL ... WITH BROKER |
| Routine Load | CREATE ROUTINE LOAD ... FROM KAFKA |
| Insert | INSERT INTO table VALUES (...) |
