首页 > 编程语言 >Go 并发编程之Channel

Go 并发编程之Channel

时间:2024-08-31 19:51:18浏览次数:11  
标签:nil gp 编程 chan mysg Channel Go sg channel

Go 中的 channel 是一种用于在 Goroutine 之间传递数据的通信机制,通常被用来实现 Goroutine 之间的同步和数据共享。

在这里插入图片描述

1. channel 的基本概念

1.1 创建channel

channel 在类型上分为两种:双向和单向。

  • 双向channel: 既能接收又能发送。
  • 单向channel: 只能发送或只能接收。

channel是一个引用类型,使用make创建

未初始化的channel零值为nil

声明并初始化

ch := make(chan int)       // 双向
readCh := make(<-chan int) // 只能接收
writeCh:=make(chan<- int) // 只能发送

1.2 有无缓冲 channel

1.2.1 无缓冲channel
  • 默认创建的是无缓冲 channel,也称为同步 channel

  • 无缓冲 channel 的特点是发送操作会一直阻塞,直到有 Goroutine 接收了这个数据。

    ch := make(chan int) 
    
1.2.2 有缓冲channel
  • 有缓冲 channel 允许在没有接收者的情况下发送一定数量的数据。

  • 创建时指定缓冲区大小,缓冲区未满时发送操作不会阻塞。

    ch := make(chan int, 3)  // 有缓冲 channel,缓冲区大小为 3
    

1.3 channel常见操作

1.3.1 接收/发送

<-指向chan,就表示往chan里边写入数据;如果箭头远离chan,则表示从chan读数据。

ch := make(chan int, 3)
ch <- 100         // 发送数据
fmt.Println(<-ch) // 接收数据 100

在接收数据时,可以返回两个返回值。第一个返回值返回channel中的元素,第二个返回值为bool类型,表示是否成功地从channel中读取到一个值。

  ch := make(chan int, 3)
	ch <- 100 // 发送数据
	value, ok := <-ch
  //value is:100,ok is:true
  fmt.Printf("value is:%d,ok is:%t\n", value, ok) 
	close(ch)
	value, ok = <-ch
  //value is:0,ok is:false
  // 表示channel已经被close而且channel中没有缓存的数据
	fmt.Printf("value is:%d,ok is:%t\n", value, ok)

1.4 channel 高级操作

1.4.1 关闭 channel
  • 使用 close 函数关闭 channel,通知接收方不再有数据发送。
  • 关闭的 channel 不能再发送数据,否则会panic,但仍可接收剩余的数据。

往关闭的chan发送数据

panic: send on closed channel

ch := make(chan int, 3)
close(ch)
ch <- 100

接收剩余数据

value, ok := <-ch  // 如果 ok 为 false,表示channel已经被close而且channel中没有缓存的数据
1.4.2 select 多路复用

select 是 Go 语言并发编程中不可或缺的工具。它通过多路复用 channel 操作,使得 Goroutine 可以灵活、高效地进行通信和同步。无论是实现超时处理、非阻塞操作,还是多路监听,select 都提供了简洁而强大的解决方案。

介绍几种常见使用实例:

1.4.2.1 监听多个channel

等待其中一个 case 中的 channel 可用,然后执行该 case 对应的代码

package main

import (
	"fmt"
	"time"
)

func main() {
	ch1 := make(chan string)
	ch2 := make(chan string)

	go func() {
		time.Sleep(1 * time.Second)
		ch1 <- "message from ch1"
	}()

	go func() {
		time.Sleep(2 * time.Second)
		ch2 <- "message from ch2"
	}()

	for i := 0; i < 2; i++ {
		select {
		case msg1 := <-ch1:
			fmt.Println("Received:", msg1)
		case msg2 := <-ch2:
			fmt.Println("Received:", msg2)
		}
	}
}

输出:
ch1 在 1 秒后发送消息, ch2 会在 2 秒后发送消息,select 会首先处理 ch1 的消息,然后处理 ch2 的消息。
Received: message from ch1
Received: message from ch2
1.4.2.2 default 分支

select 语句可以包含一个 default 分支,当所有的 channel 都无法操作时,会执行 default 分支的代码。这种方式可以用来实现非阻塞的 channel 操作。

package main

import "fmt"

