首页 > 其他分享 >使用任务队列TaskQueue和线程池ThreadPool技术实现自定义定时任务框架详解

使用任务队列TaskQueue和线程池ThreadPool技术实现自定义定时任务框架详解

时间:2024-12-14 23:32:21浏览次数:7  
标签:task 自定义 ThreadPool TaskQueue 任务 线程 定时 currentThreads public

在这里插入图片描述

前言

在桌面软件开发中,定时任务是一个常见的需求,比如定时清理日志、发送提醒邮件或执行数据备份等操作。在C#中有一个非常著名的定时任务处理库Hangfire,不过在我们深入了解Hangfire 之前,我们可以手动开发一个定时任务案例,用以帮助我们理解Hangfire的核心原理。我们可以利用 任务队列(Task Queue)线程池(Thread Pool) 这两个核心概念,构建一个高效且具有良好扩展性的定时任务系统。本文将从零开始实现一个基于任务队列和线程池的定时任务框架,并结合实际应用场景进行详细解析。

一、实现思路

  1. 任务队列:存储待执行的任务,支持并发安全,并可以支持任务优先级。
  2. 线程池:预先分配一组线程,避免频繁创建和销毁线程,同时能够动态调整线程池的大小以应对不同负载。
  3. 定时调度器:定时检查任务队列,根据任务的执行时间将任务分发到线程池中执行,并支持任务执行状态管理。

二、实现步骤

1. 任务定义

首先,我们定义一个通用任务类,用于存储任务信息,扩展它以支持优先级和执行状态。

public enum TaskStatus
{
    Pending,
    Executing,
    Completed,
    Failed
}

public class ScheduledTask
{
    public string TaskName { get; set; }
    public DateTime ExecutionTime { get; set; }
    public Action TaskAction { get; set; }
    public TaskStatus Status { get; set; }
    public int Priority { get; set; } // 优先级,值越小优先级越高

    public ScheduledTask(string taskName, DateTime executionTime, Action taskAction, int priority = 0)
    {
        TaskName = taskName;
        ExecutionTime = executionTime;
        TaskAction = taskAction;
        Status = TaskStatus.Pending;
        Priority = priority;
    }
}

2. 任务队列

为了支持任务优先级,我们使用一个优先级队列来存储任务,并确保任务的安全性。可以借助 ConcurrentQueueSortedList 来处理任务。

using System.Collections.Concurrent;

public class TaskQueue
{
    private readonly ConcurrentQueue<ScheduledTask> _tasks = new ConcurrentQueue<ScheduledTask>();

    public void Enqueue(ScheduledTask task)
    {
        _tasks.Enqueue(task);
    }

    public ScheduledTask Dequeue()
    {
        return _tasks.TryDequeue(out var task) ? task : null;
    }

    public IEnumerable<ScheduledTask> GetAllTasks()
    {
        return _tasks.OrderBy(task => task.ExecutionTime).ThenBy(task => task.Priority).ToList();
    }
}

3. 定时调度器

定时调度器的作用是定期检查任务队列,并根据任务的执行时间和优先级将任务分发到线程池中执行。

using System.Threading;
using System.Threading.Tasks;

public class Scheduler
{
    private readonly TaskQueue _taskQueue;
    private readonly Timer _timer;
    private readonly int _checkInterval;
    private readonly SemaphoreSlim _semaphore;
    
    public Scheduler(TaskQueue taskQueue, int checkInterval)
    {
        _taskQueue = taskQueue;
        _checkInterval = checkInterval;
        _semaphore = new SemaphoreSlim(1, 1); // 保证任务分发是单线程执行
        _timer = new Timer(CheckAndDispatchTasks, null, 0, _checkInterval);
    }

