首页 > 其他分享 >Kotlin协程:Flow基础原理

Kotlin协程:Flow基础原理

时间:2023-06-22 11:07:06浏览次数:37  
标签:FlowCollector 协程 Kotlin Flow value fun 方法 emit


本文分析示例代码如下:

launch(Dispatchers.Main) {
    flow {
        emit(1)
        emit(2)
    }.collect {
        delay(1000)

        withContext(Dispatchers.IO) {
            Log.d("liduo", "$it")
        }

        Log.d("liduo", "$it")
    }
}

一.Flow的创建

在协程中,可以通过flow方法创建一个Flow对象,一个Flow对象代表一个冷流。其中参数block是FlowCollector的扩展方法,并且可挂起。代码如下:

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

FlowCollector是一个接口,用于收集上游的流发出的值,代码如下:

public interface FlowCollector<in T> {
    // 可挂起,非线程安全
    public suspend fun emit(value: T)
}

调用flow方法,会返回一个Flow接口指向的对象,代码如下:

public interface Flow<out T> {

    @InternalCoroutinesApi
    public suspend fun collect(collector: FlowCollector<T>)
}

这里flow方法的返回对象是一个SafeFlow类型的对象。至此Flow就创建完毕了。

二.Flow的消费

在协程中,当需要消费流时,会调用collect方法,触发流的消费,代码如下:

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

这里的collect方法不是Flow接口定义的方法,而是Flow的扩展方法,内部创建了一个匿名的FlowCollector对象,并且把action封装到了FlowCollector对象的emit方法中,最后将FlowCollector对象作为参数传入到了另一个collect方法,这个collect方法才是Flow接口定义的方法。

1.SafeFlow类

根据上面的分析,Flow对象最后返回的是一个SafeFlow类型的对象。因此,这里调用的另一个collect方法,就是SafeFlow类中的collect方法,代码如下:

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}
SafeFlow类继承自AbstractFlow类,类中重写了collectSafely方法。因此调用的collect方法实际上是AbstractFlow类的方法。

2.AbstractFlow类

AbstractFlow类是一个抽象类,实现了Flow接口和CancellableFlow接口。实际上CancellableFlow接口继承自Flow接口,因此AbstractFlow类只重写了collect方法,代码如下:

@FlowPreview
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {

     // 核心方法 
    @InternalCoroutinesApi
    public final override suspend fun collect(collector: FlowCollector<T>) {
        // 创建SafeCollector对象,对collector进行包裹
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            // 调用collectSafely方法
            collectSafely(safeCollector)
        } finally {
            // 释放拦截的续体
            safeCollector.releaseIntercepted()
        }
    }

    public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}

collect方法内部调用了collectSafely方法,collectSafely方法在SafeFlow中被重写。collectSafely方法中会调用flow中的block,并提供一个SafeCollector类的环境。

3. SafeCollector类

当flow方法中的代码在执行时,会调用emit方法发射数据,这时由于block执行在SafeCollector类的环境中,因此调用的emit方法是SafeCollector类的方法。

SafeCollector类实现了FlowCollector接口并且继承自ContinuationImpl类,代码如下:

