首页 > 其他分享 >mormot.core.threads--TSynThreadPool

mormot.core.threads--TSynThreadPool

时间:2024-07-10 11:09:39浏览次数:19  
标签:WINIOCP core USE -- TSynThreadPool mormot 队列 线程 integer

mormot.core.threads--TSynThreadPool

{ ************ 面向服务器进程的线程池 }  

TSynThreadPool = class; // 前向声明TSynThreadPool类

/// 定义了TSynThreadPool所使用的工作线程
TSynThreadPoolWorkThread = class(TSynThread)
protected
    fOwner: TSynThreadPool; // 线程池所有者
    fThreadNumber: integer; // 线程编号
    {$ifndef USE_WINIOCP} // 如果不使用Windows I/O完成端口
    fProcessingContext: pointer; // 正在处理的上下文,受fOwner.fSafe.Lock保护
    fEvent: TSynEvent; // 同步事件
    {$endif USE_WINIOCP}
    procedure NotifyThreadStart(Sender: TSynThread); // 通知线程开始
    procedure DoTask(Context: pointer); // 异常安全地调用fOwner.Task()
public
    /// 初始化线程
    constructor Create(Owner: TSynThreadPool); reintroduce;
  
    /// 终结线程
    destructor Destroy; override;
  
    /// 循环等待并执行挂起的任务,通过调用fOwner.Task()
    procedure Execute; override;
  
    /// 关联的线程池
    property Owner: TSynThreadPool
      read fOwner;
end;

TSynThreadPoolWorkThreads = array of TSynThreadPoolWorkThread; // 线程池工作线程数组类型

/// 一个简单的线程池,用于例如快速处理HTTP/1.0请求
// - 在Windows上通过I/O完成端口实现,或在Linux/POSIX上使用经典的事件驱动方法
TSynThreadPool = class
protected
    {$ifndef USE_WINIOCP} // 如果不使用Windows I/O完成端口
    fSafe: TOSLightLock; // 使用更稳定的锁
    {$endif USE_WINIOCP}
    fWorkThread: TSynThreadPoolWorkThreads; // 工作线程数组
    fWorkThreadCount: integer; // 工作线程数量
    fRunningThreads: integer; // 正在运行的线程数量
    fExceptionsCount: integer; // 异常计数(未在代码中明确使用,但可能用于调试或监控)
    fContentionAbortDelay: integer; // 由于争用而拒绝连接的延迟时间(毫秒)
    fOnThreadTerminate: TOnNotifyThread; // 线程终止通知事件
    fOnThreadStart: TOnNotifyThread; // 线程开始通知事件
    fContentionTime: Int64; // 等待队列可用槽位的总时间(毫秒)
    fContentionAbortCount: cardinal; // 由于争用而中止的任务数
    fContentionCount: cardinal; // 等待队列可用槽位的次数
    fName: RawUtf8; // 线程池名称
    fTerminated: boolean; // 线程池是否已终止
    {$ifdef USE_WINIOCP} // 如果使用Windows I/O完成端口
    fRequestQueue: THandle; // IOCP有其自己的内部队列
    {$else}
    fQueuePendingContext: boolean; // 当所有线程都忙时,是否应维护一个内部队列
    fPendingContext: array of pointer; // 挂起的上下文数组
    fPendingContextCount: integer; // 挂起的上下文数量
    function GetPendingContextCount: integer; // 获取挂起上下文数量的函数
    function PopPendingContext: pointer; // 从挂起上下文数组中弹出一个元素的函数
    function QueueLength: integer; virtual; // 获取队列长度的虚拟函数(可能用于调试)
    {$endif USE_WINIOCP}
    /// 在I/O错误时结束线程
    function NeedStopOnIOError: boolean; virtual;
    /// 在通知后要执行的进程,这是一个抽象方法,需要被子类实现
    procedure Task(aCaller: TSynThreadPoolWorkThread; aContext: pointer); virtual; abstract;
    /// 中止任务的进程
    procedure TaskAbort(aContext: Pointer); virtual;

