异步委托执行管理器:更新
1. 核心功能
DelegateSpooler
是一个 异步委托执行管理器,主要职责包括:
-
委托队列管理:通过管道ID(
pipe
)组织待执行的ThreadStart
委托 -
异步任务调度:使用
BeginInvoke
/EndInvoke
实现异步执行 -
执行状态跟踪:监控正在运行的任务(通过
execed
字典) -
线程安全控制:通过锁机制保护共享资源
2. 关键设计解析
(1)双缓冲任务队列
数据结构 | 用途 | 线程安全 |
---|---|---|
List<ThreadStart> delegates | 待执行任务队列 | lock 保护 |
Dictionary<int, KeyValuePair<ThreadStart, IAsyncResult>> execed | 执行中任务记录 | 仅后台线程访问 |
(2)异步执行流程
3)管道(Pipe)机制
-
数字ID标识:每个管道对应一个任务槽位
-
状态查询:
IsPipeExecuting
检查任务是否正在执行 -
动态扩容:
InitQueue
初始化指定数量的管道
3. 重要方法详解
(1)任务提交(Set)
public void Set(int pipe, ThreadStart del)
{lock (delegates) {// 边界检查后更新指定管道任务if (0 <= pipe && pipe < delegates.Count) {delegates[pipe] = del; // 替换现有任务}}
}
-
线程安全:通过
lock
保护delegates
修改 -
幂等性:重复设置同一管道会覆盖前一个任务
(2)后台工作循环(bgWorker)
private void bgWorker()
{while (true) {// 1. 检查已完成任务foreach (var item in execed.ToList()) {if (item.Value.Value.IsCompleted) {try { item.Value.Key.EndInvoke(item.Value.Value); }catch { /* 静默处理异常 */ }execed.Remove(item.Key);}}// 2. 启动新任务lock (delegates) {for (int i = 0; i < delegates.Count; i++) {if (delegates[i] != null && !execed.ContainsKey(i)) {var wrapped = InternalUtilities.WrapDelegateForCulture(delegates[i]);execed[i] = new KeyValuePair<ThreadStart, IAsyncResult>(wrapped,wrapped.BeginInvoke(null, null) // 异步启动);delegates[i] = null; // 标记为已调度}}}Thread.Sleep(10); // 降低CPU占用}
}
(3)文化感知委托包装
InternalUtilities.WrapDelegateForCulture(delegates[i])
-
作用:保持委托执行时的文化上下文(CultureInfo)
-
典型实现:
public static ThreadStart WrapDelegateForCulture(ThreadStart start)
{CultureInfo culture = Thread.CurrentThread.CurrentCulture;return () => {Thread.CurrentThread.CurrentCulture = culture;start();};
}
4. 线程模型分析
组件 | 线程类型 | 职责 |
---|---|---|
worker | 后台线程 | 持续执行 bgWorker 循环 |
委托执行 | 线程池线程 | 通过 BeginInvoke 使用IOCP线程 |
关键特性:
-
非阻塞设计:主线程提交任务后立即返回
-
后台优先级:
IsBackground=true
确保进程退出时线程自动终止 -
异常隔离:单个任务异常不会影响整体调度
5. 典型使用场景
(1)硬件控制系统的异步操作
var spooler = new DelegateSpooler("MotorControl");
spooler.InitQueue(4); // 初始化4个管道// 提交任务到管道0
spooler.Set(0, () => motor.MoveToPosition(100));// 检查任务状态
if (!spooler.IsPipeExecuting(0))
{Console.WriteLine("管道0任务已完成");
}
(2)事件聚合处理
// 多个事件源通过不同管道提交任务
sensor1.DataReceived += (_,e) => spooler.Set(1, () => ProcessSensor1(e.Data));
sensor2.DataReceived += (_,e) => spooler.Set(2, () => ProcessSensor2(e.Data));
6. 潜在问题与改进
(1)内存泄漏风险
-
问题:
execed
字典可能积累已完成任务的记录(如果EndInvoke
未被调用) -
解决:强制清理机制
internal void CleanCompleted()
{var completed = execed.Where(kv => kv.Value.Value.IsCompleted).ToList();foreach (var item in completed) {execed.Remove(item.Key);}
}
(2)线程池压力
-
问题:大量
BeginInvoke
可能耗尽线程池 -
优化:限制并发数
private SemaphoreSlim _throttle = new SemaphoreSlim(10);
// 在bgWorker中:
_throttle.Wait();
execed[i] = ...;
wrapped.BeginInvoke(_ => _throttle.Release(), null);
(3)现代替代方案
// 使用Task替代ThreadStart
private List<Func<Task>> _tasks = new List<Func<Task>>();// 使用CancellationToken支持取消
private CancellationTokenSource _cts = new CancellationTokenSource();
7. 完整调用链示例
总结
-
设计初衷:提供基于管道的异步任务调度能力,适用于需要控制任务执行顺序和状态的场景。
-
优势:
-
精确控制任务生命周期(提交→执行→完成)
-
管道隔离不同任务流
-
文化上下文保持
-
-
局限:
-
依赖旧的APM模式(
BeginInvoke
/EndInvoke
) -
缺乏现代
async/await
支持
-
推荐改进方向:
-
迁移到
Task
-based API -
增加取消支持
-
实现
IAsyncDisposable
接口