func main() {
	ch := make(chan string)

	select {
	case msg := <-ch:
		fmt.Println("Received:", msg)
	default:
		fmt.Println("No message received")
	}
}

输出:
No message received
1.4.2.3 超时机制
package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan string)

	go func() {
		time.Sleep(3 * time.Second)
		ch <- "Hello"
	}()

	select {
	case msg := <-ch:
		fmt.Println("Received:", msg)
	case <-time.After(2 * time.Second):
		fmt.Println("Timeout!")
	}
}

输出:
Timeout
超时设置为 2 秒,而 ch 在 3 秒后才有消息,select 最终会选择超时分支,输出 "Timeout!"
1.4.4 信号处理(通过 selectchannel 实现优雅退出)
package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
)

func main() {
    sigs := make(chan os.Signal, 1)
    done := make(chan bool, 1)

    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

    go func() {
        sig := <-sigs
        fmt.Println()
        fmt.Println(sig)
        done <- true
    }()

    fmt.Println("awaiting signal")
    <-done
    fmt.Println("exiting")
}

2.channel 实现原理

从代码层面深入理解channel实现原理

2.1 chan数据结构

type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

字段含义:

  • qcount: 当前 channel存在多少元素,通过len函数可以获得
  • dataqsiz: 循环队列大小,可以用于存放的元素容量
  • buf: 用于存放队列的环形缓冲区
  • elemsize: 存放元素类型大小
  • closed: 是否关闭的标识
  • elemtype: 存放元素类型
  • sendx: 发送元素进入环形缓冲区的 index,当channel接收到了新的数据时,该指针就会加上elemsize,移动到下一个位置
  • recvx: 接收元素所处的环形缓冲区的 index
  • recvq: channel中没有数据可读时,陷入阻塞的协程队列
  • sendq: channel满了无法发送时,陷入阻塞的协程队列
  • lock: 互斥锁

在这里插入图片描述

2.2 图解 hchan

接下来用图解的方式模拟 channel 的操作,加深对 hchan 各个字段的理解,之后在来对底层原理进行解读。

2.2.1 创建
ch:=make(chan int,3)
2.2.2 发送
ch<-1
ch<-2
ch<-3

在这里插入图片描述

这时会发现队列满了,那在发送一次会发生什么呢?阻塞

ch<-4

正在运行的协程G1会主动让出M给其他协程,自己进入waiting状态,同时G1也会抽象成sudog结构体保存到hchansendq中等待被唤醒。

在这里插入图片描述

2.2.3 接收
value<-ch

这时channel会将发送阻塞队列中的G1取出,将G1发送的数据写入缓存中,然后唤醒G1,将G1放到可运行的goroutine队列中。

在这里插入图片描述

2.3 源码解读

runtime/chan.go

2.3.1 初始化
func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // ...
    // 申请的内存空间大小是否溢出
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }

    var c *hchan
    switch {
    case mem == 0:
        // 无缓冲的 elem.size=0 
        // Queue or element size is zero.
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        // 有缓冲的,非指针类型的,分配一块连续的内存给hchan和buf
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // 有缓冲的,指针类型,单独分配buf
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    
    lockInit(&c.lock, lockRankHchan)

    return
}
2.3.2 发送

chansend函数

判断chan是否为nil(有没有初始化)

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  // 未初始化
	if c == nil {
		if !block {
			return false
		}
    // 阻塞
		gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
		throw("unreachable")
    ...
	}

当channel未关闭且容量满了但是又不想阻塞的时候就直接返回

 if !block && c.closed == 0 && full(c) {
        return false
    }

往关闭的channel写入数据直接panic

if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

写时存在阻塞读协程

	if sg := c.recvq.dequeue(); sg != nil {
    // 找到了一个正在等待的读协程,直接将发送的数据投递给读协程,直接绕过buf缓冲区,速度更快
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

写时无阻塞读协程且环形缓冲区未满

// 判断 buf 缓冲区是否满了	
if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
    // 将数据保存到缓冲区
		typedmemmove(c.elemtype, qp, ep)
   // 索引后移
		c.sendx++
   //如果c.sendx == c.dataqsiz,表示sendx索引已经达到缓冲队列最尾部了,则将sendx移动到0(第一个位置)
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
    //数量+1
		c.qcount++
		unlock(&c.lock)
		return true
	}

