A GCC/C++ Program on Linux for Batch-Exporting Snowflake Data from Multiple Labeled SQL Queries
Design a Linux C++ program, built with GCC, that takes a JSON configuration file containing multiple labeled Snowflake SQL statements and, driven by its input parameters, automatically batch-exports data from a Snowflake database to CSV files in a local directory. Each output file is named after its label plus the .csv extension; an existing file of the same name is overwritten. The program must handle large result sets by exporting them in batches, use multithreading and asynchronous operations to improve performance, and include exception handling with error output: when something fails, print the error message; for every query run, log the export status, the number of table rows exported, run timestamps, the export time, and the record count written to each file.
The C++ implementation below uses multithreading and paged queries to export data from Snowflake to CSV files, recording a detailed log:
This code covers the core requirements; for real deployments you will likely need to adjust:
- The ODBC connection string parameters
- The pagination strategy (LIMIT/OFFSET may need optimizing; see the keyset sketch right after this list)
- Data type conversion (every column is currently read as a string)
- The error handling strategy (e.g., adding a retry mechanism; a sketch appears after the feature list near the end)
- The thread-pool implementation (currently one thread is created per task; a concurrency-capping sketch follows the listing)
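On the pagination point: LIMIT/OFFSET forces Snowflake to re-scan and skip all previously exported rows on every page, so later pages get progressively slower. When the result set can be ordered by a unique column, keyset (seek) pagination resumes from the last key instead. A minimal sketch, assuming a hypothetical unique numeric column named id exposed by the base query (the helper is illustrative, not used by the program below):

#include <string>

// Wraps the base query in a derived table so the seek predicate applies
// regardless of any WHERE clause the base query already has; "id" is an
// assumed unique ordering column.
std::string keyset_page_sql(const std::string& base_sql,
                            long long last_id, int batch_size) {
    return "SELECT * FROM (" + base_sql + ") t WHERE t.id > " +
           std::to_string(last_id) + " ORDER BY t.id LIMIT " +
           std::to_string(batch_size);
}

The caller keeps the largest id from each page and passes it to the next call; the first call uses a value below every real key, for example -1.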
#include <iostream>
#include <fstream>
#include <sstream>
#include <string>
#include <vector>
#include <thread>
#include <mutex>
#include <atomic>
#include <memory>
#include <regex>
#include <chrono>
#include <ctime>
#include <iomanip>
#include <nlohmann/json.hpp>
#include <sql.h>
#include <sqlext.h>
using namespace std;
using json = nlohmann::json;
// Configuration for one export task
struct ExportTask {
    string label;       // output file name stem: <label>.csv
    string sql;         // base SQL statement (paged via LIMIT/OFFSET)
    int batch_size;     // rows fetched per page
};
// Thread-safe logger
class Logger {
private:
    mutex log_mutex;
    ofstream log_file;
    string current_time() {
        auto now = chrono::system_clock::now();
        time_t now_time = chrono::system_clock::to_time_t(now);
        tm tm_buf;
        localtime_r(&now_time, &tm_buf);   // thread-safe variant of localtime
        stringstream ss;
        ss << put_time(&tm_buf, "%Y-%m-%d %H:%M:%S");
        return ss.str();
    }
public:
    Logger(const string& filename) {
        log_file.open(filename, ios::app);
    }
    ~Logger() {
        log_file.close();
    }
    void log(const string& label, const string& status, int rows,
             const string& error_msg, long duration_ms) {
        lock_guard<mutex> lock(log_mutex);
        log_file << current_time() << " | "
                 << label << " | "
                 << status << " | "
                 << rows << " | "
                 << duration_ms << "ms | "
                 << error_msg << endl;
    }
};
// Snowflake exporter
class SnowflakeExporter {
private:
    string conn_str;
    // const so it can be called from the const query method below
    void handle_odbc_error(SQLHANDLE handle, SQLSMALLINT type) const {
        SQLCHAR sqlstate[6], message[SQL_MAX_MESSAGE_LENGTH];
        SQLINTEGER native_error;
        SQLSMALLINT length;
        SQLGetDiagRec(type, handle, 1, sqlstate, &native_error,
                      message, SQL_MAX_MESSAGE_LENGTH, &length);
        throw runtime_error(string((char*)message));
    }
public:
    SnowflakeExporter(const string& conn_str) : conn_str(conn_str) {}
    // Fetches one page of the query; marked const so it can be called
    // through the const reference handed to each worker thread.
    vector<vector<string>> execute_paged_query(const string& base_sql,
                                               int limit, int offset) const {
        SQLHENV env = SQL_NULL_HENV;
        SQLHDBC dbc = SQL_NULL_HDBC;
        SQLHSTMT stmt = SQL_NULL_HSTMT;
        vector<vector<string>> results;
        try {
            // Initialize the ODBC environment
            SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &env);
            SQLSetEnvAttr(env, SQL_ATTR_ODBC_VERSION,
                          (void*)SQL_OV_ODBC3, 0);
            // Open the connection
            SQLAllocHandle(SQL_HANDLE_DBC, env, &dbc);
            SQLCHAR out_str[1024];
            SQLSMALLINT out_str_len;
            if (!SQL_SUCCEEDED(SQLDriverConnect(dbc, NULL,
                               (SQLCHAR*)conn_str.c_str(), SQL_NTS,
                               out_str, sizeof(out_str), &out_str_len,
                               SQL_DRIVER_COMPLETE))) {
                handle_odbc_error(dbc, SQL_HANDLE_DBC);
            }
            // Build the paged SQL
            string sql = base_sql + " LIMIT " + to_string(limit) +
                         " OFFSET " + to_string(offset);
            // Execute the query
            SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);
            if (!SQL_SUCCEEDED(SQLExecDirect(stmt, (SQLCHAR*)sql.c_str(),
                                             SQL_NTS))) {
                handle_odbc_error(stmt, SQL_HANDLE_STMT);
            }
            // Get the number of result columns
            SQLSMALLINT num_cols;
            SQLNumResultCols(stmt, &num_cols);
            // Fetch rows; every column is read as a string
            // (values longer than the 4096-byte buffer are truncated)
            while (SQL_SUCCEEDED(SQLFetch(stmt))) {
                vector<string> row;
                for (int i = 1; i <= num_cols; ++i) {
                    SQLCHAR val[4096];
                    SQLLEN indicator;
                    SQLGetData(stmt, i, SQL_C_CHAR, val, sizeof(val), &indicator);
                    row.emplace_back(indicator == SQL_NULL_DATA ?
                                     "" : string((char*)val));
                }
                results.push_back(row);
            }
            // Release resources
            SQLFreeHandle(SQL_HANDLE_STMT, stmt);
            SQLDisconnect(dbc);
            SQLFreeHandle(SQL_HANDLE_DBC, dbc);
            SQLFreeHandle(SQL_HANDLE_ENV, env);
        }
        catch (...) {
            // Free only what was actually allocated
            if (stmt != SQL_NULL_HSTMT) SQLFreeHandle(SQL_HANDLE_STMT, stmt);
            if (dbc != SQL_NULL_HDBC) {
                SQLDisconnect(dbc);
                SQLFreeHandle(SQL_HANDLE_DBC, dbc);
            }
            if (env != SQL_NULL_HENV) SQLFreeHandle(SQL_HANDLE_ENV, env);
            throw;
        }
        return results;
    }
};
// CSV writer
class CSVWriter {
private:
    ofstream file;
    mutex file_mutex;
    string escape_csv(const string& s) {
        // Quote fields containing quotes, commas, or line breaks,
        // doubling any embedded quotes
        if (s.find_first_of("\",\n\r") != string::npos) {
            return "\"" + regex_replace(s, regex("\""), "\"\"") + "\"";
        }
        return s;
    }
public:
    CSVWriter(const string& filename) {
        // ios::trunc overwrites an existing file of the same name
        file.open(filename, ios::out | ios::trunc);
    }
    ~CSVWriter() {
        file.close();
    }
    void write_rows(const vector<vector<string>>& rows) {
        lock_guard<mutex> lock(file_mutex);
        for (const auto& row : rows) {
            string line;
            for (size_t i = 0; i < row.size(); ++i) {
                line += escape_csv(row[i]);
                if (i != row.size() - 1) line += ",";
            }
            file << line << "\n";
        }
    }
};
// Worker function for one export task
void process_export_task(const ExportTask& task,
                         const SnowflakeExporter& exporter,
                         CSVWriter& writer,
                         Logger& logger) {
    auto start_time = chrono::high_resolution_clock::now();
    int total_rows = 0;
    string error_msg;
    try {
        int offset = 0;
        while (true) {
            auto data = exporter.execute_paged_query(task.sql,
                                                     task.batch_size,
                                                     offset);
            if (data.empty()) break;   // no more pages
            writer.write_rows(data);
            total_rows += data.size();
            offset += task.batch_size;
        }
    }
    catch (const exception& e) {
        error_msg = e.what();
    }
    auto end_time = chrono::high_resolution_clock::now();
    auto duration = chrono::duration_cast<chrono::milliseconds>(
        end_time - start_time).count();
    logger.log(task.label,
               error_msg.empty() ? "SUCCESS" : "FAILED",
               total_rows,
               error_msg,
               duration);
}
int main(int argc, char* argv[]) {
    // Load the configuration file (path may be given as the first argument)
    const string config_path = (argc > 1) ? argv[1] : "config.json";
    ifstream config_file(config_path);
    if (!config_file) {
        cerr << "Cannot open config file: " << config_path << endl;
        return 1;
    }
    json config = json::parse(config_file);
    // Build the ODBC connection string
    string conn_str = "DRIVER=SnowflakeDSIIDriver;"
                      "SERVER=" + config["server"].get<string>() + ";"
                      "DATABASE=" + config["database"].get<string>() + ";"
                      "SCHEMA=" + config["schema"].get<string>() + ";"
                      "UID=" + config["user"].get<string>() + ";"
                      "PWD=" + config["password"].get<string>() + ";";
    // Initialize shared components
    Logger logger("export.log");
    SnowflakeExporter exporter(conn_str);
    // Launch one thread per task
    vector<thread> threads;
    vector<unique_ptr<CSVWriter>> writers;
    for (auto& task_json : config["tasks"]) {
        ExportTask task{
            task_json["label"],
            task_json["sql"],
            task_json["batch_size"]
        };
        writers.emplace_back(make_unique<CSVWriter>(task.label + ".csv"));
        threads.emplace_back(process_export_task,
                             task,
                             cref(exporter),
                             ref(*writers.back()),
                             ref(logger));
    }
    // Wait for all threads to finish
    for (auto& t : threads) {
        t.join();
    }
    return 0;
}
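On the thread-pool point from the adjustments list: main above launches one thread per task, which can overload both the client machine and the warehouse when the task list is long. A minimal way to cap concurrency without a full thread pool, assuming the tasks have been wrapped as std::function<void()> (the helper below is a sketch, not part of the program above):

#include <atomic>
#include <functional>
#include <thread>
#include <vector>

// Runs all tasks on at most max_workers threads: each worker repeatedly
// claims the next unprocessed task index until none remain.
void run_bounded(std::vector<std::function<void()>>& tasks, unsigned max_workers) {
    std::atomic<size_t> next{0};
    std::vector<std::thread> workers;
    for (unsigned i = 0; i < max_workers && i < tasks.size(); ++i) {
        workers.emplace_back([&] {
            for (size_t idx; (idx = next.fetch_add(1)) < tasks.size(); )
                tasks[idx]();
        });
    }
    for (auto& w : workers) w.join();
}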
Build notes:
- The Snowflake ODBC driver must be installed
- Link against the ODBC library (-lodbc)
- Requires the nlohmann/json library
- Compiling with C++17 or a later standard is recommended
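A typical build and run, assuming the source file is named sf_export.cpp (the name is illustrative) and unixODBC plus the header-only nlohmann/json are installed in default locations:

g++ -std=c++17 -O2 -pthread -o sf_export sf_export.cpp -lodbc
./sf_export config.json   # defaults to ./config.json when no argument is given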
Example configuration file (config.json):
{
    "server": "your_account.snowflakecomputing.com",
    "database": "your_db",
    "schema": "your_schema",
    "user": "your_user",
    "password": "your_password",
    "tasks": [
        {
            "label": "orders",
            "sql": "SELECT * FROM orders ORDER BY order_date",
            "batch_size": 10000
        },
        {
            "label": "customers",
            "sql": "SELECT * FROM customers WHERE status = 'ACTIVE'",
            "batch_size": 5000
        }
    ]
}
Program features:
- Multithreaded handling of multiple export tasks
- Paged queries for large data sets
- CSV files are overwritten automatically on each run
- Thread-safe logging
- Detailed error handling (see the retry sketch below)
- Escaping of CSV special characters
- Performance metrics in the log (elapsed time, row count)
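The adjustments list at the top mentions adding a retry mechanism for transient failures; a minimal sketch that could wrap each execute_paged_query call, assuming failures surface as exceptions (the helper name and fixed delay are illustrative):

#include <chrono>
#include <thread>

// Calls fn up to max_attempts times, sleeping between attempts;
// rethrows the last exception if every attempt fails.
template <typename Fn>
auto with_retries(Fn fn, int max_attempts, std::chrono::milliseconds delay)
    -> decltype(fn()) {
    for (int attempt = 1;; ++attempt) {
        try {
            return fn();
        } catch (...) {
            if (attempt >= max_attempts) throw;
            std::this_thread::sleep_for(delay);
        }
    }
}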
Sample log lines:
2024-03-20 14:30:45 | orders | SUCCESS | 250000 | 4500ms |
2024-03-20 14:31:02 | customers | FAILED | 12000 | 17000ms | Network connection error