当前位置: 首页 > news >正文

Abp+ShardingCore+EFCore.BulkExtensions使用案例

工程模板:Abp Vnext

分表组件:Sharding Core

批量插入组件:EFCore.BulkExtensions

此组件对MSSQL(SQL Server)数据库的批量操作使用MERGE语句,非SqlBulkCopy,SqlBulkCopy处理临时表时/单表插入使用;

MERGE语句用于根据源表和目标表的匹配情况,执行插入、更新和删除操作。

优点:

(1)灵活性: MERGE可以同时处理插入和更新,适合需要根据条件合并数据的场景。

(2)简化代码: 一个语句可以处理多种操作,减少了多条 SQL 语句的需求。

缺点:

(1)性能较低: 对于大批量数据,MERGE 的性能通常不如 SqlBulkCopy,因为它需要执行匹配检查,并且在处理每一行时都会增加额外开销。

(2)复杂性: 语法相对复杂,理解和维护可能比较困难。

目的:向分表或者普通表中批量插入数据,提高写入效率

1、由于EFCore.BulkExtensions批量插入主从表关联数据有缺陷,因此对源码进行了调整;

2、在SPC.XXX.EntityFrameworkCore 工程中引入自定义打包文件EFCore.BulkExtensions 包或者直接引入源码;

3、自定义文件目录

BulkExtensionsEfCoreBulkOperationProvider.cs 文件代码

