在面对 生产者-消费者 的场景下, netcore 提供了一个新的命名空间 System.Threading.Channels 来帮助我们更高效的处理此类问题,有了这个 Channels 存在, 生产者 和 消费者 可以各自处理自己的任务而不相互干扰,有利于两方的并发处理,这篇文章我们就来讨论下如何使用 System.Threading.Channels。
要想使用 生产者-消费者 场景,除Channel外还有几种实现途径,比如:BlockingCollection 和 TPL Dataflow。
为什么要使用 Channel
可以利用 Channels 来实现 生产者和消费者 之间的解耦,大体上有两个好处:
生产者 和 消费者 是相互独立的,两者可以并行执行。
如果生产者不给力,可以创建多个的生产者,如果消费者不给力,可以创建更多的消费者。
总的来说,在 生产者-消费者 模式下可以帮助我们提高应用程序的吞吐率。
创建 channel
本质上来说,你可以创建两种类型的 channel
,一种是有限容量的 bound channel
,一种是无限容量的 unbound channel
,接下来的问题是,如何创建呢?Channels 提供了两种 工厂方法 用于创建,如下代码所示:
- CreateBounded
创建的 channel 是一个有消息上限的通道。 - CreateUnbounded
创建的 channel 是一个无消息上限的通道。
下面的代码片段展示了如何创建 unbounded channel,并且只能存放 string 类型。
static void Main(string[] args)
{
var channel = Channel.CreateUnbounded<string>();
}
对了,Bounded channel
还提供了一个 FullMode
属性,用于指定当 channel
已满时该如何对插入的 message
进行处理,通常有四种做法。
- Wait
- DropWrite
- DropNewest
- DropOldest
下面的代码片段展示了如何在 Bounded channel 上使用 FullMode。
static void Main(string[] args)
{
var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(1000)
{
FullMode = BoundedChannelFullMode.Wait
});
}
将消息写入到 channel
要想将 message
写入到 channel
,可以使用 WriteAsync()
方法,如下代码所示:
static async Task Main(string[] args)
{
var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(1000)
{
FullMode = BoundedChannelFullMode.Wait
});
await channel.Writer.WriteAsync("Hello World!");
}
从 channel 中读取消息
要想从 channel
中读取 message
,可以使用 ReadAsync()
,如下代码所示:
static async Task Main(string[] args)
{
var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(1000)
{
FullMode = BoundedChannelFullMode.Wait
});
while (await channel.Reader.WaitToReadAsync())
{
if (channel.Reader.TryRead(out var message))
{
Console.WriteLine(message);
}
}
}
System.Threading.Channels 例子
下面是完整的代码清单,展示了如何从 channel 中读写 message。
class Program
{
// 创建一个 Channel 对象
private static Channel<int> channel = Channel.CreateUnbounded<int>();
static async Task Main()
{
// 启动一个新线程来写入数据到 Channel
Task.Run(WriteDataToChannel).ConfigureAwait(false);
// 启动一个新线程来从 Channel 读取数据
await Task.Run(ReadDataFromChannel);
Console.ReadKey();
}
static void WriteDataToChannel()
{
for (int i = 0; i < 10; i++)
{
channel.Writer.TryWrite(i);
Thread.Sleep(1000); // 模拟耗时操作
}
channel.Writer.Complete(); // 通知读取者数据已写入完毕
}
static async Task ReadDataFromChannel()
{
while (await channel.Reader.WaitToReadAsync())
{
while (channel.Reader.TryRead(out int data))
{
Console.WriteLine($"Read data: {data}"); // 处理读取到的数据
}
}
}
}
消费者的“订阅”主要是由以下几种方式来进行的:
1、Channel.Reader.WaitToReadAsync()。这个方法会在当有数据可用时返回true
2、Channel.Reader.ReadAllAsync()。注意这个方法返回的是一个IAsyncEnumerable。IAsyncEnumerable是C#8推出的一种异步流模型。我们可以使用await foreach这种方式来消费异步流。在Channel中,如果Writer没有明确Complete,那么await foreach也会成为一个死循环。所以在使用这个模型前,需要注意。
await foreach (var item in channel.Reader.ReadAllAsync())//async stream,在没有被生产者明确Complete的情况下,这里会一致阻塞下去
{
Console.WriteLine(item);
}
3、Channel.Reader.ReadAsync()。
生产者的“生产”主要是由以下几种方式来进行的:
1、Channel.Writer.WriteAsync(),生产一条消息。
2、Channel.Writer.WaitToWriteAsync()。当有空间可写时,返回一个true。因为Channel有限容类型的Channel,所以这个方法也可以作为一个屏障,当Channel空间已满时,进行阻塞。
3、Channel.Writer.Complete()。发送一个信号,告知Reader消费者,Channel已经不会再发送任何消息了。此时Channel的状态变为closed,如果Reader继续读取信息,则会抛异常:
System.Threading.Channels.ChannelClosedException:“The channel has been closed.”
标签:Writer,笔记,Channels,static,channel,Reader,多线程,Channel
From: https://www.cnblogs.com/fanfan-90/p/17913696.html