    private async void CheckAndDispatchTasks(object? state)
    {
        await _semaphore.WaitAsync(); // 防止并发执行
        try
        {
            var now = DateTime.Now;
            var tasksToDispatch = _taskQueue.GetAllTasks().Where(task => task.ExecutionTime <= now && task.Status == TaskStatus.Pending).ToList();
            
            foreach (var task in tasksToDispatch)
            {
                ThreadPool.QueueUserWorkItem(_ =>
                {
                    try
                    {
                        task.Status = TaskStatus.Executing;
                        Console.WriteLine($"Executing task: {task.TaskName}");
                        task.TaskAction();
                        task.Status = TaskStatus.Completed;
                    }
                    catch (Exception ex)
                    {
                        task.Status = TaskStatus.Failed;
                        Console.WriteLine($"Task {task.TaskName} failed: {ex.Message}");
                    }
                });
            }
        }
        finally
        {
            _semaphore.Release();
        }
    }
}

4. 动态线程池管理

我们可以根据任务的数量和执行时间,动态调整线程池的大小。这里使用一个简单的线程池大小调整策略:

public class DynamicThreadPool
{
    private readonly int _maxThreads;
    private readonly int _minThreads;
    private int _currentThreads;

    public DynamicThreadPool(int minThreads, int maxThreads)
    {
        _minThreads = minThreads;
        _maxThreads = maxThreads;
        _currentThreads = minThreads;
    }

    public void AdjustThreadPoolSize(int taskCount)
    {
        if (taskCount > _currentThreads && _currentThreads < _maxThreads)
        {
            _currentThreads++;
            Console.WriteLine($"Thread pool size increased to {_currentThreads}");
        }
        else if (taskCount < _currentThreads && _currentThreads > _minThreads)
        {
            _currentThreads--;
            Console.WriteLine($"Thread pool size decreased to {_currentThreads}");
        }

        ThreadPool.SetMinThreads(_currentThreads, _currentThreads);
    }
}

5. 示例应用

结合以上模块,我们编写一个简单的定时任务系统:

using System;

class Program
{
    static void Main(string[] args)
    {
        var taskQueue = new TaskQueue();
        // 添加定时任务
        taskQueue.Enqueue(new ScheduledTask(
            "Task 1",
            DateTime.Now.AddSeconds(5),
            () => Console.WriteLine($"Task 1 executed at {DateTime.Now}")
        ));
        taskQueue.Enqueue(new ScheduledTask(
            "Task 2",
            DateTime.Now.AddSeconds(10),
            () => Console.WriteLine($"Task 2 executed at {DateTime.Now}")
        ));
        // 启动调度器
        var scheduler = new Scheduler(taskQueue, 1000);
        var threadPool = new DynamicThreadPool(2, 10);
        
        Console.WriteLine("Scheduler started. Press any key to exit...");
        Console.ReadKey();
    }
}

6. 执行结果

在这里插入图片描述

三、优势解析

  1. 线程复用与动态调整:利用线程池避免了频繁创建和销毁线程,并且支持动态调整线程池大小以应对不同负载。
  2. 任务优先级管理:通过优先级字段实现高优先级任务的优先执行,确保任务按照预定逻辑执行。
  3. 任务状态管理:支持任务执行状态的管理,能够跟踪任务的执行过程,并进行错误处理。
  4. 并发安全:通过 SemaphoreSlimBlockingCollection 等机制确保任务分发的线程安全。

四、改进方向

  1. 任务重试机制:对于执行失败的任务,可以增加重试机制,避免因为临时问题导致任务永久失败。
  2. 任务超时处理:为每个任务设置超时时间,超时未执行的任务可以做特殊处理(比如重试或通知用户)。
  3. 任务结果回调:可以为任务添加执行后的回调方法,例如执行成功后通知用户或记录日志。

五、总结

本文通过结合任务队列、线程池和定时调度器,构建了一个高效、灵活的定时任务系统。该框架具有高扩展性,能够处理多种实际应用场景。在实际项目中,可以根据需求进一步优化和扩展,如增加动态调整线程池大小、任务优先级管理、任务状态管理等功能。

