首页 > 其他分享 >Go语言精进之路读书笔记第34条——了解channel的妙用

Go语言精进之路读书笔记第34条——了解channel的妙用

时间:2024-02-24 15:55:15浏览次数:27  
标签:读书笔记 int goroutine chan 34 func time Go channel

c := make(chan int)     // 创建一个无缓冲(unbuffered)的int类型的channel
c := make(chan int, 5)  // 创建一个带缓冲的int类型的channel
c <- x          // 向channel c中发送一个值
<- c            // 从channel c中接收一个值
x = <- c        // 从channel c接收一个值并将其存储到变量x中
x, ok = <- c    // 从channel c中接收一个值。若channel关闭了,ok将置为false
for i := range c { ... } // 将for range与channel结合使用
close(c)        // 关闭channel c

c := make(chan chan int) // 创建一个无缓冲的chan int类型的channel
func stream(ctx context.Context, out chan<- Value) error //将只发送(send-only)channel作为函数参数
func spawn(...) <-chan T //将只接收(receive-only)channel作为返回值

34.1 无缓冲channel

  • 无缓冲channel的接收和发送操作是同步的,单方面的操作会让对应的goroutine陷入阻塞状态
  • 发送动作一定在接收动作完成之前
  • 接收动作一定在发送动作完成之前
var c = make(chan int)
var a string

func f() {
    a = "hello, world"
    <-c
}

func main() {
    go f()
    c <- 5
    println(a) // 输出:hello, world
}

1.用户信号传递

(1) 一对一通知信号

main goroutine在调用spawn函数后一直阻塞在对这个通知信号的接收动作上

type signal struct{}

func worker() {
    println("worker is working...")
    time.Sleep(1 * time.Second)
}

func spawn(f func()) <-chan signal {
    c := make(chan signal)
    go func() {
        println("worker start to work...")
        f()
        c <- signal(struct{}{})
    }()
    return c
}

func main() {
    println("start a worker...")
    c := spawn(worker)
    <-c
    fmt.Println("worker work done!")
}

(2) 一对多通知信号

main goroutine通过close(groupSignal)向所有worker goroutine广播“开始工作”的信号

type signal struct{}

func worker(i int) {
    fmt.Printf("worker %d: is working...\n", i)
    time.Sleep(1 * time.Second)
    fmt.Printf("worker %d: works done\n", i)
}

func spawnGroup(f func(i int), num int, groupSignal <-chan signal) <-chan signal {
    c := make(chan signal)
    var wg sync.WaitGroup

    for i := 0; i < num; i++ {
        wg.Add(1)
        go func(i int) {
            <-groupSignal
            fmt.Printf("worker %d: start to work...\n", i)
            f(i)
            wg.Done()
        }(i + 1)
    }

    go func() {
        wg.Wait()
        c <- signal(struct{}{})
    }()
    return c
}

func main() {
    fmt.Println("start a group of workers...")
    groupSignal := make(chan signal)
    c := spawnGroup(worker, 5, groupSignal)
    time.Sleep(5 * time.Second)
    fmt.Println("the group of workers start to work...")
    close(groupSignal)
    <-c
    fmt.Println("the group of workers work done!")
}

通知一组worker goroutine退出

type signal struct{}

func worker(i int, quit <-chan signal) {
    fmt.Printf("worker %d: is working...\n", i)
LOOP:
    for {
        select {
        default:
            // 模拟worker工作
            time.Sleep(1 * time.Second)

        case <-quit:
            break LOOP
        }
    }
    fmt.Printf("worker %d: works done\n", i)
}

func spawnGroup(f func(int, <-chan signal), num int, groupSignal <-chan signal) <-chan signal {
    c := make(chan signal)
    var wg sync.WaitGroup

    for i := 0; i < num; i++ {
        wg.Add(1)
        go func(i int) {
            fmt.Printf("worker %d: start to work...\n", i)
            f(i, groupSignal)
            wg.Done()
        }(i + 1)
    }

    go func() {
        wg.Wait()
        c <- signal(struct{}{})
    }()
    return c
}

