首页 > 其他分享 >TaskScheduler

TaskScheduler

时间:2024-08-16 14:37:30浏览次数:10  
标签:tasks TaskScheduler task 任务 Task 线程

TaskScheduler是什么

TaskScheduler决定了将Task调度到什么地方去执行,即TaskScheduler决定了Task如何被调度

ThreadPoolTaskScheduler

如果不特别指定,默认就是 ThreadPoolTaskScheduler

内部有两种处理逻辑,一种是针对LongRunning需求的Task,会单独走后台Thread路径;另一种是非LongRunning需求的Task,直接走ThreadPool线程池路径。

Why?针对LongRunning的Task,如果长时间运行占用着ThreadPool的线程,这时候ThreadPool为了保证线程充足,会再次开辟一些Thread,如果耗时任务此时释放了,会导致ThreadPool线程过多,上下文切换频繁,所以这种情况下让Task在Thread中执行还是非常不错的选择

SynchronizationContextTaskScheduler

适用于GUI程序:耗时操作一般不会放到UI线程处理,而是放到工作线程去处理,处理完之后通过发送消息到Queue,GUI程序就可以从Queue中取出来消费,更新UI内容。

How?在Task.Factory.StartNew方法中传参:TaskScheduler.FromCurrentSynchronizationContext()

自己实现一个TaskScheduler

namespace EDT.MultiThread.Demo
{
    public class CustomTaskScheduler : TaskScheduler
    {
        Thread thread = null;

        BlockingCollection<Task> collection = new BlockingCollection<Task>();

        public CustomTaskScheduler()
        {
            thread = new Thread(() =>
            {
                foreach (var task in collection.GetConsumingEnumerable())
                {
                    TryExecuteTask(task);
                }
            });

            thread.Start();
        }

        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return collection.ToArray();
        }

        protected override void QueueTask(Task task)
        {
            collection.Add(task);
        }

        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            throw new NotImplementedException();
        }
    }
}

调用端示例代码:

var scheduler = new CustomTaskScheduler();
for (int i = 0; i < 100; i++)
{
    var task = Task.Factory.StartNew(() =>
    {
        Console.WriteLine($"当前线程:{Environment.CurrentManagedThreadId}");
    }, CancellationToken.None, TaskCreationOptions.None, scheduler);
}

Console.ReadLine();

自定义scheduler

基本步骤
继承 TaskScheduler:
需要创建一个类来继承 TaskScheduler 并实现以下三个抽象成员:

QueueTask(Task task): 将任务加入调度队列。
TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued): 尝试在线程池线程上同步执行任务。
GetScheduledTasks(): 返回当前调度队列中的任务。
实现调度逻辑:
在 QueueTask 中实现您的调度逻辑,比如将任务添加到队列中,然后处理队列中的任务。

使用自定义 TaskScheduler:
可以使用 TaskFactory 或 Task 类的 Start 方法将任务安排在您的自定义 TaskScheduler 上。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{
    private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // Task 列表
    private readonly int _maxDegreeOfParallelism; // 最大并发数
    private int _runningTasks = 0; // 当前运行的任务数

    public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
    {
        if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
        _maxDegreeOfParallelism = maxDegreeOfParallelism;
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        bool lockTaken = false;
        try
        {
            Monitor.TryEnter(_tasks, ref lockTaken);
            if (lockTaken)
            {
                return _tasks.ToArray();
            }
            else
            {
                throw new NotSupportedException("无法获取任务列表");
            }
        }
        finally
        {
            if (lockTaken) Monitor.Exit(_tasks);
        }
    }

    protected override void QueueTask(Task task)
    {
        lock (_tasks)
        {
            _tasks.AddLast(task);
            if (_runningTasks < _maxDegreeOfParallelism)
            {
                _runningTasks++;
                NotifyThreadPoolOfPendingWork();
            }
        }
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        // 在线程池线程上执行任务时,允许内联执行
        if (Thread.CurrentThread.IsThreadPoolThread)
        {
            return TryExecuteTask(task);
        }
        return false;
    }

    private void NotifyThreadPoolOfPendingWork()
    {
        ThreadPool.UnsafeQueueUserWorkItem(_ =>
        {
            Task task;
            lock (_tasks)
            {
                if (_tasks.Count == 0)
                {
                    _runningTasks--;
                    return;
                }

                task = _tasks.First.Value;
                _tasks.RemoveFirst();
            }

            TryExecuteTask(task);
            NotifyThreadPoolOfPendingWork();
        }, null);
    }
}

