PLINQ可以自动并行化本地LINQ查询。易于使用是PLINQ的优势,因为它将工作划分和结果整理的任务交给了.NET Core。
要使用PLINQ,只需直接在输入序列上调用AsParallel()
方法,而后和先前一样编写普通的LINQ查询即可。
例如,以下范例列出了3~100 000之间的所有素数,并充分利用了目标计算机上的所有内核:
//Range方法获取两个int参数:一个起始数,一个是要生成的结果的数目
IEnumerable<int> numbers = Enumerable.Range(3, 10000 - 3);
var parallelQuery =
from n in numbers.AsParallel()
where Enumerable.Range(2, (int)Math.Sqrt(n)).All(i => n % i > 0)
select n;
int[] primes = parallelQuery.ToArray();
AsParallel
是System.Linq.ParallelEnumerable
类的一个扩展方法,它将输入包装为一个以ParallelQuery<TSource>
为基类的序列,这样,后续的LINQ查询运算符就会绑定到由ParallelEnumerable
定义的另外一套扩展方法上。这些扩展方法为每一种标准查询运算符提供了并行化实现。基本上,它们的工作原理都是将输入序列划分为小块,让每一块在不同的线程上执行,并将执行结果整理为一个输出序列以供使用。
调用AsSequential()
会将ParallelQuery
序列包装解除,后续的查询运算符将会重新绑定到标准查询运算符上并顺序执行。这在调用有副作用或者非线程安全的代码之前是非常必要的。
对于接受两个输入序列的运算符(Join、GroupJoin、Concat、Union、Intersect、Except和Zip)而言,必须在两个输入序列上都调用AsParallel()方法(否则会抛出异常)。但不需要在查询时在中间再次应用AsParallel方法,因为PLINQ的查询运算符的输出是另一个ParallelQuery序列。事实上,重复调用AsParallel会导致查询强制合并并重新划分,从而降低效率:
mySequence.AsParallel()
.Where(n => n > 100) // ParallelQuery<int>序列包装
.AsParallel() // 输出另外的ParallelQuery<int>
.Select(n => n * n) // 这是不必要的
并非所有的查询运算符都可以高效并行化。对于那些不能并行化的运算符,PLINQ将会顺序执行。此外,如果有迹象表明并行化可能比顺序执行的开销更大的话,PLINQ也可能选择顺序执行。
PLINQ仅适用于本地集合:它不支持Entity Framework。因为它们都会将LINQ翻译为SQL并在数据库服务器上执行。但是在获得数据库查询结果之后就可以使用PLINQ对这些结果集进行本地查询了。
并行执行的特性
和普通的LINQ查询一样,PLINQ也是延迟计算的。即只有当消费结果时才会触发查询执行。
但是,枚举结果时,执行方式则与普通的顺序查询不同。
顺序查询的每一个元素会根据消费者的请求从输入序列中取出。而并行查询通常使用独立线程从输入序列中提取元素,并且在时间上比消费者的请求稍稍提前。接下来,它会通过查询链并行对元素进行处理,并将结果保存在一个小缓冲区中,以便让消费者按需取用。如果消费者暂停或终止枚举,则查询处理器也会相应暂停或停止,避免CPU时间和内存的浪费。
PLINQ与顺序
并行查询运算符的副作用之一是PLINQ无法像LINQ那样保持序列的原始顺序。如果需要保持序列的原始顺序,则必须在AsParallel()之后调用AsOrdered()方法:
调用AsOrdered
后PLINQ必须追踪每一个元素的原始位置,因此当元素数量巨大时就会影响性能。
调用AsUnordered
可以在稍后的查询中抵消AsOrdered的效果:它会引入“随机洗牌点”从而使查询更高效地执行。因此,如果只需要在前两次查询中保持输入序列元素的顺序,则可以使用如下写法:
inputSequence.AsParallel().AsOrdered()
.QueryOperator1()
.QueryOperator2()
.AsUnordered() // 从这里开始不需要排序
.QueryOperator3()
...
AsOrdered不是大多数查询的默认行为。对于它们来说,初始的输入顺序并不重要。
PLINQ的限制
目前能够被PLINQ并行化的内容是非常有限的。以下查询运算符默认会阻止查询并行化,除非源序列中的元素本身就位于初始的索引位置:含有索引参数的Select、SelectMany以及ElementAt运算符。
大多数查询运算符会改变元素的索引位置(包括那些会将元素移除的运算符,例如Where)。因此,如果需要这些运算符,通常需要在查询的起始处使用。
以下运算符是可以并行化的,但由于划分的策略很复杂,因此有可能在一些情况下处理速度比顺序处理还要慢:Join、GroupBy、GroupJoin、Distinct、Union、Intersect和Except。
所有其他运算符都是可以并行化的,但使用这些运算符并不能保证查询一定会并行化。如果有任何迹象表明并行化的开销会降低特定查询的速度,则PLINQ有可能会选择让查询顺序执行。当然,若需要更改这一行为,强制进行并行化,可以在调用AsParallel()方法之后加入如下的代码:.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
示例:并行拼写检查器
假定要编写一个拼写检查器,让该检查器可以利用所有的内核迅速对大型文档进行拼写检查。我们可以将算法写为一个LINQ查询语句,从而方便将算法并行化。
static void Main(string[] args)
{
// 拼写检查的第一步是将英语字典下载到一个HashSet中以便进行快速查找。
string wordLookupFile = Path.Combine(Path.GetTempPath(), "WordLookup.txt");
if (!File.Exists(wordLookupFile)) // WordLookup.txt文件包含15万个单词
new WebClient().DownloadFile(
"http://www.albahari.com/ispell/allwords.txt", wordLookupFile);
var wordLookup = new HashSet<string>(
File.ReadAllLines(wordLookupFile),
StringComparer.InvariantCultureIgnoreCase); // 不区分大小写
// 接下来将用这些单词创建一份“测试文档”,该文档是一个由一百万个随机单词组成的数组
var random = new Random();
string[] wordList = wordLookup.ToArray();
string[] wordsToTest = Enumerable.Range(0, 1000000)
.Select(i => wordList[random.Next(0, wordList.Length)])
.ToArray();
// 引入一些拼写错误
wordsToTest[12345] = "woozsh";
wordsToTest[23456] = "wubsie";
// 最后就可以基于wordLookup对wordsToTest进行并行拼写检查了。PLINQ令这种操作变得非常简单
var query = wordsToTest
.AsParallel()
.Select((word, index) => new IndexedWord { Word = word, Index = index })
.Where(iword => !wordLookup.Contains(iword.Word))
.OrderBy(iword => iword.Index);
var result = query.ToList();
Console.ReadKey();
}
struct IndexedWord
{
public string Word;
public int Index;
}
而上述代码中,谓词中的wordLookup.Contains方法增加了查询的重要性,因而值得对查询进行并行化。
使用ThreadLocal
接下来我们不妨将随机生成测试词汇列表的过程也进行并行化。我们可以简单地将这个过程组织为一个LINQ查询。以下是顺序查询版本:
string[] wordsToTest = Enumerable.Range(0, 1000000)
.Select(i => wordList[random.Next(0, wordList.Length)])
.ToArray();
但是,由于random.Next
方法并不是线程安全的,因此我们不能直接在整个查询之前添加AsParallel()
。一种解决方案是编写一个函数锁定random.Next调用,但是这样会限制并发性。更好的选择是使用本地线程存储ThreadLocal<Random>
为每一个线程创建一个Random对象。然后再执行并行化查询:
var localRandom = new ThreadLocal<Random>(
() => new Random(Guid.NewGuid().GetHashCode()));
string[] wordsToTest = Enumerable.Range(0, 1000000).AsParallel()
.Select(i => wordList[localRandom.Value.Next(0, wordList.Length)])
.ToArray();
在工厂函数实例化Random对象时我们将Guid的散列码作为种子以避免两个短时间内创建的Random对象生成相同的随机数序列。
纯函数
由于PLINQ会在并行线程上执行查询,因此必须注意保证操作的线程安全性。特别是,写入变量可能产生副作用,从而影响线程安全性。
int i = 0;
var query = from n in Enumerable.Range(0,999).AsParallel()
select n * i++;
我们可以使用锁保护i的自增操作,但是这不能解决i有可能无法对应输入元素的位置的问题。若使用AsOrdered方法虽然能保证元素输出顺序和其处理顺序一致,但仍无法解决这个问题,因为其处理次序并不一定是按顺序的。
因此,正确的方案是使用带有索引版本的Select重新编写查询:
var query = Enumerable.Range(0,999).AsParallel().Select ((n, i) => n * i);
为了实现最佳性能,任何查询运算符调用的方法都应当是线程安全的,它们不应当更新字段或者属性的值。
设置并行级别
默认情况下,PLINQ会根据处理器的情况选择最合适的并行级别。如需修改这个设置,可在AsParallel方法之后调用WithDegreeOfParallelism
方法。
在一个PLINQ查询中只能调用一次WithDegreeOfParallelism。如果需要再次调用它,必须在查询中再次调用AsParallel()方法,从而强制合并并重新进行划分。
取消操作
若在foreach循环中消费PLINQ查询的结果,则只需要跳出foreach循环,查询就会自动取消,因为其枚举器会被隐式销毁。
对于以转换、元素类操作以及聚合操作运算符终止的查询,可使用取消令牌在另一个线程中取消该操作。如需使用取消令牌,则应在调用AsParallel方法之后调用WithCancellation方法,并以CancellationTokenSource对象的Token属性值作为参数。而后另一个线程可以调用CancellationTokenSource对象的Cancel方法。注意,该操作会在查询的消费端抛OperationCanceledException异常:
IEnumerable<int> million = Enumerable.Range(3, 1000000);
var cancelSource = new CancellationTokenSource();
var primeNumberQuery =
from n in million.AsParallel().WithCancellation(cancelSource.Token)
where Enumerable.Range(2, (int)Math.Sqrt(n)).All(i => n % i > 0)
select n;
new Thread(() =>
{
Thread.Sleep(100); // 0.1秒后取消
cancelSource.Cancel();
}).Start();
try
{
int[] primes = primeNumberQuery.ToArray();
// 不会到达这里,因为另一个线程会取消我们。
var len = primes.Length;
}
catch (OperationCanceledException)
{
Console.WriteLine("Query canceled");
}
PLINQ在取消时会等待每一个工作线程处理完当前的元素,之后再中止查询。这意味着,查询过程中调用的任何外部方法都会完整执行。
PLINQ优化
输出端优化
PLINQ的优点之一是它能够方便地将并行化工作的结果整理到一个输出序列。但有的时候结束时要做的工作只是在序列的每一个元素上执行一些函数:
foreach(int n in parallelQuery)
DoSomething(n);
如果面对的是以上这种情况,而且并不关心元素处理的顺序,则可以使用PLINQ的ForAll方法提高效率。
ForAll
方法会在每一个ParallelQuery的输出元素上运行一个委托。该方法会直接嵌入PLINQ内部,跳过整理和枚举步骤。以下是一个常见的示例:
"abcdef".AsParallel().Select(c=>char.ToUpper(c)).ForAll(Console.write);
输入端优化
PLINQ有三种将输入元素指派到线程的划分策略:
策略 | 元素分配 | 相对性能 |
---|---|---|
块划分 | 动态 | 平均水平 |
范围划分 | 静态 | 不佳到极佳 |
散列划分 | 静态 | 不佳 |
PLINQ在那些需要进行元素比较的查询运算符(GroupBy、Join、GroupJoin、Intersect、Except、Union和Distinct)上总是会使用散列划分策略。散列划分的效率相对较低,因为它必须先计算每一个元素的散列码才能将散列码相同的元素放在同一个线程上进行处理。如果认为这样做的速度太慢,那只能调用AsSequential方法禁用并行化。
若使用其他查询运算符,则可以选择使用范围划分或者块划分策略。默认情况下:
- 如果输入序列是有索引的(它是一个数组或者它实现了IList
),则PLINQ会选择使用范围划分策略。 - 否则,PLINQ会选择使用块划分策略。
概括来说,若输入序列很长,且处理每一个元素花费的CPU时间大致相等,则范围划分策略执行更快,否则往往是块划分执行的速度更快。
若需要强制进行范围划分:
- 如果查询是以Enumerable.Range开始的,则使用ParallelEnumerable.Range来代替它。
- 否则,直接在输入序列上简单调用ToList或ToArray方法即可(但这种方式将显著影响性能,在使用时一定要将这个因素考虑在内)。
若要强制使用块划分,则需要调用Partitioner.Create方法(位于System.Collection.Concurrent命名空间)对输入序列进行包装:
int[] numbers = { 3, 4, 5, 6, 7, 8, 9 };
var parallelQuery =
Partitioner.Create (numbers, true).AsParallel()
.Where (n => n % 2 == 0);
Partitioner.Create方法的第二个参数指定是否需要对查询进行负载均衡处理,而这正是块划分策略的表示方式。
块划分策略会令每一个工作线程定期提取输入序列中的小“块”元素进行处理。PLINQ一开始会分配非常小的块(每一个块一到两个元素),然后随着查询的进展而增加数量:这可以确保小的序列能够充分并行化,而大的序列也不会导致过度的重复过程。如果一个工作线程恰好拿到了容易处理的元素(处理速度非常快),那么它最终会处理更多的块。这个系统可以令各个线程保持同等忙碌的状态。唯一的劣势在于从共享的输入序列中获取元素需要进行同步(通常是排他锁),而这可能导致一些开销和竞争。
范围划分跳过了常规的输入端枚举过程,它可以预先给每一个工作线程分配同等数量的元素,从而避免了对输入序列的竞争。但是如果一些线程对元素快速完成了处理,它们就将无事可做,而其他线程则还需要继续工作。因此,对于之前的素数计算器来说,使用范围划分很可能会性能不佳。
而在计算前一千万个整数的平方根的和时,使用范围划分策略则效果非凡:
ParallelEnumerable.Range(1,10000000).Sum(i=>Math.Sqrt(i));
由于ParallelEnumerable.Range的返回值类型为ParallelQuery