首页 > 其他分享 >.NET Core 线程池(ThreadPool)底层原理浅谈

.NET Core 线程池(ThreadPool)底层原理浅谈

时间:2024-11-25 17:12:42浏览次数:9  
标签:Core workItem Task 浅谈 int ThreadPool tl 线程

简介

上文提到,创建线程在操作系统层面有4大无法避免的开销。因此复用线程明显是一个更优的策略,切降低了使用线程的门槛,提高程序员的下限。

.NET Core线程池日新月异,不同版本实现都有差别,在.NET 6之前,ThreadPool底层由C++承载。在之后由C#承载。本文以.NET 8.0.8为蓝本,如有出入,请参考源码.

ThreadPool结构模型图

image

眼见为实

https://github.com/dotnet/runtime/blob/main/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs
上源码

internal sealed partial class ThreadPoolWorkQueue
{
        internal readonly ConcurrentQueue<object> workItems = new ConcurrentQueue<object>();//全局队列
        internal readonly ConcurrentQueue<object> highPriorityWorkItems = new ConcurrentQueue<object>();//高优先级队列,比如Timer产生的定时任务
        internal readonly ConcurrentQueue<object> lowPriorityWorkItems =
            s_prioritizationExperiment ? new ConcurrentQueue<object>() : null!;//低优先级队列,比如回调
        internal readonly ConcurrentQueue<object>[] _assignableWorkItemQueues =
            new ConcurrentQueue<object>[s_assignableWorkItemQueueCount];//CPU 核心大于32个,全局队列会分裂为好几个,目的是降低CPU核心对全局队列的锁竞争
}

ThreadPool生产者模型

image

眼见为实

        public void Enqueue(object callback, bool forceGlobal)
        {
            Debug.Assert((callback is IThreadPoolWorkItem) ^ (callback is Task));

            if (_loggingEnabled && FrameworkEventSource.Log.IsEnabled())
                FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject(callback);

#if CORECLR
            if (s_prioritizationExperiment)//lowPriorityWorkItems目前还是实验阶段,CLR代码比较偷懒,这一段代码很不优雅,没有连续性。
            {
                EnqueueForPrioritizationExperiment(callback, forceGlobal);
            }
            else
#endif
            {
                ThreadPoolWorkQueueThreadLocals? tl;
                if (!forceGlobal && (tl = ThreadPoolWorkQueueThreadLocals.threadLocals) != null)
                {
                    tl.workStealingQueue.LocalPush(callback);//如果没有特殊情况,默认加入本地队列
                }
                else
                {
                    ConcurrentQueue<object> queue =
                        s_assignableWorkItemQueueCount > 0 && (tl = ThreadPoolWorkQueueThreadLocals.threadLocals) != null
                            ? tl.assignedGlobalWorkItemQueue//CPU>32 加入分裂的全局队列
                            : workItems;//CPU<=32 加入全局队列
                    queue.Enqueue(callback);
                }
            }

            EnsureThreadRequested();
        }

细心的朋友,会发现highPriorityWorkItems的注入判断哪里去了?目前CLR对于高优先级队列只开放给内部,比如timer/Task使用

ThreadPool消费者模型

image

眼见为实

public object? Dequeue(ThreadPoolWorkQueueThreadLocals tl, ref bool missedSteal)
        {
            // Check for local work items
            object? workItem = tl.workStealingQueue.LocalPop();
            if (workItem != null)
            {
                return workItem;
            }

            // Check for high-priority work items
            if (tl.isProcessingHighPriorityWorkItems)
            {
                if (highPriorityWorkItems.TryDequeue(out workItem))
                {
                    return workItem;
                }

                tl.isProcessingHighPriorityWorkItems = false;
            }
            else if (
                _mayHaveHighPriorityWorkItems != 0 &&
                Interlocked.CompareExchange(ref _mayHaveHighPriorityWorkItems, 0, 1) != 0 &&
                TryStartProcessingHighPriorityWorkItemsAndDequeue(tl, out workItem))
            {
                return workItem;
            }

            // Check for work items from the assigned global queue
            if (s_assignableWorkItemQueueCount > 0 && tl.assignedGlobalWorkItemQueue.TryDequeue(out workItem))
            {
                return workItem;
            }

            // Check for work items from the global queue
            if (workItems.TryDequeue(out workItem))
            {
                return workItem;
            }

            // Check for work items in other assignable global queues
            uint randomValue = tl.random.NextUInt32();
            if (s_assignableWorkItemQueueCount > 0)
            {
                int queueIndex = tl.queueIndex;
                int c = s_assignableWorkItemQueueCount;
                int maxIndex = c - 1;
                for (int i = (int)(randomValue % (uint)c); c > 0; i = i < maxIndex ? i + 1 : 0, c--)
                {
                    if (i != queueIndex && _assignableWorkItemQueues[i].TryDequeue(out workItem))
                    {
                        return workItem;
                    }
                }
            }

#if CORECLR
            // Check for low-priority work items
            if (s_prioritizationExperiment && lowPriorityWorkItems.TryDequeue(out workItem))
            {
                return workItem;
            }
#endif

            // Try to steal from other threads' local work items
            {
                WorkStealingQueue localWsq = tl.workStealingQueue;
                WorkStealingQueue[] queues = WorkStealingQueueList.Queues;
                int c = queues.Length;
                Debug.Assert(c > 0, "There must at least be a queue for this thread.");
                int maxIndex = c - 1;
                for (int i = (int)(randomValue % (uint)c); c > 0; i = i < maxIndex ? i + 1 : 0, c--)
                {
                    WorkStealingQueue otherQueue = queues[i];
                    if (otherQueue != localWsq && otherQueue.CanSteal)
                    {
                        workItem = otherQueue.TrySteal(ref missedSteal);
                        if (workItem != null)
                        {
                            return workItem;
                        }
                    }
                }
            }

            return null;
        }

