当前位置: 首页 > news >正文

Linux下GCC的C++实现Hive到Snowflake数据迁移

程序结构

├── main.cpp
├── config.json
├── hive_export/
├── parquet_data/
├── sql_scripts/
└── logs/

核心代码实现 (main.cpp)

#include <iostream>
#include <fstream>
#include <vector>
#include <thread>
#include <mutex>
#include <queue>
#include <cstdlib>
#include <ctime>
#include <filesystem>
#include <nlohmann/json.hpp>
#include <unistd.h>namespace fs = std::filesystem;
using json = nlohmann::json;
using namespace std;// 全局锁用于日志和队列同步
mutex log_mutex, queue_mutex;// 配置文件结构
struct Config {string hive_jdbc;string hql_output_dir;string parquet_output_dir;string sql_script_dir;string snowflake_cfg;int export_threads;int import_threads;
};// 日志记录函数
void log_message(const string& message, const string& log_path) {lock_guard<mutex> guard(log_mutex);ofstream log_file(log_path, ios::app);if (log_file) {time_t now = time(nullptr);log_file << "[" << put_time(localtime(&now), "%F %T") << "] " << message << endl;}
}// 解析配置文件
Config load_config(const string& config_path) {ifstream config_file(config_path);if (!config_file) throw runtime_error("Config file not found");json j;config_file >> j;return {j["hive_jdbc"],j["directories"]["hql_output"],j["directories"]["parquet_output"],j["directories"]["sql_scripts"],j["snowflake"]["config_path"],j["threads"]["export"],j["threads"]["import"]};
}// 导出Hive建表语句
void export_hql(const Config& cfg, const string& log_path) {string cmd = "beeline -u '" + cfg.hive_jdbc + "' --silent=true --outputformat=csv2 ""-e 'SHOW DATABASES;' | tail -n +2 > databases.txt";system(cmd.c_str());ifstream db_file("databases.txt");string db;while (getline(db_file, db)) {cmd = "beeline -u '" + cfg.hive_jdbc + "' --silent=true --outputformat=csv2 ""-e 'USE " + db + "; SHOW TABLES;' | tail -n +2 > " + db + "_tables.txt";system(cmd.c_str());ifstream table_file(db + "_tables.txt");string table;while (getline(table_file, table)) {fs::path dir = fs::path(cfg.hql_output_dir) / db;fs::create_directories(dir);string hql_path = (dir / (table + ".hql")).string();cmd = "beeline -u '" + cfg.hive_jdbc + "' --silent=true --outputformat=csv2 ""-e 'USE " + db + "; SHOW CREATE TABLE " + table + ";' | ""awk 'NR>2' | head -n -1 > " + hql_path;if (system(cmd.c_str()) == 0) {log_message("SUCCESS: Exported HQL for " + db + "." + table, log_path);} else {log_message("ERROR: Failed to export HQL for " + db + "." + table, log_path);}}}
}// 导出Parquet数据(线程任务)
void export_worker(queue<string> tasks, const Config& cfg, const string& log_path) {while (true) {string task;{lock_guard<mutex> guard(queue_mutex);if (tasks.empty()) return;task = move(tasks.front());tasks.pop();}size_t pos = task.find('.');string db = task.substr(0, pos);string table = task.substr(pos + 1);fs::path out_dir = fs::path(cfg.parquet_output_dir) / db / table;fs::create_directories(out_dir);string cmd = "hive -e \"SET hive.exec.compress.output=false; ""INSERT OVERWRITE DIRECTORY '" + out_dir.string() + "' ""STORED AS PARQUET SELECT * FROM " + task + ";\"";if (system(cmd.c_str()) == 0) {log_message("SUCCESS: Exported Parquet for " + task, log_path);} else {log_message("ERROR: Failed to export Parquet for " + task, log_path);}}
}// 多线程导出Parquet
void export_parquet(const Config& cfg, const string& log_path) {ifstream db_file("databases.txt");queue<string> tasks;string db;while (getline(db_file, db)) {ifstream table_file(db + "_tables.txt");string table;while (getline(table_file, table)) {tasks.push(db + "." + table);}}vector<thread> threads;for (int i = 0; i < cfg.export_threads; ++i) {threads.emplace_back(export_worker, tasks, ref(cfg), ref(log_path));}for (auto& t : threads) t.join();
}// 执行SnowSQL脚本
void run_snowsql(const Config& cfg, const string& log_path) {for (const auto& entry : fs::directory_iterator(cfg.sql_script_dir)) {if (entry.path().extension() == ".sql") {string cmd = "snowsql -c " + cfg.snowflake_cfg + " -f " + entry.path().string();if (system(cmd.c_str()) == 0) {log_message("SUCCESS: Executed SQL " + entry.path().filename().string(), log_path);} else {log_message("ERROR: Failed to execute SQL " + entry.path().filename().string(), log_path);}}}
}// 导入Parquet到Snowflake(线程任务)
void import_worker(queue<fs::path> tasks, const Config& cfg, const string& log_path) {while (true) {fs::path task;{lock_guard<mutex> guard(queue_mutex);if (tasks.empty()) return;task = move(tasks.front());tasks.pop();}string db = task.parent_path().filename();string table = task.stem();string cmd = "snowsql -c " + cfg.snowflake_cfg + " -q \"""COPY INTO " + db + "." + table + " ""FROM @" + cfg.parquet_output_dir + "/" + db + "/" + table + " ""FILE_FORMAT = (TYPE = PARQUET);\"";if (system(cmd.c_str()) == 0) {log_message("SUCCESS: Imported Parquet to " + db + "." + table, log_path);} else {log_message("ERROR: Failed to import Parquet to " + db + "." + table, log_path);}}
}// 多线程导入Parquet
void import_parquet(const Config& cfg, const string& log_path) {queue<fs::path> tasks;for (const auto& db_entry : fs::directory_iterator(cfg.parquet_output_dir)) {for (const auto& table_entry : fs::directory_iterator(db_entry.path())) {tasks.push(table_entry.path());}}vector<thread> threads;for (int i = 0; i < cfg.import_threads; ++i) {threads.emplace_back(import_worker, tasks, ref(cfg), ref(log_path));}for (auto& t : threads) t.join();
}int main() {try {// 初始化配置和日志Config cfg = load_config("config.json");string log_path = "logs/transfer_" + to_string(time(nullptr)) + ".log";fs::create_directories("logs");// 执行全流程export_hql(cfg, log_path);export_parquet(cfg, log_path);run_snowsql(cfg, log_path);import_parquet(cfg, log_path);log_message("ALL OPERATIONS COMPLETED", log_path);} catch (const exception& e) {cerr << "CRITICAL ERROR: " << e.what() << endl;return 1;}return 0;
}

配置文件示例 (config.json)

{"hive_jdbc": "jdbc:hive2://hive-server:10000","directories": {"hql_output": "hive_export","parquet_output": "parquet_data","sql_scripts": "sql_scripts"},"snowflake": {"config_path": "~/.snowsql/config"},"threads": {"export": 8,"import": 8}
}

关键功能说明

  1. HQL导出

    • 使用beeline连接Hive获取所有数据库和表
    • 数据库/表名.hql格式存储建表语句
    • 自动跳过系统表(通过tailawk过滤)
  2. Parquet导出

    • 使用Hive的INSERT OVERWRITE DIRECTORY导出为Parquet格式
    • 多线程处理不同表(线程数由配置控制)
    • 输出路径:parquet_data/数据库/表名/
  3. SnowSQL执行

    • 遍历指定目录的所有.sql文件
    • 使用snowsql -c执行配置文件中的连接
    • 支持认证文件自动加载(需预先配置)
  4. Parquet导入

    • 使用Snowflake的COPY INTO命令
    • 多线程并发导入不同表
    • 自动匹配目录结构与表名
  5. 日志系统

    • 按天分割日志文件(文件名含时间戳)
    • 记录操作类型、状态和时间
    • 线程安全的日志写入
  6. 异常处理

    • 配置文件缺失检测
    • 命令执行状态码检查
    • 目录创建失败处理
    • JSON解析异常捕获

编译与运行

  1. 安装依赖
sudo apt-get install libboost-filesystem-dev nlohmann-json3-dev
  1. 编译程序
g++ -std=c++17 -o hive2snowflake main.cpp -lboost_filesystem -lpthread
  1. 运行程序
./hive2snowflake

注意事项

  1. 需要预先配置:

    • Hive的beeline客户端
    • SnowSQL及认证配置
    • Hive表访问权限
    • Snowflake表结构匹配
  2. 性能调整:

    • 通过config.json调整线程数
    • 大表建议单独处理
    • 可添加重试机制应对网络波动
  3. 安全增强建议:

    • 配置文件加密(如使用jq解密)
    • 敏感信息使用环境变量
    • 添加操作审计日志
http://www.dtcms.com/a/320800.html

相关文章:

  • 在Java中,守护线程(Daemon Thread)和用户线程(User Thread)以及本地线程(Native Thread)的区别
  • 豆包新模型+PromptPilot:AI应用开发全流程实战指南
  • 深入掌握Prompt工程:高效构建与管理智能模型提示词全流程实战
  • Flutter Packge - 组件应用
  • [链表]142. 环形链表 II
  • 【洛谷题单】--分支结构(二)
  • 为什么需要锁升级?从CPU缓存到JVM的优化艺术
  • Autosar AP中Promise和Future的异步消息通信的详细解析
  • Kotlin 数据容器 - MutableList(MutableList 概述、MutableList 增删改查、MutableList 遍历元素)
  • 【JVM】流程汇总
  • OpenSCA开源社区每日安全漏洞及投毒情报资讯—2025年8月7日
  • OCC 主要库和功能模块
  • AI对互联网公司职位改变?
  • Android 系统的基本安全属性
  • 恒科持续低迷:新能源汽车股下跌成拖累,销量担忧加剧
  • ZCC3094--30V,-500mA超低噪声线性稳压电源
  • HFSS许可证常见问题及解决方案
  • 分享超图提供的、很不错的WebGIS学习资源
  • 分布式微服务--GateWay的断言以及如何自定义一个断言
  • 【昇腾】基于RK3588 arm架构Ubuntu22.04系统上适配Atlas 200I A2加速模块安装EP模式下的驱动固件包_20250808
  • simulink tlc如何通过tlc写数据入文件
  • 三种 SSE 对比
  • 秋招笔记-8.8
  • Django模型开发全解析:字段、元数据与继承的实战指南
  • C++简单项目跟练【通讯录管理系统000】
  • 持中文的 TXT 合并 PDF 工具 —— GUI + ReportLab 实战
  • 基于定制开发开源AI智能名片S2B2C商城小程序的定价策略与市场定位研究
  • UniApp Vue3 TypeScript项目中使用xgplayer播放m3u8视频的显示问题
  • AI学习笔记三十五:实时传输视频
  • webrtc弱网-EncodeUsageResource类源码分析及算法原理