public
    /// 使用指定的线程数初始化线程池
    // - 抽象的Task()方法将由其中一个线程调用
    // - 一个线程池最多可以关联256个线程
    // - 在Windows上,可以可选地接受一个之前使用Windows重叠I/O(IOCP)打开的aOverlapHandle
    // - 在POSIX上,如果aQueuePendingContext=true,则将挂起的上下文存储到内部队列中,
    //   以便在队列未满时Push()返回true
    {$ifdef USE_WINIOCP}
    constructor Create(NumberOfThreads: integer = 32; aOverlapHandle: THandle = INVALID_HANDLE_VALUE; const aName: RawUtf8 = '');
    {$else}
    constructor Create(NumberOfThreads: integer = 32; aQueuePendingContext: boolean = false; const aName: RawUtf8 = '');
    {$endif USE_WINIOCP}
  
    /// 关闭线程池,释放所有关联的线程
    destructor Destroy; override;
  
    /// 让线程池处理一个指定的任务(作为指针)
    // - 如果没有空闲线程可用,并且Create(aQueuePendingContext=false)被使用,则返回false(调用者稍后应重试)
    // - 如果在Create中aQueuePendingContext为true,或使用了IOCP,则提供的上下文将被添加到内部列表,并在可能时处理
    // - 如果aWaitOnContention默认为false,则在队列满时立即返回
    // - 设置aWaitOnContention=true以等待最多ContentionAbortDelay毫秒并重试将任务排队
    function Push(aContext: pointer; aWaitOnContention: boolean = false): boolean;
    {$ifndef USE_WINIOCP}
  
    /// 在Push()返回false后调用,以查看队列是否确实已满
    // - 如果QueuePendingContext为false,则返回false
    function QueueIsFull: boolean;
  
    /// 如果所有线程都忙时,线程池是否应维护一个内部队列
    // - 作为Create构造函数的参数提供
    property QueuePendingContext: boolean
      read fQueuePendingContext;
    {$endif USE_WINIOCP}
  
    /// 对此线程池中定义的线程的低级访问
    property WorkThread: TSynThreadPoolWorkThreads
      read fWorkThread;
published
    /// 线程池中可用的线程数
    // - 映射Create()参数,即默认为32
    property WorkThreadCount: integer
      read fWorkThreadCount;
  
    /// 当前在此线程池中处理任务的线程数
    // - 范围在0..WorkThreadCount之间
    property RunningThreads: integer
      read fRunningThreads;
  
    /// 由于线程池争用而被拒绝的任务数
    // - 如果此数字很高,请考虑设置更高的线程数,或分析并调整Task方法
    property ContentionAbortCount: cardinal
      read fContentionAbortCount;
  
    /// 由于争用而拒绝连接的延迟时间(毫秒)
    // - 默认为5000,即等待IOCP或aQueuePendingContext内部列表中有空间可用5秒
    // - 在此延迟期间,不接受新的连接(即不调用Accept),以便负载均衡器可以检测到争用并切换到池中的另一个实例,
    //   或直接客户端最终可能会拒绝其连接,因此不会开始发送数据
    property ContentionAbortDelay: integer
      read fContentionAbortDelay write fContentionAbortDelay;
  
    /// 等待队列中可用槽位的总时间(毫秒)
    // - 争用不会立即失败,但会重试直到ContentionAbortDelay
    // - 此处的高数值需要对Task方法进行代码重构
    property ContentionTime: Int64
      read fContentionTime;
  
    /// 线程池等待队列中可用槽位的次数
    // - 争用不会立即失败,但会重试直到ContentionAbortDelay
    // - 此处的高数值可能需要增加线程数
    // - 使用此属性和ContentionTime来计算平均争用时间
    property ContentionCount: cardinal
      read fContentionCount;
  
    {$ifndef USE_WINIOCP}
    /// 当前等待分配给线程的输入任务数
    property PendingContextCount: integer
      read GetPendingContextCount;
    {$endif USE_WINIOCP}
end;

{$M-} // 关闭内存管理消息

const
  // 允许TSynThreadPoolWorkThread堆栈使用最多256 * 2MB = 512MB的RAM
  THREADPOOL_MAXTHREADS = 256;