internal actual class SafeCollector<T> actual constructor(
    @JvmField internal actual val collector: FlowCollector<T>,
    @JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {

    ...
    // 保存上下文中元素数量,用于检查上下文是否变化
    @JvmField
    internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
    // 保存上一次的上下文
    private var lastEmissionContext: CoroutineContext? = null
    // 执行结束后的续体
    private var completion: Continuation<Unit>? = null

    // 协程上下文
    override val context: CoroutineContext
        get() = completion?.context ?: EmptyCoroutineContext

    // 挂起的核心方法
    override fun invokeSuspend(result: Result<Any?>): Any? {
        result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
        completion?.resumeWith(result as Result<Unit>)
        return COROUTINE_SUSPENDED
    }

    // 释放拦截的续体
    public actual override fun releaseIntercepted() {
        super.releaseIntercepted()
    }

    // 发射数据
    override suspend fun emit(value: T) {
        // 获取当前suspend方法续体
        return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
            try {
                // 调用重载的方法
                emit(uCont, value)
            } catch (e: Throwable) {
                 // 出现异常时,将异常封装成上下文,保存到lastEmissionContext
                lastEmissionContext = DownstreamExceptionElement(e)
                // 抛出异常
                throw e
            }
        }
    }

    // 重载的emit方法
    private fun emit(uCont: Continuation<Unit>, value: T): Any? {
        // 从续体中获取上下文
        val currentContext = uCont.context
        // 保证当前协程的Job是active的
        currentContext.ensureActive()
        // 获取上次的上下文
        val previousContext = lastEmissionContext
        // 如果前后上下文发生变化
        if (previousContext !== currentContext) {
            // 检查上下文是否发生异常
            checkContext(currentContext, previousContext, value)
        }
        // 保存续体
        completion = uCont
        // 调用emitFun方法,传入collector,value,continuation
        return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
    }

    // 检查上下文变化,防止并发
    private fun checkContext(
        currentContext: CoroutineContext,
        previousContext: CoroutineContext?,
        value: T
    ) {
        // 如果上次执行过程中发生了异常
        if (previousContext is DownstreamExceptionElement) {
            // 抛出异常
            exceptionTransparencyViolated(previousContext, value)
        }
        // 检查上下文是否发生变化,如果变化,则抛出异常
        checkContext(currentContext)
        lastEmissionContext = currentContext
    }

    // 用于抛出异常
    private fun exceptionTransparencyViolated(exception: DownstreamExceptionElement, value: Any?) {
        error("""
            Flow exception transparency is violated:
                Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected.
                Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
                For a more detailed explanation, please refer to Flow documentation.
            """.trimIndent())
    }
}

emit方法最终会调用emitFun方法方法,代码如下:

private val emitFun =
    FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>

emitFun是一个lambda表达式,它将只有一个参数的emit方法转换成三个参数的方法。emitFun方法在编译时会被编译器处理,反编译后的代码逻辑大致如下:

@Nullable
public final Object invoke(@NotNull FlowCollector p1, @Nullable Object p2, @NotNull Continuation continuation) {
   InlineMarker.mark(0);
   // 核心执行
   Object var10000 = p1.emit(p2, continuation);
   InlineMarker.mark(2);
   InlineMarker.mark(1);
   return var10000;
}

可以看到,emitFun方法内部会调用FlowCollector类对象的emit方法,同时传入value和continuation作为参数。

而这个FlowCollector类对象就是一开始的collect方法封装的匿名类对象,代码如下:

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

调用它的emit方法,会直接调用action的invoke方法,并传入发射的数据,流在这里被最终消费。

通过上面的分析,可以知道消费的过程是在emit方法中被调用的,如果在消费的过程,没有发生挂起,那么emit方法执行完毕后,会继续执行flow方法里剩下的代码,而如果在消费的过程中发生了挂起,情况会稍有不同。

4.消费过程中的挂起

如果消费过程中发生挂起,那么emit方法会返回一个COROUTINE_SUSPENDED对象,suspendCoroutineUninterceptedOrReturn方法在收到COROUTINE_SUSPENDED对象后,会挂起当前协程。代码如下:

override suspend fun emit(value: T) {
    // 获取当前suspend方法续体
    return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
        try {
            // 调用重载的方法
            emit(uCont, value)
        } catch (e: Throwable) {
            // 出现异常时,将异常封装成上下文,保存到lastEmissionContext
            lastEmissionContext = DownstreamExceptionElement(e)
            // 抛出异常
            throw e
        }
    }
}

当消费过程执行完毕时,会通过传入的续体唤起外部协程恢复挂起状态。根据emitFun可以知道,这里传入的续体为this,也就是当前的SafeCollector类对象,代码如下:

emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)

恢复挂起需要调用续体的resumeWith方法,上面提到SafeCollector类继承自ContinuationImpl类,SafeCollector类中没有重写resumeWith方法,而ContinuationImpl类中也没有重写resumeWith方法,因此实际调用的是ContinuationImpl类的父类BaseContinuationImpl类的resumeWith方法。如下图所示:

Kotlin协程:Flow基础原理_ide

在Kotlin协程:创建、启动、挂起、恢复中提到过,调用BaseContinuationImpl类的resumeWith方法,内部会调用invokeSuspend方法,而SafeCollector类重写了invokeSuspend方法,代码如下:

override fun invokeSuspend(result: Result<Any?>): Any? {
    // 尝试获取异常
    result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
    // 如果没有异常,则恢复flow方法续体的执行
    completion?.resumeWith(result as Result<Unit>)
    // 返回挂起标识,这里挂起的是消费过程
    return COROUTINE_SUSPENDED
}