func main() {
    fmt.Println("start a group of workers...")
    groupSignal := make(chan signal)
    c := spawnGroup(worker, 5, groupSignal)
    fmt.Println("the group of workers start to work...")

    time.Sleep(5 * time.Second)
    // 通知workers退出
    fmt.Println("notify the group of workers to exit...")
    close(groupSignal)
    <-c
    fmt.Println("the group of workers work done!")
}

2.用于替代锁机制

传统的基于共享内存+锁模式的goroutine安全的计数器实现

type counter struct {
    sync.Mutex
    i int
}

var cter counter

func Increase() int {
    cter.Lock()
    defer cter.Unlock()
    cter.i++
    return cter.i
}

func main() {
    for i := 0; i < 10; i++ {
        go func(i int) {
            v := Increase()
            fmt.Printf("goroutine-%d: current counter value is %d\n", i, v)
        }(i)
    }

    time.Sleep(5 * time.Second)
}

无缓存channel代替锁(通过通信来共享内存)

type counter struct {
    c chan int
    i int
}

var cter counter

func InitCounter() {
    cter = counter{
        c: make(chan int),
    }

    go func() {
        for {
            cter.i++
            cter.c <- cter.i
        }
    }()
    fmt.Println("counter init ok")
}

func Increase() int {
    return <-cter.c
}

func init() {
    InitCounter()
}

func main() {
    for i := 0; i < 10; i++ {
        go func(i int) {
            v := Increase()
            fmt.Printf("goroutine-%d: current counter value is %d\n", i, v)
        }(i)
    }

    time.Sleep(5 * time.Second)
}

34.2 带缓冲channel

  • 对带缓冲channel的发送操作在缓存区未满、接收操作在缓冲区非空的情况下是异步的(发送或接收无须阻塞等待)
  • 对一个带缓冲的channel
    • 在缓冲区无数据或有数据但未满的情况下,对其进行发送操作的goroutine不会阻塞
    • 在缓冲区已满的情况下,对其进行发送操作的goroutine会阻塞
    • 在缓冲区为空的情况下,对其进行接收操作的goroutine亦会阻塞

1.用作消息队列

  • 无论是单收单发、还是多收多发,带缓存channel的收发性能都要好于无缓存channel
  • 对于带缓冲channel而言,选择适当容量会在一定程度上提升收发性能

2.用作计数信号量

同时允许处于活动状态的最大goroutine数量为3

var active = make(chan struct{}, 3)
var jobs = make(chan int, 10)

func main() {
    go func() {
        for i := 0; i < 8; i++ {
            jobs <- (i + 1)
        }
        close(jobs)
    }()

    var wg sync.WaitGroup

    for j := range jobs {
        wg.Add(1)
        go func(j int) {
            active <- struct{}{}
            log.Printf("handle job: %d\n", j)
            time.Sleep(2 * time.Second)
            <-active
            wg.Done()
        }(j)
    }
    wg.Wait()
}

3.len(channel)的应用

  • len(channel)语义
    • 当s为无缓冲channel时,len(s)总是返回0
    • 当s为带缓冲channel时,len(s)返回当前channel s中尚未被读取的元素个数

使用select原语的default分支语义,当channel空的时候,tryRecv不会阻塞;当channel满的时候,trySend也不会阻塞

  • 有一个问题是这种方法改变了channel的状态:接收或者发送了一个元素
  • 特定的场景下,可以用len(channel)来实现
    • 多发送单接收的场景,即有多个发送者,但只有一个接收者。可以在接收者goroutine中根据len(channel)是否大于0来判断channel中是否有数据需要接收
    • 多接收单发送的场景,即有多个接收者,但只有一个发送者。可以在发送者goroutine中根据len(channel)是否小于cap(channel)
