首页 > 其他分享 >go语言调度gmp原理

go语言调度gmp原理

时间:2023-05-14 20:33:26浏览次数:42  
标签:pp sched gp goroutine 调度 newg go gmp stack

go语言调度gmp原理

调度器启动

运行时通过runtime.schedinit初始化调度器

// schedinit initializes the scheduler during runtime bootstrap. The world
// starts stopped (worldStopped) and is only marked started (worldStarted)
// after procresize has created the Ps, so no user goroutine can run before
// this function finishes.
//
// Steps, in order: register the rank of each global runtime lock;
// initialize the race detector if enabled; cap the number of Ms at
// sched.maxmcount; initialize stacks, the heap allocator, page tracing,
// CPU features, hashing and fastrand; initialize the bootstrap M
// (mcommoninit) and the module, typelink and itab tables; save the signal
// mask; process args, environment and debug variables; initialize the GC;
// and finally size the set of Ps with procresize.
func schedinit() {
	// Register the lock rank of each global runtime lock.
	lockInit(&sched.lock, lockRankSched)
	lockInit(&sched.sysmonlock, lockRankSysmon)
	lockInit(&sched.deferlock, lockRankDefer)
	lockInit(&sched.sudoglock, lockRankSudog)
	lockInit(&deadlock, lockRankDeadlock)
	lockInit(&paniclk, lockRankPanic)
	lockInit(&allglock, lockRankAllg)
	lockInit(&allpLock, lockRankAllp)
	lockInit(&reflectOffs.lock, lockRankReflectOffs)
	lockInit(&finlock, lockRankFin)
	lockInit(&trace.bufLock, lockRankTraceBuf)
	lockInit(&trace.stringsLock, lockRankTraceStrings)
	lockInit(&trace.lock, lockRankTrace)
	lockInit(&cpuprof.lock, lockRankCpuprof)
	lockInit(&trace.stackTab.lock, lockRankTraceStackTab)
	// Enforce that this lock is always a leaf lock.
	// All of this lock's critical sections should be
	// extremely short.
	lockInit(&memstats.heapStats.noPLock, lockRankLeafRank)

	// raceinit must be the first call to race detector.
	// In particular, it must be done before mallocinit below calls racemapshadow.
	gp := getg()
	if raceenabled {
		gp.racectx, raceprocctx0 = raceinit()
	}

	// Upper bound on the number of Ms (OS threads) the program may create.
	sched.maxmcount = 10000

	// The world starts stopped.
	worldStopped()

	moduledataverify()
	stackinit()
	mallocinit()
	godebug := getGodebugEarly()
	initPageTrace(godebug) // must run after mallocinit but before anything allocates
	cpuinit(godebug)       // must run before alginit
	alginit()              // maps, hash, fastrand must not be used before this call
	fastrandinit()         // must run before mcommoninit
	mcommoninit(gp.m, -1)
	modulesinit()   // provides activeModules
	typelinksinit() // uses maps, activeModules
	itabsinit()     // uses activeModules
	stkobjinit()    // must run before GC starts

	// Save the bootstrap M's signal mask and remember it as the initial mask.
	sigsave(&gp.m.sigmask)
	initSigmask = gp.m.sigmask

	goargs()
	goenvs()
	parsedebugvars()
	gcinit()

	// if disableMemoryProfiling is set, update MemProfileRate to 0 to turn off memprofile.
	// Note: parsedebugvars may update MemProfileRate, but when disableMemoryProfiling is
	// set to true by the linker, it means that nothing is consuming the profile, it is
	// safe to set MemProfileRate to 0.
	if disableMemoryProfiling {
		MemProfileRate = 0
	}

	// procresize requires sched.lock to be held.
	lock(&sched.lock)
	sched.lastpoll.Store(nanotime())
	// Default to one P per CPU; a parseable, positive GOMAXPROCS overrides it.
	procs := ncpu
	if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
		procs = n
	}
	// procresize returns the Ps that already have local work. During
	// bootstrap there must be none, so a non-nil result is fatal.
	if procresize(procs) != nil {
		throw("unknown runnable goroutine during bootstrap")
	}
	unlock(&sched.lock)

	// World is effectively started now, as P's can run.
	worldStarted()

	// For cgocheck > 1, we turn on the write barrier at all times
	// and check all pointer writes. We can't do this until after
	// procresize because the write barrier needs a P.
	if debug.cgocheck > 1 {
		writeBarrier.cgo = true
		writeBarrier.enabled = true
		for _, pp := range allp {
			pp.wbBuf.reset()
		}
	}

	if buildVersion == "" {
		// Condition should never trigger. This code just serves
		// to ensure runtime·buildVersion is kept in the resulting binary.
		buildVersion = "unknown"
	}
	if len(modinfo) == 1 {
		// Condition should never trigger. This code just serves
		// to ensure runtime·modinfo is kept in the resulting binary.
		modinfo = ""
	}
}

在调度器初始化函数执行的过程中会将sched.maxmcount设置成10000,这是Go语言程序能够创建的最大线程数(可以通过runtime/debug.SetMaxThreads调整)。虽然最多可以创建10000个线程,但是能够同时执行Go代码的线程数仍然由GOMAXPROCS控制。

调度器默认把处理器数量设为CPU核数ncpu;如果环境变量GOMAXPROCS是一个合法的正整数,就改用该值。确定程序能够同时运行的最大处理器数之后,就会调用runtime.procresize更新程序中处理器的数量。此时整个程序不会执行任何用户goroutine(世界处于停止状态),调用方也持有调度器锁sched.lock。runtime.procresize的执行过程如下:

// procresize changes the number of Ps to nprocs. The caller must hold
// sched.lock and the world must be stopped; both are asserted on entry.
//
// Steps:
//  1. Grow allp, idlepMask and timerpMask if nprocs exceeds len(allp).
//  2. Initialize every P in [old, nprocs), reusing a previously allocated P
//     when one exists.
//  3. Keep the current M's P if its id is still < nprocs. Otherwise, detach
//     it and acquire allp[0].
//  4. Destroy the Ps in [nprocs, old). The p structs themselves are kept.
//  5. Trim allp and the masks to nprocs.
//  6. Put every other P either on the idle list (empty run queue), or pair
//     it with an M from mget and chain it into a list via p.link (non-empty
//     run queue).
//
// It returns the head of that list of Ps that have local work, or nil.
// Finally, it stores nprocs into gomaxprocs atomically.
func procresize(nprocs int32) *p {
	assertLockHeld(&sched.lock)
	assertWorldStopped()

	old := gomaxprocs
	if old < 0 || nprocs <= 0 {
		throw("procresize: invalid arg")
	}
	if trace.enabled {
		traceGomaxprocs(nprocs)
	}

	// update statistics
	now := nanotime()
	if sched.procresizetime != 0 {
		sched.totaltime += int64(old) * (now - sched.procresizetime)
	}
	sched.procresizetime = now

	// Number of uint32 words needed to hold one bit per P in idlepMask/timerpMask.
	maskWords := (nprocs + 31) / 32

	// Grow allp if necessary.
	if nprocs > int32(len(allp)) {
		// Synchronize with retake, which could be running
		// concurrently since it doesn't run on a P.
		lock(&allpLock)
		if nprocs <= int32(cap(allp)) {
			allp = allp[:nprocs]
		} else {
			nallp := make([]*p, nprocs)
			// Copy everything up to allp's cap so we
			// never lose old allocated Ps.
			copy(nallp, allp[:cap(allp)])
			allp = nallp
		}

		if maskWords <= int32(cap(idlepMask)) {
			idlepMask = idlepMask[:maskWords]
			timerpMask = timerpMask[:maskWords]
		} else {
			nidlepMask := make([]uint32, maskWords)
			// No need to copy beyond len, old Ps are irrelevant.
			copy(nidlepMask, idlepMask)
			idlepMask = nidlepMask

			ntimerpMask := make([]uint32, maskWords)
			copy(ntimerpMask, timerpMask)
			timerpMask = ntimerpMask
		}
		unlock(&allpLock)
	}

	// initialize new P's
	for i := old; i < nprocs; i++ {
		pp := allp[i]
		// Reuse a P left over from an earlier shrink if there is one.
		if pp == nil {
			pp = new(p)
		}
		pp.init(i)
		atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
	}

	// Decide which P the current M holds from here on.
	gp := getg()
	if gp.m.p != 0 && gp.m.p.ptr().id < nprocs {
		// continue to use the current P
		gp.m.p.ptr().status = _Prunning
		gp.m.p.ptr().mcache.prepareForSweep()
	} else {
		// release the current P and acquire allp[0].
		//
		// We must do this before destroying our current P
		// because p.destroy itself has write barriers, so we
		// need to do that from a valid P.
		if gp.m.p != 0 {
			if trace.enabled {
				// Pretend that we were descheduled
				// and then scheduled again to keep
				// the trace sane.
				traceGoSched()
				traceProcStop(gp.m.p.ptr())
			}
			gp.m.p.ptr().m = 0
		}
		gp.m.p = 0
		pp := allp[0]
		pp.m = 0
		pp.status = _Pidle
		acquirep(pp)
		if trace.enabled {
			traceGoStart()
		}
	}

	// g.m.p is now set, so we no longer need mcache0 for bootstrapping.
	mcache0 = nil

	// release resources from unused P's
	for i := nprocs; i < old; i++ {
		pp := allp[i]
		pp.destroy()
		// can't free P itself because it can be referenced by an M in syscall
	}

	// Trim allp.
	if int32(len(allp)) != nprocs {
		lock(&allpLock)
		allp = allp[:nprocs]
		idlepMask = idlepMask[:maskWords]
		timerpMask = timerpMask[:maskWords]
		unlock(&allpLock)
	}

	// Hand out every P except the current M's. Ps with an empty run queue go
	// on the idle list; Ps with queued work get an M and are pushed onto
	// runnablePs, which is returned to the caller.
	var runnablePs *p
	for i := nprocs - 1; i >= 0; i-- {
		pp := allp[i]
		if gp.m.p.ptr() == pp {
			continue
		}
		pp.status = _Pidle
		if runqempty(pp) {
			pidleput(pp, now)
		} else {
			pp.m.set(mget())
			pp.link.set(runnablePs)
			runnablePs = pp
		}
	}
	stealOrder.reset(uint32(nprocs))
	var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32
	atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
	if old != nprocs {
		// Notify the limiter that the amount of procs has changed.
		gcCPULimiter.resetCapacity(now, nprocs)
	}
	return runnablePs
}
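  1. 如果全局变量allp切片中的处理器数量少于期望数量,会对切片(以及idlepMask、timerpMask位图)进行扩容
  2. 对新增的处理器位置,如果allp中没有可复用的旧处理器,就使用new创建新的处理器结构体,然后调用runtime.p.init进行初始化
  3. 如果当前线程持有的处理器id仍小于期望数量,就继续使用它;否则释放当前处理器,并通过runtime.acquirep绑定allp[0]。在启动阶段,这一步就是把线程m0和处理器allp[0]绑定到一起
  4. 调用runtime.p.destroy释放不再使用的处理器所持有的资源(处理器结构体本身不会被释放,因为它可能仍被处于系统调用中的线程引用)
  5. 通过截断改变全局变量allp的长度,保证与期望处理器数量相等
  6. 将当前处理器以外的处理器全部设置成_Pidle:本地运行队列为空的处理器加入全局空闲列表;本地运行队列不为空的处理器则绑定一个空闲线程,串成链表作为函数的返回值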

调用runtime.procresize是调度器启动的最后一步,之后调度器会启动相应数量的处理器,等待用户创建、运行新的goroutine,并为goroutine调度处理器资源。

创建goroutine

使用go关键字创建goroutine时,编译器会将其转换成runtime.newproc函数调用。在上面引用的版本中,runtime.newproc的唯一入参是表示待执行函数的funcval指针(不再需要参数大小)。它会获取当前goroutine以及调用方的程序计数器,然后在系统栈上完成以下工作:调用runtime.newproc1获取新的goroutine结构体,将其加入当前处理器的运行队列,并在main goroutine已经启动(mainStarted为真)时调用runtime.wakep唤醒新的处理器执行goroutine。

// newproc starts a new goroutine that runs fn. The work is done on the
// system stack:
//   - newproc1 builds the g.
//   - runqput places it on the current P's run queue with next=true.
//   - Once mainStarted is set, wakep is called so an idle P can pick it up.
func newproc(fn *funcval) {
	// Capture the creating goroutine and the caller's PC here: inside the
	// systemstack closure getg() refers to a different g.
	creator := getg()
	creatorPC := getcallerpc()
	systemstack(func() {
		created := newproc1(fn, creator, creatorPC)
		runqput(getg().m.p.ptr(), created, true)
		if !mainStarted {
			return
		}
		wakep()
	})
}

runtime.newproc1会根据传入参数初始化一个g结构体,我们将该函数分成以下三个部分介绍其实现:

  1. 获取或者创建新的goroutine结构体
  2. 在goroutine的栈顶预留初始栈帧并设置调度上下文(在上面引用的版本中,go语句的参数已由编译器打包进闭包,newproc1不再复制参数)
  3. 更新goroutine调度相关属性

首先了解goroutine结构体的创建过程

// newproc1 creates a new goroutine that will start by running fn, and
// returns it in state _Grunnable. The returned g is not placed on any run
// queue here.
//
// Parameters:
//   - callerpc is recorded in newg.gopc.
//   - callergp is passed to saveAncestors.
//
// Where the g comes from:
//   - pp's free list, via gfget; or
//   - a fresh allocation with malg(_StackMin), registered with allgadd while
//     still in _Gdead.
//
// The M is pinned with acquirem/releasem for the whole call because the M
// and P are held in local variables.
func newproc1(fn *funcval, callergp *g, callerpc uintptr) *g {
	if fn == nil {
		fatal("go of nil func value")
	}

	mp := acquirem() // disable preemption because we hold M and P in local vars.
	pp := mp.p.ptr()
	newg := gfget(pp)
	if newg == nil {
		newg = malg(_StackMin)
		casgstatus(newg, _Gidle, _Gdead)
		allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
	}
	if newg.stack.hi == 0 {
		throw("newproc1: newg missing stack")
	}

	if readgstatus(newg) != _Gdead {
		throw("newproc1: new g is not Gdead")
	}

	// Reserve a small, aligned initial frame at the top of the new stack.
	totalSize := uintptr(4*goarch.PtrSize + sys.MinFrameSize) // extra space in case of reads slightly beyond frame
	totalSize = alignUp(totalSize, sys.StackAlign)
	sp := newg.stack.hi - totalSize
	spArg := sp
	if usesLR {
		// caller's LR
		*(*uintptr)(unsafe.Pointer(sp)) = 0
		prepGoExitFrame(sp)
		spArg += sys.MinFrameSize
	}

	// Build the saved register context (newg.sched) that is used when newg
	// is first scheduled.
	memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
	newg.sched.sp = sp
	newg.stktopsp = sp
	newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
	newg.sched.g = guintptr(unsafe.Pointer(newg))
	// NOTE(review): presumably makes fn the entry point, with the goexit PC
	// stored above acting as its return address — confirm in gostartcallfn.
	gostartcallfn(&newg.sched, fn)
	newg.gopc = callerpc
	newg.ancestors = saveAncestors(callergp)
	newg.startpc = fn.fn
	if isSystemGoroutine(newg, false) {
		sched.ngsys.Add(1)
	} else {
		// Only user goroutines inherit pprof labels.
		if mp.curg != nil {
			newg.labels = mp.curg.labels
		}
		if goroutineProfile.active {
			// A concurrent goroutine profile is running. It should include
			// exactly the set of goroutines that were alive when the goroutine
			// profiler first stopped the world. That does not include newg, so
			// mark it as not needing a profile before transitioning it from
			// _Gdead.
			newg.goroutineProfiled.Store(goroutineProfileSatisfied)
		}
	}
	// Track initial transition?
	// A goroutine is tracked when its random trackingSeq is a multiple of
	// gTrackingPeriod.
	newg.trackingSeq = uint8(fastrand())
	if newg.trackingSeq%gTrackingPeriod == 0 {
		newg.tracking = true
	}
	// newg is fully set up: make it runnable and report its stack size to
	// gcController.addScannableStack.
	casgstatus(newg, _Gdead, _Grunnable)
	gcController.addScannableStack(pp, int64(newg.stack.hi-newg.stack.lo))

	// Goroutine IDs come from a per-P cache. When it is exhausted, it is
	// refilled from the global sched.goidgen counter _GoidCacheBatch at a time.
	if pp.goidcache == pp.goidcacheend {
		// Sched.goidgen is the last allocated id,
		// this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
		// At startup sched.goidgen=0, so main goroutine receives goid=1.
		pp.goidcache = sched.goidgen.Add(_GoidCacheBatch)
		pp.goidcache -= _GoidCacheBatch - 1
		pp.goidcacheend = pp.goidcache + _GoidCacheBatch
	}
	newg.goid = pp.goidcache
	pp.goidcache++
	if raceenabled {
		newg.racectx = racegostart(callerpc)
		if newg.labels != nil {
			// See note in proflabel.go on labelSync's role in synchronizing
			// with the reads in the signal handler.
			racereleasemergeg(newg, unsafe.Pointer(&labelSync))
		}
	}
	if trace.enabled {
		traceGoCreate(newg, newg.startpc)
	}
	releasem(mp)

	return newg
}

上述代码会先通过runtime.gfget从处理器的gFree列表中查找空闲goroutine。如果不存在空闲goroutine,就会通过runtime.malg创建一个栈大小为_StackMin(最小栈大小)的新结构体,将其状态设为_Gdead,再通过runtime.allgadd加入全局列表。

img

初始化结构体

runtime.newproc1通过两种方式获取新的runtime.g:

  1. 调用runtime.gfget,从goroutine所在处理器的gFree列表或者调度器的sched.gFree列表中获取runtime.g
  2. 调用runtime.malg生成一个新的runtime.g,并通过runtime.allgadd将结构体追加到全局的goroutine列表allgs中

runtime.gfget中包含两部分逻辑,它们会根据处理器gFree列表中goroutine的数量做出不同的决策:

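  • 当处理器的gFree列表为空、而调度器的sched.gFree列表不为空时,会加锁并将调度器持有的空闲goroutine(优先选择仍带有栈的)转移到当前处理器上,直到处理器gFree列表中的goroutine数量达到32或者全局列表被取空
  • 当处理器的gFree列表中有goroutine时,会从列表头部取出一个空闲的goroutine返回。如果它缓存的栈大小与当前的startingStackSize不一致,就先释放旧栈;如果没有栈,就重新分配一个startingStackSize大小的栈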
// gfget pops a free g from pp's local free list and returns it, or returns
// nil if no free g is available.
//
// Refill: if the local list is empty but one of the global sched.gFree
// lists is not, it first moves Gs into pp under sched.gFree.lock, then
// retries. The move prefers Gs that still have a stack, and stops when pp
// holds 32 or the global lists run out.
//
// Stack normalization: every returned g has a stack of exactly
// startingStackSize bytes.
//   - A cached stack of a different size is freed and replaced.
//   - A g with no stack gets a freshly allocated one.
func gfget(pp *p) *g {
retry:
	if pp.gFree.empty() && (!sched.gFree.stack.empty() || !sched.gFree.noStack.empty()) {
		lock(&sched.gFree.lock)
		// Move a batch of free Gs to the P.
		for pp.gFree.n < 32 {
			// Prefer Gs with stacks.
			gp := sched.gFree.stack.pop()
			if gp == nil {
				gp = sched.gFree.noStack.pop()
				if gp == nil {
					break
				}
			}
			sched.gFree.n--
			pp.gFree.push(gp)
			pp.gFree.n++
		}
		unlock(&sched.gFree.lock)
		goto retry
	}
	gp := pp.gFree.pop()
	if gp == nil {
		return nil
	}
	pp.gFree.n--
	if gp.stack.lo != 0 && gp.stack.hi-gp.stack.lo != uintptr(startingStackSize) {
		// Deallocate old stack. We kept it in gfput because it was the
		// right size when the goroutine was put on the free list, but
		// the right size has changed since then.
		systemstack(func() {
			stackfree(gp.stack)
			gp.stack.lo = 0
			gp.stack.hi = 0
			gp.stackguard0 = 0
		})
	}
	if gp.stack.lo == 0 {
		// Stack was deallocated in gfput or just above. Allocate a new one.
		systemstack(func() {
			gp.stack = stackalloc(startingStackSize)
		})
		gp.stackguard0 = gp.stack.lo + _StackGuard
	} else {
		// Reusing a cached stack: notify whichever of the race, msan and
		// asan instrumentations are enabled that this memory is in use again.
		if raceenabled {
			racemalloc(unsafe.Pointer(gp.stack.lo), gp.stack.hi-gp.stack.lo)
		}
		if msanenabled {
			msanmalloc(unsafe.Pointer(gp.stack.lo), gp.stack.hi-gp.stack.lo)
		}
		if asanenabled {
			asanunpoison(unsafe.Pointer(gp.stack.lo), gp.stack.hi-gp.stack.lo)
		}
	}
	return gp
}

当调度器的gFree和处理器的gFree列表中都不存在结构体时,运行时会调用runtime.malg初始化新的runtime.g结构。如果申请的栈大小不小于0,这里会通过runtime.stackalloc分配栈空间,大小是_StackSystem加上申请大小后向上取整到2的幂。newproc1传入的是_StackMin,在大多数平台上就是2KB。

// malg allocates a new g.
//
// A negative stacksize yields a g with no stack. Otherwise it also
// allocates a stack and sets up the g's stack guards. The stack size is
// _StackSystem + stacksize, rounded up to a power of two.
func malg(stacksize int32) *g {
	gp := new(g)
	if stacksize < 0 {
		return gp
	}
	size := uint32(round2(_StackSystem + stacksize))
	systemstack(func() {
		gp.stack = stackalloc(size)
	})
	gp.stackguard0 = gp.stack.lo + _StackGuard
	gp.stackguard1 = ^uintptr(0)
	// Zero the lowest word of the stack: on ARM and ARM64 the g is recorded
	// there on the gsignal stack during VDSO calls.
	*(*uintptr)(unsafe.Pointer(gp.stack.lo)) = 0
	return gp
}

runtime.malg返回的goroutine会在runtime.newproc1中通过runtime.allgadd存储到全局变量allgs中(此时状态仍为_Gdead,GC不会扫描其未初始化的栈)。

运行队列

runtime.runqput会将goroutine放到运行队列上,这既可能是全局的运行队列,也可能是处理器的本地运行队列。runtime.newproc调用它时传入next=true,因此新的goroutine会被放到处理器的runnext位置,成为该处理器下一个执行的goroutine。

标签:pp,sched,gp,goroutine,调度,newg,go,gmp,stack
From: https://www.cnblogs.com/zpf253/p/17400107.html

相关文章

  • Matlab代码:含冰蓄冷空调的冷热电联供型微网多时间尺度优化调度(复现!!)
    Matlab代码:含冰蓄冷空调的冷热电联供型微网多时间尺度优化调度(复现!!)关键词:冷热电联供优化调度多时间尺度微网优化滚动优化混合整数线性规划冰蓄冷空调运行条件:matlab+cplex求解器内容:提出含冰蓄冷空调的微电网多时间尺度优化调度模型,研究冰蓄冷空调的不同运行方式对优化调......
  • Matlab 代码:计及电动汽车灵活性的微网/虚拟电厂多时间尺度协调调度模型
    Matlab代码:计及电动汽车灵活性的微网/虚拟电厂多时间尺度协调调度模型关键词电动汽车优化微网虚拟电厂vpp多时间尺度优化调度系统灵活性[火][火][火]摘要:构建了含有电动汽车参与的微网/虚拟电厂多时间尺度协调优化模型,其中包括日前-日内-实时三阶段,日前阶段由于风光出......
  • 计及碳捕集电厂低碳特性需求响应综合能源系统多时间尺度调度模型
    计及碳捕集电厂低碳特性需求响应综合能源系统多时间尺度调度模型关键词:碳捕集电厂综合灵活运行方式需求响应日前调度实时调度多时间尺度参考文档:《计及碳捕集电厂低碳特性的含风电电力系统源-荷多时间尺度调度方法》非完全复现,只做了日前日内部分,并在上述基础上改进升级为......
  • MATLAB代码:考虑V2G的光储充一体化微网多目标优化调度策略 关键词
    MATLAB代码:考虑V2G的光储充一体化微网多目标优化调度策略关键词:光储充微网电电汽车V2G多目标优化蓄电池优化调度参考文档:《光伏微网下考虑V2G补偿蓄电池容量的双目标优化调度策略》仿真平台:MATLAB主要内容:[钉子][钉子]过建立光伏微网中以经济性和并网负荷波动率为双目标的蓄......
  • MATLAB代码:基于元模型优化的虚拟电厂主从博弈优化调度模
    MATLAB代码:基于元模型优化的虚拟电厂主从博弈优化调度模型关键词:元模型虚拟电厂主从博弈优化调度参考文档:《基于元模型优化算法的主从博弈多虚拟电厂动态定价和能量管理》复现元模型仿真平台:MATLAB+CPLEX平台主要内容:代码主要做的是虚拟电厂的优化调度策略,其实是多虚拟电厂/微......
  • MATLAB代码:含电动汽车参与园区综合能源系统优化调度模型 关键词:
    MATLAB代码:含电动汽车参与园区综合能源系统优化调度模型关键词:电动汽车改进粒子群综合能源优化调度园区仿真平台:MATLAB主要内容:代码主要做的是一个含有系统能源运营商、分布式光伏用户、电动汽车充电代理商的园区综合能源系统,分析了三种市场交易主体的属性以及市场交易机制,建......
  • Matlab代码:综合能源系统(IES)的优化调度 关键词:[闪亮]综合能源系
    Matlab代码:综合能源系统(IES)的优化调度关键词:[闪亮]综合能源系统冷热电联产优化调度微网优化粒子群算法[闪亮]设备:风力、光伏、燃气轮机、燃气内燃机、燃气锅炉、余热回收系统、吸收式制冷机、电制冷机、蓄电池等设备。负荷类型:冷、热、电优化目标:IES(综合能源系统)的运行成......
  • MATLAB代码:基于多时间尺度滚动优化的多能源微网双层调度模型 关键词:
    MATLAB代码:基于多时间尺度滚动优化的多能源微网双层调度模型关键词:多能源微网多时间尺度滚动优化微网双层模型调度主要内容:代码主要是一个多能源微网的优化调度问题,首先对于下层多能源微网模型,考虑以其最小化运行成本为目标函数,通过多时间尺度滚动优化求解其最优调度策略,对于......
  • 综合能源系统优化调度(冷热电联产)的程序matlab、微网优化调度基础学习 综合能源系统
    综合能源系统优化调度(冷热电联产)的程序matlab、微网优化调度基础学习综合能源系统采用多目标粒子群算法,求解优化调度模型。适合刚入门综合能源研究方向(冷热电联供)并想在前人的基础上进行创新的同学。程序注释清晰明了,易懂上手快,优化套路都是相通的!!!参考文献:请同学知网下载《基......
  • Go语言中List 基本用法与源码详解
    Go-list在Go语言的标准库中,提供了一个container包,这个包中提供了三种数据类型,就是heap,list和ring,本节要讲的是list的使用以及源码剖析。要使用Go提供的list链表,则首先需要导入list包,如下所示:packagemainimport("container/list")导入包之后,需要了解list中定义了两种数据......