using System.Threading.Channels; namespace ChannelDemo { public class ChannelMgr { // 优势 // 允许开发者根据需要创建具有固定容量(有界)或无限容量(无界)的通道 // static Channel<string> channel = Channel.CreateBounded<string>(10); // 创建一个有界的通道,最大容量10条待处理消息(当通道满时,生产者将被阻塞,直到有空间可用。这种流控制可以对生产者进行阻塞,防止生产者过载,保护消费者不被淹没) //static Channel<string> channel = Channel.CreateUnbounded<string>(); // 创建一个无界通道,允许无限量的数据写入,直到系统的可用内存被耗尽 //多种容量控制策略:对于有界通道,提供了Wait、DropNewest、DropOldest、DropWrite等多种策略来处理当通道达到容量上限时的情况 static async Task Producer() { for (int i = 0; i < 30; i++) { string message = $"Message {i} -{DateTime.Now.ToString()}"; await channel.Writer.WriteAsync(message); // 异步写入通道 Console.WriteLine($"Produced: {message}"); await Task.Delay(100); // 模拟生产时间 } //channel.Writer.Complete(); // 标记通道为完成,不再写入数据,配合channel.Reader.WaitToReadAsync()->false,结束消费任务 //Console.WriteLine("Write End"); } static async Task Consumer() { while (await channel.Reader.WaitToReadAsync()) { await foreach (var message in channel.Reader.ReadAllAsync()) // 异步读取所有消息 { Console.WriteLine($"Consumed: {message}"); await Task.Delay(500); // 模拟处理时间 } await Task.Delay(10); } Console.WriteLine("Read End"); } public static async Task StartConsumer() { // 启动消费者任务 Consumer(); } public static async Task StartProducer() { // 启动生产者任务 Producer(); } } }
标签:Task,NetCore,生产者,await,static,Channel,message,channel From: https://www.cnblogs.com/chen1880/p/18421903