首页 > 其他分享 >如何使用 Channel 类来创建一个生产者-消费者模型

如何使用 Channel 类来创建一个生产者-消费者模型

时间:2024-05-25 22:56:12浏览次数:22  
标签:Task 消费者 生产者 await channel 类来 var Channel

如何使用 Channel 类来创建一个生产者-消费者模型

.NET 中 Channel 类简单使用

 

Channel 是干什么的

The System.Threading.Channels namespace provides a set of synchronization data structures for passing data between producers and consumers asynchronously. The library targets .NET Standard and works on all .NET implementations.
Channels are an implementation of the producer/consumer conceptual programming model.
以上是微软官方的解释 channels。用中文说的话就是这个类提供了在生产者跟消费者之间异步传统数据的能力,简单来说可以认为是一个内存消息队列。

示例 1

下面是一个简单的示例,说明如何使用 Channel 类来创建一个生产者-消费者模型:

    static async Task Main(string[] args)
    {
        var channel = Channel.CreateUnbounded<int>();

        var producer = Task.Run(async () =>
        {
            for (int i = 0; i < 10; i++)
            {
                await channel.Writer.WriteAsync(i);
                await Task.Delay(1000); // 模拟生产者需要一些时间来生成数据
            }

            channel.Writer.Complete();
        });

        var consumer = Task.Run(async () =>
        {
            await foreach (var item in channel.Reader.ReadAllAsync())
            {
                Console.WriteLine($"消费者接收到: {item}");
            }
        });

        await Task.WhenAll(producer, consumer);
    }

在这个例子中,我们创建了一个无界的通道,然后创建了两个任务,一个是生产者,一个是消费者。生产者每秒生成一个数字,然后写入通道。消费者从通道中读取数据并打印出来。当生产者完成写入后,它会调用 channel.Writer.Complete() 来通知消费者没有更多的数据可以读取。

示例 2

你可以使用 Channel.CreateBounded(capacity) 方法来创建一个有界的通道,其中 capacity 参数指定了通道的容量。当通道满时,尝试写入的操作将会阻塞,直到有空间可用。

    static async Task Main(string[] args)
    {
        var channel = Channel.CreateBounded<int>(5); // 创建一个容量为5的有界通道

        var producer = Task.Run(async () =>
        {
            for (int i = 0; i < 10; i++)
            {
                await channel.Writer.WriteAsync(i);
                Console.WriteLine($"生产者生成了: {i}");
                await Task.Delay(1000); // 模拟生产者需要一些时间来生成数据
            }

            channel.Writer.Complete();
        });

        var consumer = Task.Run(async () =>
        {
            await foreach (var item in channel.Reader.ReadAllAsync())
            {
                Console.WriteLine($"消费者接收到: {item}");
                await Task.Delay(2000); // 模拟消费者需要一些时间来处理数据
            }
        });

        await Task.WhenAll(producer, consumer);
    }

在这个例子中,我们创建了一个容量为5的有界通道。生产者每秒生成一个数字,然后写入通道。消费者从通道中读取数据并打印出来,但消费者处理数据的速度比生产者慢,所以当通道满时,生产者的 WriteAsync 操作将会阻塞,直到消费者读取了一些数据,使得通道有空间可用。

示例 3

下面是一个示例,展示了如何在多个生产者和消费者之间共享一个通道:

    static async Task Main(string[] args)
    {
        var channel = Channel.CreateUnbounded<int>();

        // 创建两个生产者
        var producer1 = Produce(channel.Writer, id: 1);
        var producer2 = Produce(channel.Writer, id: 2);

        // 创建两个消费者
        var consumer1 = Consume(channel.Reader, id: 1);
        var consumer2 = Consume(channel.Reader, id: 2);

        // 等待所有生产者和消费者完成
        await Task.WhenAll(producer1, producer2, consumer1, consumer2);
    }

    static async Task Produce(ChannelWriter<int> writer, int id)
    {
        for (int i = 0; i < 10; i++)
        {
            await writer.WriteAsync(i);
            Console.WriteLine($"生产者{id}生成了: {i}");
            await Task.Delay(1000); // 模拟生产者需要一些时间来生成数据
        }

        writer.Complete();
    }

    static async Task Consume(ChannelReader<int> reader, int id)
    {
        await foreach (var item in reader.ReadAllAsync())
        {
            Console.WriteLine($"消费者{id}接收到: {item}");
            await Task.Delay(2000); // 模拟消费者需要一些时间来处理数据
        }
    }

