三、Sqoop 全量导入核心命令
作者:IvanCodes
日期:2025年6月3日
专栏:Sqoop教程
Sqoop 的全量导入 (Full Import) 是指将关系型数据库中整个表或特定查询结果集的所有数据一次性完整地导入到 Hadoop 生态系统 (通常是 HDFS 或 Hive) 中。
一、核心命令结构 (sqoop import
)
执行全量导入的基本命令框架如下:
sqoop import \
--connect <jdbc-connect-string> \
--username <db-username> \
--password <db-password> \
# (或使用 --password-file / -P)
# --- 数据源指定 ---
[--table <db-table-name>] \
[--query <sql-query> --target-dir <hdfs-path> -m <num-mappers> [--split-by <column-name>]] \
# --- 导入目标指定 ---
[--target-dir <hdfs-path>] \
[--warehouse-dir <hdfs-base-path-for-hive>] \
[--hive-import --hive-table <hive-db.table_name> [--create-hive-table] [--hive-overwrite]] \
# --- 并行度与分割 ---
[-m <num-mappers>] \
[--split-by <column-name>] \
# --- 文件格式与压缩 ---
[--fields-terminated-by '<char>'] \
[--as-textfile | --as-sequencefile | --as-avrodatafile | --as-parquetfile] \
[--compress --compression-codec <codec>] \
# --- 其他常用选项 ---
[--delete-target-dir] \
[--verbose]
二、关键参数详解 (带示例)
- 连接参数 (Connection Parameters)
--connect <jdbc-connect-string>
: 数据库的JDBC连接URL- 示例 (MySQL):
--connect jdbc:mysql://localhost:3306/mydatabase
--username <db-username>
: 数据库用户名- 示例:
--username retail_user
--password <db-password>
: 数据库密码。生产环境强烈建议使用--password-file /path/to/passwordfile
或-P
(交互式输入密码)。- 示例 (不推荐直接写密码):
--password 123456
- 示例 (推荐):
-P
(执行时会提示输入)
- 示例 (不推荐直接写密码):
- 数据源指定 (Data Source) - 至少提供一种
--table <db-table-name>
: 要导入的数据库表名。- 示例:
--table products
--query "<sql-query>"
: 使用自定义的SQL查询来选择要导入的数据。- 示例:
--query "SELECT id, name, price FROM items WHERE status = 'active' AND \$CONDITIONS"
- 注意:使用
--query
时,必须同时提供--target-dir <hdfs-path>
(如果导入到HDFS) 和-m <num-mappers>
。如果-m > 1
,查询中必须包含\$CONDITIONS
占位符,并通常需要--split-by
。如果-m 1
,则\$CONDITIONS
不是必需的。
- 导入目标 (Import Target) - 至少提供一种
--target-dir <hdfs-path>
: 指定数据导入到HDFS的目录。- 示例:
--target-dir /user/cloudera/imported_data/products_text
--warehouse-dir <hdfs-base-path-for-hive>
: 指定Hive仓库的根目录。当与--hive-import
结合使用时,Sqoop 会在此目录下创建与表名对应的子目录。- 示例:
--warehouse-dir /user/hive/warehouse
--hive-import
: 指示Sqoop将数据导入到Hive表中。--hive-table <hive-db.table_name>
: 指定目标Hive数据库和表名。- 示例:
--hive-table retail_stage.products_from_mysql
--create-hive-table
: (可选) 如果目标Hive表不存在,Sqoop会根据源表结构自动创建它。--hive-overwrite
: (可选) 如果目标Hive表已存在,此选项会覆盖表中的现有数据。
- 并行度与数据分割 (Parallelism & Splitting)
-
-m <num-mappers>
(或--num-mappers <num-mappers>
): 指定用于导入数据的Map任务数量。默认通常是4。 -
--split-by <column-name>
: 指定用于分割数据以供并行Map任务处理的列。该列最好是数值类型、日期类型或具有均匀分布的字符类型,并且有索引。
- 文件格式与压缩 (File Format & Compression)
--fields-terminated-by '<char>'
: 指定字段分隔符。- 示例:
--fields-terminated-by '\t' # Tab分隔
--fields-terminated-by '|' # 线分隔
-
--as-textfile
|--as-sequencefile
|--as-avrodatafile
|--as-parquetfile
: 指定存储格式。 -
--compress --compression-codec <codec>
: 启用压缩并指定压缩编解码器。- 示例:
--compress --compression-codec snappy
- 其他常用选项
--delete-target-dir
: 如果目标HDFS目录已存在,则先删除它再导入。谨慎使用!--verbose
: 输出更详细的执行日志。--direct
: (仅支持部分数据库如MySQL, PostgreSQL) 使用数据库特定的批量加载工具,绕过MapReduce,通常速度更快。
三、全量导入MySQL表数据示例
场景: 假设我们有一个MySQL数据库 testdb
,其中有一个表 products
(id INT PRIMARY KEY, name VARCHAR(255), price DECIMAL(10,2), category VARCHAR(100))。
1. 全量导入MySQL表数据到HDFS
(A) 导入为默认文本文件 (逗号分隔) 到HDFS
sqoop import \
--connect jdbc:mysql://localhost:3306/testdb \
--username root \
-password 123456 \
--table products \
--target-dir /user/cloudera/products_hdfs_text_default \
-m 2 \
--split-by id
- 说明:数据将以逗号分隔的文本文件形式存储在HDFS的
/user/cloudera/products_hdfs_text_default
目录下。使用2个mapper并行导入,并根据id
列进行数据切分。
(B) 导入为制表符分隔的文本文件到HDFS,并启用Snappy压缩
sqoop import \
--connect jdbc:mysql://localhost:3306/testdb \
--username root \
-P \
--table products \
--target-dir /user/cloudera/products_hdfs_tsv_snappy \
--fields-terminated-by '\t' \
--compress \
--compression-codec snappy \
-m 4 \
--split-by id
- 说明:数据将以制表符分隔,并使用Snappy压缩。
© 导入为Avro数据文件到HDFS
sqoop import \
--connect jdbc:mysql://localhost:3306/testdb \
--username root \
-P \
--table products \
--target-dir /user/cloudera/products_hdfs_avro \
--as-avrodatafile \
-m 2 \
--split-by id
- 说明:数据将以Avro格式存储,Sqoop会自动生成相应的Avro schema。
2. 全量导入MySQL表数据到Hive
(A) 导入到Hive,如果表不存在则创建 (默认文本格式)
sqoop import \
--connect jdbc:mysql://localhost:3306/testdb \
--username root \
-P \
--table products \
--hive-import \
--hive-table default.products_from_mysql_text \
--create-hive-table \
-m 2 \
--split-by id
- 说明:数据将导入到
default
数据库下的products_from_mysql_text
Hive表中。如果表不存在,Sqoop会根据MySQL表结构自动创建。HDFS上的数据文件将位于默认的Hive仓库路径下(通常是/user/hive/warehouse/products_from_mysql_text/
)。
(B) 导入到Hive并覆盖现有表,指定存储为Parquet格式
sqoop import \
--connect jdbc:mysql://localhost:3306/testdb \
--username root \
-P \
--table products \
--hive-import \
--hive-table default.products_from_mysql_parquet \
--create-hive-table \
--hive-overwrite \
--as-parquetfile \
-m 4 \
--split-by id
- 说明:数据将以Parquet列式存储格式导入到Hive。如果
products_from_mysql_parquet
表已存在,其内容将被覆盖。
© 使用自定义查询导入特定数据到Hive
sqoop import \
--connect jdbc:mysql://localhost:3306/testdb \
--username root \
-P \
--query "SELECT id, name, category FROM products WHERE price > 100.00 AND \$CONDITIONS" \
--split-by id \
--hive-import \
--hive-table default.expensive_products \
--create-hive-table \
--target-dir /user/cloudera/temp/expensive_products_staging \
# --query时,若hive-import,也建议指定一个临时的--target-dir
-m 2
- 说明:仅导入
products
表中价格大于100的产品的id
,name
, 和category
列。\$CONDITIONS
允许Sqoop并行化这个自定义查询。为--query
导入到Hive时,显式指定--target-dir
作为数据在HDFS上的暂存位置通常是个好习惯,Sqoop随后会将这些数据加载到Hive表中。
四、执行流程与核心点
- 元数据探查:Sqoop连接数据库,获取表结构信息。
- 代码生成:自动生成Java类以映射表记录。
- MapReduce作业:(非
--direct
模式) 将导入任务转化为MapReduce作业。Map任务并行读取数据库数据分片并写入HDFS/Hive。 - 数据一致性:全量导入期间,源表若持续写入新数据,导入结果可能非严格事务快照。建议在业务低峰期操作。
- 数据库压力:高并发导入会增加源数据库负载,需合理设置
-m
。 - 类型映射:关注数据库类型到Hadoop类型的自动映射,特殊类型可能需要手动调整。
练习题与解析
假设环境:
- MySQL数据库
sales_db
,包含表transactions
(trans_id INT PRIMARY KEY, product_sku VARCHAR(50), sale_amount DECIMAL(12,2), trans_date DATE, region_id INT)。 - Hadoop集群已配置。
- MySQL连接信息:
jdbc:mysql://dbserver.mycompany.com:3306/sales_db
,用户sales_importer
,密码通过密码文件/user/sqoop_user/sales.password
获取。
题目:
-
练习题1:HDFS导入 - 指定分隔符和压缩
请编写Sqoop命令,将sales_db.transactions
表的所有数据全量导入到HDFS的/data_lake/raw/sales_transactions
目录下。使用4个Map任务,并以trans_id
列进行数据分割。文件字段间使用逗号,
分隔,并且使用Gzip压缩。 -
练习题2:Hive导入 - 创建新表并指定数据类型
请编写Sqoop命令,将sales_db.transactions
表中region_id
为 5 且sale_amount
大于 500 的所有交易数据全量导入到Hive。目标Hive表为processed_sales.high_value_region5_txns
。如果该Hive表不存在,则自动创建它,并确保sale_amount
在Hive中映射为DOUBLE
类型,trans_date
映射为DATE
类型。如果表已存在,则覆盖其内容。数据在Hive中以Avro格式存储。
解析:
- 练习题1答案与解析:
sqoop import \
--connect jdbc:mysql://dbserver.mycompany.com:3306/sales_db \
--username sales_importer \
--password-file /user/sqoop_user/sales.password \
--table transactions \
--target-dir /data_lake/raw/sales_transactions \
-m 4 \
--split-by trans_id \
--fields-terminated-by ',' \
--compress \
--compression-codec gzip
--password-file /user/sqoop_user/sales.password
: 使用HDFS上的密码文件,这比直接在命令行写密码更安全。--table transactions
: 指定导入源表。--target-dir /data_lake/raw/sales_transactions
: HDFS目标路径。-m 4 --split-by trans_id
: 使用4个mapper并行处理,按trans_id
分割。--fields-terminated-by ','
: 字段以逗号分隔。--compress --compression-codec gzip
: 启用Gzip压缩。
- 练习题2答案与解析:
sqoop import \
--connect jdbc:mysql://dbserver.mycompany.com:3306/sales_db \
--username sales_importer \
--password-file /user/sqoop_user/sales.password \
--query "SELECT trans_id, product_sku, sale_amount, trans_date, region_id FROM transactions WHERE region_id = 5 AND sale_amount > 500 AND \$CONDITIONS" \
--split-by trans_id \
--map-column-hive sale_amount=DOUBLE,trans_date=DATE \
--hive-import \
--hive-table processed_sales.high_value_region5_txns \
--create-hive-table \
--hive-overwrite \
--as-avrodatafile \
--target-dir /user/hive/warehouse/processed_sales.db/high_value_region5_txns_temp \
-m 2
--query "..."
: 使用自定义查询筛选数据,并包含\$CONDITIONS
以支持并行。--split-by trans_id
: 指定查询结果集如何分割给mappers。--map-column-hive sale_amount=DOUBLE,trans_date=DATE
: 关键参数,用于在创建Hive表时,将源表中的sale_amount
列映射为Hive的DOUBLE
类型,trans_date
列映射为Hive的DATE
类型。--hive-import --hive-table ... --create-hive-table --hive-overwrite
: 导入到Hive,不存在则创建,存在则覆盖。--as-avrodatafile
: 指定数据以Avro格式存储。--target-dir ..._temp
: 为--query
导入到Hive指定一个临时的HDFS暂存目录。-m 2
: 使用2个mapper。对于自定义查询,mapper数量需要根据数据量和--split-by
列的特性来合理设置。