当前位置: 首页 > news >正文

深圳西丽网站建设上海网站备案中心

深圳西丽网站建设,上海网站备案中心,哈尔滨短视频制作公司,网站防红怎么做的设计一个基于多个带标签Snowflake SQL语句作为json配置文件的C#代码程序,实现根据不同的输入参数自动批量地将Snowflake数据库的数据导出为CSV文件到本地目录上,标签加扩展名.csv为导出数据文件名,文件已经存在则覆盖原始文件。需要考虑SQL结…

设计一个基于多个带标签Snowflake SQL语句作为json配置文件的C#代码程序,实现根据不同的输入参数自动批量地将Snowflake数据库的数据导出为CSV文件到本地目录上,标签加扩展名.csv为导出数据文件名,文件已经存在则覆盖原始文件。需要考虑SQL结果集是大数据量分批数据导出的情况,通过多线程和异步操作来提高程序性能,程序需要异常处理和输出,输出出错时的错误信息,每次每个查询导出数据的运行状态和表数据行数以及运行时间戳,导出时间,输出每个文件记录数量的日志。

步骤1:创建配置文件类

// AppConfig.cs
public class AppConfig
{public SnowflakeConnectionConfig SnowflakeConnection { get; set; }public List<QueryConfig> Queries { get; set; }
}public class SnowflakeConnectionConfig
{public string Account { get; set; }public string User { get; set; }public string Password { get; set; }public string Warehouse { get; set; }public string Database { get; set; }public string Schema { get; set; }public string Role { get; set; }
}public class QueryConfig
{public string Label { get; set; }public string Sql { get; set; }
}

步骤2:实现日志记录器

// Logger.cs
public class Logger
{private readonly string _logPath;private readonly object _lock = new object();public Logger(string logPath){_logPath = logPath;InitializeLogFile();}private void InitializeLogFile(){lock (_lock){File.AppendAllText(_logPath, $"{"Timestamp",-25}|{"Status",-8}|{"Label",-20}|{"StartTime",-20}|{"Duration(s)",-12}|{"Rows",-10}|{"Error"}\n");}}public void LogSuccess(string label, DateTime startTime, long rowCount, TimeSpan duration){var entry = $"{DateTime.UtcNow:yyyy-MM-dd HH:mm:ss}|{"SUCCESS",-8}|{label,-20}|{startTime:HH:mm:ss}|{duration.TotalSeconds,-12:N2}|{rowCount,-10}|{"-"}\n";WriteLogEntry(entry);}public void LogError(string label, DateTime startTime, string error, TimeSpan duration){var entry = $"{DateTime.UtcNow:yyyy-MM-dd HH:mm:ss}|{"ERROR",-8}|{label,-20}|{startTime:HH:mm:ss}|{duration.TotalSeconds,-12:N2}|{"-",-10}|{error}\n";WriteLogEntry(entry);}private void WriteLogEntry(string entry){lock (_lock){File.AppendAllText(_logPath, entry);}Console.Write(entry);}
}

步骤3:实现数据导出处理器

