`
using Bogus;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
using Newtonsoft.Json;
using System;
using System.Collections.Concurrent;
using System.Dynamic;
using System.Runtime.ExceptionServices;
using System.Text.Json.Serialization;
using System.Text.RegularExpressions;
namespace ConsoleApp1
{
/// <summary>
/// Console demo that pushes 100 work items through the default <see cref="LazyThreadPool"/>.
/// </summary>
internal class Program
{
    /// <summary>
    /// Queues 100 work items, pausing a random time (under 500 ms) between them.
    /// Each item prints the executing thread id and its state, then sleeps 0-4 seconds.
    /// Finally blocks until a key is pressed.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        for (var i = 0; i < 100; i++)
        {
            LazyThreadPool.QueueUserWorkItem((state) =>
            {
                Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId}, {state}");
                // Random.Shared is thread-safe. A single `new Random()` instance is not,
                // and this callback runs on pool threads concurrently with the main thread.
                Thread.Sleep(Random.Shared.Next(5) * 1000);
            }, "hello world:" + DateTime.Now);
            Thread.Sleep(Random.Shared.Next(500));
        }
        Console.ReadKey();
    }
}
/// <summary>
/// A simple thread pool. Work items are placed on a blocking queue; a single dispatcher
/// thread takes them off one at a time and hands each to an idle <see cref="WorkerThread"/>,
/// creating workers on demand until the maximum thread count is reached.
/// </summary>
public class LazyThreadPool
{
    /// <summary>
    /// Maximum number of worker threads created on demand.
    /// </summary>
    private readonly int _maxThreadCount;
    /// <summary>
    /// Number of worker threads created up front the first time a worker is needed.
    /// </summary>
    private readonly int _coreThreadCount;
    /// <summary>
    /// Worker threads owned by this pool. Only touched from the dispatcher thread.
    /// </summary>
    private readonly List<WorkerThread> _threads = new List<WorkerThread>();
    /// <summary>
    /// Pending work items waiting for a worker.
    /// </summary>
    private readonly BlockingCollection<WorkItem> _queue = new BlockingCollection<WorkItem>();
    /// <summary>
    /// Default pool, created on first use so that a value assigned to
    /// <see cref="DefaultCoreThreadCount"/> beforehand is actually honoured and no
    /// dispatcher thread is started unless the default pool is really used.
    /// </summary>
    private static readonly Lazy<LazyThreadPool> _defaultPool = new Lazy<LazyThreadPool>(
        () => new LazyThreadPool(DefaultCoreThreadCount, Environment.ProcessorCount));
    /// <summary>
    /// Core thread count used when the default pool is created. Must be set before the
    /// first call to <see cref="QueueUserWorkItem"/>; later changes have no effect.
    /// </summary>
    public static int DefaultCoreThreadCount { get; set; } = 2;

    /// <summary>
    /// Queues a work item on the default pool.
    /// </summary>
    /// <param name="callback">The delegate to execute.</param>
    /// <param name="state">The object passed to <paramref name="callback"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
    public static void QueueUserWorkItem(WaitCallback callback, object? state)
    {
        _defaultPool.Value.QueueWorkItem(callback, state);
    }

    /// <summary>
    /// Creates a pool and starts its dispatcher thread.
    /// </summary>
    /// <param name="coreThreadCount">Number of workers created when the first worker is needed.</param>
    /// <param name="maxThreadcount">Upper bound for workers created on demand.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// Both counts are below 1, so no worker could ever be created.
    /// </exception>
    public LazyThreadPool(int coreThreadCount, int maxThreadcount)
    {
        // With neither a core worker nor room for an on-demand worker, the dispatcher
        // would spin forever on the first queued item.
        if (coreThreadCount < 1 && maxThreadcount < 1)
        {
            throw new ArgumentOutOfRangeException(
                nameof(maxThreadcount),
                maxThreadcount,
                "At least one of the core and maximum thread counts must be 1 or greater.");
        }

        _coreThreadCount = coreThreadCount;
        _maxThreadCount = maxThreadcount;
        // Start consuming queued work.
        this.StartConsuming();
    }

    /// <summary>
    /// Queues a work item on this pool.
    /// </summary>
    /// <param name="callback">The delegate to execute.</param>
    /// <param name="state">The object passed to <paramref name="callback"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
    public void QueueWorkItem(WaitCallback callback, object? state)
    {
        // A null callback would otherwise be accepted here and silently dropped later.
        if (callback is null)
        {
            throw new ArgumentNullException(nameof(callback));
        }

        _queue.Add(new WorkItem(callback, state));
    }

    /// <summary>
    /// Starts the dispatcher thread that moves queued items onto idle workers.
    /// </summary>
    private void StartConsuming()
    {
        var dispatcher = new Thread(() =>
        {
            foreach (var workItem in _queue.GetConsumingEnumerable())
            {
                // Poll until a worker is free; sleep only when a retry is needed.
                WorkerThread? idleWorker;
                while ((idleWorker = FindIdleWorker()) is null)
                {
                    Thread.Sleep(1);
                }

                // Hand the item over.
                idleWorker.Assign(workItem.Callback, workItem.State);
            }
        })
        {
            // The loop above never ends, so a foreground thread would keep the process
            // alive forever. Items still queued when the process exits are not run.
            IsBackground = true
        };
        dispatcher.Start();
    }

    /// <summary>
    /// Finds an idle worker, creating one if the pool has not reached its maximum size.
    /// Only called from the dispatcher thread.
    /// </summary>
    /// <returns>An idle worker, or null when every worker is busy and the pool is full.</returns>
    private WorkerThread? FindIdleWorker()
    {
        // Create the core workers the first time a worker is requested.
        if (_threads.Count == 0)
        {
            this.InitCoreWorkerThreads();
        }

        var idleWorker = _threads.Find(x => x.IsIdle);
        if (idleWorker != null)
        {
            return idleWorker;
        }

        if (_threads.Count < _maxThreadCount)
        {
            // No idle worker and the pool is not full: grow by one.
            var worker = new WorkerThread();
            _threads.Add(worker);
            return worker;
        }

        // No idle worker and the pool is full.
        return null;
    }

    /// <summary>
    /// Creates the core worker threads.
    /// </summary>
    private void InitCoreWorkerThreads()
    {
        for (var i = 0; i < _coreThreadCount; i++)
        {
            _threads.Add(new WorkerThread());
        }
    }
}
/// <summary>
/// A dedicated thread that sleeps until a callback is assigned, runs it, and then
/// reports itself idle again.
/// </summary>
public class WorkerThread
{
    private readonly Thread _thread;
    // Doubles as the idle flag read from another thread, hence volatile.
    private volatile WaitCallback? _work = null;
    private object? _workState = null;
    private readonly AutoResetEvent _autoResetEvent = new AutoResetEvent(false);

    /// <summary>
    /// Creates the worker and starts its thread, which immediately waits for work.
    /// </summary>
    public WorkerThread()
    {
        // Background: the loop never ends, so a foreground thread would block process exit.
        this._thread = new Thread(this.Run) { IsBackground = true };
        this._thread.Start();
    }

    /// <summary>
    /// Thread body: wait for a signal, run the assigned work if any, go idle, repeat.
    /// </summary>
    private void Run()
    {
        while (true)
        {
            // Block until Assign signals.
            _autoResetEvent.WaitOne();
            if (!this.IsIdle)
            {
                this.ExecWork();
                this.SetIdle();
            }
        }
    }

    /// <summary>
    /// Assigns work to this worker and wakes its thread.
    /// </summary>
    /// <param name="action">The callback to run.</param>
    /// <param name="state">The object passed to <paramref name="action"/>.</param>
    public void Assign(WaitCallback action, object? state)
    {
        // Store the state first so it is in place before the worker is marked busy.
        this._workState = state;
        this._work = action;
        _autoResetEvent.Set();
    }

    /// <summary>
    /// True when no callback is currently assigned.
    /// </summary>
    public bool IsIdle
    {
        get
        {
            return this._work == null;
        }
    }

    /// <summary>
    /// Runs the assigned callback, if there is one, with the stored state.
    /// </summary>
    private void ExecWork()
    {
        var work = this._work;
        if (work != null)
        {
            work(this._workState);
        }
    }

    /// <summary>
    /// Marks this worker as idle.
    /// </summary>
    private void SetIdle()
    {
        // Drop the state reference so it can be collected, and do it BEFORE publishing
        // idleness: once _work is null another thread may call Assign, and clearing the
        // state afterwards would wipe out the newly assigned value.
        this._workState = null;
        this._work = null;
    }
}
/// <summary>
/// A queued unit of work: a callback together with the state object to pass to it.
/// </summary>
/// <param name="Callback">The delegate to run.</param>
/// <param name="State">The object handed to <paramref name="Callback"/>; may be null.</param>
public record WorkItem(WaitCallback Callback, object? State);
}
`