首页 > 其他分享 >【多线程笔记】Channel

【多线程笔记】Channel

时间:2023-12-19 15:13:35浏览次数:30  
标签:Writer 笔记 Channels static channel Reader 多线程 Channel

在面对 生产者-消费者 的场景下, netcore 提供了一个新的命名空间 System.Threading.Channels 来帮助我们更高效的处理此类问题,有了这个 Channels 存在, 生产者 和 消费者 可以各自处理自己的任务而不相互干扰,有利于两方的并发处理,这篇文章我们就来讨论下如何使用 System.Threading.Channels。
要想使用 生产者-消费者 场景,除Channel外还有几种实现途径,比如:BlockingCollection 和 TPL Dataflow。

为什么要使用 Channel

可以利用 Channels 来实现 生产者和消费者 之间的解耦,大体上有两个好处:
生产者 和 消费者 是相互独立的,两者可以并行执行。
如果生产者不给力,可以创建多个的生产者,如果消费者不给力,可以创建更多的消费者。
总的来说,在 生产者-消费者 模式下可以帮助我们提高应用程序的吞吐率。

创建 channel

本质上来说,你可以创建两种类型的 channel,一种是有限容量的 bound channel,一种是无限容量的 unbound channel,接下来的问题是,如何创建呢?Channels 提供了两种 工厂方法 用于创建,如下代码所示:

  • CreateBounded 创建的 channel 是一个有消息上限的通道。
  • CreateUnbounded 创建的 channel 是一个无消息上限的通道。

下面的代码片段展示了如何创建 unbounded channel,并且只能存放 string 类型。

        static void Main(string[] args)
        {
            var channel = Channel.CreateUnbounded<string>();
        }

对了,Bounded channel 还提供了一个 FullMode 属性,用于指定当 channel 已满时该如何对插入的 message 进行处理,通常有四种做法。

  • Wait
  • DropWrite
  • DropNewest
  • DropOldest

下面的代码片段展示了如何在 Bounded channel 上使用 FullMode。

        static void Main(string[] args)
        {
            var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(1000)
            {
                FullMode = BoundedChannelFullMode.Wait
            });
        }

将消息写入到 channel

要想将 message 写入到 channel,可以使用 WriteAsync() 方法,如下代码所示:

        static async Task Main(string[] args)
        {
            var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(1000)
            {
                FullMode = BoundedChannelFullMode.Wait
            });

            await channel.Writer.WriteAsync("Hello World!");
        }

从 channel 中读取消息

要想从 channel 中读取 message,可以使用 ReadAsync(),如下代码所示:

        static async Task Main(string[] args)
        {
            var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(1000)
            {
                FullMode = BoundedChannelFullMode.Wait
            });

            while (await channel.Reader.WaitToReadAsync())
            {
                if (channel.Reader.TryRead(out var message))
                {
                    Console.WriteLine(message);
                }
            }
        }

System.Threading.Channels 例子

下面是完整的代码清单,展示了如何从 channel 中读写 message。

class Program
{
    // 创建一个 Channel 对象  
    private static Channel<int> channel = Channel.CreateUnbounded<int>();
    static async Task Main()
    {
        // 启动一个新线程来写入数据到 Channel  
        Task.Run(WriteDataToChannel).ConfigureAwait(false);
        // 启动一个新线程来从 Channel 读取数据  
        await Task.Run(ReadDataFromChannel);


        Console.ReadKey();
    }

    static void WriteDataToChannel()
    {
        for (int i = 0; i < 10; i++)
        {
            channel.Writer.TryWrite(i);
            Thread.Sleep(1000); // 模拟耗时操作  
        }
        channel.Writer.Complete(); // 通知读取者数据已写入完毕  
    }

    static async Task ReadDataFromChannel()
    {
        while (await channel.Reader.WaitToReadAsync())
        {
            while (channel.Reader.TryRead(out int data))
            {
                Console.WriteLine($"Read data: {data}"); // 处理读取到的数据  
            }
        }
    }
}

消费者的“订阅”主要是由以下几种方式来进行的:
1、Channel.Reader.WaitToReadAsync()。这个方法会在当有数据可用时返回true
2、Channel.Reader.ReadAllAsync()。注意这个方法返回的是一个IAsyncEnumerable。IAsyncEnumerable是C#8推出的一种异步流模型。我们可以使用await foreach这种方式来消费异步流。在Channel中,如果Writer没有明确Complete,那么await foreach也会成为一个死循环。所以在使用这个模型前,需要注意。

    await foreach (var item in channel.Reader.ReadAllAsync())//async stream,在没有被生产者明确Complete的情况下,这里会一致阻塞下去
    {
        Console.WriteLine(item);
    }

3、Channel.Reader.ReadAsync()。

