当前位置: 首页 > news >正文

Asp.Net Core SignalR导入数据

文章目录

  • 前言
  • 一、安装包
  • 二、使用步骤
    • 1.实现SignalR Hub服务:
    • 2.实现CSV文件解析及数据导入服务
    • 3.控制器
    • 4.前端实现(vue)
  • 三、关键技术点说明
  • 总结


前言

本文演示如何将 CSV 文件中的数据导入数据库:使用 CsvHelper 解析 CSV 文件,使用 SqlBulkCopy 批量写入数据,并通过 SignalR Hub 向前端实时推送导入进度。

一、安装包

  1. CsvHelper

    Install-Package CsvHelper
    
  2. SqlBulkCopy(该类位于 Microsoft.Data.SqlClient 中,安装下面的包时会一并引入)

    Install-Package Microsoft.EntityFrameworkCore.SqlServer
    

    注意安装包版本,根据自己项目的版本选择合适的Packages

二、使用步骤

1.实现SignalR Hub服务:

  1. MyHubService.cs

    using Microsoft.AspNetCore.Authorization;
    using Microsoft.AspNetCore.Identity;
    using Microsoft.AspNetCore.SignalR;
    using Microsoft.EntityFrameworkCore;
    using SignalRDemo.Data;
    using SignalRDemo.Entity;
    using SignalRDemo.Interfaces;
    using System.Collections.Concurrent;
    using System.Linq;
    using static Microsoft.EntityFrameworkCore.DbLoggerCategory.Database;

    namespace SignalRDemo.HubService
    {
        /// <summary>
        /// SignalR hub that handles broadcast, private and group messaging, keeps an
        /// in-memory cache of custom groups, and relays import progress to a client.
        /// </summary>
        public class MyHubService : Hub
        {
            private const string AllUsersGroup = "AllUsers";
            private const string AdminUsersGroup = "AdminUsers";
            private const string AdminRole = "admin";

            private readonly MyDbContext myDbContext;
            private readonly UserManager<User> userManager;
            private readonly IImportECDictService _importECDictService;

            // In-memory cache of group information, keyed by group name.
            private static readonly ConcurrentDictionary<string, GroupInfo> _groups =
                new ConcurrentDictionary<string, GroupInfo>();

            // Maps connection id -> user id for connections that belong to an identified user.
            private static readonly ConcurrentDictionary<string, string> _userConnections = new();

            // User ids known to be administrators. A concurrent dictionary is used as a
            // thread-safe set because hub methods run concurrently.
            private static readonly ConcurrentDictionary<string, byte> _admins = new();

            // Guards reads/writes of GroupInfo.MemberIds (a plain HashSet shared through the static cache).
            private static readonly object _membershipGate = new();

            public MyHubService(UserManager<User> userManager, MyDbContext myDbContext, IImportECDictService importECDictService)
            {
                this.userManager = userManager;
                this.myDbContext = myDbContext;
                _importECDictService = importECDictService;
            }

            /// <summary>
            /// Called when a client connects. Records the connection, joins it to the
            /// user's persisted groups plus the built-in groups, and pushes the
            /// connection id back to the caller.
            /// </summary>
            public override async Task OnConnectedAsync()
            {
                var connectionID = Context.ConnectionId;
                var userID = Context.UserIdentifier;

                if (!string.IsNullOrEmpty(userID))
                {
                    // Needed by OnDisconnectedAsync, which looks the connection up here.
                    _userConnections[connectionID] = userID;

                    // Re-join the groups the user belongs to according to the database.
                    if (TryParseUserId(userID, out var numericUserId))
                    {
                        foreach (var groupName in await GetUserGroupsAsync(numericUserId))
                        {
                            await Groups.AddToGroupAsync(connectionID, groupName);
                        }
                    }

                    // SendAdminMsgAsync targets this group, so administrators must be added to it.
                    if (Context.User?.IsInRole(AdminRole) == true)
                    {
                        _admins.TryAdd(userID, 0);
                        await Groups.AddToGroupAsync(connectionID, AdminUsersGroup);
                    }
                }

                await Groups.AddToGroupAsync(connectionID, AllUsersGroup);

                // Proactively send the connection id to the client.
                await Clients.Caller.SendAsync("ReceiveConnectionId", connectionID);
                await base.OnConnectedAsync();
            }

            /// <summary>
            /// Called when a client disconnects. Drops the connection from the
            /// connection map and from the built-in groups.
            /// </summary>
            public override async Task OnDisconnectedAsync(Exception? exception)
            {
                var connectionID = Context.ConnectionId;
                if (_userConnections.TryRemove(connectionID, out var userID))
                {
                    if (_admins.ContainsKey(userID))
                    {
                        await Groups.RemoveFromGroupAsync(connectionID, AdminUsersGroup);
                    }
                }

                await Groups.RemoveFromGroupAsync(connectionID, AllUsersGroup);
                await base.OnDisconnectedAsync(exception);
            }

            /// <summary>
            /// Loads all groups and their members from the database into the in-memory cache.
            /// </summary>
            /// <remarks>
            /// NOTE(review): this is a public hub method, so connected clients can invoke it;
            /// confirm whether it should live in a startup service instead.
            /// </remarks>
            public async Task InitializeGroupsAsync()
            {
                var groups = await myDbContext.Groups.Include(g => g.Members).ToListAsync();
                foreach (var group in groups)
                {
                    var groupInfo = new GroupInfo
                    {
                        GroupId = group.GroupId,
                        GroupName = group.GroupName,
                        MemberIds = group.Members.Select(m => m.UserId).ToHashSet()
                    };
                    _groups.TryAdd(group.GroupName, groupInfo);
                }
            }

            /// <summary>Returns the names of the groups the given user belongs to.</summary>
            private async Task<IEnumerable<string>> GetUserGroupsAsync(long userId)
            {
                return await myDbContext.GroupMembers
                    .Where(gm => gm.UserId == userId)
                    .Select(gm => gm.Group.GroupName)
                    .ToListAsync();
            }

            /// <summary>Parses a user identifier as a culture-invariant 64-bit integer.</summary>
            private static bool TryParseUserId(string? userIdentifier, out long userId)
            {
                return long.TryParse(
                    userIdentifier,
                    System.Globalization.NumberStyles.Integer,
                    System.Globalization.CultureInfo.InvariantCulture,
                    out userId);
            }

            /// <summary>
            /// Returns the numeric id of the calling user.
            /// </summary>
            /// <exception cref="HubException">The caller has no usable user identifier.</exception>
            private long GetCurrentUserId()
            {
                // Convert.ToInt64(null) would silently yield 0; fail loudly instead.
                if (TryParseUserId(Context.UserIdentifier, out var userId))
                {
                    return userId;
                }

                throw new HubException("无法识别当前用户,请先登录");
            }

            /// <summary>Returns whether the user is currently cached as a member of the group.</summary>
            private static bool IsMember(GroupInfo groupInfo, long userId)
            {
                lock (_membershipGate)
                {
                    return groupInfo.MemberIds.Contains(userId);
                }
            }

            /// <summary>
            /// Sends a message to all connected clients (administrators only).
            /// </summary>
            /// <param name="user">Display name of the sender.</param>
            /// <param name="content">Message text.</param>
            [Authorize(Roles = AdminRole)]
            public async Task SendMessageAsync(string user, string content)
            {
                await Clients.All.SendAsync("ReceiveMsg", user, content);
            }

            /// <summary>
            /// Sends a private message to a specific user.
            /// </summary>
            /// <param name="toUserName">User name of the recipient.</param>
            /// <param name="content">Message text.</param>
            /// <exception cref="HubException">The sender or the recipient cannot be resolved.</exception>
            public async Task SendPrivateMsgAsync(string toUserName, string content)
            {
                var senderUserID = Context.UserIdentifier
                    ?? throw new HubException("无法识别当前用户,请先登录");
                var senderUser = await userManager.FindByIdAsync(senderUserID)
                    ?? throw new HubException("发送者不存在");

                if (string.IsNullOrWhiteSpace(toUserName))
                {
                    throw new HubException("接收者不能为空");
                }

                var toUser = await userManager.FindByNameAsync(toUserName)
                    ?? throw new HubException("接收者不存在");

                await Clients.User(toUser.Id.ToString())
                    .SendAsync("ReceivePrivateMsg", senderUser.UserName, content);
            }

            /// <summary>
            /// Sends a message to a specific group.
            /// </summary>
            /// <param name="groupName">Target group.</param>
            /// <param name="sender">Display name of the sender.</param>
            /// <param name="content">Message text.</param>
            public async Task SendGroupMsgAsync(string groupName, string sender, string content)
            {
                await Clients.Group(groupName).SendAsync("ReceiveGroupMsg", sender, groupName, content);
            }

            /// <summary>
            /// Misspelled original name of <see cref="SendGroupMsgAsync"/>; kept so that
            /// existing clients invoking "SendGroupMsgAsynnc" keep working.
            /// </summary>
            public Task SendGroupMsgAsynnc(string groupName, string sender, string content)
            {
                return SendGroupMsgAsync(groupName, sender, content);
            }

            /// <summary>
            /// Sends a message to the administrators group ("AdminUsers").
            /// </summary>
            /// <param name="sender">Display name of the sender.</param>
            /// <param name="content">Message text.</param>
            public async Task SendAdminMsgAsync(string sender, string content)
            {
                await Clients.Group(AdminUsersGroup).SendAsync("ReceiveAdminMsg", sender, content);
            }

            /// <summary>
            /// Sends a message to every client except the caller.
            /// </summary>
            /// <param name="sender">Display name of the sender.</param>
            /// <param name="content">Message text.</param>
            public async Task SendOthersMsg(string sender, string content)
            {
                await Clients.Others.SendAsync("ReceiveMsg", sender, content);
            }

            /// <summary>
            /// Creates a custom group, persists it, and makes the caller its first member.
            /// </summary>
            /// <param name="groupName">Name of the group to create.</param>
            public async Task CreateGroup(string groupName)
            {
                long userId = GetCurrentUserId();

                if (string.IsNullOrWhiteSpace(groupName))
                {
                    await Clients.Caller.SendAsync("GroupCreationFailed", "组名不能为空");
                    return;
                }

                if (_groups.ContainsKey(groupName))
                {
                    await Clients.Caller.SendAsync("GroupCreationFailed", "组已存在");
                    return;
                }

                // Create the group and save it to the database.
                var group = new Group
                {
                    GroupName = groupName,
                    CreatedAt = DateTime.UtcNow,
                    CreatorId = userId
                };
                myDbContext.Groups.Add(group);
                await myDbContext.SaveChangesAsync();

                // Add to the in-memory cache.
                var groupInfo = new GroupInfo
                {
                    GroupId = group.GroupId,
                    GroupName = groupName,
                    MemberIds = new HashSet<long> { userId }
                };
                if (!_groups.TryAdd(groupName, groupInfo))
                {
                    // Another caller created the same group between the check and the insert:
                    // undo our row so the database and the cache stay consistent.
                    myDbContext.Groups.Remove(group);
                    await myDbContext.SaveChangesAsync();
                    await Clients.Caller.SendAsync("GroupCreationFailed", "组已存在");
                    return;
                }

                // The creator joins the group automatically, both in the database and
                // on the current connection (otherwise they would only receive group
                // messages after reconnecting).
                await AddUserToGroup(groupInfo, userId);
                await Groups.AddToGroupAsync(Context.ConnectionId, groupName);

                await Clients.All.SendAsync("GroupCreated", groupName);
            }

            /// <summary>Persists the membership of a user in a group.</summary>
            private async Task AddUserToGroup(GroupInfo groupInfo, long userId)
            {
                var groupMember = new GroupMember
                {
                    GroupId = groupInfo.GroupId,
                    UserId = userId,
                    JoinedAt = DateTime.UtcNow
                };
                myDbContext.GroupMembers.Add(groupMember);
                await myDbContext.SaveChangesAsync();
            }

            /// <summary>
            /// Adds the caller to an existing custom group.
            /// </summary>
            /// <param name="groupName">Name of the group to join.</param>
            public async Task JoinGroup(string groupName)
            {
                var userId = GetCurrentUserId();

                if (groupName is null || !_groups.TryGetValue(groupName, out var groupInfo))
                {
                    await Clients.Caller.SendAsync("JoinGroupFailed", "组不存在");
                    return;
                }

                // Reserve the membership in the cache first so that two concurrent joins
                // by the same user cannot both reach the database insert.
                bool added;
                lock (_membershipGate)
                {
                    added = groupInfo.MemberIds.Add(userId);
                }

                if (!added)
                {
                    await Clients.Caller.SendAsync("JoinGroupFailed", "您已在该组中");
                    return;
                }

                try
                {
                    await AddUserToGroup(groupInfo, userId);
                }
                catch
                {
                    // Keep the cache in step with the database when persisting fails.
                    lock (_membershipGate)
                    {
                        groupInfo.MemberIds.Remove(userId);
                    }

                    throw;
                }

                // Join the SignalR group and notify its members.
                await Groups.AddToGroupAsync(Context.ConnectionId, groupName);
                await Clients.Group(groupName).SendAsync("UserJoinedGroup", Context.User?.Identity?.Name, groupName);
            }

            /// <summary>
            /// Removes the caller from a custom group; deletes the group when it becomes empty.
            /// </summary>
            /// <param name="groupName">Name of the group to leave.</param>
            public async Task LeaveGroup(string groupName)
            {
                var userId = GetCurrentUserId();

                if (groupName is null
                    || !_groups.TryGetValue(groupName, out var groupInfo)
                    || !IsMember(groupInfo, userId))
                {
                    await Clients.Caller.SendAsync("LeaveGroupFailed", "您不在该组中");
                    return;
                }

                // Remove from the database, then from the cache.
                await RemoveUserFromGroup(groupInfo, userId);

                bool groupIsEmpty;
                lock (_membershipGate)
                {
                    groupInfo.MemberIds.Remove(userId);
                    groupIsEmpty = groupInfo.MemberIds.Count == 0;
                }

                // Always detach the connection, including when the group is about to be deleted.
                await Groups.RemoveFromGroupAsync(Context.ConnectionId, groupName);

                if (groupIsEmpty)
                {
                    await DeleteGroup(groupName);
                }
                else
                {
                    await Clients.Group(groupName).SendAsync("UserLeftGroup", Context.User?.Identity?.Name, groupName);
                }
            }

            /// <summary>Deletes the persisted membership of a user in a group, if present.</summary>
            private async Task RemoveUserFromGroup(GroupInfo groupInfo, long userId)
            {
                var groupMember = await myDbContext.GroupMembers
                    .FirstOrDefaultAsync(gm => gm.GroupId == groupInfo.GroupId && gm.UserId == userId);
                if (groupMember != null)
                {
                    myDbContext.GroupMembers.Remove(groupMember);
                    await myDbContext.SaveChangesAsync();
                }
            }

            /// <summary>Removes a group from the cache and the database and announces the deletion.</summary>
            private async Task DeleteGroup(string groupName)
            {
                if (_groups.TryRemove(groupName, out var groupInfo))
                {
                    var group = await myDbContext.Groups.FindAsync(groupInfo.GroupId);
                    if (group != null)
                    {
                        myDbContext.Groups.Remove(group);
                        await myDbContext.SaveChangesAsync();
                    }

                    await Clients.All.SendAsync("GroupDeleted", groupName);
                }
            }

            /// <summary>
            /// Pushes an import progress update to a single connection.
            /// </summary>
            /// <param name="connectionId">Target SignalR connection id.</param>
            /// <param name="progress">Progress payload.</param>
            /// <remarks>
            /// NOTE(review): as a public hub method this lets any client send progress to
            /// any connection id; confirm whether it should be restricted or removed.
            /// </remarks>
            public async Task SendProgress(string connectionId, ImportProgress progress)
            {
                await Clients.Client(connectionId).SendAsync("ReceiveProgress", progress);
            }
        }
    }
    

