首页 > 编程语言 >async_await 源码分析

async_await 源码分析

时间:2023-08-16 21:12:46浏览次数:39  
标签:__ Task await state 源码 awaiter async ref

async/await 源码解析

这篇文章主要是分析 async/await 这个语法糖,分析一下 async 和 await 是如何做到异步的。首先,我先抛出两个问题,各位可以先想一下。

  1. await 之后的方法是何时执行,如何执行的?
  2. 为什么 await 之后的代码会在不同的线程执行?

demo

要想知道 async/await 是怎么运行的,需要先写一个demo,然后进行一下反编译,就可以得到 async/await 编译后的代码,然后就可以开始分析了。
下面是简单使用 async/await 的demo:

static async Task Main(string[] args)
{
       Console.WriteLine("1"+Thread.CurrentThread.ManagedThreadId.ToString());
       await GetThreadID2();
       Console.WriteLine("2"+Thread.CurrentThread.ManagedThreadId.ToString());
}
public async static Task GetThreadID2()
{
       Console.WriteLine("3"+Thread.CurrentThread.ManagedThreadId.ToString());
       await Task.Run(() => Console.WriteLine("4"+Thread.CurrentThread.ManagedThreadId.ToString()));          
       Console.WriteLine("5"+Thread.CurrentThread.ManagedThreadId.ToString());
}

然后我们来反编译一下这段代码(可以使用 dnSpy 进行反编译):

namespace AsyncTest
{
    internal class Program
    {
        [DebuggerStepThrough]
        private static Task Main(string[] args)
        {
            Program.<Main>d__0 <Main>d__ = new Program.<Main>d__0();
            <Main>d__.<>t__builder = AsyncTaskMethodBuilder.Create();
            <Main>d__.args = args;
            <Main>d__.<>1__state = -1;
            <Main>d__.<>t__builder.Start<Program.<Main>d__0>(ref <Main>d__);
            return <Main>d__.<>t__builder.Task;
        }
        [DebuggerStepThrough]
        public static Task GetThreadID2()
        {
            Program.<GetThreadID2>d__2 <GetThreadID2>d__ = new Program.<GetThreadID2>d__2();
            <GetThreadID2>d__.<>t__builder = AsyncTaskMethodBuilder.Create();
            <GetThreadID2>d__.<>1__state = -1;
            <GetThreadID2>d__.<>t__builder.Start<Program.<GetThreadID2>d__2>(ref <GetThreadID2>d__);
            return <GetThreadID2>d__.<>t__builder.Task;
        }

        [DebuggerStepThrough]
        private static void <Main>(string[] args)
        {
            Program.Main(args).GetAwaiter().GetResult();
        }
        [CompilerGenerated]
        private sealed class <Main>d__0 : IAsyncStateMachine
        {
            void IAsyncStateMachine.MoveNext()
            {
                int num = this.<>1__state;
                try
                {
                    TaskAwaiter awaiter;
                    if (num != 0)
                    {
                        Console.WriteLine("1" + Thread.CurrentThread.ManagedThreadId.ToString());
                        awaiter = Program.GetThreadID2().GetAwaiter();
                        if (!awaiter.IsCompleted)
                        {
                            this.<>1__state = 0;
                            this.<>u__1 = awaiter;
                            Program.<Main>d__0 <Main>d__ = this;
                            this.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter, Program.<Main>d__0>(ref awaiter, ref <Main>d__);
                            return;
                        }
                    }
                    else
                    {
                        awaiter = this.<>u__1;
                        this.<>u__1 = default(TaskAwaiter);
                        this.<>1__state = -1;
                    }
                    awaiter.GetResult();
                    Console.WriteLine("2" + Thread.CurrentThread.ManagedThreadId.ToString());
                    Console.ReadLine();
                }
                catch (Exception exception)
                {
                    this.<>1__state = -2;
                    this.<>t__builder.SetException(exception);
                    return;
                }
                this.<>1__state = -2;
                this.<>t__builder.SetResult();
            }
        }
        [CompilerGenerated]
        [Serializable]
        private sealed class <>c
        {
            internal void <GetThreadID2>b__2_0()
            {
                Console.WriteLine("4" + Thread.CurrentThread.ManagedThreadId.ToString());
            }
            public static readonly Program.<>c <>9 = new Program.<>c();

            public static Func<string> <>9__1_0;