func producer(c chan<- int) {
    var i int = 1
    for {
        time.Sleep(2 * time.Second)
        ok := trySend(c, i)
        if ok {
            fmt.Printf("[producer]: send [%d] to channel\n", i)
            i++
            continue
        }
        fmt.Printf("[producer]: try send [%d], but channel is full\n", i)
    }
}

func tryRecv(c <-chan int) (int, bool) {
    select {
    case i := <-c:
        return i, true

    default:
        return 0, false
    }
}

func trySend(c chan<- int, i int) bool {
    select {
    case c <- i:
        return true
    default:
        return false
    }
}

func consumer(c <-chan int) {
    for {
        i, ok := tryRecv(c)
        if !ok {
            fmt.Println("[consumer]: try to recv from channel, but the channel is empty")
            time.Sleep(1 * time.Second)
            continue
        }
        fmt.Printf("[consumer]: recv [%d] from channel\n", i)
        if i >= 3 {
            fmt.Println("[consumer]: exit")
            return
        }
    }
}

func main() {
    c := make(chan int, 3)
    go producer(c)
    go consumer(c)

    select {} // 故意阻塞在此
}

34.3 nil channel的妙用

对没有初始化的channel(nil channel)进行读写操作将会发生阻塞

func main() {
    c1, c2 := make(chan int), make(chan int)
    go func() {
        time.Sleep(time.Second * 5)
        c1 <- 5
        close(c1)
    }()

    go func() {
        time.Sleep(time.Second * 7)
        c2 <- 7
        close(c2)
    }()

    var ok1, ok2 bool
    for {
        select {
        case x := <-c1:
            ok1 = true
            fmt.Println(x)
        case x := <-c2:
            ok2 = true
            fmt.Println(x)
        }

        if ok1 && ok2 {
            break
        }
    }
    fmt.Println("program end")
}

显式地将c1或c2置为nil,利用对一个nil channel执行获取操作,该操作将被阻塞的特性,已经被置为nil的c1或c2的分支将再也不会被select选中执行

func main() {
    c1, c2 := make(chan int), make(chan int)
    go func() {
        time.Sleep(time.Second * 5)
        c1 <- 5
        close(c1)
    }()

    go func() {
        time.Sleep(time.Second * 7)
        c2 <- 7
        close(c2)
    }()

    for {
        select {
        case x, ok := <-c1:
            if !ok {
                c1 = nil
            } else {
                fmt.Println(x)
            }
        case x, ok := <-c2:
            if !ok {
                c2 = nil
            } else {
                fmt.Println(x)
            }
        }
        if c1 == nil && c2 == nil {
            break
        }
    }
    fmt.Println("program end")
}

34.4 与select结合使用的一些惯用法

1.利用default分支避免阻塞

// $GOROOT/src/time/sleep.go
func sendTime(c interface{}, seq uintptr) {
    // 无阻塞地向c发送当前时间
    ...
    select {
        case c.(chan Time) <- Now():
        default:
    }
}

2.实现超时机制

要注意timer使用后的释放

  • timer实质上是由Go运行时自行维护的,不是操作系统的定时器资源。Go启动了一个单独goroutine来维护一个“最小堆”,定期唤醒并读取堆顶的timer对象,执行完毕后删除
  • timer.Timer实则是在这个最小堆中添加一个timer对象实例,而调用timer.Stop方法则是从堆中删除对应的timer对象
func worker() {
    select {
        case <- c:
            //...
        case <-time.After(30 * time.Second):
            return
    }
}

3.实现心跳机制

func wokrer() {
    heartbeat := time.NewTicker(30 * time.second)
    defer heartbeat.Stop()
    for {
        select {
            case <-c:
                //处理业务逻辑
            case <- heartbeat.C:
                //处理心跳
        }
    }
}

标签:读书笔记,int,goroutine,chan,34,func,time,Go,channel
From: https://www.cnblogs.com/brynchen/p/18031170

