首页 > 编程语言 >从源码分析 Go 语言使用 cgo 导致的线程增长

从源码分析 Go 语言使用 cgo 导致的线程增长

时间:2023-06-05 10:25:58浏览次数:64  
标签:调用 cgo DI unsafe 源码 线程 mp MOVQ

TDengine Go 连接器 https://github.com/taosdata/driver-go 使用 cgo 调用 taos.so 中的 API,使用过程中发现线程数不断增长,本文从一个 cgo 调用开始解析 Go 源码,分析造成线程增长的原因。

转换 cgo 代码

对 driver-go/wrapper/taosc.go 进行转换

go tool cgo taosc.go

执行后生成 _obj 文件夹

go 代码分析

taosc.cgo1.goTaosResetCurrentDB 为例来分析。

// TaosResetCurrentDB void taos_reset_current_db(TAOS *taos);
func TaosResetCurrentDB(taosConnect unsafe.Pointer) {
    func() { _cgo0 := /*line :161:26*/taosConnect; _cgoCheckPointer(_cgo0, nil); _Cfunc_taos_reset_current_db(_cgo0); }()
}

//go:linkname _cgoCheckPointer runtime.cgoCheckPointer
func _cgoCheckPointer(interface{}, interface{})

//go:cgo_unsafe_args
func _Cfunc_taos_reset_current_db(p0 unsafe.Pointer) (r1 _Ctype_void) {
    _cgo_runtime_cgocall(_cgo_453a0cad50ef_Cfunc_taos_reset_current_db, uintptr(unsafe.Pointer(&p0)))
    if _Cgo_always_false {
        _Cgo_use(p0)
    }
    return
}

//go:linkname _cgo_runtime_cgocall runtime.cgocall
func _cgo_runtime_cgocall(unsafe.Pointer, uintptr) int32

//go:cgo_import_static _cgo_453a0cad50ef_Cfunc_taos_reset_current_db
//go:linkname __cgofn__cgo_453a0cad50ef_Cfunc_taos_reset_current_db _cgo_453a0cad50ef_Cfunc_taos_reset_current_db
var __cgofn__cgo_453a0cad50ef_Cfunc_taos_reset_current_db byte
var _cgo_453a0cad50ef_Cfunc_taos_reset_current_db = unsafe.Pointer(&__cgofn__cgo_453a0cad50ef_Cfunc_taos_reset_current_db)

  • TaosResetCurrentDB 首先调用 _cgoCheckPointer 检查传入参数是否为 nil
  • //go:linkname _cgoCheckPointer runtime.cgoCheckPointer 表示 cgoCheckPointer 方法实现是 runtime.cgoCheckPointer,如果传入参数是 nil 程序将会 panic
  • 接着调用 _Cfunc_taos_reset_current_db
  • Cfunc_taos_reset_current_db 方法中 _Cgo_always_false 在运行时会是 false,所以只分析第一句 _cgo_runtime_cgocall(_cgo_453a0cad50ef_Cfunc_taos_reset_current_db, uintptr(unsafe.Pointer(&p0)))
    • _cgo_runtime_cgocall 实现是 runtime.cgocall 这个会重点分析。
    • _cgo_453a0cad50ef_Cfunc_taos_reset_current_db 由上方最后代码块可以看出是 taos_reset_current_db 方法指针。
    • uintptr(unsafe.Pointer(&p0)) 表示 p0 的指针地址。
    • 由上面可以看出这句意思是调用 runtime.cgocall,参数为方法指针和参数的指针地址。

分析 runtime.cgocall

基于 golang 1.20.4 分析该方法

func cgocall(fn, arg unsafe.Pointer) int32 {
    if !iscgo && GOOS != "solaris" && GOOS != "illumos" && GOOS != "windows" {
        throw("cgocall unavailable")
    }

    if fn == nil {
        throw("cgocall nil")
    }

    if raceenabled {
        racereleasemerge(unsafe.Pointer(&racecgosync))
    }

    mp := getg().m // 获取当前 goroutine 的 M
    mp.ncgocall++  // 总 cgo 计数 +1
    mp.ncgo++      // 当前 cgo 计数 +1

    mp.cgoCallers[0] = 0 // 重置追踪

    entersyscall() // 进入系统调用,保存上下文, 标记当前 goroutine 独占 m, 跳过垃圾回收

    osPreemptExtEnter(mp) // 标记异步抢占, 使异步抢占逻辑失效

    mp.incgo = true // 修改状态
    errno := asmcgocall(fn, arg) // 真正进行方法调用的地方

    mp.incgo = false // 修改状态
    mp.ncgo-- // 当前 cgo 调用-1

    osPreemptExtExit(mp) // 恢复异步抢占

    exitsyscall() // 退出系统调用,恢复调度器控制


    if raceenabled {
        raceacquire(unsafe.Pointer(&racecgosync))
    }

    // 避免 GC 过早回收
    KeepAlive(fn)
    KeepAlive(arg)
    KeepAlive(mp)

    return errno
}