            public static Action <>9__2_0;
        }
        [CompilerGenerated]
        private sealed class <GetThreadID2>d__2 : IAsyncStateMachine
        {
            void IAsyncStateMachine.MoveNext()
            {
                int num = this.<>1__state;
                try
                {
                    TaskAwaiter awaiter;
                    if (num != 0)
                    {
                        Console.WriteLine("3" + Thread.CurrentThread.ManagedThreadId.ToString());
                        awaiter = Task.Run(new Action(Program.<>c.<>9.<GetThreadID2>b__2_0)).GetAwaiter();
                        if (!awaiter.IsCompleted)
                        {
                            this.<>1__state = 0;
                            this.<>u__1 = awaiter;
                            Program.<GetThreadID2>d__2 <GetThreadID2>d__ = this;
                            this.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter, Program.<GetThreadID2>d__2>(ref awaiter, ref <GetThreadID2>d__);
                            return;
                        }
                    }
                    else
                    {
                        awaiter = this.<>u__1;
                        this.<>u__1 = default(TaskAwaiter);
                        this.<>1__state = -1;
                    }
                    awaiter.GetResult();
                    Console.WriteLine("5" + Thread.CurrentThread.ManagedThreadId.ToString());
                }
                catch (Exception exception)
                {
                    this.<>1__state = -2;
                    this.<>t__builder.SetException(exception);
                    return;
                }
                this.<>1__state = -2;
                this.<>t__builder.SetResult();
            }
            [DebuggerHidden]
            void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine){}

            public int <>1__state;

            public AsyncTaskMethodBuilder <>t__builder;

            private TaskAwaiter <>u__1;
        }
    }
}

上面的代码是反编译出来的所有代码,如果各位选择太长不看的话,这里还有个简化版本:

private static Task Main(string[] args) 
<Main>d__.<>t__builder.Start<Program.<Main>d__0>(ref <Main>d__); 
<Main>d__0.MoveNext() 
    <GetThreadID2>d__.<>t__builder.Start<Program.<GetThreadID2>d__2>(ref <GetThreadID2>d__);
    <GetThreadID2>d__2.MoveNext()
    <GetThreadID2>d__2.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter, Program.<GetThreadID2>d__2>(ref awaiter, ref <GetThreadID2>d__);
    <GetThreadID2>d__2.MoveNext()
<Main>d__0.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter, Program.<Main>d__0>(ref awaiter, ref <Main>d__); 
<Main>d__0.MoveNext() 

入口

我们先来看一下 Main 方法。

private static Task Main(string[] args)        
{
    Program.<Main>d__0 <Main>d__ = new Program.<Main>d__0();
    <Main>d__.<>t__builder = AsyncTaskMethodBuilder.Create();
    <Main>d__.args = args;
    <Main>d__.<>1__state = -1;
    <Main>d__.<>t__builder.Start<Program.<Main>d__0>(ref <Main>d__);
    return <Main>d__.<>t__builder.Task;
}

首先第一行,构建了一个 Program.<Main>d__0 类型实例,嗯? 这个类哪里来的,没写过呀。
哎嘿,这个类就是编译器帮我们实现的。每一个 async/await 方法,编译器都会帮我们实现一个这样的类。
简单分析一下这个方法,实例化一个 Program.<Main>d__0(), 初始化 1__state 值为-1,调用 Program.<Main>d__0()Start 方法。

Start

public void Start<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine
{
    ExecutionContextSwitcher executionContextSwitcher = default(ExecutionContextSwitcher);
    RuntimeHelpers.PrepareConstrainedRegions();
    try
    {
        ExecutionContext.EstablishCopyOnWriteScope(ref executionContextSwitcher);
        stateMachine.MoveNext();
    }
    finally
    {
        executionContextSwitcher.Undo();
    }
}

Start 代码的主要作用就是启动 stateMachine,即 stateMachine.MoveNext()

MoveNext

MoveNext 是 async/await 里非常重要的一段代码,这个方法控制了整个异步任务的执行步骤。
我们可以将其分解成三个部分,即

  1. await 之前
  2. await 执行
  3. await 之后的代码
    具体的分析各位请看注释。