2.实现CSV文件解析及数据导入服务

  1. IImportECDictService.cs

    namespace SignalRDemo.Interfaces
    {
        /// <summary>
        /// Contract for the service that imports an uploaded ECDict CSV file.
        /// </summary>
        public interface IImportECDictService
        {
            /// <summary>
            /// Imports the records of the uploaded CSV file.
            /// </summary>
            /// <param name="connectionId">SignalR connection id associated with the upload.</param>
            /// <param name="file">The uploaded CSV file.</param>
            /// <returns>A task that represents the import operation.</returns>
            Task ImportECDictAsync(string connectionId, IFormFile file);
        }
    }
  2. ImportECDictService.cs

    using SignalRDemo.Entity;
    using SignalRDemo.Interfaces;
    using System.Globalization;
    using CsvHelper;
    using CsvHelper.Configuration;
    using Microsoft.AspNetCore.SignalR;
    using SignalRDemo.HubService;
    using Microsoft.Data.SqlClient;
    using System.Data;

    namespace SignalRDemo.Repositories
    {
        /// <summary>
        /// Parses an uploaded ECDict CSV file and bulk-inserts its rows into SQL Server,
        /// reporting progress to one SignalR connection.
        /// </summary>
        public class ImportECDictService : IImportECDictService
        {
            private const int BatchSize = 100;
            private const string DestinationTable = "T_ECDictCSVs";
            private const string ProgressMethod = "ReceiveProgress";

            // The DataTable columns and the destination table columns share these names.
            private static readonly string[] ColumnNames =
            {
                nameof(ECDictCSV.Word),
                nameof(ECDictCSV.Phonetic),
                nameof(ECDictCSV.Definition),
                nameof(ECDictCSV.Translation),
                nameof(ECDictCSV.Pos),
                nameof(ECDictCSV.Collins),
                nameof(ECDictCSV.Oxford),
                nameof(ECDictCSV.Tag),
                nameof(ECDictCSV.Bnc),
                nameof(ECDictCSV.Frg),
                nameof(ECDictCSV.Exchange),
                nameof(ECDictCSV.Detail),
                nameof(ECDictCSV.Audio)
            };

            private readonly IHubContext<MyHubService> _hubContext;
            private readonly IConfiguration _configuration;

            public ImportECDictService(IHubContext<MyHubService> hubContext, IConfiguration configuration)
            {
                _hubContext = hubContext;
                _configuration = configuration;
            }

            /// <summary>
            /// Parses the CSV file and then imports all records into the database.
            /// </summary>
            /// <param name="connectionId">SignalR connection that receives progress updates.</param>
            /// <param name="file">The uploaded CSV file.</param>
            /// <exception cref="ArgumentNullException"><paramref name="file"/> is null.</exception>
            public async Task ImportECDictAsync(string connectionId, IFormFile file)
            {
                ArgumentNullException.ThrowIfNull(file);

                var progress = new ImportProgress { Status = "开始解析文件" };

                // 1. Parse the CSV file.
                var records = await ParseCsvFile(file, connectionId, progress);

                // 2. Bulk-import into the database.
                await BulkInsertToDatabase(records, connectionId, progress);
            }

            /// <summary>
            /// Reads every record of the CSV file into memory and reports the total count.
            /// </summary>
            /// <param name="file">The uploaded CSV file.</param>
            /// <param name="connectionId">SignalR connection that receives the progress update.</param>
            /// <param name="progress">Progress object that is updated and sent to the client.</param>
            /// <returns>All records parsed from the file.</returns>
            public async Task<List<ECDictCSV>> ParseCsvFile(IFormFile file, string connectionId, ImportProgress progress)
            {
                var config = new CsvConfiguration(CultureInfo.InvariantCulture)
                {
                    Delimiter = ",",
                    HasHeaderRecord = true,
                    // Tolerate missing fields and unmatched headers instead of throwing.
                    MissingFieldFound = null,
                    HeaderValidated = null
                };

                var records = new List<ECDictCSV>();

                using var stream = file.OpenReadStream();
                using var reader = new StreamReader(stream);
                using var csv = new CsvReader(reader, config);

                // await foreach avoids needing the System.Linq.Async package for ToListAsync().
                await foreach (var record in csv.GetRecordsAsync<ECDictCSV>())
                {
                    records.Add(record);
                }

                progress.TotalRecords = records.Count;
                progress.Status = "文件解析完成,准备导入数据库";
                await _hubContext.Clients.Client(connectionId).SendAsync(ProgressMethod, progress);

                return records;
            }

            /// <summary>
            /// Writes the records to the database in batches, sending a progress update
            /// after each batch and a final one when done.
            /// </summary>
            /// <param name="records">Records to import.</param>
            /// <param name="connectionId">SignalR connection that receives progress updates.</param>
            /// <param name="progress">Progress object that is updated and sent to the client.</param>
            /// <exception cref="InvalidOperationException">The "DefaultConnection" connection string is not configured.</exception>
            private async Task BulkInsertToDatabase(List<ECDictCSV> records, string connectionId, ImportProgress progress)
            {
                var connectionString = _configuration.GetConnectionString("DefaultConnection");
                if (string.IsNullOrWhiteSpace(connectionString))
                {
                    throw new InvalidOperationException("Connection string 'DefaultConnection' is not configured.");
                }

                var totalBatches = (records.Count + BatchSize - 1) / BatchSize;
                progress.TotalRecords = records.Count;

                await using (var connection = new SqlConnection(connectionString))
                {
                    await connection.OpenAsync();

                    // One bulk-copy object and one DataTable are reused for all batches;
                    // only the rows change between batches.
                    using var bulkCopy = new SqlBulkCopy(connection)
                    {
                        DestinationTableName = DestinationTable
                    };
                    using var dataTable = new DataTable();
                    foreach (var columnName in ColumnNames)
                    {
                        bulkCopy.ColumnMappings.Add(columnName, columnName);
                        dataTable.Columns.Add(columnName, typeof(string));
                    }

                    for (int batchIndex = 0; batchIndex < totalBatches; batchIndex++)
                    {
                        var offset = batchIndex * BatchSize;
                        var count = Math.Min(BatchSize, records.Count - offset);

                        dataTable.Clear();
                        for (int i = offset; i < offset + count; i++)
                        {
                            var record = records[i];
                            dataTable.Rows.Add(
                                record.Word, record.Phonetic, record.Definition, record.Translation,
                                record.Pos, record.Collins, record.Oxford, record.Tag, record.Bnc,
                                record.Frg, record.Exchange, record.Detail, record.Audio);
                        }

                        await bulkCopy.WriteToServerAsync(dataTable);

                        progress.ProcessedRecords += count;
                        progress.Status = $"正在导入数据:批次 {batchIndex + 1}/{totalBatches}";
                        await _hubContext.Clients.Client(connectionId).SendAsync(ProgressMethod, progress);
                    }
                }

                // Import finished.
                progress.Status = "导入完成";
                await _hubContext.Clients.Client(connectionId).SendAsync(ProgressMethod, progress);
            }
        }
    }

