首页 > 其他分享 >.NET 6 实现一个任务队列,且在不同线程中调用队列,队列始终都是串行执行

.NET 6 实现一个任务队列,且在不同线程中调用队列,队列始终都是串行执行

时间:2024-01-23 10:33:37浏览次数:32  
标签:executingTask Task 队列 await 任务 线程 async NET

在.NET 6中,要实现一个任务队列,确保队列中的任务始终串行执行,即使它们是由不同线程调用的,你可以使用Channel<T>结合Task.Run或者更简单地使用BlockingCollection<T>Task.Factory.StartNewasync/await模式。不过,为了保持代码的简洁性和现代性,我会推荐使用Channel<T>结合async/await

这里是一个简单的示例,演示如何使用Channel<Func<Task>>来实现这样一个串行执行的任务队列:

using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public class SerialTaskQueue
{
    private readonly Channel<Func<Task>> _channel;
    private Task _executingTask;
    private readonly SemaphoreSlim _semaphore = new(1, 1);

    public SerialTaskQueue()
    {
        _channel = Channel.CreateUnbounded<Func<Task>>(
            new UnboundedChannelOptions
            {
                SingleReader = true,
                SingleWriter = false,
                AllowSynchronousContinuations = false
            }
        );

        // 启动一个后台任务来处理队列中的任务
        _ = Task.Run(async () => await ProcessQueueAsync());
    }

    public async Task Enqueue(Func<Task> task)
    {
        if (task == null) throw new ArgumentNullException(nameof(task));

        await _channel.Writer.WriteAsync(task, CancellationToken.None);
    }

    private async Task ProcessQueueAsync()
    {
        while (await _channel.Reader.WaitToReadAsync())
        {
            while (_channel.Reader.TryRead(out var task))
            {
                // 使用SemaphoreSlim确保任务串行执行
                await _semaphore.WaitAsync();
                try
                {
                    // 如果当前没有执行的任务,或者当前任务已经完成了,那么启动一个新的任务
                    if (_executingTask == null || _executingTask.IsCompleted)
                    {
                        _executingTask = task();
                    }
                    else
                    {
                        // 否则,等待当前任务完成,然后执行下一个任务
                        await _executingTask;
                        _executingTask = task();
                    }
                }
                finally
                {
                    _semaphore.Release();
                }
            }

            // 确保我们等待当前执行的任务完成
            if (_executingTask != null)
            {
                await _executingTask;
            }
        }
    }
}

// 使用示例
class Program
{
    static async Task Main()
    {
        var serialQueue = new SerialTaskQueue();

        // 模拟从不同线程添加任务到队列
        Task.Run(() => EnqueueTasks(serialQueue, "Task A"));
        Task.Run(() => EnqueueTasks(serialQueue, "Task B"));

        // 等待任务执行完成
        await Task.Delay(TimeSpan.FromSeconds(5));
    }

    static async Task EnqueueTasks(SerialTaskQueue queue, string taskName)
    {
        for (int i = 0; i < 3; i++)
        {
            int localI = i;
            await queue.Enqueue(async () =>
            {
                Console.WriteLine($"{taskName} {localI} is running on thread {Thread.CurrentThread.ManagedThreadId}.");
                await Task.Delay(1000); // 模拟工作
                Console.WriteLine($"{taskName} {localI} is completed.");
            });
        }
    }
}

请注意,上述代码中有几个关键点:

  1. 使用Channel<Func<Task>>作为任务队列,它支持异步操作,并且配置为单读者多写者模式。
  2. 使用SemaphoreSlim来确保同时只有一个任务在执行,即使ProcessQueueAsync方法可能在Enqueue方法被多个线程同时调用时尝试启动新任务。
  3. _executingTask变量用于跟踪当前正在执行的任务,以便我们可以在其完成后启动下一个任务。
  4. Enqueue方法允许外部代码将任务添加到队列中,而ProcessQueueAsync方法则负责处理队列中的任务。

然而,上面的代码片段实际上不是完全正确的串行执行实现,因为它可能会在_executingTask上同时执行多个任务。我们需要稍微修改一下ProcessQueueAsync方法,以确保它不会同时启动多个任务。

下面是修正后的ProcessQueueAsync方法:

private async Task ProcessQueueAsync()
{
    while (await _channel.Reader.WaitToReadAsync())
    {
        while (_channel.Reader.TryRead(out var task))
        {
            // 等待上一个任务完成(如果有的话)
            if (_executingTask != null)
            {
                await _executingTask;
            }

            // 现在启动新任务
            _executingTask = task();

            // 等待当前任务完成
            await _executingTask;

            // 重置_executingTask以便下一个循环可以检测它是否为null
            _executingTask = null;
        }
    }
}

现在,修正后的ProcessQueueAsync方法确保了任务会一个接一个地串行执行,即使它们是由不同线程添加到队列中的。

标签:executingTask,Task,队列,await,任务,线程,async,NET
From: https://www.cnblogs.com/donethome/p/17981781

相关文章

  • RabbitMq批量删除队列
    RabbitMq批量删除队列​ 由于部分公司同事使用RabbitMq时,没有将Client设置为autodelete,导致大量冗余队列。其中这些队列又是无routekey队列,收到了批量的订阅消息,占用服务器内存。​ 如何将这些无用的队列删除成为一个问题?经过多次摸索,在rabbitmqmanagementapi里面找到了方案:u......
  • hivesql报错:java.net.SocketTimeoutException: Read timed out
    场景:在dolphin里面执行hivesql脚本,报错java.net.SocketTimeoutException:Readtimedout解决方式:hive.metastore.client.socket.timeout=1000s 扩展:"Hive报Readtimedout"错误可能也与YARN的资源限制有关。可以通过增加YARN的配置参数来解决此错误。<property><name......
  • 兴达易控EtherCAT主站转Profinet网关超级推荐
    兴达易控EtherCAT主站转Profinet网关超级推荐网关XD-ETHPNM20为EtherCAT主站转Profinet从站的协议网关,两种工业实时以太网网络之间双向传输IO数据。适用于具有EtherCAT协议网络与Profinet协议网络跨越网络界限进行数据交换的解决方案。网关的第一通讯接口作为简单IO设备集成在......
  • Failed to create CoreCLR, HRESULT: 0x80070008--.net core 8 run in docker
    这几天计划打算上传微服务网关和微服务注册中心的docker镜像:Taurus.Gateway、Taurus.RegistryCenter却发现dockerhub在国内无法使用了。目前使用阿里云个人免费版来上传镜像。用.netcore8打包本地测试正常后,上传阿里云,再从虚拟机测试下载运行时出现以下问题:Failedtocre......
  • .NET Framework中关于WPF的更新信息
    .NETFramework3.0版中的新增功能更新:2007年11月单独发布.NETFramework3.0版是为了在.NETFramework和Windows软件开发包(SDK)中包含以下技术。WindowsCommunicationFoundationWindowsPresentationFoundationUnderstandingWindowsWorkflowFoundation......
  • STL-stack和queue堆栈和队列
    STL-stack和queue堆栈和队列目录STL-stack和queue堆栈和队列堆栈和队列特性堆栈主要操作构造函数主要操作栈顶插入和删除大小相关简单案例队列的主要操作构造函数大小相关索引访问入队/出队优先队列priority_queue初始化构造小顶堆自定义结构体排序参考资料堆栈和队列特性stack......
  • STL-deque双端队列
    STL-deque双端队列目录STL-deque双端队列创建初始化插入元素删除元素遍历容器函数总览deque和vector参考资料deque是double-endedqueue的缩写,又称双端队列容器,可以对其两段的数据进行操作,因为它没有capacity属性,因此不会像vector那样”旧空间不足而重新配置一块更大空间,然后......
  • 线程池
    目录简单的线程池简单的线程池在C++11中,你可以使用 std::thread 和 std::mutex 等标准库来实现一个简单的线程池。以下是一个简单的示例代码:#include<iostream>#include<vector>#include<thread>#include<mutex>#include<queue>classThreadPool{private:......
  • VUE框架CLI组件化配置Router路由局部守卫path和componet和router完整项目实现------VU
    <template><div><!--组件分为普通组件和路由组件--><divclass="s2"><h2>县区</h2><ul><!--query形式接收--><!--<li>{{$route.......
  • java线程池-1
    1.概述Java线程的创建非常昂贵,需要JVM和OS(操作系统)配合完成大量的工作:必须为线程堆栈分配和初始化大量内存块,其中包含至少1MB的栈内存。需要进行系统调用,以便在OS(操作系统)中创建和注册本地线程。Java高并发应用频繁创建和销毁线程的操作将是非常低效的,而且是不被......