什么是线程饥饿?

线程饥饿(Thread Starvation)是指线程长时间得不到调度(时间片),从而无法完成任务。

  1. 线程被无限阻塞
    当某个线程获取锁后长期不释放,其它线程一直在等待
  2. 线程优先级降低
    操作系统锁竞争中,高优先级线程,抢占低优先级线程的CPU时间
  3. 线程在等待
    比如线程Wait/Result时,线程池资源不够,导致得不到执行

眼见为实

@一线码农 使用大佬的案例
https://www.cnblogs.com/huangxincheng/p/15069457.html
https://www.cnblogs.com/huangxincheng/p/17831401.html

ThreadPool如何改善线程饥饿

CLR线程池使用爬山算法来动态调整线程池的大小来来改善线程饥饿的问题。
本人水平有限,放出地址,有兴趣的同学可以自行研究
https://github.com/dotnet/runtime/blob/main/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.HillClimbing.cs

ThreadPool如何增加线程

在 PortableThreadPool 中有一个子类叫 GateThread,它就是专门用来增减线程的类

其底层使用一个while (true) 每隔500ms来轮询线程数量是否足够,以及一个AutoResetEvent来接收注入线程Event.
如果不够就新增

《CLR vir C#》 一书中,提过一句 CLR线程池每秒最多新增1~2个线程。结论的源头就是在这里
注意:是线程池注入线程每秒1~2个,不是每秒只能创建1~2个线程。OS创建线程的速度块多了。

眼见为实

https://github.com/dotnet/runtime/blob/main/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs
image
image

眼见为实

        static void Main(string[] args)
        {
            for (int i = 0;i<=100000;i++)
            {
                ThreadPool.QueueUserWorkItem((x) =>
                {
                    Console.WriteLine($"当前线程Id:{Thread.CurrentThread.ManagedThreadId}");
                    Thread.Sleep(int.MaxValue);
                });
            }

            Console.ReadLine();
        }

可以观察输出,判断是不是每秒注入1~2个线程

Task

不用多说什么了吧?

Task的底层调用模型图

image
Task的底层实现主要取决于TaskSchedule,一般来说,除了UI线程外,默认是调度到线程池

眼见为实

Task.Run(() => { { Console.WriteLine("Test"); } });

其底层会自动调用Start(),Start()底层调用的TaskShedule.QueueTask().而作为实现类ThreadPoolTaskScheduler.QueueTask底层调用如下。
image

可以看到,默认情况下(除非你自己实现一个TaskShedule抽象类).Task的底层使用ThreadPool来管理。

有意思的是,对于长任务(Long Task),直接是用一个单独的后台线程来管理,完全不参与调度。

Task对线程池的优化

既然Task的底层是使用ThreadPool,而线程池注入速度是比较慢的。Task作为线程池的高度封装,有没有优化呢?
答案是Yes
当使用Task.Result时,底层会调用InternalWaitCore(),如果Task还未完成,会调用ThreadPool.NotifyThreadBlocked()来通知ThreadPool当前线程已经被阻塞,必须马上注入一个新线程来代替被阻塞的线程。
相对每500ms来轮询注入线程,该方式采用事件驱动,注入线程池的速度会更快。

眼见为实

点击查看代码
        static void Main(string[] args)
        {
            var client = new HttpClient();

            for(int i = 0; i < 100000; i++)
            {
                ThreadPool.QueueUserWorkItem(x =>
                {
                    Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff")} -> {x}: 这是耗时任务");
                    try
                    {
                        var content = client.GetStringAsync("https://youtube.com").Result;
                        Console.WriteLine(content);
                    }
                    catch (Exception)
                    {

                        throw;
                    }

                });
            }

            Console.ReadLine();
        }

