当前位置: 首页 > news >正文

C# 异步任务队列封装

在 C# 中,可以使用 Task 和 ConcurrentQueue 来构建一个 异步任务队列,确保任务按照 FIFO(先进先出)顺序执行,并支持并发安全。

设计方案
任务队列 (ConcurrentQueue<Func>)
存储异步任务(每个任务都是 Func)
任务按 FIFO 方式执行

后台任务处理器
使用 SemaphoreSlim 控制并发
任务会按照顺序执行,避免阻塞主线程
存在并行执行任务

支持功能
EnqueueTask(Func):将新任务加入队列
自动处理任务:后台异步循环,按顺序执行任务
队列管理(支持动态添加任务、取消任务)
支持返回值和无返回值两种类型

代码示例

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public class AsyncTaskQueue
{
    private readonly ConcurrentQueue<TaskItem> _taskQueue = new();
    private readonly SemaphoreSlim _semaphore = new(1, 1); // 控制顺序任务执行
    private bool _isProcessing = false; // 任务处理状态

    private enum TaskType
    {
        Sequential,  // 顺序执行任务
        Parallel     // 并行执行任务
    }

    private class TaskItem
    {
        public Func<Task> TaskFunc { get; }
        public TaskType Type { get; }

        public TaskItem(Func<Task> taskFunc, TaskType type)
        {
            TaskFunc = taskFunc;
            Type = type;
        }
    }

    /// <summary>
    /// 添加 **顺序执行** 的无返回值任务到队列
    /// </summary>
    public void EnqueueTask(Func<Task> taskFunc)
    {
        _taskQueue.Enqueue(new TaskItem(taskFunc, TaskType.Sequential));
        ProcessQueue();
    }

    /// <summary>
    /// 添加 **顺序执行** 的有返回值任务到队列,并返回 `Task<T>`
    /// </summary>
    public Task<T> EnqueueTask<T>(Func<Task<T>> taskFunc)
    {
        var tcs = new TaskCompletionSource<T>();
        _taskQueue.Enqueue(new TaskItem(async () =>
        {
            try
            {
                T result = await taskFunc();
                tcs.SetResult(result);
            }
            catch (Exception ex)
            {
                tcs.SetException(ex);
            }
        }, TaskType.Sequential));

        ProcessQueue();
        return tcs.Task;
    }

    /// <summary>
    /// 立即执行 **并行任务**(无返回值)
    /// </summary>
    public void EnqueueParallelTask(Func<Task> taskFunc)
    {
        _taskQueue.Enqueue(new TaskItem(taskFunc, TaskType.Parallel));
        ProcessQueue();
    }

    /// <summary>
    /// 立即执行 **并行任务**(有返回值)
    /// </summary>
    public Task<T> EnqueueParallelTask<T>(Func<Task<T>> taskFunc)
    {
        var tcs = new TaskCompletionSource<T>();
        _taskQueue.Enqueue(new TaskItem(async () =>
        {
            try
            {
                T result = await taskFunc();
                tcs.SetResult(result);
            }
            catch (Exception ex)
            {
                tcs.SetException(ex);
            }
        }, TaskType.Parallel));

        ProcessQueue();
        return tcs.Task;
    }

    /// <summary>
    /// 处理队列中的任务(支持顺序执行 & 并行执行)
    /// </summary>
    private async void ProcessQueue()
    {
        if (_isProcessing) return;
        _isProcessing = true;

        while (_taskQueue.TryDequeue(out var taskItem))
        {
            if (taskItem.Type == TaskType.Parallel)
            {
                // 并行任务:立即执行,不等待
                _ = Task.Run(taskItem.TaskFunc);
            }
            else
            {
                // 顺序任务:等待执行
                await _semaphore.WaitAsync();
                try
                {
                    await taskItem.TaskFunc();
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"任务执行失败: {ex.Message}");
                }
                finally
                {
                    _semaphore.Release();
                }
            }
        }

        _isProcessing = false;
    }
}


