首页 > 编程语言 >c# 异步进阶———— paralel [二]

c# 异步进阶———— paralel [二]

时间:2023-04-22 19:23:14浏览次数:34  
标签:进阶 c# state Replica ._ new paralel null replicator

前言

简单整理一下paralel,以上是并行的意思。

正文

我们在工作中常常使用task await 和 async,也就是将线程池进行了封装,那么还有一些更高级的应用。

是对task的封装,那么来看下paralel。

static void Main(string[] args)
{
	var ints= Enumerable.Range(1, 100);
	var result = Parallel.ForEach(ints, arg =>
	{
		Console.WriteLine(arg);
	});
	
	Console.Read();
}

可以看到结果是并行的。

那么来看下实现机制。

public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource> body)
{
	if (source == null)
	{
		throw new ArgumentNullException(nameof(source));
	}
	if (body == null)
	{
		throw new ArgumentNullException(nameof(body));
	}

	return ForEachWorker<TSource, object>(
		source, s_defaultParallelOptions, body, null, null, null, null, null, null);
}

进行参数检验,然后交给了ForEachWorker。

这是一个基本的代码思路,就是复杂的方法中可以先校验参数,然后具体实现交给另外一个方法。

然后通过不同的类型,进行分类:

然后看下具体实现是什么?

进去看就是一个taskreplicator:

看下run在做什么。

public static void Run<TState>(ReplicatableUserAction<TState> action, ParallelOptions options, bool stopOnFirstFailure)
{
	int maxConcurrencyLevel = (options.EffectiveMaxConcurrencyLevel > 0) ? options.EffectiveMaxConcurrencyLevel : int.MaxValue;

	TaskReplicator replicator = new TaskReplicator(options, stopOnFirstFailure);
	new Replica<TState>(replicator, maxConcurrencyLevel, CooperativeMultitaskingTaskTimeout_RootTask, action).Start();

	Replica nextReplica;
	while (replicator._pendingReplicas.TryDequeue(out nextReplica))
		nextReplica.Wait();

	if (replicator._exceptions != null)
		throw new AggregateException(replicator._exceptions);
}
  1. 创建了一个taskreplictor,起到管理作用

  2. 然后创建了一个Replica,然后这个start 是关键

  3. 然后通过while,让每一个Replica 都运行完毕才推出,达到同步的效果

if (replicator._exceptions != null)
	throw new AggregateException(replicator._exceptions);

可以看一下这个,这个是一个比较好的技巧。如果一个运行管理,不用抛出异常,之间在管理中进行运行处理总结。

比如结果,异常等。

那么就看下这个start。

protected Replica(TaskReplicator replicator, int maxConcurrency, int timeout)
{
	_replicator = replicator;
	_timeout = timeout;
	_remainingConcurrency = maxConcurrency - 1;
	_pendingTask = new Task(s => ((Replica)s).Execute(), this);
	_replicator._pendingReplicas.Enqueue(this);
}

public void Start()
{
	_pendingTask.RunSynchronously(_replicator._scheduler);
}

将会运行Execute,是同步的,而不是异步的,也就是说第一个task将会运行在当前线程。

那么看Execute在做什么?

public void Execute()
{
	try
	{
		if (!_replicator._stopReplicating && _remainingConcurrency > 0)
		{
			CreateNewReplica();
			_remainingConcurrency = 0; // new replica is responsible for adding concurrency from now on.
		}

		bool userActionYieldedBeforeCompletion;

		ExecuteAction(out userActionYieldedBeforeCompletion);

		if (userActionYieldedBeforeCompletion)
		{
			_pendingTask = new Task(s => ((Replica)s).Execute(), this, CancellationToken.None, TaskCreationOptions.None);
			_pendingTask.Start(_replicator._scheduler);
		}
		else
		{
			_replicator._stopReplicating = true;
			_pendingTask = null;
		}
	}
	catch (Exception ex)
	{
		LazyInitializer.EnsureInitialized(ref _replicator._exceptions).Enqueue(ex);
		if (_replicator._stopOnFirstFailure)
			_replicator._stopReplicating = true;
		_pendingTask = null;
	}
}

一段一段分析:

