首页 > 其他分享 >BlockingCollection 使用

BlockingCollection 使用

时间:2024-08-05 17:07:03浏览次数:8  
标签:Task BlockingCollection System collection 使用 集合 using

创建 BlockingCollection 的实例时通过构造函数指定最大容量,从而限制集合中项目的数量。
BlockingCollection 的最大容量设置为 5,这意味着生产者在尝试添加超过 5 个项目时会被阻塞,直到集合中有空间。消费者从集合中取出项目并处理它们,从而为生产者腾出空间。

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        int maxCapacity = 5;
        BlockingCollection<int> collection = new BlockingCollection<int>(maxCapacity);

        Task producer = Task.Run(() =>
        {
            for (int i = 0; i < 10; i++)
            {
                collection.Add(i);
                Console.WriteLine($"Produced: {i}");
            }
            collection.CompleteAdding();
        });

        Task consumer = Task.Run(() =>
        {
            foreach (var item in collection.GetConsumingEnumerable())
            {
                Console.WriteLine($"Consumed: {item}");
                Thread.Sleep(500); // Simulate some work
            }
        });

        Task.WaitAll(producer, consumer);
    }
}

BlockingCollection.IsCompleted 属性用于指示集合是否已经完成添加并且是否已经消耗了所有数据。换句话说,它返回 true 的条件是集合已经调用了 CompleteAdding() 并且集合为空。因此,IsCompleted 包含了集合为空的判断。

如果你想判断是否已经调用了 CompleteAdding() 方法,但集合中仍然有值,你可以使用 BlockingCollection.IsAddingCompleted 属性。这两个属性的区别在于:

IsAddingCompleted:当调用了 CompleteAdding() 方法后,此属性将返回 true,即使集合中仍然有未消费的元素。
IsCompleted:当调用了 CompleteAdding() 方法且集合为空时,此属性将返回 true

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        BlockingCollection<int> collection = new BlockingCollection<int>();

        Task producer = Task.Run(() =>
        {
            for (int i = 0; i < 5; i++)
            {
                collection.Add(i);
                Console.WriteLine($"Produced: {i}");
            }
            collection.CompleteAdding();
        });

        Task consumer = Task.Run(() =>
        {
            while (!collection.IsCompleted)
            {
                if (collection.TryTake(out int item, TimeSpan.FromSeconds(1)))
                {
                    Console.WriteLine($"Consumed: {item}");
                }
                else if (collection.IsAddingCompleted)
                {
                    Console.WriteLine("Adding is completed but items still remain.");
                }
            }
            Console.WriteLine("Collection is completed and empty.");
        });

        Task.WaitAll(producer, consumer);
    }
}

在这个示例中,消费者任务在一个循环中检查 collection.IsCompleted 属性。如果集合未完成,尝试从集合中取出一个项目(带超时的 TryTake)。如果 TryTake 返回 false,并且 IsAddingCompleted 返回 true,这意味着已经调用了 CompleteAdding(),但集合中仍有未消费的元素。

当集合完成且为空时,collection.IsCompleted 将返回 true,消费者任务将输出 "Collection is completed and empty." 并结束。

BlockingCollection.TryTake 方法具有两种不同的行为,取决于是否提供了超时参数:

无参数的 TryTake 方法:

不会阻塞。如果集合中没有项,立即返回 false。
带有超时参数的 TryTake 方法:

可以阻塞。它会等待指定的超时期限以尝试从集合中获取一个项。在此期间,如果集合变得非空,则返回该项;否则,在超时期限过后返回 false。

由于 BlockingCollection 是线程安全的,多个线程可以安全地并发访问集合,无需担心多线程问题。

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        BlockingCollection<int> collection = new BlockingCollection<int>(10); // 设置容量限制为10

        // Producer tasks
        Task producer1 = Task.Run(() => Produce(collection, 0));
        Task producer2 = Task.Run(() => Produce(collection, 100));

        // Consumer tasks
        Task consumer1 = Task.Run(() => Consume(collection));
        Task consumer2 = Task.Run(() => Consume(collection));

        Task.WaitAll(producer1, producer2, consumer1, consumer2);
    }

    static void Produce(BlockingCollection<int> collection, int start)
    {
        for (int i = start; i < start + 10; i++)
        {
            collection.Add(i);
            Console.WriteLine($"Produced: {i}");
            Thread.Sleep(100); // 模拟一些工作
        }
        collection.CompleteAdding();
    }

    static void Consume(BlockingCollection<int> collection)
    {
        foreach (var item in collection.GetConsumingEnumerable())
        {
            Console.WriteLine($"Consumed: {item}");
            Thread.Sleep(150); // 模拟一些工作
        }
        Console.WriteLine("Collection is completed and empty.");
    }
}

