首页 > 其他分享 >go语言调度gmp原理(2)

go语言调度gmp原理(2)

时间:2023-05-16 21:22:27浏览次数:42  
标签:pp runtime gp goroutine 调度 newg go gmp stack

go语言调度gmp原理(2)

创建goroutine

通过runtime.newproc函数调用,runtime.newproc的入参是参数大小和表示函数的指针funcval,它会获取goroutine以及调用方的程序计数器,然后调用runtime.newproc1函数获取新的goroutine、结构体、将其加入处理器的运行队列,并在满足条件时调用runtime.wakep唤醒新的处理器执行goroutine

func newproc(fn *funcval) {
	gp := getg()
	pc := getcallerpc()
	systemstack(func() {
		newg := newproc1(fn, gp, pc)

		pp := getg().m.p.ptr()
		runqput(pp, newg, true)

		if mainStarted {
			wakep()
		}
	})
}

runtime.newproc1会根据传入参数初始化一个g结构体,我们将该函数分成一下三个部分介绍其实现:

  1. 获取或者创建新的goroutine结构体
  2. 将传入的参数移到goroutine的栈上
  3. 更新goroutine调度相关属性

首先了解goroutine结构体的创建过程

func newproc1(fn *funcval, callergp *g, callerpc uintptr) *g {
	if fn == nil {
		fatal("go of nil func value")
	}

	mp := acquirem() // disable preemption because we hold M and P in local vars.
	pp := mp.p.ptr()
	newg := gfget(pp)
	if newg == nil {
		newg = malg(_StackMin)
		casgstatus(newg, _Gidle, _Gdead)
		allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
	}
	if newg.stack.hi == 0 {
		throw("newproc1: newg missing stack")
	}

	if readgstatus(newg) != _Gdead {
		throw("newproc1: new g is not Gdead")
	}

	totalSize := uintptr(4*goarch.PtrSize + sys.MinFrameSize) // extra space in case of reads slightly beyond frame
	totalSize = alignUp(totalSize, sys.StackAlign)
	sp := newg.stack.hi - totalSize
	spArg := sp
	if usesLR {
		// caller's LR
		*(*uintptr)(unsafe.Pointer(sp)) = 0
		prepGoExitFrame(sp)
		spArg += sys.MinFrameSize
	}

	memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
	newg.sched.sp = sp
	newg.stktopsp = sp
	newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
	newg.sched.g = guintptr(unsafe.Pointer(newg))
	gostartcallfn(&newg.sched, fn)
	newg.gopc = callerpc
	newg.ancestors = saveAncestors(callergp)
	newg.startpc = fn.fn
	if isSystemGoroutine(newg, false) {
		sched.ngsys.Add(1)
	} else {
		// Only user goroutines inherit pprof labels.
		if mp.curg != nil {
			newg.labels = mp.curg.labels
		}
		if goroutineProfile.active {
			// A concurrent goroutine profile is running. It should include
			// exactly the set of goroutines that were alive when the goroutine
			// profiler first stopped the world. That does not include newg, so
			// mark it as not needing a profile before transitioning it from
			// _Gdead.
			newg.goroutineProfiled.Store(goroutineProfileSatisfied)
		}
	}
	// Track initial transition?
	newg.trackingSeq = uint8(fastrand())
	if newg.trackingSeq%gTrackingPeriod == 0 {
		newg.tracking = true
	}
	casgstatus(newg, _Gdead, _Grunnable)
	gcController.addScannableStack(pp, int64(newg.stack.hi-newg.stack.lo))

	if pp.goidcache == pp.goidcacheend {
		// Sched.goidgen is the last allocated id,
		// this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
		// At startup sched.goidgen=0, so main goroutine receives goid=1.
		pp.goidcache = sched.goidgen.Add(_GoidCacheBatch)
		pp.goidcache -= _GoidCacheBatch - 1
		pp.goidcacheend = pp.goidcache + _GoidCacheBatch
	}
	newg.goid = pp.goidcache
	pp.goidcache++
	if raceenabled {
		newg.racectx = racegostart(callerpc)
		if newg.labels != nil {
			// See note in proflabel.go on labelSync's role in synchronizing
			// with the reads in the signal handler.
			racereleasemergeg(newg, unsafe.Pointer(&labelSync))
		}
	}
	if trace.enabled {
		traceGoCreate(newg, newg.startpc)
	}
	releasem(mp)

	return newg
}

上述代码会先从处理器的gfree列表中查找空闲goroutine,如果不存在空闲goroutine,就会通过runtime.malg创建一个栈大小足够的新结构体

img

初始化结构体

runtime.gfget通过两种方式获取新的runtime.g

  1. 从goroutine所在处理器的gFree列表或者调度器的sched.gFree列表中获取runtime.g
  2. 调用runtime.malg生成一个新的runtime.g,并将结构体追加到全局的goroutine列表allgs中