生产者的“生产”主要是由以下几种方式来进行的:
1、Channel.Writer.WriteAsync(),生产一条消息。
2、Channel.Writer.WaitToWriteAsync()。当有空间可写时,返回一个true。因为Channel有限容类型的Channel,所以这个方法也可以作为一个屏障,当Channel空间已满时,进行阻塞。
3、Channel.Writer.Complete()。发送一个信号,告知Reader消费者,Channel已经不会再发送任何消息了。此时Channel的状态变为closed,如果Reader继续读取信息,则会抛异常:

System.Threading.Channels.ChannelClosedException:“The channel has been closed.”

标签:Writer,笔记,Channels,static,channel,Reader,多线程,Channel
From: https://www.cnblogs.com/fanfan-90/p/17913696.html

相关文章

  • 圆方树学习笔记
    今天在做ABC318G这道题,要用到圆方树的知识,于是就去学了圆方树。学习圆方树首先需要学习点双连通分量以及缩点,此处不多赘述。圆方树中分两种类型的点:圆点和方点。圆点指的是原来的无向图中的所有点,而方点指的是每一个点双连通分量所代表的点。相当于每一个点双连通分量就是一个......
  • 状压 DP 学习笔记
    前言2023.8.30开始停课集训。开始补\(CSP-S\)的知识点,先打算来学状压\(DP\)。定义状压\(DP\)的全称是状态压缩动态规划,也是动态规划中的一种。但是其与普通\(DP\)不同的是它将某种状态(一般为二进制\(01\)串,\(1\)表示选,\(0\)表示不选。也有其它进制)作为了\(dp\)......
  • c#学习笔记-------------------------readonly修饰符
    一、ReadOnly关键字MSDN官方的解释readonly 关键字是可以在字段上使用的修饰符。当字段声明包括 readonly 修饰符时,该声明引入的字段赋值只能作为声明的一部分出现,或者出现在同一类的构造函数中.具体意思是:readonly是一个修饰字段的关键字:被它修饰的字段只有在初始化或者......
  • rust语言_学习笔记
    rust语言_学习笔记转载注明来源:本文链接来自osnosn的博客,写于2023-12-10.安装rust【安装_rustup_cargo_rustc_交叉编译测试】cargo的config设置更换ustc源,使用代理。设置缺省registry。见【rustcargo配置】。crate库搜索去【crates.io】搜索去【docs.......
  • 网络流学习笔记
    这个必须写。先梳理一下,到时候再整理,证明先简写或者跳过。流网络:一个有向图,每条边有一个容量,有一个源点\(s\)和一个汇点\(t\)。每条边有一个属性称为容量,如果把流网络抽象成水管的话,那么边的容量就是每根水管的每秒最大承受的进水量。每条边也有一个流量,这个值大于等于\(0\)......
  • 阅读笔记《掌握需求过程》2
    这次我们从第三章开始看,项目启动有关的事项。这一章包含12小节,即icebreaker项目(就是本书中为了方便读者理解需求过程,始终贯穿的实例),产品目标——我们需要该产品的原因是什么,谁为它付钱:客户和顾客,用户——理解他们,风险承担者和顾问,需求限制条件,为您的宝宝命名,设定范围,该产品的成本......
  • 大数据实验报告 | 填坑笔记
     利用JavaAPI进行这个查找操作的时候,总是顺序输出,考虑是代码的原因 没有进行判定,所以只要不为空都输出出来了,进行条件判定指定行键之后,就可以了!redis启动不起来,考虑换个端口  input目录的创建过程遇到一些小问题 删除不掉就用完整目录删  地址对应正确,否则......
  • [学习笔记]珂朵莉树
    目录0x00:介绍1x00:思想1x01:节点保存1x02:核心操作split1x03:推平操作assign2x00:例题2x01:CF896C2x02:CF915E3x00:总结0x00介绍珂朵莉树(ChthollyTree),又称ODT(OldDriverTree),一种数据结构,但似乎暴力到不能称之为数据结构。可以很好地骗分,在随机数据下十分有效,常用于将\([l......
  • LightGCL Simple Yet Effective Graph Contrastive Learning For Recommendation论文
    Abstract目前的图对比学习方法都存在一些问题,它们要么对用户-项目交互图执行随机增强,要么依赖于基于启发式的增强技术(例如用户聚类)来生成对比视图。这些方法都不能很好的保留内在的语义结构,而且很容易受到噪声扰动的影响。所以我们提出了一个图对比学习范式LightGCL来减轻基于CL......
  • 《架构师之路:软件架构之美》阅读笔记三
    《架构师之路:软件架构之美》是一本关于软件架构的入门书籍,作者李家智从自己的实践经验出发,结合了业内一些经典的案例和经验,系统地介绍了软件架构的基本概念、原则和方法。本书主要分为三个部分:第一部分介绍了软件架构的基本概念和原则;第二部分详细介绍了一些常用的软件架构模式,如......