通过这种方式,你可以在多个线程中安全地使用 Add 和 Take 操作,并且 BlockingCollection 会处理所有必要的同步问题

标签:Task,BlockingCollection,System,collection,使用,集合,using
From: https://www.cnblogs.com/JosenEarth/p/18343605

相关文章

  • 使用普罗米修斯API统计服务宕机时间
    使用/api/v1/query?query=mysql_global_status_uptime&time=查询服务上次启动时间使用/api/v1/query_rangeStringquery="mysql_global_status_uptime{instance=\""+startDTO.getInstance()+"\"}";查询某段时间内服务的数据变化通过60秒和1秒的step来分析出服务......
  • 使用Python 和 Selenium 抓取 酷狗 音乐专辑 附源码
    在这篇博客中,我将分享如何使用Python和Selenium抓取酷狗音乐网站上的歌曲信息。我们将使用BeautifulSoup解析HTML内容,并提取歌曲和专辑信息。依赖库requestsbeautifulsoup4selenium准备工作首先,我们需要安装一些必要的库:pipinstallrequestsbeautifulsoup4selenium......
  • 使用 Python和PyQt5 打造 你的专属文件查询工具! 附源码
    本文将介绍如何使用Python和PyQt5创建一个简单的文件查询工具。该工具允许用户选择一个目录,并在该目录中搜索特定的文件。依赖库首先,确保你已经安装了PyQt5库:pipinstallPyQt5步骤第一步:导入库我们需要导入必要的库,包括sys、os和PyQt5。importsysimportosfromP......
  • 了解 Databricks 文件系统 (DBFS) 中的文件访问与使用 Python 和 Spark 的卷的比较
    我当前正在尝试从Databricks文件系统(DBFS)读取和显示文件,但遇到了问题。这是我使用的代码:file_path="/dbfs/cluster-logs/use_case/default_job_cluster/cluster_id/init_scripts/cluster_id/20240801_proxy-init.sh.stderr.log"withopen(file_path,'r')asfile:......
  • centos下使用阿里云镜像安装docker
    环境:OS:Centos7步骤1:[[email protected]]#yuminstall-yyum-utilsdevice-mapper-persistent-datalvm2Loadedplugins:fastestmirror,langpacksLoadingmirrorspeedsfromcachedhostfileCouldnotretrievemirrorlisthttp://mirrorlist.centos.org/?......
  • Runtime类的使用
    Runtime类的使用得到系统内存的一些信息@TestpublicvoidruntimeInfo(){Runtimeruntime=Runtime.getRuntime();intprocessors=runtime.availableProcessors();longfreeMemory=runtime.freeMemory();longmaxMemory=runtime.maxMemory();......
  • 在 Glue 作业中使用 python3+ 创建 CloudFront 签名 URL
    是否可以使用python3+为GlueJob中S3文件中的一个特定文件创建具有一定时间限制的CloudFront签名URL?我看到可以在Lambda中做到这一点,但在Python文档中找不到任何内容,特别是胶水工作。任何人都可以提供一些提示吗?defload_private_key(key_path):withopen(......
  • 使用react+node调用科大讯飞api实现实时语音听写(流式版)
    前言--踩坑过程一时间心血来潮,想用科大讯飞的api来做一个语音实时转文字,也是走了很多弯路,边写边查边生成,最后算是完成了。功能实现了但是没有做UI。本来想试试光靠不要服务端光靠前端直接调用科大讯飞的api来实现,但是发现太慢了,四五秒才蹦出来一个字。然后没办法,搭建了一个......
  • C# 使用Flurl http请求处理流式响应
    AI对话接口采用流式返回,使用Flurl处理返回的数据流usingFlurl;usingFlurl.Http;[HttpPost]publicasyncTask<string>GetLiushiChatLaw(){//1、请求参数,根据实际情况YourModelrequest=newYourModel();stringallStr="";stringchatLawApiUrl="ht......
  • 【Dynamo】AnyCAD使用Dynamo绘制三维模型(二)——生成序列和范围的几种方式
    说明:Dynamo为开源项目,开源地址:https://github.com/DynamoDS/Dynamo.git本文章使用版本:v3.0.3范围使用Range节点start和end分别表示范围的边界,step表示步长。如下为[1,10]范围内步长为2结果​使用CodeBlock节点在CodeBlock填写如下形式的代码beginning..end..step-si......