当前位置: 首页 > news >正文

Linux下GCC和C++实现带多组标签的Snowflake SQL查询批量数据导出程序

设计一个基于多个带标签Snowflake SQL语句作为json配置文件的Linux下GCC的C++代码程序,实现根据不同的输入参数自动批量地将Snowflake数据库的数据导出为CSV文件到本地目录上,标签加扩展名.csv为导出数据文件名,文件已经存在则覆盖原始文件。需要考虑SQL结果集是大数据量分批数据导出的情况,通过多线程和异步操作来提高程序性能,程序需要异常处理和输出,输出出错时的错误信息,每次每个查询导出数据的运行状态和表数据行数以及运行时间戳,导出时间,输出每个文件记录数量的日志。

C++程序实现,该程序使用多线程和分页查询从Snowflake导出数据到CSV文件,并记录详细日志:

此代码实现了需求中的核心功能,实际部署时可能需要根据具体环境调整:

  1. ODBC连接字符串参数
  2. 分页策略(可能需要优化分页方式)
  3. 数据类型转换(当前按字符串处理所有类型)
  4. 错误处理策略(增加重试机制等)
  5. 线程池实现(当前为每个任务创建独立线程)
#include <iostream>
#include <fstream>
#include <string>
#include <vector>
#include <thread>
#include <mutex>
#include <atomic>
#include <chrono>
#include <iomanip>
#include <nlohmann/json.hpp>
#include <sql.h>
#include <sqlext.h>

using namespace std;
using json = nlohmann::json;

// 配置结构
struct ExportTask {
    string label;
    string sql;
    int batch_size;
};

// 日志类(线程安全)
class Logger {
private:
    mutex log_mutex;
    ofstream log_file;

    string current_time() {
        auto now = chrono::system_clock::now();
        time_t now_time = chrono::system_clock::to_time_t(now);
        tm tm = *localtime(&now_time);
        stringstream ss;
        ss << put_time(&tm, "%Y-%m-%d %H:%M:%S");
        return ss.str();
    }

public:
    Logger(const string& filename) {
        log_file.open(filename, ios::app);
    }

    ~Logger() {
        log_file.close();
    }

    void log(const string& label, const string& status, int rows, 
            const string& error_msg, long duration_ms) {
        lock_guard<mutex> lock(log_mutex);
        log_file << current_time() << " | "
                << label << " | "
                << status << " | "
                << rows << " | "
                << duration_ms << "ms | "
                << error_msg << endl;
    }
};

// Snowflake导出器
class SnowflakeExporter {
private:
    string conn_str;

    void handle_odbc_error(SQLHANDLE handle, SQLSMALLINT type) {
        SQLCHAR sqlstate[6], message[SQL_MAX_MESSAGE_LENGTH];
        SQLINTEGER native_error;
        SQLSMALLINT length;
        SQLGetDiagRec(type, handle, 1, sqlstate, &native_error, 
                     message, SQL_MAX_MESSAGE_LENGTH, &length);
        throw runtime_error(string((char*)message));
    }

public:
    SnowflakeExporter(const string& conn_str) : conn_str(conn_str) {}

    vector<vector<string>> execute_paged_query(const string& base_sql, 
                                              int limit, int offset) {
        SQLHENV env;
        SQLHDBC dbc;
        SQLHSTMT stmt;
        vector<vector<string>> results;

        try {
            // 初始化ODBC环境
            SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &env);
            SQLSetEnvAttr(env, SQL_ATTR_ODBC_VERSION, 
                         (void*)SQL_OV_ODBC3, 0);
            
            // 建立连接
            SQLAllocHandle(SQL_HANDLE_DBC, env, &dbc);
            SQLCHAR out_str[1024];
            SQLSMALLINT out_str_len;
            if (SQLDriverConnect(dbc, NULL, (SQLCHAR*)conn_str.c_str(), SQL_NTS,
                                out_str, sizeof(out_str), &out_str_len,
                                SQL_DRIVER_COMPLETE) != SQL_SUCCESS) {
                handle_odbc_error(dbc, SQL_HANDLE_DBC);
            }

            // 构造分页SQL
            string sql = base_sql + " LIMIT " + to_string(limit) + 
                        " OFFSET " + to_string(offset);

            // 执行查询
            SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);
            if (SQLExecDirect(stmt, (SQLCHAR*)sql.c_str(), SQL_NTS) != SQL_SUCCESS) {
                handle_odbc_error(stmt, SQL_HANDLE_STMT);
            }

