首页 > 其他分享 >线程池

线程池

时间:2024-08-20 10:09:34浏览次数:8  
标签:private state 线程 new using public

`

using Bogus;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
using Newtonsoft.Json;
using System;
using System.Collections.Concurrent;
using System.Dynamic;
using System.Runtime.ExceptionServices;
using System.Text.Json.Serialization;
using System.Text.RegularExpressions;

namespace ConsoleApp1
{
    internal class Program
    {
        static void Main(string[] args)
        {
            var rnd = new Random();

            for (var i = 0; i < 100; i++)
            {
                LazyThreadPool.QueueUserWorkItem((state) =>
                {
                    Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId}, {state}");
                    Thread.Sleep(rnd.Next(5) * 1000);
                }, "hello world:" + DateTime.Now);

                Thread.Sleep(rnd.Next(500));
            }

            Console.ReadKey();
        }
    }

/// <summary>
/// 线程池
/// </summary>
public class LazyThreadPool
{
    /// <summary>
    /// 最大线程数
    /// </summary>
    private int _maxThreadCount;
    /// <summary>
    /// 核心线程数
    /// </summary>
    private int _coreThreadCount;
    /// <summary>
    /// 工作线程
    /// </summary>
    private List<WorkerThread> _threads = new List<WorkerThread>();
    /// <summary>
    /// 任务队列
    /// </summary>
    private BlockingCollection<WorkItem> _queue = new BlockingCollection<WorkItem>();

    /// <summary>
    /// 默认线程池
    /// </summary>
    private static LazyThreadPool _defaultPool;
    /// <summary>
    /// 默认核心线程数
    /// </summary>
    public static int DefaultCoreThreadCount { get; set; } = 2;

    static LazyThreadPool()
    {
        _defaultPool = new LazyThreadPool(DefaultCoreThreadCount, Environment.ProcessorCount);
    }

    /// <summary>
    /// 向线程池添加任务
    /// </summary>
    /// <param name="callback"></param>
    /// <param name="state"></param>
    public static void QueueUserWorkItem(WaitCallback callback, object? state)
    {
        _defaultPool.QueueWorkItem(callback, state);
    }

    /// <summary>
    /// 构建线程池
    /// </summary>
    /// <param name="coreThreadCount"></param>
    /// <param name="maxThreadcount"></param>
    public LazyThreadPool(int coreThreadCount, int maxThreadcount)
    {
        _coreThreadCount = coreThreadCount;
        _maxThreadCount = maxThreadcount;

        // 开启任务消费
        this.StartConsuming();
    }

    /// <summary>
    /// 向线程池添加任务
    /// </summary>
    /// <param name="callback"></param>
    /// <param name="state"></param>
    public void QueueWorkItem(WaitCallback callback, object? state)
    {
        _queue.Add(new WorkItem(callback, state));
    }

    /// <summary>
    /// 开启任务消费
    /// </summary>
    private void StartConsuming()
    {
        new Thread(() =>
        {
            foreach (var workItem in _queue.GetConsumingEnumerable())
            {
                // 查找闲置线程
                var idleWorker = FindIdleWorker();

                // 未找到,则等待(优化)
                while (idleWorker == null)
                {
                    idleWorker = FindIdleWorker();
                    Thread.Sleep(1);
                }

                // 分配任务
                idleWorker.Assign(workItem.Callback, workItem.State);
            }
        }).Start();
    }

    /// <summary>
    /// 查找空闲的线程
    /// </summary>
    /// <returns></returns>
    private WorkerThread? FindIdleWorker()
    {
        // 创建核心线程
        if (_threads.Count == 0)
        {
            this.InitCoreWorkerThreads();
        }

        var idleWorker = _threads.FirstOrDefault(x => x.IsIdle);
        if (idleWorker != null) return idleWorker;

        if (_threads.Count < _maxThreadCount)
        {
            // 如果没有空闲线程,且线程池未满,则创建新线程
            var worker = new WorkerThread();
            _threads.Add(worker);
            return worker;
        }
        else
        {
            // 如果没有空闲线程,且线程池已满,则返回null  
            return null;
        }
    }

    /// <summary>
    /// 初始化核心线程
    /// </summary>
    private void InitCoreWorkerThreads()
    {
        for (var i = 0; i < _coreThreadCount; i++)
        {
            var worker = new WorkerThread();
            _threads.Add(worker);
        }
    }
}

/// <summary>
/// 工作线程
/// </summary>
public class WorkerThread
{
    private Thread _thread;
    private WaitCallback? _work = null;
    private object? _workState = null;
    private AutoResetEvent _autoResetEvent = new AutoResetEvent(false);

    public WorkerThread()
    {
        this._thread = new Thread((state) =>
        {
            var worker = (state as WorkerThread)!;

            while (true)
            {
                // 未分配任务时,等待
                _autoResetEvent.WaitOne();

                if (!worker.IsIdle)
                {
                    worker.ExecWork();
                    worker.SetIdle();
                }
            }
        });
        this._thread.Start(this);
    }

    /// <summary>
    /// 为工作线程分配任务
    /// </summary>
    /// <param name="action"></param>
    /// <param name="state"></param>
    public void Assign(WaitCallback action, object? state)
    {
        this._work = action;
        this._workState = state;
        _autoResetEvent.Set();
    }

    /// <summary>
    /// 是否空闲
    /// </summary>
    public bool IsIdle
    {
        get
        {
            return this._work == null;
        }
    }

    /// <summary>
    /// 执行工作
    /// </summary>
    private void ExecWork()
    {
        if (this._work != null) this._work(this._workState);
    }

    /// <summary>
    /// 设置为限制状态
    /// </summary>
    private void SetIdle()
    {
        this._work = null;
    }
}

/// <summary>
/// 任务项
/// </summary>
/// <param name="Callback"></param>
/// <param name="State"></param>
public record WorkItem(WaitCallback Callback, object? State);

}
`