using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using EFCore.BulkExtensions;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using ShardingCore.Extensions;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Domain.Entities;
using Volo.Abp.Domain.Repositories.EntityFrameworkCore;
using Volo.Abp.EntityFrameworkCore;namespace SPC.PlantDataService.EntityFrameworkCore
{public class BulkExtensionsEfCoreBulkOperationProvider : IEfCoreBulkOperationProvider{private readonly ILogger<BulkExtensionsEfCoreBulkOperationProvider> _logger;public BulkExtensionsEfCoreBulkOperationProvider(ILogger<BulkExtensionsEfCoreBulkOperationProvider> logger){_logger = logger;}public async Task DeleteManyAsync<TDbContext, TEntity>(IEfCoreRepository<TEntity> repository, IEnumerable<TEntity> entities, bool autoSave, CancellationToken cancellationToken)where TDbContext : IEfCoreDbContextwhere TEntity : class, IEntity{var dbContext = (await repository.GetDbContextAsync()) as PlantDataServiceDbContext;var bulkShardingEnumerable = dbContext.BulkShardingTableEnumerable(entities);try{foreach (var dataSourceMap in bulkShardingEnumerable){await dataSourceMap.Key.BulkDeleteAsync(dataSourceMap.Value.ToList(), cancellationToken: cancellationToken);}}catch (Exception ex){throw ex;}}public async Task InsertManyAsync<TDbContext, TEntity>(IEfCoreRepository<TEntity> repository, IEnumerable<TEntity> entities, bool autoSave, CancellationToken cancellationToken)where TDbContext : IEfCoreDbContextwhere TEntity : class, IEntity{var dbContext = (await repository.GetDbContextAsync()) as PlantDataServiceDbContext;var bulkShardingEnumerable = dbContext.BulkShardingTableEnumerable(entities);try{foreach (var dataSourceMap in bulkShardingEnumerable){await dataSourceMap.Key.BulkInsertAsync(dataSourceMap.Value.ToList(), options =>{options.PreserveInsertOrder = false;options.IncludeGraph = IsIncludeGraph(dataSourceMap.Key, dataSourceMap.Value.ToList());options.BatchSize = 10000;}, cancellationToken: cancellationToken);}}catch (Exception ex){throw ex;}}public async Task UpdateManyAsync<TDbContext, TEntity>(IEfCoreRepository<TEntity> repository, IEnumerable<TEntity> entities, bool autoSave, CancellationToken cancellationToken)where TDbContext : IEfCoreDbContextwhere TEntity : class, IEntity{var dbContext = (await repository.GetDbContextAsync()) as PlantDataServiceDbContext;var bulkShardingEnumerable = dbContext.BulkShardingTableEnumerable(entities);try{foreach (var dataSourceMap in bulkShardingEnumerable){await dataSourceMap.Key.BulkUpdateAsync(dataSourceMap.Value.ToList(), options =>{options.PreserveInsertOrder = false;options.IncludeGraph = IsIncludeGraph(dataSourceMap.Key, dataSourceMap.Value.ToList());options.BatchSize = 10000;}, cancellationToken: cancellationToken);}}catch (Exception ex){throw ex;}}private bool IsIncludeGraph(DbContext dbContext, IEnumerable<object> entities){// 创建一个 Stopwatch 实例Stopwatch stopwatch = new Stopwatch();stopwatch.Start();var graphNodes = GraphUtil.GetTopologicallySortedGraph(dbContext, entities);if (graphNodes != null){var graphNodesGroupedByType = graphNodes.GroupBy(y => y.Entity.GetType());foreach (var graphNode in graphNodesGroupedByType)_logger.LogInformation($"Data Count: {graphNode.Count()}");// 停止计时stopwatch.Stop();// 显示经过的时间_logger.LogInformation($"Elapsed Time: {stopwatch.ElapsedMilliseconds} ms");_logger.LogInformation($"Elapsed Time: {stopwatch.Elapsed}");if (graphNodesGroupedByType.Count() > 1)return true;}return false;}#regionprivate bool OnlyPrimitiveTypes(object? obj, bool innerFlag = false){if (obj == null)return true;Type objType = obj.GetType();if (IsPrimitiveType(objType))return true;// 如果是自定义类,直接返回 falseif (objType.IsClass && !objType.FullName.Contains("Volo.Abp") && innerFlag == true){return false;}// 如果是集合类型,直接返回falseif (typeof(IEnumerable).IsAssignableFrom(objType) && !objType.FullName.Contains("Volo.Abp")){return false;}// 对于任何非基本类型的自定义对象,检查其属性foreach (PropertyInfo property in objType.GetProperties()){if (!OnlyPrimitiveTypes(property.GetValue(obj), true)){return false;}}return true;}private bool IsPrimitiveType(Type type){// 判断基本类型,包括枚举和字符串if (type.IsPrimitive ||type.IsEnum ||type == typeof(string) ||type == typeof(decimal) ||type == typeof(DateTime) ||type == typeof(TimeSpan) ||type == typeof(Guid) ||type == typeof(byte) ||type == typeof(sbyte) ||type == typeof(short) ||type == typeof(ushort) ||type == typeof(int) ||type == typeof(uint) ||type == typeof(long) ||type == typeof(ulong) ||type == typeof(float) ||type == typeof(double) ||type == typeof(char)){return true;}// 检查是否为可空类型if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(Nullable<>)){return IsPrimitiveType(Nullable.GetUnderlyingType(type));}return false;}#endregion}
}

AbstractShardingAbpDbContext.cs 文件代码

using Microsoft.EntityFrameworkCore;
using ShardingCore.Extensions;
using ShardingCore.Sharding.Abstractions;
using System;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations.Schema;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.Auditing;
using Volo.Abp.Domain.Entities;
using Volo.Abp.EntityFrameworkCore;
using Volo.Abp.Reflection;namespace SPC.SIMDataService.EntityFrameworkCore
{public abstract class AbstractShardingAbpDbContext<TDbContext> : AbpDbContext<TDbContext>, IShardingDbContextwhere TDbContext : DbContext{private bool _createExecutor = false;protected AbstractShardingAbpDbContext(DbContextOptions<TDbContext> options) : base(options){}private IShardingDbContextExecutor _shardingDbContextExecutor;public IShardingDbContextExecutor GetShardingExecutor(){if (!_createExecutor){_shardingDbContextExecutor = this.DoCreateShardingDbContextExecutor();_createExecutor = true;}return _shardingDbContextExecutor;}private IShardingDbContextExecutor DoCreateShardingDbContextExecutor(){var shardingDbContextExecutor = this.CreateShardingDbContextExecutor();if (shardingDbContextExecutor != null){shardingDbContextExecutor.EntityCreateDbContextBefore += (sender, args) =>{CheckAndSetShardingKeyThatSupportAutoCreate(args.Entity);};shardingDbContextExecutor.CreateDbContextAfter += (sender, args) =>{var dbContext = args.DbContext;if (dbContext is AbpDbContext<TDbContext> abpDbContext && abpDbContext.LazyServiceProvider == null){abpDbContext.LazyServiceProvider = this.LazyServiceProvider;if (dbContext is IAbpEfCoreDbContext abpEfCoreDbContext && this.UnitOfWorkManager.Current != null){abpEfCoreDbContext.Initialize(new AbpEfCoreDbContextInitializationContext(this.UnitOfWorkManager.Current));}}};}return shardingDbContextExecutor;}private void CheckAndSetShardingKeyThatSupportAutoCreate<TEntity>(TEntity entity) where TEntity : class{if (entity is IShardingKeyIsGuId){if (entity is IEntity<Guid> guidEntity){if (guidEntity.Id != default){return;}var idProperty = entity.GetObjectProperty(nameof(IEntity<Guid>.Id));var dbGeneratedAttr = ReflectionHelper.GetSingleAttributeOrDefault<DatabaseGeneratedAttribute>(idProperty);if (dbGeneratedAttr != null && dbGeneratedAttr.DatabaseGeneratedOption != DatabaseGeneratedOption.None){return;}EntityHelper.TrySetId(guidEntity,() => GuidGenerator.Create(),true);}}else if (entity is IShardingKeyIsCreationTime){AuditPropertySetter?.SetCreationProperties(entity);}}/// <summary>/// abp 5.x+ 如果存在并发字段那么需要添加这段代码/// </summary>protected override void HandlePropertiesBeforeSave(){if (GetShardingExecutor() == null){base.HandlePropertiesBeforeSave();}}public override void Dispose(){_shardingDbContextExecutor?.Dispose();base.Dispose();}public override async ValueTask DisposeAsync(){if (_shardingDbContextExecutor != null){await _shardingDbContextExecutor.DisposeAsync();}await base.DisposeAsync();}}
}

批量操作数据用例:

http://www.dtcms.com/a/308413.html

相关文章:

  • MCU中的DAC(数字模拟转换器)是什么?
  • 动态挑战-响应机制和密钥轮换
  • 算法练习:JZ32 从上往下打印二叉树
  • iOS高级开发工程师面试——其他
  • 磁盘坏道检测工具在美国服务器硬件维护中的使用规范
  • Linux 计划任务管理
  • 【在线五子棋对战】十一、整合封装服务器模块实现
  • linux git ssh配置过程
  • chrome.storage 和 localStorage
  • 自动化与配置管理工具 ——SaltStack
  • 用 AI 自动生成口型同步视频,短视频内容也能一人完成
  • 基于深度学习的医学图像分析:使用YOLOv5实现医学图像目标检测
  • 测试平台进化论:如何在CI/CD时代重构软件质量防线
  • # 前端开发规范基础汇总
  • 掌握Python三大语句:顺序、条件与循环
  • 深度解析:基于Python构建的闲鱼自动化营销与信息发送机器人
  • 暄桐:如何脱离“不学无术”的状态?
  • 集成学习方法之随机森林:从原理到实战的深度解析
  • pip库版本升级
  • vue vxe-table :edit-config=“editConfig“ 可以编辑的表格
  • Netcat终极实战指南:从端口扫描到渗透测试
  • Multimodal Fusion on Low-quality Data: A Comprehensive Survey 中文版
  • Java面试高频题目
  • 基于notepad++的sensor寄存器序列文本处理方法(以后会随时更新补充)
  • STM32F1到STM32F0的标准库移植指南--GPIO篇
  • 认知绞肉机:个体实践视域下认知暴力与元认知升维的活体实验研究
  • 如何将 Redis 监控集成到微服务整体的监控体系中( 如 Prometheus + Grafana)
  • [12月考试] E
  • 使用EasyPOI实现Java订单数据导出(含多物料信息)——模板语法详解与实战
  • Redis实战(4)-- BitMap结构与使用