调用示例

using System;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        var queue = new AsyncTaskQueue();

        // **顺序执行任务**
        queue.EnqueueTask(async () =>
        {
            Console.WriteLine("顺序任务 1 开始...");
            await Task.Delay(2000);
            Console.WriteLine("顺序任务 1 结束");
        });

        queue.EnqueueTask(async () =>
        {
            Console.WriteLine("顺序任务 2 开始...");
            await Task.Delay(1000);
            Console.WriteLine("顺序任务 2 结束");
        });

        // **顺序执行任务(有返回值)**
        Task<int> task3 = queue.EnqueueTask(async () =>
        {
            Console.WriteLine("顺序任务 3(有返回值)开始...");
            await Task.Delay(1500);
            Console.WriteLine("顺序任务 3 结束");
            return 42;
        });

        // **并行任务**
        queue.EnqueueParallelTask(async () =>
        {
            Console.WriteLine("并行任务 A 开始...");
            await Task.Delay(3000);
            Console.WriteLine("并行任务 A 结束");
        });

        queue.EnqueueParallelTask(async () =>
        {
            Console.WriteLine("并行任务 B 开始...");
            await Task.Delay(2000);
            Console.WriteLine("并行任务 B 结束");
        });

        // **并行任务(有返回值)**
        Task<string> parallelTask = queue.EnqueueParallelTask(async () =>
        {
            Console.WriteLine("并行任务 C(有返回值)开始...");
            await Task.Delay(2500);
            Console.WriteLine("并行任务 C 结束");
            return "Hello, Parallel!";
        });

        // 等待顺序任务完成
        int result3 = await task3;
        Console.WriteLine($"顺序任务 3 返回值: {result3}");

        // 等待并行任务完成
        string parallelResult = await parallelTask;
        Console.WriteLine($"并行任务 C 返回值: {parallelResult}");

        // 等待所有任务完成
        await Task.Delay(5000);
    }
}



相关文章:

  • 测试直播postman+Jenkins所学
  • fiddler everywhere 绿色永久版
  • windows自动锁屏,并且要输入密码。如何取消?
  • 13 【HarmonyOS NEXT】 仿uv-ui组件开发之Avatar组件进阶指南(四)
  • Go语言分布式ID生成策略优选:UUID、Snowflake、XID、ObjectID、Krand性能对比评测
  • vite:初学 p5.js demo 画圆圈
  • vulnhub靶场之【digitalworld.local系列】的vengeance靶机
  • TMS320F28P550SJ9学习笔记6:SCI所有寄存器__结构体寄存器方式配置 SCI通信初始化__库函数发送测试
  • C语言-作用域与存储期
  • 25、《Spring Boot 3.0.0 集成 Nacos2.2 》
  • 极坐标轴 极坐标的使用 极坐标坐标轴和刻度线
  • OpenDevin:开源AI软件工程师的通用代理平台
  • python从入门到精通(二十五):文件操作和目录管理难度分级练习题
  • 登录校验会话技术
  • 01 微服务与 Spring Cloud 就是从“大杂烩”到“精密协作”的架构演进的必然结果
  • 数据结构:python实现最大堆算法
  • 联核科技AGV无人叉车的应用场景有哪些?
  • 3D数字化:家居行业转型升级的关键驱动力
  • 【电赛推荐芯片】差分放大器:INA143,仪表放大器:INA128 INA333 PGA204
  • 在WSL2-Ubuntu中安装CUDA12.8、cuDNN、Anaconda、Pytorch并验证安装
  • 七彩云南旅游网页设计毕业论文/百度热搜seo
  • ps做网站对齐技巧/google play商店
  • 网站开发需求单/成都黑帽seo
  • 郑州市多商家网站制作公司/专业网站制作
  • 如何做微信网站建设/seo企业推广案例
  • web网站开发毕设/郑州seo技术