// 使用自定义 TaskScheduler
class Program
{
    static void Main(string[] args)
    {
        var scheduler = new LimitedConcurrencyLevelTaskScheduler(2); // 最大并发任务数为 2
        var factory = new TaskFactory(scheduler);

        var tasks = new Task[5];
        for (int i = 0; i < 5; i++)
        {
            int taskNum = i;
            tasks[i] = factory.StartNew(() =>
            {
                Console.WriteLine($"Task {taskNum} 开始执行");
                Thread.Sleep(1000); // 模拟任务工作
                Console.WriteLine($"Task {taskNum} 执行完成");
            });
        }

        Task.WaitAll(tasks); // 等待所有任务完成
    }
}

非递归实现

private void NotifyThreadPoolOfPendingWork()
{
    while (true)
    {
        Task task;
        lock (_tasks)
        {
            if (_tasks.Count == 0)
            {
                _runningTasks--;
                return;
            }

            task = _tasks.First.Value;
            _tasks.RemoveFirst();
        }

        TryExecuteTask(task);
    }
}

TryExecuteTaskInline 的作用

直接执行任务:
当任务调度器在处理任务时,如果检测到任务可以在当前线程中直接执行,而无需排队等待,它就会尝试调用 TryExecuteTaskInline 来立即执行这个任务。

避免上下文切换:
如果任务在当前线程中执行而不必在线程池的其他线程中排队等待,可以减少上下文切换,提高执行效率。

条件限制:
调用 TryExecuteTaskInline 需要满足某些条件,比如当前线程是否适合执行任务(例如,是否处于适合任务执行的上下文中),以及任务是否允许内联执行。

protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
    // 在线程池线程上执行任务时,允许内联执行
    if (Thread.CurrentThread.IsThreadPoolThread)
    {
        return TryExecuteTask(task);
    }
    return false;
}

没有显式调用

TaskScheduler 的 TryExecuteTaskInline 方法通常由 Task 系统在内部调用,当任务的执行上下文(例如在当前线程上)允许内联执行时会被自动触发。
如果调用了 Task.Run 或 TaskFactory.StartNew 等方法,框架可能会先尝试调用 TryExecuteTaskInline,看看任务能否立即在当前线程中执行
在以下情况中,TryExecuteTaskInline 的使用是合理的:

性能优化:如果任务可以立即在当前线程中执行,而不需要排队或上下文切换,使用 TryExecuteTaskInline 可以提高性能。
避免死锁:在某些同步场景中,允许任务内联执行可以避免死锁情况的出现,尤其是在任务依赖于当前线程执行时。

标签:tasks,TaskScheduler,task,任务,Task,线程
From: https://www.cnblogs.com/JosenEarth/p/18362808

相关文章

  • C# Microsoft.Win32.TaskScheduler方式创建任务计划程序报错: System.ArgumentExceptio
    使用Microsoft.Win32.TaskScheduler创建任务计划程序可参考本人之前的一篇文章:https://www.cnblogs.com/log9527blog/p/17329755.html最新发现个别账户使用Microsoft.Win32.TaskScheduler创建任务计划程序报错:System.ArgumentException:(12,21):UserId:Account一种情况是账户......
  • SpringBoot自带ThreadPoolTaskScheduler实现数据库管理定时任务
    最近做了一个需求:将定时任务保存到数据库中,并在页面上实现定时任务的开关,以及更新定时任务时间后重新创建定时任务。于是想到了SpringBoot中自带的ThreadPoolTaskScheduler。在SpringBoot中提供的ThreadPoolTaskScheduler这个类,该类提供了一个schedule(Runnabletask,Triggert......