3.控制器

  1. ImportECDictController.cs
    using Microsoft.AspNetCore.Http;
    using Microsoft.AspNetCore.Mvc;
    using SignalRDemo.Interfaces;

    namespace SignalRDemo.Controllers
    {
        /// <summary>
        /// API endpoint that accepts a CSV upload and runs the ECDict import.
        /// </summary>
        [Route("api/[controller]/[action]")]
        [ApiController]
        public class ImportECDictController : ControllerBase
        {
            private readonly IImportECDictService _importECDictService;

            public ImportECDictController(IImportECDictService importECDictService)
            {
                _importECDictService = importECDictService;
            }

            /// <summary>
            /// Validates the uploaded file and the <c>connectionId</c> query parameter,
            /// then awaits the import.
            /// </summary>
            /// <param name="file">The uploaded CSV file.</param>
            /// <returns>
            /// 400 when the file or the connection id is missing/invalid, 500 when the
            /// import throws, otherwise 200.
            /// </returns>
            [HttpPost]
            public async Task<IActionResult> UploadCsv(IFormFile file)
            {
                if (file == null || file.Length == 0)
                {
                    return BadRequest("请选择有效的 CSV 文件");
                }

                if (!Path.GetExtension(file.FileName).Equals(".csv", StringComparison.OrdinalIgnoreCase))
                {
                    return BadRequest("文件必须是 CSV 格式");
                }

                // The client passes its SignalR connection id so progress can be pushed back to it.
                var connectionId = HttpContext.Request.Query["connectionId"].ToString();
                if (string.IsNullOrEmpty(connectionId))
                {
                    return BadRequest("缺少connectionId参数");
                }

                try
                {
                    // The import is awaited, so by the time we respond it has finished.
                    await _importECDictService.ImportECDictAsync(connectionId, file);
                    return Ok("导入完成");
                }
                catch (Exception ex)
                {
                    // The failure may come from parsing or from the database write.
                    return StatusCode(500, $"导入失败:{ex.Message}");
                }
            }
        }
    }
    

