当前位置: 首页 > news >正文

多任务实时进度监控系统:基于ABP vNext与SignalR的架构实践

需求背景

在实际业务场景中,我们常遇到需要长时间执行的后台任务(如数据处理、报表生成等)。这类任务通常包含多个子任务,需要满足:

  1. 实时更新每个子任务的进度状态
  2. 前端页面关闭后任务不中断
  3. 多个全局任务独立运行和监控
  4. 多用户查看同一任务时进度同步
  5. 新用户打开页面自动显示最新状态

实现思路

启动任务
更新进度
SignalR广播
多任务分组
多任务分组
多任务分组
前端页面
后端API
任务管理服务
持久化存储
后台作业系统
执行子任务
前端组件
页面A-任务1
页面B-任务2
页面C-任务3

后端实现(ABP vNext)

1. 多任务管理服务
public interface IMultiTaskService : ISingletonDependency
{Task StartOrResumeTask(string taskId, string taskName);TaskProgressDto GetTaskProgress(string taskId);Task UpdateTaskProgress(string taskId, int completed, int total, string status = "Running");List<TaskInfoDto> GetAllTasks();
}
public class MultiTaskService : IMultiTaskService
{private readonly ILogger<MultiTaskService> _logger;private readonly IBackgroundJobManager _jobManager;private readonly IHubContext<MultiTaskHub> _hubContext;private readonly ConcurrentDictionary<string, TaskProgressDto> _tasks = new ConcurrentDictionary<string, TaskProgressDto>();public Task StartOrResumeTask(string taskId, string taskName){if (_tasks.TryGetValue(taskId, out var task) && task.Status == "Running"){return Task.CompletedTask;}// 初始化任务进度_tasks[taskId] = new TaskProgressDto{TaskId = taskId,TaskName = taskName,Status = "Running",StartTime = DateTime.UtcNow,Completed = 0,Total = 100 // 初始值,实际执行中会更新};// 启动后台任务return _jobManager.EnqueueAsync<MultiTaskJob, MultiTaskJobArgs>(new MultiTaskJobArgs { TaskId = taskId, TaskName = taskName });}public TaskProgressDto GetTaskProgress(string taskId){return _tasks.TryGetValue(taskId, out var task) ? task : new TaskProgressDto { TaskId = taskId, Status = "NotFound" };}public async Task UpdateTaskProgress(string taskId, int completed, int total, string status = "Running"){if (_tasks.TryGetValue(taskId, out var task)){task.Completed = completed;task.Total = total;task.Status = status;task.Percentage = total > 0 ? (int)Math.Round((double)completed / total * 100) : 0;task.LastUpdated = DateTime.UtcNow;if (status == "Completed" || status == "Failed"){task.EndTime = DateTime.UtcNow;}// 广播给所有关注此任务的客户端await _hubContext.Clients.Group(taskId).SendAsync("ReceiveTaskProgress", task);}}public List<TaskInfoDto> GetAllTasks(){return _tasks.Values.Select(t => new TaskInfoDto{TaskId = t.TaskId,TaskName = t.TaskName,Status = t.Status,Percentage = t.Percentage}).ToList();}
}
2. 多任务后台作业系统
public class MultiTaskJob : AsyncBackgroundJob<MultiTaskJobArgs>, ITransientDependency
{private readonly IMultiTaskService _taskService;private readonly ILogger<MultiTaskJob> _logger;public MultiTaskJob(IMultiTaskService taskService, ILogger<MultiTaskJob> logger){_taskService = taskService;_logger = logger;}public override async Task ExecuteAsync(MultiTaskJobArgs args){var taskId = args.TaskId;var taskName = args.TaskName;try{_logger.LogInformation($"开始执行全局任务: {taskName} ({taskId})");// 获取该类型任务的所有子任务var subtasks = GetSubtasksForTaskType(taskName);await _taskService.UpdateTaskProgress(taskId, 0, subtasks.Count);for (int i = 0; i < subtasks.Count; i++){// 执行子任务await ExecuteSubtask(subtasks[i]);// 更新进度await _taskService.UpdateTaskProgress(taskId, i + 1, subtasks.Count);// 模拟耗时操作await Task.Delay(1000);}// 任务完成await _taskService.UpdateTaskProgress(taskId, subtasks.Count, subtasks.Count, "Completed");_logger.LogInformation($"全局任务完成: {taskName} ({taskId})");}catch (Exception ex){_logger.LogError(ex, $"全局任务失败: {taskName} ({taskId})");await _taskService.UpdateTaskProgress(taskId, 0, 0, "Failed");}}private List<Subtask> GetSubtasksForTaskType(string taskName){// 根据任务类型返回不同的子任务列表return taskName switch{"数据同步" => new List<Subtask>{new("连接数据源"),new("读取数据"),new("转换数据格式"),new("写入目标系统"),new("验证数据一致性")},"报表生成" => new List<Subtask>{new("收集数据"),new("计算指标"),new("生成图表"),new("生成PDF"),new("发送通知")},"系统备份" => new List<Subtask>{new("创建快照"),new("压缩数据"),new("传输到备份服务器"),new("验证完整性"),new("清理临时文件")},_ => throw new ArgumentException($"未知任务类型: {taskName}")};}private async Task ExecuteSubtask(Subtask subtask){// 实际业务逻辑_logger.LogInformation($"执行子任务: {subtask.Name}");// 模拟耗时操作await Task.Delay(new Random().Next(500, 2000));}
}public record Subtask(string Name);
3. SignalR Hub实现
[HubRoute("/hubs/multiTask")]
public class MultiTaskHub : Hub
{private readonly IMultiTaskService _taskService;public MultiTaskHub(IMultiTaskService taskService){_taskService = taskService;}public override async Task OnConnectedAsync(){await base.OnConnectedAsync();}// 客户端加入任务组public async Task JoinTaskGroup(string taskId){await Groups.AddToGroupAsync(Context.ConnectionId, taskId);// 立即发送当前任务进度var progress = _taskService.GetTaskProgress(taskId);await Clients.Caller.SendAsync("ReceiveTaskProgress", progress);}// 客户端离开任务组public async Task LeaveTaskGroup(string taskId){await Groups.RemoveFromGroupAsync(Context.ConnectionId, taskId);}
}

API 控制器

[Route("api/tasks")]
public class TaskController : AbpController
{private readonly IMultiTaskService _taskService;public TaskController(IMultiTaskService taskService){_taskService = taskService;}[HttpPost("start")]public async Task<ActionResult> StartTask([FromBody] StartTaskRequest request){await _taskService.StartOrResumeTask(request.TaskId, request.TaskName);return Ok();}[HttpGet("progress/{taskId}")]public TaskProgressDto GetTaskProgress(string taskId){return _taskService.GetTaskProgress(taskId);}[HttpGet("all")]public List<TaskInfoDto> GetAllTasks(){return _taskService.GetAllTasks();}
}public class StartTaskRequest
{public string TaskId { get; set; }public string TaskName { get; set; }
}public class TaskInfoDto
{public string TaskId { get; set; }public string TaskName { get; set; }public string Status { get; set; }public int Percentage { get; set; }
}

前端实现(Vue+ElementUI)

1. 全局任务进度组件
<template><el-dialog :visible.sync="dialogVisible":title="taskName"width="80%"@closed="disconnectSignalR"><div class="task-container"><div class="global-progress"><el-progress :percentage="globalPercentage":status="globalStatus":stroke-width="20"/><div class="progress-info"><span>{{ completed }}/{{ total }} ({{ globalPercentage }}%)</span><el-tag :type="statusTagType">{{ statusText }}</el-tag><span class="time-info">启动: {{ startTime | formatTime }} <span v-if="endTime"> | 结束: {{ endTime | formatTime }}</span></span></div></div><el-divider>子任务详情</el-divider><el-table :data="subtasks" v-loading="loading"height="300px"><el-table-column prop="name" label="任务名称" width="200" /><el-table-column label="状态" width="120"><template #default="{ row, $index }"><el-tag :type="getSubtaskStatus($index) === 'success' ? 'success' : 'primary'">{{ getSubtaskStatus($index) === 'success' ? '已完成' : '进行中' }}</el-tag></template></el-table-column><el-table-column label="进度"><template #default="{ row, $index }"><el-progress :percentage="getSubtaskPercentage($index)":status="getSubtaskStatus($index)"/></template></el-table-column></el-table></div></el-dialog>
</template><script>
import * as signalR from '@microsoft/signalr';export default {props: {taskId: {type: String,required: true},taskName: {type: String,required: true},subtasks: {type: Array,default: () => []}},data() {return {dialogVisible: false,connection: null,loading: false,// 任务状态completed: 0,total: 0,status: 'NotStarted',startTime: null,endTime: null};},computed: {globalPercentage() {return this.total > 0 ? Math.round((this.completed / this.total) * 100) : 0;},globalStatus() {return this.status === 'Completed' ? 'success' : this.status === 'Failed' ? 'exception' : 'primary';},statusText() {return {NotStarted: '未开始',Running: '运行中',Completed: '已完成',Failed: '已失败'}[this.status] || this.status;},statusTagType() {return {NotStarted: 'info',Running: 'primary',Completed: 'success',Failed: 'danger'}[this.status];}},filters: {formatTime(value) {return value ? new Date(value).toLocaleTimeString() : '';}},methods: {openDialog() {this.dialogVisible = true;this.$nextTick(() => this.initSignalR());},async initSignalR() {this.loading = true;// 1. 先获取当前进度try {const progress = await this.$api.get(`/api/tasks/progress/${this.taskId}`);this.updateProgress(progress);} catch (error) {console.error('获取进度失败', error);}// 2. 连接 SignalR Hubthis.connection = new signalR.HubConnectionBuilder().withUrl('/hubs/multiTask').withAutomaticReconnect().build();// 3. 监听进度更新this.connection.on('ReceiveTaskProgress', this.updateProgress);// 4. 启动连接并加入任务组try {await this.connection.start();await this.connection.invoke('JoinTaskGroup', this.taskId);console.log(`已加入任务组: ${this.taskId}`);} catch (err) {console.error('SignalR 连接失败', err);}this.loading = false;},disconnectSignalR() {if (this.connection) {this.connection.invoke('LeaveTaskGroup', this.taskId);this.connection.stop();this.connection = null;}},updateProgress(progress) {if (!progress || progress.taskId !== this.taskId) return;this.completed = progress.completed || 0;this.total = progress.total || 0;this.status = progress.status || 'NotStarted';this.startTime = progress.startTime;this.endTime = progress.endTime;},getSubtaskPercentage(index) {if (this.status !== 'Running') {return this.status === 'Completed' ? 100 : 0;}const subtaskCount = this.subtasks.length;const subtaskSize = this.total / subtaskCount;// 已完成的任务if (index * subtaskSize < this.completed) return 100;// 当前任务if (index * subtaskSize <= this.completed && (index + 1) * subtaskSize > this.completed) {return Math.round((this.completed - index * subtaskSize) / subtaskSize * 100);}return 0;},getSubtaskStatus(index) {const percentage = this.getSubtaskPercentage(index);return this.status === 'Completed' ? 'success' : this.status === 'Failed' ? 'exception' : percentage === 100 ? 'success': 'primary';}}
};
</script><style scoped>
.task-container {padding: 15px;
}.global-progress {margin-bottom: 20px;
}.progress-info {display: flex;align-items: center;margin-top: 10px;gap: 15px;font-size: 14px;
}.time-info {margin-left: auto;color: #909399;font-size: 13px;
}
</style>
2. 在不同页面中使用任务监控
页面A.vue - 数据同步任务
<template><div class="page-container"><h2>数据同步管理</h2><el-button type="primary" @click="startDataSync":loading="starting">开始数据同步</el-button><global-task-progress ref="syncTaskProgress"taskId="data_sync_task"taskName="数据同步":subtasks="[{ name: '连接数据源' },{ name: '读取数据' },{ name: '转换数据格式' },{ name: '写入目标系统' },{ name: '验证数据一致性' }]"/></div>
</template><script>
import GlobalTaskProgress from '@/components/GlobalTaskProgress.vue';export default {components: { GlobalTaskProgress },data() {return {starting: false};},methods: {async startDataSync() {this.starting = true;try {// 启动后端任务await this.$api.post('/api/tasks/start', {taskId: 'data_sync_task',taskName: '数据同步'});// 打开进度监控this.$refs.syncTaskProgress.openDialog();} catch (error) {this.$message.error('启动数据同步任务失败');console.error(error);} finally {this.starting = false;}}}
};
</script>
页面B.vue - 报表生成任务
<template><div class="page-container"><h2>月度报表生成</h2><el-button type="primary" @click="startReportGeneration":loading="starting">生成月度报表</el-button><global-task-progress ref="reportTaskProgress"taskId="monthly_report_task"taskName="报表生成":subtasks="[{ name: '收集数据' },{ name: '计算指标' },{ name: '生成图表' },{ name: '生成PDF' },{ name: '发送通知' }]"/></div>
</template><script>
import GlobalTaskProgress from '@/components/GlobalTaskProgress.vue';export default {components: { GlobalTaskProgress },data() {return {starting: false};},methods: {async startReportGeneration() {this.starting = true;try {await this.$api.post('/api/tasks/start', {taskId: 'monthly_report_task',taskName: '报表生成'});this.$refs.reportTaskProgress.openDialog();} catch (error) {this.$message.error('启动报表生成任务失败');console.error(error);} finally {this.starting = false;}}}
};
</script>
页面C.vue - 系统备份任务
<template><div class="page-container"><h2>系统备份</h2><el-button type="warning" @click="startSystemBackup":loading="starting">执行系统备份</el-button><global-task-progress ref="backupTaskProgress"taskId="system_backup_task"taskName="系统备份":subtasks="[{ name: '创建快照' },{ name: '压缩数据' },{ name: '传输到备份服务器' },{ name: '验证完整性' },{ name: '清理临时文件' }]"/></div>
</template><script>
import GlobalTaskProgress from '@/components/GlobalTaskProgress.vue';export default {components: { GlobalTaskProgress },data() {return {starting: false};},methods: {async startSystemBackup() {this.starting = true;try {await this.$api.post('/api/tasks/start', {taskId: 'system_backup_task',taskName: '系统备份'});this.$refs.backupTaskProgress.openDialog();} catch (error) {this.$message.error('启动系统备份任务失败');console.error(error);} finally {this.starting = false;}}}
};
</script>
3. 全局任务概览页面
<template><div class="dashboard-container"><h1>全局任务监控中心</h1><el-row :gutter="20"><el-col :span="8" v-for="task in tasks" :key="task.taskId"><el-card class="task-card"><div slot="header" class="clearfix"><span>{{ task.taskName }}</span><el-tag :type="statusTagType(task)" class="status-tag">{{ taskStatusText(task) }}</el-tag></div><div class="task-progress"><el-progress :percentage="task.percentage" :status="taskStatus(task)" :stroke-width="12"/></div><div class="task-actions"><el-button v-if="task.status === 'Running'"type="primary"size="small"@click="viewTaskDetails(task.taskId)">查看详情</el-button><el-button v-if="task.status === 'NotStarted' || task.status === 'Failed'"type="success"size="small"@click="restartTask(task)">重新启动</el-button></div></el-card></el-col></el-row><!-- 任务详情弹窗 --><component v-for="task in activeTasks":key="task.taskId":is="getTaskComponent(task.taskId)":ref="`taskProgress_${task.taskId}`"/></div>
</template><script>
import GlobalTaskProgress from '@/components/GlobalTaskProgress.vue';// 不同任务对应的子任务列表
const taskSubtasks = {'data_sync_task': [{ name: '连接数据源' },{ name: '读取数据' },{ name: '转换数据格式' },{ name: '写入目标系统' },{ name: '验证数据一致性' }],'monthly_report_task': [{ name: '收集数据' },{ name: '计算指标' },{ name: '生成图表' },{ name: '生成PDF' },{ name: '发送通知' }],'system_backup_task': [{ name: '创建快照' },{ name: '压缩数据' },{ name: '传输到备份服务器' },{ name: '验证完整性' },{ name: '清理临时文件' }]
};export default {components: { GlobalTaskProgress },data() {return {tasks: [],loading: true,intervalId: null,activeTasks: []};},mounted() {this.loadTasks();this.intervalId = setInterval(this.loadTasks, 5000);},beforeDestroy() {clearInterval(this.intervalId);},methods: {async loadTasks() {try {this.tasks = await this.$api.get('/api/tasks/all');this.loading = false;} catch (error) {console.error('获取任务列表失败', error);}},statusTagType(task) {return {Running: 'primary',Completed: 'success',Failed: 'danger',NotStarted: 'info'}[task.status] || 'info';},taskStatusText(task) {return {Running: '运行中',Completed: '已完成',Failed: '已失败',NotStarted: '未开始'}[task.status] || task.status;},taskStatus(task) {return task.status === 'Completed' ? 'success' : task.status === 'Failed' ? 'exception' : 'primary';},viewTaskDetails(taskId) {// 激活任务组件if (!this.activeTasks.some(t => t.taskId === taskId)) {this.activeTasks.push({taskId,taskName: this.tasks.find(t => t.taskId === taskId).taskName});}// 打开进度弹窗this.$nextTick(() => {this.$refs[`taskProgress_${taskId}`][0].openDialog();});},async restartTask(task) {try {await this.$api.post('/api/tasks/start', {taskId: task.taskId,taskName: task.taskName});// 打开进度监控this.viewTaskDetails(task.taskId);} catch (error) {this.$message.error('重启任务失败');console.error(error);}},getTaskComponent(taskId) {return {template: `<global-task-progress :ref="'taskProgress_${taskId}'"taskId="${taskId}"taskName="${this.tasks.find(t => t.taskId === taskId)?.taskName || taskId}":subtasks="taskSubtasks['${taskId}'] || []"/>`,components: { GlobalTaskProgress },data: () => ({ taskSubtasks })};}}
};
</script><style scoped>
.dashboard-container {padding: 20px;
}.task-card {margin-bottom: 20px;height: 180px;display: flex;flex-direction: column;
}.status-tag {float: right;
}.task-progress {flex-grow: 1;display: flex;align-items: center;padding: 10px 0;
}.task-actions {display: flex;justify-content: flex-end;
}
</style>

功能亮点

  1. 多任务并行监控

    • 每个任务独立ID标识
    • 独立进度存储和广播通道
    • 任务类型可动态扩展
  2. 实时进度同步

    • SignalR分组广播机制
    • 新客户端自动获取最新状态
    • 智能计算子任务进度百分比
  3. 容错与恢复

    sequenceDiagram前端->>后端: 启动任务后端->>数据库: 持久化任务状态后端->>后台作业: 执行子任务后台作业->>后端: 更新进度后端->>所有客户端: SignalR广播前端关闭->>后端: 任务继续执行新客户端->>后端: 请求当前进度后端->>新客户端: 返回最新状态
    
  4. 可视化体验优化

    • 全局进度条+子任务详情双视图
    • 状态颜色编码(运行中/成功/失败)
    • 时间轴展示(开始/结束时间)
  5. 系统管理功能

    • 全局任务概览面板
    • 任务手动重启机制
    • 历史任务清理策略

总结

本文提出的基于ABP vNext和SignalR的多任务监控方案,通过五大关键技术点解决核心问题:

  1. 任务持久化:ABP后台作业保证任务中断后可恢复
  2. 实时通信:SignalR分组广播实现多客户端同步
  3. 进度计算:智能算法展示子任务进度
  4. 组件复用:统一进度组件支持多页面集成
  5. 状态管理:全局服务维护多任务状态

技术栈:ABP vNext · SignalR · Vue3 · ElementPlus · Redis · Docker

http://www.dtcms.com/a/321558.html

相关文章:

  • [激光原理与应用-175]:测量仪器 - 频谱型 - 拉曼光谱仪的工作原理、内部组成、核心芯片、核心算法
  • 项目一系列-第3章 若依框架入门
  • Java中的方法引用操作符(::)详解与实战应用
  • “A flash of inspiration“, protect us from prompt injection?
  • 实习的收获
  • 【Jmeter】设置线程组运行顺序的方法
  • 安装部署K8S集群环境(实测有效版本)
  • 复杂姿态漏检率↓79%!陌讯多模态算法在安全带穿戴识别的落地实践
  • Node.js Turbo 包入门教程
  • web端-登录页面验证码的实现(springboot+vue前后端分离)超详细
  • (Arxiv-2025) CINEMA:通过基于MLLM的引导实现多主体一致性视频生成
  • 基于Jeecgboot3.8.1的flowable流程审批人与发起人相同设置-前端部分
  • Vue2与Vue3 Hooks对比:写法差异与演进思考
  • 【3d61638 渍韵】001 png pdf odt 5与明天各种号(虚拟文章スミレ数据)
  • PDF处理控件Aspose.PDF教程:使用 C#、Java 和 Python 代码调整 PDF 页面大小
  • 以rabbitmq为例演示podman导出导入镜像文件
  • kafka 为什么需要分区?分区的引入带来了哪些好处
  • Kafka + 时间轮 + 数据库实现延迟队列方案
  • 前端开发:JavaScript(7)—— Web API
  • 机器学习视角下的黄金市场动态:3400美元关口的多因子驱动机制
  • Seata分布式事务环境搭建
  • Access开发右下角浮窗提醒
  • RS485转Profibus网关在QDNA钠离子分析仪与S7-300PLC系统集成中的应用
  • 深入解析K-means聚类:从原理到调优实战
  • 基于STM32F030C8T6单片机实现与CH224Q诱骗芯片的I2C通信和电压输出配置
  • 9:USB摄像头的最后一战(上):MP4音视频合封!
  • 《MySQL索引底层原理:B+树、覆盖索引与最左前缀法则》
  • TF 上架全流程实战,从构建到 TestFlight 分发
  • iOS 签名证书全流程详解,申请、管理与上架实战
  • 飞算JavaAI深度剖析:开启Java开发智能新时代