首页 > 其他分享 >go语言channel

go语言channel

时间:2023-04-29 18:56:43浏览次数:33  
标签:语言 nil elem mysg hchan go sg channel

go语言channel

设计原理

go语言中提倡:不要通过共享内存方式进行通信,而应该通过通信的方式共享内存。

在很多编程语言中,多个线程传递数据的方式一般是共享内存,为了解决线程竞争,我们需要限制同一时间能够读写这些变量的线程数量,然而这与go语言的设计并不相同。

虽然在go语言中也能使用共享内存加互斥锁进行通信,但是go语言提供了一种不同的并发模型——通信顺序进程(communicating sequential processes,CSP)。goroutine和channel分别对应CSP中的实体和传递信息的媒介

img

channel在运行时的内部表示是runtime.hchan,该结构体中包含了用于保护成员变量的互斥锁,从某种程度上说,channel是一个用于同步和通信的有锁队列。

数据结构

type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

sendq和recvq存储了当前channel由于缓冲区空间不足而阻塞的goroutine列表,这些等待队列使用双向链表runtime.waitq表示,链表中所有元素都是runtime.sudog结构

type waitq struct {
	first *sudog
	last  *sudog
}
type sudog struct {
	// The following fields are protected by the hchan.lock of the
	// channel this sudog is blocking on. shrinkstack depends on
	// this for sudogs involved in channel ops.

	g *g

	// isSelect indicates g is participating in a select, so
	// g.selectDone must be CAS'd to win the wake-up race.
	isSelect bool
	next     *sudog
	prev     *sudog
	elem     unsafe.Pointer // data element (may point to stack)

	// The following fields are never accessed concurrently.
	// For channels, waitlink is only accessed by g.
	// For semaphores, all fields (including the ones above)
	// are only accessed when holding a semaRoot lock.

	acquiretime int64
	releasetime int64
	ticket      uint32
	parent      *sudog // semaRoot binary tree
	waitlink    *sudog // g.waiting list or semaRoot
	waittail    *sudog // semaRoot
	c           *hchan // channel
}

runtime.sudog表示一个在等待列表中的goroutine,该结构中存储了两个分别指向前后runtime.sudog的指针以构成链表。

创建channel

func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// compiler checks this but be safe.
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case mem == 0:
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
	}
	return c
}

上述代码根据channel中收发元素的类型和缓冲区大小初始化runtime.hchan和缓冲区:

  • 如果当前channel中不存在缓冲区,那么只会为runtime.hchan分配一块内存空间
  • 如果当前channel中存储的不是指针类型,会为当前channel和底层数组分配一块连续的内存空间
  • 默认情况下会单独为runtime.hchan和缓冲区分配内存

发送数据

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
	//
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation.
	if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
		(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

	if !block {
		unlock(&c.lock)
		return false
	}

	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	if gp.param == nil {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	return true
}

直接发送

如果目标channel没有被关闭并且已经有处于读等待的goroutine,那么runtime.chansend会从接收队列recvq中取出最先陷入等待的goroutine并直接向它发送数据:

发送数据时会调用runtime.send,该函数的执行可以分为两个部分:

  1. 调用runtime.sendDirect将发送的数据直接复制到x = <- c表达式中变量x所在的内存地址上;
  2. 调用runtime.goready将等待接收数据的goroutine标记成可运行状态Grunnable,并把该goroutine放到发送方所在处理器的runnext上等待执行,该处理器在下一次调度时会立刻唤醒数据的接收方。

发送数据只是将接收方的goroutine放到了处理器的runnext中,程序并没有立刻执行该goroutine

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if raceenabled {
		if c.dataqsiz == 0 {
			racesync(c, sg)
		} else {
			// Pretend we go through the buffer, even though
			// we copy directly. Note that we need to increment
			// the head/tail locations only when raceenabled.
			qp := chanbuf(c, c.recvx)
			raceacquire(qp)
			racerelease(qp)
			raceacquireg(sg.g, qp)
			racereleaseg(sg.g, qp)
			c.recvx++
			if c.recvx == c.dataqsiz {
				c.recvx = 0
			}
			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
		}
	}
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}

