首页 > 其他分享 >自己动手实现一个轻量无负担的任务调度ScheduleTask

自己动手实现一个轻量无负担的任务调度ScheduleTask

时间:2024-05-15 16:09:54浏览次数:13  
标签:ScheduleTask Task return class task var 任务调度 轻量 public

至于任务调度这个基础功能,重要性不言而喻,大多数业务系统都会用到,世面上有很多成熟的三方库比如Quartz,Hangfire,Coravel
这里我们不讨论三方的库如何使用 而是从0开始自己制作一个简易的任务调度

技术栈用到了:BackgroundServiceNCrontab

第一步我们定义一个简单的任务约定,不干别的就是一个执行方法:

    public interface IScheduleTask
    {
        Task ExecuteAsync();
    }
    public abstract class ScheduleTask : IScheduleTask
    {
        public virtual Task ExecuteAsync()
        {
            return Task.CompletedTask;
        }
    }

第二步定义特性标注任务执行周期等信的metadata

    [AttributeUsage(AttributeTargets.Class, AllowMultiple = true, Inherited = false)]
    public class ScheduleTaskAttribute(string cron) : Attribute
    {
        /// <summary>
        /// 支持的cron表达式格式 * * * * *:https://en.wikipedia.org/wiki/Cron
        /// 最小单位为分钟
        /// </summary>
        public string Cron { get; set; } = cron;
        public string? Description { get; set; }
        /// <summary>
        /// 是否异步执行.默认false会阻塞接下来的同类任务
        /// </summary>
        public bool IsAsync { get; set; } = false;
        /// <summary>
        /// 是否初始化即启动,默认false
        /// </summary>
        public bool IsStartOnInit { get; set; } = false;
    }

第三步我们定义一个调度器约定,不干别的就是判断当前的任务是否可以执行:

    public interface IScheduler
    {
        /// <summary>
        /// 判断当前的任务是否可以执行
        /// </summary>
        bool CanRun(ScheduleTaskAttribute scheduleMetadata, DateTime referenceTime);
    }

好了,基础步骤就完成了,如果我们需要实现配置级别的任务调度或者动态的任务调度 那我们再抽象一个Store:

    public class ScheduleTaskMetadata(Type scheduleTaskType, string cron)
    {
        public Type ScheduleTaskType { get; set; } = scheduleTaskType;
        public string Cron { get; set; } = cron;
        public string? Description { get; set; }
        public bool IsAsync { get; set; } = false;
        public bool IsStartOnInit { get; set; } = false;
    }
    public interface IScheduleMetadataStore
    {
        /// <summary>
        /// 获取所有ScheduleTaskMetadata
        /// </summary>
        Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync();
    }

实现一个Configuration级别的Store

    internal class ConfigurationScheduleMetadataStore(IConfiguration configuration) : IScheduleMetadataStore
    {
        const string Key = "BiwenQuickApi:Schedules";

        public Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync()
        {
            var options = configuration.GetSection(Key).GetChildren();

            if (options?.Any() is true)
            {
                var metadatas = options.Select(x =>
                {
                    var type = Type.GetType(x[nameof(ConfigurationScheduleOption.ScheduleType)]!);
                    if (type is null)
                        throw new ArgumentException($"Type {x[nameof(ConfigurationScheduleOption.ScheduleType)]} not found!");

                    return new ScheduleTaskMetadata(type, x[nameof(ConfigurationScheduleOption.Cron)]!)
                    {
                        Description = x[nameof(ConfigurationScheduleOption.Description)],
                        IsAsync = string.IsNullOrEmpty(x[nameof(ConfigurationScheduleOption.IsAsync)]) ? false : bool.Parse(x[nameof(ConfigurationScheduleOption.IsAsync)]!),
                        IsStartOnInit = string.IsNullOrEmpty(x[nameof(ConfigurationScheduleOption.IsStartOnInit)]) ? false : bool.Parse(x[nameof(ConfigurationScheduleOption.IsStartOnInit)]!),
                    };
                });
                return Task.FromResult(metadatas);
            }
            return Task.FromResult(Enumerable.Empty<ScheduleTaskMetadata>());
        }
    }