if (!_replicator._stopReplicating && _remainingConcurrency > 0)
{
	CreateNewReplica();
	_remainingConcurrency = 0; // new replica is responsible for adding concurrency from now on.
}

这里当_replicator 也就是任务复制器没有停止的时候。这里有两种情况会停止,一种是任务完成,一种是任务异常且设置参数异常时候停止。

_remainingConcurrency 指的是副本数,默认是int.max。

那么就复制一个副本。

protected override void CreateNewReplica()
{
	Replica<TState> newReplica = new Replica<TState>(_replicator, _remainingConcurrency, GenerateCooperativeMultitaskingTaskTimeout(), _action);
	newReplica._pendingTask.Start(_replicator._scheduler);
}

复制完副本后,那么就开始运行我们的action了。

protected override void ExecuteAction(out bool yieldedBeforeCompletion)
{
	_action(ref _state, _timeout, out yieldedBeforeCompletion);
}

这里传入了timeout,这个timeout并不是我们限制我们单个task的运行时间,而是当运行到一定时候后,这个task就停止运行,然后另外启动一个副本。

if (CheckTimeoutReached(loopTimeout))
{
	replicationDelegateYieldedBeforeCompletion = true;
	break;
}
if (userActionYieldedBeforeCompletion)
{
	_pendingTask = new Task(s => ((Replica)s).Execute(), this, CancellationToken.None, TaskCreationOptions.None);
	_pendingTask.Start(_replicator._scheduler);
}
else
{
	_replicator._stopReplicating = true;
	_pendingTask = null;
}

这个是为了符合操作系统的调度思想,跑的越久的,基本上优先级会低些。

那么看下这个_action主要在做什么吧。

while (myPartition.MoveNext())
{
	KeyValuePair<long, TSource> kvp = myPartition.Current;
	long index = kvp.Key;
	TSource value = kvp.Value;

	// Update our iteration index
	if (state != null) state.CurrentIteration = index;

	if (simpleBody != null)
		simpleBody(value);
	else if (bodyWithState != null)
		bodyWithState(value, state);
	else if (bodyWithStateAndIndex != null)
		bodyWithStateAndIndex(value, state, index);
	else if (bodyWithStateAndLocal != null)
		localValue = bodyWithStateAndLocal(value, state, localValue);
	else
		localValue = bodyWithEverything(value, state, index, localValue);

	if (sharedPStateFlags.ShouldExitLoop(index)) break;

	// Cooperative multitasking:
	// Check if allowed loop time is exceeded, if so save current state and return.
	// The task replicator will queue up a replacement task. Note that we don't do this on the root task.
	if (CheckTimeoutReached(loopTimeout))
	{
		replicationDelegateYieldedBeforeCompletion = true;
		break;
	}
}

就是拉取我们的enumerator的数据,然后simpleBody(value),进行运行我们写的action。

总结一下,其实Parallel 核心就是一个任务复制器,然后创建多个副本,拉取我们的数据,进行执行我们设置的action。

里面的主要功能,Parallel做到了限制副本数,因为我们知道task并不是越多越好。

第二个,如果长时间运行,那么Parallel是做了优化的,当达到timeout的时候,那么会重新启动一个副本(可以理解为一个线程)

第三点,Parallel 有一个foreach 进行迭代器的处理,这里不仅仅是让任务可以并行。

而且具备c# foreach的基本功能。

static void Main(string[] args)
{
	var ints= Enumerable.Range(1, 100);
	var result = Parallel.ForEach(ints,    (arg, state)
		=>
	{
		if (state.IsStopped)
		{
			return;   
		}
		
		if (arg > 18)
		{
			state.Break();
		}
	});
	if (result.IsCompleted)
	{
		Console.WriteLine("完成");
	}
	Console.Read();
}

可以进行中断。

还有一个函数,那就是stop,这个stop 比break 停止的快,break 要记录出,最小中断位置。

而stop 就是立马停止下来。

在上述中,我们知道可以传递一个taskschedule进行,那么这个taskschedule 是干什么的,对我们的任务调度有什么影响呢? 下一节,自我实现taskschedule。

标签:进阶,c#,state,Replica,._,new,paralel,null,replicator
From: https://www.cnblogs.com/aoximin/p/17324467.html