image
image

其底层通过AutoResetEvent来触发注入线程的Event消息

结论

多用Task,它更完善。对线程池优化更好。没有不使用Task的理由

标签:Core,workItem,Task,浅谈,int,ThreadPool,tl,线程
From: https://www.cnblogs.com/lmy5215006/p/18566995

相关文章

  • GaussDB数据库SQL系列-SQL与ETL浅谈
    一、前言在SQL语言中,ETL(抽取、转换和加载)是一种用于将数据从源系统抽取到目标系统的过程。ETL过程通常包括三个阶段:抽取(Extract)、转换(Transform)和加载(Load)。但这些其实都脱离不了数据库系统,本节从GaussDB数据库生态出发,给大家简单讲一下SQL与ETL的过程与关系。二、SQL与ETL的......
  • 边缘计算与MEC浅谈
    本文分享自天翼云开发者社区《边缘计算与MEC浅谈》,作者:y****n一、什么是边缘计算边缘计算是在靠近物或数据源头的网络边缘侧,通过融合网络、计算、存储、应用核心能力的分布式开放平台,就近提供边缘智能服务。简单点讲,边缘计算是将从终端采集到的数据,直接在靠近数据产生的本地设备......
  • Vue3+Typescript+Axios+.NetCore实现导出Excel文件功能
    前端代码//导出ExcelconstexportMaintenanceOrderSettlementItemExcelClick=async()=>{leturl=`${VITE_APP_API_URL}/api/app/maintenance/settlement-service-item/${currentMaintenanceOrderId.value}/${currentMaintenanceOrderSettlementRow.value.id}`;......
  • 浅谈软件开发中的yield关键字:从餐厅服务理解异步编程之美
    在现代软件开发中,处理大量数据流时经常会遇到性能和内存消耗的问题。传统的编程方式往往是一次性获取所有数据,这就像餐厅厨师要把所有菜品做完才上菜一样,既不高效也不够灵活。而yield关键字的出现,为我们提供了一种优雅的解决方案。我们可以把yield机制类比成一家高效运转的餐......
  • CCX/CORE/L1/L2/L3之定义及关系
    在CPU中,CCX(ComputeComplex)、核心(Core)以及L1、L2和L3缓存是不同层次的内存层次结构和处理单元。1.CCX(ComputeComplex):CCX是AMDRyzen处理器架构中的一个基本组成单元。它由多个CPU核心组成,并共享一定的缓存和其他资源。每个CCX包含一组核心和共享的L3缓存。不同的CCX之间可能存......
  • 【大数据学习 | Spark-Core】广播变量和累加器
    1.共享变量Spark两种共享变量:广播变量(broadcastvariable)与累加器(accumulator)。累加器用来对信息进行聚合,相当于mapreduce中的counter;而广播变量用来高效分发较大的对象,相当于semijoin中的DistributedCache。共享变量出现的原因:我们传递给Spark的函数,如map(),或者filter()......
  • 【大数据学习 | Spark-Core】Spark的kryo序列化
    1.前言由于大多数Spark计算的内存性质,Spark程序可能会受到集群中任何资源(CPU,网络带宽或内存)的瓶颈。通常,如果内存资源足够,则瓶颈是网络带宽。数据序列化,这对于良好的网络性能至关重要。在Spark的架构中,在网络中传递的或者缓存在内存、硬盘中的对象需要进行序列化操作。比如......
  • 【大数据学习 | Spark-Core】RDD的五大特性(包含宽窄依赖)
    分析一下rdd的特性和执行流程Alistofpartitions存在一系列的分区列表Afunctionforcomputingeachsplit每个rdd上面都存在compute方法进行计算AlistofdependenciesonotherRDDs每个rdd上面都存在一系列的依赖关系Optionally,aPartitionerforkey-valueRDDs......
  • 在 ASP.NET Core 中创建 gRPC 客户端和服务器
    前言gRPC是一种高性能、开源的远程过程调用(RPC)框架,它基于ProtocolBuffers(protobuf)定义服务,并使用HTTP/2协议进行通信。新建项目新建解决方案GrpcDemo新建webapi项目GrpcServer作为grpc服务端项目添加包<PackageReferenceInclude="Grpc.AspNetCore"Version="2.67.......
  • 《链表算法:浅谈并实现一下链表各种排序算法及其性能》
    前置知识数据结构-链表数组排序算法:选择排序[解法1],归并排序递归版[解法2],归并排序迭代法[解法3最优解][归并部分基础]合并两个有序链表如果您不满足于此,笔者也提供冒泡排序,插入排序,快速排序的链表写法。以及,我们会在下文讨论为什么不说明希尔排序,堆排序,因为两者不适合......