在这个例子中,我们创建了两个生产者和两个消费者,它们都共享同一个通道。这是一个非常重要使用模式。因为当我们使用消息队列的时候往往会有多个生产者跟多个消费者。我们可以通过控制生产者生产的速度来控制推入队列的数据量。我们还可以通过控制消费者的数量来控制消费数据的速度,从而来调节系统的流量,达到消峰填谷的作用。

总结

Channel 类是 .NET CORE 3.0 后新加入的类。为我们提供了便利的生产者/消费者模式实现方案。相当于是一个进程内的内存队列,而且它没有持久化,纯内存操作,性能是非常非常高的。当我们面对真正的高并发的时候可以为我们的系统提供吞吐量。当然代价是内存跟放弃一些实时性。

标签:Task,消费者,生产者,await,channel,类来,var,Channel
From: https://www.cnblogs.com/Leo_wl/p/18213121

相关文章

  • Netty ChannelHandler的生命周期
    ChannelHandler方法的执行是有顺序的,而这个执行顺序可以被称为ChannelHandler的生命周期。 LifeCyCleTestHandlerimportio.netty.channel.ChannelInboundHandlerAdapter;importio.netty.channel.ChannelHandlerContext;publicclassLifeCyCleTestHandlerextendsChan......
  • 互斥锁,IPC机制,队列,生产者消费者模型
    Ⅰ互斥锁【一】什么是互斥锁互斥锁其实就是一种锁。为当前进程或线程添加额外的限制限制当前时间段只能由当前进程使用,当前进程使用完成后才能其他进程继续使用其作用是保证在同一时刻只有一个线程在访问共享资源,从而避免多个线程同时读写数据造成的问题。互斥锁的基本原......
  • 生产者和消费者模型
    进程间通信(IPCinter-processcommunication)如何实现进程间通信将消息放入队列中,由另一个进程从另一个队列中取出这种通信方式是非阻塞的,发送进程不需要等待接收进程的相应就可执行multiprocessing有两种形式通信:队列、管道管道stdinstdoutstderr队列管道+......
  • .NET 中 Channel 类(内存级消息队列)简单使用
    Channel是干什么的#TheSystem.Threading.Channelsnamespaceprovidesasetofsynchronizationdatastructuresforpassingdatabetweenproducersandconsumersasynchronously.Thelibrarytargets.NETStandardandworksonall.NETimplementations.Channelsa......
  • 【Halcon】示例程序学习——append_channel / tile_channels
    Name:1、append_channel——将其他矩阵(通达)附加到图像2、tile_channels——多张图像平铺成一个大图像signature:1、append_channel(MultiChannelImage,Image:ImageExtended::)2、tile_channels(Image:TiledImage:NumColumns,TileOrder:)Description:1、运算符ap......
  • 生产者与消费者模式
    importthreadingfromqueueimportQueuefromrandomimportchoicedealList=["红烧猪蹄","卤鸡爪","酸菜鱼","糖醋里脊","九转大肠","阳春面","烤鸭","烧鸡","剁椒鱼头","酸汤肥牛",&quo......
  • go channel ->同步
    通道并非用来取代锁,各有不同使用场景。通道解决高级别逻辑层次并发架构,锁则用来保护低级别局部代码安全。●竟态条件:多线程同时读写共享资源(竟态资源)。●临界区:读写竟态资源的代码片段。●互斥锁:同一时刻,只有一个线程能进入临界区。●读写锁:写独占(其他读写均被阻塞),读共享。●信号......
  • golang channel 封装
    对于closed或nil通道,规则如下:无论收发,nil通道都会阻塞。不能关闭nil通道。重复关闭通道,引发panic!向已关闭通道发送数据,引发panic!从已关闭通道接收数据,返回缓冲数据或零值。nil通道是指没有make的变量。鉴于通道关闭后,所有基于此的阻塞都被解除,可用作通知。没......
  • 手写消费者生产者
    生产者importjava.util.List;importjava.util.Random;importjava.util.concurrent.locks.Condition;importjava.util.concurrent.locks.ReentrantLock;publicclassProducerThreadimplementsRunnable{privateStringthreadName;privateReentrantLoc......
  • Go语言系列——Go协程、信道(channel)、缓冲信道和工作池、Select、Mutex、结构体取代类
    文章目录21-Go协程Go协程是什么?Go协程相比于线程的优势如何启动一个Go协程?启动多个Go协程22-信道(channel)什么是信道?信道的声明通过信道进行发送和接收发送与接收默认是阻塞的信道的代码示例信道的另一个示例死锁单向信道关闭信道和使用forrange遍历信道23-缓冲信......