rutime.gfget中包含两部分逻辑,他们会根据处理器中gFree列表中goroutine的数量做出不同的决策

  • 当处理器的goroutine列表为空时,会将调度器持有的空闲goroutine转移到当前处理器上,直到gFree列表中的goroutine数量达到32
  • 当处理器的goroutine数量充足时,会从列表头部返回一个新的goroutine
func gfget(pp *p) *g {
retry:
	if pp.gFree.empty() && (!sched.gFree.stack.empty() || !sched.gFree.noStack.empty()) {
		lock(&sched.gFree.lock)
		// Move a batch of free Gs to the P.
		for pp.gFree.n < 32 {
			// Prefer Gs with stacks.
			gp := sched.gFree.stack.pop()
			if gp == nil {
				gp = sched.gFree.noStack.pop()
				if gp == nil {
					break
				}
			}
			sched.gFree.n--
			pp.gFree.push(gp)
			pp.gFree.n++
		}
		unlock(&sched.gFree.lock)
		goto retry
	}
	gp := pp.gFree.pop()
	if gp == nil {
		return nil
	}
	pp.gFree.n--
	if gp.stack.lo != 0 && gp.stack.hi-gp.stack.lo != uintptr(startingStackSize) {
		// Deallocate old stack. We kept it in gfput because it was the
		// right size when the goroutine was put on the free list, but
		// the right size has changed since then.
		systemstack(func() {
			stackfree(gp.stack)
			gp.stack.lo = 0
			gp.stack.hi = 0
			gp.stackguard0 = 0
		})
	}
	if gp.stack.lo == 0 {
		// Stack was deallocated in gfput or just above. Allocate a new one.
		systemstack(func() {
			gp.stack = stackalloc(startingStackSize)
		})
		gp.stackguard0 = gp.stack.lo + _StackGuard
	} else {
		if raceenabled {
			racemalloc(unsafe.Pointer(gp.stack.lo), gp.stack.hi-gp.stack.lo)
		}
		if msanenabled {
			msanmalloc(unsafe.Pointer(gp.stack.lo), gp.stack.hi-gp.stack.lo)
		}
		if asanenabled {
			asanunpoison(unsafe.Pointer(gp.stack.lo), gp.stack.hi-gp.stack.lo)
		}
	}
	return gp
}

当调度器的gFree和处理器的gFree列表都不存在结构体时,运行时会调用runtime.malg初始化新的runtime.g结构,如果申请的堆栈大小大于0,这里会通过runtime.stackalloc分配2KB大小的栈空间

func malg(stacksize int32) *g {
	newg := new(g)
	if stacksize >= 0 {
		stacksize = round2(_StackSystem + stacksize)
		systemstack(func() {
			newg.stack = stackalloc(uint32(stacksize))
		})
		newg.stackguard0 = newg.stack.lo + _StackGuard
		newg.stackguard1 = ^uintptr(0)
		// Clear the bottom word of the stack. We record g
		// there on gsignal stack during VDSO on ARM and ARM64.
		*(*uintptr)(unsafe.Pointer(newg.stack.lo)) = 0
	}
	return newg
}

runtime.malg返回的goroutine会存储到全局变量allgs中

运行队列

runtime.runqput会将goroutine放到运行队列上,这既可能是全局的运行队列也可能是处理器的本地运行队列

func runqput(pp *p, gp *g, next bool) {
	if randomizeScheduler && next && fastrandn(2) == 0 {
		next = false
	}

	if next {
	retryNext:
		oldnext := pp.runnext
		if !pp.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
			goto retryNext
		}
		if oldnext == 0 {
			return
		}
		// Kick the old runnext out to the regular run queue.
		gp = oldnext.ptr()
	}

retry:
	h := atomic.LoadAcq(&pp.runqhead) // load-acquire, synchronize with consumers
	t := pp.runqtail
	if t-h < uint32(len(pp.runq)) {
		pp.runq[t%uint32(len(pp.runq))].set(gp)
		atomic.StoreRel(&pp.runqtail, t+1) // store-release, makes the item available for consumption
		return
	}
	if runqputslow(pp, gp, h, t) {
		return
	}
	// the queue is not full, now the put above must succeed
	goto retry
}

代码的核心逻辑如下

  1. 当next为true时,将goroutine设置到处理器的runnext作为助力器执行的下一个任务
  2. 当next为false且本地运行队列还有剩余空间时,将goroutine加入处理器持有的本地运行队列
  3. 当处理器的本地运行队列已经没有剩余空间时,就会把本地队列中的一部分goroutine和待加入的goroutine通过runtime.runqputslow添加到调度器持有的全局队列上

处理器本地的运行队列是数组构成的环形链表,它醉倒可以存储256个待执行任务

go语言有两个运行队列,其中一个是处理器的本地运行队列,另一个是调度器持有的全局运行队列,只有在本地运行队列没有剩余空间时才会使用全局运行队列。

img

调度信息

运行时创建goroutine时会通过下面的代码设置调度相关信息,前两行代码会分别将程序计数器和goroutine设置成runtime.goexit和新创建goroutine运行的函数

