Hudi、Iceberg、Delta Lake、Paimon 建表语法与场景示例

以下针对四种数据湖的核心表类型,提供基于主流计算引擎(Spark/Flink)的建表SQL示例,包含关键参数说明、插入/查询操作,并关联实际业务场景,确保语法可直接复用。
一、Hudi 建表(基于 Spark SQL,主流表类型:COW/MOR)
Hudi 建表需指定 表类型(COW/MOR)、主键、分区字段,核心适配实时CDC同步、近实时更新场景。
1. 核心表类型1:MOR 表(Merge-On-Read,实时写入优先)
适用场景:实时CDC同步(如MySQL订单表实时入湖)、高频更新(如用户画像动态更新)
建表SQL(Spark SQL)
-- 1. 启用Hudi配置(Spark环境需加载Hudi依赖包)
SET hoodie.table.name = order_hudi_mor; -- Hudi表名
SET hoodie.datasource.write.table.type = MERGE_ON_READ; -- 表类型:MOR
SET hoodie.datasource.write.recordkey.field = order_id; -- 主键字段(唯一标识一条记录)
SET hoodie.datasource.write.precombine.field = update_time; -- 预合并字段(同主键时,取update_time大的记录)
SET hoodie.datasource.write.partitionpath.field = dt; -- 分区字段(按日期分区,格式yyyyMMdd)
SET hoodie.datasource.write.hive_style_partitioning = true; -- 启用Hive风格分区(路径含dt=20250101)-- 2. 创建Hudi MOR表(外部表,数据存储在HDFS/OSS)
CREATE TABLE IF NOT EXISTS order_hudi_mor (order_id BIGINT COMMENT '订单ID(主键)',user_id BIGINT COMMENT '用户ID',amount DECIMAL(10,2) COMMENT '订单金额',order_status STRING COMMENT '订单状态(待支付/已支付/取消)',update_time TIMESTAMP COMMENT '最后更新时间(预合并字段)',dt STRING COMMENT '分区字段(yyyyMMdd)'
)
USING HUDI -- 指定使用Hudi格式
LOCATION '/user/hudi/order_hudi_mor' -- 数据存储路径(HDFS/OSS/S3)
COMMENT 'Hudi MOR表:实时同步MySQL订单CDC数据';-- 3. 插入/更新数据(支持INSERT和UPSERT)
-- 插入新数据
INSERT INTO order_hudi_mor
SELECT 1001, 2001, 99.9, '已支付', '2025-01-01 10:30:00', '20250101';-- Upsert(更新已有记录,同order_id时,按update_time覆盖)
INSERT OVERWRITE order_hudi_mor
SELECT 1001, 2001, 109.9, '已支付', '2025-01-01 11:30:00', '20250101';-- 4. 查询数据(支持时间旅行,查看历史版本)
-- 查看最新数据
SELECT * FROM order_hudi_mor WHERE dt = '20250101';-- 时间旅行:查看2025-01-01 11:00前的版本(需开启时间旅行配置)
SET hoodie.time.travel.enabled = true;
SELECT * FROM order_hudi_mor TIMESTAMP AS OF '2025-01-01 11:00:00' WHERE dt = '20250101';
关键参数说明:
hoodie.datasource.write.table.type = MERGE_ON_READ:MOR表类型,写入时追加Avro日志,读取时合并日志与Parquet基础文件,低写入延迟。precombine.field = update_time:解决同主键冲突,保留更新时间最新的记录(CDC同步必备)。hive_style_partitioning = true:生成Hive兼容的分区路径,方便Hive/Trino等引擎查询。
2. 核心表类型2:COW 表(Copy-On-Write,读取优先)
适用场景:离线报表分析(如每日销售汇总)、读多写少(如历史订单查询)
建表SQL(Spark SQL)
-- 1. 配置COW表参数
SET hoodie.table.name = order_hudi_cow;
SET hoodie.datasource.write.table.type = COPY_ON_WRITE; -- 表类型:COW
SET hoodie.datasource.write.recordkey.field = order_id;
SET hoodie.datasource.write.partitionpath.field = dt;-- 2. 创建COW表
CREATE TABLE IF NOT EXISTS order_hudi_cow (order_id BIGINT,user_id BIGINT,amount DECIMAL(10,2),dt STRING
)
USING HUDI
LOCATION '/user/hudi/order_hudi_cow'
COMMENT 'Hudi COW表:离线订单报表分析';-- 3. 插入数据(更新时重写Parquet文件,读取效率高)
INSERT INTO order_hudi_cow
SELECT order_id, user_id, amount, dt FROM order_raw WHERE dt = '20250101';
关键差异:
- COW表无
precombine.field,更新时直接重写整个Parquet文件,写入延迟高但读取速度比MOR快30%+,适合离线场景。
二、Iceberg 建表(支持 Spark/Flink,主流表类型:基础表/隐藏分区表)
Iceberg 建表强调 元数据管理、多引擎兼容,核心适配PB级离线分析、复杂Schema演进场景。
1. 核心表类型1:基础分区表(显式分区,Hive兼容)
适用场景:离线用户行为分析(按日期+地区分区)、多引擎查询(Spark+Trino+Hive)
建表SQL(Spark SQL)
-- 1. 启用Iceberg配置
SET spark.sql.extensions = org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
SET spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkSessionCatalog;
SET spark.sql.catalog.spark_catalog.type = hive; -- 兼容Hive Metastore-- 2. 创建Iceberg基础分区表
CREATE TABLE IF NOT EXISTS spark_catalog.iparking_dev.user_behavior_iceberg (user_id BIGINT COMMENT '用户ID',behavior_type STRING COMMENT '行为类型(点击/购买/浏览)',behavior_time TIMESTAMP COMMENT '行为时间',region STRING COMMENT '用户地区',dt STRING COMMENT '分区字段(yyyyMMdd)'
)
USING ICEBERG -- 指定Iceberg格式
PARTITIONED BY (dt, region) -- 复合分区:按日期+地区
LOCATION '/user/iceberg/user_behavior_iceberg' -- 存储路径
COMMENT 'Iceberg基础表:离线用户行为分析';-- 3. 插入数据(支持批量插入,兼容Spark SQL语法)
INSERT INTO user_behavior_iceberg
SELECT user_id, behavior_type, behavior_time, region, dt
FROM user_behavior_raw
WHERE dt BETWEEN '20250101' AND '20250107';-- 4. 查询数据(支持元数据剪枝,仅扫描目标分区)
SELECT region, COUNT(*) AS behavior_cnt
FROM user_behavior_iceberg
WHERE dt = '20250101'
GROUP BY region;-- 5. Schema演进(添加新列,无需重写历史数据)
ALTER TABLE user_behavior_iceberg
ADD COLUMN device_type STRING COMMENT '设备类型(iOS/Android)';
关键参数说明:
spark.sql.extensions:加载Iceberg Spark扩展,支持Schema演进、时间旅行等特性。PARTITIONED BY (dt, region):复合分区,Iceberg会自动优化分区元数据,查询时仅加载目标分区的Manifest文件,PB级数据查询效率提升50%+。
2. 核心表类型2:隐藏分区表(分区透明,动态分区管理)
适用场景:动态分区策略调整(如从“按日分区”改为“按小时分区”)、分区字段无感知查询
建表SQL(Flink SQL)
-- 1. 配置Iceberg Flink Catalog
CREATE CATALOG iceberg_catalog WITH ('type' = 'iceberg','catalog-type' = 'hive', -- 基于Hive Metastore'uri' = 'thrift://hive-metastore:9083', -- Hive Metastore地址'warehouse' = '/user/iceberg/warehouse' -- Iceberg数据仓库路径
);USE CATALOG iceberg_catalog;
USE iparking_dev;-- 2. 创建隐藏分区表(分区字段behavior_date由behavior_time自动提取)
CREATE TABLE IF NOT EXISTS user_behavior_iceberg_hidden (user_id BIGINT,behavior_type STRING,behavior_time TIMESTAMP,-- 隐藏分区字段:由behavior_time的日期部分自动生成,查询时无需显式指定behavior_date DATE GENERATED ALWAYS AS DATE(behavior_time)
)
USING ICEBERG
PARTITIONED BY (behavior_date) -- 分区字段为生成列(隐藏)
COMMENT 'Iceberg隐藏分区表:动态分区管理';-- 3. 插入数据(无需手动指定分区字段,自动提取behavior_date)
INSERT INTO user_behavior_iceberg_hidden
SELECT user_id, behavior_type, behavior_time
FROM user_behavior_kafka;-- 4. 查询数据(无需关心分区字段,直接按原始时间过滤)
SELECT COUNT(*) AS click_cnt
FROM user_behavior_iceberg_hidden
WHERE behavior_time BETWEEN '2025-01-01 00:00:00' AND '2025-01-01 23:59:59';
关键优势:
- 隐藏分区字段
behavior_date由behavior_time自动生成,查询时无需显式写PARTITION (behavior_date='2025-01-01'),后续调整分区粒度(如改为小时)无需修改查询语句。
三、Delta Lake 建表(基于 Spark,主流表类型:标准表/外部表)
Delta Lake 建表强依赖 Spark生态、事务日志,核心适配流批一体、强一致性需求场景。
1. 核心表类型1:标准表(Managed Table,事务优先)
适用场景:实时推荐系统(流批一体处理用户行为)、数据合规审计(事务日志追溯)
建表SQL(Spark SQL)
-- 1. 启用Delta Lake配置
SET spark.sql.extensions = io.delta.sql.DeltaSparkSessionExtension;
SET spark.sql.catalog.spark_catalog = org.apache.spark.sql.delta.catalog.DeltaCatalog;-- 2. 创建Delta标准表(托管表,元数据与数据由Delta管理)
CREATE TABLE IF NOT EXISTS user_behavior_delta (user_id BIGINT,item_id BIGINT,behavior_type STRING,behavior_time TIMESTAMP,dt STRING COMMENT '分区字段'
)
USING DELTA -- 指定Delta格式
PARTITIONED BY (dt)
COMMENT 'Delta标准表:实时用户行为分析';-- 3. 流批一体写入(支持Spark Streaming实时写入+批量插入)
-- 批量插入历史数据
INSERT INTO user_behavior_delta
SELECT user_id, item_id, behavior_type, behavior_time, dt
FROM user_behavior_hist WHERE dt = '20250101';-- 实时写入Kafka流数据
CREATE STREAMING TABLE user_behavior_stream
AS SELECT user_id, item_id, behavior_type, behavior_time, DATE_FORMAT(behavior_time, 'yyyyMMdd') AS dt
FROM STREAM read_kafka('bootstrapServers' = 'kafka:9092','topic' = 'user_behavior_topic','startingOffsets' = 'latest'
);-- 将流数据写入Delta表
INSERT STREAMING INTO user_behavior_delta
SELECT * FROM user_behavior_stream;-- 4. 时间旅行查询(查看历史版本,追溯数据变更)
-- 查看表的所有版本
DESCRIBE HISTORY user_behavior_delta;-- 查看版本2的数据(版本号从DESCRIBE HISTORY获取)
SELECT * FROM user_behavior_delta VERSION AS OF 2 WHERE dt = '20250101';-- 5. 数据删除(支持ACID删除,事务日志记录操作)
DELETE FROM user_behavior_delta WHERE behavior_type = '无效行为';
关键特性:
INSERT STREAMING:支持Spark Streaming实时写入,实现“流批一体”,无需区分实时/离线数据链路。- 事务日志(
_delta_log目录):记录所有增删改操作,支持版本回滚、数据追溯,满足合规审计需求。
2. 核心表类型2:外部表(External Table,数据共享)
适用场景:跨团队数据共享(如数据湖原始数据共享给业务团队)、Hive表迁移(复用现有Hive数据)
建表SQL(Spark SQL)
-- 创建Delta外部表(数据存储在指定路径,删除表时不删除数据)
CREATE EXTERNAL TABLE IF NOT EXISTS user_behavior_delta_external (user_id BIGINT,item_id BIGINT,behavior_type STRING,dt STRING
)
USING DELTA
LOCATION '/user/delta/user_behavior_external' -- 复用现有数据路径
COMMENT 'Delta外部表:跨团队数据共享';-- 插入数据(数据存储在指定LOCATION,与其他引擎共享)
INSERT INTO user_behavior_delta_external
SELECT user_id, item_id, behavior_type, dt FROM user_behavior_raw;
关键差异:
- 外部表通过
EXTERNAL关键字声明,删除表时仅删除元数据,数据保留在LOCATION路径,可被Hive/Trino等其他引擎复用。
四、Paimon 建表(基于 Flink,主流表类型:主键表/Append-Only表)
Paimon 建表基于 LSM-Tree架构、CDC原生支持,核心适配高频实时更新、整库CDC同步场景。
1. 核心表类型1:主键表(Primary Key Table,实时更新优先)
适用场景:电商库存实时同步(高频更新库存数量)、CDC整库迁移(MySQL→Paimon实时同步)
建表SQL(Flink SQL)
-- 1. 配置Paimon Catalog
CREATE CATALOG paimon_catalog WITH ('type' = 'paimon','warehouse' = '/user/paimon/warehouse', -- Paimon数据仓库路径'hive-metastore-uri' = 'thrift://hive-metastore:9083' -- 对接Hive Metastore
);USE CATALOG paimon_catalog;
USE iparking_dev;-- 2. 创建Paimon主键表(支持部分列更新、主键合并)
CREATE TABLE IF NOT EXISTS inventory_paimon (product_id BIGINT COMMENT '商品ID(主键)',warehouse_id INT COMMENT '仓库ID(主键)',stock_num INT COMMENT '库存数量(支持部分更新)',update_time TIMESTAMP COMMENT '更新时间',dt STRING COMMENT '分区字段'
)
WITH ('primary-key' = 'product_id,warehouse_id', -- 复合主键(唯一标识库存记录)'partition' = 'dt', -- 按日期分区'merge-engine' = 'deduplicate', -- 合并策略:去重(保留update_time最新的记录)'changelog-producer' = 'input' -- 生成变更日志,支持下游流引擎消费
)
COMMENT 'Paimon主键表:电商库存实时同步';-- 3. CDC实时写入(Flink CDC直接同步MySQL库存表)
CREATE TABLE inventory_mysql_cdc (product_id BIGINT,warehouse_id INT,stock_num INT,update_time TIMESTAMP,dt STRING,PRIMARY KEY (product_id, warehouse_id) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc','hostname' = 'mysql-host','port' = '3306','username' = 'root','password' = '123456','database-name' = 'iparking_dev','table-name' = 'inventory'
);-- 将CDC数据写入Paimon主键表(自动处理更新/删除)
INSERT INTO inventory_paimon
SELECT product_id, warehouse_id, stock_num, update_time, DATE_FORMAT(update_time, 'yyyyMMdd') AS dt
FROM inventory_mysql_cdc;-- 4. 查询数据(支持实时查询最新库存)
SELECT product_id, warehouse_id, stock_num
FROM inventory_paimon
WHERE dt = '20250101' AND stock_num < 10; -- 查询库存不足10的商品
关键参数说明:
primary-key:复合主键,解决同商品+同仓库的库存更新冲突,Paimon基于LSM-Tree合并日志,更新延迟低至毫秒级。merge-engine = deduplicate:合并策略(支持deduplicate去重、partial-update部分列更新、aggregation聚合),库存场景用去重保留最新值。changelog-producer = input:生成CDC变更日志,下游Flink流任务可直接消费Paimon的更新/删除事件。
2. 核心表类型2:Append-Only表(仅追加,日志场景优先)
适用场景:操作日志存储(如APP埋点日志)、传感器数据流(仅追加无更新)
建表SQL(Flink SQL)
-- 创建Paimon Append-Only表(仅支持追加写入,无更新/删除)
CREATE TABLE IF NOT EXISTS app_log_paimon (log_id BIGINT,user_id BIGINT,log_content STRING,log_time TIMESTAMP,dt STRING
)
WITH ('partition' = 'dt', -- 按日期分区'write-mode' = 'append-only', -- 仅追加模式'file.format' = 'parquet' -- 存储格式(Parquet压缩率高)
)
COMMENT 'Paimon Append-Only表:APP埋点日志存储';-- 写入日志数据(仅支持INSERT,不支持UPDATE/DELETE)
INSERT INTO app_log_paimon
SELECT log_id, user_id, log_content, log_time, DATE_FORMAT(log_time, 'yyyyMMdd') AS dt
FROM app_log_kafka;
关键优势:
- 仅追加模式无合并开销,写入吞吐量比主键表高50%+,适合日志等无更新场景,存储成本低。
五、建表核心差异总结
| 数据湖 | 主流建表引擎 | 核心表类型 | 必配参数 | 适配场景 |
|---|---|---|---|---|
| Hudi | Spark | MOR表/COW表 | 主键(recordkey)、分区(partitionpath) | 实时CDC、近实时更新 |
| Iceberg | Spark/Flink | 基础分区表/隐藏分区表 | 分区字段、Catalog配置 | PB级离线分析、动态分区管理 |
| Delta Lake | Spark | 标准表/外部表 | 事务日志(_delta_log) | 流批一体、数据合规审计 |
| Paimon | Flink | 主键表/Append-Only表 | 主键(primary-key)、合并策略(merge-engine) | 高频更新、CDC整库同步、日志存储 |
建表选型建议:
- 实时更新场景:优先Paimon主键表(LSM-Tree低延迟)或Hudi MOR表(CDC适配成熟)。
- 离线分析场景:优先Iceberg基础表(多引擎兼容、元数据优化)。
- 流批一体场景:优先Delta Lake标准表(Spark生态无缝衔接)。
- 日志存储场景:优先Paimon Append-Only表(高吞吐量、低成本)。