写时无阻塞读协程且环形缓冲区满了

 //获取当前goroutine
	gp := getg()
	// 获取一个sudog对象并设置其字段
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}

	mysg.elem = ep //将指向发送数据的指针保存到 elem 中
	mysg.waitlink = nil
	mysg.g = gp //将g指向当前的goroutine
	mysg.isSelect = false
	mysg.c = c //当前阻塞的 channel
	gp.waiting = mysg
	gp.param = nil  
	c.sendq.enqueue(mysg)// 将sudog加入到channel的发送等待队列

	atomic.Store8(&gp.parkingOnChan, 1)
	// 当前 Goroutine 切换为等待状态并阻塞等待其他的Goroutine从 channel 接收数据并将其唤醒
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

	//保证当前需要被发送的的值一直是可用状态
	KeepAlive(ep)

	// 协程被唤醒后
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	//更新goroutine相关的对象信息
	gp.waiting = nil
	gp.activeStackChans = false
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	//释放sudog对象
	releaseSudog(mysg)
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true

整个流程大概如下:

在这里插入图片描述

2.3.3 接收

chanrecv函数

channel为nil

if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
		throw("unreachable")
	}

channel关闭且内部无元素

// 关闭
if c.closed != 0 {
    // 元素为空
		if c.qcount == 0 {
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			unlock(&c.lock)
			if ep != nil {
        // 将接收的值置为空值
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}

读时有阻塞写协程

if sg := c.sendq.dequeue(); sg != nil {
      // 缓冲区容量为0,直接将写数据传递给接收方
      // 否则从缓冲区头部获取数据,并将获取的发送方的数据写入到缓冲区尾部
			// Found a waiting sender. If buffer is size 0, receive value
			// directly from sender. Otherwise, receive from head of queue
			// and add sender's value to the tail of the queue (both map to
			// the same buffer slot because the queue is full).
			recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
			return true, true
		}

读时无阻塞写协程且缓冲区有数据

	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
      // 将该数据复制到接收对象
			typedmemmove(c.elemtype, ep, qp)
		}
    // 清空该指针地址的数据
		typedmemclr(c.elemtype, qp)
    // 接收索引后移
		c.recvx++
    // 如果接收索引等于环形数组容量,则置为0。
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
    // 数量-1
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

读时无阻塞写协程且缓冲区无数据

// 缓冲区队列没有数据可以读取,则将当前G打包成Sudo结构并加入到接收等待队列  
gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
  // 加入到接收等待队列recvq中
	c.recvq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	gp.parkingOnChan.Store(true)
  // 阻塞等待被唤醒
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	success := mysg.success
	gp.param = nil
	mysg.c = nil
  // 释放sudog
	releaseSudog(mysg)
	return true, success

整个流程大概如下:

在这里插入图片描述

2.3 .4 关闭

closechan函数

func closechan(c *hchan) {
  // 未初始化 直接panic
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
  // 关闭已经关闭的channel 直接panic
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	if raceenabled {
		callerpc := getcallerpc()
		racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
		racerelease(c.raceaddr())
	}

	c.closed = 1

	var glist gList

	// release all readers
  // 将阻塞读协程队列中的协程节点统一添加到 glist
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// release all writers (they will panic)
  // 将阻塞写协程队列中的协程节点统一添加到 glist;
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)

	// Ready all Gs now that we've dropped the channel lock.
  // 唤醒所有的glist中的goroutine
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}

3总结

本文通过介绍 channel 的基本概念、图解底层数据结构,并剖析源码,深入探讨了 channel 的工作原理。通过这些内容,读者能够更全面地理解 channel 的机制,不仅有助于在实际开发中更高效地使用 channel,还能为定位和解决相关问题提供理论支持。通过对 channel 的深入剖析,本文为后续的并发编程实践奠定了坚实的基础,帮助开发者在复杂的并发场景中游刃有余。

如果您觉得有帮助,请关注我,另公众号【小张的编程世界】,如有任何错误或建议,欢迎指出。感谢您的阅读!

标签:nil,gp,编程,chan,mysg,Channel,Go,sg,channel
From: https://blog.csdn.net/luozong2689/article/details/141651715