4.前端实现(vue)

  1. 示例:

    <div class="card" v-if="state.isConnected">
      <input type="file" id="csvFile" accept=".csv" />
      <button @click="uploadFile" :disabled="state.ecdictStatus.includes('正在导入数据')">导入</button>
      <!-- Progress bar -->
      <progress :value="state.currentCount" :max="state.total" class="custom-progress" />
      <!-- Progress text -->
      <div class="progress-info">
        {{ state.currentCount }} / {{ state.total }} ({{ progressPercentage }}%)
      </div>
      <div id="status" style="color: #f87171;">{{ state.ecdictStatus }}</div>
    </div>

    <script>
    import { reactive, computed, onMounted } from 'vue';
    import * as signalR from '@microsoft/signalr';

    export default {
      setup() {
        const state = reactive({
          serverUrl: "https://localhost:7183/Hubs/MyHubService",
          connection: null,
          isConnected: false,
          isConnecting: false,
          isLoggingIn: false,
          connectionId: null,
          connectionStatus: "",
          total: 0,
          currentCount: 0,
          ecdictStatus: "",
          errorDetails: "",
          // Message log
          messages: [],
        });

        // Runs when the page is mounted.
        onMounted(async () => {
          // fetchRoles(); // load the role list as soon as the page is mounted
        });

        // Progress percentage; 0 while the total is unknown to avoid showing NaN.
        const progressPercentage = computed(() => {
          if (!state.total) {
            return 0;
          }
          return Math.round((state.currentCount / state.total) * 100);
        });

        // Uploads the selected CSV file; progress arrives through the "ReceiveProgress" handler.
        const uploadFile = async () => {
          const fileInput = document.getElementById("csvFile");
          if (!fileInput.files || fileInput.files.length === 0) {
            alert("请选择CSV文件");
            return;
          }
          if (!state.isConnected) return;
          if (!state.connectionId) {
            // The server pushes the id via "ReceiveConnectionId"; without it progress cannot be routed.
            alert("尚未获取到连接ID,请稍后重试");
            return;
          }

          const formData = new FormData();
          formData.append("file", fileInput.files[0]);

          state.errorDetails = "";
          state.total = 0;
          state.currentCount = 0;
          state.ecdictStatus = "正在导入数据";

          try {
            // Derive the API base URL from the hub URL (adjust to the real path if needed).
            const apiUrl = state.serverUrl.split('/Hubs/')[0] || 'https://localhost:7183';
            const response = await fetch(
              `${apiUrl}/api/ImportECDict/UploadCsv?connectionId=${encodeURIComponent(state.connectionId)}`,
              {
                method: "POST",
                headers: {
                  // 'Authorization': `Bearer ${state.token}` // if authentication is required
                },
                body: formData
              }
            );
            if (!response.ok) {
              throw new Error(`导入失败: ${response.status}`);
            }
            // The endpoint answers with plain text, so response.json() would throw here.
            await response.text();
          } catch (error) {
            console.error("导入数据失败:", error);
            state.errorDetails = error.message;
            // Leave the "importing" state so the button is enabled again.
            state.ecdictStatus = "导入失败";
          }
        };

        // Creates and starts the SignalR connection.
        const initSignalRConnection = async (token) => {
          state.isConnecting = true;
          state.connectionStatus = "正在连接...";
          state.errorDetails = "";
          try {
            if (state.connection) {
              await state.connection.stop();
              state.connection = null;
            }

            // Create a new connection.
            state.connection = new signalR.HubConnectionBuilder()
              .withUrl(state.serverUrl, {
                accessTokenFactory: () => token,
                skipNegotiation: true,
                transport: signalR.HttpTransportType.WebSockets
              })
              .withAutomaticReconnect()
              .configureLogging(signalR.LogLevel.Information)
              .build();

            // Connection id pushed by the server.
            state.connection.on("ReceiveConnectionId", (connectionId) => {
              state.connectionId = connectionId;
              console.log("从服务端获取的连接ID:", connectionId);
            });

            // Import progress pushed by the server.
            state.connection.on("ReceiveProgress", (progress) => {
              // Only accept updates whose counters are consistent.
              if (progress.totalRecords > 0 && progress.processedRecords <= progress.totalRecords) {
                state.total = progress.totalRecords;
                state.currentCount = progress.processedRecords;
                state.ecdictStatus = progress.status;
              } else {
                console.warn("无效的进度数据:", progress);
              }
            });

            // Connection state changes.
            state.connection.onreconnecting(() => {
              state.isConnected = false;
              state.connectionStatus = "连接丢失,正在重连...";
            });
            state.connection.onreconnected(async (connectionId) => {
              state.isConnected = true;
              // A reconnect yields a new connection id; keep the old one only if none is supplied.
              state.connectionId = connectionId ?? state.connectionId;
              state.connectionStatus = "已重新连接";
            });
            state.connection.onclose(() => {
              state.isConnected = false;
              state.connectionStatus = "连接已关闭";
            });

            // Start the connection.
            await state.connection.start();
            console.log("连接状态:" + state.connection.state); // expected: "Connected"
            state.isConnected = true;
            state.isConnecting = false;
            // connection.connectionId may be empty here; never overwrite an id that
            // "ReceiveConnectionId" has already delivered.
            state.connectionId = state.connection.connectionId ?? state.connectionId;
            state.connectionStatus = "已连接";
          } catch (error) {
            state.isConnected = false;
            state.isConnecting = false;
            state.connectionStatus = `连接失败: ${error.message}`;
            state.errorDetails = error.toString();
          }
        };

        return {
          state,
          initSignalRConnection,
          uploadFile,
          progressPercentage
          // ...other methods
        };
      }
    }
    </script>
    

