首页 > 编程语言 >c# 异步进阶———— 自定义 taskschedule[三]

c# 异步进阶———— 自定义 taskschedule[三]

时间:2023-04-24 09:33:44浏览次数:31  
标签:task 自定义 c# lock queue 线程 taskschedule new public

前言

我们知道我们的task async 和 await 是基于线程池进行调度的。

但是async 和 await 也就是使用了默认的task调度,让其在线程池中运行。

但是线程池是榨干机器性能为本质的,但是有时候我们运行一些我们自己的需求,比如控制一下线程数(因为并不是线程数越高,就能有更高的性能),控制一下cpu使用,避免cpu使用太高。

正文

首先我们需要一个队列,因为我们需要让task进行保存到某个地方,这里选择队列,因为它简单,也一般符合我们先进先出(先到先运行)的想法。

public sealed class BlockingQueue<T>
{
	private readonly Queue<T> _queue = new Queue<T>();

	private readonly object _lock = new object();

	private readonly Semaphore _pool= new Semaphore(0, int.MaxValue);

	public void Enqueue(T item)
	{
		lock (_lock)
		{
			_queue.Enqueue(item);
		}
	}

	public T Dequeue()
	{
		_pool.WaitOne();
		lock(_lock)
		{
			return _queue.Dequeue();
		}
	}
}

实现一个队列,那么希望是线程安全的,所以要给其进出加上lock。

同时希望,如果队列中为空的时候能够进行等待,不至于一直去轮询。

这里使用的是Semaphore,线程信号量,这个在后面会介绍到。

然后就到了实现线程池调度的时候:

public class TaskThreadPool : TaskScheduler, IDisposable
{
	private readonly BlockingQueue<Task> _queue = new BlockingQueue<Task>();

	private Thread[] _threads;
	private bool _disposed;
	private readonly object _lock = new object();

	public int ThreadCount { get; }

	public TaskThreadPool(int threadCount, bool isBackground = false)
	{
		if (threadCount < 1)
		{
			throw new ArgumentOutOfRangeException(nameof(threadCount), "Must be at least 1");
		}

		ThreadCount = threadCount;
		_threads = new Thread[threadCount];
		for (int i = 0; i < threadCount; i++)
		{
			_threads[i] = new Thread(ExcuteTasks)
			{
				IsBackground = false
			};
			_threads[i].Start();
		}
	}

	public Task Run(Action action) =>
		Task.Factory.StartNew(action, CancellationToken.None, TaskCreationOptions.None, this);

	private void ExcuteTasks()
	{
		while (true)
		{
			var task = _queue.Dequeue();
			if (task == null)
			{
				return;
			}

			TryExecuteTask(task);
		}
	}

	protected override IEnumerable<Task>? GetScheduledTasks()
	{
		return _queue.ToArray();
	}

	protected override void QueueTask(Task task)
	{
		_queue.Enqueue(task);
	}

	protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
	{
		if (_disposed)
		{
			throw new ObjectDisposedException(typeof(TaskThreadPool).FullName);
		}

		return !taskWasPreviouslyQueued && TryExecuteTask(task);
	}

	public void Dispose()
	{
		lock (_lock)
		{
			if (_disposed)
			{
				return;
			}

			_disposed = true;
		}
		
		for (int i = 0; i < _threads.Length; i++)
			_queue.Enqueue(null);

		foreach (var thread in _threads)
			thread.Join();

		_threads = null;
		_queue.Dispose();
	}
}

代码也很简单常规,就是初始化多少个线程作为线程池,然后Task排队运行就行了,记得要释放资源。

这里dispose让其他线程进行停止的信号为:_queue.Enqueue(null).

private void ExcuteTasks()
{
	while (true)
	{
		var task = _queue.Dequeue();
		if (task == null)
		{
			return;
		}

		TryExecuteTask(task);
	}
}

其他线程消费到null,那么就应该停止了。

然后有一个run方法,可以直接让action,放进来运行:

public Task Run(Action action) =>
		Task.Factory.StartNew(action, CancellationToken.None, TaskCreationOptions.None, this);

总结一下基本思路:

  1. 需要实现TaskScheduler,这样可以避免自己写一些任务运行的逻辑控制

  2. 因为使用了信号量,所以BlockingQueue,然后 TaskThreadPool 需要使用到BlockingQueue,所以需要加上IDispose

  3. 需要控制线程数,并在对象销毁的时候禁止新的task进入,运行完已经加入队列的任务

  4. 需要有一个run方法,这样对外提供方便

考虑到信号量的释放,那么也完善了blockingqueue:

public sealed class BlockingQueue<T> : IDisposable
{
	private readonly Queue<T> _queue = new Queue<T>();

	private readonly object _lock = new object();