在invokeSuspend方法中,会调用resumeWith方法恢复生产过程——flow方法的执行,同时挂起消费过程的执行。全部过程如下图所示:

Kotlin协程:Flow基础原理_Kotlin_02

作者:李萧蝶

标签:FlowCollector,协程,Kotlin,Flow,value,fun,方法,emit
From: https://blog.51cto.com/u_16163453/6534404

相关文章

  • Kotlin 增量编译是怎么实现的?
    前言编译运行是一个Android开发者每天都要做的工作,增量编译对于开发者也极其重要,高命中率的增量编译可以极大的提高开发者的开发效率与体验之前写了一些文章介绍Kotlin增量编译的原理,以及Kotlin1.7支持了跨模块增量编译了解了这些基本原理之后,我们今天一起来看下Kotlin增量编译......
  • 蔬菜识别系统Python+TensorFlow+Django+卷积神经网络算法
    一、介绍蔬菜识别系统,使用Python作为主要开发语言,基于深度学习TensorFlow框架,搭建卷积神经网络算法。并通过对数据集进行训练,最后得到一个识别精度较高的模型。并基于Django框架,开发网页端操作平台,实现用户上传一张图片识别其名称。二、效果图片三、演示视频+代码视频+完整代码:http......
  • 换个姿势,十分钟拿下Java/Kotlin泛型
    0x1、引言解完BUG,又有时间摸鱼学点东西了,最近在复习Kotlin,跟着朱涛的《Kotlin编程第一课》查缺补漏。看到泛型这一章时,想起之前面一家小公司时的面试题:说下你对泛型协变和逆变的理解?读者可以试试在不查资料的情况下能否答得上来?反正我当时是没想起来,尽管写过一篇《Kotlin刨根问底......
  • 使用TensorFlow进行自动化测试与部署
    目录标题:《使用TensorFlow进行自动化测试与部署》背景介绍:随着人工智能和机器学习技术的快速发展,TensorFlow成为了一个广泛应用的深度学习框架,被广泛用于构建神经网络、图像识别、自然语言处理等应用。在深度学习应用中,测试和部署非常重要,因为测试和部署是保证应用程序质量......
  • TensorRT-Tensorflow深度学习模型优化视频课程-全套资料分享
        该课程详细讲解如何使用TensorRT来优化Tensorflow训练的深度学习模型。我们选择了LeNet模型和YOLOv3模型作为例子,与原始模型相比,优化后的模型速度分别提高了3.7倍和1.5倍。有关详细信息以及如何运行代码,请参阅具体课程视频。    文末附课程全套视频下载地址。 课程目......
  • Android Kotlin Retrofit MVP网络请求封装(四)
    依赖implementation'com.squareup.retrofit2:retrofit:2.9.0'implementation'com.google.code.gson:gson:2.8.8'implementation'com.squareup.okhttp3:okhttp:4.9.1'implementation'com.squareup.retrofit2:retrof......
  • Distributed tensorflow实现原理
    获得更多深度学习在NLP方面应用的经典论文、实践经验和最新消息,欢迎关注微信公众号“DeepLearning_NLP”或者扫描头像二维码添加关注。分布式tensorflow:本文档将展示如何创建一个tensorflow服务的集群,如何在不同的集群之间分布式部署计算图。你好,分布式tensorflow!首先,简单实践一个......
  • Android Kotlin 底部菜单栏
    LoginSuccessActivity布局<?xmlversion="1.0"encoding="utf-8"?><LinearLayoutxmlns:android="http://schemas.android.com/apk/res/android"xmlns:app="http://schemas.android.com/apk/res-auto"xmlns:tool......
  • Android Kotlin MVP 登录实现
    一:新建MVP软件包文件 activity_main.xml界面<?xmlversion="1.0"encoding="utf-8"?><RelativeLayoutxmlns:android="http://schemas.android.com/apk/res/android"xmlns:app="http://schemas.android.com/apk/res-auto"......
  • 聊一聊 Rust 的 stack overflow
    早上看到了这篇文章智能指针有可能会让你的应用崩溃,下面分析一下会导致stackoverflow的代码structNode<T>{val:T,next:Option<Box<Node<T>>>,}structLinkedList<T>{head:Option<Box<Node<T>>>,}impl<T>LinkedList<T&......