相关文章

  • 深入解析 Go 中 Map
    0前言Go语言中的map是一种内建的数据结构,用于存储键值对。它类似于其他编程语言中的哈希表或字典,提供了快速的插入、删除和查找操作。本文将深入浅出介绍map基本概念、使用方式、核心原理、性能以及最佳实践,帮助读者更好的理解和使用map。如果您觉得有帮助,请关注我,另......
  • 深入解析 Go 中 Slice
    0前言slice是一种灵活且强大的数据结构,它在功能上类似于其他编程语言中的数组,但提供了更多的灵活性。与数组不同,slice允许动态调整长度,使其在大多数场景中更加适用。本文将深入解析slice的基本概念及底层实现原理,并通过分析一些面试中常见的易错题,加深对slice的理......
  • 2024-08-31:用go语言,给定一个数组apple,包含n个元素,每个元素表示一个包裹中的苹果数量;
    2024-08-31:用go语言,给定一个数组apple,包含n个元素,每个元素表示一个包裹中的苹果数量;另一个数组capacity包含m个元素,表示m个不同箱子的容量。有n个包裹,每个包裹内装有指定数量的苹果,以及m个箱子,每个箱子的容量不同。任务是将这n个包裹中的所有苹果重新分配到箱子中,最小化所需的......
  • 2024-08-31:用go语言,给定一个数组apple,包含n个元素,每个元素表示一个包裹中的苹果数量;
    2024-08-31:用go语言,给定一个数组apple,包含n个元素,每个元素表示一个包裹中的苹果数量;另一个数组capacity包含m个元素,表示m个不同箱子的容量。有n个包裹,每个包裹内装有指定数量的苹果,以及m个箱子,每个箱子的容量不同。任务是将这n个包裹中的所有苹果重新分配到箱子中,最小化所需的箱子......
  • Go实战全家桶之一:goconfig依赖注入扩展之自动注入配置项、工业级巨匠
    开源地址:goconfig:gitclonehttps://gitee.com/ichub/go.git基础结构packageichubconfigimport("gitee.com/ichub/goconfig/common/base/basedto""gitee.com/ichub/goconfig/common/base/baseutils/reflectutils""github.com/gogf/......
  • Goolge earth studio 进阶4——路径修改与平滑
    如果我们希望在大约中途时获得更多的城市鸟瞰视角。可以将相机拖动到这里并创建一个新的关键帧。camera_target_clip_7EarthStudio会自动平滑我们的路径,所以当我们通过这个关键帧时,不是一个生硬的角度,而是一个平滑的曲线。camera_target_clip_8路径上有贝塞尔控制......
  • Vue期末考试速成复习指南附编程题(js开发基础+路由+Pinia)
    前文:本文参考书籍《Vue.js前端开发实战(第二版)》--黑马程序员/编著重点在于本科期末速成和0基础入门目录:一.初识Vue1.包管理工具:npmyarn2.创建Vue项目二.js开发基础1.什么是单文件组件?2.单文件组件基本结构3.切换页面显示组件3.数据绑定与输出4.Vue引入Html页面5.......
  • Go plan9 汇编: 打通应用到底层的任督二脉
    原创文章,欢迎转载,转载请注明出处,谢谢。0.前言作为一个严肃的Gopher,了解汇编是必须的。本汇编系列文章会围绕基本的Go程序介绍汇编的基础知识。1.Go程序到汇编首先看一个简单到令人发指的示例:packagemainfuncmain(){ a:=1 print(a)}运行程序,输出:#gorun......
  • 多线程编程(面试重中之中,超简单理解)
    最近项目比较紧急,固本之旅卡顿了一段时间,抽时间看了一下多线程,面试重点知识!!!多线程编程优点:提高程序的响应速度,增加用户的体验;提高计算机系统CPU的利用率;优化程序结构,将一个复杂的单线程分化成多个清晰化的单线程,更有利于维护并行指两个以上的事物在同一时刻同时发......
  • 代码大全伪代码转换为高层次的注释是一种很好的编程实践
    首先,把伪代码转变为高层次的注释,并写出第一条和最后一条语句在C++中也就是“{”和“}”。伪代码示例/*Thisroutineoutputsanerrormessagebasedonanerrorcodesuppliedbythecallingroutine.Thewayitoutputsthemessagedependsonthecurrentproce......