TaskScheduler是什么
TaskScheduler决定了将Task调度到什么地方去执行,即TaskScheduler决定了Task如何被调度
ThreadPoolTaskScheduler
如果不特别指定,默认就是 ThreadPoolTaskScheduler
内部有两种处理逻辑,一种是针对LongRunning需求的Task,会单独走后台Thread路径;另一种是非LongRunning需求的Task,直接走ThreadPool线程池路径。
Why?针对LongRunning的Task,如果长时间运行占用着ThreadPool的线程,这时候ThreadPool为了保证线程充足,会再次开辟一些Thread,如果耗时任务此时释放了,会导致ThreadPool线程过多,上下文切换频繁,所以这种情况下让Task在Thread中执行还是非常不错的选择
SynchronizationContextTaskScheduler
适用于GUI程序:耗时操作一般不会放到UI线程处理,而是放到工作线程去处理,处理完之后通过发送消息到Queue,GUI程序就可以从Queue中取出来消费,更新UI内容。
How?在Task.Factory.StartNew方法中传参:TaskScheduler.FromCurrentSynchronizationContext()
自己实现一个TaskScheduler
namespace EDT.MultiThread.Demo
{
public class CustomTaskScheduler : TaskScheduler
{
Thread thread = null;
BlockingCollection<Task> collection = new BlockingCollection<Task>();
public CustomTaskScheduler()
{
thread = new Thread(() =>
{
foreach (var task in collection.GetConsumingEnumerable())
{
TryExecuteTask(task);
}
});
thread.Start();
}
protected override IEnumerable<Task> GetScheduledTasks()
{
return collection.ToArray();
}
protected override void QueueTask(Task task)
{
collection.Add(task);
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
throw new NotImplementedException();
}
}
}
调用端示例代码:
var scheduler = new CustomTaskScheduler();
for (int i = 0; i < 100; i++)
{
var task = Task.Factory.StartNew(() =>
{
Console.WriteLine($"当前线程:{Environment.CurrentManagedThreadId}");
}, CancellationToken.None, TaskCreationOptions.None, scheduler);
}
Console.ReadLine();
自定义scheduler
基本步骤
继承 TaskScheduler:
需要创建一个类来继承 TaskScheduler 并实现以下三个抽象成员:
QueueTask(Task task): 将任务加入调度队列。
TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued): 尝试在线程池线程上同步执行任务。
GetScheduledTasks(): 返回当前调度队列中的任务。
实现调度逻辑:
在 QueueTask 中实现您的调度逻辑,比如将任务添加到队列中,然后处理队列中的任务。
使用自定义 TaskScheduler:
可以使用 TaskFactory 或 Task 类的 Start 方法将任务安排在您的自定义 TaskScheduler 上。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{
private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // Task 列表
private readonly int _maxDegreeOfParallelism; // 最大并发数
private int _runningTasks = 0; // 当前运行的任务数
public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
{
if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
_maxDegreeOfParallelism = maxDegreeOfParallelism;
}
protected override IEnumerable<Task> GetScheduledTasks()
{
bool lockTaken = false;
try
{
Monitor.TryEnter(_tasks, ref lockTaken);
if (lockTaken)
{
return _tasks.ToArray();
}
else
{
throw new NotSupportedException("无法获取任务列表");
}
}
finally
{
if (lockTaken) Monitor.Exit(_tasks);
}
}
protected override void QueueTask(Task task)
{
lock (_tasks)
{
_tasks.AddLast(task);
if (_runningTasks < _maxDegreeOfParallelism)
{
_runningTasks++;
NotifyThreadPoolOfPendingWork();
}
}
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
// 在线程池线程上执行任务时,允许内联执行
if (Thread.CurrentThread.IsThreadPoolThread)
{
return TryExecuteTask(task);
}
return false;
}
private void NotifyThreadPoolOfPendingWork()
{
ThreadPool.UnsafeQueueUserWorkItem(_ =>
{
Task task;
lock (_tasks)
{
if (_tasks.Count == 0)
{
_runningTasks--;
return;
}
task = _tasks.First.Value;
_tasks.RemoveFirst();
}
TryExecuteTask(task);
NotifyThreadPoolOfPendingWork();
}, null);
}
}
// 使用自定义 TaskScheduler
class Program
{
static void Main(string[] args)
{
var scheduler = new LimitedConcurrencyLevelTaskScheduler(2); // 最大并发任务数为 2
var factory = new TaskFactory(scheduler);
var tasks = new Task[5];
for (int i = 0; i < 5; i++)
{
int taskNum = i;
tasks[i] = factory.StartNew(() =>
{
Console.WriteLine($"Task {taskNum} 开始执行");
Thread.Sleep(1000); // 模拟任务工作
Console.WriteLine($"Task {taskNum} 执行完成");
});
}
Task.WaitAll(tasks); // 等待所有任务完成
}
}
非递归实现
private void NotifyThreadPoolOfPendingWork()
{
while (true)
{
Task task;
lock (_tasks)
{
if (_tasks.Count == 0)
{
_runningTasks--;
return;
}
task = _tasks.First.Value;
_tasks.RemoveFirst();
}
TryExecuteTask(task);
}
}
TryExecuteTaskInline 的作用
直接执行任务:
当任务调度器在处理任务时,如果检测到任务可以在当前线程中直接执行,而无需排队等待,它就会尝试调用 TryExecuteTaskInline 来立即执行这个任务。
避免上下文切换:
如果任务在当前线程中执行而不必在线程池的其他线程中排队等待,可以减少上下文切换,提高执行效率。
条件限制:
调用 TryExecuteTaskInline 需要满足某些条件,比如当前线程是否适合执行任务(例如,是否处于适合任务执行的上下文中),以及任务是否允许内联执行。
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
// 在线程池线程上执行任务时,允许内联执行
if (Thread.CurrentThread.IsThreadPoolThread)
{
return TryExecuteTask(task);
}
return false;
}
没有显式调用
TaskScheduler 的 TryExecuteTaskInline 方法通常由 Task 系统在内部调用,当任务的执行上下文(例如在当前线程上)允许内联执行时会被自动触发。
如果调用了 Task.Run 或 TaskFactory.StartNew 等方法,框架可能会先尝试调用 TryExecuteTaskInline,看看任务能否立即在当前线程中执行
在以下情况中,TryExecuteTaskInline 的使用是合理的:
性能优化:如果任务可以立即在当前线程中执行,而不需要排队或上下文切换,使用 TryExecuteTaskInline 可以提高性能。
避免死锁:在某些同步场景中,允许任务内联执行可以避免死锁情况的出现,尤其是在任务依赖于当前线程执行时。