首页 > 其他分享 >NetCore Channel-生产者&消费者

NetCore Channel-生产者&消费者

时间:2024-09-20 10:03:05浏览次数:1  
标签:Task NetCore 生产者 await static Channel message channel

using System.Threading.Channels;

namespace ChannelDemo
{
    public class ChannelMgr
    {
        // 优势
        // 允许开发者根据需要创建具有固定容量(有界)或无限容量(无界)的通道
        // 
        static Channel<string> channel = Channel.CreateBounded<string>(10); // 创建一个有界的通道,最大容量10条待处理消息(当通道满时,生产者将被阻塞,直到有空间可用。这种流控制可以对生产者进行阻塞,防止生产者过载,保护消费者不被淹没)
        //static Channel<string> channel = Channel.CreateUnbounded<string>(); // 创建一个无界通道,允许无限量的数据写入,直到系统的可用内存被耗尽

        //多种容量控制策略:对于有界通道,提供了Wait、DropNewest、DropOldest、DropWrite等多种策略来处理当通道达到容量上限时的情况

        static async Task Producer()
        {
            for (int i = 0; i < 30; i++)
            {
                string message = $"Message {i} -{DateTime.Now.ToString()}";
                await channel.Writer.WriteAsync(message); // 异步写入通道  
                Console.WriteLine($"Produced: {message}");
                await Task.Delay(100); // 模拟生产时间  
            }
            //channel.Writer.Complete(); // 标记通道为完成,不再写入数据,配合channel.Reader.WaitToReadAsync()->false,结束消费任务
            //Console.WriteLine("Write End");
        }

        static async Task Consumer()
        {
            while (await channel.Reader.WaitToReadAsync())
            {
                await foreach (var message in channel.Reader.ReadAllAsync()) // 异步读取所有消息  
                {
                    Console.WriteLine($"Consumed: {message}");
                    await Task.Delay(500); // 模拟处理时间  
                }
                await Task.Delay(10);
            }
            Console.WriteLine("Read End");
        }

        public static async Task StartConsumer()
        {
            // 启动消费者任务  
            Consumer();
        }

        public static async Task StartProducer()
        {
            // 启动生产者任务  
            Producer(); 
        }
    }
}

 

标签:Task,NetCore,生产者,await,static,Channel,message,channel
From: https://www.cnblogs.com/chen1880/p/18421903

相关文章

  • Go语言并发编程之Channels详解
    并发编程是Go语言的一大特色,而channel(通道)则是Go语言中用于实现并发的核心工具之一。它源于CSP(CommunicatingSequentialProcesses)的概念,旨在让多个goroutine之间能够高效地进行通信和同步。本文将深入探讨channel的用法、原理和最佳实践,通过丰富的示例代码和详细的解释,帮......
  • Kafka生产者如何实现消息的批量发送?
    ApacheKafka生产者可以实现消息的批量发送,这有助于提高数据传输的效率并减少网络负载。在Kafka中,生产者可以配置几个参数来控制批量发送的行为:batch.size:这个参数指定了生产者批次的大小(以字节为单位)。当生产者收集到一定数量的消息(达到指定的字节数),它会将这些消息......
  • Java多种方式实现 有界缓冲区下的多个生产者、消费者模型 (Semaphore、while+wait+noti
    /**@Author:SongyangJi@ProjectName:[email protected]@Description:*/classProducerThreadextendsThread{intrate;MultiProducerConsumermultiProducerConsumer;publicProducerThread(intrate,MultiProducerConsumermultiProducer......
  • 织梦如何让channelartlist标签支持limit属性
    在织梦CMS(DEDECMS)中,默认情况下channelartlist标签并不支持limit属性。但是,你可以通过修改织梦CMS的核心文件来实现这一功能。以下是详细的步骤:步骤1:备份现有文件在进行任何修改之前,请确保备份相关文件,以防修改失败或出现其他问题。步骤2:修改核心文件定位文件:打......
  • Error while loading conda entry point: anaconda-cloud-auth (cannot import name
    这个错误是由于conda环境中的某些插件或依赖损坏,特别是在conda.plugins.types模块中无法找到ChannelAuthBase。这通常发生在conda安装不完整、升级失败或插件包损坏的情况下。可能的解决方案:1.更新conda首先尝试更新conda,这可以修复一些与依赖相关的问题:condaupdatecon......
  • 论文分享 《Timing Side-channel Attacks and Countermeasures in CPU Microarchitect
    Attack概述传统攻击(CONVENTIONALATTACKS)在传统攻击中,Attacker通常:与Victim共享硬件资源(比如说LLC,BP,Prefetcher等)可以观察,改变微架构状态攻击步骤本文作者将传统攻击分为以下三步,如Fig1所示:定位“漏洞”:该漏洞包括“代码漏洞”(vulnerablecodegadgets),即......
  • NetCore DynamicExpresso 动态表达式使用例子
    Simple.cs简单使用例子usingDynamicExpresso;namespaceDynamicExpressoDemo{classCustomer{publicstringName{get;set;}}publicclassSimple{publicstaticvoidTest(){//返回结果Interpreter......
  • 为什么需要用到channel
    Channel是Go语言中并发编程的核心工具之一,主要用于解决以下问题:1.数据传递和通信在并发编程中,不同的goroutine可能需要交换数据。使用channel可以安全地在goroutine之间传递数据,而无需显式地使用锁。channel提供了类型安全的通信机制,使得数据传输既简洁又安全。2.......
  • 什么是golang中的channel
    在Go语言中,channel是一种用于在goroutine之间进行通信和同步的工具。它允许一个goroutine发送数据到channel,另一个goroutine从channel接收数据,从而实现并发编程中的数据交换。 Channel的关键特性类型安全:每个channel都有一个指定的类型,确保发送到channel的......
  • C# .netcore NPOI库 实现报表的列自适应删减
    实际需求:业务上的一个需求,数据库表A中的B字段存放的是该条数据的一些标签,标签存在两级【即一级标签和二级标签】,现在要是实现将这些标签统计到报表中,一级标签作为表头,二级标签作为填充值。由于之前的报表每增加一个列都需要去数据库表中增加这个字段名称,然后代码中写统计逻辑,这......