然后呢,我们可能需要多任务调度的事件做一些操作或者日志存储.比如失败了该干嘛,完成了回调其他后续业务等.我们再来定义一下具体的事件IEvent,具体可以参考我上一篇文章:
https://www.cnblogs.com/vipwan/p/18184088

    public abstract class ScheduleTaskEvent(IScheduleTask scheduleTask, DateTime eventTime) : IEvent
    {
        /// <summary>
        /// 任务
        /// </summary>
        public IScheduleTask ScheduleTask { get; set; } = scheduleTask;
        /// <summary>
        /// 触发时间
        /// </summary>
        public DateTime EventTime { get; set; } = eventTime;
    }
    /// <summary>
    /// 执行完成
    /// </summary>
    public sealed class TaskSuccessedEvent(IScheduleTask scheduleTask, DateTime eventTime, DateTime endTime) : ScheduleTaskEvent(scheduleTask, eventTime)
    {
        /// <summary>
        /// 执行结束的时间
        /// </summary>
        public DateTime EndTime { get; set; } = endTime;
    }
    /// <summary>
    /// 执行开始
    /// </summary>
    public sealed class TaskStartedEvent(IScheduleTask scheduleTask, DateTime eventTime) : ScheduleTaskEvent(scheduleTask, eventTime);
    /// <summary>
    /// 执行失败
    /// </summary>
    public sealed class TaskFailedEvent(IScheduleTask scheduleTask, DateTime eventTime, Exception exception) : ScheduleTaskEvent(scheduleTask, eventTime)
    {
        /// <summary>
        /// 异常信息
        /// </summary>
        public Exception Exception { get; private set; } = exception;
    }

接下来我们再实现基于NCrontab的简易调度器,这个调度器主要是解析Cron表达式判断传入时间是否可以执行ScheduleTask,具体的代码:

    internal class SampleNCrontabScheduler : IScheduler
    {
        /// <summary>
        /// 暂存上次执行时间
        /// </summary>
        private static ConcurrentDictionary<ScheduleTaskAttribute, DateTime> LastRunTimes = new();

        public bool CanRun(ScheduleTaskAttribute scheduleMetadata, DateTime referenceTime)
        {
            var now = DateTime.Now;
            var haveExcuteTime = LastRunTimes.TryGetValue(scheduleMetadata, out var time);
            if (!haveExcuteTime)
            {
                var nextStartTime = CrontabSchedule.Parse(scheduleMetadata.Cron).GetNextOccurrence(referenceTime);
                LastRunTimes.TryAdd(scheduleMetadata, nextStartTime);

                //如果不是初始化启动,则不执行
                if (!scheduleMetadata.IsStartOnInit)
                    return false;
            }
            if (now >= time)
            {
                var nextStartTime = CrontabSchedule.Parse(scheduleMetadata.Cron).GetNextOccurrence(referenceTime);
                //更新下次执行时间
                LastRunTimes.TryUpdate(scheduleMetadata, nextStartTime, time);
                return true;
            }
            return false;
        }
    }

