前言
在桌面软件开发中,定时任务是一个常见的需求,比如定时清理日志、发送提醒邮件或执行数据备份等操作。在C#中有一个非常著名的定时任务处理库Hangfire,不过在我们深入了解Hangfire 之前,我们可以手动开发一个定时任务案例,用以帮助我们理解Hangfire的核心原理。我们可以利用 任务队列(Task Queue) 和 线程池(Thread Pool) 这两个核心概念,构建一个高效且具有良好扩展性的定时任务系统。本文将从零开始实现一个基于任务队列和线程池的定时任务框架,并结合实际应用场景进行详细解析。
一、实现思路
- 任务队列:存储待执行的任务,支持并发安全,并可以支持任务优先级。
- 线程池:预先分配一组线程,避免频繁创建和销毁线程,同时能够动态调整线程池的大小以应对不同负载。
- 定时调度器:定时检查任务队列,根据任务的执行时间将任务分发到线程池中执行,并支持任务执行状态管理。
二、实现步骤
1. 任务定义
首先,我们定义一个通用任务类,用于存储任务信息,扩展它以支持优先级和执行状态。
public enum TaskStatus
{
Pending,
Executing,
Completed,
Failed
}
public class ScheduledTask
{
public string TaskName { get; set; }
public DateTime ExecutionTime { get; set; }
public Action TaskAction { get; set; }
public TaskStatus Status { get; set; }
public int Priority { get; set; } // 优先级,值越小优先级越高
public ScheduledTask(string taskName, DateTime executionTime, Action taskAction, int priority = 0)
{
TaskName = taskName;
ExecutionTime = executionTime;
TaskAction = taskAction;
Status = TaskStatus.Pending;
Priority = priority;
}
}
2. 任务队列
为了支持任务优先级,我们使用一个优先级队列来存储任务,并确保任务的安全性。可以借助 ConcurrentQueue
和 SortedList
来处理任务。
using System.Collections.Concurrent;
public class TaskQueue
{
private readonly ConcurrentQueue<ScheduledTask> _tasks = new ConcurrentQueue<ScheduledTask>();
public void Enqueue(ScheduledTask task)
{
_tasks.Enqueue(task);
}
public ScheduledTask Dequeue()
{
return _tasks.TryDequeue(out var task) ? task : null;
}
public IEnumerable<ScheduledTask> GetAllTasks()
{
return _tasks.OrderBy(task => task.ExecutionTime).ThenBy(task => task.Priority).ToList();
}
}
3. 定时调度器
定时调度器的作用是定期检查任务队列,并根据任务的执行时间和优先级将任务分发到线程池中执行。
using System.Threading;
using System.Threading.Tasks;
public class Scheduler
{
private readonly TaskQueue _taskQueue;
private readonly Timer _timer;
private readonly int _checkInterval;
private readonly SemaphoreSlim _semaphore;
public Scheduler(TaskQueue taskQueue, int checkInterval)
{
_taskQueue = taskQueue;
_checkInterval = checkInterval;
_semaphore = new SemaphoreSlim(1, 1); // 保证任务分发是单线程执行
_timer = new Timer(CheckAndDispatchTasks, null, 0, _checkInterval);
}
private async void CheckAndDispatchTasks(object? state)
{
await _semaphore.WaitAsync(); // 防止并发执行
try
{
var now = DateTime.Now;
var tasksToDispatch = _taskQueue.GetAllTasks().Where(task => task.ExecutionTime <= now && task.Status == TaskStatus.Pending).ToList();
foreach (var task in tasksToDispatch)
{
ThreadPool.QueueUserWorkItem(_ =>
{
try
{
task.Status = TaskStatus.Executing;
Console.WriteLine($"Executing task: {task.TaskName}");
task.TaskAction();
task.Status = TaskStatus.Completed;
}
catch (Exception ex)
{
task.Status = TaskStatus.Failed;
Console.WriteLine($"Task {task.TaskName} failed: {ex.Message}");
}
});
}
}
finally
{
_semaphore.Release();
}
}
}
4. 动态线程池管理
我们可以根据任务的数量和执行时间,动态调整线程池的大小。这里使用一个简单的线程池大小调整策略:
public class DynamicThreadPool
{
private readonly int _maxThreads;
private readonly int _minThreads;
private int _currentThreads;
public DynamicThreadPool(int minThreads, int maxThreads)
{
_minThreads = minThreads;
_maxThreads = maxThreads;
_currentThreads = minThreads;
}
public void AdjustThreadPoolSize(int taskCount)
{
if (taskCount > _currentThreads && _currentThreads < _maxThreads)
{
_currentThreads++;
Console.WriteLine($"Thread pool size increased to {_currentThreads}");
}
else if (taskCount < _currentThreads && _currentThreads > _minThreads)
{
_currentThreads--;
Console.WriteLine($"Thread pool size decreased to {_currentThreads}");
}
ThreadPool.SetMinThreads(_currentThreads, _currentThreads);
}
}
5. 示例应用
结合以上模块,我们编写一个简单的定时任务系统:
using System;
class Program
{
static void Main(string[] args)
{
var taskQueue = new TaskQueue();
// 添加定时任务
taskQueue.Enqueue(new ScheduledTask(
"Task 1",
DateTime.Now.AddSeconds(5),
() => Console.WriteLine($"Task 1 executed at {DateTime.Now}")
));
taskQueue.Enqueue(new ScheduledTask(
"Task 2",
DateTime.Now.AddSeconds(10),
() => Console.WriteLine($"Task 2 executed at {DateTime.Now}")
));
// 启动调度器
var scheduler = new Scheduler(taskQueue, 1000);
var threadPool = new DynamicThreadPool(2, 10);
Console.WriteLine("Scheduler started. Press any key to exit...");
Console.ReadKey();
}
}
6. 执行结果
三、优势解析
- 线程复用与动态调整:利用线程池避免了频繁创建和销毁线程,并且支持动态调整线程池大小以应对不同负载。
- 任务优先级管理:通过优先级字段实现高优先级任务的优先执行,确保任务按照预定逻辑执行。
- 任务状态管理:支持任务执行状态的管理,能够跟踪任务的执行过程,并进行错误处理。
- 并发安全:通过
SemaphoreSlim
和BlockingCollection
等机制确保任务分发的线程安全。
四、改进方向
- 任务重试机制:对于执行失败的任务,可以增加重试机制,避免因为临时问题导致任务永久失败。
- 任务超时处理:为每个任务设置超时时间,超时未执行的任务可以做特殊处理(比如重试或通知用户)。
- 任务结果回调:可以为任务添加执行后的回调方法,例如执行成功后通知用户或记录日志。
五、总结
本文通过结合任务队列、线程池和定时调度器,构建了一个高效、灵活的定时任务系统。该框架具有高扩展性,能够处理多种实际应用场景。在实际项目中,可以根据需求进一步优化和扩展,如增加动态调整线程池大小、任务优先级管理、任务状态管理等功能。
标签:task,自定义,ThreadPool,TaskQueue,任务,线程,定时,currentThreads,public From: https://blog.csdn.net/houbincarson/article/details/144478065