标签:task,自定义,ThreadPool,TaskQueue,任务,线程,定时,currentThreads,public
From: https://blog.csdn.net/houbincarson/article/details/144478065

相关文章

  • 【Julia】自定义函数
    [Julia]006函数映射法pc=x->2*xprintln(pc(21.1))表达式法f(x,y)=x*y#定义乘法函数简化方法println(f(3.,7.))结构法functionf(x,y)#定义乘法函数x*yendprintln(f(3,7))return关键字functionsum(n,k)#多个中间变量时使用if......
  • 转载:【AI系统】自定义计算图 IR
    模型转换涉及对模型的结构和参数进行重新表示。在进行模型转换时,通常需要理解模型的计算图结构,并根据目标格式的要求对其进行调整和转换,可能包括添加、删除或修改节点、边等操作,以确保转换后的计算图能够正确地表示模型的计算流程。本文主要介绍自定义计算图的方法以及模型转换的......
  • gorm: 自定义日志
    一,官方文档:地址https://gorm.io/zh_CN/docs/logger.html参考的gormlogger代码实现:https://github.com/go-gorm/gorm/blob/master/logger/logger.go自定义日志需要实现的接口:typeInterfaceinterface{LogMode(LogLevel)InterfaceInfo(context.Context,stri......
  • Vue3自定义组件实现图片预览下载
    示例代码ImgPreview.vue<template> <divclass="preview"@click="onClick"> <divclass="preview-img"> <divclass="opt-box"> <CloudDownloadOutlined:style="{fontSize:'44p......
  • 转载:【AI系统】自定义计算图 IR
    模型转换涉及对模型的结构和参数进行重新表示。在进行模型转换时,通常需要理解模型的计算图结构,并根据目标格式的要求对其进行调整和转换,可能包括添加、删除或修改节点、边等操作,以确保转换后的计算图能够正确地表示模型的计算流程。本文主要介绍自定义计算图的方法以及模型转换的......
  • SpringBoot - 自定义启动Banner(附:使用艺术字体)
    我们知道 SpringBoot 项目启动时会在控制台打印出一个 banner,下面演示如何定制这个 banner。1,修改banner文字 (1)首先在 resources 目录下创建一个 banner.txt 文件。2,使用艺术字体(1)如果想要将文本设置为类似默认 banner 那样的艺术字体,可以借助下面几个在线......
  • Spring Security6 实现数据库自定义验证和jwt校验
    SpringSecurity6数据库自定义验证和jwt校验的简单实现以及个人解读版本springboot3.4.0mybatis-plus3.5.7jjwt0.12.6在使用jjwt的时候需要导入三个依赖分别是jjwt-api,jjwt-impl和jjwt-jackson,导入三个有点麻烦,所以可以直接导入jjwt依赖,这个依赖包含前面三个<depen......
  • 自定义资源支持:K8s Device Plugin 从原理到实现
    本文主要分析k8s中的device-plugin机制工作原理,并通过实现一个简单的device-plugin来加深理解。1.背景默认情况下,k8s中的Pod只能申请CPU和Memory这两种资源,就像下面这样:resources:requests:memory:"1024Mi"cpu:"100m"limits:memory:"2......
  • 微信小程序中使用echarts 自定义图片时报错: Image is not defined
    最近需要在小程序中完成一个图表,其中需要导入一些自定义的图片来显示。使用echarts-for-weixin项目之后,发现报了如下错误:ReferenceError:Imageisnotdefined经查看源码发现,Echarts.Js文件中是使用NewImage来创建图片的,而小程序中应该使用Canvas.Createimage()因此需要修......
  • RenderDoc在Texture View中使用自定义的Shader
    简单介绍一下如何在RenderDoc中使用Channels设定为Custom后的Shader;官方文档:HowdoIuseacustomvisualisationshader?在TextureView中设置Channels为Custom;输入新创建的Shader名称及后缀;点击绿色加号创建CustomShader,创建的hlsl保存会在C:\Users\PC\AppData\Roami......