其中两个主要的方法 entersyscallasmcgocall,接下来对这两个方法进行着重分析。

分析 entersyscall

func entersyscall() {
    reentersyscall(getcallerpc(), getcallersp())
}

entersyscall 直接调用的 reentersyscall,关注下 reentersyscall 注释中的一段:

// If the syscall does not block, that is it, we do not emit any other events.
// If the syscall blocks (that is, P is retaken), retaker emits traceGoSysBlock;

如果 syscall 调用没有阻塞则不会触发任何事件,如果被阻塞 retaker 会触发 traceGoSysBlock,那需要了解一下多长时间被认为是阻塞,先跟到 retaker 方法。

func retake(now int64) uint32 {
    n := 0
    lock(&allpLock)
    for i := 0; i < len(allp); i++ {
        pp := allp[i]
        if pp == nil {
            continue
        }
        pd := &pp.sysmontick
        s := pp.status
        sysretake := false
        if s == _Prunning || s == _Psyscall {
            t := int64(pp.schedtick)
            if int64(pd.schedtick) != t {
                pd.schedtick = uint32(t)
                pd.schedwhen = now
            } else if pd.schedwhen+forcePreemptNS <= now {
                preemptone(pp)
                sysretake = true
            }
        }
        // 从系统调用中抢占P
        if s == _Psyscall {
            // 如果已经超过了一个系统监控的 tick(20us),则从系统调用中抢占 P
            t := int64(pp.syscalltick)
            if !sysretake && int64(pd.syscalltick) != t {
                pd.syscalltick = uint32(t)
                pd.syscallwhen = now
                continue
            }
            if runqempty(pp) && sched.nmspinning.Load()+sched.npidle.Load() > 0 && pd.syscallwhen+10*1000*1000 > now {
                continue
            }
            unlock(&allpLock)
            incidlelocked(-1)
            if atomic.Cas(&pp.status, s, _Pidle) {
                if trace.enabled {
                    traceGoSysBlock(pp)
                    traceProcStop(pp)
                }
                n++
                pp.syscalltick++
                handoffp(pp)
            }
            incidlelocked(1)
            lock(&allpLock)
        }
    }
    unlock(&allpLock)
    return uint32(n)
}

从上面可以看到系统调用阻塞 20 多微秒会被抢占 P,cgo 被迫 handoffp,接下来分析 handoffp 方法

func handoffp(pp *p) {
    // ...
    // 没有任务且没有自旋和空闲的 M 则需要启动一个新的 M
    if sched.nmspinning.Load()+sched.npidle.Load() == 0 && sched.nmspinning.CompareAndSwap(0, 1) {
        sched.needspinning.Store(0)
        startm(pp, true)
        return
    }
    // ...
}

handoffp 方法会调用 startm 来启动一个新的 M,跟到 startm 方法。

func startm(pp *p, spinning bool) {
    // ...
    nmp := mget()
    if nmp == nil {
        // 没有M可用,调用newm
        id := mReserveID()
        unlock(&sched.lock)

        var fn func()
        if spinning {
            fn = mspinning
        }
        newm(fn, pp, id)
        releasem(mp)
        return
    }
    // ...
}

此时如果没有 M startm 会调用 newm 创建一个新的 M,接下来分析 newm 方法。

func newm(fn func(), pp *p, id int64) {
    acquirem()
    mp := allocm(pp, fn, id)
    mp.nextp.set(pp)
    mp.sigmask = initSigmask
    if gp := getg(); gp != nil && gp.m != nil && (gp.m.lockedExt != 0 || gp.m.incgo) && GOOS != "plan9" {
        lock(&newmHandoff.lock)
        if newmHandoff.haveTemplateThread == 0 {
            throw("on a locked thread with no template thread")
        }
        mp.schedlink = newmHandoff.newm
        newmHandoff.newm.set(mp)
        if newmHandoff.waiting {
            newmHandoff.waiting = false
            notewakeup(&newmHandoff.wake)
        }
        unlock(&newmHandoff.lock)
        releasem(getg().m)
        return
    }
    newm1(mp)
    releasem(getg().m)
}