然后就是核心的BackgroundService了,这里我用的IdleTime心跳来实现,粒度分钟,当然内部也可以封装Timer等实现更复杂精度更高的调度,这里就不展开讲了,代码如下:


    internal class ScheduleBackgroundService : BackgroundService
    {
        private static readonly TimeSpan _pollingTime
#if DEBUG
          //轮询20s 测试环境下,方便测试。
          = TimeSpan.FromSeconds(20);
#endif
#if !DEBUG
         //轮询60s 正式环境下,考虑性能轮询时间延长到60s
         = TimeSpan.FromSeconds(60);
#endif
        //心跳10s.
        private static readonly TimeSpan _minIdleTime = TimeSpan.FromSeconds(10);
        private readonly ILogger<ScheduleBackgroundService> _logger;
        private readonly IServiceProvider _serviceProvider;
        public ScheduleBackgroundService(ILogger<ScheduleBackgroundService> logger, IServiceProvider serviceProvider)
        {
            _logger = logger;
            _serviceProvider = serviceProvider;
        }
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                var pollingDelay = Task.Delay(_pollingTime, stoppingToken);
                try
                {
                    await RunAsync(stoppingToken);
                }
                catch (Exception ex)
                {
                    //todo:
                    _logger.LogError(ex.Message);
                }
                await WaitAsync(pollingDelay, stoppingToken);
            }
        }
        private async Task RunAsync(CancellationToken stoppingToken)
        {
            using var scope = _serviceProvider.CreateScope();
            var tasks = scope.ServiceProvider.GetServices<IScheduleTask>();
            if (tasks is null || !tasks.Any())
            {
                return;
            }
            //调度器
            var scheduler = scope.ServiceProvider.GetRequiredService<IScheduler>();
            async Task DoTaskAsync(IScheduleTask task, ScheduleTaskAttribute metadata)
            {
                if (scheduler.CanRun(metadata, DateTime.Now))
                {
                    var eventTime = DateTime.Now;
                    //通知启动
                    _ = new TaskStartedEvent(task, eventTime).PublishAsync(default);
                    try
                    {
                        if (metadata.IsAsync)
                        {
                            //异步执行
                            _ = task.ExecuteAsync();
                        }
                        else
                        {
                            //同步执行
                            await task.ExecuteAsync();
                        }
                        //执行完成
                        _ = new TaskSuccessedEvent(task, eventTime, DateTime.Now).PublishAsync(default);
                    }
                    catch (Exception ex)
                    {
                        _ = new TaskFailedEvent(task, DateTime.Now, ex).PublishAsync(default);
                    }
                }
            };
            //注解中的task
            foreach (var task in tasks)
            {
                if (stoppingToken.IsCancellationRequested)
                {
                    break;
                }
                //标注的metadatas
                var metadatas = task.GetType().GetCustomAttributes<ScheduleTaskAttribute>();

                if (!metadatas.Any())
                {
                    continue;
                }
                foreach (var metadata in metadatas)
                {
                    await DoTaskAsync(task, metadata);
                }
            }
            //store中的scheduler
            var stores = _serviceProvider.GetServices<IScheduleMetadataStore>().ToArray();

            //并行执行,提高性能
            Parallel.ForEach(stores, async store =>
            {
                if (stoppingToken.IsCancellationRequested)
                {
                    return;
                }
                var metadatas = await store.GetAllAsync();
                if (metadatas is null || !metadatas.Any())
                {
                    return;
                }
                foreach (var metadata in metadatas)
                {
                    var attr = new ScheduleTaskAttribute(metadata.Cron)
                    {
                        Description = metadata.Description,
                        IsAsync = metadata.IsAsync,
                        IsStartOnInit = metadata.IsStartOnInit,
                    };

                    var task = scope.ServiceProvider.GetRequiredService(metadata.ScheduleTaskType) as IScheduleTask;
                    if (task is null)
                    {
                        return;
                    }
                    await DoTaskAsync(task, attr);
                }
            });
        }

        private static async Task WaitAsync(Task pollingDelay, CancellationToken stoppingToken)
        {
            try
            {
                await Task.Delay(_minIdleTime, stoppingToken);
                await pollingDelay;
            }
            catch (OperationCanceledException)
            {
            }
        }
    }