// ExportProcessor.cs
using CsvHelper;
using Snowflake.Data.Client;
using System.Globalization;public class ExportProcessor
{private readonly string _connectionString;private readonly Logger _logger;public ExportProcessor(string connectionString, Logger logger){_connectionString = connectionString;_logger = logger;}public async Task ExportQueryAsync(QueryConfig query, string outputDir, CancellationToken cancellationToken = default){var startTime = DateTime.UtcNow;var sw = Stopwatch.StartNew();long rowCount = 0;string filePath = Path.Combine(outputDir, $"{query.Label}.csv");try{using (var conn = new SnowflakeDbConnection()){conn.ConnectionString = _connectionString;await conn.OpenAsync(cancellationToken);using (var cmd = conn.CreateCommand()){cmd.CommandText = query.Sql;using (var reader = await cmd.ExecuteReaderAsync(cancellationToken))using (var writer = new StreamWriter(filePath, append: false))using (var csv = new CsvWriter(writer, CultureInfo.InvariantCulture)){// Write headerfor (int i = 0; i < reader.FieldCount; i++){csv.WriteField(reader.GetName(i));}await csv.NextRecordAsync();// Write rowswhile (await reader.ReadAsync(cancellationToken)){for (int i = 0; i < reader.FieldCount; i++){csv.WriteField(reader.GetValue(i));}await csv.NextRecordAsync();rowCount++;}}}}sw.Stop();_logger.LogSuccess(query.Label, startTime, rowCount, sw.Elapsed);}catch (Exception ex){sw.Stop();_logger.LogError(query.Label, startTime, ex.Message, sw.Elapsed);SafeDeleteFile(filePath);throw; // Re-throw if using retry logic}}private void SafeDeleteFile(string path){try { File.Delete(path); }catch { /* Ignore deletion errors */ }}
}

步骤4:主程序实现

// Program.cs
using System.CommandLine;class Program
{static async Task Main(string[] args){var configOption = new Option<FileInfo>(name: "--config",description: "Path to configuration file");var outputOption = new Option<DirectoryInfo>(name: "--output",description: "Output directory for CSV files");var rootCommand = new RootCommand{configOption,outputOption};rootCommand.Description = "Snowflake Data Exporter";rootCommand.SetHandler(async (config, outputDir) => {await RunExport(config, outputDir);}, configOption, outputOption);await rootCommand.InvokeAsync(args);}static async Task RunExport(FileInfo configFile, DirectoryInfo outputDir){// Read configurationvar config = JsonSerializer.Deserialize<AppConfig>(File.ReadAllText(configFile.FullName),new JsonSerializerOptions { PropertyNameCaseInsensitive = true });// Create output directoryDirectory.CreateDirectory(outputDir.FullName);// Initialize loggervar logger = new Logger(Path.Combine(outputDir.FullName, "export.log"));// Build connection stringvar connBuilder = new SnowflakeDbConnectionStringBuilder{Account = config.SnowflakeConnection.Account,User = config.SnowflakeConnection.User,Password = config.SnowflakeConnection.Password,Warehouse = config.SnowflakeConnection.Warehouse,Db = config.SnowflakeConnection.Database,Schema = config.SnowflakeConnection.Schema,Role = config.SnowflakeConnection.Role};// Initialize processorvar processor = new ExportProcessor(connBuilder.ToString(),logger);// Parallel execution with throttlingvar parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount };await Parallel.ForEachAsync(config.Queries,parallelOptions,async (query, cancellationToken) =>{await processor.ExportQueryAsync(query,outputDir.FullName,cancellationToken);});}
}

步骤5:配置文件示例(config.json)

{"snowflakeConnection": {"account": "your_account","user": "your_user","password": "your_password","warehouse": "COMPUTE_WH","database": "PROD_DB","schema": "PUBLIC","role": "SYSADMIN"},"queries": [{"label": "customers","sql": "SELECT * FROM CUSTOMERS"},{"label": "orders","sql": "SELECT * FROM ORDERS"}]
}

实现说明

  1. 并行处理

    • 使用Parallel.ForEachAsync进行并行查询处理
    • 默认并行度设置为处理器核心数
    • 每个查询独立使用自己的数据库连接
  2. 大文件处理

    • 使用CsvHelper进行流式写入
    • 采用异步I/O操作(ReadAsync/WriteAsync
    • 逐行处理避免内存爆炸
  3. 错误处理

    • 自动删除不完整文件
    • 详细错误日志记录
    • 异常传播与隔离设计
  4. 日志功能

    • 结构化日志格式
    • 线程安全写入
    • 包含关键性能指标
  5. 性能优化

    • 异步数据库操作
    • 并行查询执行
    • 流式结果集处理

使用说明

  1. 安装依赖:

    dotnet add package Snowflake.Data
    dotnet add package CsvHelper
    dotnet add package System.CommandLine
    
  2. 编译运行:

    dotnet run -- --config ./config.json --output ./exports
    
  3. 输出结构:

    exports/customers.csvorders.csvexport.log
    

日志示例

2023-09-20 14:30:45|SUCCESS |customers           |14:30:30|15.23      |1000000    |- 
2023-09-20 14:31:02|ERROR   |orders             |14:30:45|17.12      |-          |Timeout expired

此实现提供了:

  • 线程安全的并行处理
  • 完整错误处理机制
  • 详细的执行日志
  • 高效的大数据处理能力
  • 可配置的Snowflake连接参数
  • 清晰的命令行界面
http://www.dtcms.com/a/539107.html

相关文章:

  • thinkphp手机网站制作嘉兴网站排名优化公司
  • 杭州经济技术开发区建设局网站wordpress淘宝评论调用插件
  • 网站开发摊销年限api key域名是随便填写嘛
  • 公司企业网站程序手机营销型网站建设公司
  • 锐旗网站建设番禺建设网站服务
  • 好的app设计网站有哪些西部数码网站管理助手ftp
  • 建设企业网站就等于开展网络营销wordpress侧边联系方式
  • 网站开发 所有权服务器做网站FTP必要性大吗
  • 惠州建设工程质量监督站网站国内服务器做彩票网站安全吗
  • 网站建设中国十强wordpress 自定义摘要
  • app 官方网站 案例深圳专业专业网站设计
  • 云服务器建设网站软件阿里云轻量应用服务器wordpress
  • 房产网站建站广东佛山建网站
  • 徐州网站建设网站制作目前网站开发趋势
  • 徐州新沂网站建设php网站后台管理系统源码
  • 做移动网站多少钱网站建设公司中
  • 免费国外网站pro wordpress theme development
  • 淄博网站外包vs做的本地网站
  • 涿州做网站全屏响应式网站模板
  • 重庆网站建设外包哪家好对于高校类建设网站的要求
  • 做网站如何设计数据库互联网营销专业
  • 做老师讲课视频的教育网站专门用来制作网页的软件是什么
  • 微网站建设包括哪些内容宁波建网站选哪家好点
  • 海南省做购房合同网站分析网站建设的论文
  • 合肥网站建站报广告代理捷讯官网 网站建设
  • 网络教育网站建设方案苏宁易购
  • 网站制作策划建设大纲网站 侧边栏
  • 崇州园区营销网站建设广西省建设厅官方网站
  • 网站开发前后端工具组合有没有什么网站专门帮人做问卷
  • 网站管理与维护的优势设计网站的意义