void IAsyncStateMachine.MoveNext()
{
    // 这里 <>1__state 之前初始化的时候是 -1,所以 num 就是-1 
    int num = this.<>1__state;
    try
    {
        TaskAwaiter awaiter;
        // 第一次进来,num 是 -1 所以接下来的逻辑
        if (num != 0)
        {
            // 这里执行 await 之前的代码
            Console.WriteLine("1" + Thread.CurrentThread.ManagedThreadId.ToString());
            // 这里就是执行  await 方法
            awaiter = Program.GetThreadID2().GetAwaiter();
            // 判断是否执行完成,通常第一次进来 IsCompleted 都是 false
            if (!awaiter.IsCompleted)
            {
                // 修改状态,下次再执行 MoveNext 就不会继续走这段逻辑来了
                this.<>1__state = 0;
                this.<>u__1 = awaiter;
                Program.<Main>d__0 <Main>d__ = this;
                // 这里其实是往 Task 里添加了一个 任务结束时的回调,在任务结束时会再次调用 MoveNext  
                // 这就解释了为什么 await 之后的方法是另外一个线程,因为await 之后的方法是在 下一个 MoveNext 的里调用的
                // 而那个 MoveNext 是由线程池挑选一个线程进行执行的              
                this.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter, Program.<Main>d__0>(ref awaiter, ref <Main>d__);
                return;
            }
        }
        else
        {
        awaiter = this.<>u__1;
            this.<>u__1 = default(TaskAwaiter);
            this.<>1__state = -1;
        }
        // GetResult 里有一个 死循环 等待结果
        awaiter.GetResult();
        // await 之后的方法 
        Console.WriteLine("2" + Thread.CurrentThread.ManagedThreadId.ToString());
        Console.ReadLine();
    }
    catch (Exception exception)
    {
        this.<>1__state = -2;
        this.<>t__builder.SetException(exception);
        return;
    }
    this.<>1__state = -2;
    this.<>t__builder.SetResult();
}

await 之前的代码

// 这里执行 await 之前的代码
Console.WriteLine("1" + Thread.CurrentThread.ManagedThreadId.ToString());

await 之前的代码很简单,就是正常依次执行。

await 的代码

await 的代码大都以 Task 形式执行,主要分为两步。
第一步,将 Task 推入线程池中。

awaiter = Task.Run(new Action(Program.<>c.<>9.<GetThreadID2>b__3_0)).GetAwaiter();
public static Task Run(Action action)        
{            
    StackCrawlMark stackCrawlMark = StackCrawlMark.LookForMyCaller;            
    return Task.InternalStartNew(null, action, null, default(CancellationToken), TaskScheduler.Default, TaskCreationOptions.DenyChildAttach, InternalTaskOptions.None, ref stackCrawlMark);       
}
internal static Task InternalStartNew(Task creatingTask, Delegate action, object state, CancellationToken cancellationToken, TaskScheduler scheduler, TaskCreationOptions options, InternalTaskOptions internalOptions, ref StackCrawlMark stackMark)        
{                 
    Task task = new Task(action, state, creatingTask, cancellationToken, options, internalOptions | InternalTaskOptions.QueuedByRuntime, scheduler);                      
    task.ScheduleAndStart(false);            
    return task;        
}
internal void ScheduleAndStart(bool needsProtection)
{
    try
    {
     this.m_taskScheduler.InternalQueueTask(this);
    }
    catch (ThreadAbortException exceptionObject){}
}
internal void InternalQueueTask(Task task)        
{                    
    this.QueueTask(task);       
}

第二步,线程池执行 Task。

void IThreadPoolWorkItem.ExecuteWorkItem()        
{            
    this.ExecuteEntry(false);        
}
internal bool ExecuteEntry(bool bPreventDoubleExecution)
{

    if (!this.IsCancellationRequested && !this.IsCanceled)
    {
        this.ExecuteWithThreadLocal(ref Task.t_currentTask);
    }
    return true;
}
private void ExecuteWithThreadLocal(ref Task currentTaskSlot)
{
    Task task = currentTaskSlot;
    TplEtwProvider log = TplEtwProvider.Log;
    Guid currentThreadActivityId = default(Guid);
    bool flag = log.IsEnabled();
    try
    {
        currentTaskSlot = this;
        ExecutionContext capturedContext = this.CapturedContext;
        if (capturedContext == null)
        {
            this.Execute();
        }
        else
        {
            if (this.IsSelfReplicatingRoot || this.IsChildReplica)
            {
                this.CapturedContext = Task.CopyExecutionContext(capturedContext);
            }
            ContextCallback contextCallback = Task.s_ecCallback;
            if (contextCallback == null)
            {
                contextCallback = (Task.s_ecCallback = new ContextCallback(Task.ExecutionContextCallback));
            }
            // 这里是执行 Task 方法的地方
            ExecutionContext.Run(capturedContext, contextCallback, this, true);
        }
        this.Finish(true);
    }
    finally
    {
        currentTaskSlot = task;
    }
}

await 之后的代码

那么 await 之后的代码是如何执行的呢?
大家回头看一下 MoveNext 方法,可以看到首次执行 MoveNext 的时候,代码的逻辑是这样的

if(num!=0) -> if(!awaiter.IsCompleted) -> AwaitUnsafeOnCompleted;return;