            // 获取结果列数
            SQLSMALLINT num_cols;
            SQLNumResultCols(stmt, &num_cols);

            // 绑定列并获取数据
            while (SQLFetch(stmt) == SQL_SUCCESS) {
                vector<string> row;
                for (int i = 1; i <= num_cols; ++i) {
                    SQLCHAR val[4096];
                    SQLLEN indicator;
                    SQLGetData(stmt, i, SQL_C_CHAR, val, sizeof(val), &indicator);
                    row.emplace_back(indicator == SQL_NULL_DATA ? 
                                   "" : string((char*)val));
                }
                results.push_back(row);
            }

            // 清理资源
            SQLFreeHandle(SQL_HANDLE_STMT, stmt);
            SQLDisconnect(dbc);
            SQLFreeHandle(SQL_HANDLE_DBC, dbc);
            SQLFreeHandle(SQL_HANDLE_ENV, env);
        }
        catch (...) {
            SQLFreeHandle(SQL_HANDLE_STMT, stmt);
            SQLDisconnect(dbc);
            SQLFreeHandle(SQL_HANDLE_DBC, dbc);
            SQLFreeHandle(SQL_HANDLE_ENV, env);
            throw;
        }

        return results;
    }
};

// CSV写入器
class CSVWriter {
private:
    ofstream file;
    mutex file_mutex;

    string escape_csv(const string& s) {
        if (s.find('"') != string::npos || s.find(',') != string::npos) {
            return "\"" + regex_replace(s, regex("\""), "\"\"") + "\"";
        }
        return s;
    }

public:
    CSVWriter(const string& filename) {
        file.open(filename, ios::out | ios::trunc);
    }

    ~CSVWriter() {
        file.close();
    }

    void write_rows(const vector<vector<string>>& rows) {
        lock_guard<mutex> lock(file_mutex);
        for (const auto& row : rows) {
            string line;
            for (size_t i = 0; i < row.size(); ++i) {
                line += escape_csv(row[i]);
                if (i != row.size() - 1) line += ",";
            }
            file << line << "\n";
        }
    }
};

// 导出任务处理函数
void process_export_task(const ExportTask& task, 
                        const SnowflakeExporter& exporter,
                        CSVWriter& writer,
                        Logger& logger) {
    auto start_time = chrono::high_resolution_clock::now();
    int total_rows = 0;
    string error_msg;

    try {
        int offset = 0;
        while (true) {
            auto data = exporter.execute_paged_query(task.sql, 
                                                    task.batch_size, 
                                                    offset);
            if (data.empty()) break;

            writer.write_rows(data);
            total_rows += data.size();
            offset += task.batch_size;
        }
    }
    catch (const exception& e) {
        error_msg = e.what();
    }

    auto end_time = chrono::high_resolution_clock::now();
    auto duration = chrono::duration_cast<chrono::milliseconds>(
        end_time - start_time).count();

    logger.log(task.label, 
              error_msg.empty() ? "SUCCESS" : "FAILED",
              total_rows,
              error_msg,
              duration);
}

