在.NET 6中,要实现一个任务队列,确保队列中的任务始终串行执行,即使它们是由不同线程调用的,你可以使用Channel<T>
结合Task.Run
或者更简单地使用BlockingCollection<T>
与Task.Factory.StartNew
或async/await
模式。不过,为了保持代码的简洁性和现代性,我会推荐使用Channel<T>
结合async/await
。
这里是一个简单的示例,演示如何使用Channel<Func<Task>>
来实现这样一个串行执行的任务队列:
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
public class SerialTaskQueue
{
private readonly Channel<Func<Task>> _channel;
private Task _executingTask;
private readonly SemaphoreSlim _semaphore = new(1, 1);
public SerialTaskQueue()
{
_channel = Channel.CreateUnbounded<Func<Task>>(
new UnboundedChannelOptions
{
SingleReader = true,
SingleWriter = false,
AllowSynchronousContinuations = false
}
);
// 启动一个后台任务来处理队列中的任务
_ = Task.Run(async () => await ProcessQueueAsync());
}
public async Task Enqueue(Func<Task> task)
{
if (task == null) throw new ArgumentNullException(nameof(task));
await _channel.Writer.WriteAsync(task, CancellationToken.None);
}
private async Task ProcessQueueAsync()
{
while (await _channel.Reader.WaitToReadAsync())
{
while (_channel.Reader.TryRead(out var task))
{
// 使用SemaphoreSlim确保任务串行执行
await _semaphore.WaitAsync();
try
{
// 如果当前没有执行的任务,或者当前任务已经完成了,那么启动一个新的任务
if (_executingTask == null || _executingTask.IsCompleted)
{
_executingTask = task();
}
else
{
// 否则,等待当前任务完成,然后执行下一个任务
await _executingTask;
_executingTask = task();
}
}
finally
{
_semaphore.Release();
}
}
// 确保我们等待当前执行的任务完成
if (_executingTask != null)
{
await _executingTask;
}
}
}
}
// 使用示例
class Program
{
static async Task Main()
{
var serialQueue = new SerialTaskQueue();
// 模拟从不同线程添加任务到队列
Task.Run(() => EnqueueTasks(serialQueue, "Task A"));
Task.Run(() => EnqueueTasks(serialQueue, "Task B"));
// 等待任务执行完成
await Task.Delay(TimeSpan.FromSeconds(5));
}
static async Task EnqueueTasks(SerialTaskQueue queue, string taskName)
{
for (int i = 0; i < 3; i++)
{
int localI = i;
await queue.Enqueue(async () =>
{
Console.WriteLine($"{taskName} {localI} is running on thread {Thread.CurrentThread.ManagedThreadId}.");
await Task.Delay(1000); // 模拟工作
Console.WriteLine($"{taskName} {localI} is completed.");
});
}
}
}
请注意,上述代码中有几个关键点:
- 使用
Channel<Func<Task>>
作为任务队列,它支持异步操作,并且配置为单读者多写者模式。 - 使用
SemaphoreSlim
来确保同时只有一个任务在执行,即使ProcessQueueAsync
方法可能在Enqueue
方法被多个线程同时调用时尝试启动新任务。 _executingTask
变量用于跟踪当前正在执行的任务,以便我们可以在其完成后启动下一个任务。Enqueue
方法允许外部代码将任务添加到队列中,而ProcessQueueAsync
方法则负责处理队列中的任务。
然而,上面的代码片段实际上不是完全正确的串行执行实现,因为它可能会在_executingTask
上同时执行多个任务。我们需要稍微修改一下ProcessQueueAsync
方法,以确保它不会同时启动多个任务。
下面是修正后的ProcessQueueAsync
方法:
private async Task ProcessQueueAsync()
{
while (await _channel.Reader.WaitToReadAsync())
{
while (_channel.Reader.TryRead(out var task))
{
// 等待上一个任务完成(如果有的话)
if (_executingTask != null)
{
await _executingTask;
}
// 现在启动新任务
_executingTask = task();
// 等待当前任务完成
await _executingTask;
// 重置_executingTask以便下一个循环可以检测它是否为null
_executingTask = null;
}
}
}
现在,修正后的ProcessQueueAsync
方法确保了任务会一个接一个地串行执行,即使它们是由不同线程添加到队列中的。