当前位置: 首页 > news >正文

C# 实现列式存储数据

C#实现列式存储数据指南

一、列式存储概述

列式存储(Columnar Storage)是一种数据存储方式,它将数据按列而非行组织。与传统的行式存储相比,列式存储在以下场景具有优势:

  1. ​分析型查询​​:聚合计算、分组统计等操作效率更高
  2. ​压缩率更高​​:相同数据列式存储通常占用更少空间
  3. ​I/O效率更高​​:只需读取相关列的数据

二、C#实现列式存储方案

方案1:使用数组实现简单列式存储

public class ColumnarStorage<T>
{private readonly List<List<T>> _columns = new List<List<T>>();private readonly Dictionary<string, int> _columnIndexes = new Dictionary<string, int>();public void AddColumn(string columnName, IEnumerable<T> values){if (_columnIndexes.ContainsKey(columnName))throw new ArgumentException($"Column '{columnName}' already exists");var column = values.ToList();_columns.Add(column);_columnIndexes[columnName] = _columns.Count - 1;}public void AddRow(IEnumerable<T> values){if (values.Count() != _columns.Count)throw new ArgumentException("Number of values doesn't match number of columns");for (int i = 0; i < values.Count(); i++){_columns[i].Add(values.ElementAt(i));}}public IEnumerable<T> GetColumn(string columnName){if (!_columnIndexes.TryGetValue(columnName, out int index))throw new ArgumentException($"Column '{columnName}' not found");return _columns[index];}public IEnumerable<T> GetRow(int rowIndex){return _columns.Select(c => c[rowIndex]);}// 其他方法如查询、聚合等...
}

​使用示例​​:

var storage = new ColumnarStorage<int>();
storage.AddColumn("ID", new List<int> { 1, 2, 3 });
storage.AddColumn("Value", new List<int> { 10, 20, 30 });// 添加行(不推荐,因为列式存储通常先定义列再填充数据)
// storage.AddRow(new List<int> { 4, 40 });// 查询
var values = storage.GetColumn("Value"); // 返回 [10, 20, 30]

方案2:使用专业列式存储库 - Parquet.NET

​安装NuGet包​​:

dotnet add package Parquet.Net

​实现示例​​:

using Parquet;
using Parquet.Data;// 创建数据字段
var fields = new[]
{new DataField<int>("id"),new DataField<string>("name"),new DataField<double>("value")
};// 创建数据集
var dataSet = new DataSet(fields)
{new Row(1, "Item1", 10.5),new Row(2, "Item2", 20.3),new Row(3, "Item3", 30.7)
};// 写入Parquet文件
using (var stream = File.Create("data.parquet"))
{ParquetWriter.WriteFile(dataSet, stream);
}// 读取Parquet文件
using (var stream = File.OpenRead("data.parquet"))
{var reader = new ParquetReader(stream);var ds = reader.ReadDataSet();foreach (var row in ds[0]){Console.WriteLine($"{row[0]} {row[1]} {row[2]}");}
}

方案3:使用内存列式存储 - Apache Arrow

​安装NuGet包​​:

dotnet add package Apache.Arrow

​实现示例​​:

using Apache.Arrow;
using Apache.Arrow.Arrays;// 创建列
var idColumn = new Int32Array.Builder().Append(1).Append(2).Append(3).Build();var nameColumn = new BinaryArray.Builder(BinaryArrayBuilderOptions.Default).Append("Item1").Append("Item2").Append("Item3").Build();var valueColumn = new DoubleArray.Builder().Append(10.5).Append(20.3).Append(30.7).Build();// 创建表
var schema = new Schema(new[]
{new Field("id", new Int32Type()),new Field("name", new BinaryType()),new Field("value", new DoubleType())
});var table = new Table(schema, new[] { idColumn, nameColumn, valueColumn });// 序列化为Arrow格式
using (var stream = File.Create("data.arrow"))
{var writer = new ArrowStreamWriter(table, new MessageSerializer(), stream);writer.WriteTable(table);writer.Dispose();
}// 从Arrow文件读取
using (var stream = File.OpenRead("data.arrow"))
{var reader = new ArrowStreamReader(stream);var table = reader.ReadNextTable();foreach (var row in table.ToEnumerable()){Console.WriteLine($"{row[0]} {row[1]} {row[2]}");}
}

三、列式存储优化技巧

1. 数据压缩

​使用Parquet的压缩选项​​:

var writerProperties = new WriterProperties(CompressionCodec.Snappy // 使用Snappy压缩
);using (var stream = File.Create("compressed.parquet"))
{ParquetWriter.WriteFile(dataSet, stream, writerProperties);
}

2. 列式查询优化

​实现列式索引​​:

public class ColumnIndex<T>
{private readonly Dictionary<T, List<int>> _index = new Dictionary<T, List<int>>();public void Build(IEnumerable<T> column){int rowIndex = 0;foreach (var value in column){if (!_index.ContainsKey(value))_index[value] = new List<int>();_index[value].Add(rowIndex++);}}public IEnumerable<int> Lookup(T value){return _index.TryGetValue(value, out var rows) ? rows : Enumerable.Empty<int>();}
}

3. 内存映射文件

​使用MemoryMappedFile处理大文件​​:

using var mmf = MemoryMappedFile.CreateFromFile("large.parquet");
using var accessor = mmf.CreateViewAccessor();// 直接访问文件中的特定列数据

四、完整示例:实现简单的列式数据库

