首页 > 编程语言 >聊一聊 C#线程池 的线程动态注入 (中)

聊一聊 C#线程池 的线程动态注入 (中)

时间:2024-12-24 13:44:04浏览次数:3  
标签:previousDelayElapsed C# uint PerformBlockingAdjustment 聊一聊 bool private 线程

一:背景

1. 讲故事

上一篇我们用 Thread.Sleep 的方式演示了线程池饥饿场景下的动态线程注入,可以观察到大概 1s 产生 1~2 个新线程,很显然这样的增长速度扛不住上游请求对线程池的DDOS攻击,导致线程池队列越来越大,但C#团队这么优秀,能优化的地方绝对会给大家尽可能的优化,比如这篇我们聊到的 Task.Result 场景下的注入。

二:Task.Result 角度下的动态注入

1. 测试代码

为了直观的体会到优化效果,先上一段测试代码观察一下。


        static void Main(string[] args)
        {
            for (int i = 0; i < 10000; i++)
            {
                ThreadPool.QueueUserWorkItem((idx) =>
                {
                    Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff")} -> {idx}: 这是耗时任务");

                    try
                    {
                        var client = new HttpClient();
                        var content = client.GetStringAsync("https://youtube.com").Result;
                        Console.WriteLine(content.Length);
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                    }
                }, i);
            }

            Console.ReadLine();
        }

从卦象上来看大概1s产生4个新线程,再仔细看的话大概是250ms一个,虽然250不大好听,但不管怎么说确实比 Thread.Sleep 场景下只产生 1~2 个线程要快了好几倍,以终为始,我们再反向的看下这个优化的底层逻辑在哪?

2. 底层逻辑在哪里

还是那句话,千言万语不抵一张图,流程图大概如下:

接下来解释下其中的几个元素。

  1. NotifyThreadBlocked

这是主动通知 GateThread 线程赶紧醒来,通过上一篇的知识大家应该知道 GateThread 会500ms一次被动唤醒,但为了提速不可能再这么干了,需要让人强制唤醒它,修剪后的源码如下:


    private bool SpinThenBlockingWait(int millisecondsTimeout, CancellationToken cancellationToken)
    {
        var mres = new SetOnInvokeMres();

        AddCompletionAction(mres, addBeforeOthers: true);

        bool notifyWhenUnblocked = ThreadPool.NotifyThreadBlocked();

        var returnValue = mres.Wait((int)(millisecondsTimeout - elapsedTimeTicks), cancellationToken);

        return returnValue;
    }

    public bool NotifyThreadBlocked()
    {
        GateThread.Wake(this);

        return true;
    }

    public static void Wake(PortableThreadPool threadPoolInstance)
    {
        DelayEvent.Set();
    }

卦中的 DelayEvent.Set(); 正是强制唤醒 GateThread 的 event 事件。

  1. HasBlockingAdjustmentDelayElapsed

GateThread 是注入线程的官方通道,那到底要不要注入线程呢?肯定少不了一些判断,其中一个判断就是当前的延迟周期是否超过了 250ms,这个250ms的阈值最终由 BlockingConfig.MaxDelayMs 变量指定,这是能否调用 CreateWorkerThread方法需要闯的一个关口,参考代码如下:


        private static class BlockingConfig
        {
            MaxDelayMs =(uint) AppContextConfigHelper.GetInt32Config(
                        "System.Threading.ThreadPool.Blocking.MaxDelayMs",
                            250,
                            false);
        }

        private static void GateThreadStart()
        {
            while (true)
            {
                bool wasSignaledToWake = DelayEvent.WaitOne((int)delayHelper.GetNextDelay(currentTimeMs));
                currentTimeMs = Environment.TickCount;

                do
                {
                    previousDelayElapsed = delayHelper.HasBlockingAdjustmentDelayElapsed(currentTimeMs, wasSignaledToWake);
                    if (pendingBlockingAdjustment == PendingBlockingAdjustment.WithDelayIfNecessary && !previousDelayElapsed)
                    {
                        break;
                    }

                    uint nextDelayMs = threadPoolInstance.PerformBlockingAdjustment(previousDelayElapsed);

                } while (false);
            }
        }

        public bool HasBlockingAdjustmentDelayElapsed(int currentTimeMs, bool wasSignaledToWake)
        {
            if (!wasSignaledToWake && _adjustForBlockingAfterNextDelay)
            {
                return true;
            }

            uint elapsedMsSincePreviousBlockingAdjustmentDelay = (uint)(currentTimeMs - _previousBlockingAdjustmentDelayStartTimeMs);
            return elapsedMsSincePreviousBlockingAdjustmentDelay >= _previousBlockingAdjustmentDelayMs;
        }

从上面的代码可以看到一旦 previousDelayElapsed =false 就直接 break 了,不再调用PerformBlockingAdjustment 方法来闯第二个关口。

  1. PerformBlockingAdjustment

一旦满足了250ms阈值之后,接下来就需要观察ThreadPool当前的负载能力,由内部的 ThreadCounts 提供支持,比如 NumProcessingWork 表示当前线程池正在处理的任务数, NumThreadsGoal 表示线程不要超过此上限值,如果超过了就进入动态注入阶段,参考代码如下:


    private struct ThreadCounts
    {
        public short NumProcessingWork;
        public short NumExistingThreads;
        public short NumThreadsGoal;
    }