关键就在于 AwaitUnsafeOnCompleted 这个方法,来看一下 AwaitUnsafeOnCompleted 方法。

public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine) where TAwaiter : ICriticalNotifyCompletion where TStateMachine : IAsyncStateMachine
{
    try
    {
        AsyncMethodBuilderCore.MoveNextRunner runner = null;
        // 这里是准备一个 任务执行完的回调,简单理解成是将 MoveNext 包装成一个 action
        Action completionAction = this.m_coreState.GetCompletionAction(AsyncCausalityTracer.LoggingOn ? this.Task : null, ref runner);
        if (this.m_coreState.m_stateMachine == null)
        {
            Task<TResult> task = this.Task;
            this.m_coreState.PostBoxInitialization(stateMachine, runner, task);
        }
                // 将 completionAction 推到 task里,具体看一下接下来的代码
        awaiter.UnsafeOnCompleted(completionAction);
    }
    catch (Exception exception)
    {
        AsyncMethodBuilderCore.ThrowAsync(exception, null);
    }
}
public void UnsafeOnCompleted(Action continuation)        
{            
    TaskAwaiter.OnCompletedInternal(this.m_task, continuation, true, false);        
}
internal static void OnCompletedInternal(Task task, Action continuation, bool continueOnCapturedContext, bool flowExecutionContext)
{
    task.SetContinuationForAwait(continuation, continueOnCapturedContext, flowExecutionContext, ref stackCrawlMark);
}
internal void SetContinuationForAwait(Action continuationAction, bool continueOnCapturedContext, bool flowExecutionContext, ref StackCrawlMark stackMark)
{
     if (!this.AddTaskContinuation(continuationAction, false))
     {
            AwaitTaskContinuation.UnsafeScheduleAction(continuationAction, this);
     }
}
private bool AddTaskContinuation(object tc, bool addBeforeOthers)
{
    //这里就把completionAction 放到了 Task 的 m_continuationObject 里
    return !this.IsCompleted && ((this.m_continuationObject == null && Interlocked.CompareExchange(ref this.m_continuationObject, tc, null) == null) || this.AddTaskContinuationComplex(tc, addBeforeOthers));
}

至此为止,准备工作(将 MoveNext 放到 task 的回调里)已经做完了,接下来就是如何来触发这个方法。
简单分析一下,MoveNext 肯定是在 task 执行完触发,让我们回头看看 Task 执行的代码。会发现,哎嘿,在ExecutionContext.Run(capturedContext, contextCallback, this, true); 方法执行完后还有一句代码 this.Finish(true)

internal void Finish(bool bUserDelegateExecuted)
{
    this.FinishStageTwo();
}
internal void FinishStageTwo()
{
    this.FinishStageThree();
}
internal void FinishStageThree()
{
    this.FinishContinuations();
}
internal void FinishContinuations()
{
    // 获取之前注册的所有回调  最简单的就可以理解成 MoveNext
    object obj = Interlocked.Exchange(ref this.m_continuationObject, Task.s_taskCompletionSentinel);
    if (obj != null)
    {
        bool flag = (this.m_stateFlags & 134217728) == 0 && Thread.CurrentThread.ThreadState != ThreadState.AbortRequested && (this.m_stateFlags & 64) == 0;
        Action action = obj as Action;
        if (action != null)
        {
            AwaitTaskContinuation.RunOrScheduleAction(action, flag, ref Task.t_currentTask);
            return;
        }
    }
}
internal static void RunOrScheduleAction(Action action, bool allowInlining, ref Task currentTask)
{
    // 这里调用了  MoveNextRunner.Run ,简单理解 就是  MoveNext 方法
    action();
}
void IAsyncStateMachine.MoveNext()
{    
    // 此时 num 就是 0 了,就会执行 await 之后的代码了。
    int num = this.<>1__state;
    try
    {
        TaskAwaiter awaiter;
        if (num != 0)
        {}
        else
        {
        awaiter = this.<>u__1;
            this.<>u__1 = default(TaskAwaiter);
            this.<>1__state = -1;
        }
        awaiter.GetResult();
        Console.WriteLine("2" + Thread.CurrentThread.ManagedThreadId.ToString());
        Console.ReadLine();
    }
    catch (Exception exception)
    {
    }
    this.<>1__state = -2;
    this.<>t__builder.SetResult();
}

这一段代码我删掉了很多内容,但大致的执行顺序就是如此,如果想要了解更多细节,可以查看 Task 这个类的源码。

总结

