Abp+ShardingCore+EFCore.BulkExtensions使用案例
工程模板:Abp Vnext
分表组件:Sharding Core
批量插入组件:EFCore.BulkExtensions
此组件对MSSQL(SQL Server)数据库的批量操作使用MERGE语句,而非直接使用SqlBulkCopy;SqlBulkCopy仅在写入临时表或单表插入时使用;
MERGE语句用于根据源表和目标表的匹配情况,执行插入、更新和删除操作。
优点:
(1)灵活性: MERGE可以同时处理插入和更新,适合需要根据条件合并数据的场景。
(2)简化代码: 一个语句可以处理多种操作,减少了多条 SQL 语句的需求。
缺点:
(1)性能较低: 对于大批量数据,MERGE 的性能通常不如 SqlBulkCopy,因为它需要执行匹配检查,并且在处理每一行时都会增加额外开销。
(2)复杂性: 语法相对复杂,理解和维护可能比较困难。
目的:向分表或者普通表中批量插入数据,提高写入效率
1、由于EFCore.BulkExtensions批量插入主从表关联数据有缺陷,因此对源码进行了调整;
2、在SPC.XXX.EntityFrameworkCore 工程中引入自定义打包的EFCore.BulkExtensions 包,或者直接引入源码;
3、自定义文件目录
BulkExtensionsEfCoreBulkOperationProvider.cs 文件代码
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using EFCore.BulkExtensions;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using ShardingCore.Extensions;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Domain.Entities;
using Volo.Abp.Domain.Repositories.EntityFrameworkCore;
using Volo.Abp.EntityFrameworkCore;namespace SPC.PlantDataService.EntityFrameworkCore
{
    /// <summary>
    /// Implementation of <see cref="IEfCoreBulkOperationProvider"/> that forwards bulk
    /// insert / update / delete calls to EFCore.BulkExtensions.
    /// The entities are first split with <c>BulkShardingTableEnumerable</c>, and one bulk call is
    /// issued per (DbContext, entities) pair it returns.
    /// </summary>
    /// <remarks>
    /// The <c>autoSave</c> argument and the <c>TDbContext</c> type argument are required by the
    /// interface but are not used by any method of this class.
    /// </remarks>
    public class BulkExtensionsEfCoreBulkOperationProvider : IEfCoreBulkOperationProvider
    {
        /// <summary>Value assigned to <c>BatchSize</c> for bulk insert and bulk update.</summary>
        private const int BulkBatchSize = 10000;

        private readonly ILogger<BulkExtensionsEfCoreBulkOperationProvider> _logger;

        /// <summary>Creates the provider.</summary>
        /// <param name="logger">Logger used to report entity-graph statistics and timings.</param>
        public BulkExtensionsEfCoreBulkOperationProvider(ILogger<BulkExtensionsEfCoreBulkOperationProvider> logger)
        {
            _logger = logger;
        }

        /// <summary>Bulk-deletes <paramref name="entities"/>, one bulk call per target DbContext.</summary>
        /// <param name="repository">Repository whose DbContext is used to split the entities.</param>
        /// <param name="entities">Entities to delete.</param>
        /// <param name="autoSave">Not used by this implementation.</param>
        /// <param name="cancellationToken">Token forwarded to every bulk call.</param>
        /// <exception cref="InvalidOperationException">
        /// The repository's DbContext is not a <c>PlantDataServiceDbContext</c>.
        /// </exception>
        public async Task DeleteManyAsync<TDbContext, TEntity>(
            IEfCoreRepository<TEntity> repository,
            IEnumerable<TEntity> entities,
            bool autoSave,
            CancellationToken cancellationToken)
            where TDbContext : IEfCoreDbContext
            where TEntity : class, IEntity
        {
            var dbContext = await GetShardingDbContextAsync(repository);

            // No try/catch here: the previous "catch (Exception ex) { throw ex; }" only reset the
            // stack trace. Exceptions now propagate with their original trace.
            foreach (var dataSourceMap in dbContext.BulkShardingTableEnumerable(entities))
            {
                await dataSourceMap.Key.BulkDeleteAsync(dataSourceMap.Value.ToList(), cancellationToken: cancellationToken);
            }
        }

        /// <summary>Bulk-inserts <paramref name="entities"/>, one bulk call per target DbContext.</summary>
        /// <param name="repository">Repository whose DbContext is used to split the entities.</param>
        /// <param name="entities">Entities to insert.</param>
        /// <param name="autoSave">Not used by this implementation.</param>
        /// <param name="cancellationToken">Token forwarded to every bulk call.</param>
        /// <exception cref="InvalidOperationException">
        /// The repository's DbContext is not a <c>PlantDataServiceDbContext</c>.
        /// </exception>
        public async Task InsertManyAsync<TDbContext, TEntity>(
            IEfCoreRepository<TEntity> repository,
            IEnumerable<TEntity> entities,
            bool autoSave,
            CancellationToken cancellationToken)
            where TDbContext : IEfCoreDbContext
            where TEntity : class, IEntity
        {
            var dbContext = await GetShardingDbContextAsync(repository);

            foreach (var dataSourceMap in dbContext.BulkShardingTableEnumerable(entities))
            {
                var targetContext = dataSourceMap.Key;

                // Materialize once: the same list feeds both the graph inspection and the bulk call.
                var targetEntities = dataSourceMap.Value.ToList();
                var includeGraph = IsIncludeGraph(targetContext, targetEntities);

                await targetContext.BulkInsertAsync(
                    targetEntities,
                    options => ApplyBulkOptions(options, includeGraph),
                    cancellationToken: cancellationToken);
            }
        }

        /// <summary>Bulk-updates <paramref name="entities"/>, one bulk call per target DbContext.</summary>
        /// <param name="repository">Repository whose DbContext is used to split the entities.</param>
        /// <param name="entities">Entities to update.</param>
        /// <param name="autoSave">Not used by this implementation.</param>
        /// <param name="cancellationToken">Token forwarded to every bulk call.</param>
        /// <exception cref="InvalidOperationException">
        /// The repository's DbContext is not a <c>PlantDataServiceDbContext</c>.
        /// </exception>
        public async Task UpdateManyAsync<TDbContext, TEntity>(
            IEfCoreRepository<TEntity> repository,
            IEnumerable<TEntity> entities,
            bool autoSave,
            CancellationToken cancellationToken)
            where TDbContext : IEfCoreDbContext
            where TEntity : class, IEntity
        {
            var dbContext = await GetShardingDbContextAsync(repository);

            foreach (var dataSourceMap in dbContext.BulkShardingTableEnumerable(entities))
            {
                var targetContext = dataSourceMap.Key;

                // Materialize once: the same list feeds both the graph inspection and the bulk call.
                var targetEntities = dataSourceMap.Value.ToList();
                var includeGraph = IsIncludeGraph(targetContext, targetEntities);

                await targetContext.BulkUpdateAsync(
                    targetEntities,
                    options => ApplyBulkOptions(options, includeGraph),
                    cancellationToken: cancellationToken);
            }
        }

        /// <summary>
        /// Resolves the repository's DbContext and verifies that it is the sharding-enabled
        /// <c>PlantDataServiceDbContext</c>.
        /// </summary>
        /// <exception cref="InvalidOperationException">The DbContext has a different type or is null.</exception>
        private static async Task<PlantDataServiceDbContext> GetShardingDbContextAsync<TEntity>(
            IEfCoreRepository<TEntity> repository)
            where TEntity : class, IEntity
        {
            var dbContext = await repository.GetDbContextAsync();
            if (dbContext is PlantDataServiceDbContext shardingDbContext)
            {
                return shardingDbContext;
            }

            // Fail with an explicit message instead of a NullReferenceException further down.
            var actualType = dbContext?.GetType().FullName ?? "null";
            throw new InvalidOperationException(
                $"Bulk operations require a {nameof(PlantDataServiceDbContext)}, but the repository returned '{actualType}'.");
        }

        /// <summary>Applies the bulk settings shared by insert and update.</summary>
        private static void ApplyBulkOptions(BulkConfig options, bool includeGraph)
        {
            options.PreserveInsertOrder = false;
            options.IncludeGraph = includeGraph;
            options.BatchSize = BulkBatchSize;
        }

        /// <summary>
        /// Decides whether <c>IncludeGraph</c> should be enabled: returns <c>true</c> when the
        /// topologically sorted graph of <paramref name="entities"/> contains more than one
        /// entity type. Logs the number of graph nodes per type and the elapsed time.
        /// </summary>
        /// <param name="dbContext">Context passed to <c>GraphUtil.GetTopologicallySortedGraph</c>.</param>
        /// <param name="entities">Root entities to inspect.</param>
        /// <returns><c>false</c> when no graph is returned or it holds at most one entity type.</returns>
        private bool IsIncludeGraph(DbContext dbContext, IEnumerable<object> entities)
        {
            var stopwatch = Stopwatch.StartNew();

            var graphNodes = GraphUtil.GetTopologicallySortedGraph(dbContext, entities);
            if (graphNodes == null)
            {
                return false;
            }

            // Materialize the per-type counts once; the deferred GroupBy was previously
            // enumerated twice (once for logging, once for Count()).
            var nodeCountsByType = graphNodes
                .GroupBy(node => node.Entity.GetType())
                .Select(group => group.Count())
                .ToList();

            foreach (var nodeCount in nodeCountsByType)
            {
                _logger.LogInformation($"Data Count: {nodeCount}");
            }

            stopwatch.Stop();
            _logger.LogInformation($"Elapsed Time: {stopwatch.ElapsedMilliseconds} ms");
            _logger.LogInformation($"Elapsed Time: {stopwatch.Elapsed}");

            return nodeCountsByType.Count > 1;
        }

        #region Primitive-type helpers

        // NOTE(review): neither helper below is referenced by the other members visible in this
        // class; they are kept in case code outside this view relies on them — confirm before removing.

        /// <summary>
        /// Returns <c>true</c> when <paramref name="obj"/> is null, is of a primitive-like type
        /// (see <see cref="IsPrimitiveType"/>), or all of its public property values
        /// recursively satisfy this check.
        /// </summary>
        /// <param name="obj">Object to inspect.</param>
        /// <param name="innerFlag">
        /// <c>true</c> when inspecting a property value rather than the root object; a
        /// non-"Volo.Abp" class found at that level makes the check fail.
        /// </param>
        private bool OnlyPrimitiveTypes(object? obj, bool innerFlag = false)
        {
            if (obj == null)
            {
                return true;
            }

            Type objType = obj.GetType();
            if (IsPrimitiveType(objType))
            {
                return true;
            }

            // Type.FullName is nullable; a missing name is treated as "not an ABP type".
            bool isAbpType = objType.FullName?.Contains("Volo.Abp") == true;

            // A custom (non-ABP) class nested inside the root object fails the check.
            if (objType.IsClass && !isAbpType && innerFlag)
            {
                return false;
            }

            // A non-ABP collection fails the check.
            if (typeof(IEnumerable).IsAssignableFrom(objType) && !isAbpType)
            {
                return false;
            }

            // Any other non-primitive object: inspect its property values.
            foreach (PropertyInfo property in objType.GetProperties())
            {
                // Indexers cannot be read without arguments; GetValue(obj) would throw.
                if (property.GetIndexParameters().Length > 0)
                {
                    continue;
                }

                if (!OnlyPrimitiveTypes(property.GetValue(obj), true))
                {
                    return false;
                }
            }

            return true;
        }

        /// <summary>
        /// Returns <c>true</c> for CLR primitives, enums, <see cref="string"/>,
        /// <see cref="decimal"/>, <see cref="DateTime"/>, <see cref="TimeSpan"/>,
        /// <see cref="Guid"/>, and <see cref="Nullable{T}"/> wrappers around any of those.
        /// </summary>
        private bool IsPrimitiveType(Type type)
        {
            // Type.IsPrimitive already covers byte, sbyte, short, ushort, int, uint, long, ulong,
            // float, double and char, so they are not listed individually.
            if (type.IsPrimitive ||
                type.IsEnum ||
                type == typeof(string) ||
                type == typeof(decimal) ||
                type == typeof(DateTime) ||
                type == typeof(TimeSpan) ||
                type == typeof(Guid))
            {
                return true;
            }

            // Nullable<T>: decide based on the underlying type.
            var underlyingType = Nullable.GetUnderlyingType(type);
            return underlyingType != null && IsPrimitiveType(underlyingType);
        }

        #endregion
    }
}
AbstractShardingAbpDbContext.cs 文件代码
using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using System;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations.Schema;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.Auditing;
using Volo.Abp.Domain.Entities;
using Volo.Abp.EntityFrameworkCore;
using Volo.Abp.Reflection;namespace SPC.SIMDataService.EntityFrameworkCore
{
    /// <summary>
    /// Base class that combines an ABP <see cref="AbpDbContext{TDbContext}"/> with the
    /// <see cref="IShardingDbContext"/> contract. It lazily creates the sharding executor and
    /// hooks the executor's events so that DbContexts created by it receive this context's
    /// <c>LazyServiceProvider</c> and, when one exists, the current unit of work.
    /// </summary>
    /// <typeparam name="TDbContext">Concrete DbContext type.</typeparam>
    public abstract class AbstractShardingAbpDbContext<TDbContext> : AbpDbContext<TDbContext>, IShardingDbContext
        where TDbContext : DbContext
    {
        private IShardingDbContextExecutor _shardingDbContextExecutor;

        // Tracks whether creation was attempted; kept separate from the field above because the
        // created executor may itself be null.
        private bool _createExecutor;

        /// <summary>Passes <paramref name="options"/> to the ABP base context.</summary>
        protected AbstractShardingAbpDbContext(DbContextOptions<TDbContext> options)
            : base(options)
        {
        }

        /// <summary>
        /// Returns the sharding executor, creating it on the first call. Later calls return the
        /// cached result of that first attempt.
        /// </summary>
        public IShardingDbContextExecutor GetShardingExecutor()
        {
            if (_createExecutor)
            {
                return _shardingDbContextExecutor;
            }

            _shardingDbContextExecutor = DoCreateShardingDbContextExecutor();
            _createExecutor = true;
            return _shardingDbContextExecutor;
        }

        /// <summary>
        /// Creates the executor and, when it is not null, subscribes to its
        /// <c>EntityCreateDbContextBefore</c> and <c>CreateDbContextAfter</c> events.
        /// </summary>
        private IShardingDbContextExecutor DoCreateShardingDbContextExecutor()
        {
            var executor = this.CreateShardingDbContextExecutor();
            if (executor == null)
            {
                return executor;
            }

            // Before a DbContext is created for an entity: fill in its sharding key if needed.
            executor.EntityCreateDbContextBefore += (sender, args) =>
                CheckAndSetShardingKeyThatSupportAutoCreate(args.Entity);

            // After a DbContext is created: share this context's services with it.
            executor.CreateDbContextAfter += (sender, args) =>
            {
                var createdContext = args.DbContext;

                // Only ABP contexts that have no service provider yet are touched.
                if (!(createdContext is AbpDbContext<TDbContext> abpContext) || abpContext.LazyServiceProvider != null)
                {
                    return;
                }

                abpContext.LazyServiceProvider = LazyServiceProvider;

                if (!(createdContext is IAbpEfCoreDbContext efCoreContext))
                {
                    return;
                }

                if (UnitOfWorkManager.Current == null)
                {
                    return;
                }

                efCoreContext.Initialize(new AbpEfCoreDbContextInitializationContext(UnitOfWorkManager.Current));
            };

            return executor;
        }

        /// <summary>
        /// Fills in a sharding key that can be generated automatically:
        /// for <c>IShardingKeyIsGuId</c> entities a new Guid id (only when the entity is an
        /// <see cref="IEntity{Guid}"/>, its id is still empty and the id property is not marked
        /// as database-generated); otherwise, for <c>IShardingKeyIsCreationTime</c> entities,
        /// the creation audit properties.
        /// </summary>
        private void CheckAndSetShardingKeyThatSupportAutoCreate<TEntity>(TEntity entity) where TEntity : class
        {
            if (!(entity is IShardingKeyIsGuId))
            {
                if (entity is IShardingKeyIsCreationTime)
                {
                    AuditPropertySetter?.SetCreationProperties(entity);
                }

                return;
            }

            // Guid-keyed sharding: nothing to do unless the entity has a Guid id that is still empty.
            if (!(entity is IEntity<Guid> guidEntity) || guidEntity.Id != Guid.Empty)
            {
                return;
            }

            var idPropertyInfo = entity.GetObjectProperty(nameof(IEntity<Guid>.Id));
            var generatedAttribute = ReflectionHelper.GetSingleAttributeOrDefault<DatabaseGeneratedAttribute>(idPropertyInfo);

            // Leave the id alone when the id property is marked as database-generated.
            var isDatabaseGenerated = generatedAttribute != null
                && generatedAttribute.DatabaseGeneratedOption != DatabaseGeneratedOption.None;
            if (isDatabaseGenerated)
            {
                return;
            }

            EntityHelper.TrySetId(guidEntity, () => GuidGenerator.Create(), true);
        }

        /// <summary>
        /// Required for ABP 5.x+ when a concurrency field exists: the base handling runs only
        /// when no sharding executor is available.
        /// </summary>
        protected override void HandlePropertiesBeforeSave()
        {
            if (GetShardingExecutor() != null)
            {
                return;
            }

            base.HandlePropertiesBeforeSave();
        }

        /// <summary>Disposes the sharding executor (if one was created) and then the base context.</summary>
        public override void Dispose()
        {
            if (_shardingDbContextExecutor != null)
            {
                _shardingDbContextExecutor.Dispose();
            }

            base.Dispose();
        }

        /// <summary>Asynchronously disposes the sharding executor (if one was created) and then the base context.</summary>
        public override async ValueTask DisposeAsync()
        {
            var executor = _shardingDbContextExecutor;
            if (executor != null)
            {
                await executor.DisposeAsync();
            }

            await base.DisposeAsync();
        }
    }
}
批量操作数据用例: