18. 多线程
18.1 多线程基础
-
处理器受限延迟(Processor-bound latency):假定一个计算需要执行120亿次算术运算,而总共的处理能力是每秒60亿次,那么从请求结果到获得结果至少有2秒钟的处理器受限延迟。
-
I/O受限延迟(IO-bound latency):从外部来源(磁盘、web服务器等)获取数据所产生的延迟,从web服务器获取内容的延迟相当于几百万个时钟周期的延迟。
-
任务和线程的区别:任务代表需要执行的一件工作,而线程代表做这件工作的工作者。
title: 设计规范
1. 不要以为多线程必然会使代码更快【上下文切换的开销】【时间分片对性能的影响】
2. 要在通过多线程来加快解决处理器受限问题时需要谨慎衡量性能
3. 不要假设在单线程代码中的原子性操作在多线程中也是原子性的
4. 不要以为所有线程看到的都是一致的共享内存(即使是共享内存,由于通信延迟的存在,多个线程同时读取内存的同一块区域也会得到不一样的结果)
5. 要确保同时拥有多个锁的代码总是以相同的顺序获取它们【以不同顺序请求锁极易造成死锁】
6. 避免所有的竞态条件,程序行为不能受操作系统调度线程方式的影响
7. 监视异步操作的状态,知道它于何时完成:为判断一个异步操作在何时完成,最好不要采取轮询的方式,也不要采取阻塞并等待的方式。
8. 使用线程池:线程池避免了太多线程的创建,防止系统将大多数时间花在线程的切换而不是运行上。
9. 避免死锁:在防止数据同时被两个不同线程访问的同时还要避免死锁。
10. 为不同的操作提供原子性并同步数据访问:锁定机制防止两个不同的线程同时访问数据。
18.2 异步任务
18.2.1 异步任务简单示例
internal static class TaskTest
{
public static void SimpleTaskExample()
{
const int repetitions = 1000;
var task = Task.Run(() =>
{
for (var count = 0; count < repetitions; count++)
{
Console.Write("-");
}
});
// 主线程中的Console.Write()会与Task.Run()中的Console.Write()竞争
for (var count = 0; count < repetitions; count++)
{
Console.Write("+");
}
// 主线程在此处等待task执行完成
task.Wait();
}
}
18.2.2 使用Task.ContinueWith()延续任务
internal static class TaskTest
{
public static void Continuation()
{
Console.WriteLine("Before");
var taskA1 = () => Console.WriteLine("starting...");
var taskA2 = () => Console.WriteLine("continue A...");
// 延续任务获取一个Task类型作为实参,这样才能从延续任务的代码中访问先驱任务的状态,先驱任务完成时,延续任务自动开始;
// 延续任务完成后还会返回一个Task,这样就可以建立起任意长度的连续任务链
// 使用TaskContinuationOptions枚举型指定ContinueWith的行为,详见微软官方手册
var taskA = Task.Run(taskA1).ContinueWith(_ => taskA2, TaskContinuationOptions.None);
// taskA执行完成后taskB和TaskC同时执行。注意:执行完成时间随机,这次可能是B先完成,下次可能是C先完成
var taskB = taskA.ContinueWith(_ => Console.WriteLine("Continue B..."));
var taskC = taskA.ContinueWith(_ => Console.WriteLine("Continue C..."));
Task.WaitAll(taskB, taskC);
Console.WriteLine("Finished...");
}
}
18.2.3 处理Task抛出的AggregateException类型的异常
internal static class TaskTest
{
public static void ExceptionHandler()
{
var task1 = Task.Run(() => throw new InvalidCastException());
var task2 = Task.Run(() => throw new ArgumentException());
try
{
// 触发异常
Task.WaitAll(task1, task2);
}
// Task抛出的异常类型为AggregateException,可以理解为内部异常,具体异常类型包裹在AggregateException内部
catch (AggregateException exception)
{
// 使用Handle方法处理每个异常
exception.Handle(ex =>
{
switch (ex)
{
case InvalidCastException:
Console.WriteLine($"InvalidCastException: {ex.Message}");
break;
case ArgumentException:
Console.WriteLine($"ArgumentException: {ex.Message}");
break;
default:
return false;
}
return true;
});
}
}
}
18.2.4 使用ContinueWith()观察未处理的异常
using System;
using System.Diagnostics;
using System.Threading.Tasks;
namespace ConsoleApp1;
internal static class Program
{
private static void Main()
{
var parentTaskFaulted = false;
var task = new Task(() => throw new InvalidOperationException());
// 只有在先驱任务抛出未处理异常的情况下才运行延续任务
var continuationTask = task.ContinueWith(
antecedentTask => parentTaskFaulted = antecedentTask.IsFaulted,
TaskContinuationOptions.OnlyOnFaulted);
// 开始运行task
task.Start();
// 仅供演示用,没有人会等待一个先驱任务出错后才运行的延续任务
continuationTask.Wait();
// 设置两个断言,断定parentTaskFaulted=true,task.IsFaulted=true
Trace.Assert(parentTaskFaulted);
Trace.Assert(task.IsFaulted);
// 使用空包容符(!.),断定异常一定不为空
task.Exception!.Handle(eachException =>
{
Console.WriteLine(eachException.Message);
return true;
});
}
}
18.2.5 登记未处理的异常
理想情况下是在任何线程常都不应该抛出未处理的异常,但在发生未处理异常时,不应该马上关闭程序,而是先保存工作数据,并记录异常以方便进行错误报告和调试。这要求一种机制来登记未处理异常通知。在.NET Core2.0及以后的版本中,每个AppDomain都提供了这样一种机制,为观察AppDomain中的未处理异常,必须添加一个UnhandledException事件处理程序。无论是主线程还是工作者线程,AppDomain中线程发生的所有未处理异常都会触发UnhandledException事件。注意:该机制的目的只是通知,不允许程序从未处理的异常中恢复并继续运行。示例如下:
using System.Diagnostics;
namespace ConsoleApp1;
internal static class Program
{
private static Stopwatch _clock = new Stopwatch();
private static void Main()
{
try
{
_clock.Start();
// 注册UnhandledException事件
AppDomain.CurrentDomain.UnhandledException += (sender, eventArgs) =>
{
Message("Event handler starting");
Delay(4000);
};
// 在新的线程中抛出未处理的异常
var thread = new Thread(() =>
{
Message("Throwing exception.");
throw new Exception();
});
thread.Start();
// 阻塞主线程,等待子线程抛出异常
Delay(2000);
}
finally
{
Message("Finally block running...");
}
}
private static void Delay(int i)
{
Message($"Sleeping for {i}ms.");
Thread.Sleep(TimeSpan.FromMilliseconds(i));
Message("Awake");
}
private static void Message(string text)
{
Console.WriteLine($"{Environment.CurrentManagedThreadId}, {_clock.ElapsedMilliseconds:0000}, {text}");
}
}
18.2.6 任务取消——使用CancelationToken令牌取消任务
以下示例使用随机数生成器模拟从11个不同仪器读取整型数据,每个仪器读取10次。如果值为零表示一个仪器已失效,在这种情况下,应取消操作,且不应计算平均值。为了处理这种可能的取消操作,需要实例化一个CancellationTokenSource
对象,该对象生成传递给对象的取消令牌(CancelationToken)。调用 TaskFactory.ContinueWhenAll<TAntecedentResult,TResult>(Task<TAntecedentResult>[], Func<Task<TAntecedentResult>[],TResult>, CancellationToken)
方法,以确保仅在成功收集所有读数后计算平均值。如果任务因取消而尚未完成,此时再调用TaskFactory.ContinueWhenAll方法将引发异常。
namespace ConsoleApp1;
internal static class Program
{
private static void Main()
{
// 实例化CancellationTokenSource对象以创建取消令牌
var source = new CancellationTokenSource();
var cancellationToken = source.Token;
var randomNumberGenerator = new Random();
var tasks = new List<Task<int[]>>();
var taskFactory = new TaskFactory(cancellationToken);
//通过任务工厂创建任务
for (var i = 0; i < 11; i++)
{
var iteration = i + 1;
int[] NewTask()
{
var values = new int[10];
for (var ctr = 1; ctr <= 10; ctr++)
{
var value = randomNumberGenerator.Next(0, 101);
if (value == 0)
{
source.Cancel();
Console.WriteLine($"Cancelling at task {iteration}");
break;
}
values[ctr - 1] = value;
}
return values;
}
tasks.Add(taskFactory.StartNew(NewTask, cancellationToken));
}
try
{
// 将List<Task<int[]>>类型的tasks转化为Task<int[]>[]
// 之后将Task<int[]>[]类型数组命名为dataFromTasks传入lambda表达式计算平均值
var calculateTask = taskFactory.ContinueWhenAll(tasks.ToArray(),
dataFromTasks =>
{
var sum = 0d;
var n = 0;
foreach (var items in dataFromTasks)
{
var data = items.Result;
sum += data.Average();
n++;
}
return sum / n;
}, cancellationToken);
// 执行名为calculateTask的任务,并获取结果
var result = calculateTask.Result;
Console.WriteLine($"the mean is {result}");
}
catch (AggregateException exception)
{
exception.Handle(ex =>
{
switch (ex)
{
case TaskCanceledException:
Console.WriteLine($"Unable to compute mean: {ex.Message}");
break;
default:
Console.WriteLine("Exception: " + ex.GetType().Name);
break;
}
return true;
});
}
finally
{
source.Dispose();
}
}
}
18.2.7 Task.Run()与Task.Factory.StartNew()
Task.Run是Task.Factory.StartNew的简化方法,Task.Run具有了一系列默认参数,使得方法更加简洁
Task.Run(SomeAction);
实际上等价于:
Task.Factory.StartNew(SomeAction, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
Task.Factory.StartNew的完整声明如下:
![[Task.Factory.StartNew完整声明.png]]
18.2.7a TaskCreationOptions
title: None
默认行为
title: AttachedToParent
1. 父任务会等待子任务结束。
2. 父任务会捕获子任务中抛出的异常。
3. 父任务的执行状态取决于子任务的执行状态。
4. 子任务和父任务并不一定运行在同一线程上。
title: DenyChildAttach
如果使用DenyChildAttach选项配置父任务,那么这一选项会阻止子任务附加到父任务中,即使在子任务中设置了AttachedToParent选项也不起任何作用,子任务将作为分离的子任务执行。
title: HideScheduler
在父任务里再创建的子任务将使用默认的TaskScheduler,而不是父任务的TaskScheduler。这相当于在创建Task时隐藏了自己当前的TaskScheduler。如果父任务本身就是使用默认的TaskScheduler(线程池任务调度程序)创建的,那么这个选项不起作用。
title: PreferFairness
让TaskScheduler以一种尽可能公平的方式安排任务,这意味着较早安排的任务将更可能较早运行,而较晚安排运行的任务将更可能较晚运行。
title: RunContinuationsAsynchronously
强制添加到当前任务的延续任务异步执行。
title: LongRunning
如果事先知道一个任务要长时间运行,会长时间“霸占”一个底层线程资源,就可以通知调度器任务不会太快结束。这个通知有两方面作用,它提醒调度器应该为该任务创建专用线程,而不是使用来自线程池的线程;它提醒调度器可能应调度比平时更多的线程。
~~~ CSharp
namespace ConsoleApp1;
internal static class Program
{
private static void Main()
{
using var cancellationTokenSource = new CancellationTokenSource();
void SomeAction()
{
//...
}
Task.Factory.StartNew(SomeAction,
cancellationTokenSource.Token, TaskCreationOptions.LongRunning,
TaskScheduler.Default);
}
}
~~~
18.2.7b TaskScheduler
以下示例创建一个自定义任务调度程序,用于限制应用使用的线程数。 然后,它会启动两组任务(一组使用默认线程池,另一组使用自定义任务调度程序),并显示任务正在执行的任务和线程的相关信息。
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp1;
internal static class Program
{
private static void Main()
{
var limitedConcurrencyLevelTaskScheduler = new LimitedConcurrencyLevelTaskScheduler(2);
var tasks1 = new List<Task>();
var tasks2 = new List<Task>();
var taskFactory = new TaskFactory();
using var cancellationTokenSource = new CancellationTokenSource();
var lockObj = new object();
var outputItem = 0;
for (var tCtr = 0; tCtr < 3; tCtr++)
{
var iteration = tCtr;
var task = taskFactory.StartNew(() =>
{
for (var i = 0; i < 20; i++)
{
lock (lockObj)
{
Console.Write("{0} in task t-{1} on thread {2} ",
i, iteration, Environment.CurrentManagedThreadId);
Thread.Sleep(500);
outputItem++;
if (outputItem % 3 == 0)
Console.WriteLine();
}
}
}, cancellationTokenSource.Token);
tasks1.Add(task);
}
Task.WaitAll(tasks1.ToArray());
// Use it to run a second set of tasks.
for (var tCtr = 0; tCtr < 3; tCtr++)
{
var iteration = tCtr;
var task = taskFactory.StartNew(() =>
{
for (var i = 'a'; i <= 'v'; i++)
{
lock (lockObj)
{
Console.Write("'{0}' in task t1-{1} on thread {2} ",
Convert.ToChar(i), iteration, Environment.CurrentManagedThreadId);
Thread.Sleep(500);
outputItem++;
if (outputItem % 2 == 0)
Console.WriteLine();
}
}
}, cancellationTokenSource.Token, TaskCreationOptions.None, limitedConcurrencyLevelTaskScheduler);
tasks2.Add(task);
}
Task.WaitAll(tasks2.ToArray());
Console.WriteLine("\n\nSuccessful completion.");
}
}
internal class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{
// 使用ThreadStatic标记的静态字段不会在线程间共享
// 指示当前线程是否正在处理任务
[ThreadStatic] private static bool _currentThreadIsProcessingItems;
// 该task链表需要lock保护
private readonly LinkedList<Task> _tasks = new();
// 最大允许线程数
public sealed override int MaximumConcurrencyLevel { get; }
// 统计调度程序正在处理的任务
private int _delegatesQueuedOrRunning;
public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
{
if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
MaximumConcurrencyLevel = maxDegreeOfParallelism;
}
protected override void QueueTask(Task task)
{
lock (_tasks)
{
_tasks.AddLast(task);
if (_delegatesQueuedOrRunning >= MaximumConcurrencyLevel) return;
_delegatesQueuedOrRunning++;
NotifyThreadPoolOfPendingWork();
}
}
// 通知线程池,有任务需要执行
private void NotifyThreadPoolOfPendingWork()
{
ThreadPool.UnsafeQueueUserWorkItem(_ =>
{
_currentThreadIsProcessingItems = true;
try
{
while (true)
{
Task? item;
lock (_tasks)
{
if (_tasks.Count == 0)
{
--_delegatesQueuedOrRunning;
break;
}
item = _tasks.First?.Value;
_tasks.RemoveFirst();
}
if (item != null) base.TryExecuteTask(item);
}
}
finally
{
_currentThreadIsProcessingItems = false;
}
}, null);
}
// 确定是否可以在此调用中同步执行提供的任务【先前任务执行完成后再在同一线程上执行当前任务】
protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
// 如果当前线程没有运行任何任务,就不支持同步执行
if (!_currentThreadIsProcessingItems) return false;
// 如果等待内联的任务在任务等待队列中,那么就将其移除等待队列并执行
if (taskWasPreviouslyQueued)
// Try to run the task.
return TryDequeue(task) && base.TryExecuteTask(task);
return base.TryExecuteTask(task);
}
// Attempt to remove a previously scheduled task from the scheduler.
protected sealed override bool TryDequeue(Task task)
{
lock (_tasks) return _tasks.Remove(task);
}
// 枚举当前在队列中等待执行的任务并返回可枚举对象
protected override IEnumerable<Task> GetScheduledTasks()
{
var lockToken = false;
try
{
// Monitor的作用在18.5线程同步这一章节会详细介绍
Monitor.TryEnter(_tasks, ref lockToken);
if (lockToken) return _tasks;
else throw new NotSupportedException();
}
finally
{
if (lockToken) Monitor.Exit(_tasks);
}
}
}
运行结果如下:
0 in task t-0 on thread 6 0 in task t-1 on thread 8 0 in task t-2 on thread 7
1 in task t-0 on thread 6 1 in task t-1 on thread 8 1 in task t-2 on thread 7
2 in task t-0 on thread 6 2 in task t-1 on thread 8 2 in task t-2 on thread 7
3 in task t-0 on thread 6 3 in task t-1 on thread 8 3 in task t-2 on thread 7
4 in task t-0 on thread 6 4 in task t-1 on thread 8 4 in task t-2 on thread 7
5 in task t-0 on thread 6 5 in task t-1 on thread 8 5 in task t-2 on thread 7
6 in task t-0 on thread 6 6 in task t-1 on thread 8 6 in task t-2 on thread 7
7 in task t-0 on thread 6 7 in task t-1 on thread 8 7 in task t-2 on thread 7
8 in task t-0 on thread 6 8 in task t-1 on thread 8 8 in task t-2 on thread 7
9 in task t-0 on thread 6 9 in task t-1 on thread 8 9 in task t-2 on thread 7
10 in task t-0 on thread 6 10 in task t-1 on thread 8 10 in task t-2 on thread 7
11 in task t-0 on thread 6 11 in task t-1 on thread 8 11 in task t-2 on thread 7
12 in task t-0 on thread 6 12 in task t-1 on thread 8 12 in task t-2 on thread 7
13 in task t-0 on thread 6 13 in task t-1 on thread 8 13 in task t-2 on thread 7
14 in task t-0 on thread 6 14 in task t-1 on thread 8 14 in task t-2 on thread 7
15 in task t-0 on thread 6 15 in task t-1 on thread 8 15 in task t-2 on thread 7
16 in task t-0 on thread 6 16 in task t-1 on thread 8 16 in task t-2 on thread 7
17 in task t-0 on thread 6 17 in task t-1 on thread 8 17 in task t-2 on thread 7
18 in task t-0 on thread 6 18 in task t-1 on thread 8 18 in task t-2 on thread 7
19 in task t-0 on thread 6 19 in task t-1 on thread 8 19 in task t-2 on thread 7
'a' in task t1-0 on thread 7 'a' in task t1-1 on thread 8
'b' in task t1-0 on thread 7 'b' in task t1-1 on thread 8
'c' in task t1-0 on thread 7 'c' in task t1-1 on thread 8
'd' in task t1-0 on thread 7 'd' in task t1-1 on thread 8
'e' in task t1-0 on thread 7 'e' in task t1-1 on thread 8
'f' in task t1-0 on thread 7 'f' in task t1-1 on thread 8
'g' in task t1-0 on thread 7 'g' in task t1-1 on thread 8
'h' in task t1-0 on thread 7 'h' in task t1-1 on thread 8
'i' in task t1-0 on thread 7 'i' in task t1-1 on thread 8
'j' in task t1-0 on thread 7 'j' in task t1-1 on thread 8
'k' in task t1-0 on thread 7 'k' in task t1-1 on thread 8
'l' in task t1-0 on thread 7 'l' in task t1-1 on thread 8
'm' in task t1-0 on thread 7 'm' in task t1-1 on thread 8
'n' in task t1-0 on thread 7 'n' in task t1-1 on thread 8
'o' in task t1-0 on thread 7 'o' in task t1-1 on thread 8
'p' in task t1-0 on thread 7 'p' in task t1-1 on thread 8
'q' in task t1-0 on thread 7 'q' in task t1-1 on thread 8
'r' in task t1-0 on thread 7 'r' in task t1-1 on thread 8
's' in task t1-0 on thread 7 's' in task t1-1 on thread 8
't' in task t1-0 on thread 7 't' in task t1-1 on thread 8
'u' in task t1-0 on thread 7 'u' in task t1-1 on thread 8
'v' in task t1-0 on thread 7 'v' in task t1-1 on thread 8
'a' in task t1-2 on thread 7 'b' in task t1-2 on thread 7
'c' in task t1-2 on thread 7 'd' in task t1-2 on thread 7
'e' in task t1-2 on thread 7 'f' in task t1-2 on thread 7
'g' in task t1-2 on thread 7 'h' in task t1-2 on thread 7
'i' in task t1-2 on thread 7 'j' in task t1-2 on thread 7
'k' in task t1-2 on thread 7 'l' in task t1-2 on thread 7
'm' in task t1-2 on thread 7 'n' in task t1-2 on thread 7
'o' in task t1-2 on thread 7 'p' in task t1-2 on thread 7
'q' in task t1-2 on thread 7 'r' in task t1-2 on thread 7
's' in task t1-2 on thread 7 't' in task t1-2 on thread 7
'u' in task t1-2 on thread 7 'v' in task t1-2 on thread 7
从运行结果可以看出,自定义任务调度程序限制了应用使用的线程数,最多只有两个任务在同时运行。默认调度程序(线程池)自动分配了三个线程,有三个任务可以同时运行。其实,作为基类的TaskScheduler已经定义了一组任务调度程序,在大多数情况下可以直接使用,不再需要定义自己的任务调度程序了。TaskScheduler这一主题,在18.3.4d中还会介绍。
18.2.8 对任务进行资源清理
Task类型还支持IDisposable,但之前的示例代码既没有包含Dispose调用,也没有通过using语句来隐式调用。相反,依赖的是程序退出时的终结器。这导致了两个后果。首先,句柄(handler)存活时间变长了,因而会消耗更多资源。其次,垃圾回收器的效率变低了,因为被终结的对象活到了下一代。但在Task的情况下,除非要终结大量任务,否则这两方面的问题都不大。如果要手动dispose,务必确保任务已经结束且没有其他代码在使用它们。
18.2.9 设计规范
title: 设计规范
1. 避免程序在任何线程上产生未处理的异常
2. 考虑登记“未处理异常”事件处理程序以进行调试、记录和紧急关闭
3. 要取消未完成的任务而不是在程序关闭期间允许其运行,即:在程序退出时要先终止正在执行的任务,而不是放任不管
4. 线程池假设任务是CPU-bound类型且运行时间较短,对于IO-bound类型的任务或运行时间常的任务,要告诉任务工厂(Task.Factory)新建的任务可能会长时间运行,使得任务工厂能够恰当地管理它。比如:在等待数据返回期间运行其他短时间的任务。
5. 要少用TaskCreationOptions.LongRunning
6. 避免在正式产品中使用Thread.Sleep()
7. 要使用任务相关API(System.Threading.Tasks)来替代System.Threading中的其他API,比如:Thread、ThreadPool等
标签:task,thread,读书笔记,C#,t1,任务,Task,var,本质论
From: https://www.cnblogs.com/z7hwHqeY/p/18563878