第23章 并行编程
23.1 选择 PFX 的原因
服务器应用发挥多核心的优势十分容易(服务器应用可以通过每一个线程独立处理客户端的请求)。桌面程序发挥多核优势则比较困难了,我们通常需要对计算密集型的代码进行如下的处理:
- 将代码划分为多个小块;
- 通过多线程并行执行这些小块代码;
- 以线程安全和高效的方式在计算完毕后整理出最终的结果。
传统的多线程执行上述功能难度很高(尤其是划分和整理),并且我们需要通过锁来保证线程安全,这回造成大量竞争。
PFX 库为此而生。
Tips
并行编程(parallel programming):通过编程发挥多核或多处理器优势的方式。它是多线程编程的子集。
23.1.1 PFX 的概念
线程间划分工作的策略有两种:
- 数据并行(data parallelism):令每一个线程以相同的方式处理一部分数据。
- 任务并行(task parallelism):划分任务,每个线程处理不同的任务。
一般来说,数据并行更为简单,且在高度并行化的硬件条件下更容易扩展。理由如下:
-
这种方式降低或彻底消除了共享数据(从而解决了竞争和线程安全问题);
-
通常数据比分散的任务要多得多,数据并行可以应对这一点;
-
数据并行有助于实现结构化并行(structured parallelism)
即并行工作单元在程序中的启动和结束都是一致的
23.1.2 PFX 组件 #delay#
23.1.3 使用 PFX 的场合
PFX 主要用于并行编程,即利用多核处理器来加速计算密集型代码的执行速度。
根据 Amdahl(系统中对某一部件采用更快执行方式所能获得的系统性能改进程度,取决于这种执行方式被使用的频率,或所占总执行时间的比例。) 定律,并行化带来的性能改进幅度,取决于必须顺序执行的代码的占比。
因此在处理前应评估是否有必要,可以从如下几个方面考虑:
- 确定当前瓶颈是否可以并行化;
- 考虑代码是否真正需要密集计算;
我们可将要并行的问题分为两类:
- 易并行问题:可以非常简单的划分为可独立执行的任务,例如多种图像处理任务、光线追踪、暴力破解数学/密码问题;
- 不易并行问题:例如快速排序算法的优化版本
23.2 PLINQ
PLINQ 提供了如下几个方法用于多线程并行:
AsParallel()
方法
该方法是 System.Linq.ParallelEnumerable
类的扩展方法,将输入包装为 ParallelQuery<TSource>
的派生类,后续的查询由 ParallelEnumerable
定义的另一套扩展方法执行。
上述扩展方法为每一种标准查询运算符提供了并行化实现。基本上,它们的工作原理都是将输入序列划分为小块,并将每一块在不同的线程上执行,并将执行结果整理为一个输出序列以供使用(即数据并行)。
C7.0 核心技术指南 第7版.pdf - p949 - C7.0 核心技术指南 第 7 版-P949-20241206111932-veaon3x
下面是该方法的一个简单用例,用于获取 100,000 内的所有素数:
IEnumerable<int> numbers = Enumerable.Range(3, 100_000 - 3);
var parallelQuery =
from n in numbers.AsParallel()
where Enumerable.Range(2, (int)Math.Sqrt(n)).All(i => n % i > 0)
select n;
int[] primes = parallelQuery.ToArray();
AsSequential()
方法
该方法将 ParallelQuery
序列包装解除,后续的查询在标准查询运算符上顺序执行。
其他注意事项
对于接受两个输入序列的运算符(Join、GroupJoin、Concat、Union、Intersect、Except、Zip),两个序列均需要调用 AsParallel()
方法,上述 PLINQ 运算符将输出 ParallelQuery
序列,因此查询期间无需二次调用该方法。
此外,也不应该重复调用 AsParallel()
,这会导致查询强制合并、重新划分,从而降低效率。
并非所有查询运算符都可以高效并行化。对于不能并行化的运算符,PLINQ 将会顺序执行。此外,如果有迹象表明并行比顺序执行开销更大,PLINQ 也可能选择顺序执行。
PLINQ 仅适用于本地集合,不支持 LINQ to SQL 和 EF。
23.2.1 并行执行的特性
与普通的 LINQ 查询相同,PLINQ 也是延迟计算的。不过在枚举结果时,二者稍有不同:
-
LINQ:只有枚举到该元素,才执行相应的查询方法
-
PLINQ:会使用独立线程提前查询该元素,将查询结果保存在小缓冲区中
如果消费者暂停/停止枚举,则查询处理器也暂停/停止,避免 CPU 时间和内存的浪费
在 AsParallel()
之后调用 WithMergeOptions()
方法可以调节 PLINQ 的缓冲行为,它有三个选项:
-
AutoBuffered
:默认值,使用小的缓冲区,一般会得到最佳的总体结果 -
NotBuffered
:完全禁用缓冲,通过该选项可以尽快看到结果 -
FullyBuffered
:将结果展现之前全部缓冲
OrderBy
、Reverse
以及元素相关的操作、具合操作、转换运算符使用该方式工作
Tips
我用 Benchmark 实际测试上述三者性能没啥差距。
23.2.2 PLINQ 与顺序
PLINQ 无法像 LINQ 那样保持序列的原始顺序。如果需要保持原始顺序,需要在 AsParallel()
方法后调用 AsOrdered()
方法:
myCollection.AsParallel().AsOrdered()...
调用 AsOrdered()
后 PLINQ 需要追踪每个元素的原始位置,当元素数量巨大时会影响性能。调用 AsUnodered()
可以在稍后的查询中抵消 AsOrdered()
的效果。假设我们要在前两次查询中保持顺序,可以用如下写法:
inputSequence.AsParallel().AsOrdered()
.QueryOperator1()
.QueryOperator2()
.AsUnordered() // 自此开始,元素不一定按照原始顺序,程序性能更好
.QueryOperator3()
...
23.2.3 PLINQ 的限制
PLINQ 中的部分方法无法并行执行。
以下查询运算符因需要元素索引,因此会阻止查询并行化:
-
Select
、SelectMany
、ElementAt
以下运算符可以并行化,但由于划分策略复杂,在某些情况下比顺序处理还慢:
-
Join
、GroupBy
、GroupJoin
、Distince
、Union
、Intersect
、Except
还有一些特殊方法 PLINQ 提供了专门的重载:
-
Aggregate
见23.2.8.3 优化自定义聚合(Aggregate)
所有其他运算都是可以并行化的,但这些运算符并不保证一定并行执行(PLINQ 会自主判断是否并行执行)。若要强制并行化,可在调用 AsParallel()
后调用 WithExecutionMode()
方法,并传入 ParallelExecutionMode.ForceParallelism
参数:
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
23.2.4 示例:并行拼写检查器
Info
本文的词典可在 albahari.com/ispell/allwords.txt 下载得到。
现在,假设我们有一个词典,该字典加载至 HashSet<string>
中:
var wordLookup = new HashSet<string>(
File.ReadAllLines("allwords.txt"),
StringComparer.InvariantCultureIgnoreCase);
我们将其拷贝、打乱:
var random = new Random();
string[] wordList = wordLookup.ToArray();
string[] wordsToTest = Enumerable.Range(0, 1_000_000)
.Select(i => wordList[random.Next(0, wordList.Length)])
.ToArray();
并修改其中两个单词为错误内容:
wordsToTest[12345] = "woozsh";
wordsToTest[23456] = "wubsie";
我们可以通过 PLINQ 查找这两个错误的单词,借助“并行执行”,提升速度:
var query = wordsToTest
.AsParallel()
.Select((word, index) => new IndexedWord { Word = word, Index = index})
.Where(iword => !wordLookup.Contains(iword.Word))
.OrderBy(iword => iword.Index);
struct IndexedWord
{
public string Word { get; set; }
public int Index { get; set; }
}
Notice
上述示例我们定义了一个 struct,而非使用匿名类型。这是为了借助值类型在“栈”上分配的特点。基于栈的分配是高度并行化的,基于堆的分配会导致数据竞争(在同一个堆上),它们共用一个内存管理器和垃圾回收器托管。
23.2.4.1 使用 ThreadLocal<T>
Info
ThreadLocal<T>
的使用见 22.8.2 ThreadLocal类
前文提到的打乱数据也可以并行处理:
var random = new Random(); string[] wordList = wordLookup.ToArray(); string[] wordsToTest = Enumerable.Range(0, 1_000_000) .Select(i => wordList[random.Next(0, wordList.Length)]) .ToArray();
最大的问题是 Random.Next()
方法不是线程安全的。使用 ThreadLocal<Random>
我们可以进行如下优化:
var localRandom = new ThreadLocal<Random>(() => new Random(Guid.NewGuid().GetHashCode()));
string[] wordList = wordLookup.ToArray();
string[] wordsToTest = Enumerable.Range(0, 1_000_000)
.Select(i => wordList[localRandom.Value!.Next(0, wordList.Length)])
.ToArray();
何时使用 PLINQ
“找到现有的 LINQ 查询,并将其并行化”这种想法看起来诱人,实际并不现实。因为大多数问题 LINQ 本身就是最佳方案。
更好的策略是“找到大量使用 CPU 的瓶颈并考虑是否可以将其转换为 LINQ 查询”
23.2.5 纯函数
为了实现最佳性能,任何查询运算符调用的方法都应当是线程安全的,它们不应当更新字段或者属性的值(调用的方法需要没有副作用,或者是纯函数)。如果使用锁实现线程安全性,则查询的并行性就会受到限制。
以如下代码为例,其外部变量 i
无法保证有序自增:
int i = 0;
var query = from n in Enumerable.Range(0,999).AsParallel() select n * i++;
此时应当使用带有索引版本的 Select
重新编写查询:
var query = Enumerable.Range(0,999).AsParallel().Select ((n, i) => n * i);
23.2.6 设置/更改并行级别
默认情况下 PLINQ 会根据处理器的情况选择最合适的并行级别。对于 I/O 密集型的操作(如网页下载)可以将并行数目增加到内核数量之上,我们可以在 AsParallel()
方法之后调用 WithDegreeOfParallelism()
方法修改设置:
...AsParallel().WithDegreeOfParallelism(4)...
在一个 PLINQ 查询中只能调用一次 WithDegreeOfParallelism()
方法。如果需要二次设置并行级别,需再次调用 AsParallel()
方法,强制合并、并重新进行划分:
"The Quick Brown Fox"
.AsParallel().WithDegreeOfParallelism (2)
.Where (c => !char.IsWhiteSpace (c))
.AsParallel().WithDegreeOfParallelism (3) // Forces Merge + Partition
.Select (c => char.ToUpper (c))
Warn
和 Task 不同,PLINQ 执行 I/O 密集型作业时一定会阻塞线程,因此 I/O 密集型操作不推荐使用 PLINQ,
23.2.7 取消操作
foreach 循环中的取消操作
在 foreach 循环中消费 PLINQ 查询的结果,只需跳出 foreach 循环查询就会自动取消,因为其枚举器会被隐式销毁。
转换、元素操作、聚合操作的终止查询
其取消方式和 Task
的取消相似:
-
在调用
AsParallel()
方法之后,调用 WithCancellation()
方法; -
上述方法传入
CancellationTokenSource
的 Token
属性; -
另一个线程可调用
CancellationTokenSource
的 Cancel()
方法查询的消费端会抛出
OperationCanceledException
异常。
与 Task
相同,PLINQ 不会强制终止线程,它会等待线程处理完当前元素之后再中止查询。
下面是一个简单的使用:
IEnumerable<int> tenMillion = Enumerable.Range (3, 10_000_000);
var cancelSource = new CancellationTokenSource();
cancelSource.CancelAfter (100); // Cancel query after 100 milliseconds
var primeNumberQuery =
from n in tenMillion.AsParallel().WithCancellation(cancelSource.Token)
where Enumerable.Range(2, (int)Math.Sqrt(n)).All(i => n % i > 0)
select n;
try
{
// Start query running:
int[] primes = primeNumberQuery.ToArray();
// We'll never get here because the other thread will cancel us.
}
catch (OperationCanceledException)
{
Console.WriteLine("Query canceled");
}
23.2.8 PLINQ 优化
23.2.8.1 输出端优化
PLINQ 可以很方便的将并行化工作的结果整理到一个输出序列中。但有时我们只是想并行执行某个方法,不需要输出:
foreach (int n in parallelQuery)
DoSomething (n);
此时我们可以使用 PLINQ 的 ForAll()
方法提高效率。该方法会在每一个 ParallelQuery 的输出元素上运行一个委托,该委托会直接嵌入 PLINQ 内部,跳过整理和枚举步骤。以下是一个常见示例:
"abcdef".AsParallel().Select (c => char.ToUpper(c)).ForAll (Console.Write);
C7.0 核心技术指南 第7版.pdf - p956 - C7.0 核心技术指南 第 7 版-P956-20241208153026-q4fpaik
23.2.8.2 输入端优化
PLINQ 划分输入元素的策略有三种:
策略 | 元素分配 | 相关性能 |
---|---|---|
块划分 | 动态 | 平均水平 |
范围划分 | 静态 | 不佳到极佳 |
散列划分 | 静态 | 不佳 |
这三种划分策略对应了不同存储方式的数据。
23.2.8.2.1 散列划分
对于需要进行元素比较的查询运算符(GroupBy
、Join
、GroupJoin
、Intersect
、Except
、Union
、Distinct
),总是使用散列划分。PLINQ 会保证散列码相同的元素在同一线程上进行处理,因此效率较低。
23.2.8.2.2 范围划分
如果输入序列是有索引的(如数组或 ILIst<T>
的子类),PLINQ 会选择范围划分。PLINQ 会按索引均分数据,保证每个线程分配同等数量的元素,从而避免输入序列的竞争。如果序列中的数据大小不一,会导致线程占用不均。因此它的性能表现是不定的,可能不佳,可能极佳:
C7.0 核心技术指南 第7版.pdf - p958 - C7.0 核心技术指南 第 7 版-P958-20241208154534-16dxnll
我们可以通过如下方法强制使用范围划分:
-
使用
ParallelEnumerable.Range()
方法代替Enumerable.Range()
方法该方法不是
Enumerable.Range().AsParallel()
的快捷写法,它通过激活范围划分策略改善了查询的性能 -
在输入序列上简单调用
ToList()
或 ToArray()
方法这种方法将显著影响性能,使用时需考虑这点造成的性能影响
下面是 ParallelEnumerable.Range()
的简单用例:
ParallelEnumerable.Range(1, 10000000).Sum(i => Math.Sqrt(i))
Tips
范围划分策略不一定会以连续块的形式分配元素范围,相反,它可能会选择一种“条纹式”的策略。例如,如果有两个工作线程,则很可能一个工作线程处理奇数编号的元素,而另一个工作线程处理偶数编号的元素。
TakeWhile
运算符基本上会使用条纹式策略以避免对后续元素进行不必要的处理。
23.2.8.2.3 块划分
如果数据不满足上述两种划分方式,将使用“块划分”。块划分令每个线程动态取用数据进行处理,保证各个线程保持同等忙碌状态:
C7.0 核心技术指南 第7版.pdf - p958 - C7.0 核心技术指南 第 7 版-P958-20241208162159-j31tiel
不过它也有劣势:从输入获取“块”时需要进行同步(通常是排它锁),这可能导致一些开销和竞争。
若要强制使用块划分,需要调用 Partitioner.Create()
方法对输入序列进行包装,其第二个参数指示是否对查询进行负载均衡处理,这正是块划分的表示方式:
int[] numbers = { 3, 4, 5, 6, 7, 8, 9 };
var parallelQuery =
Partitioner.Create (numbers, true).AsParallel()
.Where (...)
23.2.8.3 优化自定义聚合(Aggregate
)
我们在 LINQ 的 Aggregate
有提到:
无种子聚合方法要求传入的委托满足交换性和结合性,否则计算结果将出现不一致。例如如下代码计算结果分别为 27 29:
此外,LINQ to Object 和 PLINQ 的计算逻辑也有不同。对于
f(total, n) => total + n * n
:
PLINQ 的 Aggregate
和 23.3.2.4 使用本地值进行优化提到的 For、ForEach 循环相似,它也是先对子集求和,再对子集的和求和。PLINQ 的 Aggregate
的重载需要 4 个委托:
-
seedFactory
委托:用于返回各线程的起始种子值 -
updateAccumulatorFunc
委托:子集求和 -
combineAccumulatorsFunc
委托:对子集求和的结果求和 -
resultSelector
委托:对最终结果进行加工
下面是一个简单的例子:
numbers.Aggregate (
() => 0, // seedFactory
(localTotal, n) => localTotal + n, // updateAccumulatorFunc
(mainTot, localTot) => mainTot + localTot, // combineAccumulatorsFunc
finalResult => finalResult); // resultSelector
23.3 Parallel
类
23.3.1 Parallel.Invoke()
方法
Parallel.Invoke()
方法执行一组 Action
委托并等待完成。
它有如下特点:
-
Parallel.Invoke()
会将大量元素划分为若干批次,并分派给底层 Task 执行,不会单纯为每个委托创建独立的 Task因此将一百万个
Action
传递给Parallel.Invoke()
方法它仍能有效工作。 -
所有
Parallel
方法都需要自行整理结果,需要注意线程安全以如下代码为例,就存在线程不安全。一般使用线程安全的集合(如
ConcurrentBag
)解决此问题:var data = new List<string>(); Parallel.Invoke ( () => data.Add(new WebClient().DownloadString("http://www.foo.com")), () => data.Add(new WebClient().DownloadString("http://www.far.com")));
-
可以传入
ParallelOptions
实例设置并行参数它可以插入取消令牌、限制最大并发数目、指定自定义的任务调度器
一个简单的示例如下:
Parallel.Invoke (
() => new WebClient().DownloadFile("http://www.linqpad.net", "lp.html"),
() => new WebClient().DownloadFile("http://microsoft.com", "ms.html"));
Warn
和 PLINQ 一样,
Parallel
中的方法是针对计算密集型任务的。和 Task 不同,PLINQ 执行 I/O 密集型作业时一定会阻塞线程,因此 I/O 密集型操作不推荐使用 PLINQ,
23.3.2 Parallel.For()
方法和 Parallel.ForEach()
方法
这两个方法等价于 C# 中的 for
和 foreach
循环,不同的是每次迭代都是并行执行的。如下是这两个方法最简单的声明:
public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int> body)
public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource> body)
下面是几个普通循环及其对应的并行循环代码:
for (int i = 0; i < 100; i++)
Foo (i);
Parallel.For (0, 100, i => Foo (i));
// or
Parallel.For (0, 100, Foo);
foreach (char c in "Hello, world")
Foo (c);
Parallel.ForEach ("Hello, world", Foo);
23.3.2.1 外层循环与内层循环
Parallel.For()
和 Parallel.ForEach()
通常在外层循环的效果更好,因为外层循环有更大的工作块,可以降低管理开销。
通常,同时将内层和外层并行化是没必要的,以如下代码为例,除非有超过 100 个核心的处理器,否则不会受益:
Parallel.For (0, 100, i =>
{
Parallel.For (0, 50, j => Foo (i, j)); // 普通的 for 循环会有更好的表现
});
23.3.2.2 包含索引的 Parallel.ForEach()
方法
有时 foreach 中的索引用处很大,以如下普通循环为例,它获取索引的方式并不适用于 Parallel.ForEach()
:
int i = 0;
foreach (char c in "Hello, world")
Console.WriteLine (c.ToString() + i++);
Parallel.ForEach()
提供了相应的重载,其 Action
的第三个参数用于获取索引:
public static ParallelLoopResult ForEach<TSource> (
IEnumerable<TSource> source, Action<TSource, ParallelLoopState, long> body)
以前文的查询错误单词的代码为例,改为使用 Parallel.ForEach()
则有:
var misspellings = new ConcurrentBag<Tuple<int, string>>();
Parallel.ForEach(wordsToTest, (word, state, i) =>
{
if (!wordLookup.Contains(word))
misspellings.Add(Tuple.Create((int)i, word));
});
与 PLINQ 不同之处在于,我们需要线程安全的集合将数据收集起来,但是这种方式比 PLINQ 的 Select
执行效率更高。
23.3.2.3 ParallelLoopState
提前跳出循环
并行的 For、ForEach 的循环体是一个委托,无法通过 break 语句结束循环。我们可以通过 ParallelLoopState
对象的 Break()
方法或 Stop()
方法跳出或结束循环:
public class ParallelLoopState
{
public void Break();
public void Stop();
public bool IsExceptional { get; }
public bool IsStopped { get; }
public long? LowestBreakIteration { get; }
public bool ShouldExitCurrentIteration { get; }
}
下面是一个简单的用例:
Parallel.ForEach("Hello, world", (c, loopState) =>
{
if (c == ',')
loopState.Break();
else
Console.Write(c);
});
二者的区别如下:
-
Break()
:完成当前这轮工作后,已经在执行的工作会继续执行,但是不再创建新线程执行后续循环; -
Stop()
:中断正在执行的线程,并不再创建新线程执行后续循环
Info
具体的比较,可见此文:Parallel Stop 与 Break 的深入解析 - 灰信网(软件开发博客聚合)
防止文章丢失,如下是比较使用的代码:
Parallel.For(1, 13, new ParallelOptions { MaxDegreeOfParallelism = 4 }, (i, loopState) => { Console.WriteLine(i); if (i % 3 == 0) { loopState.Stop(); // 此处可更换为 Break() 比较二者差别 Console.WriteLine($"Selected Number:{i}"); } });
23.3.2.3.1 ParallelLoopState
的其他成员
-
ShouldExitCurrentIteration
属性:如果循环遇到了Break()
、Stop()
、出现取消请求、抛出异常,该属性会变成 true。我们可以在循环体中判断该属性,确定循环是否已被取消。 -
IsExceptional
属性:用于指示其他线程是否出现异常。任何未处理的异常都会导致循环的所有线程在当前迭代完成后停止。要避免循环中止,需要在代码中显式处理异常。
-
LowestBreakIteration
属性:循环跳出(break)时,最后一个迭代到的元素。如果使用Stop()
跳出循环,则该属性值为 null
。该属性为
Nullable<long>
类型,借此我们可以判断循环是被Stop()
打断的还是Break()
跳出的。
23.3.2.3.2 Parallel.For()
、Parallel.ForEach()
的返回值
这两个方法的返回值为 ParallelLoopResult
对象,它只有两个属性:
-
IsCompleted
属性:表明循环是否已经结束 -
LowestBreakIteration
属性:循环跳出(break)时,最后一个迭代到的元素。如果使用Stop()
跳出循环,则该属性值为 null
。
23.3.2.4 使用本地值进行优化
假设我们通过 Parallel.For()
方法实现这样的功能:对 1~10,000,000 之间数字的平方根求和。平方根的并行化计算很容易,求和却稍有麻烦:我们需要通过锁防止数据竞争:
object locker = new object();
double total = 0;
Parallel.For(1, 10000000,
i => { lock (locker) total += Math.Sqrt(i); });
而锁会消除并行化的优势。不过 Parallel.For()
和 Parallel.ForEach()
提供了相应的重载方法,接受 TLocal
泛型参数,其最简单的形式如下:
public static ParallelLoopResult For <TLocal> (
int fromInclusive,
int toExclusive,
Func <TLocal> localInit,
Func <int, ParallelLoopState, TLocal, TLocal> body,
Action <TLocal> localFinally);
它的工作原理是:各个线程先计算出自己的数据和,最后交由汇总委托进行结果计算。
-
localInit
委托:用于初始化 TLocal
的值 -
body
委托:循环体,它的返回值是 TLocal
类型,该循环体计算自己线程的数据结果 -
localFinally
委托:将各个线程循环体计算的结果进行汇总
前面的代码优化后如下,虽然仍需要锁定,但只在最终合并结果时锁定,整个过程更加高效:
object locker = new object();
double grandTotal = 0;
Parallel.For(1, 10000000,
() => 0.0, // 初始化本地值
(i, state, localTotal) =>
localTotal + Math.Sqrt(i), // 循环体,该方法返回本地值的和
localTotal =>
{ lock (locker) grandTotal += localTotal; } // 将本地值求和.
);
Tips
上述场景使用 PLINQ 更加高效、简洁:
ParallelEnumerable.Range (1, 10000000) .Sum (i => Math.Sqrt (i))
23.4 任务并行
23.4.1 创建并启动任务
23.4.1.1 指定状态对象
Task.Factory.StartNew()
的方法方法签名之一如下:
Task StartNew(Action<object?> action, object? state)
此处的 state
参数是状态对象,类似于 Thread.Start(object? parameter)
,它会传递给目标方法:
var task = Task.Factory.StartNew(Greet, "Greeting");
task.Wait();
void Greet(object state)
{
Console.Write(state); // 输出 Greeting
}
实际上,该 state 对象不仅会传递给目标方法,还会赋值给 Task.AsyncState
属性。我们可以利用这一点为任务指定名称,在之后使用该属性查询相应的任务:
var task = Task.Factory.StartNew(state => Greet("Hello"), "Greeting");
task.AsyncState.Dump(); // 输出 Greeting
task.Wait();
void Greet(object state)
{
Console.Write(state); // 输出 Hello
}
Tips
VS 在并行任务窗口会显示每一个任务的
AsyncState
。关于任务窗口,见 30.8.2 任务窗口
23.4.1.2 TaskCreationOptions
TaskCreationOptions
用于设置任务执行方式,Task.Factory.StartNew()
方法接受该枚举值,它有如下组合值:
-
LongRunning
:通知调度器为任务指定一个线程。适用于 I/O 密集型任务和长时间执行的任务 -
PreferFairness
:令任务调度器的调度顺序尽可能和任务的开始顺序一致。这种优化可以在不增加竞争开销的情况下创建子任务 -
AttachedToParent
:创建子任务
23.4.1.3 子任务
在任务中启动另一个任务,可通过 TaskCreationOptions
的 AttachedToParent
值令它们为父子任务关系:
Task parent = Task.Factory.StartNew(() =>
{
Console.WriteLine("I am a parent");
Task.Factory.StartNew(() => // Detached task
{
Console.WriteLine("I am detached");
});
Task.Factory.StartNew(() => // Child task
{
Console.WriteLine("I am a child");
}, TaskCreationOptions.AttachedToParent);
});
子任务是一类特殊的任务:
- 父任务必须等待所有子任务结束,才能结束
- 父任务结束时,子任务中发生的异常才会向上抛出
TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
var parent = Task.Factory.StartNew (() =>
{
Task.Factory.StartNew (() => // 子
{
// 若执行如下代码,后续 Wait() 会等待 2s,待子任务结束
Thread.Sleep(2000);
Task.Factory.StartNew (() => { throw null; }, atp); // 孙
}, atp);
// 若执行如下代码,后续 Wait() 会在 1s 后再抛出异常
// Thread.Sleep(1000);
});
// 此处会抛出 NullReferenceException (包装至 AggregateExceptions)
parent.Wait();
23.4.2 等待多个子任务
23.4.2.1 Task.WaitAll()
方法
我们可以使用 Task.WaitAll()
方法等待多个任务结束,它类似于如下代码:轮流等待每个任务。但是它更高效:它至多只需进行一次上下文切换。
// Assume t1, t2 and t3 are tasks:
var exceptions = new List<Exception>();
try { t1.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); }
try { t2.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); }
try { t3.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); }
if (exceptions.Count > 0) throw new AggregateException (exceptions);
它有如下特点:
- 如果有一个或多个任务抛出异常,
WaitAll()
仍会等待所有任务完成; - 等待的任务若抛出异常,它会将异常包装为
AggregateException
后二次抛出
Notice
不要把
WaitAll()
和WhenAll()
搞混。
23.4.2.2 Task.WaitAny()
方法
该方法会在任意一个任务完成时触发。
23.4.3 取消任务
任务的取消通过取消令牌完成。通过令牌执行的取消操作,任务会进入“已取消”状态(TaskStatus.Canceled
)。
我们可以通过 CancellationToken.ThrowIfCancellationRequested()
方法检查令牌状态、抛出 TaskCanceledException
异常以取消任务:
var cts = new CancellationTokenSource();
CancellationToken token = cts.Token;
cts.CancelAfter(500);
Task task = Task.Factory.StartNew(() =>
{
Thread.Sleep(1000);
token.ThrowIfCancellationRequested(); // Check for cancellation request
}, token);
try { task.Wait(); }
catch (AggregateException ex)
{
Console.WriteLine(ex.InnerException is TaskCanceledException); // True
Console.WriteLine(task.IsCanceled); // True
Console.WriteLine(task.Status); // Canceled
}
TaskCanceledException
是 OperationCanceledException
的派生类,如果希望显式抛出 OperationCanceledException
以取消任务,则 CancellationToken
必须作为 OperationCanceledException
的构造器参数传入,否则任务不会进入 TaskStatus.Canceled
状态,也不会触发标记为 OnlyOnCanceled
的延续任务。
Notice
Wait()
和CancelAndWait()
方法的的取消令牌参数用于取消等待操作,而非取消任务本身。
23.4.4 延续任务
ContineuWith()
方法将在一个任务执行完毕后立即执行一个委托:
Task task1 = Task.Factory.StartNew(() => Console.Write("antecedent.."));
Task task2 = task1.ContinueWith(ant => Console.Write("..continuation"));
其中:
-
task1
在结束、失败或取消后,task2
开始执行 - 如果
task1
先执行完毕,再运行的第二行代码,则task2
会立即开始执行; -
task2
对应的 Lambda 表达式中,传入的参数是 task1
的引用
默认情况下,task1
和 task2
不再同一线程,如果希望它们在同一线程执行,需在调用 ContinueWith()
方法时指定 TaskContinuationOptions.ExecuteSynchronously
选项(这有助于改善性能)。
23.4.4.1 延续任务和 Task<TResult>
延续任务和普通任务一样,其类型可以是 Task<TResult>
、可以返回数据。以如下代码为例,它计算了 \(Math.Sqrt(8*2)\) 的值并输出:
Task.Factory.StartNew<int> (() => 8)
.ContinueWith (ant => ant.Result * 2)
.ContinueWith (ant => Math.Sqrt (ant.Result))
.ContinueWith (ant => Console.WriteLine (ant.Result)); // 4
23.4.4.2 延续任务和异常
延续任务可以用于处理前导任务的异常。处理方式有:
- 可通过前导任务的
Exception
属性确认前导任务是否失败; - 调用前导任务的
Result
/Wait()
成员以捕获AggregateException
上述两种方法都会使前导任务成为已观测状态,进而避免触发 TaskScheduler.UnobservedTaskException
事件。
我们还可以通过 TaskContinuationOptions
配置延续任务何时触发,可配置的延续方式有:
值 延续方式 组合模式 NotOnRanToCompletion
在 Task.Status
不是RanToCompletion
时延续 NotOnFaulted
在 Task.Status
不是Faulted
时延续 NotOnCanceled
在 Task.Status
不是Canceled
时延续 OnlyOnRanToCompletion
仅在 Task.Status
不是RanToCompletion
时延续 NotOnFaulted | NotOnCanceled
OnlyOnFaulted
仅在 Task.Status
不是Faulted
时延续 NotOnRanToCompletion | NotOnCanceled
OnlyOnCanceled
仅在 Task.Status
不是Canceled
时延续 NotOnRanToCompletion | NotOnFaulted
下面代码演示了处理异常/非异常状态的延续任务:
Task task1 = Task.Factory.StartNew (() => { throw null; });
Task error = task1.ContinueWith (ant => Console.Write (ant.Exception),
TaskContinuationOptions.OnlyOnFaulted);
Task ok = task1.ContinueWith(ant => Console.Write("Success!"),
TaskContinuationOptions.NotOnFaulted);
下面这段扩展方法可以用于忽略任意未处理的异常:
Task.Factory.StartNew(() => { throw null; }).IgnoreExceptions();
public static class Extension
{
public static void IgnoreExceptions(this Task task)
{
task.ContinueWith(t => { var ignore = t.Exception; },
TaskContinuationOptions.OnlyOnFaulted);
}
}
23.4.4.3 延续任务与子任务
我们在 23.4.1.3 子任务提到:
- 父任务必须等待所有子任务结束,才能结束
- 父任务结束时,子任务中发生的异常才会向上抛出
延续任务也继承了该特性:
- 在所有子任务结束后再执行
- 子任务抛出的所有异常都会封送至延续任务
C7.0 核心技术指南 第7版.pdf - p974 - C7.0 核心技术指南 第 7 版-P974-20241210172431-uodieaj
TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
Task.Factory.StartNew (() =>
{
Task.Factory.StartNew (() => { throw null; }, atp);
Task.Factory.StartNew (() => { throw null; }, atp);
Task.Factory.StartNew (() => { throw null; }, atp);
})
.ContinueWith(p => Console.WriteLine(p.Exception),
TaskContinuationOptions.OnlyOnFaulted);
23.4.4.4 条件延续任务
TaskContinuationOptions
枚举值控制延续任务的关键枚举值有:
值 | 延续方式 | 组合模式 |
---|---|---|
NotOnRanToCompletion |
在 Task.Status 不是 RanToCompletion 时延续 |
|
NotOnFaulted |
在 Task.Status 不是 Faulted 时延续 |
|
NotOnCanceled |
在 Task.Status 不是 Canceled 时延续 |
|
OnlyOnRanToCompletion |
仅在 Task.Status 不是 RanToCompletion 时延续 |
NotOnFaulted | NotOnCanceled |
OnlyOnFaulted |
仅在 Task.Status 不是 Faulted 时延续 |
NotOnRanToCompletion | NotOnCanceled |
OnlyOnCanceled |
仅在 Task.Status 不是 Canceled 时延续 |
NotOnRanToCompletion | NotOnFaulted |
需要特别指出的是:当延续任务指定了上述标志而无法执行时,它不会被遗忘或丢弃,而是会取消。这意味着后续的延续任务仍会执行(除非后续的延续任务被标记为 NotOnCanceled
):
Task t1 = Task.Factory.StartNew(() => Console.WriteLine("NotFault"));
Task t2 = t1.ContinueWith(ant => Console.WriteLine("fault"), // 因不满足延续条件,不执行
TaskContinuationOptions.OnlyOnFaulted);
Task t3 = t2.ContinueWith(ant => Console.WriteLine("t3")); // 此处会正常输出 t3。如果此处传入 NotOnCanceled,将不执行
C7.0 核心技术指南 第7版.pdf - p975 - C7.0 核心技术指南 第 7 版-P975-20241210221245-i4db1l1
23.4.4.5 具有多个前导任务的延续任务
Task.Factory
定义了 ContinueWhenAll()
和 ContinueWhenAny()
方法,可以从多个前导任务调度延续任务。但是在 Task.WhenAll()
、Task.WhenAny()
引入后上述方法就显得多余了。试比较如下两段代码:
var task1 = Task.Run (() => Console.Write("X"));
var task2 = Task.Run (() => Console.Write("Y"));
var continuation = Task.Factory.ContinueWhenAll(new[] { task1, task2 },
tasks => Console.WriteLine("Done"));
var continuation = Task.WhenAll(task1, task2)
.ContinueWith(ant => Console.WriteLine("Done"));
Eureka
二者还是不同的。如果
task1
、task2
是Task<TResult>
类型,则第一段代码中tasks
是 Task<TResult>[]
类型,而第二段代码中ant
是 Task<TResult[]>
类型。
23.4.4.6 单一前导任务上的多个延续任务
在相同的任务上多次调用 ContinueWith()
,可以在一个前导任务上创建多个延续任务。它们的执行有如下特点:
- 前导任务结束时,所有延续任务会同时开始执行;
- 若指定了
TaskContinuationOptions.ExecuteSynchronously
,延续任务会按顺序执行
以如下代码为例,它会等待 1s,然后输出“XY”或“YX”:
var t = Task.Factory.StartNew (() => Thread.Sleep (1000));
t.ContinueWith(ant => Console.Write("X"));
t.ContinueWith(ant => Console.Write("Y"));
23.4.5 任务调度器
任务调度器负责将任务分配到线程上。任务调度器均派生自 TaskScheduler
抽象类。.NET Framework 提供了两个具体的任务调度器实现:
-
与 CLR 的线程池协同工作的默认调度器
-
同步上下文调度器
主要为 WPF、Windows Forms 这样的线程模型共同工作而设计。它会捕获同步上下文,并令任务或延续任务在这个上下文上执行:
// 假设我们是在 Windows Forms / WPF 应用程序的 UI 线程上: _uiScheduler = TaskScheduler.FromCurrentSynchronizationContext();
我们也可以从 TaskScheduler
类派生自己的调度器(很少会这么做),一般的自定义调度,使用 TaskCompletionSource
足够。
Tips
关于自定义调度器,我们在 14.5.5.2 OperationStarted 和 OperationCompleted 观察返回值为 void 的异步程序时使用过。
23.4.6 TaskFactory
类
Task.Factory
静态属性会返回一个默认的 TaskFactory
对象。TaskFactory
用于创建如下三种任务:
-
普通任务
通过调用
StartNew()
方法 -
具有多个前导任务的延续任务
调用
ContinueWhenAll()
和 ContinueWhenAny()
-
将符合异步编程模型(APM)的方法包装为任务
调用
FromAsync()
,见 #ref#14.7
23.4.6.1 创建自定义任务工厂
TaskFactory
可以通过 new 创建,创建时可以传入 TaskCreationOptions
和 TaskContinuationOptions
选项,后续通过该任务工厂创建的任务都将使用该配置。
var factory = new TaskFactory(
TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent,
TaskContinuationOptions.None);
按照上述配置,如下代码创建的任务将是“长时间运行、且附加在父任务上”:
Task task1 = factory.StartNew (Method1);
Task task2 = factory.StartNew (Method2);
...
调用 ContinueWhenAll()
和 ContinueWhenAny()
时,TaskFactory
配置的 TaskContinuationOptions
选项将生效。
23.5 处理 AggregateException
PLINQ、Parallel
类、Task
为了确保使用者可以得到所有异常,它们会将得到异常的包装为 AggregateException
异常。
AggregateException
主要有以下成员:
-
InnerExceptions
属性:包含了捕获的全部异常 -
InnerException
属性:包含了捕获的第一个异常
下面是处理 AggregateException
的简单示例:
try
{
var query = from i in ParallelEnumerable.Range(0, 1000000)
select 100 / i;
// Enumerate query
...
}
catch (AggregateException aex)
{
foreach (Exception ex in aex.InnerExceptions)
Console.WriteLine(ex.Message);
}
23.5.1 Flatten()
和 Handle()
方法
AggregateException
类提供了 Flatten()
方法和 Handle()
方法简化异常的处理过程。
23.5.1.1 Flatten()
方法
AggregateException
通常会包含其他的 AggregateException
(例如,当子任务抛出异常的时候就会出现这种情况)。调用 Flatten()
方法可以消除任意层级的嵌套以简化处理过程。这个方法会返回一个新的 AggregateException
对象,并包含展平的内部异常列表。
catch (AggregateException aex)
{
foreach (Exception ex in aex.Flatten().InnerExceptions)
myLogWriter.LogException(ex);
}
23.5.1.2 Handle()
方法
Handler()
方法用于“过滤”异常,它接受形为 Func<Exception, bool>
的委托。所有异常验证后会有两种情况:
- 如果所有异常都被处理了(即委托返回 true),则不会重新抛出异常;
- 如果有异常未被处理(即委托返回 false),则会抛出一个新的
AggregateException
,且其中包含所有未处理的异常。
以如下代码为例,它将会重新抛出 AggregateException
,其中包含一个 NullReferenceException
:
var parent = Task.Factory.StartNew(() =>
{
// 我们将通过子任务抛出三种异常
int[] numbers = { 0 };
var childFactory = new TaskFactory
(TaskCreationOptions.AttachedToParent, TaskContinuationOptions.None);
childFactory.StartNew(() => 5 / numbers[0]); // 除数为 0
childFactory.StartNew(() => numbers[1]); // 超出索引
childFactory.StartNew(() => { throw null; }); // 空引用
});
try { parent.Wait(); }
catch (AggregateException aex)
{
aex.Flatten().Handle(ex => // 注意:我们仍需要调用 Flatten()
{
if (ex is DivideByZeroException)
{
Console.WriteLine("Divide by zero");
return true; // 标记该异常已被处理
}
if (ex is IndexOutOfRangeException)
{
Console.WriteLine("Index out of range");
return true; // 标记该异常已被处理
}
return false; // 其他异常将二次抛出
});
}
Index out of rangeDivide by zero
AggregateException | ||||||||||||||||||||||||||
One or more errors occurred. (Index was outside the bounds of the array.) (Object reference not set to an instance of an object.) (Attempted to divide... | ||||||||||||||||||||||||||
Message | One or more errors occurred. (Index was outside the bounds of the array.) (Object reference not set to an instance of an object.) (Attempted to divide by zero.) (Object reference not set to an instance of an object.) | |||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
InnerException |
| |||||||||||||||||||||||||
InnerExceptions |
| |||||||||||||||||||||||||
StackTrace | at System.AggregateException.Handle(Func`2 predicate) at UserQuery.Main(), line 15 | |||||||||||||||||||||||||
Data |
| |||||||||||||||||||||||||
HelpLink | null | |||||||||||||||||||||||||
HResult | -2146233088 | |||||||||||||||||||||||||
Source | System.Private.CoreLib | |||||||||||||||||||||||||
TargetSite | System.Reflection.MethodBase |
23.6 并发集合
.NET Framework 4.0 提供了一系列并发集合:
并发集合 | 非并发等价集合 |
---|---|
ConcurrentStack<T> |
Stack<T> |
ConcurrentQueue<T> |
Quee<T> |
ConcurrentBag<T> |
无 |
ConcurrentDictionary<TKey, TValue> |
Dictionary<TKey, TValue> |
并发集合对高并发场景进行了优化,使用时需要注意:
-
传统集合在非并发场景下的性能要 高 于并发集合。
-
线程安全的集合并不能保证使用它们的代码是线程安全的。
-
在枚举并发集合时,如果另一个线程更新了集合的内容, 不会 抛出任何异常。相反的,我们会得到一个 新旧内容混合的 结果。
-
List<T>
没有对应的并发集合。 -
ConcurrentStack
、ConcurrentQueue
和ConcurrentBag
内部是使用 链表 实现的。因此,其内存利用不如Stack
和Queue
高效。但是它们适用于并发访问,因为 链表 更容易实现无锁算法或者少锁的算法。这是因为在链表中插入一个节点只需要更新几个引用,而在一个类似
List<T>
的结构中插入一个元素可能需要移动数以千计的现有元素。 -
并发集合提供了原子的检测并执行操作,如
TryPop()
。大部分方法是通过
IProducerConsumerCollection<T>
接口统一。
并发集合绝不仅仅是在普通集合上加一把锁这么简单。以如下代码为例,左边的代码比右边的代码慢三倍以上(我实测慢了六倍多,有锁相比无锁慢了一倍):
var d = new ConcurrentDictionary<int, int>();
for (int i = 0; i < 1000000; i++) d[i] = 123;
var d = new Dictionary<int, int>();
for (int i = 0; i < 1000000; i++) lock (d) d[i] = 123;
Benchmark Live Summary
Optimizations enabled.DefaultJobList<LiveResult> (2 items) | ||||||||
Case | ResultsGraph | Mean | Min | Max | Range | AllocatedBytes | Operations | Phase |
---|---|---|---|---|---|---|---|---|
Concurrent | 179.51 ms | 174.65 ms | 182.71 ms | 4% | 113,407,877 | 39 | Complete | |
Normal | 26.06 ms | 25.60 ms | 26.65 ms | 4% | 53,893,024 | 448 | Complete |
Benchmark Process Environment Information:
BenchmarkDotNet v0.13.9+228a464e8be6c580ad9408e98f18813f6407fb5aRuntime=.NET 9.0.0 (9.0.24.52809), X64 RyuJIT AVX2
GC=Concurrent Workstation
HardwareIntrinsics=AVX2,AES,BMI1,BMI2,FMA,LZCNT,PCLMUL,POPCNT,AvxVnni,SERIALIZE VectorSize=256Customize benchmarking behaviorLearn more about benchmarking in LINQPadLearn more about the BenchmarkDotNet library
23.6.1 IProducerConsumerCollection<T>
接口
生产者/消费者集合主要有两种使用场景:
- 添加一个元素(“生产”)
- 检索一个元素并删除它(“消费”)
Tips
栈和队列是典型的生产者/消费者集合。这种集合在并行编程中有重要意义:它们有利于实现高效的无锁设计。
IProducerConsumerCollection<T>
接口代表了一个线程安全的生产者/消费者接口,该接口扩展了 ICollection
接口,并添加了如下方法:
-
void CopyTo (T[] array, int index)
方法 -
T[] ToArray()
方法 -
bool TryAdd(T item)
方法:如果自定义集合不允许出现重复元素,该方法添加重复元素返回 false -
bool TryTake(out T item)
方法
ConcurrentStack<T>
、ConcurrentQueue<T>
和 ConcurrentBag<T>
都实现了该接口,上述方法它们的行为略有不同:
TryAdd() |
TryTake() |
|
---|---|---|
ConcurrentStack<T> |
必然成功,并返回 true | 删除 最近 添加的元素 |
ConcurrentQueue<T> |
必然成功,并返回 true | 删除 最早 添加的元素 |
ConcurrentBag<T> |
必然成功,并返回 true | 哪个元素 删除 效率最高,删除哪个元素 |
此外,上述三个类还提供了它们特有的方法,如 TryDequeue()
、TryPop()
。
23.6.2 ConcurrentBag<T>
类
ConcurrentBag<T>
通过 为每个线程创建私有链表 保证高效存储,并行调用 Add()
方法几乎不会出现竞争。这种实现方式带来一些特性:
-
它是 无 序的对象集合
-
如果每个线程 Take 的元素比 Add 的元素少,其
Take()
方法也是非常高效的在调用
Task()
时,ConcurrentBag<T>
会先查询当前线程的私有链表,如果至少有 一 个元素存在(在真正的实现中,为了完全避免竞争,队列中至少需要两个元素),该操作就可以在不引入竞争的情况下完成 -
枚举集合中的元素时,枚举器会遍历每一个线程的私有链表,依次返回其中的每个元素
其用法有:
var misspellings = new ConcurrentBag<Tuple<int, string>>(); Parallel.ForEach(wordsToTest, (word, state, i) => { if (!wordLookup.Contains(word)) misspellings.Add(Tuple.Create((int)i, word)); });
Suggest
ConcurrentBag<T>
不适于实现生产者/消费者队列,因为元素的添加和移除操作是在不同的线程间执行的。如果并行操作大部分是添加元素,或各个线程添加和移除的数目基本平衡,ConcurrentBag<T>
是理想的选择。
23.7 BlockingCollection<T>
类
前文介绍的三种集合,如果集合为空,TryTake()
方法会返回 false,有时我们更希望等待集合中出现新元素。为此 PFX 的设计者引入了 BlockingCollection<T>
包装类,它可以包装任意 IProducerConsumerCollection<T>
接口实例。其 Take()
方法将从内部集合取出一个元素,并在内部集合为空时 阻塞 操作。
使用 BlockingCollection<T>
步骤如下:
-
创建阻塞集合的实例,(可选)指定需要包装的
IProducerConsumerCollection<T>
实例于集合的最大数目(边界)如果不指定包装的
IProducerConsumerCollection<T>
实例,默认使用ConcurrentQueue<T>
-
调用
Add()
或 TryAdd()
方法添加元素可设定取消令牌、超时时间,如果集合大小有边界,在达到边界时会 阻塞
-
调用
Take()
或 TryTake()
方法移除(消费)元素可设定取消令牌、超时时间,如果集合为空,在消费元素时会 阻塞
BlockingCollection<T>
还提供了如下成员:
-
GetConsumingEnumerable()
方法:该方法会返回一个 无穷序列 ,当集合中有元素可以消费时,该序列就会返回该元素,否则 阻塞 。 -
CompleteAdding()
方法:禁止集合 添加元素 ,并且会 终止GetConsumingEnumerable()
返回的序列。 -
BlockingCollection<T>.AddToAny
静态方法:接受 BlockingCollection<T>
数组作为参数,第一个响应请求的集合将执行 添加 操作 -
BlockingCollection<T>.TakeFromAny
静态方法:接受 BlockingCollection<T>
数组作为参数,第一个响应请求的集合将执行 消费 操作
23.7.1 编写生产者/消费者队列
生产者/消费者队列可用精确的控制同时运行的线程数,这有助于控制 CPU 和其他资源的开销。实际上,CLR 的线程池也是一个生产者/消费者队列。其工作原理如下:
- 创建一个队列用于描述工作项目,或存储工作项目所处理的数据。
- 当需要执行一项工作时,调用者将其插入队列,并继续执行其他操作。
- 后台的若干(一个或者多个)工作线程从队列中取出工作项目并执行相应工作。
下面这段代码利用 BlockingCollection<T>
的边界特点实现了上述功能,限制了同时执行的 Task
数量:
public class PCQueue : IDisposable
{
BlockingCollection<Action> _taskQ = new BlockingCollection<Action>();
public PCQueue(int workerCount)
{
// 为每个消费者开启一个任务
for (int i = 0; i < workerCount; i++)
Task.Factory.StartNew(Consume);
}
public void Enqueue(Action action)
{
_taskQ.Add(action);
}
void Consume()
{
// 该无限序列将在没有可用元素时阻塞,通过这一点可令该任务永远无法结束
// 它将在调用 CompleteAdding() 后结束
foreach (Action action in _taskQ.GetConsumingEnumerable())
action(); // Perform task.
}
public void Dispose() { _taskQ.CompleteAdding(); }
}
它的使用方式如下,如下代码限制同时执行的 Task
数量为 2 :
var pcQ = new PCQueue (2);
string result = await pcQ.Enqueue (() => "That was easy!");
...
23.7.1.1 活用任务
上述生产者/消费者结构不够灵活:我们无法在任务进入队列后进行跟踪。如果能实现如下功能就更完善了:
- 了解每一个工作项目的完成时间(并可以执行 await 操作)
- 取消一个工作项目
- 能够恰当处理工作项目执行过程中抛出的任何异常
一个理想的解决方案是令 Enqueue()
方法返回一个 Task
对象。我们可用使用 TaskCompletionSource
创建一个 Task
,也可用直接用 new 实例化一个 Task
:
public class PCQueue : IDisposable
{
BlockingCollection<Task> _taskQ = new BlockingCollection<Task>();
public PCQueue (int workerCount)
{
// 为每个消费者开启一个任务
for (int i = 0; i < workerCount; i++)
Task.Factory.StartNew (Consume);
}
public Task Enqueue (Action action, CancellationToken cancelToken = default (CancellationToken))
{
var task = new Task (action, cancelToken);
_taskQ.Add (task);
return task;
}
public Task<TResult> Enqueue<TResult> (Func<TResult> func, CancellationToken cancelToken = default (CancellationToken))
{
var task = new Task<TResult>(func, cancelToken);
_taskQ.Add(task);
return task;
}
void Consume()
{
foreach (var task in _taskQ.GetConsumingEnumerable())
{
try
{
if (!task.IsCanceled) task.RunSynchronously();
}
catch (InvalidOperationException)
{
// 任务可能在检查取消状态后、执行前发生取消(尽管概率很低),
// 我们需要捕获 InvalidOperationException 以处理这种情况
}
}
}
public void Dispose() { _taskQ.CompleteAdding(); }
}
标签:Task,23,编程,并行,任务,线程,方法,Parallel,PLINQ From: https://www.cnblogs.com/hihaojie/p/18656472/chapter-23-parallel-programming-1gpskf