img

标签:语言,nil,elem,mysg,hchan,go,sg,channel
From: https://www.cnblogs.com/zpf253/p/17364362.html

相关文章

  • Python之路【第十七篇】:Django【进阶篇】
    原博客笔记链接:https://www.cnblogs.com/wupeiqi/articles/5246483.html 1.Model到目前为止,当我们的程序涉及到数据库相关操作时,我们一般都会这么搞:创建数据库,设计表结构和字段使用MySQLdb来连接数据库,并编写数据访问层代码业务逻辑层去调用数据访问层执行数......
  • Python之路【第十六篇】:Django【基础篇】
    原博客教材链接:https://www.cnblogs.com/wupeiqi/articles/5237704.html Python的WEB框架有Django、Tornado、Flask等多种,Django相较与其他WEB框架其优势为:大而全,框架本身集成了ORM、模型绑定、模板引擎、缓存、Session等诸多功能。 1.基本配置1.1创建django程......
  • Django基础(二)
    原博客笔记链接:https://www.cnblogs.com/wupeiqi/articles/4508271.html上节回顾web程序的生命周期MVC和MTV路由系统和自定义动态路由Model模版语言和simple_tag 7.Model连表操作指定映射连表条件一对一多对多#表结构#!/usr/bin/envpythoncl......
  • 希望所有计算机学生能看到这篇c语言教程
    大部分程序员走入编程世界第一个学习的语言就是C语言。作为一门古老的编程语言,c语言拥有48年的发展历程。为什么要学习C语言?C语言是学习计算机程序设计语言的入门语言。最全面的编程面试网站C语言是一门偏底层的语言,学好它,可以让你更好的了解计算机。学会了C语言,你就能学习......
  • Django基础(一)
    原博客笔记链接:https://www.cnblogs.com/wupeiqi/articles/4491246.html1.前戏1.1pythonWeb程序众所周知,对于所有的Web应用,本质上其实就是一个socket服务端,用户的浏览器其实就是一个socket客户端。#!/usr/bin/envpython#coding:utf-8importsocketdefh......
  • django学习笔记--小白三板斧
    小白必会三板斧1.HttpResponse #返回字符串returnHttpResponse("Hello,world.")2.render #返回一个模板returnrender(request,'hello.html') #传参返回l1=['Billy','Felix','Mary']returnrender(reque......
  • c语言中inline用法
    使用inline函数可以提升程序效率,但是让inline函数生效是有条件的...打开Linux内核源代码,会发现内核在定义C语言函数时,有很多都带有“inline”关键字,请看下图,那么这个关键字有什么作用呢? inline关键字的作用在C语言程序开发中,inline一般用于定义函数,inline函数也被称作......
  • Django框架基础7
    本节主要知识点:一对一(OneToOneFiled)一对多(ForeignKey)多对多(ManyToManyField)F对象查询Q对象查询一、Django数据表关联映射一对一(OneToOneFiled)一对多(ForeignKey)多对多(ManyToManyField)  我们知道涉及到数据表之间的对应关系就会想到一对一、一对多、多对多,在学习My......
  • Django学习笔记--目录结构
    Django目录结构myprojectmyproject文件夹 #项目同名的文件夹settings.py #django暴露给用户可以配置的配置文件urls.py#路由与视图函数(也可以是类)对应关系(路由层)wsgi.py #网关,准备一些相关的环境(可以忽略)app01文件夹 #应用(可以有多个)mi......
  • Django学习笔记--命令行启动与pycharm启动
    命令行模式创建Django项目django-adminstartproject项目名启动django项目cd项目名pythonmanage.pyrunserver#可以配置ip和端口pythonmanage.pyrunserver0.0.0.0:8000#如果遇到报错,是解释器和版本不一致导致。找到python38\lib\site-packages\django\con......