简介:
编程需要对基本的数据结构和算法有所了解。程序员为并发情况选择最合适的数据结 构,那就需要知道很多事情,例如算法运行时间、空间复杂度,以及大写0标记法等。在不 同的广为人知的场景中,我们总知道哪种数据结构更高效。对于并行计算,我们需要使用适当的数据结构。这些数据结构具备可伸缩性,尽可能地 避免锁,同时还能提供线程安全的访问。.NET framework版本4引入了 System.Collections. Concunent命名空间,其中包含了一些数据结构。在本章中,我们将展示这些数据结构并通 过简单的例子来说明如何使用它们
a. ConcurrentQueue
ConcurrentQueue集合使用了原子的比较和交换(Compare and Swap, 简称CAS)操作,以及SpinWait来保证线程安全。它实现了一个先进先出(First In First Out,简称FIFO)的集合,这意味着元素出队列的顺序与加入队列的顺序是一致的。可以 调用Enqueue方法向队列中加入元素 TryDequeue方法试图取出队列中的第一个元素,而 TryPeek方法则试图得到第一个元素但并不从队列中删除该元素
using System; using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; using static System.Console; namespace Chapter6.Recipe2 { class Program { static void Main(string[] args) { Task t = RunProgram(); t.Wait(); } static async Task RunProgram() { //使用ConcurrentQueue集合实例创建了一个任务队列 var taskQueue = new ConcurrentQueue<CustomTask>(); //创建了 一个取消标志,它是用来在我们将任务放入队列后停止工作的 var cts = new CancellationTokenSource(); //接下来启动了一个单独的工作者线程来将任务放入任务队列中 var taskSource = Task.Run(() => TaskProducer(taskQueue)); //现在定义该程序中消费任务的部分。我们创建了四个工作者、它们会随机等待一段时 间,然后从任务队列中获取一个任务,处理该任务, //一直重复整个过程直到我们发出取消标 志信号 Task[] processors = new Task[4]; for (int i = 1; i <= 4; i++) { string processorId = i.ToString(); processors[i-1] = Task.Run( () => TaskProcessor(taskQueue, $"Processor {processorId}", cts.Token)); } await taskSource; cts.CancelAfter(TimeSpan.FromSeconds(2)); //我们看到队列中的任务按从前到后的顺序被处理,但一个后面的任务是有可能会比前面 的任务先处理的,因为我们有四个工作者独立地运行, //而且任务处理时间并不是恒定的。我 们看到访问该队列是线程安全的,没有一个元素会被提取两次 await Task.WhenAll(processors); Read(); } static async Task TaskProducer(ConcurrentQueue<CustomTask> queue) { for (int i = 1; i <= 20; i++) { await Task.Delay(50); var workItem = new CustomTask {Id = i}; queue.Enqueue(workItem); WriteLine($"Task {workItem.Id} has been posted"); } } static async Task TaskProcessor( ConcurrentQueue<CustomTask> queue, string name, CancellationToken token) { CustomTask workItem; bool dequeueSuccesful = false; await GetRandomDelay(); do { dequeueSuccesful = queue.TryDequeue(out workItem); if (dequeueSuccesful) { WriteLine($"Task {workItem.Id} has been processed by {name}"); } await GetRandomDelay(); } while (!token.IsCancellationRequested); } static Task GetRandomDelay() { int delay = new Random(DateTime.Now.Millisecond).Next(1, 500); return Task.Delay(delay); } class CustomTask { public int Id { get; set; } } } }展示创建能被多个工作者异步处理的一组任务的例子
b. Concurrentstack
Concurrentstack的实现也没有使用任何锁,只釆用了 CAS操作。它是一个后进先出 (Last In First Out,简称LIFO)的集合,这意味着最近添加的元素会先返回。可以使用Push 和PushRange方法添加元素,使用TryPop和TryPopRange方法获取元素,以及使用TryPeek 方法检查元素。
c. ConcurrentBag
ConcurrentBag是一个支持重复元素的无序集合。它针对这样以下情况进行了优化.即 多个线程以这样的方式工作:每个线程产生和消费自己的任务,极少与其他线程的任务交互 (如果要交互则使用锁)。添加元素使用Add方法,检查元素使用TryPeek方法、获取元素使 用TryTake方法,请避免使用上面提及的集合的Count属性。实现这些集合使用的是链表,Count操作 的时间复杂度为0(N)。如果想检查集合是否为空,请使用IsEmpty属性,其时间复 杂度为0(1)。
d. ConcurrentDictionary
ConcurrentDictionary是一个线程安全的字典集合的实现。对于读操作无需使用锁。但是 对于写操作则需要锁,该并发字典使用多个锁,在字典桶之上实现了一个细粒度的锁模型。 使用参数concurrencyLevel可以在构造函数中定义锁的数量、这意味着预估的线程数量将并 发地更新该字典。就由于并发字典使用锁,所以一些操作需要获取该字典中的所有锁:如果没必要请避免 使用以下操作:Count, IsEmpty. Keys、Values. CopyTo 及 To Array。
using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using static System.Console; namespace Chapter6.Recipe1 { class Program { const string Item = "Dictionary item"; const int Iterations = 1000000; public static string CurrentItem; static void Main(string[] args) { //当程序启动时我们创建了两个集合,其中一个是标准的字典集合,另一个是新的并发字典集合 var concurrentDictionary = new ConcurrentDictionary<int, string>(); var dictionary = new Dictionary<int, string>(); var sw = new Stopwatch(); sw.Start(); for (int i = 0; i < Iterations; i++) { //采用锁的机制向标准的字典中添加元素,并测量完成100万次迭代的时间 lock (dictionary) { dictionary[i] = Item; } } sw.Stop(); WriteLine($"Writing to dictionary with a lock: {sw.Elapsed}");//Writing to dictionary with a lock: 00:00:00.0449525 sw.Restart(); for (int i = 0; i < Iterations; i++) { //采用同样的场景来测量ConcurrentDictionary的性能, concurrentDictionary[i] = Item; } sw.Stop(); WriteLine($"Writing to a concurrent dictionary: {sw.Elapsed}");//Writing to a concurrent dictionary: 00:00:00.2662710 //最后比较从两个集合中获取值的性能 sw.Restart(); for (int i = 0; i < Iterations; i++) { lock (dictionary) { CurrentItem = dictionary[i]; } } sw.Stop(); WriteLine($"Reading from dictionary with a lock: {sw.Elapsed}");//Reading from dictionary with a lock: 00:00:00.0192289 sw.Restart(); for (int i = 0; i < Iterations; i++) { CurrentItem = concurrentDictionary[i]; } sw.Stop(); WriteLine($"Reading from a concurrent dictionary: {sw.Elapsed}");//Reading from a concurrent dictionary: 00:00:00.0108079 ReadLine(); } //通过这个非常简单的场景,我们发现ConcurrentDictionary写操作比使用锁的通常 的字典要慢得多, //而读操作则要快些。因此如果对字典需要大量的线程安全的读操作, ConcurrentDictionary 是最好的选择 //如果你对字典只需要多线程访问只读元素,则没必要执行线程安全的读操作。在此场 景中最好只使用通常的字典或ReadOnlyDictionary集合。 //ConcurrentDictionary的实现使用了细粒度锁(fine-grained locking)技术,这在多线程写 入方面比使用锁的通常的字典(也被称为粗粒度锁)的可伸缩性更好。 //正如本例中所示,当 只用一个线程时,并发字典非常慢,但是扩展到5到6个线程(如果有足够的CPU核心来同 时运行它们),并发字典的性能会更好 } }(比较在单线程环境中使用通常的字典集合与使用并发 字典的性能)
e. BlockingCollection
BlockingCollection 是对 IProducerConsumerCollection 泛型接 口的实现的一个高级封装。 它有很多先进的功能来实现管道场景,即当你有一些步骤需要使用之前步骤运行的结果时: BlockingCollectione类支持如下功能:分块、调整内部集合容量、取消集合操作、从多个块 集合中获取元素
标签:Task,dictionary,C#,sw,---,线程,集合,字典 From: https://www.cnblogs.com/apple-hu/p/18472843