int main(int argc, char* argv[]) {
    // 加载配置文件
    ifstream config_file("config.json");
    json config = json::parse(config_file);

    // 解析连接字符串
    string conn_str = "DRIVER=SnowflakeDSIIDriver;"
                     "SERVER=" + config["server"].get<string>() + ";"
                     "DATABASE=" + config["database"].get<string>() + ";"
                     "SCHEMA=" + config["schema"].get<string>() + ";"
                     "UID=" + config["user"].get<string>() + ";"
                     "PWD=" + config["password"].get<string>() + ";";

    // 初始化组件
    Logger logger("export.log");
    SnowflakeExporter exporter(conn_str);

    // 创建线程池
    vector<thread> threads;
    vector<unique_ptr<CSVWriter>> writers;

    for (auto& task_json : config["tasks"]) {
        ExportTask task{
            task_json["label"],
            task_json["sql"],
            task_json["batch_size"]
        };

        writers.emplace_back(make_unique<CSVWriter>(task.label + ".csv"));
        threads.emplace_back(process_export_task, 
                            task, 
                            ref(exporter),
                            ref(*writers.back()), 
                            ref(logger));
    }

    // 等待所有线程完成
    for (auto& t : threads) {
        t.join();
    }

    return 0;
}

编译说明:

  1. 需要安装Snowflake ODBC驱动
  2. 需要链接ODBC库(-lodbc)
  3. 需要nlohmann/json库
  4. 推荐使用C++17或更高标准编译

配置文件示例(config.json):

{
    "server": "your_account.snowflakecomputing.com",
    "database": "your_db",
    "schema": "your_schema",
    "user": "your_user",
    "password": "your_password",
    "tasks": [
        {
            "label": "orders",
            "sql": "SELECT * FROM orders ORDER BY order_date",
            "batch_size": 10000
        },
        {
            "label": "customers",
            "sql": "SELECT * FROM customers WHERE status = 'ACTIVE'",
            "batch_size": 5000
        }
    ]
}

程序特点:

  1. 多线程处理多个导出任务
  2. 分页查询处理大数据集
  3. CSV文件自动覆盖写入
  4. 线程安全的日志记录
  5. 详细的错误处理
  6. CSV特殊字符转义
  7. 性能指标记录(耗时、行数)

日志格式示例:

2024-03-20 14:30:45 | orders | SUCCESS | 250000 | 4500ms | 
2024-03-20 14:31:02 | customers | FAILED | 12000 | 17000ms | Network connection error

相关文章:

  • Linux常用命令速查手册
  • 一键爬取b站视频
  • 「JavaScript深入」理解 JavaScript 中的不可变对象(Immutable Object)
  • Android 高版本 DownloadManager 封装工具类,支持 APK 断点续传与自动安装
  • 玩转python:通俗易懂掌握高级数据结构-collections模块之Counter
  • 利用委托用户控件、窗体之间传值 c#
  • 响应式编程-基于Reactor模式WebFlux框架的Spring Gateway
  • 生成省市区JSON
  • http 405 Not Allowed
  • 2018年全国职业院校技能大赛-高职组计算机网络应用竞赛竞赛样题A卷
  • 一文讲通锁标记对象std::adopt_lock盲点
  • OpenAI与谷歌DeepMind新品同日竞技,谁能引领机器人现实任务新潮流?
  • C#-委托delegate
  • C++设计模式-观察者模式:从基本介绍,内部原理、应用场景、使用方法,常见问题和解决方案进行深度解析
  • 网络视频监控平台在医疗领域的应用
  • 浏览器中输入 URL 到显示主页的完整过程
  • 【后端】【django】Django 自带的用户系统与 RBAC 机制
  • 历次科技泡沫对人工智能发展的启示与规避措施
  • containerd 拉取镜像的工具以及优劣
  • Python----计算机视觉处理(opencv:图片灰度化)
  • 郎朗也来了,在辰山植物园“轻松听古典”
  • 宜昌全域高质量发展:机制创新与产业重构的双向突围
  • 巴防空系统击落印度无人机,印称巴方违反停火协议
  • 梅花奖在上海|朱洁静:穿越了人生暴风雨,舞台是最好良药
  • 印度证实印巴已同意停火
  • 巴基斯坦关闭全部领空