相关文章

  • 可查的异常(checked exceptions)和不可查的异常(unchecked exceptions)区别?
    可查异常(编译器要求必须处置的异常):正确的程序在运行中,很容易出现的、情理可容的异常状况。可查异常虽然是异常状况,但在一定程度上它的发生是可以预计的,而且一旦发生这种异常状况,就必须采取某种方式进行处理。除了RuntimeException及其子类以外,其他的Exception类及其子类都属于......
  • 干货分享:用ChatGPT调教批量出Midjourney咒语,出图效率Nice ,附资料。
    Prompts就是AI绘图的核心竞争力。您是不是觉得用Midjourney生成的图不够完美?又让ChatGPT去生成Prompt,然后效果还不理想?其实ChatGPT你给他投喂资料后,经过调教的ChatGPT,生成的Prompt效果会很不错。文末附《一整套MidJourney指令大全》+《ChatGPTprompt指令大全》资料先看测试......
  • 最新解决chatgpt滥用问题
    写在前面:提示邮箱不可用,不能进入网页,提示滥用问题是因为代理不干净的原因,更换代理即可!注册sms-activate账号首先进去官网:传送门主页大概长这样进去之后点击右上角注册账号!这种操作比较简单通过QQ邮箱即可注册,这里有一点就是QQ邮箱可能会拦截该网站的验证信息:在验证过程中......
  • mybatis-plus使用聚合函数报错---------net.sf.jsqlparser.parser.ParseException: En
    错误日志: Causedby:net.sf.jsqlparser.parser.ParseException:Encounteredunexpectedtoken:"with""WITH"atline62,column20.Wasexpectingoneof:"&""::"";""<<&q......
  • mysql获取当前年月 mysql中replace into用法
    mysql获取当前年月 //1.获取年月日时分秒selectSYSDATE()AS'年月日时分秒';2020-07-0216:36:17//2.获取(年月日)selectDATE(CURDATE())as'年月日';selectCURDATE()as'年月日';selectcurrent_dateAS'年月日';2020-07-02//3.获取(时分秒):......
  • VirtualBox、Vagrant以及与Docker的区别
    VirtualBox和Vagrant都是虚拟化工具,用于在计算机上创建虚拟操作系统或虚拟开发环境。以下是它们的介绍:1.VirtualBox:VirtualBox是由Oracle公司开发的开源虚拟化软件,适用于Windows、Linux、Mac和其他操作系统。它允许用户在计算机上运行多个虚拟操作系统,这些操作系统以软件方式(......
  • BTC失守3万美元大关!加密市场迎来回调,原因何在?
       行情数据显示,比特币在一周内下跌8.48%,跌破了29000美元的关键支撑位,以太坊也丢失2000美元关口,未能延续上海升级后的涨势,飞速上升的加密市场似乎正迎来一波回调。   根据Coinglass数据,比特币在周四跌破29000美元的关键支撑位,导致在24小时内超过2.66亿美元的多头头寸被清算。......
  • C语言发展历程、第一个C程序、数据类型、常量变量、字符串
    一、C语言的发展过程计算机是硬件,能识别电信号,电信号有两种,正电和负电,转化成数字信号1/0,计算机只能识别二进制指令,二进制语言是最早的低级语言。通过查表使用,只有科学家掌握。后来人们用一串二进制数表示一个功能,这个就叫助记符,如10100001-ADD,这就是汇编语言。后来人们想能不能用......
  • Microsoft PowerPoint LTSC 2021 for Mac(ppt演示工具) v16.73 beta版
    MicrosoftPowerPointLTSC2021forMac是一款专业的幻灯片演示软件,适用于苹果电脑。是office LTSC2021套装中的一个组成部分,与Word、Excel和Outlook等其他应用程序一起提供。PowerPointLTSC2021具有许多易于使用的工具和功能,可以帮助用户创建具有吸引力的演示文稿。Microsoft......
  • #yyds干货盘点#区别WebSocket 与 Socket
    WebSocket是什么WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocketAPI中,浏览器和服务器只需要完成一次HTTP握手,两者之间就直接可以创建持久性的连接,并进行双向数......