func newm1(mp *m) {
    if iscgo {
        var ts cgothreadstart
        if _cgo_thread_start == nil {
            throw("_cgo_thread_start missing")
        }
        ts.g.set(mp.g0)
        ts.tls = (*uint64)(unsafe.Pointer(&mp.tls[0]))
        ts.fn = unsafe.Pointer(abi.FuncPCABI0(mstart))
        if msanenabled {
            msanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts))
        }
        if asanenabled {
            asanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts))
        }
        execLock.rlock()
        // 创建新线程
        asmcgocall(_cgo_thread_start, unsafe.Pointer(&ts))
        execLock.runlock()
        return
    }
    execLock.rlock()
    newosproc(mp)
    execLock.runlock()
}

newm 看出如果线程都在阻塞中则调用 newm1newm1 调用 _cgo_thread_start 创建新线程。

由以上分析得出当高并发调用 cgo 且执行时间超过 20 微秒时会创建新线程。

分析 asmcgocall

只分析 amd64
asm_amd64.s

TEXT ·asmcgocall(SB),NOSPLIT,$0-20
    MOVQ    fn+0(FP), AX
    MOVQ    arg+8(FP), BX

    MOVQ    SP, DX

    // 考虑是否需要切换到 m.g0 栈
    // 也用来调用创建新的 OS 线程,这些线程已经在 m.g0 栈中了
    get_tls(CX)
    MOVQ    g(CX), DI
    CMPQ    DI, $0
    JEQ nosave
    MOVQ    g_m(DI), R8
    MOVQ    m_gsignal(R8), SI
    CMPQ    DI, SI
    JEQ nosave
    MOVQ    m_g0(R8), SI
    CMPQ    DI, SI
    JEQ nosave
    
    // 切换到系统栈
    CALL    gosave_systemstack_switch<>(SB)
    MOVQ    SI, g(CX)
    MOVQ    (g_sched+gobuf_sp)(SI), SP

    // 于调度栈中(pthread 新创建的栈)
    // 确保有足够的空间给四个 stack-based fast-call 寄存器
    // 为使得 windows amd64 调用服务
    SUBQ    $64, SP
    ANDQ    $~15, SP // 为 gcc ABI 对齐
    MOVQ    DI, 48(SP) // 保存 g
    MOVQ    (g_stack+stack_hi)(DI), DI
    SUBQ    DX, DI
    MOVQ    DI, 40(SP) // 保存栈深 (不能仅保存 SP,因为栈可能在回调时被复制)
    MOVQ    BX, DI  // DI = AMD64 ABI 第一个参数
    MOVQ    BX, CX  // CX = Win64 第一个参数
    CALL    AX  // 调用 fn

    // 恢复寄存器、 g、栈指针
    get_tls(CX)
    MOVQ    48(SP), DI
    MOVQ    (g_stack+stack_hi)(DI), SI
    SUBQ    40(SP), SI
    MOVQ    DI, g(CX)
    MOVQ    SI, SP

    MOVL    AX, ret+16(FP)
    RET

nosave:
    // 在系统栈上运行,可能没有 g
    // 没有 g 的情况发生在线程创建中或线程结束中(比如 Solaris 平台上的 needm/dropm)
    // 这段代码和上面类似,但没有保存和恢复 g,且没有考虑栈的移动问题(因为我们在系统栈上,而非 goroutine 栈)
    // 如果已经在系统栈上,则上面的代码可被直接使用,在 Solaris 上会进入下面这段代码。
    // 使用这段代码来为所有 "已经在系统栈" 的调用进行服务,从而保持正确性。
    SUBQ    $64, SP
    ANDQ    $~15, SP // ABI 对齐
    MOVQ    $0, 48(SP) // 上面的代码保存了 g, 确保 debug 时可用
    MOVQ    DX, 40(SP) // 保存原始的栈指针
    MOVQ    BX, DI  // DI = AMD64 ABI 第一个参数
    MOVQ    BX, CX  // CX = Win64 第一个参数
    CALL    AX
    MOVQ    40(SP), SI // 恢复原来的栈指针
    MOVQ    SI, SP
    MOVL    AX, ret+16(FP)
    RET

这段就是将当前栈移到系统栈去执行,因为 C 需要无穷大的栈,在 Go 的栈上执行 C 函数会导致栈溢出。

产生问题

cgo 调用会将当前栈移到系统栈,并且当 cgo 高并发调用且阻塞超过 20 微秒时会新建线程。而 Go 并不会销毁线程,由此造成线程增长。

解决方案

