go语言调度gmp原理
调度器启动
运行时通过runtime.schedinit初始化调度器
func schedinit() {
lockInit(&sched.lock, lockRankSched)
lockInit(&sched.sysmonlock, lockRankSysmon)
lockInit(&sched.deferlock, lockRankDefer)
lockInit(&sched.sudoglock, lockRankSudog)
lockInit(&deadlock, lockRankDeadlock)
lockInit(&paniclk, lockRankPanic)
lockInit(&allglock, lockRankAllg)
lockInit(&allpLock, lockRankAllp)
lockInit(&reflectOffs.lock, lockRankReflectOffs)
lockInit(&finlock, lockRankFin)
lockInit(&trace.bufLock, lockRankTraceBuf)
lockInit(&trace.stringsLock, lockRankTraceStrings)
lockInit(&trace.lock, lockRankTrace)
lockInit(&cpuprof.lock, lockRankCpuprof)
lockInit(&trace.stackTab.lock, lockRankTraceStackTab)
// Enforce that this lock is always a leaf lock.
// All of this lock's critical sections should be
// extremely short.
lockInit(&memstats.heapStats.noPLock, lockRankLeafRank)
// raceinit must be the first call to race detector.
// In particular, it must be done before mallocinit below calls racemapshadow.
gp := getg()
if raceenabled {
gp.racectx, raceprocctx0 = raceinit()
}
sched.maxmcount = 10000
// The world starts stopped.
worldStopped()
moduledataverify()
stackinit()
mallocinit()
godebug := getGodebugEarly()
initPageTrace(godebug) // must run after mallocinit but before anything allocates
cpuinit(godebug) // must run before alginit
alginit() // maps, hash, fastrand must not be used before this call
fastrandinit() // must run before mcommoninit
mcommoninit(gp.m, -1)
modulesinit() // provides activeModules
typelinksinit() // uses maps, activeModules
itabsinit() // uses activeModules
stkobjinit() // must run before GC starts
sigsave(&gp.m.sigmask)
initSigmask = gp.m.sigmask
goargs()
goenvs()
parsedebugvars()
gcinit()
// if disableMemoryProfiling is set, update MemProfileRate to 0 to turn off memprofile.
// Note: parsedebugvars may update MemProfileRate, but when disableMemoryProfiling is
// set to true by the linker, it means that nothing is consuming the profile, it is
// safe to set MemProfileRate to 0.
if disableMemoryProfiling {
MemProfileRate = 0
}
lock(&sched.lock)
sched.lastpoll.Store(nanotime())
procs := ncpu
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n
}
if procresize(procs) != nil {
throw("unknown runnable goroutine during bootstrap")
}
unlock(&sched.lock)
// World is effectively started now, as P's can run.
worldStarted()
// For cgocheck > 1, we turn on the write barrier at all times
// and check all pointer writes. We can't do this until after
// procresize because the write barrier needs a P.
if debug.cgocheck > 1 {
writeBarrier.cgo = true
writeBarrier.enabled = true
for _, pp := range allp {
pp.wbBuf.reset()
}
}
if buildVersion == "" {
// Condition should never trigger. This code just serves
// to ensure runtime·buildVersion is kept in the resulting binary.
buildVersion = "unknown"
}
if len(modinfo) == 1 {
// Condition should never trigger. This code just serves
// to ensure runtime·modinfo is kept in the resulting binary.
modinfo = ""
}
}
在调度器初始函数执行的过程中会将maxmcount设置成10000,这是go语言程序能够创建的最大线程数。虽然最多可以创建10000个线程,但是可以同时运行的线程还是由GOMAXPROCS变量控制的。
我们从环境变量GOMAXPROCS获取了程序能够同时运行的最大处理器数之后,就会调用runtime.procresize更新程序中处理器的数量,此时整个程序不会执行任何用户goroutine,调度器也会进入锁定状态,runtime.procresize的执行过程如下
func procresize(nprocs int32) *p {
assertLockHeld(&sched.lock)
assertWorldStopped()
old := gomaxprocs
if old < 0 || nprocs <= 0 {
throw("procresize: invalid arg")
}
if trace.enabled {
traceGomaxprocs(nprocs)
}
// update statistics
now := nanotime()
if sched.procresizetime != 0 {
sched.totaltime += int64(old) * (now - sched.procresizetime)
}
sched.procresizetime = now
maskWords := (nprocs + 31) / 32
// Grow allp if necessary.
if nprocs > int32(len(allp)) {
// Synchronize with retake, which could be running
// concurrently since it doesn't run on a P.
lock(&allpLock)
if nprocs <= int32(cap(allp)) {
allp = allp[:nprocs]
} else {
nallp := make([]*p, nprocs)
// Copy everything up to allp's cap so we
// never lose old allocated Ps.
copy(nallp, allp[:cap(allp)])
allp = nallp
}
if maskWords <= int32(cap(idlepMask)) {
idlepMask = idlepMask[:maskWords]
timerpMask = timerpMask[:maskWords]
} else {
nidlepMask := make([]uint32, maskWords)
// No need to copy beyond len, old Ps are irrelevant.
copy(nidlepMask, idlepMask)
idlepMask = nidlepMask
ntimerpMask := make([]uint32, maskWords)
copy(ntimerpMask, timerpMask)
timerpMask = ntimerpMask
}
unlock(&allpLock)
}
// initialize new P's
for i := old; i < nprocs; i++ {
pp := allp[i]
if pp == nil {
pp = new(p)
}
pp.init(i)
atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
}
gp := getg()
if gp.m.p != 0 && gp.m.p.ptr().id < nprocs {
// continue to use the current P
gp.m.p.ptr().status = _Prunning
gp.m.p.ptr().mcache.prepareForSweep()
} else {
// release the current P and acquire allp[0].
//
// We must do this before destroying our current P
// because p.destroy itself has write barriers, so we
// need to do that from a valid P.
if gp.m.p != 0 {
if trace.enabled {
// Pretend that we were descheduled
// and then scheduled again to keep
// the trace sane.
traceGoSched()
traceProcStop(gp.m.p.ptr())
}
gp.m.p.ptr().m = 0
}
gp.m.p = 0
pp := allp[0]
pp.m = 0
pp.status = _Pidle
acquirep(pp)
if trace.enabled {
traceGoStart()
}
}
// g.m.p is now set, so we no longer need mcache0 for bootstrapping.
mcache0 = nil
// release resources from unused P's
for i := nprocs; i < old; i++ {
pp := allp[i]
pp.destroy()
// can't free P itself because it can be referenced by an M in syscall
}
// Trim allp.
if int32(len(allp)) != nprocs {
lock(&allpLock)
allp = allp[:nprocs]
idlepMask = idlepMask[:maskWords]
timerpMask = timerpMask[:maskWords]
unlock(&allpLock)
}
var runnablePs *p
for i := nprocs - 1; i >= 0; i-- {
pp := allp[i]
if gp.m.p.ptr() == pp {
continue
}
pp.status = _Pidle
if runqempty(pp) {
pidleput(pp, now)
} else {
pp.m.set(mget())
pp.link.set(runnablePs)
runnablePs = pp
}
}
stealOrder.reset(uint32(nprocs))
var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32
atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
if old != nprocs {
// Notify the limiter that the amount of procs has changed.
gcCPULimiter.resetCapacity(now, nprocs)
}
return runnablePs
}
- 如果全局变量allp切片中的处理器数量少于期望数量,会对切片进行扩容
- 使用new创建新的处理器结构体,并调用runtime.p.init初始化刚刚扩容的处理器
- 通过指针将线程m0和处理器allp[0]绑定到一起
- 调用runtime.destroy释放不再使用的处理器结构
- 通过截断改变全局变量allp的长度保证与期望处理器数量相等
- 将除allp[0]外的处理器P全部设置成_Pidle并加入全局的空闲队列中
调用runtime.procresize是调度器启动的最后一步,之后调度器会启动响应数量的处理器,等待用户创建、运行新的goroutine并为goroutine调度处理器资源
创建goroutine
通过runtime.newproc函数调用,runtime.newproc的入参是参数大小和表示函数的指针funcval,它会获取goroutine以及调用方的程序计数器,然后调用runtime.newproc1函数获取新的goroutine、结构体、将其加入处理器的运行队列,并在满足条件时调用runtime.wakep唤醒新的处理器执行goroutine
func newproc(fn *funcval) {
gp := getg()
pc := getcallerpc()
systemstack(func() {
newg := newproc1(fn, gp, pc)
pp := getg().m.p.ptr()
runqput(pp, newg, true)
if mainStarted {
wakep()
}
})
}
runtime.newproc1会根据传入参数初始化一个g结构体,我们将该函数分成一下三个部分介绍其实现:
- 获取或者创建新的goroutine结构体
- 将传入的参数移到goroutine的栈上
- 更新goroutine调度相关属性
首先了解goroutine结构体的创建过程
func newproc1(fn *funcval, callergp *g, callerpc uintptr) *g {
if fn == nil {
fatal("go of nil func value")
}
mp := acquirem() // disable preemption because we hold M and P in local vars.
pp := mp.p.ptr()
newg := gfget(pp)
if newg == nil {
newg = malg(_StackMin)
casgstatus(newg, _Gidle, _Gdead)
allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
}
if newg.stack.hi == 0 {
throw("newproc1: newg missing stack")
}
if readgstatus(newg) != _Gdead {
throw("newproc1: new g is not Gdead")
}
totalSize := uintptr(4*goarch.PtrSize + sys.MinFrameSize) // extra space in case of reads slightly beyond frame
totalSize = alignUp(totalSize, sys.StackAlign)
sp := newg.stack.hi - totalSize
spArg := sp
if usesLR {
// caller's LR
*(*uintptr)(unsafe.Pointer(sp)) = 0
prepGoExitFrame(sp)
spArg += sys.MinFrameSize
}
memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
newg.sched.sp = sp
newg.stktopsp = sp
newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
newg.sched.g = guintptr(unsafe.Pointer(newg))
gostartcallfn(&newg.sched, fn)
newg.gopc = callerpc
newg.ancestors = saveAncestors(callergp)
newg.startpc = fn.fn
if isSystemGoroutine(newg, false) {
sched.ngsys.Add(1)
} else {
// Only user goroutines inherit pprof labels.
if mp.curg != nil {
newg.labels = mp.curg.labels
}
if goroutineProfile.active {
// A concurrent goroutine profile is running. It should include
// exactly the set of goroutines that were alive when the goroutine
// profiler first stopped the world. That does not include newg, so
// mark it as not needing a profile before transitioning it from
// _Gdead.
newg.goroutineProfiled.Store(goroutineProfileSatisfied)
}
}
// Track initial transition?
newg.trackingSeq = uint8(fastrand())
if newg.trackingSeq%gTrackingPeriod == 0 {
newg.tracking = true
}
casgstatus(newg, _Gdead, _Grunnable)
gcController.addScannableStack(pp, int64(newg.stack.hi-newg.stack.lo))
if pp.goidcache == pp.goidcacheend {
// Sched.goidgen is the last allocated id,
// this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
// At startup sched.goidgen=0, so main goroutine receives goid=1.
pp.goidcache = sched.goidgen.Add(_GoidCacheBatch)
pp.goidcache -= _GoidCacheBatch - 1
pp.goidcacheend = pp.goidcache + _GoidCacheBatch
}
newg.goid = pp.goidcache
pp.goidcache++
if raceenabled {
newg.racectx = racegostart(callerpc)
if newg.labels != nil {
// See note in proflabel.go on labelSync's role in synchronizing
// with the reads in the signal handler.
racereleasemergeg(newg, unsafe.Pointer(&labelSync))
}
}
if trace.enabled {
traceGoCreate(newg, newg.startpc)
}
releasem(mp)
return newg
}
上述代码会先从处理器的gfree列表中查找空闲goroutine,如果不存在空闲goroutine,就会通过runtime.malg创建一个栈大小足够的新结构体
初始化结构体
runtime.gfget通过两种方式获取新的runtime.g
- 从goroutine所在处理器的gFree列表或者调度器的sched.gFree列表中获取runtime.g
- 调用runtime.malg生成一个新的runtime.g,并将结构体追加到全局的goroutine列表allgs中
rutime.gfget中包含两部分逻辑,他们会根据处理器中gFree列表中goroutine的数量做出不同的决策
- 当处理器的goroutine列表为空时,会将调度器持有的空闲goroutine转移到当前处理器上,直到gFree列表中的goroutine数量达到32
- 当处理器的goroutine数量充足时,会从列表头部返回一个新的goroutine
func gfget(pp *p) *g {
retry:
if pp.gFree.empty() && (!sched.gFree.stack.empty() || !sched.gFree.noStack.empty()) {
lock(&sched.gFree.lock)
// Move a batch of free Gs to the P.
for pp.gFree.n < 32 {
// Prefer Gs with stacks.
gp := sched.gFree.stack.pop()
if gp == nil {
gp = sched.gFree.noStack.pop()
if gp == nil {
break
}
}
sched.gFree.n--
pp.gFree.push(gp)
pp.gFree.n++
}
unlock(&sched.gFree.lock)
goto retry
}
gp := pp.gFree.pop()
if gp == nil {
return nil
}
pp.gFree.n--
if gp.stack.lo != 0 && gp.stack.hi-gp.stack.lo != uintptr(startingStackSize) {
// Deallocate old stack. We kept it in gfput because it was the
// right size when the goroutine was put on the free list, but
// the right size has changed since then.
systemstack(func() {
stackfree(gp.stack)
gp.stack.lo = 0
gp.stack.hi = 0
gp.stackguard0 = 0
})
}
if gp.stack.lo == 0 {
// Stack was deallocated in gfput or just above. Allocate a new one.
systemstack(func() {
gp.stack = stackalloc(startingStackSize)
})
gp.stackguard0 = gp.stack.lo + _StackGuard
} else {
if raceenabled {
racemalloc(unsafe.Pointer(gp.stack.lo), gp.stack.hi-gp.stack.lo)
}
if msanenabled {
msanmalloc(unsafe.Pointer(gp.stack.lo), gp.stack.hi-gp.stack.lo)
}
if asanenabled {
asanunpoison(unsafe.Pointer(gp.stack.lo), gp.stack.hi-gp.stack.lo)
}
}
return gp
}
当调度器的gFree和处理器的gFree列表都不存在结构体时,运行时会调用runtime.malg初始化新的runtime.g结构,如果申请的堆栈大小大于0,这里会通过runtime.stackalloc分配2KB大小的栈空间
func malg(stacksize int32) *g {
newg := new(g)
if stacksize >= 0 {
stacksize = round2(_StackSystem + stacksize)
systemstack(func() {
newg.stack = stackalloc(uint32(stacksize))
})
newg.stackguard0 = newg.stack.lo + _StackGuard
newg.stackguard1 = ^uintptr(0)
// Clear the bottom word of the stack. We record g
// there on gsignal stack during VDSO on ARM and ARM64.
*(*uintptr)(unsafe.Pointer(newg.stack.lo)) = 0
}
return newg
}
runtime.malg返回的goroutine会存储到全局变量allgs中
运行队列
runtime.runqput会将goroutine放到运行队列上,这既可能是全局的运行队列也可能是处理器的本地运行队列
标签:pp,sched,gp,goroutine,调度,newg,go,gmp,stack From: https://www.cnblogs.com/zpf253/p/17400107.html