18. 多线程
18.1 多线程基础
处理器受限延迟(Processor-bound latency):假定一个计算需要执行120亿次算术运算,而总共的处理能力是每秒60亿次,那么从请求结果到获得结果至少有2秒钟的处理器受限延迟。
I/O受限延迟(IO-bound latency):从外部来源(磁盘、web服务器等)获取数据所产生的延迟,从web服务器获取内容的延迟相当于几百万个时钟周期的延迟。
设计规范
1. 不要以为多线程必然会使代码更快【上下文切换的开销】【时间分片对性能的影响】
2. 要在通过多线程来加快解决处理器受限问题时需要谨慎衡量性能
3. 不要假设在单线程代码中的原子性操作在多线程中也是原子性的
4. 不要以为所有线程看到的都是一致的共享内存(即使是共享内存,由于通信延迟的存在,多个线程同时读取内存的同一块区域也会得到不一样的结果)
5. 要确保同时拥有多个锁的代码总是以相同的顺序获取它们【以不同顺序请求锁极易造成死锁】
6. 避免所有的竞态条件,程序行为不能受操作系统调度线程方式的影响
7. 监视异步操作的状态,知道它于何时完成:为判断一个异步操作在何时完成,最好不要采取轮询的方式,也不要采取阻塞并等待的方式。
8. 使用线程池:线程池避免了太多线程的创建,防止系统将大多数时间花在线程的切换而不是运行上。
9. 避免死锁:在防止数据同时被两个不同线程访问的同时还要避免死锁。
10. 为不同的操作提供原子性并同步数据访问:锁定机制防止两个不同的线程同时访问数据。
18.2 异步任务
18.2.1 异步任务简单示例
internal static class TaskTest
public static void SimpleTaskExample()
const int repetitions = 1000;
var task = Task.Run(() =>
for (var count = 0; count < repetitions; count++)
// 主线程中的Console.Write()会与Task.Run()中的Console.Write()竞争
for (var count = 0; count < repetitions; count++)
// 主线程在此处等待task执行完成
18.2.2 使用Task.ContinueWith()延续任务
internal static class TaskTest
public static void Continuation()
var taskA1 = () => Console.WriteLine("starting...");
var taskA2 = () => Console.WriteLine("continue A...");
// 延续任务获取一个Task类型作为实参,这样才能从延续任务的代码中访问先驱任务的状态,先驱任务完成时,延续任务自动开始;
// 延续任务完成后还会返回一个Task,这样就可以建立起任意长度的连续任务链
// 使用TaskContinuationOptions枚举型指定ContinueWith的行为,详见微软官方手册
var taskA = Task.Run(taskA1).ContinueWith(_ => taskA2, TaskContinuationOptions.None);
// taskA执行完成后taskB和TaskC同时执行。注意:执行完成时间随机,这次可能是B先完成,下次可能是C先完成
var taskB = taskA.ContinueWith(_ => Console.WriteLine("Continue B..."));
var taskC = taskA.ContinueWith(_ => Console.WriteLine("Continue C..."));
Task.WaitAll(taskB, taskC);
18.2.3 处理Task抛出的AggregateException类型的异常
internal static class TaskTest
public static void ExceptionHandler()
var task1 = Task.Run(() => throw new InvalidCastException());
var task2 = Task.Run(() => throw new ArgumentException());
// 触发异常
Task.WaitAll(task1, task2);
// Task抛出的异常类型为AggregateException,可以理解为内部异常,具体异常类型包裹在AggregateException内部
catch (AggregateException exception)
// 使用Handle方法处理每个异常
exception.Handle(ex =>
switch (ex)
case InvalidCastException:
Console.WriteLine($"InvalidCastException: {ex.Message}");
case ArgumentException:
Console.WriteLine($"ArgumentException: {ex.Message}");
return false;
return true;
18.2.4 使用ContinueWith()观察未处理的异常
using System;
using System.Diagnostics;
using System.Threading.Tasks;
namespace ConsoleApp1;
internal static class Program
private static void Main()
var parentTaskFaulted = false;
var task = new Task(() => throw new InvalidOperationException());
// 只有在先驱任务抛出未处理异常的情况下才运行延续任务
var continuationTask = task.ContinueWith(
antecedentTask => parentTaskFaulted = antecedentTask.IsFaulted,
// 开始运行task
// 仅供演示用,没有人会等待一个先驱任务出错后才运行的延续任务
// 设置两个断言,断定parentTaskFaulted=true,task.IsFaulted=true
// 使用空包容符(!.),断定异常一定不为空
task.Exception!.Handle(eachException =>
return true;
18.2.5 登记未处理的异常
理想情况下是在任何线程常都不应该抛出未处理的异常,但在发生未处理异常时,不应该马上关闭程序,而是先保存工作数据,并记录异常以方便进行错误报告和调试。这要求一种机制来登记未处理异常通知。在.NET Core2.0及以后的版本中,每个AppDomain都提供了这样一种机制,为观察AppDomain中的未处理异常,必须添加一个UnhandledException事件处理程序。无论是主线程还是工作者线程,AppDomain中线程发生的所有未处理异常都会触发UnhandledException事件。注意:该机制的目的只是通知,不允许程序从未处理的异常中恢复并继续运行。示例如下:
using System.Diagnostics;
namespace ConsoleApp1;
internal static class Program
private static Stopwatch _clock = new Stopwatch();
private static void Main()
// 注册UnhandledException事件
AppDomain.CurrentDomain.UnhandledException += (sender, eventArgs) =>
Message("Event handler starting");
// 在新的线程中抛出未处理的异常
var thread = new Thread(() =>
Message("Throwing exception.");
throw new Exception();
// 阻塞主线程,等待子线程抛出异常
Message("Finally block running...");
private static void Delay(int i)
Message($"Sleeping for {i}ms.");
private static void Message(string text)
Console.WriteLine($"{Environment.CurrentManagedThreadId}, {_clock.ElapsedMilliseconds:0000}, {text}");
18.2.6 任务取消——使用CancelationToken令牌取消任务
对象,该对象生成传递给对象的取消令牌(CancelationToken)。调用 TaskFactory.ContinueWhenAll<TAntecedentResult,TResult>(Task<TAntecedentResult>[], Func<Task<TAntecedentResult>[],TResult>, CancellationToken)
namespace ConsoleApp1;
internal static class Program
private static void Main()
// 实例化CancellationTokenSource对象以创建取消令牌
var source = new CancellationTokenSource();
var cancellationToken = source.Token;
var randomNumberGenerator = new Random();
var tasks = new List<Task<int[]>>();
var taskFactory = new TaskFactory(cancellationToken);
for (var i = 0; i < 11; i++)
var iteration = i + 1;
int[] NewTask()
var values = new int[10];
for (var ctr = 1; ctr <= 10; ctr++)
var value = randomNumberGenerator.Next(0, 101);
if (value == 0)
Console.WriteLine($"Cancelling at task {iteration}");
values[ctr - 1] = value;
return values;
tasks.Add(taskFactory.StartNew(NewTask, cancellationToken));
// 将List<Task<int[]>>类型的tasks转化为Task<int[]>[]
// 之后将Task<int[]>[]类型数组命名为dataFromTasks传入lambda表达式计算平均值
var calculateTask = taskFactory.ContinueWhenAll(tasks.ToArray(),
dataFromTasks =>
var sum = 0d;
var n = 0;
foreach (var items in dataFromTasks)
var data = items.Result;
sum += data.Average();
return sum / n;
}, cancellationToken);
// 执行名为calculateTask的任务,并获取结果
var result = calculateTask.Result;
Console.WriteLine($"the mean is {result}");
catch (AggregateException exception)
exception.Handle(ex =>
switch (ex)
case TaskCanceledException:
Console.WriteLine($"Unable to compute mean: {ex.Message}");
Console.WriteLine("Exception: " + ex.GetType().Name);
return true;
18.2.7 Task.Run()与Task.Factory.StartNew()
Task.Factory.StartNew(SomeAction, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
18.2.7a TaskCreationOptions
None
AttachedToParent
1. 父任务会等待子任务结束。
2. 父任务会捕获子任务中抛出的异常。
3. 父任务的执行状态取决于子任务的执行状态。
4. 子任务和父任务并不一定运行在同一线程上。
DenyChildAttach
HideScheduler
PreferFairness
RunContinuationsAsynchronously
LongRunning
~~~ CSharp
namespace ConsoleApp1;
internal static class Program
private static void Main()
using var cancellationTokenSource = new CancellationTokenSource();
void SomeAction()
cancellationTokenSource.Token, TaskCreationOptions.LongRunning,
18.2.7b TaskScheduler
以下示例创建一个自定义任务调度程序,用于限制应用使用的线程数。 然后,它会启动两组任务(一组使用默认线程池,另一组使用自定义任务调度程序),并显示任务正在执行的任务和线程的相关信息。
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp1;
internal static class Program
private static void Main()
var limitedConcurrencyLevelTaskScheduler = new LimitedConcurrencyLevelTaskScheduler(2);
var tasks1 = new List<Task>();
var tasks2 = new List<Task>();
var taskFactory = new TaskFactory();
using var cancellationTokenSource = new CancellationTokenSource();
var lockObj = new object();
var outputItem = 0;
for (var tCtr = 0; tCtr < 3; tCtr++)
var iteration = tCtr;
var task = taskFactory.StartNew(() =>
for (var i = 0; i < 20; i++)
lock (lockObj)
Console.Write("{0} in task t-{1} on thread {2} ",
i, iteration, Environment.CurrentManagedThreadId);
if (outputItem % 3 == 0)
}, cancellationTokenSource.Token);
// Use it to run a second set of tasks.
for (var tCtr = 0; tCtr < 3; tCtr++)
var iteration = tCtr;
var task = taskFactory.StartNew(() =>
for (var i = 'a'; i <= 'v'; i++)
lock (lockObj)
Console.Write("'{0}' in task t1-{1} on thread {2} ",
Convert.ToChar(i), iteration, Environment.CurrentManagedThreadId);
if (outputItem % 2 == 0)
}, cancellationTokenSource.Token, TaskCreationOptions.None, limitedConcurrencyLevelTaskScheduler);
Console.WriteLine("\n\nSuccessful completion.");
internal class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
// 使用ThreadStatic标记的静态字段不会在线程间共享
// 指示当前线程是否正在处理任务
[ThreadStatic] private static bool _currentThreadIsProcessingItems;
// 该task链表需要lock保护
private readonly LinkedList<Task> _tasks = new();
// 最大允许线程数
public sealed override int MaximumConcurrencyLevel { get; }
// 统计调度程序正在处理的任务
private int _delegatesQueuedOrRunning;
public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
MaximumConcurrencyLevel = maxDegreeOfParallelism;
protected override void QueueTask(Task task)
lock (_tasks)
if (_delegatesQueuedOrRunning >= MaximumConcurrencyLevel) return;
// 通知线程池,有任务需要执行
private void NotifyThreadPoolOfPendingWork()
ThreadPool.UnsafeQueueUserWorkItem(_ =>
_currentThreadIsProcessingItems = true;
while (true)
Task? item;
lock (_tasks)
if (_tasks.Count == 0)
item = _tasks.First?.Value;
if (item != null) base.TryExecuteTask(item);
_currentThreadIsProcessingItems = false;
}, null);
// 确定是否可以在此调用中同步执行提供的任务【先前任务执行完成后再在同一线程上执行当前任务】
protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
// 如果当前线程没有运行任何任务,就不支持同步执行
if (!_currentThreadIsProcessingItems) return false;
// 如果等待内联的任务在任务等待队列中,那么就将其移除等待队列并执行
if (taskWasPreviouslyQueued)
// Try to run the task.
return TryDequeue(task) && base.TryExecuteTask(task);
return base.TryExecuteTask(task);
// Attempt to remove a previously scheduled task from the scheduler.
protected sealed override bool TryDequeue(Task task)
lock (_tasks) return _tasks.Remove(task);
// 枚举当前在队列中等待执行的任务并返回可枚举对象
protected override IEnumerable<Task> GetScheduledTasks()
var lockToken = false;
// Monitor的作用在18.5线程同步这一章节会详细介绍
Monitor.TryEnter(_tasks, ref lockToken);
if (lockToken) return _tasks;
else throw new NotSupportedException();
if (lockToken) Monitor.Exit(_tasks);
0 in task t-0 on thread 6 0 in task t-1 on thread 8 0 in task t-2 on thread 7
1 in task t-0 on thread 6 1 in task t-1 on thread 8 1 in task t-2 on thread 7
2 in task t-0 on thread 6 2 in task t-1 on thread 8 2 in task t-2 on thread 7
3 in task t-0 on thread 6 3 in task t-1 on thread 8 3 in task t-2 on thread 7
4 in task t-0 on thread 6 4 in task t-1 on thread 8 4 in task t-2 on thread 7
5 in task t-0 on thread 6 5 in task t-1 on thread 8 5 in task t-2 on thread 7
6 in task t-0 on thread 6 6 in task t-1 on thread 8 6 in task t-2 on thread 7
7 in task t-0 on thread 6 7 in task t-1 on thread 8 7 in task t-2 on thread 7
8 in task t-0 on thread 6 8 in task t-1 on thread 8 8 in task t-2 on thread 7
9 in task t-0 on thread 6 9 in task t-1 on thread 8 9 in task t-2 on thread 7
10 in task t-0 on thread 6 10 in task t-1 on thread 8 10 in task t-2 on thread 7
11 in task t-0 on thread 6 11 in task t-1 on thread 8 11 in task t-2 on thread 7
12 in task t-0 on thread 6 12 in task t-1 on thread 8 12 in task t-2 on thread 7
13 in task t-0 on thread 6 13 in task t-1 on thread 8 13 in task t-2 on thread 7
14 in task t-0 on thread 6 14 in task t-1 on thread 8 14 in task t-2 on thread 7
15 in task t-0 on thread 6 15 in task t-1 on thread 8 15 in task t-2 on thread 7
16 in task t-0 on thread 6 16 in task t-1 on thread 8 16 in task t-2 on thread 7
17 in task t-0 on thread 6 17 in task t-1 on thread 8 17 in task t-2 on thread 7
18 in task t-0 on thread 6 18 in task t-1 on thread 8 18 in task t-2 on thread 7
19 in task t-0 on thread 6 19 in task t-1 on thread 8 19 in task t-2 on thread 7
'a' in task t1-0 on thread 7 'a' in task t1-1 on thread 8
'b' in task t1-0 on thread 7 'b' in task t1-1 on thread 8
'c' in task t1-0 on thread 7 'c' in task t1-1 on thread 8
'd' in task t1-0 on thread 7 'd' in task t1-1 on thread 8
'e' in task t1-0 on thread 7 'e' in task t1-1 on thread 8
'f' in task t1-0 on thread 7 'f' in task t1-1 on thread 8
'g' in task t1-0 on thread 7 'g' in task t1-1 on thread 8
'h' in task t1-0 on thread 7 'h' in task t1-1 on thread 8
'i' in task t1-0 on thread 7 'i' in task t1-1 on thread 8
'j' in task t1-0 on thread 7 'j' in task t1-1 on thread 8
'k' in task t1-0 on thread 7 'k' in task t1-1 on thread 8
'l' in task t1-0 on thread 7 'l' in task t1-1 on thread 8
'm' in task t1-0 on thread 7 'm' in task t1-1 on thread 8
'n' in task t1-0 on thread 7 'n' in task t1-1 on thread 8
'o' in task t1-0 on thread 7 'o' in task t1-1 on thread 8
'p' in task t1-0 on thread 7 'p' in task t1-1 on thread 8
'q' in task t1-0 on thread 7 'q' in task t1-1 on thread 8
'r' in task t1-0 on thread 7 'r' in task t1-1 on thread 8
's' in task t1-0 on thread 7 's' in task t1-1 on thread 8
't' in task t1-0 on thread 7 't' in task t1-1 on thread 8
'u' in task t1-0 on thread 7 'u' in task t1-1 on thread 8
'v' in task t1-0 on thread 7 'v' in task t1-1 on thread 8
'a' in task t1-2 on thread 7 'b' in task t1-2 on thread 7
'c' in task t1-2 on thread 7 'd' in task t1-2 on thread 7
'e' in task t1-2 on thread 7 'f' in task t1-2 on thread 7
'g' in task t1-2 on thread 7 'h' in task t1-2 on thread 7
'i' in task t1-2 on thread 7 'j' in task t1-2 on thread 7
'k' in task t1-2 on thread 7 'l' in task t1-2 on thread 7
'm' in task t1-2 on thread 7 'n' in task t1-2 on thread 7
'o' in task t1-2 on thread 7 'p' in task t1-2 on thread 7
'q' in task t1-2 on thread 7 'r' in task t1-2 on thread 7
's' in task t1-2 on thread 7 't' in task t1-2 on thread 7
'u' in task t1-2 on thread 7 'v' in task t1-2 on thread 7
18.2.8 对任务进行资源清理
18.2.9 设计规范
设计规范
1. 避免程序在任何线程上产生未处理的异常
2. 考虑登记“未处理异常”事件处理程序以进行调试、记录和紧急关闭
3. 要取消未完成的任务而不是在程序关闭期间允许其运行,即:在程序退出时要先终止正在执行的任务,而不是放任不管
4. 线程池假设任务是CPU-bound类型且运行时间较短,对于IO-bound类型的任务或运行时间常的任务,要告诉任务工厂(Task.Factory)新建的任务可能会长时间运行,使得任务工厂能够恰当地管理它。比如:在等待数据返回期间运行其他短时间的任务。
5. 要少用TaskCreationOptions.LongRunning
6. 避免在正式产品中使用Thread.Sleep()
7. 要使用任务相关API(System.Threading.Tasks)来替代System.Threading中的其他API,比如:Thread、ThreadPool等
