首页 > 其他分享 >【Kotlin】Channel简介

【Kotlin】Channel简介

时间:2024-12-08 12:21:44浏览次数:5  
标签:receive Kotlin send Channel 简介 println Dispatchers channel

1 前言

​ Channel 是一个并发安全的阻塞队列,可以通过 send 函数往队列中塞入数据,通过 receive 函数从队列中取出数据。

​ 当队列被塞满时,send 函数将被挂起,直到队列有空闲缓存;当队列空闲时,receive 函数将被挂起,直到队列中有新数据存入。

​ Channel 中队列缓存空间的大小需要在创建时指定,如果不指定,缓存空间默认是 0。

img

2 Channel 中 send 和 receive 案例

2.1 capacity 为 0

fun main() {
    var channel = Channel<Int>()
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        repeat(3) {
            delay(10)
            println("send: $it")
            channel.send(it)
        }
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        repeat(3) {
            delay(100)
            var element = channel.receive()
            println("receive: $element")
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

​ 打印如下。

send: 0
receive: 0
send: 1
receive: 1
send: 2
receive: 2

​ 说明:send 的 delay 时间比 receive 的 delay 时间短,但是并没有出现连续打印两个 send,而是打印一个 send,再打印一个 recieve,它们交替打印。因为 Channel 中队列的缓存空间默认为 0,在执行了 send 后,如果没有执行 recieve,send 将一直被挂起,直到执行了 receive 才恢复执行 send。

2.2 capacity 大于 0

fun main() {
    var channel = Channel<Int>(2)
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        repeat(3) {
            delay(10)
            println("send: $it")
            channel.send(it)
        }
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        repeat(3) {
            delay(100)
            var element = channel.receive()
            println("receive: $element")
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

​ 打印如下。

send: 0
send: 1
send: 2
receive: 0
receive: 1
receive: 2

​ 说明:Channel 中队列的缓存空间为 2,send 的 delay 时间比 receive 的 delay 时间短,因此会出现连续打印多个 send。

3 Channel 中迭代器

3.1 iterator

fun main() {
    var channel = Channel<Int>()
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        repeat(3) {
            println("send: $it")
            channel.send(it)
        }
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        var iterator = channel.iterator()
        while (iterator.hasNext()) {
            var element = iterator.next()
            println("receive: $element")
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

​ 打印如下。

send: 0
send: 1
receive: 0
receive: 1
send: 2
receive: 2

3.2 for in

fun main() {
    var channel = Channel<Int>()
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        repeat(3) {
            println("send: $it")
            channel.send(it)
        }
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        for (element in channel) {
            println("receive: $element")
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

​ 打印如下。

send: 0
receive: 0
send: 1
send: 2
receive: 1
receive: 2

4 Channel 中 produce 和 actor

​ produce 函数用于构造一个生产者协程,并返回一个 ReceiveChannel;actor 函数用于构造一个消费者协程,并返回一个 SendChannel。

4.1 produce

fun main() {
    var receiveChannel = CoroutineScope(Dispatchers.Default).produce<Int> { // 生产者
        repeat(3) {
            println("send: $it")
            send(it)
        }
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        for (element in receiveChannel) {
            println("receive: $element")
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

​ 打印如下。

send: 0
send: 1
receive: 0
receive: 1
send: 2
receive: 2

4.2 actor

fun main() {
    var sendChannel = CoroutineScope(Dispatchers.Default).actor<Int> { // 生产者
        repeat(3) {
            var element = receive()
            println("receive: $element")
        }
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        repeat(3) {
            println("send: $it")
            sendChannel.send(it)
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

​ 打印如下。

send: 0
send: 1
receive: 0
receive: 1
send: 2
receive: 2

5 Channel 的关闭

​ 对于一个 Channel,如果我们调用了它的 close 函数,它会立即停止发送新元素,也就是说这时它的 isClosedForSend 会立即返回 true。而由于 Channel 缓冲区的存在,这时候可能还有一些元素没有被处理完,因此要等所有的元素都被读取之后 isClosedForReceive 才会返回 true。

fun main() {
    var channel = Channel<Int>(3)
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        repeat(3) {
            println("send: $it")
            channel.send(it)
        }
        channel.close()
        println("producter, isClosedForSend=${channel.isClosedForSend}, isClosedForReceive=${channel.isClosedForReceive}")
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        repeat(3) {
            var element = channel.receive()
            println("receive: $element")
        }
        println("consumer, isClosedForSend=${channel.isClosedForSend}, isClosedForReceive=${channel.isClosedForReceive}")
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

​ 打印如下。

send: 0
send: 1
send: 2
producter, isClosedForSend=true, isClosedForReceive=false
receive: 0
receive: 1
receive: 2
consumer, isClosedForSend=true, isClosedForReceive=true

6 BroadcastChannel

​ Channel 的生产者(producter)和消费者(consumer)都可以存在多个,但是同一个元素只会被一个消费者读到。BroadcastChannel 则不然,多个消费者不存在互斥行为。

6.1 Channel 中多个消费者

fun main() {
    var channel = Channel<Int>(2)
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        delay(10)
        repeat(3) {
            println("send: $it")
            channel.send(it)
        }
    }
    repeat(2) { index ->
        CoroutineScope(Dispatchers.Default).launch { // 消费者
            for (element in channel) {
                println("receive-$index: $element")
            }
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

​ 打印如下。

send: 0
send: 1
send: 2
receive-0: 0
receive-0: 2
receive-1: 1

​ 说明:结果表明,Channel 中同一个元素只会被一个消费者读到。

6.2 BroadcastChannel 中多个消费者

6.2.1 BroadcastChannel

fun main() {
    var broadcastChannel = BroadcastChannel<Int>(2)
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        delay(10)
        repeat(3) {
            println("send: $it")
            broadcastChannel.send(it)
        }
    }
    repeat(2) { index ->
        CoroutineScope(Dispatchers.Default).launch { // 消费者
            var receiveChannel = broadcastChannel.openSubscription()
            for (element in receiveChannel) {
                println("receive-$index: $element")
            }
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

​ 打印如下。

send: 0
send: 1
send: 2
receive-0: 0
receive-0: 1
receive-0: 2
receive-1: 0
receive-1: 1
receive-1: 2

​ 说明:结果表明,BroadcastChannel 中同一个元素可以被所有消费者读到。

6.2.2 broadcast

fun main() {
    var channel = Channel<Int>()
    var broadcastChannel = channel.broadcast(2)
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        delay(10)
        repeat(3) {
            println("send: $it")
            broadcastChannel.send(it)
        }
    }
    repeat(2) { index ->
        CoroutineScope(Dispatchers.Default).launch { // 消费者
            var receiveChannel = broadcastChannel.openSubscription()
            for (element in receiveChannel) {
                println("receive-$index: $element")
            }
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

​ 打印如下。

send: 0
send: 1
send: 2
receive-1: 0
receive-1: 1
receive-1: 2
receive-0: 0
receive-0: 1
receive-0: 2

声明:本文转自【Kotlin】Channel简介

标签:receive,Kotlin,send,Channel,简介,println,Dispatchers,channel
From: https://www.cnblogs.com/zhyan8/p/18592834

相关文章

  • 【Kotlin】select简介
    1前言​协程的select是一种用于异步操作的选择器,它允许同时等待多个挂起函数的结果,并在其中一个完成时执行相应的操作。​能够被select的事件都是SelectClause,在select.kt中有定义,如下。publicinterfaceSelectBuilder<inR>{publicoperatorfunSelec......
  • PCIe扫盲——PCIe简介
    PCI-Express是继ISA和PCI总线之后的第三代I/O总线,即3GIO。由Intel在2001年的IDF上提出,由PCI-SIG(PCI特殊兴趣组织)认证发布后才改名为“PCI-Express”。它的主要优势就是数据传输速率高,另外还有抗干扰能力强,传输距离远,功耗低等优点。注:第一代总线一般指ISA、EISA、VESA和MicroPla......
  • 快速入睡:如何获得一夜好眠0简介
    0简介睡眠是我们每个人都会做的事情;事实上,我们一生中大约有三分之一的时间是在这种奇怪的无意识状态中度过的。然而,直到最近,我们对睡眠的作用、我们需要多少睡眠以及梦在改善心理健康方面的作用仍然知之甚少。好消息是,在过去的20年里,我们对睡眠以及睡眠的重要性的认识发生了......
  • Kotlin设计模式之单例模式
    一.使用object关键字Kotlin提供了object关键字来直接创建单例对象,这是最简单和推荐的方式。//SingletonObject.ktobjectSingletonObject{fundoSomething(){println("Doingsomething...")}}二.使用companion objectcompanion object可以用于......
  • API接口简介:让前后端无缝沟通
    背景介绍:在今天的互联网世界,前后端分离的架构已经成为常态,而在这其中,API接口的作用至关重要。作为前后端沟通的桥梁,API接口确保了数据的顺利传递和业务逻辑的执行。而很多刚入门的编程爱好者可能跟我刚开始学习一样对这个概念一知半解,现在,我们就来聊聊API接口,它是如何工作的,以......
  • Go语言简介:新时代的高效编程语言
    一、什么是Go语言?Go语言(又称Golang)是一种由Google开发的开源编程语言,于2009年首次发布。它的主要设计目标是提供高效、简单和可靠的编程体验,同时结合静态类型语言的安全性和动态语言的开发效率。Go语言因其简洁的语法、内置并发支持和高性能而备受开发者青睐。1.1背景与......
  • Voxposer简介
    VoxPoserisaframeworkthatleverageslargelanguagemodels(LLMs)andvision-languagemodels(VLMs, orVision-LanguageModels,areaclassofartificialintelligencemodelsdesignedtoprocessandintegratevisualandtextualinformation.Thesemodelsa......
  • 北京泽元堂王世龙简介,如何预约及流程
    王世龙预约拨打:|78 |2○8 5○44王世龙,男,现北京泽元堂中医主治医师,毕业于河北医科大学。从医将近25年,擅长治疗于神经系统方面疾病,以一人一方辨证施治的方式进行调理治疗:帕金森病、植物神经紊乱(头晕头痛、焦虑抑郁)、特发性震颤、重症肌无力、运动神经元病、耳鸣等等。王世......
  • Android studio出现uplicate class kotlin.time.jdk8.DurationConversionsJDK8Kt foun
    Android编译KotlinSDK依赖包类重复冲突问题1、问题问题:gradle同步可以成功,但是编译运行时,出现以下异常。2、分析取以上内容中的一条进行分析可以看到在模块org.jetbrains.kotlin:kotlin-stdlib:1.8.20和org.jetbrains.kotlin:kotlin-stdlib-jdk8:1.6.21中存在重复的......
  • 电商项目--分布式文件存储FastDFS简介
    对于大型互联网类型项目,其内部存在非常多的文件,会存在图片文档报表等文件。采用传统方式存储在机器磁盘上,其容量无法支撑这些文件,需要考虑分布式文件系统。一、FastDFS简介FastDFS是一个开源的轻量级分布式文件系统,它对文件进行管理,功能包括:文件存储、文件同......