有了这个基础之后,接下来再上一段注入线程需要满足的第二个关口。


        private static void GateThreadStart()
        {
            uint nextDelayMs = threadPoolInstance.PerformBlockingAdjustment(previousDelayElapsed);
        }

        private uint PerformBlockingAdjustment(bool previousDelayElapsed)
        {
            var nextDelayMs = PerformBlockingAdjustment(previousDelayElapsed, out addWorker);

            if (addWorker)
            {
                WorkerThread.MaybeAddWorkingWorker(this);
            }
            return nextDelayMs;
        }

        private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWorker)
        {
            if (counts.NumProcessingWork >= numThreadsGoal && _separated.numRequestedWorkers > 0)
            {
                addWorker = true;
            }
        }

从卦中代码可以看到,一旦线程池中 处理的任务数 >= 线程上限值,这就表示当前线程池正在满负荷的跑,numRequestedWorkers>0 表示有新任务来了需要线程来处理,所以这两组条件一旦满足,就必须要创建新线程。

3. 如何眼见为实

刚才啰嗦了那么多,那如何眼见为实呢?非常简单,还是用 dnspy 的断点日志功能观察,我们下三个断点。

  1. 第一个条件 HasBlockingAdjustmentDelayElapsed 处增加 1. {!wasSignaledToWake} {this._adjustForBlockingAfterNextDelay}, 延迟时间:{currentTimeMs - this._previousBlockingAdjustmentDelayStartTimeMs} ,上一次延迟:{_previousBlockingAdjustmentDelayMs}

  1. 第二个条件 PerformBlockingAdjustment 处增加 2. 正在处理任务数:{threadCounts.NumProcessingWork} ,合适线程数:{num},是否要新增线程:{this._separated.numRequestedWorkers>0}

  1. 线程创建 WorkerThread.CreateWorkerThread 处增加 3. 已成功创建线程

最后把程序跑起来,观察 output窗口 的结果,非常清爽,吉卦。

三:总结

采用主动通知的方式唤醒GateThread可以让每秒线程注入数由原来的 1~2 个提升到 4 个,虽然有所优化,但面对上游洪水猛兽般的请求,很显然也是杯水车薪,最终还是酿成了线程饥饿的悲剧,下一篇我们继续研究如何让线程注入的快一点,再快一点。。。
图片名称

标签:previousDelayElapsed,C#,uint,PerformBlockingAdjustment,聊一聊,bool,private,线程
From: https://www.cnblogs.com/huangxincheng/p/18627222

相关文章

  • 实验六c++
    实验任务四源代码Vector.hpp1#include<iostream>2#include<stdexcept>3usingnamespacestd;45template<typenameT>6classVector{7public:8Vector(intn);9Vector(intn,Ta);10Vector(constVector<T>&......
  • 10款装了不后悔的高效PC软件,桌面便签、日历、录屏、搜索……
    在使用Win电脑办公时,一些简单高效的工具软件可以让工作事半功倍!今天就来介绍10款装了不后悔的软件!1、浏览器:夸克电脑浏览器,除了电脑自带的edge,还可以试试夸克,它有自带的网盘,可以保存重要的资料,在手机端也可以同步使用。2、桌面便签+待办+日历:敬业签在电脑桌面上随手记录工作......
  • centos7安装openldap出现的问题
    出现错误systemctlstatusslapd.service●slapd.service-OpenLDAPServerDaemonLoaded:loaded(/usr/lib/systemd/system/slapd.service;enabled;vendorpreset:disabled)Active:failed(Result:exit-code)since二2024-12-2409:24:07CST;6sagoDocs:man:......
  • How to sync shopify customer to odoo erp
    AboutShopifyShopifyisapopulare-commerceplatformthatallowsyoutocreateanonlinestoreandsellproductsonline.Itprovidesawiderangeoffeaturesandtoolstohelpyoumanageyourstore,includinginventorymanagement,orderprocessing,and......
  • electron 项目搭建
    安装electron1.初始化配置文件npminit#entrypoint应为main.js.2.安装electronnpminstall--save-develectron3.在package.json中添加启动命令"start":"electron."main.jsconstcreateWindow=()=>{constwin=newBrowserWindow({width:......
  • C语言-详细讲解-动态数组统计成绩
    1.题目要求用动态数组编程输入任意m个班学生(每班n个学生)的某门课的成绩,计算最高分,并指出具有该最高分成绩的学生是第几个班的第几个学生。其中,m和n的值由用户从键盘任意输入(不限定m和n的上限值)。输入提示信息:"Inputarraysizem,n:""Input%d*%darray:\n"输入m,n的格式......
  • 【Leetcode 每日一题】1705. 吃苹果的最大数目
    问题背景有一棵特殊的苹果树,一连nnn天,每天都可以长出若干个苹果。在第ii......
  • 【Leetcode 热题 100】994. 腐烂的橘子
    问题背景在给定的m×nm\timesnm×n网格g......
  • 深入理解 Java 中的并发编程:线程池与锁的使用
    深入理解Java中的并发编程:线程池与锁的使用在现代应用程序开发中,特别是在高并发场景下,如何高效地管理多个线程成为一个关键问题。Java提供了多种方式来处理并发编程,其中线程池和锁机制是最常用的两种方法。本文将深入探讨Java中的并发编程,重点讲解线程池的使用和锁......
  • YOLOv8改进目录一览 | 涉及卷积层、轻量化、注意力、损失函数、Backbone、SPPF、Neck
    必读内容......