18.3 基于任务的异步编程模式
18.3.1 使用任务并行库(TPL)实现异步执行高延迟操作
using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp1;
internal static class Program
{
private static void Main()
{
const string defaultUrl = "https://www.baidu.com";
var task1 = Task.Run(() => AsynchronousDownload.Test(defaultUrl));
var task2 = Task.Run(() => SynchronousDownload.Test(defaultUrl));
Task.WaitAll(task1, task2);
}
}
internal static class SynchronousDownload
{
public static void Test(string defaultUrl)
{
using var httpClient = new HttpClient
{
Timeout = TimeSpan.FromSeconds(10),
};
Console.WriteLine("SynchronousDownload: httpClient 开始运行");
// 虽然调用了异步方法,但每次调用后都等待异步方法执行完成,可以认为是同步执行的
var result = httpClient.GetAsync(defaultUrl).Result.Content.ReadAsStringAsync().Result;
Console.WriteLine(TextOccurrence(result, "html"));
Console.WriteLine("SynchronousDownload: httpClient 运行结束");
}
private static int TextOccurrence(string downloadString, string findText)
{
if (!downloadString.Contains(findText)) return 0;
var strReplaced = downloadString.Replace(findText, "");
return (downloadString.Length - strReplaced.Length) / findText.Length;
}
}
internal static class AsynchronousDownload
{
public static void Test(string defaultUrl)
{
using var httpClient = new HttpClient
{
Timeout = TimeSpan.FromSeconds(10),
};
// 调用一个返回httpResponseMessage的Task任务 [约定形式的任务: 调用即启动,不能再次触发Start操作]
// 在获取到httpResponseMessage之后再执行后续任务
Console.WriteLine("AsynchronousDownload: httpClient 开始运行");
var httpResponseMessage = httpClient.GetAsync(defaultUrl).ContinueWith(message =>
{
var downloadString = message.Result.Content.ReadAsStringAsync().Result;
Console.WriteLine(TextOccurrence(downloadString, "!DOCTYPE html"));
Console.WriteLine("AsynchronousDownload: httpClient 运行结束");
});
// 在处理httpClient的同时运行其他任务
var task1 = Task.Run(() =>
{
Console.WriteLine("task1开始运行");
// 模拟耗时操作[线程阻塞300ms]
Thread.Sleep(300);
Console.WriteLine("task1运行结束");
});
// 在处理httpClient的同时运行其他任务
var task2 = Task.Run(() =>
{
Console.WriteLine("task2开始运行");
// 模拟耗时操作[线程阻塞2000ms]
Thread.Sleep(2000);
Console.WriteLine("task2运行结束");
});
Task.WaitAll(task1, task2, httpResponseMessage);
}
private static int TextOccurrence(string downloadString, string findText)
{
if (!downloadString.Contains(findText)) return 0;
var strReplaced = downloadString.Replace(findText, "");
return (downloadString.Length - strReplaced.Length) / findText.Length;
}
}
18.3.2 使用async和await实现基于任务的异步模式
using System;
using System.Diagnostics;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp1;
internal static class Program
{
// 任何具有async关键词的方法都必须返回一个有效的异步返回值类型,Main方法也不例外
private static async Task Main()
{
const string defaultUrl = "https://www.baidu.com";
await AsynchronousDownload.TestAsync(defaultUrl);
}
}
internal static class AsynchronousDownload
{
// 使用async指定该方法是异步的
public static async Task TestAsync(string defaultUrl)
{
// 超出作用范围后自动dispose
using var httpClient = new HttpClient
{
Timeout = TimeSpan.FromSeconds(8),
};
var stopWatch = new Stopwatch();
stopWatch.Start();
Console.WriteLine($"{stopWatch.ElapsedMilliseconds:0000}: httpClient开始运行");
// httpClient开始异步执行
var httpMessage = httpClient.GetAsync(defaultUrl);
Console.WriteLine($"{stopWatch.ElapsedMilliseconds:0000}:其他任务开始运行");
// 在处理httpClient的同时运行其他任务
var task1 = Task.Run(() =>
{
Console.WriteLine($"{stopWatch.ElapsedMilliseconds:0000}: task1开始运行");
// 模拟耗时操作[线程阻塞300ms]
Thread.Sleep(300);
Console.WriteLine($"{stopWatch.ElapsedMilliseconds:0000}: task1运行结束");
});
// 在处理httpClient的同时运行其他任务
var task2 = Task.Run(() =>
{
Console.WriteLine($"{stopWatch.ElapsedMilliseconds:0000}: task2开始运行");
// 模拟耗时操作[线程阻塞2000ms]
Thread.Sleep(2000);
Console.WriteLine($"{stopWatch.ElapsedMilliseconds:0000}: task2运行结束");
});
// 开始处理httpClient返回的结果
Console.WriteLine($"{stopWatch.ElapsedMilliseconds:0000}: 开始处理httpClient返回的结果");
// 等待获取httpResponseMessage
var message = await httpMessage;
// 等待ReadAsString异步方法的返回结果
var resultString = await message.Content.ReadAsStringAsync();
// 输出查找的结果
Console.WriteLine(TextOccurrence(resultString, "html"));
Console.WriteLine($"{stopWatch.ElapsedMilliseconds:0000}:httpClient任务运行结束");
// await等待其他任务的结果返回
await task1;
await task2;
Console.WriteLine($"{stopWatch.ElapsedMilliseconds:0000}:全部任务运行结束");
stopWatch.Stop();
}
private static int TextOccurrence(string downloadString, string findText)
{
if (!downloadString.Contains(findText)) return 0;
var strReplaced = downloadString.Replace(findText, "");
return (downloadString.Length - strReplaced.Length) / findText.Length;
}
}
18.3.2a await可用于任何正确的模型
await
关键字后面的表达式的类型通常为Task
、Task<T>
、ValueTask
、ValueTask<T>
,偶尔为void
,在C#8.0中也支持IAsyncEnumerable<T>
或IAsyncEnumerator<string>
。从规则上讲,await支持的返回类型比这些更泛化一点:
它要求类型为可等待类型,即包含一个GetAwaiter()
方法和bool IsCompleted
属性,并且
GetAwaiter()
需要返回一个特定的对象。这个对象要包含 GetResult()
方法、bool IsCompleted
属性,还要实现 INotifyCompletion
接口。
通过这个一般化规则,系统可以很容易地被第三方扩展。换句话说,如果你想设计一个非基于任务的异步系统,可以遵循上述规则来实现支持await的自定义可等待类型。
18.3.2b async与await的内部原理
一段简单的代码:
using System;
using System.Threading.Tasks;
namespace ConsoleApp1;
internal static class Program
{
private static async Task Main(string[] args)
{
Console.WriteLine("Let's Go");
await TestAsync();
Console.WriteLine("World");
}
private static Task TestAsync()
{
var task = Task.Run(() => { Console.Write("Hello "); });
return task;
}
}
上述代码的反编译结果(C#表示)
Main方法:
[DebuggerStepThrough]
private static void <Main>(string[] args)
{
Main(args).GetAwaiter().GetResult();
}
[AsyncStateMachine(typeof(<Main>d__0)), DebuggerStepThrough]
private static Task Main(string[] args)
{
<Main>d__0 stateMachine = new <Main>d__0
{
<>t__builder = AsyncTaskMethodBuilder.Create(),
args = args,
<>1__state = -1
};
stateMachine.<>t__builder.Start<<Main>d__0>(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
// 实现了 IAsyncStateMachine 接口
[CompilerGenerated]
private sealed class <Main>d__0 : IAsyncStateMachine
{
// Fields
public int <>1__state;
public AsyncTaskMethodBuilder <>t__builder;
public string[] args;
private TaskAwaiter <>u__1;
// Methods
private void MoveNext() { }
[DebuggerHidden]
private void SetStateMachine(IAsyncStateMachine stateMachine) { }
}
程序入口点仍然是void Main,Task Main是编译器改写的方法。另外,Task Main被标记成了AsyncStateMachine(异步状态机),异步Main方法主要做了三件事:
- 创建了一个类型为
<Main>d__0
的状态机,初始化了公共变量<>t__builder
、args
、<>1__state = -1
,其中:<>t__builder
:负责异步相关的操作,是实现异步 Main 方法异步的核心<>1__state = -1
:状态机当前的状态
- 调用
Start
方法,借助状态机, 执行在异步Main中写的代码。 - 返回指示异步Main方法运行状态的Task对象。
Start方法:
// 所属结构体:AsyncTaskMethodBuilder
[SecuritySafeCritical, DebuggerStepThrough, __DynamicallyInvokable]
public void Start<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine: IAsyncStateMachine
{
if (((TStateMachine) stateMachine) == null)
{
throw new ArgumentNullException("stateMachine");
}
ExecutionContextSwitcher ecsw = new ExecutionContextSwitcher();
RuntimeHelpers.PrepareConstrainedRegions();
try
{
ExecutionContext.EstablishCopyOnWriteScope(ref ecsw);
// 状态机状态流转
stateMachine.MoveNext();
}
finally
{
ecsw.Undo();
}
}
MoveNext方法:
[CompilerGenerated]
private sealed class <Main>d__0 : IAsyncStateMachine
{
// Fields
public int <>1__state;
public AsyncTaskMethodBuilder <>t__builder;
public string[] args;
private TaskAwaiter <>u__1;
// Methods
private void MoveNext()
{
// 在 Main 方法中,我们初始化 <>1__state = -1,所以此时 num = -1
int num = this.<>1__state;
try
{
TaskAwaiter awaiter;
if (num != 0)
{
Console.WriteLine("Let's Go!");
// 调用 TestAsync(),获取 awaiter,用于后续监控 TestAsync() 运行状态
awaiter = Program.TestAsync().GetAwaiter();
// 一般来说,异步任务不会很快就完成,所以大多数情况下都会进入该分支
if (!awaiter.IsCompleted)
{
// 状态机状态从 -1 流转为 0
this.<>1__state = num = 0;
this.<>u__1 = awaiter;
Program.<Main>d__0 stateMachine = this;
// 配置 TestAsync() 完成后的延续
this.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter, Program.<Main>d__0>(ref awaiter, ref stateMachine);
return;
}
}
else
{
awaiter = this.<>u__1;
this.<>u__1 = new TaskAwaiter();
this.<>1__state = num = -1;
}
awaiter.GetResult();
Console.Write(" World!");
}
catch (Exception exception)
{
this.<>1__state = -2;
this.<>t__builder.SetException(exception);
return;
}
this.<>1__state = -2;
this.<>t__builder.SetResult();
}
[DebuggerHidden]
private void SetStateMachine(IAsyncStateMachine stateMachine)
{
}
}
异步状态机:
![[状态机.png]]
先简单理一下内部逻辑:
- 设置变量 num = -1,此时 num != 0,则会进入第一个if语句,
- 首先,执行
Console.WriteLine("Let's Go!")
- 然后,调用异步方法
TestAsync
,TestAsync
方法会在另一个线程池线程中执行,并获取指示该方法运行状态的 awaiter - 如果此时
TestAsync
方法已执行完毕,则像没有异步一般:- 继续执行接下来的
Console.Write(" World!")
- 最后设置 <>1__state = -2,并设置异步 Main 方法的返回结果
- 继续执行接下来的
- 如果此时
TestAsync
方法未执行完毕,则:- 设置 <>1__state = num = 0
- 调用
AwaitUnsafeOnCompleted
方法,用于配置当TestAsync
方法完成时的延续,即Console.Write(" World!")
- 返回指示异步 Main 方法执行状态的 Task 对象,由于同步 Main 方法中通过使用
GetResult()
同步阻塞主线程等待任务结束,所以不会释放主线程(废话,如果释放了程序就退出了)。不过对于其他子线程,一般会释放该线程
大部分逻辑我们都可以很容易的理解,唯一需要深入研究的就是AwaitUnsafeOnCompleted
,那我们接下来就看看它的内部实现
AwaitUnsafeOnCompleted方法【重点】:
// 所属结构体:AsyncTaskMethodBuilder
[__DynamicallyInvokable]
public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine) where TAwaiter: ICriticalNotifyCompletion where TStateMachine: IAsyncStateMachine
{
this.m_builder.AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref awaiter, ref stateMachine);
}
// 所属结构体:AsyncTaskMethodBuilder<TResult>
[SecuritySafeCritical, __DynamicallyInvokable]
public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine) where TAwaiter: ICriticalNotifyCompletion where TStateMachine: IAsyncStateMachine
{
try
{
// 用于流转状态机状态的 runner
AsyncMethodBuilderCore.MoveNextRunner runnerToInitialize = null;
Action completionAction = this.m_coreState.GetCompletionAction(AsyncCausalityTracer.LoggingOn ? this.Task : null, ref runnerToInitialize);
if (this.m_coreState.m_stateMachine == null)
{
// 此处构建指示异步 Main 方法执行状态的 Task 对象
Task<TResult> builtTask = this.Task;
this.m_coreState.PostBoxInitialization((TStateMachine) stateMachine, runnerToInitialize, builtTask);
}
awaiter.UnsafeOnCompleted(completionAction);
}
catch (Exception exception)
{
AsyncMethodBuilderCore.ThrowAsync(exception, null);
}
}
先看一下GetCompletionAction
的实现:
// 所属结构体:AsyncMethodBuilderCore
[SecuritySafeCritical]
internal Action GetCompletionAction(Task taskForTracing, ref MoveNextRunner runnerToInitialize)
{
Action defaultContextAction;
MoveNextRunner runner;
Debugger.NotifyOfCrossThreadDependency();
//
ExecutionContext context = ExecutionContext.FastCapture();
if ((context != null) && context.IsPreAllocatedDefault)
{
defaultContextAction = this.m_defaultContextAction;
if (defaultContextAction != null)
{
return defaultContextAction;
}
// 构建 runner
runner = new MoveNextRunner(context, this.m_stateMachine);
// 返回值
defaultContextAction = new Action(runner.Run);
if (taskForTracing != null)
{
this.m_defaultContextAction = defaultContextAction = this.OutputAsyncCausalityEvents(taskForTracing, defaultContextAction);
}
else
{
this.m_defaultContextAction = defaultContextAction;
}
}
else
{
runner = new MoveNextRunner(context, this.m_stateMachine);
defaultContextAction = new Action(runner.Run);
if (taskForTracing != null)
{
defaultContextAction = this.OutputAsyncCausalityEvents(taskForTracing, defaultContextAction);
}
}
if (this.m_stateMachine == null)
{
runnerToInitialize = runner;
}
return defaultContextAction;
}
接着来到new MoveNextRunner(context, this.m_stateMachine)
这里初始化了MoveNextRunner
,构造函数如下:
[SecurityCritical]
internal MoveNextRunner(ExecutionContext context, IAsyncStateMachine stateMachine)
{
// 将 ExecutionContext 保存了下来
this.m_context = context;
// 将 stateMachine 保存了下来(不过此时为 null)
this.m_stateMachine = stateMachine;
}
往下来到defaultContextAction = new Action(runner.Run)
,你可以发现,最终咱们返回的就是这个 defaultContextAction ,所以这个runner.Run
至关重要,不过别着急,我们等用到它的时候我们再来看其内部实现。
最后,回到AwaitUnsafeOnCompleted
方法,继续往下走。构建指示异步 Main 方法执行状态的 Task 对象,设置当前的状态机后,来到awaiter.UnsafeOnCompleted(completionAction);
,要记住,入参 completionAction 就是刚才返回的runner.Run
:
// 所属结构体:TaskAwaiter
[SecurityCritical, __DynamicallyInvokable]
public void UnsafeOnCompleted(Action continuation)
{
OnCompletedInternal(this.m_task, continuation, true, false);
}
[MethodImpl(MethodImplOptions.NoInlining), SecurityCritical]
internal static void OnCompletedInternal(Task task, Action continuation, bool continueOnCapturedContext, bool flowExecutionContext)
{
if (continuation == null)
{
throw new ArgumentNullException("continuation");
}
StackCrawlMark lookForMyCaller = StackCrawlMark.LookForMyCaller;
if (TplEtwProvider.Log.IsEnabled() || Task.s_asyncDebuggingEnabled)
{
continuation = OutputWaitEtwEvents(task, continuation);
}
// 配置延续方法
task.SetContinuationForAwait(continuation, continueOnCapturedContext, flowExecutionContext, ref lookForMyCaller);
}
直接来到代码最后一行,看到延续方法的配置
// 所属类:Task
[SecurityCritical]
internal void SetContinuationForAwait(Action continuationAction, bool continueOnCapturedContext, bool flowExecutionContext, ref StackCrawlMark stackMark)
{
TaskContinuation tc = null;
if (continueOnCapturedContext)
{
// 这里我们用的是不进行流动的 SynchronizationContext
SynchronizationContext currentNoFlow = SynchronizationContext.CurrentNoFlow;
// 像 Winform、WPF 这种框架,实现了自定义的 SynchronizationContext,
// 所以在 Winform、WPF 的 UI线程中进行异步等待时,一般 currentNoFlow 不会为 null
if ((currentNoFlow != null) && (currentNoFlow.GetType() != typeof(SynchronizationContext)))
{
// 如果有 currentNoFlow,那么我就用它来执行延续方法
tc = new SynchronizationContextAwaitTaskContinuation(currentNoFlow, continuationAction, flowExecutionContext, ref stackMark);
}
else
{
TaskScheduler internalCurrent = TaskScheduler.InternalCurrent;
if ((internalCurrent != null) && (internalCurrent != TaskScheduler.Default))
{
tc = new TaskSchedulerAwaitTaskContinuation(internalCurrent, continuationAction, flowExecutionContext, ref stackMark);
}
}
}
if ((tc == null) & flowExecutionContext)
{
tc = new AwaitTaskContinuation(continuationAction, true, ref stackMark);
}
if (tc != null)
{
if (!this.AddTaskContinuation(tc, false))
{
tc.Run(this, false);
}
}
// 这里会将 continuationAction 设置为 awaiter 中 task 对象的延续方法,所以当 TestAsync() 完成时,就会执行 runner.Run
else if (!this.AddTaskContinuation(continuationAction, false))
{
AwaitTaskContinuation.UnsafeScheduleAction(continuationAction, this);
}
}
对于我们的示例来说,既没有自定义 SynchronizationContext,也没有自定义 TaskScheduler,所以会直接来到最后一个else if (...)
,重点在于this.AddTaskContinuation(continuationAction, false)
,这个方法会将我们的延续方法添加到 Task 中,以便于当 TestAsync 方法执行完毕时,执行runner.Run
方法,是时候看看runner.Run
的内部实现了:
[SecuritySafeCritical]
internal void Run()
{
if (this.m_context != null)
{
try
{
// 我们并未给 s_invokeMoveNext 赋值,所以 callback == null
ContextCallback callback = s_invokeMoveNext;
if (callback == null)
{
// 将回调设置为下方的 InvokeMoveNext 方法
s_invokeMoveNext = callback = new
ContextCallback(AsyncMethodBuilderCore.MoveNextRunner.InvokeMoveNext);
}
ExecutionContext.Run(this.m_context, callback, this.m_stateMachine, true);
return;
}
finally
{
this.m_context.Dispose();
}
}
this.m_stateMachine.MoveNext();
}
[SecurityCritical]
private static void InvokeMoveNext(object stateMachine)
{
((IAsyncStateMachine) stateMachine).MoveNext();
}
来到ExecutionContext.Run(this.m_context, callback, this.m_stateMachine, true);
,这里的 callback 是InvokeMoveNext
方法。所以,当TestAsync
执行完毕后,就会执行延续方法 runner.Run,也就会执行stateMachine.MoveNext()
促使状态机继续进行状态流转,这样逻辑就打通了:
private void MoveNext()
{
// num = 0
int num = this.<>1__state;
try
{
TaskAwaiter awaiter;
if (num != 0)
{
Console.WriteLine("Let's Go!");
awaiter = Program.TestAsync().GetAwaiter();
if (!awaiter.IsCompleted)
{
this.<>1__state = num = 0;
this.<>u__1 = awaiter;
Program.<Main>d__0 stateMachine = this;
this.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter, Program.<Main>d__0>(ref awaiter, ref stateMachine);
return;
}
}
else
{
awaiter = this.<>u__1;
this.<>u__1 = new TaskAwaiter();
// 状态机状态从 0 流转到 -1
this.<>1__state = num = -1;
}
// 结束对 TestAsync() 的等待
awaiter.GetResult();
// 执行延续方法
Console.Write(" World!");
}
catch (Exception exception)
{
this.<>1__state = -2;
this.<>t__builder.SetException(exception);
return;
}
// 状态机状态从 -1 流转到 -2
this.<>1__state = -2;
// 设置异步 Main 方法最终返回结果
this.<>t__builder.SetResult();
}
至此,整个异步方法的执行就结束了,通过一张图总结一下:
![[async与await的内部原理总结.png]]
18.3.2c 多个 async await 嵌套
如果有多个 async await 嵌套,那会出现什么情况呢?
static Task TestAsync()
{
return Task.Run(async () =>
{
// 增加了这行
await Task.Run(() =>
{
Console.Write("Say: ");
});
Console.Write("Hello");
});
}
主要看看TestAsync()
:
private static Task TestAsync() =>
Task.Run(delegate {
<>c.<<TestAsync>b__1_0>d stateMachine = new <>c.<<TestAsync>b__1_0>d {
<>t__builder = AsyncTaskMethodBuilder.Create(),
<>4__this = this,
<>1__state = -1
};
stateMachine.<>t__builder.Start<<>c.<<TestAsync>b__1_0>d>(ref stateMachine);
return stateMachine.<>t__builder.Task;
});
得出结论:async await的嵌套也就是状态机的嵌。
18.3.2d 多个 async await 在同一方法中顺序执行
如果有多个 async await 在同一方法中顺序执行?
static async Task Main(string[] args)
{
Console.WriteLine("Let's Go!");
await Test1Async();
await Test2Async();
Console.Write(" World!");
}
static Task Test1Async()
{
return Task.Run(() =>
{
Console.Write("Say: ");
});
}
static Task Test2Async()
{
return Task.Run(() =>
{
Console.Write("Hello");
});
}
直接看状态机:
[CompilerGenerated]
private sealed class <Main>d__0 : IAsyncStateMachine
{
// Fields
public int <>1__state;
public AsyncTaskMethodBuilder <>t__builder;
public string[] args;
private TaskAwaiter <>u__1;
// Methods
private void MoveNext()
{
int num = this.<>1__state;
try
{
TaskAwaiter awaiter;
TaskAwaiter awaiter2;
if (num != 0)
{
if (num == 1)
{
awaiter = this.<>u__1;
this.<>u__1 = default(TaskAwaiter);
this.<>1__state = -1;
goto IL_D8;
}
Console.WriteLine("Let's Go!");
awaiter2 = Program.Test1Async().GetAwaiter();
if (!awaiter2.IsCompleted)
{
this.<>1__state = 0;
this.<>u__1 = awaiter2;
Program.<Main>d__0 <Main>d__ = this;
this.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter, Program.<Main>d__0>(ref awaiter2, ref <Main>d__);
return;
}
}
else
{
awaiter2 = this.<>u__1;
this.<>u__1 = default(TaskAwaiter);
this.<>1__state = -1;
}
awaiter2.GetResult();
// 待 Test1Async 完成后,继续执行 Test2Async
awaiter = Program.Test2Async().GetAwaiter();
if (!awaiter.IsCompleted)
{
this.<>1__state = 1;
this.<>u__1 = awaiter;
Program.<Main>d__0 <Main>d__ = this;
this.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter, Program.<Main>d__0>(ref awaiter, ref <Main>d__);
return;
}
IL_D8:
awaiter.GetResult();
Console.Write(" World!");
}
catch (Exception exception)
{
this.<>1__state = -2;
this.<>t__builder.SetException(exception);
return;
}
this.<>1__state = -2;
this.<>t__builder.SetResult();
}
[DebuggerHidden]
private void SetStateMachine(IAsyncStateMachine stateMachine)
{
}
}
可见,就是一个状态机状态一直流转。
18.3.3 ValueTask
18.3.3a 为什么需要ValueTask
Task作为一个类型,它非常灵活而且带来了许多好处。例如,一个Task可以await多次,被任意数量的消费者并发消费。您可以将其存储到字典中,让任意数量的后续使用者在将来await,这就允许了使用字典来作为异步结果的缓存。如果场景需要,还可以阻塞等待一个Task直至其完成。而且,可以编写和使用针对Task的多种操作(有时称为“组合器”,combinators),例如“WhenAny”操作,即异步等待最先完成的Task。然而,在多数常见用法中——简单地执行一个异步操作然后await得到的Task,灵活性并不是必需的,如:
TResult result = await SomeOperationAsync();
UseResult(result);
在这种用法下,我们不需要:
- 多次await Task
- 处理并发地await
- 处理同步阻塞
- 写组合器
现在,由于 Task是一个类(class)【引用类型】,从异步方法返回一个 Task 对象意味着每次调用该方法时都会在托管堆(managed heap)上分配该对象,分配的object对象越多,垃圾收集器(GC)所需要做的工作就越多,因此,在使用 Task 时需要注意的一点是,每次从方法返回 Task 对象时都需要在托管堆中分配内存。如果某一方法执行操作的结果立即可用或同步完成,则不需要这种分配。
为了解决这一问题,微软在.NET Core 2.0引入了ValueTask<TResult>
类型,该类型是一个包装TResult
或Task<TResult>
的只读结构体(Readonly Struct),这意味着它可以从异步方法中返回,并且如果该方法同步成功完成,则不需要分配任何内容:我们可以简单地使用TResult
初始化ValueTask<TResult>
结构体并返回它。仅当该方法异步完成时,才要分配Task<TResult>
实例,并使用ValueTask<TResult>
来包装该实例。这样可以在一定程度上避免不必要的堆内存分配。
18.3.3b Task、Task<T>、ValueTask<T>如何选择
- 一个异步方法没有返回值则首选Task
- 如果异步方法中的操作有可能异步完成,或者有可能缓存常见的结果Task对象,则首选
Task<T>
- 如果操作可能同步完成并且不能有效地缓存所有常见的返回值,则
ValueTask<T>
是更适合的选择
18.3.3c ValueTask的一些限制
- ValueTask只能等待一次
- ValueTask不能并发await,在并发条件下等待ValueTask会产生竞争,且可能会永远等待(任务已经完成但有些线程无法收到通知)
- 在使用
.GetAwaiter().GetResult()
时必须确保操作已经完成。原因在于:IValueTaskSource
接口和IValueTaskSource<TResult>
接口的实现中并没有强制要求支持阻塞,所以这些接口的实现类类型很可能也不会支持阻塞,这样就造成了一种可能的竞争状态,让程序不按照调用方的意愿去执行。相反,Task/Task<TResult>
支持此功能,可以阻塞调用者,直到任务完成,所以Task类型可以使用.GetAwaiter().GetResult()
- 如果需要阻塞或者多次等待要使用AsTask方法将ValueTask类型转换为Task类型
- AsTask方法只能使用一次,因为转换完成后原先的ValueTask已经不存在了
// 以这个方法为例
public ValueTask<int> SomeValueTaskReturningMethodAsync();
…
// GOOD
int result = await SomeValueTaskReturningMethodAsync();
// GOOD
int result = await SomeValueTaskReturningMethodAsync().ConfigureAwait(false);
// GOOD
Task<int> t = SomeValueTaskReturningMethodAsync().AsTask();
// WARNING
ValueTask<int> vt = SomeValueTaskReturningMethodAsync();
... // 将实例存储到本地会使它被滥用的可能性更大,
// 不过这还好,适当使用没啥问题
// BAD: await 多次
ValueTask<int> vt = SomeValueTaskReturningMethodAsync();
int result = await vt;
int result2 = await vt;
// BAD: 并发 await (and, by definition then, multiple times)
ValueTask<int> vt = SomeValueTaskReturningMethodAsync();
Task.Run(async () => await vt);
Task.Run(async () => await vt);
// BAD: 在不清楚操作是否完成的情况下使用 GetAwaiter().GetResult()
ValueTask<int> vt = SomeValueTaskReturningMethodAsync();
int result = vt.GetAwaiter().GetResult();
18.3.3d ValueTask中的IsCompleted属性和IsCompletedSuccessfully属性
虽然ValueTask没有强制要求支持阻塞,但确实提供了一些与任务当前状态有关的属性:
IsCompleted
,如果操作尚未完成,则返回false
;如果操作已完成,则返回true
(这意味着该操作不再运行,并且可能已经成功完成或以其他方式完成)IsCompletedSuccessfully
,当且仅当它已完成并成功完成才返回true
(意味着尝试等待它或访问其结果不会导致引发异常)
示例如下:
int bytesRead;
ValueTask<int> readTask = _connection.ReadAsync(buffer);
if (readTask.IsCompletedSuccessfully)
{
// 如果成功完成就直接读取结果
bytesRead = readTask.Result;
}
else
{
// 如果没成功完成就监测取消令牌,看是否取消任务,如果不取消任务就等待任务返回的结果
using (_connection.RegisterCancellation())
{
bytesRead = await readTask;
}
}
18.3.4 异步流
18.3.4a 异步流要解决的问题
有时,我们希望一个异步方法可以用IEnumerable<T>
或IEnumerator<T>
作为返回值,并且在方法内部异步地产生可供调用者获取的集合元素。但是异步方法返回的对象必须支持GetAwaiter方法,就像Task
、Task<T>
和ValueTask<T>
所作的那样。因此这里就产生了矛盾:你不能同时使用异步方法和迭代器,如果要在异步方法中迭代一个集合,你不能将结果随时传递给调用者,只能先将所需的迭代做完。为了解决这一问题,C#8.0提供了异步流(AsyncStream),帮助实现异步集合以及异步迭代。异步流(AsyncStream)是可枚举类(Enumerable)的异步变体,它会在遍历下一个元素的时候(Next)会涉及异步操作,只要一个类实现了IAsyncEnumerable接口就能使用异步流。示例如下:
假设要实现一个异步方法,该方法接收一个给定的目录,然后对目录中的文件进行加密
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Security.Cryptography;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp1;
internal static class Program
{
// 任何具有async关键词的方法都必须返回一个有效的异步返回值类型,Main方法也不例外
private static async Task Main()
{
const string directoryPath = @"D:\Games\Genshin Impact";
const string searchPattern = "*";
// 获取当前文件夹下的所有文件的名称(包含绝对路径)
var files = Directory.EnumerateFiles(directoryPath, searchPattern);
// 设置取消令牌,该令牌在2秒钟后自动动作
using var cancellationTokenSource = new CancellationTokenSource(delay: TimeSpan.FromSeconds(2));
// 选择对称加密套件,这里用Aes
using var cryptographer = Aes.Create();
try
{
await foreach (var item in EncryptFileAsync(files, cryptographer, cancellationTokenSource.Token))
{
Console.WriteLine(item);
}
}
catch (OperationCanceledException e)
{
Console.WriteLine(e.Message);
}
}
// 对单个文件加密
private static async ValueTask<string> EncryptFileAsync(string fileName, SymmetricAlgorithm cryptographer)
{
// 创建加密文件
var encryptedFileName =
$"{Directory.GetCurrentDirectory()}{fileName.Remove(0, @"D:\Games\Genshin Impact".Length)}.encrypt";
await using var outputFileStream = new FileStream(encryptedFileName, FileMode.Create);
// 构建加密数据流
await using var cryptoStream =
new CryptoStream(outputFileStream, cryptographer.CreateEncryptor(), CryptoStreamMode.Write);
// 构建写入数据流
await using var streamWriter = new StreamWriter(cryptoStream);
// 读取待加密文件的信息
var data = await File.ReadAllTextAsync(fileName);
// 向写入数据流注入数据 [原始文件数据流=>写入数据流=>加密数据流=>加密文件数据流]
await streamWriter.WriteAsync(data);
// 返回加密前后文件的绝对路径
return $"{fileName} => {encryptedFileName}";
}
// 对多个文件加密
// EnumeratorCancellation属性表示返回值为IAsyncEnumerable<T>方法
// 应从 GetAsyncEnumerator(CancellationToken)接收取消令牌的状态
private static async IAsyncEnumerable<string> EncryptFileAsync(IEnumerable<string> files,
SymmetricAlgorithm cryptographer,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
foreach (var fileName in files)
{
// 调用对单个文件加密的EncryptFileAsync方法
yield return await EncryptFileAsync(fileName, cryptographer);
// 如果取消令牌动作就抛出"操作被取消"这一异常
cancellationToken.ThrowIfCancellationRequested();
}
}
}
18.3.4b yield return中的语句执行顺序问题
有如下示例:
using System;
using System.Collections.Generic;
using System.Linq;
namespace ConsoleApp1;
internal static class Program
{
private static void Main()
{
var list = new int[] { 1, 3, 5, 6, 7, 8, 9 };
foreach (var item in Test(list))
{
Console.WriteLine("yield return返回主方法后,接着执行主方法体中的与yield return 关联的任务");
Console.WriteLine(item);
Console.WriteLine("主方法体中,和yield return 关联的任务已完成");
}
}
private static IEnumerable<int> Test(IEnumerable<int> list)
{
var listArray = list as int[] ?? list.ToArray();
var lenght = listArray.Length;
foreach (var item in listArray)
{
Console.WriteLine("yield return 之前的任务");
yield return item;
Console.WriteLine("开始执行子方法中yield return 之后的任务");
Console.WriteLine($"{item}, {lenght}");
Console.WriteLine("子方法中yield return 之后的任务运行完成");
}
}
}
在上述示例中,yield return 之前的语句在返回之前执行,但yield return 之后的语句要等到主方法(使用了yield return的返回值)处理完本次的返回值之后才会执行。具体表现为:
yield return 之前的任务
yield return返回主方法后,接着执行主方法体中的与yield return 关联的任务
1
主方法体中,和yield return 关联的任务已完成
开始执行子方法中yield return 之后的任务
1, 7
子方法中yield return 之后的任务运行完成
yield return 之前的任务
yield return返回主方法后,接着执行主方法体中的与yield return 关联的任务
3
主方法体中,和yield return 关联的任务已完成
开始执行子方法中yield return 之后的任务
3, 7
子方法中yield return 之后的任务运行完成
yield return 之前的任务
yield return返回主方法后,接着执行主方法体中的与yield return 关联的任务
5
主方法体中,和yield return 关联的任务已完成
开始执行子方法中yield return 之后的任务
5, 7
子方法中yield return 之后的任务运行完成
yield return 之前的任务
yield return返回主方法后,接着执行主方法体中的与yield return 关联的任务
6
主方法体中,和yield return 关联的任务已完成
开始执行子方法中yield return 之后的任务
6, 7
子方法中yield return 之后的任务运行完成
yield return 之前的任务
yield return返回主方法后,接着执行主方法体中的与yield return 关联的任务
7
主方法体中,和yield return 关联的任务已完成
开始执行子方法中yield return 之后的任务
7, 7
子方法中yield return 之后的任务运行完成
yield return 之前的任务
yield return返回主方法后,接着执行主方法体中的与yield return 关联的任务
8
主方法体中,和yield return 关联的任务已完成
开始执行子方法中yield return 之后的任务
8, 7
子方法中yield return 之后的任务运行完成
yield return 之前的任务
yield return返回主方法后,接着执行主方法体中的与yield return 关联的任务
9
主方法体中,和yield return 关联的任务已完成
开始执行子方法中yield return 之后的任务
9, 7
子方法中yield return 之后的任务运行完成
如果在Main方法没有使用Test方法返回的IEnumerable对象,即使调用了Test方法,yield return语句块中的语句,无论在yield return之前还是之后,均不会被执行。示例如下:
using System;
using System.Collections.Generic;
using System.Linq;
namespace ConsoleApp1;
internal static class Program
{
private static void Main()
{
var list = new int[] { 1, 3, 5, 6, 7, 8, 9 };
// 调用Test方法但不使用该方法的返回值
var enumerableObject = Test(list);
}
private static IEnumerable<int> Test(IEnumerable<int> list)
{
var listArray = list as int[] ?? list.ToArray();
var lenght = listArray.Length;
foreach (var item in listArray)
{
// 主方法没有使用yield return的返回值,该语句不会被执行
Console.WriteLine("yield return 之前的方法");
yield return item;
// 主方法没有使用yield return的返回值,之后的方法不会被执行
Console.WriteLine("开始执行子方法中yield return 之后的任务");
Console.WriteLine($"{item}, {lenght}");
Console.WriteLine("子方法中yield return 之后的任务运行完成");
}
}
}
18.3.4c IAsyncDisposable接口以及await using语句
在18.3.4a的示例中使用了await using语句,这一语句是语法糖,它简化了具有IasyncDisposable接口的对象的内存分配和释放。这一小结会从如何实现IAsyncDisposable和IDisposable接口入手,介绍await using的工作原理,同时在下一小结(18.3.4d)中会着重介绍在这一小结的示例中使用的ConfigureAwait方法具体有什么作用。
设计规范要求在实现IAsyncDisposable接口时必须同时实现IDisposable接口,如果实现IAsyncDisposable接口,但不实现IDisposable接口,则应用可能会泄漏资源。 原因在于,如果一个类实现了 IAsyncDisposable,但没有实现 IDisposable,并且使用者只调用了Dispose,那么实现将永远不会调用DisposeAsync,这将导致资源泄漏。
同时实现释放模式(IDisposable接口)和异步释放模式(IAsyncDisposable接口)的模板如下:
internal class AsyncDisposableSchedule : IAsyncDisposable, IDisposable
{
public async ValueTask DisposeAsync()
{
// dispose of managed resource asynchronously.
await DisposeAsyncCore();
// dispose of unmanaged resources synchronously.
// Note: unmanaged resource can't be released asynchronously.
Dispose(false);
GC.SuppressFinalize(this);
}
private async ValueTask DisposeAsyncCore()
{
// release managed resources asynchronously.
}
private void ReleaseUnmanagedResources()
{
// release unmanaged resources synchronously.
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
private bool _isDisposed;
private void Dispose(bool disposing)
{
if (_isDisposed) return;
if (disposing)
{
// release managed resources asynchronously.
}
ReleaseUnmanagedResources();
_isDisposed = true;
}
~AsyncDisposableSchedule()
{
// dispose of unmanaged resources synchronously.
Dispose(false);
// managed resource will be released by GC automatically.
}
}
【示例1】用ExampleAsyncDisposable自定义类包裹Utf8JsonWriter类型,由于Utf8JsonWriter类型实现了IAsyncDisposable和IDisposable接口,所以ExampleAsyncDisposable自定义类也要实现这些接口,示例如下:
using System;
using System.IO;
using System.Text.Json;
using System.Threading.Tasks;
namespace ConsoleApp1;
internal static class Program
{
private static async Task Main()
{
// 主动调用Dispose方法释放资源
Test();
// 主动调用DisposeAsync方法释放资源
await TestAsync();
// 语法糖:使用using隐式调用Dispose方法释放资源
using var testInstance = new AsyncDisposableExample();
// 语法糖: 使用await using隐式调用DisposeAsync方法释放资源
await using var testInstanceAsync = new AsyncDisposableExample();
}
private static void Test()
{
var testInstance = new AsyncDisposableExample();
testInstance.Dispose();
}
private static async ValueTask TestAsync()
{
var testInstance = new AsyncDisposableExample();
await testInstance.DisposeAsync();
}
}
internal class AsyncDisposableExample : IAsyncDisposable, IDisposable
{
private Utf8JsonWriter? _jsonWriter = new Utf8JsonWriter(new MemoryStream());
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
public async ValueTask DisposeAsync()
{
await DisposeAsyncCore().ConfigureAwait(false);
Dispose(false);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (!disposing) return;
_jsonWriter?.Dispose();
_jsonWriter = null;
}
protected virtual async ValueTask DisposeAsyncCore()
{
if (_jsonWriter is not null)
await _jsonWriter.DisposeAsync().ConfigureAwait(false);
_jsonWriter = null;
}
~AsyncDisposableExample()
{
// 终结器只负责释放非托管资源
Dispose(false);
}
}
【示例2】如果一个自定义类包裹了一个实现了IAsyncDisposable和IDisposable接口的类型,又包裹了只实现了IDisposable接口的类型,那么要确保在自定义类中正确地级联清理这些调用,示例如下:
using System;
using System.IO;
using System.Threading.Tasks;
namespace ConsoleApp1;
internal static class Program
{
private static async Task Main()
{
// 主动调用Dispose方法释放资源
Test();
// 主动调用DisposeAsync方法释放资源
await TestAsync();
// 语法糖:使用using隐式调用Dispose方法释放资源
using var testInstance = new ExampleConjunctiveDisposableUsing();
// 语法糖: 使用await using隐式调用DisposeAsync方法释放资源
await using var testInstanceAsync = new ExampleConjunctiveDisposableUsing();
}
private static void Test()
{
var testInstance = new ExampleConjunctiveDisposableUsing();
testInstance.Dispose();
}
private static async ValueTask TestAsync()
{
var asyncTestInstance = new ExampleConjunctiveDisposableUsing();
await asyncTestInstance.DisposeAsync();
}
}
internal class ExampleConjunctiveDisposableUsing : IAsyncDisposable, IDisposable
{
private IDisposable? _disposableResource;
private IAsyncDisposable? _asyncDisposableResource;
public ExampleConjunctiveDisposableUsing()
: this(new MemoryStream(), new MemoryStream())
{
}
public ExampleConjunctiveDisposableUsing(IDisposable? disposableResource, IAsyncDisposable? asyncDisposableResource)
{
_disposableResource = disposableResource;
_asyncDisposableResource = asyncDisposableResource;
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (!disposing) return;
_disposableResource?.Dispose();
(_asyncDisposableResource as IDisposable)?.Dispose();
_disposableResource = null;
_asyncDisposableResource = null;
}
public async ValueTask DisposeAsync()
{
await DisposeAsyncCore().ConfigureAwait(false);
// 调用Dispose(bool disposing)释放非托管资源
Dispose(false);
GC.SuppressFinalize(this);
}
protected virtual async ValueTask DisposeAsyncCore()
{
if (_asyncDisposableResource is not null)
await _asyncDisposableResource.DisposeAsync().ConfigureAwait(false);
if (_disposableResource is IAsyncDisposable disposable)
await disposable.DisposeAsync().ConfigureAwait(false);
else _disposableResource?.Dispose();
_disposableResource = null;
_asyncDisposableResource = null;
}
~ExampleConjunctiveDisposableUsing()
{
Dispose(false);
}
}
18.3.4d ConfigureAwait的原理和使用
在18.3.4c的示例中使用了ConfigureAwait方法,这一小结会从同步上下文(SynchronizationContext)开始讲起,然后过渡到ConfigureAwait方法。
18.3.4d1 同步上下文(SynchronizationContext)
当前任务所在的同步上下文以及后续任务的同步上下文很重要,对任务的等待需要参考同步上下文中的信息才能高效且安全地执行。有如下示例:
using System;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp1;
internal static class Program
{
private static void Main()
{
DisplayStatus("Before");
var taskA = Task.Run(() => DisplayStatus("Starting..."))
.ContinueWith(_ => DisplayStatus("Continuing A..."));
var taskB = taskA.ContinueWith(_ => DisplayStatus("Continuing B..."));
var taskC = taskA.ContinueWith(_ => DisplayStatus("Continuing C..."));
Task.WaitAll(taskB, taskC);
DisplayStatus("Finished! ");
}
private static void DisplayStatus(string message)
{
var currentSynchronizationContext = SynchronizationContext.Current;
var tempString = currentSynchronizationContext is null ? "null" : currentSynchronizationContext.ToString()!;
Console.WriteLine($@"{Environment.CurrentManagedThreadId}: {message} ({tempString})");
}
}
在这种简单的控制台应用程序中,同步上下文默认为null,说明线程完全由线程池自动分配(线程无关的应用程序)。简而言之,同步上下文的作用就是确保委托在正确的位置、时间和线程上执行,如果不需要指定线程,那么交给线程池来决策就好。但如果要指定某一委托在某一线程上运行就要用到同步上下文了。
例如,WinForm有一个派生自SynchronizationContext的类,重写了Post
方法,内部执行Control.BeginInvoke
,这样,调用该Post
方法就会在该控件的UI线程上执行接收的委托。WinForm依赖Win32的消息处理机制,并在UI线程上运行“消息循环”,该线程就是简单的等待新消息到达,然后去处理。这些消息可能是鼠标移动和点击、键盘输入、系统事件、可供调用的委托等。所以,只需要将委托传递给SynchronizationContext
实例的Post
方法,就可以在控件的UI线程中执行。
和WinForm一样,WPF也有一个派生自SynchronizationContext的类,重写了Post
方法,通过Dispatcher.BeginInvoke
将接收的委托封送到UI线程。与WinForm通过控件管理不同的是,WPF是由Dispatcher管理的。
Windows运行时(WinRT)也不例外,它有一个派生自SynchronizationContext的类,重写了Post
方法,通过CoreDispatcher
将接收的委托排队送到UI线程。
当然,不仅仅“在UI线程中执行该委托”这一种用法,任何人都可以重写SynchronizationContext的Post或Send方法做任何事。例如,我可能不会关心委托在哪个线程上执行,但是我想确保任何在我自定义的SynchronizationContext实例中执行的任何委托都可以在一定的并发程度下执行。那么,我可以实现这样一个自定义类:
internal sealed class MaxConcurrencySynchronizationContext : SynchronizationContext
{
private readonly SemaphoreSlim _semaphoreSlim;
public MaxConcurrencySynchronizationContext(int maxConcurrencyLevel)
{
_semaphoreSlim = new SemaphoreSlim(initialCount: 0, maxCount: maxConcurrencyLevel);
}
// 异步方法
public override void Post(SendOrPostCallback d, object? state)
{
// 等待可用信号量,如果有可用信号量就执行ContinueWith中的方法
_semaphoreSlim.WaitAsync().ContinueWith(_ =>
{
// Try-finally语句块的作用
// 1. 回调方法中的任何异常都会向上传递
// 2. 回调方法执行成功与否均不会影响信号量的释放
try
{
// 回调方法d
// 传递给回调方法的参数state
d(state);
}
finally
{
// 当前任务执行完成后释放一个信号量
_semaphoreSlim.Release();
}
}, default, TaskContinuationOptions.None, TaskScheduler.Default);
}
// 同步方法
public override void Send(SendOrPostCallback d, object? state)
{
_semaphoreSlim.Wait();
try
{
d(state);
}
finally
{
_semaphoreSlim.Release();
}
}
}
与抽象的优点一样:它提供了一个API,可用于将委托排队进行处理,无需了解该实现的细节,这是实现者所期望的。所以,如果我正在编写一个库,想要停下来做一些工作,然后将委托排队送回“原始上下文”继续执行,那么我只需要获取他们的SynchronizationContext
,存下来。当完成工作后,在该上下文上调用Post
去传递我想要调用的委托即可。我不需在WinForm中知道要获取一个控件并调用BeginInvoke
,不需要在WPF中知道要对Dispatcher
进行BeginInvoke
,也不需要在xunit中知道要以某种方式获取其上下文并排队,只需要获取当前的SynchronizationContext
并在以后使用它就可以了。为此,借助SynchronizationContext
提供的Current
属性。如果想要设置自定义的同步上下文类为当前线程的同步上下文则需要使用:
SynchronizationContext.SetSynchronizationContext(customSc);
其中customSc为SynchronizationContext
类的派生类,示例如下:
public void DoWork(Action worker, Action completion)
{
// 设置MaxConcurrencySynchronizationContext为当前线程的同步上下文
var customSc = new MaxConcurrencySynchronizationContext();
SynchronizationContext.SetSynchronizationContext(customSc);
// 获取当前线程的同步上下文
var sc = SynchronizationContext.Current;
ThreadPool.QueueUserWorkItem(_ =>
{
try
{
worker();
}
finally
{
sc.Post(_ => completion(), null);
}
});
}
注:在18.3.7还会再提到同步上下文(synchronization context)这一概念
18.3.4d2 TaskScheduler
SynchronizationContext
是对“调度程序(scheduler)”的通用抽象。个别框架会有自己的抽象调度程序,比如System.Threading.Tasks
。当Tasks通过委托的形式进行排队和执行时,会用到System.Threading.Tasks.TaskScheduler
。和SynchronizationContext
提供了一个virtual Post
方法用于将委托排队调用一样,TaskScheduler
也提供了一个abstract QueueTask
方法。
通过TaskScheduler.Default
我们可以获取到Task
默认的调度程序ThreadPoolTaskScheduler
。并且可以通过继承TaskScheduler
来重写相关方法来实现在任意时间任意地点进行Task调用。
例如,核心库中有个类,名为System.Threading.Tasks.ConcurrentExclusiveSchedulerPair
,其实例公开了两个TaskScheduler
属性,一个叫ExclusiveScheduler
,另一个叫ConcurrentScheduler
。调度给ConcurrentScheduler
的任务可以并发,但是要在构造ConcurrentExclusiveSchedulerPair
时就要指定最大并发数(类似于前面演示的MaxConcurrencySynchronizationContext
);相反,在ExclusiveScheduler
执行任务时,那么将只允许运行一个排他任务(这个行为很像读写锁)。
和SynchronizationContext
一样,TaskScheduler
也有一个Current
属性,会返回当前调度程序。不过,和SynchronizationContext
不同的是,它没有设置当前调度程序的方法,而是在启动Task时就要提供,因为当前调度程序是与当前运行的Task相关联的。所以,下方的示例程序会输出“True
”,这是因为和StartNew
一起使用的lambda表达式是在ConcurrentExclusiveSchedulerPair
的ExclusiveScheduler
上执行的(我们手动指定ConcurrentExclusiveSchedulerPair.ExclusiveScheduler),并且TaskScheduler.Current
也会指向该ExclusiveScheduler
:
using System;
using System.Threading.Tasks;
internal static class Program
{
private static void Main()
{
var cesp = new ConcurrentExclusiveSchedulerPair();
Task.Factory.StartNew(() =>
{
Console.WriteLine(TaskScheduler.Current == cesp.ExclusiveScheduler);
}, default, TaskCreationOptions.None, cesp.ExclusiveScheduler)
.Wait();
}
}
TaskScheduler
提供了一个静态的FromCurrentSynchronizationContext
方法,该方法会创建一个SynchronizationContextTaskScheduler
实例并返回,以便在原始的SynchronizationContext.Current
上的Post
方法对任务进行排队执行。
18.3.4d3 SynchronizationContext和TaskScheduler是如何与await关联起来的
假设有一个UI App,它有一个按钮。当点击按钮后,会从网上下载一些文本并将其设置为按钮的内容。我们应当只在UI线程中访问该按钮,因此当我们成功下载新的文本后,我们需要从拥有按钮控制权的的线程中将其设置为按钮的内容。如果不这样做的话,会得到一个这样的异常:
System.InvalidOperationException: 'The calling thread cannot access this object because a different thread owns it.'
如果我们自己手动实现,那么可以使用前面所述的同步上下文将按钮内容的设置传回原始线程,可以借助TaskScheduler
:
private static readonly HttpClient _httpClient = new HttpClient();
private void downloadBtn_Click(object sender, RoutedEventArgs e)
{
_httpClient.GetStringAsync("http://example.com/currenttime").ContinueWith(downloadTask =>
{
downloadBtn.Content = downloadTask.Result;
}, TaskScheduler.FromCurrentSynchronizationContext());
}
或直接使用SynchronizationContext
:
private static readonly HttpClient _httpClient = new HttpClient();
private void downloadBtn_Click(object sender, RoutedEventArgs e)
{
SynchronizationContext sc = SynchronizationContext.Current;
_httpClient.GetStringAsync("http://example.com/currenttime").ContinueWith(downloadTask =>
{
// Post(SendOrPostCallback d, object? state)
sc.Post(delegate
{
downloadBtn.Content = downloadTask.Result;
}, null);
});
}
不过,这两种方式都需要显式指定回调,更好的方式是通过async/await
自然地进行编码:
private static readonly HttpClient _httpClient = new HttpClient();
private async void downloadBtn_Click(object sender, RoutedEventArgs e)
{
string text = await _httpClient.GetStringAsync("http://example.com/currenttime");
downloadBtn.Content = text;
}
就这样,成功在UI线程上设置了按钮的内容,与上面手动实现的版本一样,await Task
默认会关注SynchronizationContext.Current
和TaskScheduler.Current
两个参数。当你在C#中使用await
时,编译器会进行代码转换,通过调用GetAwaiter
,向"可等待对象"(这里为Task
)索要"awaiter"(这里为TaskAwaiter<string>
)。这个awaiter负责挂接回调(也称为continuation),当等待的对象完成时,该回调将被封送到状态机,并使用在注册回调时捕获的上下文或调度程序来执行此回调。尽管与实际代码不完全相同(实际代码还进行了其他优化和调整),但大体上是这样的:
object scheduler = SynchronizationContext.Current;
if (scheduler is null && TaskScheduler.Current != TaskScheduler.Default)
{
scheduler = TaskScheduler.Current;
}
也就是说,它先检查有没有设置当前SynchronizationContext
,如果没有设置同步上下文,则再判断当前调度程序是否为默认的TaskScheduler
。如果也不是默认的线程池回调,那么当准备好调用回调时,会使用自定义的调度程序执行回调。如果上述条件均不满足,则使用默认的线程池回调。即:如果有当前上下文,则使用当前上下文执行回调;如果当前上下文为空,且使用的是默认调度程序ThreadPoolTaskScheduler
,则会启用线程池线程执行回调(就像18.3.4d1中的第一个示例一样)。
18.3.4d4 ConfigureAwait(false)做了什么
ConfigureAwait
方法并没有什么特别的,它仅仅是一个返回了名为ConfiguredTaskAwaitable
结构体的普通方法,该结构体包装了调用它的原始任务以及调用者指定的布尔值。当编译器访问实例的GetAwaiter方法时,它是根据ConfigureAwait返回的类型进行操作的,而不是直接使用Task,此外,还提供了一个钩子(hook),用于更改await的行为。具体来说,如果等待ConfigureAwait(continueOnCapturedContext:false)
返回的类型ConfiguredTaskAwaitable
,而非直接等待Task
会让18.3.4d3中的最后一个示例发生如下变化:
object scheduler = null;
if (continueOnCapturedContext)
{
scheduler = SynchronizationContext.Current;
if (scheduler is null && TaskScheduler.Current != TaskScheduler.Default)
{
scheduler = TaskScheduler.Current;
}
}
也就是说,通过指定参数为false,即使有当前上下文或调度程序用于回调,它也会假装没有。也可以说,它强制让当前程序使用线程池线程执行回调。
18.3.4d5 为什么要使用ConfigureAwait(false)
ConfigureAwait(continueOnCapturedContext: false)用于避免强制在原始上下文或调度程序中进行回调,有以下好处:
- 提升性能
- 比起直接调用,排队进行回调会更加耗费性能,一个是因为会有一些额外的工作(一般是额外的内存分配),另一个是因为无法使用我们本来希望在运行时中采用的某些优化(当我们确切知道回调将如何调用时,我们可以进行更多优化,但如果将其移交给抽象的任意实现,则有时会受到限制)。对于大多数情况,即使检查当前的
SynchronizationContext
和TaskScheduler
也可能会增加一定的开销(两者都会访问线程静态变量)。如果await
之后的代码并不需要在原始上下文中运行,那么使用ConfigureAwait(false)
就可以避免上述花销:它不用排队,且可以利用所有可以进行的优化,还可以避免不必要的线程静态访问。
- 比起直接调用,排队进行回调会更加耗费性能,一个是因为会有一些额外的工作(一般是额外的内存分配),另一个是因为无法使用我们本来希望在运行时中采用的某些优化(当我们确切知道回调将如何调用时,我们可以进行更多优化,但如果将其移交给抽象的任意实现,则有时会受到限制)。对于大多数情况,即使检查当前的
- 避免死锁
- 假如有一个方法,使用
await
等待网络下载结果,你需要通过同步阻塞的方式调用该方法等待其完成,比如使用.Wait()
、.Result
或.GetAwaiter().GetResult()
。如果限制当前同步上下文的并发数为1,则会直接锁死线程。 - 在默认情况下, 等待
Task
会捕获当前SynchronizationContext
,所以,当网络下载完成时,它会将回调排队返回到SynchronizationContext
中执行剩下的操作。但是,当前唯一可以处理排队回调的线程却还阻塞着等待操作完成,不幸的是,在回调处理完毕之前,该操作永远不会完成。结果是:这一个线程永远阻塞。 - 即使不将上下文并发数限制为1,而是通过其他任何方式对资源进行了限制,结果也是如此。比如,我们将
MaxConcurrencySynchronizationContext
限制为4,这时,我们对该上下文进行4次排队调用,每个调用都会进行阻塞等待操作完成。现在,我们在等待异步方法完成时仍阻塞了所有资源,这些异步方法能否完成取决于是否可以在已经完全消耗掉的上下文中处理它们的回调。由于没有多余的线程能处理回调,结果是:这4个线程永远阻塞。 - 如果该方法改为使用
ConfigureAwait(false)
,那么它就不会将回调排队送回原始上下文,而是交给线程池处理,线程池会自动分配线程处理回调,这样就不会死锁了。
- 假如有一个方法,使用
18.3.4d6 什么时候应该使用ConfigureAwait(false)
指导原则1
编写应用程序级代码,不要轻易使用ConfigureAwait(false)
。 如果应用模型或环境(例如WinForm,WPF,ASP.NET Core等)具有自定义的同步上下文,那么这些同步上下文一定有其作用,就像18.3.4d3中倒数第二个示例一样。代码downloadBtn.Content = text
需要在原始上下文中执行,但如果代码违反了该准则,在错误的情况下使用了ConfigureAwait(false)
,例如:
private static readonly HttpClient _httpClient = new HttpClient();
private async void downloadBtn_Click(object sender, RoutedEventArgs e)
{
// 非UI线程处理了回调方法
string text = await _httpClient.GetStringAsync("http://example.com/currenttime").ConfigureAwait(false);
downloadBtn.Content = text;
}
这导致了下列错误:
System.InvalidOperationException: 'The calling thread cannot access this object because a different thread owns it.'
这是因为await _httpClient.GetStringAsync
之后,其结果返回到了其他线程中然后在其他线程中执行了downloadBtn.Content = text
,而只有UI线程才能访问downloadBtn.Content,其他线程无权更改对应的内容,这样的回调必然失败。
指导原则2
如果要编写通用库或与应用程序模型无关的代码,请使用ConfigureAwait(false)
。通用库不关心使用它们的环境,那么,无关就意味着它不会做任何需要以特定方式与应用程序模型进行交互的事情,例如:它不会访问UI控件,因为通用库对UI控件一无所知。由于我们不需要在任何特定环境中运行代码,也就不用将回调方法强制送回到原始上下文,通过使用ConfigureAwait(false)
会提升程序性能并增强代码的可靠性。这就解释了为什么ConfigureAwait(false)
在.NET Core运行时库中随处可见。
18.3.4d7 ConfigureAwait(false)不能保证回调不会在原始上下文中运行
前面说过,ConfigureAwait(false)不会将回调送回原始上下文中处理,但这并不意味着await task.ConfigureAwait(false)
后的代码仍不会在原始上下文中运行。因为当等待已经完成的可等待对象时(参考ValueTask:异步方法同步完成),后续代码将会保持同步运行,而无需强制排队等待。所以,如果您等待的任务在等待时就已经完成了,那么无论是否使用了ConfigureAwait(false)
,紧随其后的代码也会在拥有当前上下文的当前线程上继续执行。
18.3.4d8 使用了GetAwaiter().GetResult()之后就不再需要ConfigureAwait(false)了
- 具体来说,awaiter模式要求awaiters公开
IsCompleted
属性、GetResult
方法和OnCompleted/UnsafeOnCompleted
方法。而ConfigureAwait
只会影响OnCompleted/UnsafeOnCompleted
的行为。 - 因此,如果只是直接调用等待者的
GetResult()
方法,那么无论是在TaskAwaiter
上还是在ConfiguredTaskAwaitable.ConfiguredTaskAwaiter
上进行操作,都没有任何区别。 - 综上所述,如果在代码中看到
task.ConfigureAwait(false).GetAwaiter().GetResult()
,则可以将其替换为task.GetAwaiter().GetResult()
。
18.3.4e Semaphore 和 SemaphoreSlim
在18.3.4d1的第二个示例中使用了SemaphoreSlim,它是对可同时访问资源或资源池的线程数加以限制的Semaphore的轻量替代,Semaphore被称为信号量,它限制可同时访问某一资源或资源池的线程数。
18.3.4e1 Semaphore
每次线程进入信号量时,信号量上的计数都会递减,并在线程释放信号量时递增。 当计数为零时,后续请求会阻止,直到其他线程释放信号量。 当所有线程都释放了信号量时,计数处于创建信号量时指定的最大值。
下面的代码示例创建一个Semaphore类型,最大计数为3,初始计数为0。该示例启动五个线程,所有这些任务都阻塞并等待信号量 。主线程使用Release(Int32)方法将信号量计数增加到最大值,允许三个线程进入信号量。每个线程都使用Thread.Sleep该方法等待一秒,以模拟工作,然后调用Release()方法来释放信号量。控制台消息跟踪信号量的使用情况。每个线程的模拟工作间隔略有增加,以使输出更易于读取。
using System;
using System.Threading;
namespace ConsoleApp1;
internal static class Program
{
private static int _padding;
private static Semaphore? _semaphore;
private static void Main()
{
_semaphore = new Semaphore(0, 3);
for (var i = 1; i <= 5; i++)
{
var t = new Thread(Worker);
t.Start(i);
}
Thread.Sleep(500);
Console.WriteLine("Main thread calls Release(3)");
_semaphore.Release(3);
Console.WriteLine("Main thread exits.");
}
private static void Worker(object? num)
{
Console.WriteLine($"Thread {num} begins and waits for the semaphore.");
_semaphore?.WaitOne();
var padding = Interlocked.Add(ref _padding, 100);
Console.WriteLine($"Thread {num} enters the semaphore.");
Thread.Sleep(1000 + padding);
Console.WriteLine($"Thread {num} releases the semaphore.");
_semaphore?.Release();
}
}
18.3.4e2 SemaphoreSlim
以下示例创建一个SemaphoreSlim类型,最大计数为2,初始计数为0。该示例启动5个任务,所有这些任务都阻塞并等待信号量。主线程调用Release(Int32)重载以将信号量计数增加到其最大值,这允许2个任务同时运行。每个线程都使用Thread.Sleep该方法等待一秒,以模拟工作,然后调用Release()方法来释放信号量。控制台消息跟踪信号量的使用。每个线程的模拟工作间隔略有增加,以使输出更易于读取。
using System;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp1;
internal static class Program
{
private static int _padding;
private static void Main()
{
// 初始计数0 最大计数3
var semaphoreSlim = new SemaphoreSlim(0, 3);
Console.WriteLine($"{semaphoreSlim.CurrentCount} tasks can enter the semaphore.");
// 创建五个线程
var tasks = new Task[5];
for (var i = 0; i < 5; i++)
{
tasks[i] = Task.Run(() =>
{
Console.WriteLine($"Task {Task.CurrentId} begins and waits for the semaphore.");
// 等待信号量(如果当前可用信号量为0则阻塞)
semaphoreSlim.Wait();
try
{
// 以原子操作的形式添加两个整数,并用两者的和替换第一个整数
Interlocked.Add(ref _padding, 100);
Console.WriteLine($"Task {Task.CurrentId} enters the semaphore.");
// 让下一个线程阻塞时间比当前线程略长
Thread.Sleep(1000 + _padding);
}
finally
{
// 任务运行完成,释放一个信号量
semaphoreSlim.Release();
Console.WriteLine(
$"Task {Task.CurrentId} releases the semaphore");
}
});
}
// 阻塞主线程,让所有Task任务进入阻塞并等待信号量的状态
Thread.Sleep(500);
// 释放2个信号量
semaphoreSlim.Release(2);
// 等待所有任务执行完成
Task.WaitAll(tasks);
Console.WriteLine("Main thread exits.");
}
}
SemaphoreSlim是一个轻量、快速的信号量,可在等待时间预计很短的情况下用于在单个进程内等待。SemaphoreSlim 尽可能多地依赖公共语言运行时 (CLR) 提供的同步基元。但是,它还提供延迟初始化、基于内核的等待句柄,作为在多个信号量上进行等待的必要支持。SemaphoreSlim 也支持使用取消标记,但不支持命名信号量或使用用于同步的等待句柄。除非不可替代,否则应当使用SemaphoreSlim而不是Semaphore。
18.3.5 返回void的异步方法(async void)
title: 指导原则
尽量避免使用async void方法,除非它们是事件处理程序的订阅者,这些事件处理程序还应该避免抛出异常。
在通常情况下应当避免使用async void方法,但在编写异步事件处理程序时,为了让异步处理程序与EventHandler<T>
的签名匹配,不得不使用async void方法。其中EventHandler<T>
的签名如下:
void EventHandler<TEventArgs>(object sender, TEventArgs e);
依据指导原则,事件处理程序应该避免抛出异常,但如果事件处理程序抛出异常了又该怎么处理呢?这时就要提供一个同步上下文来接收同步事件通知,如启动任务(Task.Run)以及处理所有未处理的异常。下面的示例演示了如何利用同步上下文捕获async void方法中的异常:
using System;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp1;
internal class AsyncVoidSynchronizationContext : SynchronizationContext
{
public Exception? Exception { get; private set; }
// 表示线程同步事件,收到信号时,必须手动重置该事件。
public ManualResetEventSlim ManualResetEventSlim { get; set; } = new ManualResetEventSlim();
// 同步方法
public override void Send(SendOrPostCallback callback, object? state)
{
try
{
Console.WriteLine($@"Send notification invoked... (Thread ID: {Environment.CurrentManagedThreadId})");
callback(state);
}
// 捕获异常并发出线程同步的通知
catch (Exception exception)
{
Exception = exception;
ManualResetEventSlim.Set();
}
}
// 异步方法
public override void Post(SendOrPostCallback callback, object? state)
{
try
{
Console.WriteLine($@"Post notification invoked... (Thread ID: {Environment.CurrentManagedThreadId})");
callback(state);
}
// 捕获异常并发出线程同步的通知
catch (Exception exception)
{
Exception = exception;
ManualResetEventSlim.Set();
}
}
}
internal static class Program
{
private static void Main()
{
var originalSynchronizationContext = SynchronizationContext.Current;
// 在同步上下文中处理异步事件处理程序抛出的异常
try
{
// 设置新的同步上下文
var newSynchronizationContext = new AsyncVoidSynchronizationContext();
SynchronizationContext.SetSynchronizationContext(newSynchronizationContext);
// 发出OnEvent事件
OnEvent(typeof(Program), EventArgs.Empty);
//等待线程同步
newSynchronizationContext.ManualResetEventSlim.Wait();
// 没出现异常就直接返回
if (newSynchronizationContext.Exception is null) return;
// 重置线程同步信号
newSynchronizationContext.ManualResetEventSlim.Reset();
Console.WriteLine($@"Throwing expected exception...(Thread ID: {Environment.CurrentManagedThreadId})");
// 重新抛出异常
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(newSynchronizationContext.Exception).Throw();
}
catch (Exception exception)
{
Console.WriteLine(
$@"{exception.Message} throw as expected. (Thread ID: {Environment.CurrentManagedThreadId})");
}
finally
{
// 恢复主线程的同步上下文
SynchronizationContext.SetSynchronizationContext(originalSynchronizationContext);
}
}
// 抛出异常的异步事件处理程序
private static async void OnEvent(object sender, EventArgs eventArgs)
{
Console.WriteLine($@"Invoking Task.Run... (Thread ID: {Environment.CurrentManagedThreadId})");
await Task.Run(() =>
{
Console.WriteLine($@"Running task... (Thread ID: {Environment.CurrentManagedThreadId})");
throw new Exception("Expected Exception");
});
}
}
运行结果如下:
Invoking Task.Run... (Thread ID: 1)
Running task... (Thread ID: 6)
Post notification invoked... (Thread ID: 6)
Post notification invoked... (Thread ID: 6)
Throwing expected exception...(Thread ID: 1)
Expected Exception throw as expected. (Thread ID: 1)
问题:为什么自定义同步上下文中的Post方法被调用两次?
解答:对上述代码逐过程调试:上述代码起初同步执行,直到OnEvent方法(抛出异常的异步事件处理程序)中的await Task.Run开始运行。该异步任务完成后,将控制权转移给自定义同步上下文中的Post方法(Post方法中的 Console.WriteLine 第一次执行),之后Post方法中的callback回调方法回调自身(Console.WriteLine第二次执行),之后callback回调方法调用ExceptionDispatchInfo进行异常处理,异常处理完成后(只是将异常记录下来)返回到主线程中,在主线程中继续处理由同步上下文返回的异常。这也解释了为什么自定义同步上下文中的Post方法被调用两次。
18.3.6 ExecutionContext vs SynchronizationContext
18.3.6a 什么是ExecutionContext,流动它有什么意义?
对于绝大多数开发者来说,不需要关注ExecutionContext
。它的存在就像空气一样:虽然它很重要,但我们一般是不会关注它的,除非有必要(例如出现问题时)。ExecutionContext
本质上只是一个用于盛放其他上下文的容器。这些被盛放的上下文中有一些仅仅是辅助性的,而另一些则对于.NET的执行模型至关重要,不过它们都和ExecutionContext
一样:除非你不得不知道他们存在,或你正在做某些特别高级的事情,或者出了什么问题,否则你没必要关注它。
ExecutionContext
是与“环境”信息相关的,也就是说它会存储与你当前正在运行的环境或“上下文”相关的数据。在许多系统中,这类环境信息使用线程本地存储(TLS)来维护,例如ThreadStatic
标记的字段或ThreadLocal<T>
。在同步的世界里,这种线程本地信息就足够了:所有的一切都运行在该线程上,因此,无论你在该线程上使用什么栈帧、正在执行什么功能,等等,在该线程上运行的所有代码都可以查看并受该线程特定数据的影响。例如,ExecutionContext
盛放的一个上下文叫做SecurityContext
,它维护了诸如当前“principal”之类的信息以及有关代码访问安全性(CAS)拒绝和允许的信息。这类信息可以与当前线程相关联,这样的话,如果一个栈帧的访问被某个权限拒绝了然后调用另一个方法,那么该调用的方法仍会被拒绝:当尝试执行需要该权限的操作时,CLR会检查当前线程是否允许该操作,并且它也会找到调用者放入的数据。
当从同步世界过渡到异步世界时,事情就变得复杂了起来。突然之间,线程本地存储(TLS)变得无关紧要。在同步的世界里,如果我先执行操作A,然后再执行操作B,最后执行操作C,那么这三个操作都会在同一线程上执行,所以这三个操作都会受该线程上存储的环境数据的影响。但是在异步的世界里,我可能在一个线程上启动A,然后在另一个线程上完成它,这样操作B就可以在不同于A的线程上启动或运行,并且类似地C也可以在不同于B的线程上启动或运行。 这意味着我们用来控制执行细节的环境不再可行,因为TLS不会在这些异步点上“流动”。线程本地存储特定于某个线程,而这些异步操作并不与特定线程绑定。不过,我们希望有一个逻辑控制流,且环境数据可以与该控制流一起流动,以便环境数据可以从一个线程移动到另一个线程。这就是ExecutionContext
发挥的作用。
ExecutionContext
实际上只是一个状态包,可用于从一个线程捕获所有当前状态,然后在控制逻辑继续流动的同时将其还原到另一个线程。通过静态Capture
方法来捕获ExecutionContext
:
// 把环境状态捕捉到ec中
ExecutionContext ec = ExecutionContext.Capture();
在调用委托时,通过静态Run
方法将环境状态还原回来:
ExecutionContext.Run(ec, delegate
{
… // 此处的代码会将ec的状态视为环境
}, null);
.NET Framework中所有异步工作的方法都是以这种方式捕获和还原ExecutionContext
的(除了那些以“Unsafe”为前缀的方法,这些方法都是不安全的,因为它们显式的不流动ExecutionContext
)。例如,当你使用Task.Run
时,对Run
的调用会捕获调用线程的ExecutionContext
,并将该ExecutionContext
实例存储到Task
对象中。稍后,当传递给Task.Run
的委托作为该Task
执行的一部分被调用时,会通过调用ExecutionContext.Run
方法,使委托在刚才存储的上下文中执行。Task.Run
、ThreadPool.QueueUserWorkItem
、Delegate.BeginInvoke
、Stream.BeginRead
、DispatcherSynchronizationContext.Post
,以及你可以想到的任何其他异步API,都是这样的。它们全都会捕获ExecutionContext
,存储起来,然后在调用某些代码时使用它。
当我们讨论“流动ExecutionContext
”时,指的就是这个过程,即获取一个线程上的环境状态,然后在执行传递的委托时,将状态还原到执行线程上。
18.3.6b 什么是SynchronizationContext,捕获和使用它有什么意义?
在软件开发中,我们喜欢抽象。我们几乎不会愿意对特定的实现进行硬编码。因此,在编写大型系统时,我们更愿意将特定实现的细节抽象化,以便以后可以插入其他实现,而不必更改我们的大型系统。这就是我们有接口、抽象类,虚方法等的原因。
SynchronizationContext
只是一种抽象,代表你要执行某些操作的特定环境。举个例子,WinForm拥有UI线程(虽然可能有多个,但出于讨论目的,这并不重要),需要使用UI控件的任何操作都需要在上面执行。为了处理需要先在线程池线程上执行然后再封送回UI线程以便该操作可以与UI控件一起处理的情形,WinForm提供了Control.BeginInvoke
方法。你可以向控件的BeginInvoke
方法传递一个委托,该委托将在与该控件关联的线程上被调用。
因此,如果我正在编写一个需要在线程池线程执行一部分工作,然后在UI线程上再进行一部分工作的组件,那我可以使用Control.BeginInvoke
。但是,如果我现在要在WPF应用程序中使用我的组件该怎么办?WPF具有与WinForm相同的UI线程约束,但封送回UI线程的机制不同:不是通过Control.BeginInvoke
,而是在Dispatcher
实例上调用Dispatcher.BeginInvoke
(或InvokeAsync
)。
现在,我们有两个不同的API用于实现相同的基本操作,那么如何编写与UI框架无关的组件呢?当然是通过使用SynchronizationContext
。SynchronizationContext
提供了一个虚Post
方法,该方法只接收一个委托,并在任何地点,任何时间运行它,当然SynchronizationContext
的实现要认为是合适的。WinForm提供了WindowsFormSynchronizationContext
类,该类重写了Post
方法来调用Control.BeginInvoke
。WPF提供了DispatcherSynchronizationContext
类,该类重写Post
方法来调用Dispatcher.BeginInvoke
,等等。这样,我现在可以在组件中使用SynchronizationContext
,而不需要将其绑定到特定框架了。
如果我要专门针对WinForm编写组件,则可以像这样来实现。先进入线程池,然后返回到UI线程的逻辑:
public static void DoWork(Control c)
{
ThreadPool.QueueUserWorkItem(delegate
{
… // 在线程池中执行
c.BeginInvoke(delegate
{
… // 在UI线程中执行
});
});
}
然而上面的这个组件在WPF上无法工作, 如果我把组件改成使用SynchronizationContext
,就可以这样写:
public static void DoWork(SynchronizationContext sc)
{
ThreadPool.QueueUserWorkItem(delegate
{
… // 在线程池中执行
sc.Post(delegate
{
… // 在UI线程中执行
}, null);
});
}
这样这一组件可以既可以在WinForm上工作也可以在WPF上工作,如果以后有了新的框架并且这个框架具有合适的同步上下文,那么这个组件还可以在新的框架上工作,做到了组件与框架的解耦。
上面的编程模型存在一个较大的问题,对于某些所需的编程模型而言,需要通过函数形参传递同步上下文是无法容忍的,况且这种做法有时候相当烦人。因此,SynchronizationContext
提供了Current
属性,该属性使你可以从当前线程中寻找上下文,如果存在的话,它会把你返回到该环境。换句话说,你可以从SynchronizationContext.Current
中读取引用,并存储该引用以供以后使用。示例如下:
public static void DoWork()
{
var sc = SynchronizationContext.Current;
ThreadPool.QueueUserWorkItem(delegate
{
… // 在线程池中执行
sc.Post(delegate
{
… // 在原始上下文中执行
}, null);
});
}
18.3.6c 流动ExecutionContext vs 使用SynchronizationContext
现在,我们有一个非常重要的发现:流动ExecutionContext
(将ExecutionContext
传递到新的执行线程)在语义上与捕获SynchronizationContext
,然后Post,不完全相同。当流动ExecutionContext
时,你是从一个线程中捕获状态,然后在提供的委托执行期间将该状态恢复回来,但是捕获并使用SynchronizationContext
时,不会出现这种情况。
它们的相同点在于,均捕获了与“环境”信息相关(当前正在运行的环境或“上下文”相关的数据)。不同点在于后续使用这种捕获信息的方式不同。不同之处在于:
SynchronizationContext
是通过SynchronizationContext.Post
来使用捕获的状态调用委托,而不是在委托调用期间将状态恢复为当前状态。该委托在何时何地以及如何运行完全取决于Post
方法的实现。ExecutionContext
将委托连带着捕获的信息传递给新线程,然后在新线程中利用捕获的信息将状态还原。
18.3.6d 这是如何运用于async/await的?
async
和await
关键字背后的框架支持自动与ExecutionContext
和SynchronizationContext
交互。每当代码等待一个awaiter,awaiter说它尚未完成(例如awaiter.IsCompleted
返回false
)时,该方法需要暂停,然后通过awaiter的延续(Continuation)来恢复,这是我之前提到的异步点之一。因此,ExecutionContext
需要从发出等待的代码一直流动到延续委托的执行,这会由框架自动处理。当异步方法即将挂起时,基础架构会捕获ExecutionContext
。传递给awaiter的委托会拥有该ExecutionContext
实例的引用,并在恢复该方法时使用它。这就是使ExecutionContext
表示的重要“环境”信息跨等待流动的原因。框架对ExecutionContext
的支持内置于表示异步方法的“构建器”中(例如System.Runtime.CompilerServices.AsyncTaskMethodBuilder
),并且不管使用哪种等待方式,这些构建器均可确保ExecutionContext
跨等待点流动。对SynchronizationContext
的支持也内置在了Task
和Task <TResult>
中。自定义awaiter可以自己添加类似的逻辑,但不会自动获取。这是设计使然,因为自定义何时以及后续如何调用是自定义awaiter使用的原因之一。
默认情况下,当你等待Task时,awaiter将捕获当前的SynchronizationContext
,当Task完成时,会将提供的延续(Continuation)委托封送到该上下文去执行,而不是在任务完成的线程上,或在ThreadPool
上执行该委托。如果开发人员不希望这种封送行为,则可以通过更改使用的awaiter来进行控制。虽然在等待Task
或Task <TResult>
时始终会采用这种行为,但你可以改为等待task.ConfigureAwait(…)
。ConfigureAwait
方法返回一个awaitable,它可以阻止默认的封送处理行为。是否阻止由传递给ConfigureAwait
方法的布尔值控制。如果continueOnCapturedContext为true
,就是默认行为;否则,如果为false
,那么awaiter不会检查SynchronizationContext
,假装好像没有一样。注意,等待完成的Task完成后,无论ConfigureAwait
如何,运行时(runtime)可能会检查正在恢复的线程上的当前上下文,以确定是否可以在此处同步运行延续,或必须从那时开始异步安排延续。
注意,尽管ConfigureAwait
提供了显式的与等待相关的编程模型支持, 让开发者比较容易地修改或自定义与SynchronizationContext
相关的行为,但并没有提供任何用于阻止ExecutionContext
流动的与等待相关的编程模型支持。
这同样是设计使然,开发者在编写异步代码时无需关注ExecutionContext
,这是基础架构关心的事情。基础架构为ExecutionContext
提供了必要的支持,帮助开发者在异步环境中模拟同步语义,即:在异步环境中模拟线程本地存储(TLS)。其实大多数人可以并且应该完全忽略ExecutionContext
的存在,除非明确知道自己在做什么。事实上,开发者在编写异步代码时,通常关心的是代码在哪里运行,异步任务运行完成后要不要返回到开启异步任务的那个线程(比如:要不要返回到UI线程继续执行后续操作),这就让同步上下文显得尤为重要,也让同步上下文得到编程模型的显式支持。当然,类库开发者编写的代码大多数情况下是线程无关的,同步上下文对于他们来说也不重要,这些开发者应该考虑在每次Task等待时使用ConfigureAwait(false)
绕过同步上下文。
18.3.6e SynchronizationContext不是ExecutionContext的一部分吗?
实际上,ExecutionContext
能够流动的所有上下文,例如:
SecurityContext
HostExecutionContext
CallContext
SynchronizationContext
- ……………………
是的,SynchronizationContext
也是其中之一。当你调用公共的ExecutionContext.Capture()
方法时,该方法将检查当前的SynchronizationContext
,如果有,则将其存储到返回的ExecutionContext
实例中。然后,当你使用公共的ExecutionContext.Run
方法时,在执行提供的委托期间,捕获的SynchronizationContext
会被恢复为Current
。
这有什么问题?作为ExecutionContext
的一部分流动的SynchronizationContext
更改了SynchronizationContext.Current
的含义。SynchronizationContext.Current
应该可以使你返回到访问Current
时所处的环境,因此,如果SynchronizationContext
流到了另一个线程上成为Current
,那么你就无法信任SynchronizationContext.Current
的含义。在这种情况下,它可能用于返回到当前环境,也可能用于回到流中先前某个时刻所处的环境。举一个可能出现这种问题的例子,请参考以下代码:
private void button1_Click(object sender, EventArgs e)
{
button1.Text = await Task.Run(async delegate
{
string data = await DownloadAsync();
return Compute(data);
});
}
我的思维模式告诉我,这段代码会发生这种情况:用户单击button1,UI框架在UI线程上调用button1_Click。然后,代码启动一个在ThreadPool
上运行的操作(通过Task.Run)。该操作将开始一些下载工作,并异步等待其完成。然后,ThreadPool
上的延续操作会对下载的结果进行一些计算密集型操作,并返回结果,最终使正在UI线程上等待的Task完成。接着,UI线程会处理该button1_Click方法的其余部分,并将计算结果存储到button1的Text属性中。
如果SynchronizationContext
不会作为ExecutionContext
的一部分流动,那么这是我所期望的。但是,如果流动了,我会感到非常失望。Task.Run
会在调用时捕获ExecutionContext
,并使用它来执行传递给它的委托。这意味着调用Task.Run
时所处的UI线程的SynchronizationContext
将流入Task,并且在await DownloadAsync
时再次作为Current
流入。这意味着await将会找到UI的SynchronizationContext.Current
,并Post
该方法的剩余部分作为在UI线程上运行的延续。也就表示我的Compute
方法很可能会在UI线程上运行,而不是在ThreadPool
上运行,从而导致我的应用程序出现响应问题。即:由于同步上下文随着ExecutionContext
流动,在等待完成后,Task中后续的方法会回到UI线程中执行,这使得应用程序响应迟缓;开发者期待的是Task中的任务在其他线程全部执行完成后才返回到UI线程,执行UI线程中的后续方法。
然而,上面示例中的情况在实际中不会出现。前面说过,如果同步上下文不作为ExecutionContext
的一部分流动就不会出现这种情况,那同步上下文到底是“流动”还是不“流动”呢?
现在,这个故事有点混乱了:ExecutionContext
实际上有两个Capture
方法,但是只公开了一个。mscorlib公开的大多数异步功能所使用的是内部的(mscorlib内部的)Capture
方法,并且它可选地允许调用方阻止捕获SynchronizationContext
作为ExecutionContext
的一部分;对应于Run
方法的内部重载也支持忽略存储在ExecutionContext
中的SynchronizationContext
,实际上是假装没有被捕获(同样,这是mscorlib中大多数功能使用的重载)。这意味着几乎所有在mscorlib中的异步操作的核心实现都不会将SynchronizationContext
作为ExecutionContext
的一部分进行流动,但是在其他任何地方的任何异步操作的核心实现都将捕获SynchronizationContext
作为ExecutionContext
的一部分。我上面提到了,异步方法的“构建器”是负责在异步方法中流动ExecutionContext
的类型,这些构建器是存在于mscorlib中的,并且使用的确实是内部重载……(当然,这与task awaiter捕获SynchronizationContext
并将其Post回去是互不影响的)。为了处理ExecutionContext
确实流动了SynchronizationContext
的情况,异步方法基础结构会尝试忽略由于流动而设置为Current
的SynchronizationContexts
。、
简而言之,SynchronizationContext.Current
不会在等待点之间“流动”,上面示例中的问题事实上不会出现,毕竟Task.Run也在mscorlib中,ExecutionContext
中的同步上下文已经被忽略了,你放心好了。所以,现在就不用考虑SynchronizationContext
也在ExecutionContext
中了,把它们当作两个相互独立的上下文就好了。