标签:private,state,线程,new,using,public
From: https://www.cnblogs.com/readafterme/p/18368882

相关文章

  • 两线程读写数组
    #include<stdio.h>#include<stdlib.h>#include<pthread.h>#include<unistd.h>#defineARRAY_SIZE10intshared_array[ARRAY_SIZE];pthread_mutex_tmutex;void*write_data(void*arg){intthread_id=*(int*)arg;......
  • 面试场景题:一次关于线程池使用场景的讨论。
    你好呀,我是歪歪。来一起看看一个关于线程池使用场景上的问题,就当是个场景面试题了。问题是这样的:字有点多,我直接给你上个图你就懂了:前端发起一个生成报表页面的请求,这个页面上的数据由后端多个接口返回,另外由于微服务化了,所以数据散落在每个微服务中,因此需要调用多个下游接......
  • 多线程基础知识(一)
    多线程多线程​ 进程:正在运行的程序,是系统进行资源分配和调用对的独立单位,每一个进程都有它的内存空间和系统资源。可以理解为,一个正在运行的程序。​ 线程:是进程中的单个顺序控制流,是一条执行路径,一个进程如果只有一条执行路径,则称为单线程程序;一个进程如果有多条执行路径,则称......
  • 线程不安全问题实例
    packagecom.shujia.day19.sellTickets;/*使用Runnable的方式实现为了模拟更加真实的售票情况,我们加入延迟问题:我们加入了延迟之后,发现a.有重复售卖同一张票的情况(原因1)b.还出现了一个不该出现的票数据,比如第0张票,第-1张票(原因2)......
  • 多线程概念
    packagecom.shujia.day19;/*多线程:进程:正在运行的程序,是系统进行资源分配和调用的独立单位。每一个进程都有它自己的内存空间和系统资源。理解:一个正在运行的软件线程:是进程中的单个顺序控制流,是......
  • @Async使用ThreadPoolTaskExecutor 多线程
    SpringBoot中的线程池ThreadPoolTaskExecutor,@Async的使用线程池@Configuration@EnableAsyncpublicclassExcutorConfig{@Bean(name="ThreadPoolTaskExecutor")publicThreadPoolTaskExecutorThreadPoolTaskExecutor(){ThreadPoolTaskExecutorex......
  • C++中的多线程编程和锁机制
    二、多线程、锁2.1C语言线程库pthread(POSIXthreads)2.2.1线程创建pthread_create#include<pthread.h>pthread_tthread;ThreadDataargs={1,"Hellofromparameterizedthread"};intresult=pthread_create(&thread,attr,function,args); //线程创建即......
  • 线程的理解与创建
    线程定义‌线程是‌操作系统能够进行运算调度的最小单位,它是‌进程中可独立执行的子任务。线程是操作系统中用于并发执行任务的基本单元,每个进程可以包含一个或多个线程。这些线程在进程中并发执行,允许同时处理多个任务,从而提高系统的整体性能和响应速度。线程与进程的区别......
  • C++可控制线程
    大家好,本人是C++新人qing。我学习编程也快十年了,这一年来我用C++写了一些程序,有了一些新奇的想法。我写了一些诸如“C语言存储变长字符串”、“C++可控制线程对象”、“TCP通信接收任意长度字符串”的代码。这些都是我的拙作,希望能够分享给大家,主要是新人可以练练手,有意见也......
  • JavaEE篇:多线程(1)
    一认识线程(Thread)1.1概念1.1.1线程是什么?线程被创建出来是为了完成分配给它的任务。线程又称轻量级进程,是操作系统的基本调度单位。一个线程就是一个执行流。线程的创建销毁和切换都比进程更加的方便。进程是操作系统分配资源的基本单位,线程的创建和释放不涉及资源分配......