回头看看开头的两个问题,现在就可以回答了。

  1. await 之后的方法是何时执行,如何执行的?
    await 的方法在 Task 执行完成之后,通过调用 Finish 方法执行的。
    具体的执行步骤是先将 MoveNext 方法注册到 Task 的回调里,然后在 Task 执行完后调用这个方法。
  2. 为什么 await 之后的代码会在不同的线程执行?
    这个其实是因为 Task 的机制,Task 会被推到线程池里,由线程池挑选一个线程去执行,await 之后的代码其实是由这个线程去执行的,自然就跟 await 的之前的代码不是一个线程。

PS: 补一张图

标签:__,Task,await,state,源码,awaiter,async,ref
From: https://www.cnblogs.com/ccwzl/p/17636189.html

相关文章

  • 【深度挖掘Java并发编程底层源码】「底层技术原理体系」带你零基础认识和分析学习相关
    FutureTask的基本介绍FutureTask是Java中的一个类,它实现了Future接口和Runnable接口,并且被用作线程执行的任务。FutureTask可以在多线程环境下异步执行一个任务并获取其结果。FutureTask的特点用法异步执行:通过将耗时的任务交给FutureTask,在一个单独的线程中执行,当前线程可以继续执......
  • 赏帮赚系统源码平台APP开发
      近年来,赏金悬赏任务平台已经成为了互联网创业的热门领域之一。赏金悬赏平台通过将赏金分发给赏金猎人,让赏金猎人通过自己的技能和能力来获取赏金,从而实现双赢的局面。赏帮赚系统源码平台APP作为一款专业的赏金悬赏平台,其功能强大,使用简单,深受用户喜爱。  一、悬赏发布功......
  • 赏帮赚app任务源码
      人们的生活方式和消费观念也发生了翻天覆地的变化。越来越多的人选择通过互联网进行购物、娱乐、学习等消费行为。在这个过程中,赏帮赚APP作为一款新型的任务平台应运而生,成为了许多人赚取额外收入的选择之一。本文将对赏帮赚APP的任务源码功能进行分析与探讨,以了解其背后的技......
  • vite打包报错:ERROR: Top-level await is not available in the configured target env
    在开发时,vita打包报错如下: 原因:ECMAScript提案Top-levelawait由MylesBorins提出,它可以让你在模块的最高层中使用await操作符。在这之前,你只能通过在async函数或asyncgenerators中使用await操作符。Top-levelawait是个新特性,打包不支持此特性。解决方案:1.......
  • 直播平台源码实现播放视频的方法
    我们在直播平台中上传视频的时候往往会不知道自己上传的是什么格式的视频,在APP软件播放的时候有时可能会播放不出来,这时候我们就会专门按照该视频的格式进行播放,顺利的将视频展示出来,下面就给大家讲解下直播平台源码实现m3u8、flv、mp4视频播放的方法。一、直播平台源码中播放m3u......
  • 0基础微信小程序搭建教程之禾匠商城源码搭建教程
    2022年版禾匠商城V4搭建教程(重新更新一份禾匠商城V4独立版搭建教程,因为之前的版本搭建跟现在有点不一样,现在一键安装比之前简单多了,废话不多现在开始!)准备工作:1、服务器一个,要好2核4G,安装系统CentOS7.5和宝塔面板。2、Nginx1.20,插件:PHP72、数据库5.6、Redis6.2.6,其它......
  • 直播系统源码开发中圆角效果的实现
    现在直播平台越来越多,登录后发现每个直播平台的页面效果都非常的吸引人,界面都非常的美观,很多按钮都采用了圆角的设计方法,显得更加的柔和,我们就来看看直播系统源码怎么实现吧!在直播系统源码开发中,可以通过创建一个自定义的DrawableXML文件来实现给Button设置圆角的效果。以下是创......
  • 手把手教你使用LabVIEW TensorRT实现图像分类实战(含源码)
    ‍‍......
  • 大学生心理健康信息采集系统的设计与实现-计算机毕业设计源码+LW文档
    摘 要 随着计算机信息技术的发展,各种管理系统逐渐用在社会生产生活中,通过系统化管理提高办事流程,节约时间。越来越多的人习惯并依赖于通过信息技术和智能化的形式来处理日常各类事物。为了满足健康求助者的需要,以及适应现代化健康信管理的需求,决定开发基于微信小程序的大学生......
  • 互联网+AI+智慧工地管理平台源码实现
    基于微服务+Java+SpringCloud+Vue+UniApp+MySql开发的智慧工地云平台源码一、智慧工地概念智慧工地就是互联网+建筑工地,是将互联网+的理念和技术引入建筑工地,然后以物联网、移动互联网技术为基础,充分应用BIM、大数据、人工智能、移动通讯、云计算、物联网等信息技术,通过人机交......