public class ColumnarDatabase
{private readonly Dictionary<string, List<object>> _columns = new();private readonly Dictionary<string, Type> _columnTypes = new();public void CreateTable(IEnumerable<string> columnNames, IEnumerable<Type> columnTypes){if (columnNames.Count() != columnTypes.Count())throw new ArgumentException("Column names and types count mismatch");for (int i = 0; i < columnNames.Count(); i++){_columns[columnNames.ElementAt(i)] = new List<object>();_columnTypes[columnNames.ElementAt(i)] = columnTypes.ElementAt(i);}}public void InsertRow(IEnumerable<object> values){if (values.Count() != _columns.Count)throw new ArgumentException("Values count doesn't match columns count");var enumerators = values.GetEnumerator();foreach (var column in _columns.Values){enumerators.MoveNext();column.Add(enumerators.Current);}}public IEnumerable<object> GetColumn(string columnName){return _columns[columnName];}public IEnumerable<object[]> GetRowsWhere(string columnName, object value){var columnIndex = _columns.Keys.ToList().IndexOf(columnName);if (columnIndex == -1) yield break;var column = _columns.Values.ElementAt(columnIndex);var indexes = new ColumnIndex<object>().Build(column).Where(kvp => kvp.Key.Equals(value)).SelectMany(kvp => kvp.Value);for (int i = 0; i < column.Count; i++){if (indexes.Contains(i)){yield return _columns.Values.Select(c => c[i]).ToArray();}}}// 更多方法...
}

五、性能对比测试

​测试代码​​:

var columnar = new ColumnarStorage<int>();
var rowBased = new List<List<int>>();// 填充100万行数据
var sw = Stopwatch.StartNew();
for (int i = 0; i < 1_000_000; i++)
{columnar.AddColumn("ID", new List<int> { i });columnar.AddColumn("Value", new List<int> { i * 2 });// 行式存储(仅演示,实际不推荐这样使用)if (i == 0) rowBased.Add(new List<int>());rowBased[0].Add(i);rowBased[0].Add(i * 2);
}
sw.Stop();
Console.WriteLine($"填充时间: {sw.ElapsedMilliseconds}ms");// 查询测试
sw.Restart();
var columnarValues = columnar.GetColumn("Value");
sw.Stop();
Console.WriteLine($"列式查询时间: {sw.ElapsedMilliseconds}ms");sw.Restart();
var rowBasedValues = rowBased.SelectMany(r => r).Where((v, i) => i % 2 == 1); // 模拟查询第二列
sw.Stop();
Console.WriteLine($"行式查询时间: {sw.ElapsedMilliseconds}ms");

​预期结果​​:

  • 列式存储在查询特定列时性能更好
  • 行式存储在插入时可能更快(但实际列式存储优化后插入也很快)

六、实际应用场景

  1. ​数据分析平台​​:

    • 处理TB级数据
    • 高效聚合计算
    • 列式压缩节省存储空间
  2. ​商业智能工具​​:

    • 快速生成报表
    • 复杂查询优化
    • 多维分析支持
  3. ​时序数据处理​​:

    • 高效存储时间序列数据
    • 快速聚合计算
    • 支持降采样查询

七、扩展建议

  1. ​实现列式压缩​​:

    public byte[] CompressColumn<T>(List<T> column)
    {using var ms = new MemoryStream();using (var writer = new BinaryWriter(ms)){foreach (var item in column){// 实现自定义压缩逻辑writer.Write(item.ToString());}}return ms.ToArray();
    }
  2. ​添加索引支持​​:

    public class ColumnIndex
    {private readonly Dictionary<object, List<int>> _valueToRows = new();public void Build<T>(List<T> column){for (int i = 0; i < column.Count; i++){if (!_valueToRows.ContainsKey(column[i]))_valueToRows[column[i]] = new List<int>();_valueToRows[column[i]].Add(i);}}public IEnumerable<int> GetRows(T value) => _valueToRows.TryGetValue(value, out var rows) ? rows : Enumerable.Empty<int>();
    }
  3. ​支持列式存储格式​​:

    • Parquet
    • ORC
    • Apache Arrow
    • 自定义二进制格式

相关文章:

  • 如何正确使用日程表
  • Docker搜索镜像报错
  • 字符串模式匹配之KMP算法的理解和应用
  • ​​智能制造中的预测性维护:基于深度学习的设备故障预测​​
  • day006-实战练习题-参考答案
  • spring中的@Configuration注解详解
  • 操作系统学习
  • 小米MiMo推理大模型开源:7B参数规模超越更大规模模型
  • 电子制造业智能化转型:APS高级排程软件如何破局效率革命
  • x-cmd install | Tewi - 终端里的 Transmission 掌控者,功能全面的 BT 下载管理工具!
  • VSCode Auto Rename Tag插件不生效
  • 一套SaaS ERP管理系统源码,支持项目二开商用,SpringBoot+Vue+ElementUI+UniAPP
  • MicroPython for esp32s3开发HX711称重模块指南
  • 管家婆易指开单如何设置零售开单
  • Git从入门到精通-第二章-工具配置
  • 在TensorFlow中,`Dense`和`Activation`是深度学习模型构建里常用的层
  • Socket-UDP
  • [Unity]设置自动打包脚本
  • [Survey] Image Segmentation in Foundation Model Era: A Survey
  • VBA代码解决方案第二十四讲:EXCEL中,如何删除重复数据行
  • 马上评|启动最高层级医政调查,维护医学一方净土
  • 来论|受美国“保护”,日本民众要付出什么代价?
  • 澎湃读报丨解放日报9个版聚焦:上海,加快建成具有全球影响力的科技创新高地
  • 农行一季度净利润719亿元增2.2%,不良率微降至1.28%
  • 人社部:将制定提前领取个人养老金相关办法
  • 海尔·2025青岛马拉松两选手被终身禁赛:违规转让号码、穿戴他人号码