上述调度信息sched不是初始化后的goroutine的最终结果,它还需要经过runtime.gostartcallfn和runtime.gostartcall的处理

func gostartcallfn(gobuf *gobuf, fv *funcval) {
	var fn unsafe.Pointer
	if fv != nil {
		fn = unsafe.Pointer(fv.fn)
	} else {
		fn = unsafe.Pointer(abi.FuncPCABIInternal(nilfunc))
	}
	gostartcall(gobuf, fn, unsafe.Pointer(fv))
}

func gostartcall(buf *gobuf, fn, ctxt unsafe.Pointer) {
	sp := buf.sp
	sp -= goarch.PtrSize
	*(*uintptr)(unsafe.Pointer(sp)) = buf.pc
	buf.sp = sp
	buf.pc = uintptr(fn)
	buf.ctxt = ctxt
}

调度信息的sp中存储了runtime.goexit函数的程序计数器,而pc中存储了传入函数的程序计数器。因为pc寄存器的作用是存储程序接下来运行的位置,所以pc的使用比较好理解,但是sp中存储的runtime.goexit会让人感到困惑,我们需要配合下面的调度循环来理解它的作用。

标签:pp,runtime,gp,goroutine,调度,newg,go,gmp,stack
From: https://www.cnblogs.com/zpf253/p/17406845.html

相关文章

  • Golang接收者方法语法糖
    1、概述在《Golang常用语法糖》这篇博文中我们讲解Golang中常用的12种语法糖,在本文我们主要讲解下接收者方法语法糖。在介绍Golang接收者方法语法糖前,先简单说下Go语言的指针(Pointer),大致上理解如下:变量名前的& 符号,是取变量的内存地址,不是取值;数据类型前的* 符号,代表......
  • django系列-服务和环境配置(陆续完善中···)
    一、Mysql1、安装服务端yuminstallmariadb-server-ymariadb-server.x86_641:5.5.68-1.el7#版本2、安装客户端yuminstallmariadb-y#软件包1:mariadb-5.5.68-1.el7.x86_64已安装并且是最新版本3、服务配置4、帐号初始化二、Redis三、Python四、虚拟环境......
  • 【Go新手起步01】5步完成 vscode的go插件安装跟激活。
     首先下载vscode,进行两个插件安装,如图所示 然后下载go语言,在官网https://go.dev/doc/install下载 cmd打开,输入goversion验证下载是否成功。在dos页面输入goenv-wGO111MODULE=on                goenv-wGOPROXY=https://goproxy.cn,di......
  • golang vrrp + ipvs 实现简单的服务ha
    比较类似keeplived,但是是比较简单的集成参考图基于vrrp实现vip的处理,同时master以及backup安装基于vrrp+ipvs的程序,基于服务状态进行服务的切换处理 实现说明:对于vrrp处理可以基于包装的vrrpgolang(rongfengliang/vrrp)包,同时对于ipvs可以直接ipvs包(可以使用mqli......
  • c-for-go cgo 绑定自动生成工具
    c-for-go可以快速的生成cgo绑定代码的工具,目前有不少golang项目使用了此工具,比如cloudflare/ipvs也使用了此工具参考处理 参考使用这个是libvpx的一个项目yaml定义文件---GENERATOR:PackageName:vpxPackageDescription:"Packagevpxpro......
  • Django用递归实现查询所有子部门逻辑
    假设你已经定义好了部门模型Department,该模型包含以下字段:classDepartment(models.Model):name=models.CharField(max_length=100)parent_department=models.ForeignKey('self',on_delete=models.CASCADE,null=True,blank=True)其中,name表示部门名称,paren......
  • Golang URL query contains semicolon 报错解决方案
    ​ 报错信息http:URLquerycontainssemicolon,whichisnolongerasupportedseparator;partsofthequerymaybestrippedwhenparsed;seegolang.org/issue/25192 高版本http废除了分号做分隔符,会在http库中做报警输出,基础库代码如下:func(shserverHandle......
  • Go语言并发编程-cnblog
    并发编程并发vs并行举个形象点的例子并发可以理解为一边吃饭,一边喝水,因为人只有一个嘴一个咽喉,所以同一时刻饭和水只能有一样进入,二者只能交替进行并行可以理解为一边走路一边吃东西,因为走路是靠腿脚,吃东西是靠嘴,二者不相干,相当于两个独立的线程,因而可以同时进行Go语言......
  • python3 获取mongodb表中数据的条数
    说明:此处考虑了时区,mongodb默认使用"格林威治时间"1#!/usr/bin/python323importpymongo4importdatetime5importpytz67#统计8"""9/usr/bin/pip3install-Ivpymongo-ihttp://pypi.douban.com/simple/--trusted-hostpypi.douban.com......
  • mongoDB 批量将某个字段值等于另一个字段值
    将update_time字段的值设置为create_time的值:db.collection_name.find().forEach(function(item){db.collection_name.update({_id:item._id,create_time:{$exists:true}},{$set:{update_time:item.create_time}})}......