相关文章

  • 《程序是怎样跑起来的》第7章读书笔记
    第7章就把重点放到了这本书程序是怎么跑起来的重点上,但同时也难理解了许多。我们知道的是程序要在特定的运行环境上才能运行,而运行环境等于操作系统加硬盘,每个程序都有其对应的运行环境操作系统和硬件决定了程序的运行环境,还需要知道的是,在将硬件作为程序运行环境考虑是CPU的类型......
  • 《程序是怎样跑起来的》第6章读书笔记
    前面讲述了内存跟磁盘,而内存跟磁盘里面的储存量也是有限的,那么我们就需要去压缩数据,而数据该怎么压缩呢?第6章就为我们介绍了。首先要了解文件中储存数据的格式文件是在磁盘等储存媒体中储存数据的一种形式,程序是以字节为单位向文件中储存数据的储存在文件中的数据。如果表示字符,那......
  • Go语言精进之路读书笔记第33条——掌握Go并发模型和常见并发模式
    不要通过共享内存来通信,而应该通过通信来共享内存。——RobPike33.1Go并发模型CSP(CommunicatingSequentialProcess,通信顺序进程)模型。一个符合CSP模型的并发程序应该是一组通过输入/输出原语连接起来的P的集合。Go始终推荐以CSP模型风格构建并发程序。Go针对CSP模型提供......
  • [数据库] 使用索引(2): mongoDB
    mongoDB的索引mongodb的索引和mysql基本类似,也是默认主键(相当于mongo中的_id字段)为索引,进行索引排序etc.索引分类单键索引将一个字段作为索引,默认_id,也可以将其他字段作为索引db.collection.createIndex({year:1})其中value为1则是正序,为-1则是倒序复合索引......
  • SciTech-Mathmatics-Trigonometric Identities you must remember: 需要记住的三角函
    TrigonometricIdentities(Revision:1.4)TrigonometricIdentitiesyoumustrememberThe“bigthree”trigonometricidentitiesare\(\large\begin{equation}\sin^{2}t+cos^{2}t=1\tag{1}\end{equation}\)\(\large\begin{equation}\sin(......
  • pytest简易教程(34):pytest常用插件 - 测试报告(pytest-html)
     pytest简易教程汇总,详见:https://www.cnblogs.com/uncleyong/p/17982846关于pytest-html通过命令行方式,生成xml/html格式的测试报告,存储于用户指定路径报告会覆盖上一次的 插件安装pipinstallpytest-html 使用方式命令行格式:pytest--html=./report/report.html......
  • Windows ® Installer. V 5.0.20348.1668 (msiexec命令 参数
    Windows®Installer.V5.0.20348.1668msiexec/Option<RequiredParameter>[OptionalParameter]安装选项 </package|/i><Product.msi> 安装或配置产品 /a<Product.msi> 管理安装-在网络上安装产品 /j<u|m><Product.msi>[/t<TransformList>......
  • AT_abc341_g [ABC341G] Highest Ratio 题解
    题目传送门前置知识单调栈简化题意给定一个长度为\(n\)的序列\(a\)。对于所有的\(l(1\lel\len)\),均求出\(\max\limits_{r=l}^{n}\{\frac{\sum\limits_{i=l}^{r}a_{i}}{r-l+1}\}\)。解法令\(sum_{i}=\sum\limits_{j=1}^{i}a_{j},P_{i}=(i,sum_{i})\),则有\(\beg......
  • kettle从入门到精通 第四十七课 ETL之kettle mongo output 写入
    1、上一节课我们学习了mongoinput读取步骤,本节课我们一起学习下mongoout写入步骤,该步骤可以将数据写入到mongo中,如下图所示。 2、 配置mongo连接,有两种方式,如截图所示。ConnectionString:如StringconnectionString="mongodb://username:password@localhost:27017/myda......
  • abc340比赛总结
    写在前面作业还没有写完,简单写一下吧,做题过程中的感受就不会写那么详细了。A比较简单,就是个等差数列,数据范围很小,随便切。B简单的题目,但是我罚时了四次。把题目看错了,下次注意。C规律一开始没有推出来,写了个不带记忆化的\(O(logn)\)的深搜(没带记忆化所......