C# 实现列式存储数据
C#实现列式存储数据指南
一、列式存储概述
列式存储(Columnar Storage)是一种数据存储方式,它将数据按列而非行组织。与传统的行式存储相比,列式存储在以下场景具有优势:
- 分析型查询:聚合计算、分组统计等操作效率更高
- 压缩率更高:相同数据列式存储通常占用更少空间
- I/O效率更高:只需读取相关列的数据
二、C#实现列式存储方案
方案1:使用数组实现简单列式存储
public class ColumnarStorage<T>
{private readonly List<List<T>> _columns = new List<List<T>>();private readonly Dictionary<string, int> _columnIndexes = new Dictionary<string, int>();public void AddColumn(string columnName, IEnumerable<T> values){if (_columnIndexes.ContainsKey(columnName))throw new ArgumentException($"Column '{columnName}' already exists");var column = values.ToList();_columns.Add(column);_columnIndexes[columnName] = _columns.Count - 1;}public void AddRow(IEnumerable<T> values){if (values.Count() != _columns.Count)throw new ArgumentException("Number of values doesn't match number of columns");for (int i = 0; i < values.Count(); i++){_columns[i].Add(values.ElementAt(i));}}public IEnumerable<T> GetColumn(string columnName){if (!_columnIndexes.TryGetValue(columnName, out int index))throw new ArgumentException($"Column '{columnName}' not found");return _columns[index];}public IEnumerable<T> GetRow(int rowIndex){return _columns.Select(c => c[rowIndex]);}// 其他方法如查询、聚合等...
}
使用示例:
var storage = new ColumnarStorage<int>();
storage.AddColumn("ID", new List<int> { 1, 2, 3 });
storage.AddColumn("Value", new List<int> { 10, 20, 30 });// 添加行(不推荐,因为列式存储通常先定义列再填充数据)
// storage.AddRow(new List<int> { 4, 40 });// 查询
var values = storage.GetColumn("Value"); // 返回 [10, 20, 30]
方案2:使用专业列式存储库 - Parquet.NET
安装NuGet包:
dotnet add package Parquet.Net
实现示例:
using Parquet;
using Parquet.Data;// 创建数据字段
var fields = new[]
{new DataField<int>("id"),new DataField<string>("name"),new DataField<double>("value")
};// 创建数据集
var dataSet = new DataSet(fields)
{new Row(1, "Item1", 10.5),new Row(2, "Item2", 20.3),new Row(3, "Item3", 30.7)
};// 写入Parquet文件
using (var stream = File.Create("data.parquet"))
{ParquetWriter.WriteFile(dataSet, stream);
}// 读取Parquet文件
using (var stream = File.OpenRead("data.parquet"))
{var reader = new ParquetReader(stream);var ds = reader.ReadDataSet();foreach (var row in ds[0]){Console.WriteLine($"{row[0]} {row[1]} {row[2]}");}
}
方案3:使用内存列式存储 - Apache Arrow
安装NuGet包:
dotnet add package Apache.Arrow
实现示例:
using Apache.Arrow;
using Apache.Arrow.Arrays;// 创建列
var idColumn = new Int32Array.Builder().Append(1).Append(2).Append(3).Build();var nameColumn = new BinaryArray.Builder(BinaryArrayBuilderOptions.Default).Append("Item1").Append("Item2").Append("Item3").Build();var valueColumn = new DoubleArray.Builder().Append(10.5).Append(20.3).Append(30.7).Build();// 创建表
var schema = new Schema(new[]
{new Field("id", new Int32Type()),new Field("name", new BinaryType()),new Field("value", new DoubleType())
});var table = new Table(schema, new[] { idColumn, nameColumn, valueColumn });// 序列化为Arrow格式
using (var stream = File.Create("data.arrow"))
{var writer = new ArrowStreamWriter(table, new MessageSerializer(), stream);writer.WriteTable(table);writer.Dispose();
}// 从Arrow文件读取
using (var stream = File.OpenRead("data.arrow"))
{var reader = new ArrowStreamReader(stream);var table = reader.ReadNextTable();foreach (var row in table.ToEnumerable()){Console.WriteLine($"{row[0]} {row[1]} {row[2]}");}
}
三、列式存储优化技巧
1. 数据压缩
使用Parquet的压缩选项:
var writerProperties = new WriterProperties(CompressionCodec.Snappy // 使用Snappy压缩
);using (var stream = File.Create("compressed.parquet"))
{ParquetWriter.WriteFile(dataSet, stream, writerProperties);
}
2. 列式查询优化
实现列式索引:
public class ColumnIndex<T>
{private readonly Dictionary<T, List<int>> _index = new Dictionary<T, List<int>>();public void Build(IEnumerable<T> column){int rowIndex = 0;foreach (var value in column){if (!_index.ContainsKey(value))_index[value] = new List<int>();_index[value].Add(rowIndex++);}}public IEnumerable<int> Lookup(T value){return _index.TryGetValue(value, out var rows) ? rows : Enumerable.Empty<int>();}
}
3. 内存映射文件
使用MemoryMappedFile处理大文件:
using var mmf = MemoryMappedFile.CreateFromFile("large.parquet");
using var accessor = mmf.CreateViewAccessor();// 直接访问文件中的特定列数据
四、完整示例:实现简单的列式数据库
public class ColumnarDatabase
{private readonly Dictionary<string, List<object>> _columns = new();private readonly Dictionary<string, Type> _columnTypes = new();public void CreateTable(IEnumerable<string> columnNames, IEnumerable<Type> columnTypes){if (columnNames.Count() != columnTypes.Count())throw new ArgumentException("Column names and types count mismatch");for (int i = 0; i < columnNames.Count(); i++){_columns[columnNames.ElementAt(i)] = new List<object>();_columnTypes[columnNames.ElementAt(i)] = columnTypes.ElementAt(i);}}public void InsertRow(IEnumerable<object> values){if (values.Count() != _columns.Count)throw new ArgumentException("Values count doesn't match columns count");var enumerators = values.GetEnumerator();foreach (var column in _columns.Values){enumerators.MoveNext();column.Add(enumerators.Current);}}public IEnumerable<object> GetColumn(string columnName){return _columns[columnName];}public IEnumerable<object[]> GetRowsWhere(string columnName, object value){var columnIndex = _columns.Keys.ToList().IndexOf(columnName);if (columnIndex == -1) yield break;var column = _columns.Values.ElementAt(columnIndex);var indexes = new ColumnIndex<object>().Build(column).Where(kvp => kvp.Key.Equals(value)).SelectMany(kvp => kvp.Value);for (int i = 0; i < column.Count; i++){if (indexes.Contains(i)){yield return _columns.Values.Select(c => c[i]).ToArray();}}}// 更多方法...
}
五、性能对比测试
测试代码:
var columnar = new ColumnarStorage<int>();
var rowBased = new List<List<int>>();// 填充100万行数据
var sw = Stopwatch.StartNew();
for (int i = 0; i < 1_000_000; i++)
{columnar.AddColumn("ID", new List<int> { i });columnar.AddColumn("Value", new List<int> { i * 2 });// 行式存储(仅演示,实际不推荐这样使用)if (i == 0) rowBased.Add(new List<int>());rowBased[0].Add(i);rowBased[0].Add(i * 2);
}
sw.Stop();
Console.WriteLine($"填充时间: {sw.ElapsedMilliseconds}ms");// 查询测试
sw.Restart();
var columnarValues = columnar.GetColumn("Value");
sw.Stop();
Console.WriteLine($"列式查询时间: {sw.ElapsedMilliseconds}ms");sw.Restart();
var rowBasedValues = rowBased.SelectMany(r => r).Where((v, i) => i % 2 == 1); // 模拟查询第二列
sw.Stop();
Console.WriteLine($"行式查询时间: {sw.ElapsedMilliseconds}ms");
预期结果:
- 列式存储在查询特定列时性能更好
- 行式存储在插入时可能更快(但实际列式存储优化后插入也很快)
六、实际应用场景
-
数据分析平台:
- 处理TB级数据
- 高效聚合计算
- 列式压缩节省存储空间
-
商业智能工具:
- 快速生成报表
- 复杂查询优化
- 多维分析支持
-
时序数据处理:
- 高效存储时间序列数据
- 快速聚合计算
- 支持降采样查询
七、扩展建议
-
实现列式压缩:
public byte[] CompressColumn<T>(List<T> column) {using var ms = new MemoryStream();using (var writer = new BinaryWriter(ms)){foreach (var item in column){// 实现自定义压缩逻辑writer.Write(item.ToString());}}return ms.ToArray(); }
-
添加索引支持:
public class ColumnIndex {private readonly Dictionary<object, List<int>> _valueToRows = new();public void Build<T>(List<T> column){for (int i = 0; i < column.Count; i++){if (!_valueToRows.ContainsKey(column[i]))_valueToRows[column[i]] = new List<int>();_valueToRows[column[i]].Add(i);}}public IEnumerable<int> GetRows(T value) => _valueToRows.TryGetValue(value, out var rows) ? rows : Enumerable.Empty<int>(); }
-
支持列式存储格式:
- Parquet
- ORC
- Apache Arrow
- 自定义二进制格式