最后收尾阶段我们老规矩扩展一下IServiceCollection:

        internal static IServiceCollection AddScheduleTask(this IServiceCollection services)
        {
            foreach (var task in ScheduleTasks)
            {
                services.AddTransient(task);
                services.AddTransient(typeof(IScheduleTask), task);
            }
            //调度器
            services.AddScheduler<SampleNCrontabScheduler>();
            //配置文件Store services.AddScheduleMetadataStore<ConfigurationScheduleMetadataStore>();
            //BackgroundService
           services.AddHostedService<ScheduleBackgroundService>();
            return services;
        }
        /// <summary>
        /// 注册调度器AddScheduler
        /// </summary>
        public static IServiceCollection AddScheduler<T>(this IServiceCollection services) where T : class, IScheduler
        {
            services.AddSingleton<IScheduler, T>();
            return services;
        }

        /// <summary>
        /// 注册ScheduleMetadataStore
        /// </summary>
        public static IServiceCollection AddScheduleMetadataStore<T>(this IServiceCollection services) where T : class, IScheduleMetadataStore
        {
            services.AddSingleton<IScheduleMetadataStore, T>();
            return services;
        }

老规矩我们来测试一下:

    //通过特性标注的方式执行:
    [ScheduleTask(Constants.CronEveryMinute)] //每分钟一次
    [ScheduleTask("0/3 * * * *")]//每3分钟执行一次
    public class KeepAlive(ILogger<KeepAlive> logger) : IScheduleTask
    {
        public async Task ExecuteAsync()
        {
            //执行5s
            await Task.Delay(TimeSpan.FromSeconds(5));
            logger.LogInformation("keep alive!");
        }
    }
	public class DemoConfigTask(ILogger<DemoConfigTask> logger) : IScheduleTask
    {
        public Task ExecuteAsync()
        {
            logger.LogInformation("Demo Config Schedule Done!");
            return Task.CompletedTask;
        }
    }

通过配置文件的方式配置Store:

{
  "BiwenQuickApi": {
    "Schedules": [
      {
        "ScheduleType": "Biwen.QuickApi.DemoWeb.Schedules.DemoConfigTask,Biwen.QuickApi.DemoWeb",
        "Cron": "0/5 * * * *",
        "Description": "Every 5 mins",
        "IsAsync": true,
        "IsStartOnInit": false
      },
      {
        "ScheduleType": "Biwen.QuickApi.DemoWeb.Schedules.DemoConfigTask,Biwen.QuickApi.DemoWeb",
        "Cron": "0/10 * * * *",
        "Description": "Every 10 mins",
        "IsAsync": false,
        "IsStartOnInit": true
      }
    ]
  }
}

我们还可以自己实现自己的Store,这里以放到内存为例,如果有兴趣 你可以可以自行开发一个面板管理:

    public class DemoStore : IScheduleMetadataStore
    {
        public Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync()
        {
            //模拟从数据库或配置文件中获取ScheduleTaskMetadata
            IEnumerable<ScheduleTaskMetadata> metadatas =
                [
                    new ScheduleTaskMetadata(typeof(DemoTask),Constants.CronEveryNMinutes(2))
                    {
                        Description="测试的Schedule"
                    },
                ];
            return Task.FromResult(metadatas);
        }
    }
	
	//然后注册这个Store:
	builder.Services.AddScheduleMetadataStore<DemoStore>();

所有的一切都大功告成,最后我们来跑一下Demo成功了:
image

当然这里是自己的固定思维设计的一个简约版,还存在一些不足,欢迎板砖轻拍指正!

源代码我发布到了GitHub,欢迎star! https://github.com/vipwan/Biwen.QuickApi
https://github.com/vipwan/Biwen.QuickApi/tree/master/Biwen.QuickApi/Scheduling

标签:ScheduleTask,Task,return,class,task,var,任务调度,轻量,public
From: https://www.cnblogs.com/vipwan/p/18194062

