项目中,当需要使用多线程时,习惯用Task 或者Thread去开启线程,当需要响应速度时,Task总是不那么让人满意。
建立任务池,多个线程在循环执行,当有任务过来时,就立即使用空闲的线程去执行任务,以达到快速响应的效果。
以下是任务池的设计代码:
使用中有问题,或有优化的地方,可以在评论区提出,感谢!
------------------------------------------------------------------------------------------------------------------------
可以直接引用Nuget包 LS.Helper 即可使用 或直接使用源码也可以(往下拉)
使用示例:
/// <summary>
/// 用Thread运行方法
/// 一般用于整个生命周期的线程
/// </summary>
/// <param name="action"></param>
/// <returns></returns>
public static Thread RunByThread(Action action)
{
Thread _t = new Thread(() => { action.Invoke(); }) { IsBackground = true };
_t.Start();
return _t;
}
/// <summary>
/// 使用任务池运行方法
/// 一般用于临时运行或者周期性运行的方法
/// </summary>
/// <param name="action"></param>
/// <returns></returns>
public static BaseResult RunByActionPool(Action action)
{
return ACPool.AddAction(action);
}
private static readonly object locker = new object();
private static ActionPool _acPool;
/// <summary>
/// 任务池对象
/// </summary>
public static ActionPool ACPool
{
get
{
lock (locker)
{
if (_acPool == null)
{
_acPool = new ActionPool();
_acPool.InitThread();
}
return _acPool;
}
}
}
✍更新说明
-----2023年09月11日--------------------------------------------------------------------------------------------------------------
1.增加Waiting状态,当SetAction()时,状态立即变更为Waiting
2.SetAction()前再次对线程状态做确认,确保数据稳定
下面是实现源码:
/// <summary>
/// 任务池
/// 目前支持无返回值的方法执行
/// </summary>
public class ActionPool
{
/// <summary>
/// 线程总数
/// </summary>
public int PoolCount = 50;
/// <summary>
/// 待执行的方法集合
/// </summary>
private ConcurrentQueue<Action> actions = new ConcurrentQueue<Action>();
/// <summary>
/// 标识是否运行任务池
/// </summary>
private bool isRunThread = false;
/// <summary>
/// 线程列表
/// </summary>
private List<ActionThread> threads = new List<ActionThread>();
/// <summary>
/// 轮询线程
/// </summary>
private Thread _thread;
/// <summary>
/// 日志信息输出 委托
/// </summary>
public delegate void DelegateLog(string log);
/// <summary>
/// 日志信息输出
/// </summary>
public event DelegateLog SendLog;
/// <summary>
/// 任务池
/// </summary>
public ActionPool()
{
}
/// <summary>
/// 任务池初始化
/// </summary>
/// <param name="poolCount">线程总数</param>
public void InitThread(int poolCount = 50)
{
try
{
PoolCount = poolCount;
if (!isRunThread)
{
isRunThread = true;
threads = new List<ActionThread>();
for (int i = 0; i < PoolCount; i++)
{
var at = new ActionThread();
at.SendLog += At_SendLog;
threads.Add(at);
}
_thread = new Thread(Execute) { IsBackground = true };
_thread.Start();
}
}
catch (Exception ex)
{
throw ex;
}
}
private void At_SendLog(string log)
{
SendLog?.Invoke(log);
}
/// <summary>
/// 当有任务进来时,选择空闲的线程去执行任务
/// </summary>
private void Execute()
{
while (isRunThread)
{
try
{
if (actions.Count > 0)
{
var index = threads.FindIndex(x => x.IsReady());
if (index >= 0)
{
var t = threads[index];
//再次确认是否可以添加任务进来
if (t.IsReady())
{
if (actions.TryDequeue(out Action action))
t.SetAction(action);
}
}
}
}
catch (Exception ex)
{ }
Thread.Sleep(10);
}
}
/// <summary>
/// 添加执行方法到任务池
/// </summary>
/// <param name="action"></param>
/// <returns></returns>
public BaseResult AddAction(Action action)
{
try
{
if (action == null)
return new BaseResult(false, "Action 不能为Null");
actions.Enqueue(action);
return BaseResult.Successed;
}
catch (Exception ex)
{
return new BaseResult(false, ex.ToString());
}
}
/// <summary>
/// 结束线程池
/// </summary>
public void Stop()
{
try
{
isRunThread = false;
if (threads.Count > 0)
{
foreach (var t in threads)
{
try
{
t.Stop();
}
catch (Exception ex) { }
}
}
_thread?.Join();
}
catch (Exception ex)
{
}
}
}
/// <summary>
/// 工作线程对象
/// </summary>
public class ActionThread
{
/// <summary>
/// 线程对象
/// </summary>
private Thread _thread;
/// <summary>
/// 工作状态
/// </summary>
public ActionState ActionStatus;
/// <summary>
/// 需要执行的方法
/// </summary>
private Action _action;
/// <summary>
/// 标识是否运行线程
/// </summary>
private bool isRunThread = false;
/// <summary>
/// 循环线程的睡眠时间
/// </summary>
private int _sleeptime = 500;
/// <summary>
/// 锁对象
/// </summary>
private static readonly object _lock = new object();
/// <summary>
/// 日志信息输出 委托
/// </summary>
public delegate void DelegateLog(string log);
/// <summary>
/// 日志信息输出
/// </summary>
public event DelegateLog SendLog;
public ActionThread(int sleeptime = 500)
{
ActionStatus = ActionState.Init;
_sleeptime = sleeptime;
_action = null;
isRunThread = true;
_thread = new Thread(Execute) { IsBackground = true };
_thread.Start();
ActionStatus = ActionState.Ready;
}
/// <summary>
/// 判断线程是否有空闲
/// </summary>
/// <returns></returns>
public bool IsReady()
{
return (ActionStatus == ActionState.Ready && _action == null);
}
/// <summary>
/// 执行任务--循环方法
/// </summary>
public void Execute()
{
while (isRunThread)
{
try
{
lock (_lock)
{
if ((ActionStatus == ActionState.Ready|| ActionStatus==ActionState.Waiting) && _action != null)
{
ActionStatus = ActionState.Working;
DateTime begin = DateTime.Now;
SendLog?.Invoke($"ID[{_thread.ManagedThreadId}] 开始执行Action[{_action.Method.Name}]");
//_action.Invoke();
var res = _action.BeginInvoke((ar) =>
{
_action = null;
ActionStatus = ActionState.Ready;
}, null);
//_action.EndInvoke(res);
SendLog?.Invoke($"ID[{_thread.ManagedThreadId}] 完成执行Action[{_action.Method.Name}] 耗时:{DateTime.Now.Subtract(begin).TotalMilliseconds}");
}
}
}
catch (Exception ex)
{
}
finally
{
}
Thread.Sleep(_sleeptime);
}
}
/// <summary>
/// 插入任务
/// </summary>
/// <param name="action"></param>
public BaseResult SetAction(Action action)
{
try
{
if (ActionStatus == ActionState.Ready)
{
_action = action;
ActionStatus= ActionState.Waiting;
return BaseResult.Successed;
}
return new BaseResult(false, "线程正忙,无法添加任务");
}
catch (Exception ex)
{
throw ex;
}
}
/// <summary>
/// 停止线程工作
/// </summary>
public void Stop()
{
try
{
isRunThread = false;
_action = null;
ActionStatus = ActionState.Init;
Thread.Sleep(1000);
_thread?.Join();
}
catch (Exception ex)
{
throw ex;
}
}
}
/// <summary>
/// 线程工作状态
/// </summary>
public enum ActionState
{
/// <summary>
/// 初始化中
/// </summary>
Init,
/// <summary>
/// 待开始
/// </summary>
Ready,
/// <summary>
/// 等待开始
/// </summary>
Waiting,
/// <summary>
/// 任务执行中
/// </summary>
Working
}