由于 TSynThreadPool类是一个高度抽象且依赖于特定实现的类(如它可能使用Windows的I/O完成端口或Linux/POSIX的事件驱动机制),编写一个完整的例程代码可能会相当复杂,并且需要模拟或实际实现这些依赖项。然而,我可以提供一个简化的示例,该示例展示了如何创建 TSynThreadPool实例、如何向其推送任务,并如何大致实现 Task方法。

请注意,以下代码是一个高度简化的示例,并不包含所有 TSynThreadPool类定义中的功能,特别是与Windows I/O完成端口或Linux/POSIX事件驱动机制相关的部分。此外,由于 TSynThreadPool是一个假设的类(因为它不是Delphi标准库或广泛认可的第三方库的一部分),我将基于您提供的类定义来编写这个示例。

program TSynThreadPoolDemo;

{$MODE DELPHI}

uses
  SysUtils, Classes; // 引入必要的单元

// 假设TSynThreadPool及其依赖项已经在某个单元中定义
// 这里我们使用一个占位符单元名YourThreadPoolUnit
uses YourThreadPoolUnit;

// 一个简单的任务上下文类(仅作为示例)
type
  TMyTaskContext = record
    Data: Integer;
  end;

// TSynThreadPool的Task方法的实现类
type
  TMyThreadPool = class(TSynThreadPool)
  protected
    procedure Task(aCaller: TSynThreadPoolWorkThread; aContext: Pointer); override;
  end;

{ TMyThreadPool }

procedure TMyThreadPool.Task(aCaller: TSynThreadPoolWorkThread; aContext: Pointer);
var
  Ctx: PMyTaskContext;
begin
  Ctx := PMyTaskContext(aContext);
  WriteLn('Processing task with data: ', Ctx.Data);
  // 在这里添加处理任务的代码
  // ...
end;

var
  Pool: TMyThreadPool;
  Ctx: TMyTaskContext;
  I: Integer;

begin
  try
    // 创建一个线程池实例,假设我们想要使用4个工作线程
    Pool := TMyThreadPool.Create(4);
    try
      // 模拟向线程池推送一些任务
      for I := 1 to 10 do
      begin
        Ctx.Data := I;
        if not Pool.Push(@Ctx) then
        begin
          // 在这个简化的示例中,我们不会处理Push返回false的情况
          // 在实际应用中,您可能需要等待、重试或将任务放入另一个队列中
          WriteLn('Failed to push task to the pool (this should not happen in this simplified example)');
        end;
      end;

      // 在这个简化的示例中,我们没有等待所有任务完成
      // 在实际应用中,您可能需要等待线程池中的所有任务都完成后再继续
      // ...

    finally
      // 销毁线程池实例,这将释放所有关联的资源
      Pool.Free;
    end;
  except
    on E: Exception do
      WriteLn('An error occurred: ', E.Message);
  end;
  // 保持控制台窗口打开,直到用户按任意键
  WriteLn('Press Enter to exit...');
  ReadLn;
end.

// 注意:由于TSynThreadPool是一个假设的类,并且上述代码没有实现所有细节(如线程池的实际工作线程管理、任务队列等),
// 因此这个示例主要是为了展示如何使用该类(如果它存在的话)的大致结构。
// 在实际应用中,您需要根据TSynThreadPool类的具体实现来调整此代码。

重要说明

  1. 占位符单元:在上述代码中,我使用了 YourThreadPoolUnit作为包含 TSynThreadPool类定义的占位符单元名。在实际应用中,您需要将其替换为包含该类定义的实际单元名。
  2. 任务上下文:我定义了一个简单的 TMyTaskContext记录类型来作为任务的上下文。在实际应用中,您可能需要根据需要定义更复杂的上下文类型。
  3. 错误处理:在 Push方法返回 false的情况下,上述代码仅打印了一条消息,并没有采取任何恢复措施。在实际应用中,您可能需要实现更复杂的错误处理逻辑(如重试、等待或将任务放入另一个队列中)。
  4. 等待任务完成:上述代码没有等待线程池中的所有任务都完成。在实际应用中,您可能需要实现某种形式的等待机制(例如,使用同步事件或计数器)来确保所有任务都已完成后再继续执行后续代码。
  5. 线程池实现:由于 TSynThreadPool是一个假设的类,并且其实现细节(如工作线程的管理、任务队列的实现等)并未在您的类定义中给出,因此上述代码仅提供了一个大致的框架。在实际应用中,您需要根据 TSynThreadPool类的具体实现来调整此代码。