相关文章

  • 从0到1,百亿级任务调度平台的架构与实现
    文章很长,且持续更新,建议收藏起来,慢慢读!疯狂创客圈总目录博客园版为您奉上珍贵的学习资源:免费赠送:《尼恩Java面试宝典》持续更新+史上最全+面试必备2000页+面试必备+大厂必备+涨薪必备免费赠送:《尼恩技术圣经+高并发系列PDF》,帮你实现技术自由,完成职业升级,薪......
  • Cron表达式-任务调度
    当我们谈论任务调度时,cron(Cron表达式)是一种非常常见和常用的方式。它是一种用于在特定时间间隔内定期执行任务的调度表达式。cron表达式由6个字段组成,分别代表分钟、小时、日期、月份、星期几和要执行的命令或脚本。下面是cron表达式的每个字段的含义:09***command分钟(0-5......
  • 城市三维模型CIM轻量化技术浅析
    城市三维模型CIM轻量化技术浅析 城市三维模型CIM(CityInformationModeling)是在数字化时代中,为城市规划、管理和可视化提供重要支持的关键工具。然而,大规模的城市模型往往具有复杂的几何结构和庞大的数据量,给数据存储、计算和可视化带来了挑战。为了解决这个问题,CIM的轻量......
  • 在Linux中,如何使用cron进行任务调度?
    Cron是Linux系统中用于任务调度的一个强大工具,它允许用户安排命令或脚本在特定的时间周期性地自动执行,无需用户干预。Cron作业可以按分钟、小时、日期、月份、星期几来设置执行时间。以下是使用cron进行任务调度的基本步骤:1.编辑Crontab文件Crontab(crontable)文件包含了所有计......
  • EasyLogger - 一款超轻量级、高性能的 C/C++ 日志库
    1、EasyLogger-一款超轻量级、高性能的C/C++日志库EasyLogger是一款超轻量级(ROM<1.6K,RAM<0.3K)、高性能的C/C++日志库,非常适合对资源敏感的软件项目,例如:IoT产品、可穿戴设备、智能家居等等。相比log4c、zlog这些知名的C/C++日志库,EasyLogger的功能更加简单,提供......
  • LwRB - 一款适用嵌入式系统的轻量级 RingBuffer+MultiTimer - 超精简的纯软件定时器驱
    1、MicroMagic发布世界上最快的64-bitRISC-V核近日,一家位于美国加州森尼维尔的小型电子设计公司MicroMagic宣称设计、生产出了全世界最快的64位RISC-V内核,比苹果的M1芯片和ArmCortex-A9表现还要出色。消息源: http://www.micromagic.com/news/RISCv-Fastest_PR.pdf这......
  • PikaScript - 面向嵌入式的超轻量级python引擎+Ring-Buffer - 仅80行代码的超简洁环形
    1、PikaScript-面向嵌入式的超轻量级python引擎PikaScript(前称mimiscript)是一个完全重写的超轻量级python引擎,零依赖,零配置,可以在少于4KB的RAM下运行(如stm32g030c8和stm32f103c8),极易部署和扩展。项目地址:https://github.com/pikasTech/pikascriptPikaScript是使用c语言写......
  • nRF51_Platform - 基于nRF51平台(蓝牙4.0)的轻量级SDK+AliOS Things - 阿里出品轻量级
    1、AliOSThings-阿里出品轻量级物联网嵌入式操作系统AliOSThings发布于2017年杭州云栖大会,是AliOS家族旗下的、面向IoT领域的、高可伸缩的物联网操作系统。AliOSThings致力于搭建云端一体化IoT基础设施,具备极致性能、极简开发、云端一体。项目主页: https://github.c......
  • 速度围观|使用分布式企业级任务调度平台,到底有多香?
    任务调度平台是关键的软件基础设施,专门设计用于自动化、高效和可靠地安排及执行预定的后台任务。谷歌云首席决策工程师KasimKhan曾提到:“在云计算环境中,自动化和效率是关键。”任务调度平台通过优化资源使用和集中管理功能,提供了一系列强大的调度策略、执行管理、监控报警和开发......
  • 城市三维模型CIM轻量化的应用分析
    城市三维模型CIM轻量化的应用分析 城市三维模型CIM(CityInformationModeling)在城市规划、管理和可视化方面具有重要的应用价值。然而,由于大规模城市模型的复杂性和庞大的数据量,对于实时应用和高效性能的要求越来越高。为了解决这一挑战,CIM的轻量化技术应运而生,以提高模型的性......