限制 Go 程序最大线程数,默认为 cpu 核数。

runtime.GOMAXPROCS(runtime.NumCPU())

使用 channel 限制 cgo 最大并发数为 cpu 核数

package thread

import "runtime"

var c chan struct{}

func Lock() {
    c <- struct{}{}
}

func Unlock() {
    <-c
}

func init() {
    c = make(chan struct{}, runtime.NumCPU())
}

针对超过 20 微秒的 cgo 调用进行限制:

thread.Lock()
wrapper.TaosFreeResult(result)
thread.Unlock()

标签:调用,cgo,DI,unsafe,源码,线程,mp,MOVQ
From: https://www.cnblogs.com/t102011/p/17457120.html

相关文章

  • C语言多线程爬虫代码示例
    使用C语言编写多线程爬虫能够同时处理多条数据,提高了爬虫的并发度和效率。在编写多线程爬虫时仍需要注意线程安全性和错误处理机制,并根据系统资源和目标网站的特点调整线程数和优化并发策略,以提高程序效率和稳定性。以下是一个使用C语言多线程编写的简单爬虫示例,实现了并发爬取多......
  • Java:从单线程计数器到多线程数据同步synchronized和原子类Atomic
    (目录)使用单线程单线程修改计数器的值,没有发生问题,每次运行结果都是10000,不过程序耗时较长packagecom.example;/***计数器*/classCounter{privatestaticlongcount;publicstaticlonggetCount(){returncount;}publicstaticv......
  • 线程的引入
    线程为什么需要引入线程?一方面是计算机多核的提升,使得计算机的并行度越来越高,如果能够运行多个程序,将一个程序划分为多个线程同时执行,就比如一个程序一个进程由一步一步去做,和划分为好几个模块去分开由多个CPU去做,时间效率上高出了不少。另一方面是进程都拥有独立的虚拟空间,所......
  • 04.如何创建并运行java线程
    评:原文链接译者:章筱虎校对:方腾飞Java线程类也是一个object类,它的实例都继承自java.lang.Thread或其子类。可以用如下方式用java中创建一个线程:查看源代码打印帮助1Treadthread=newThread();执行该线程可以调用该线程的start()方法:查看源代码打印帮助1thre......
  • HMaster启动源码分析
       写之前先吐槽一下自己的sb公司环境,电脑上不了网,优盘又不能插。所以做点笔记基本上都是晚上回家再写一遍。哎,废话不说了   先贴个hbase在构造函数中起来的RPC服务的UML图:http://blackproof.iteye.com/blog/2029170   HMaster启动会调用Run方法,概述为  HBase启动......
  • 手把手实现springboot整合flowable、附源码-视频教程
    手把手实现springboot整合flowable、附源码-视频教程[toc]视频教程点击:https://www.bilibili.com/video/BV1fa411j7Q5/插件安装BPMN绘图可视化工具>FlowableBPMNvisualizer导入依赖<dependency><groupid>org.springframework.boot</groupid><artifact......
  • 必知必会:多线程
    1.线程的6种状态(1)New:初始状态,线程被创建,但是还没调用start方法。(2)Running:就绪状态和运行状态,统称为运行状态(3)Blocked:阻塞状态(4)Waiting:等待状态,需要等待其他线程做出特定的动作(通知或中断)。(5)Time-Waiting:超时等待状态,表示可以在指定的时间自行返回。(6)Terminated:终止状态,表示当前......
  • JBPM5 Designer 2.3源码问题
    最新本2.4发布,但是里面是使用Maven的module方式来管理,鉴于知识有限,不会这种方式,所以选择2.3版本的源码[color=blue][b]2.4[/b][/color]war:[url]http://sourceforge.net/projects/jbpm/files/designer/designer-2.4/[/url]源码:[url]https://github.com/dro......
  • Hibernate Tools源码的使用
    Eclipse中Hibernatetools的安装和使用[url]http://zhoualine.iteye.com/blog/1190141[/url]源码里面有一个很好的格式化类:org.hibernate.tool.hbm2x.[color=darkblue][b]XMLPrettyPrinter[/b][/color]调用publicvoidformatFiles(){ formatXml("......
  • dubbo源码学习(四)初始化过程细节:解析服务
    今天将真正去看dubbo内部的实现过程,看dubbo的源码前我先把dubbo的用户指南和开发指指南大概的看了一遍,这样再看dubbo源码比较轻松。从用户指南和开发指指南可以找到相应的切入点,今天将介绍的是dubbo的初始化解析bean的过程:解析服务基于dubbo.jar内的META......