标签:WINIOCP,core,USE,--,TSynThreadPool,mormot,队列,线程,integer
From: https://www.cnblogs.com/hieroly/p/18290282

相关文章

  • ab for windows
    Windows上安装和使用ab压测Apachebench是一款压力测试工具,用于测试http服务请求的性能情况1下载http://httpd.apache.org/docs/current/platform/windows.html#down #管理员模式进入CMD C:>d:  D:\>cdapache  D:\apache>cdApache24  D:\apache\Apache24>c......
  • 组合API-toRefs函数
     使用场景:剥离响应式对象(解构|展开),想使用响应式对象中的多个或者所有属性做为响应式数据。<template><divclass="container"><div>{{name}}</div><div>{{age}}</div><button@click="updateName">修改数据</button></d......
  • Franka Robot setZeroForceTorque 设置零力矩
    在FrankaEmika机器人中,可以使用setZeroForceTorque()函数来设置机器人的零力矩。这个函数可以让机器人保持在零力矩状态,即不施加任何额外的力矩。这种状态下,机器人关节会保持"放松"的状态,可以被外力轻易地移动。以下是一个示例代码:#include<franka/robot.h>intmain()......
  • Docker 学习
    1DockerDocker是一个开源的应用容器引擎。Docker可以将应用程序及其依赖项打包到一个可移植的容器中,这个容器包含了应用程序的代码、运行环境、依赖库、配置文件等必须的资源。通过Docker可以实现快速部署并且隔离环境。无论部署的环境如何,Docker能保证容器中的应用程序都......
  • 重磅来袭!MoneyPrinterPlus一键发布短视频到视频号,抖音,快手,小红书上线了
    MoneyPrinterPlus开源有一段时间了,已经实现了批量短视频混剪,一键生成短视频等功能。有些小伙伴说了,我批量生成的短视频能不能一键上传到视频号,抖音,快手,小红书这些视频平台呢?答案是必须可以。下面上干货。软件准备当然,前提条件就是你需要下载MoneyPrinterPlus软件啦。下载......
  • 代码随想录算法训练营第8天 | 复习字符串API、双指针
    2024年7月10日题344.翻转字符数组记得用双指针,时间复杂度最低。题541.反转字符串II首先自己实现一个String的reverse函数方便后面用,记得字符数组和字符串的互转方式。然后计算有多少组2k,分组处理即可。classSolution{publicStringreverseStr(Strings,intk){......
  • 【C语言】学习笔记:找出一个二维数组中的最大值,并打印出该最大值及其在数组中的位置
    找出一个二维数组中的最大值,并打印出该最大值及其在数组中的位置。首先,定义了必要的变量,包括用于遍历数组的索引变量i和j,以及用于存储最大值及其位置的变量hang、lie和max。定义了一个名为arry的二维数组,并初始化了其元素。使用两个嵌套的for循环来遍历数组,并......
  • java类的加载顺序及复杂案例(阿里)
    一.无继承关系的情况下在Java中,类的初始化顺序涉及到多个方面,包括静态变量、静态初始化块、实例变量、实例初始化块(也称为构造器初始化块或初始化块)以及构造器的执行顺序。这里是一个详细的顺序说明:静态变量和静态初始化块:当类被加载到JVM时,类的静态成员(静态变量和静态初......
  • VideoFusion——视频编辑的新纪元
    「VideoFusion」,一款全能免费短视频拼接软件,专为应对多样化视频编辑挑战而诞生。它智能化处理不一致的视频参数,一键去除噪点,巧妙消除黑边,支持多种视频格式,提供丰富的自定义选项。简化繁杂的视频剪辑流程,释放创意潜能,VideoFusion引领视频编辑进入高效智能新纪元。......
  • VBA编程:从入门到高手之路
    引言VisualBasicforApplications(VBA)是MicrosoftOffice套件中内置的编程语言,广泛用于自动化办公任务,特别是在Excel中。本文将带您从VBA的基础知识开始,逐步深入到高级技巧,助您成为VBA编程高手。1.VBA基础1.1什么是VBA?VBA是一种事件驱动的编程语言,允许用户......