三、关键技术点说明

  • SqlBulkCopy 优化

    • 使用分批处理(示例中每 100 条记录一批,可根据数据量调整批大小)
    • 直接映射列以提高性能
    • 使用异步方法避免阻塞线程
  • 进度通知机制

    • 客户端通过 SignalR 建立持久连接
    • 服务端按批次更新进度并推送
    • 前端实时更新进度条和状态信息
  • 错误处理:

    • 捕获并返回导入过程中的异常
    • 确保事务一致性(必要时可使用数据库事务)

总结

通过以上步骤,可以实现简单的CSV文件解析,批量导入数据库,实时显示进度的功能。

相关文章:

  • VAS1085Q奇力科技LED驱动芯片车规级线性芯片
  • 8.3.1_冒泡排序
  • AI的发展过程:深度学习中的自然语言处理(NLP);大语言模型(LLM)详解;Transformer 模型结构详解;大模型三要素:T-P-G 原则
  • 《HarmonyOSNext弹窗:ComponentContent动态玩转企业级弹窗》
  • 告别excel:AI 驱动的数据分析指南
  • CentOS7自带的yum依然无法联网到官方源
  • 【C/C++】怎样设计一个合理的函数
  • 相机--单目相机
  • 7. 整数反转
  • Linux 内核 Slab 分配器核心组件详解
  • 基于51单片机和8X8点阵屏、独立按键的跳跃躲闪类小游戏
  • 如何在同一台电脑上安装并运行多个版本的 IntelliJ IDEA
  • xilinx的GT配置说明(一)
  • 【考研数学:高数6】一元函数微分学的应用(二)——中值定理、微分等式和微分不等式
  • AT2659低噪声放大器芯片
  • [KCTF]rev_babyrev
  • 永磁同步电机控制算法--抗饱和PI
  • C#winform画图代码记录
  • 基于地形数据计算山体阴影
  • YOLO-FireAD:通过混合注意力与双池化融合实现高精度实时火灾检测
  • 卡盟怎么网站怎么做/广告推广怎么做最有效
  • 张家港企业做网站/百度账号申请注册
  • 广元做网站/推介网
  • 动画网站源码/关键词优化骗局
  • 做网站企业经营范围/百度最新秒收录方法2023
  • 麻涌做网站/谷歌怎么投放广告