当前位置: 首页 > news >正文

C# Channel

  • 核心概念
  • 创建Channel
    • 无界通道
    • 有界通道
      • `FullMode`选项
  • 生产者-消费者模式
    • 生产者写入数据
    • 消费者读取数据
  • 完整示例
  • 高级配置
    • 优化选项:
    • 取消操作:通过 `CancellationToken` 取消读写。
  • 错误处理
  • 适用场景
  • `Channel`的类型
  • 创建`Channel`
  • 写入和读取消息
  • 使用场景
  • 示例代码
  • 注意事项

C#中, System.Threading.Channels 提供了 高效的异步生产-消费模型,适用于多任务间的数据传递。以下是其核心概念及使用方法的总结:

核心概念

Channel<T>:异步消息队列,支持多生产者和多消费者。

ChannelWriter<T>:用于异步写入数据(WriteAsync),完成后需调用 Complete()

ChannelReader<T>:用于异步读取数据,支持 ReadAsyncReadAllAsync 遍历。

创建Channel

无界通道

var channel = Channel.CreateUnbounded<int>();

容量无限,适用于不确定数据量的场景。

有界通道

var options = new BoundedChannelOptions(10)
{
    FullMode = BoundedChannelFullMode.Wait // 满时等待
};
var channel = Channel.CreateBounded<int>(options);

FullMode选项

  • Wait(默认):写入时阻塞直到有空间。

  • DropOldest/DropNewest:丢弃最旧/最新数据。

  • DropWrite:丢弃当前写入的数据。

生产者-消费者模式

生产者写入数据

async Task Producer(ChannelWriter<int> writer)
{
    for (int i = 0; i < 10; i++)
    {
        await writer.WriteAsync(i);
        await Task.Delay(100);
    }
    writer.Complete(); // 标记完成
}

消费者读取数据

async Task Consumer(ChannelReader<int> reader)
{
    // 方式1: ReadAllAsync遍历
    await foreach (var item in reader.ReadAllAsync())
    {
        Console.WriteLine($"Received: {item}");
    }

    // 方式2: 手动循环
    while (await reader.WaitToReadAsync())
    {
        while (reader.TryRead(out var item))
        {
            Console.WriteLine($"Received: {item}");
        }
    }
}

完整示例

using System;
using System.Threading.Channels;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        var channel = Channel.CreateUnbounded<int>();

        var producer = Producer(channel.Writer);
        var consumer = Consumer(channel.Reader);

        await Task.WhenAll(producer, consumer);
    }

    static async Task Producer(ChannelWriter<int> writer)
    {
        try
        {
            for (int i = 0; i < 10; i++)
            {
                await writer.WriteAsync(i);
                await Task.Delay(100);
            }
        }
        catch (Exception ex)
        {
            writer.Complete(ex); // 传递异常
        }
        finally
        {
            writer.Complete();
        }
    }

    static async Task Consumer(ChannelReader<int> reader)
    {
        try
        {
            await foreach (var item in reader.ReadAllAsync())
            {
                Console.WriteLine($"Processed: {item}");
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error: {ex.Message}");
        }
    }
}

高级配置

优化选项:

var options = new UnboundedChannelOptions()
{
    SingleWriter = true,  // 单一生产者优化
    SingleReader = false  // 允许多消费者
};

取消操作:通过 CancellationToken 取消读写。

await writer.WriteAsync(item, cancellationToken);

错误处理

生产者异常时,调用 writer.Complete(ex) 通知消费者。

消费者通过 try-catch 捕获遍历时的异常。

适用场景

数据流水线处理。

高吞吐量的异步任务。

多任务间的负载均衡。


C#中,System.Threading.Channels 是一个强大的异步通信机制,主要用于实现生产者-消费者模式。它提供了线程安全的通道(Channel),用于在不同线程之间传递数据。以下是关于C# Channel的详细介绍:

Channel的类型

Channel有两种类型:
有界通道(Bounded Channel):具有固定容量,当通道已满时,可以根据指定的策略处理新消息。
无界通道(Unbounded Channel):没有容量限制,适合生产者和消费者速度匹配的场景。

创建Channel

使用Channel.CreateBounded<T>创建有界通道,需要指定容量和满时的处理策略(如WaitDropNewestDropOldest等)。
使用Channel.CreateUnbounded<T>创建无界通道。

写入和读取消息

生产者通过channel.Writer.WriteAsync()方法写入消息。
消费者通过channel.Reader.ReadAsync()channel.Reader.WaitToReadAsync()读取消息。

使用场景

Channel主要用于生产者-消费者模式,可以实现高效的异步数据处理。它支持多线程操作,并可以通过SingleReaderSingleWriter属性限制通道的读写行为。

示例代码

以下是一个简单的生产者-消费者示例:

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10)
{
    FullMode = BoundedChannelFullMode.Wait
});

Task producer = Task.Run(async () =>
{
    for (int i = 0; i < 10; i++)
    {
        await channel.Writer.WriteAsync(i);
        Console.WriteLine($"Produced: {i}");
    }
    channel.Writer.Complete();
});

Task consumer = Task.Run(async () =>
{
    while (await channel.Reader.WaitToReadAsync())
    {
        if (channel.Reader.TryRead(out var item))
        {
            Console.WriteLine($"Consumed: {item}");
        }
    }
});

await Task.WhenAll(producer, consumer);

注意事项

  • 缓冲区溢出:生产者写入速度过快可能导致缓冲区溢出。
  • 正确关闭Channel:在数据完全消费后关闭Channel,避免数据丢失。

相关文章:

  • 【性能测试】Jmeter下载安装、环境配置-小白使用手册(1)
  • 国产编辑器EverEdit - 脚本(解锁文本编辑的无限可能)
  • Python开发Scikit-learn面试题及参考答案
  • 下载WindTerm学习的二三知识
  • 利用阿里云Atlas地区选择器与Plotly.js实现数据可视化与交互
  • TicTacToe(井字棋 包含 3*3 和 4*4)游戏开发案例 【C++】
  • 【数据结构】初识集合框架及背后的数据结构(简单了解)
  • Git和GitHub基础教学
  • 蓝耘携手通义万相2.1:引领AI创作革新,重塑视觉体验
  • python爬虫:Android自动化工具Auto.js的详细使用
  • 【Java学习】泛型
  • 知识库Dify和cherry无法解析影印pdf word解决方案
  • 个人记录,Unity资源解压和管理插件
  • FX-继承访问权限问题
  • 力扣经典题目:螺旋矩阵
  • 六十天前端强化训练之第十五天React组件基础案例:创建函数式组件展示用户信息(第15-21天:前端框架(React))
  • P2P中NAT穿越方案(UDP/TCP)(转)
  • [AI]从零开始的ComflyUI安装教程
  • Shell脚本编程基础篇(2)
  • 前端 UI 框架发展史
  • 确诊前列腺癌后,拜登首次发声
  • 学人、学术、学科、学脉:新时代沾溉下的中国西方史学史
  • 抖音开展“AI起号”专项治理,整治利用AI生成低俗猎奇视频等
  • 莱布雷希特专栏:古典乐坛边缘人
  • 深一度|上座率连创纪录撬动文旅,中超可否复制大连模式
  • 前四月国家铁路发送货物12.99亿吨,同比增长3.6%