GMP 以及简单GC
后续会补上GC的实现原理 2022.12.16 23点12分
gc查看
func test() {
//slice 会动态扩容,用slice来做堆内存申请
container := make([]int, 8)
log.Println(" ===> loop begin.")
for i := 0; i < 32*1000*1000; i++ {
container = append(container, i)
}
log.Println(" ===> loop end.")
}
func main() {
log.Println("Start.")
test()
log.Println("force gc.")
runtime.GC() //强制调用gc回收
log.Println("Done.")
time.Sleep(3600 * time.Second) //睡眠,保持程序不退出
}
go build -o main.go main.go && GODEBUG='gotrace 1' ./main
gc 1 @0.001s 0%: 0.003+0.30+0.004 ms clock, 0.024+0/0.044/0.086+0.034 ms cpu, 4->5->2 MB, 5 MB goal, 8 P
编号 运行时长 gc模块占了 垃圾回收时长 垃圾回收占用cpu时间 堆大小 本次GC使用了2个P(调度器中的Processer)
会依次增加 运行时长的多少 STW 清除 + 并发扫描标记 + STW标记 gc开始前->gc接收后->当前活跃的推内存 全局堆大小
- 在test函数执行完后,demo程序中的切片容器所申请的堆空间都被垃圾回收器回收了。
- 如果此时在
top
指令查询内存的时候,如果依然是800+MB,说明垃圾回收器回收了应用层的内存后,(可能)并不会立即将内存归还给系统。
gmp
相关概念:
- runtime ,包含了 调度,内存管理,垃圾回收等内部数据结构的实现。
- scheduler,
The scheduler's job is to distribute ready-to-run goroutines over worker threads.
- TLS 线程内部数据(其他线程不可见),go的 协程实现十分依赖于TLS,会用于获取系统线程中当前的G和G所属的M实例。
- spaining 自旋
- systemstack,mcall,asmcgocall 每个M启动都有一个叫 g0的系统堆栈。
调度的基本组件
G(goroutine)
就是 goroutine ,调度的基本单位,存储了 goroutine 的执行 stack 信息,goroutine 状态以及G的任务函数(也就是我们 go func() 的 func ),G 眼中只有P,P就是G的"CPU"
P(processor)
逻辑的处理器,代表线程M的执行上下文。
P 的最大作用时其拥有各种G对象的队列,链表,cache和状态。
P的数量也代表了执行并发度,既有多少个goroutine可以同时运行(一般 MAXPROCS就是cpu 的核数)
P不执行任何代码,执行是由 M 来执行的,P提供环境如 缓存,defer pool(我们代码中使用到defer可能会条用defer more stack,可运行的G(从G队列队列拿或者偷)
M(mchine)
真正的执行计算资源,就是系统线程,
找P绑定然后找可运行的G,M可以是多个,与P的数量不一定时1:1,因为可能执行某个G的M因为某种原因阻塞,M再绑定P后,进入循环调度,并且M不保留G的状态,这是G可以跨M调度的基础。
GPM的关系示意图
大致的流程(这些图都是转载的)
1. golang 进程的启动
判断一个数是不是
2的幂次 x & (x - 1)
我们知道,运行一个程序最开始必定创建一个进程,这个进程有一个线程,我们称他为m0
,然后一个m或绑定一个g,成为g0
这边贴下 schedinit 的注释
// The bootstrap sequence is: 启动的大致流程
//
// call osinit
// call schedinit
// make & queue new G 创建队列并新建 runtime.main 的 G
// call runtime·mstart
//
// The new G calls runtime·main.
大致流程
// set the per-goroutine and per-mach "registers"
get_tls(BX)
LEAQ runtime·g0(SB), CX
MOVQ CX, g(BX)
LEAQ runtime·m0(SB), AX
// 我们开始时,会将 m0->g0 = g0, g0->m = m0也就是将 g0 和 m0相互绑定,那么这个g0是干什么的呢?
// save m->g0 = g0
MOVQ CX, m_g0(AX)
// save m0 to g0->m
MOVQ AX, g_m(CX)
CLD // convention is D is always left cleared
CALL runtime·check(SB)
// main(argc,argv) 也就是传递的参数
MOVL 16(SP), AX // copy argc
MOVL AX, 0(SP)
MOVQ 24(SP), AX // copy argv
MOVQ AX, 8(SP)
CALL runtime·args(SB)
CALL runtime·osinit(SB) // CPU数量,页大小和 操作系统初始化工作。
CALL runtime·schedinit(SB) // 调度系统初始化, proc.go
// create a new goroutine to start program
MOVQ $runtime·mainPC(SB), AX // entry
PUSHQ AX
PUSHQ $0 // arg size
CALL runtime·newproc(SB) // 这就是 runtime.main
POPQ AX
POPQ AX
// start this M
CALL runtime·mstart(SB) // 开始执行程序
CALL runtime·abort(SB) // mstart should never return
RET
// Prevent dead-code elimination of debugCallV1, which is
// intended to be called by debuggers.
MOVQ $runtime·debugCallV1(SB), AX
RET
细看
**func osinit()**
func osinit() {
// 获得 cpu个数,最大页的大小
ncpu = getproccount()
physHugePageSize = getHugePageSize()
osArchInit()
}
func schedinit()
func schedinit() {
// 初始化调度相关的锁
lockInit(&sched.lock, lockRankSched)
lockInit(&sched.sysmonlock, lockRankSysmon)
lockInit(&sched.deferlock, lockRankDefer) // 保护 defer 池
...
// Enforce that this lock is always a leaf lock.
// All of this lock's critical sections should be
// extremely short.
lockInit(&memstats.heapStats.noPLock, lockRankLeafRank) // 内存状态
// raceinit must be the first call to race detector.
// In particular, it must be done before mallocinit below calls racemapshadow.
_g_ := getg()
if raceenabled {
_g_.racectx, raceprocctx0 = raceinit() // 轨迹?
}
sched.maxmcount = 10000 // maximum number of m's allowed (or die) 最大的M个数
// The world starts stopped.
worldStopped()
... 初始化工作
lock(&sched.lock)
sched.lastpoll = uint64(nanotime())
procs := ncpu // 确认cpu的数量有 osinit得到的
// 如果没有设置GOMAXPROCS 就按照cpu有几个核来。
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n
}
// 根据给定的数量,少造,多截断并释放。
/*
_g_ := getg()
if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
// continue to use the current P
_g_.m.p.ptr().status = _Prunning // 如果当期的 P id<nprocs 继续用,
}else{
//release the current P and acquire allp[0].
释放 当前的P 并绑定 P[0] -> m
1.如果当前m绑定p了,取消p绑定m
2.取消m绑定p
3.去除 allp[0] 更改状态
// Associate p and the current m.
4. acquirep(allp[0])
*/
if procresize(procs) != nil {
throw("unknown runnable goroutine during bootstrap")
}
unlock(&sched.lock)
// World is effectively started now, as P's can run.
worldStarted()
...
}
func mstart()
// mstart is the entry-point for new Ms.
// It is written in assembly, uses ABI0, is marked TOPFRAME, and calls mstart0.
// 调用 mstart0
func mstart0() {
_g_ := getg()
osStack := _g_.stack.lo == 0 // 看是否初始化系统栈
if osStack {
// Initialize stack bounds from system stack.
// Cgo may have left stack size in stack.hi.
// minit may update the stack bounds.
//
// Note: these bounds may not be very accurate.
// We set hi to &size, but there are things above
// it. The 1024 is supposed to compensate this,
// but is somewhat arbitrary.
size := _g_.stack.hi
if size == 0 {
size = 8192 * sys.StackGuardMultiplier
}
_g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
_g_.stack.lo = _g_.stack.hi - size + 1024
}
// Initialize stack guard so that we can start calling regular
// Go code.
_g_.stackguard0 = _g_.stack.lo + _StackGuard // 保护页
// This is the g0, so we can also call go:systemstack
// functions, which check stackguard1.
_g_.stackguard1 = _g_.stackguard0
mstart1() // 看下面
// Exit this thread.
if mStackIsSystemAllocated() {
// Windows, Solaris, illumos, Darwin, AIX and Plan 9 always system-allocate
// the stack, but put it in _g_.stack before mstart,
// so the logic above hasn't set osStack yet.
osStack = true
}
mexit(osStack)
}
// The go:noinline is to guarantee the getcallerpc/getcallersp below are safe,
// so that we can set up g0.sched to return to the call of mstart1 above.
//go:noinline
func mstart1() {
_g_ := getg()
if _g_ != _g_.m.g0 { // 新开的m运行的g 一定是 g0
throw("bad runtime·mstart")
}
// Set up m.g0.sched as a label returning to just
// after the mstart1 call in mstart0 above, for use by goexit0 and mcall.
// We're never coming back to mstart1 after we call schedule,
// so other calls can reuse the current frame.
// And goexit0 does a gogo that needs to return from mstart1
// and let mstart0 exit the thread.
_g_.sched.g = guintptr(unsafe.Pointer(_g_))
_g_.sched.pc = getcallerpc()
_g_.sched.sp = getcallersp()
asminit()
minit()
// Install signal handlers; after minit so that minit can
// prepare the thread to be able to handle the signals.
if _g_.m == &m0 { // 如果是 初始的线程 m0 的
mstartm0() // 初始化信号栈
/*
辅M线程 服务非Go线程(cgo产生)的回调的M。
func mstartm0()
// 创建一个额外的 M 服务 non-Go 线程(cgo 调用中产生的线程)的回调,并且只创建一个
// windows 上也需要额外 M 来服务 syscall.NewCallback 产生的回调
*/
if fn := _g_.m.mstartfn; fn != nil {
fn()
}
if _g_.m != &m0 {
acquirep(_g_.m.nextp.ptr())
_g_.m.nextp = 0
}
schedule() // 最重要的,启动调度系统从不返回
}
func schedule()
可以看到调度程序本质就是尽力找到可运行的g,然后去运行g上面的任务函数。查找g的流程如下,
- 如果当前GC需要停止整个世界(STW), 则调用gcstopm休眠当前的M。
- 每隔61次调度轮回从全局队列找,避免全局队列中的g被饿死。
- 从p.runnext获取g,从p的本地队列中获取。
- 调用 findrunnable (
local global netpoll steal
)找g,找不到的话就将m休眠,等待唤醒。
// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
_g_ := getg()
top:
// 如果当前GC需要停止整个世界(STW), 则调用gcstopm休眠当前的M
if sched.gcwaiting != 0 {
gcstopm() // 为了STW,停止当前的M
goto top
}
if pp.runSafePointFn != 0 {
runSafePointFn()
}
var gp *g
var inheritTime bool
if gp == nil {
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
// 每隔61次调度,尝试从全局队列种获取G
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
if gp == nil {
// 从p的本地队列中获取
gp, inheritTime = runqget(_g_.m.p.ptr())
// We can see gp != nil here even if the M is spinning,
// if checkTimers added a local goroutine via goready.
}
if gp == nil {
// 想尽办法找到可运行的G,找不到就不用返回了
// runqget gp := globrunqget(_p_, 0) 取 globalsize / maxprocs + 1 netpoll 这是个优化
// 然后随机从allp选择一个p然后从里面偷取
gp, inheritTime = findrunnable() // blocks until work is available
}
// 找到了g,那就执行g上的任务函数
execute(gp, inheritTime)
}
func main()
// The main goroutine.
func main() {
g := getg()
// Racectx of m0->g0 is used only as the parent of the main goroutine.
// It must not be used for anything else.
g.m.g0.racectx = 0
// 在系统栈上运行 sysmon
if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon
systemstack(func() {
// 分配一个新的m,运行sysmon系统后台监控
// (定期垃圾回收和调度抢占)
newm(sysmon, nil, -1)
})
}
// Lock the main goroutine onto this, the main OS thread,
// during initialization. Most programs won't care, but a few
// do require certain calls to be made by the main thread.
// Those can arrange for main.main to run in the main thread
// by calling runtime.LockOSThread during initialization
// to preserve the lock.
lockOSThread()
// 确保是主线程
if g.m != &m0 {
throw("runtime.main not on m0")
}
main_init_done = make(chan bool) // init 函数
doInit(&main_inittask)
...
// 后台清理程序goroutine,并启用GC。
gcenable()
// 调用 main.main 也就是我们的主函数 ok结束!!!!!
fn := main_main // make an indirect call, as the linker doesn't know the address of the main package when laying down the runtime
fn()
exit(0)
for {
var x *int32
*x = 0 // x 是 nil 空指针异常 保证 真正退出
}
}
调度机制
main 函数的启动过程正如上面一样,如果用户再代码中新建 goroutine ,runtime 该怎么管理呢?
TODO:
面试题一道
package main
import "fmt"
import "runtime"
func main() {
runtime.GOMAXPROCS(1)
for i :=0; i<10; i++ {
i := i
go func() {
fmt.Println("A:",i)
}()
}
var ch = make(chan int)
<- ch
}
============
A: 9
A: 0
A: 1
A: 2
A: 3
A: 4
A: 5
A: 6
A: 7
A: 8
解析,首先 i := i
,发生拷贝,避免 引用同一个i
然后设置一个 P ,那么我们就只有一个 local queue,
加上全局的queue,一共两个,按理说应该是顺序打印,为什么先打印 9 呢?
G的初始化
newproc(fn)
-> runqput(_p_, newg, true)
// runqput tries to put g on the local runnable queue.
// If next is false, runqput adds g to the tail of the runnable queue.
// If next is true, runqput puts g in the _p_.runnext slot.
// If the run queue is full, runnext puts g on the global queue.
// Executed only by the owner P.
// 尝试将 g 放在local 可运行队列里面,如果next是false的话,那么加入可执行队列的尾部,如果next
// 是true的话,设置当前 p.runnext,如果当前队列满了的话,加入全局队列
func runqput(_p_ *p, gp *g, next bool) {
//这个是调度的随进行
if randomizeScheduler && next && fastrandn(2) == 0 {
next = false
}
if next {
retryNext:
oldnext := _p_.runnext
if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
goto retryNext
}
if oldnext == 0 {
return
}
// 将旧的runext踢出常规运行队列。
gp = oldnext.ptr()
}
retry:
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
t := _p_.runqtail
if t-h < uint32(len(_p_.runq)) {
_p_.runq[t%uint32(len(_p_.runq))].set(gp)
atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
return
}
// 将本地可运行队列中的g和一批工作放到全局队列中。
if runqputslow(_p_, gp, h, t) {
return
}
// 本地队列没满,那么一定能加入成功,重试。
goto retry
}