	private readonly Semaphore _pool= new Semaphore(0, int.MaxValue);

	public void Enqueue(T item)
	{
		lock (_lock)
		{
			_queue.Enqueue(item);
		}
	}

	public T Dequeue()
	{
		_pool.WaitOne();
		lock(_lock)
		{
			return _queue.Dequeue();
		}
	}

	public IEnumerable<T> ToArray()
	{
		return _queue.ToArray();
	}

	public void Dispose()
	{
		_pool.Dispose();
	}
}

简单写了一下自定义的线程池,上文中介绍到了Semaphore 这个信号量,下一节为Semaphore 的介绍和实现原理。

标签:task,自定义,c#,lock,queue,线程,taskschedule,new,public
From: https://www.cnblogs.com/aoximin/p/17324471.html

相关文章

  • 【VINKA原厂技术支持】电源供电系列高稳定性抗干扰VK36E4 脚位更少的四键感应触摸/4路
    1.概述VK36E4具有4个触摸按键,可用来检测外部触摸按键上人手的触摸动作。该芯片具有较高的集成度,仅需极少的外部组件便可实现触摸按键的检测。提供了4路直接输出功能。芯片内部采用特殊的集成电路,具有高电源电压抑制比,可减少按键检测错误的发生,此特性保证在不利环境条件的应用......
  • Acrel-2000系列监控系统的功能介绍
    安科瑞虞佳豪摘要:智能化配电监控系统是数字化和信息化时代应运而生的产物,已经被广泛应用于电网用户侧楼宇、体育场馆、科研设施、机场、交通、医院、电力和石化行业等诸多领域的高/低压变配电系统中。安科瑞自研的Acrel-2000系列监控系统可监控高压开关柜、低压开关柜、应急发电机......
  • SpringSecurity从入门到精通:简介
    SpringSecurity从入门到精通:简介SpringSecurity是Spring家族中的一个安全管理框架,想比另外一个安全框架Shiro,它提供了更丰富的功能,社区资源也比Shiro丰富一般来说中大型的项目都是使用SpringSecurity来做安全框架,小项目有Shiro的比较多,因为想比与SpringSecurity,Shiro的......
  • argc和argv学习
    转自:https://stackoverflow.com/questions/3024197/what-does-int-argc-char-argv-mean1.介绍argc:c是指count,传参个数,至少为1,表示执行的文件名;argv:v指vector,即传参向量。#include<iostream>intmain(intargc,char**argv){std::cout<<"Have"<<ar......
  • CA2 Evolutionary Computation
    CA2ContinuousAssignment-EvolutionaryComputation20221.Averageandmaximumpathlengthofgraphs.(a)Calculatetheaveragepathlengthofthefollowinggraphs(withNvertices):line,ring,star,fullyconnectedgraph.Showallyourreasoning.(You......
  • 手动拷贝文件至nextcloud中并扫描(docker)
    出于不能在nextcloud的web页面手动上传,需要写脚本实现自动上传文件至nextcloud的目的,直接在服务器上将文件手动拷贝到nextcloud目录:dockercp/data/filesnextcloud:/var/www/html/data/账号名/files但是一般人不这么做,一般挂载到-v卷岂不更好在nextcloud目录中我们可以这么扫......
  • Getselection能不能接受keyword?
    这个玩意绝对是个坑,CAD对Getselection的支持并不充分,需要通过keywordinput事件来弄,比较麻烦,而且很容易出问题。所以我的做法是,不使用,哈哈!下面这个是kean的代码:[CommandMethod("SELKW")]publicvoidGetSelectionWithKeywords(){Documentdoc=AcadApp.......
  • c#程序员必学清单补充
    作为C#程序员,除了上述经典书籍和开源框架外,还需要掌握以下技术:1..NETCore和ASP.NETCore:了解并熟练掌握.NETCore和ASP.NETCore框架,这将使您能够开发跨平台的Web应用程序和服务。2.EntityFrameworkCore:深入学习并掌握EntityFrameworkCore,这是一款功能强大的......
  • C#微服务必学清单
    在C#领域,有一些不错的微服务书籍和开源框架,对于学习微服务相关知识非常有帮助。以下是一些建议您阅读的微服务书目和开源框架。微服务书目:1.《BuildingMicroservices》(SamNewman):这本书详细介绍了微服务的基本概念、实践方法和优缺点,并通过实际案例分析来展示如何构建微服务......
  • HW2:classification
    HW2任务描述音位分类预测(Phonemeclassification),我们有音频->音位这样的训练数据,想要训练一个模型,学习这样的对应关系,然后给定音频,预测其音位音位音位(phoneme),是人类某一种语言中能够区别意义的最小语音单位,是音位学分析的基础概念。每种语言都有一套自己的音位系统。音频处......