Go-Netpoller模型
Go netpoll 核心
Go netpoll 通过在底层对 epoll/kqueue/iocp 的封装,从而实现了使用同步编程模式达到异步执行的效果。总结来说,所有的网络操作都以网络描述符 netFD 为中心实现。netFD 与底层 PollDesc 结构绑定,当在一个 netFD 上读写遇到 EAGAIN 错误时,就将当前 goroutine 存储到这个 netFD 对应的 PollDesc 中,同时调用 gopark 把当前 goroutine 给 park 住,直到这个 netFD 上再次发生读写事件,才将此 goroutine 给 ready 激活重新运行。显然,在底层通知 goroutine 再次发生读写等事件的方式就是 epoll/kqueue/iocp 等事件驱动机制。
接下来我们通过分析最新的 Go 源码(v1.13.4),解读一下整个 netpoll 的运行流程。
上面的示例代码中相关的在源码里的几个数据结构和方法:
// TCPListener is a TCP network listener. Clients should typically
// use variables of type Listener instead of assuming TCP.
type TCPListener struct {
fd *netFD
lc ListenConfig
}
// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
c, err := l.accept()
if err != nil {
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}
func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
tc := newTCPConn(fd)
if ln.lc.KeepAlive >= 0 {
setKeepAlive(fd, true)
ka := ln.lc.KeepAlive
if ln.lc.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(fd, ka)
}
return tc, nil
}
// TCPConn is an implementation of the Conn interface for TCP network
// connections.
type TCPConn struct {
conn
}
// Conn
type conn struct {
fd *netFD
}
type conn struct {
fd *netFD
}
func (c *conn) ok() bool { return c != nil && c.fd != nil }
// Implementation of the Conn interface.
// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}
// Write implements the Conn Write method.
func (c *conn) Write(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Write(b)
if err != nil {
err = &OpError{Op: "write", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}
netFD
net.Listen("tcp", ":8888")
方法返回了一个 *TCPListener,它是一个实现了 net.Listener
接口的 struct,而通过 listener.Accept()
接收的新连接 *TCPConn 则是一个实现了 net.Conn
接口的 struct,它内嵌了 net.conn
struct。仔细阅读上面的源码可以发现,不管是 Listener 的 Accept 还是 Conn 的 Read/Write 方法,都是基于一个 netFD
的数据结构的操作, netFD
是一个网络描述符,类似于 Linux 的文件描述符的概念,netFD 中包含一个 poll.FD 数据结构,而 poll.FD 中包含两个重要的数据结构 Sysfd 和 pollDesc,前者是真正的系统文件描述符,后者对是底层事件驱动的封装,所有的读写超时等操作都是通过调用后者的对应方法实现的。
netFD
和 poll.FD
的源码:
// Network file descriptor.
type netFD struct {
pfd poll.FD
// immutable until Close
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}
// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
// Lock sysfd and serialize access to Read and Write methods.
fdmu fdMutex
// System file descriptor. Immutable until Close.
Sysfd int
// I/O poller.
pd pollDesc
// Writev cache.
iovecs *[]syscall.Iovec
// Semaphore signaled when file is closed.
csema uint32
// Non-zero if this file has been set to blocking mode.
isBlocking uint32
// Whether this is a streaming descriptor, as opposed to a
// packet-based descriptor like a UDP socket. Immutable.
IsStream bool
// Whether a zero byte read indicates EOF. This is false for a
// message based socket connection.
ZeroReadIsEOF bool
// Whether this is a file rather than a network socket.
isFile bool
}
pollDesc
前面提到了 pollDesc 是底层事件驱动的封装,netFD 通过它来完成各种 I/O 相关的操作,它的定义如下:
type pollDesc struct {
runtimeCtx uintptr
}
这里的 struct 只包含了一个指针,而通过 pollDesc 的 init 方法,我们可以找到它具体的定义是在 runtime.pollDesc
这里:
func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
if ctx != 0 {
runtime_pollUnblock(ctx)
runtime_pollClose(ctx)
}
return syscall.Errno(errno)
}
pd.runtimeCtx = ctx
return nil
}
// Network poller descriptor.
//
// No heap pointers.
//
//go:notinheap
type pollDesc struct {
link *pollDesc // in pollcache, protected by pollcache.lock
// The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
// This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
// pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)
// proceed w/o taking the lock. So closing, everr, rg, rd, wg and wd are manipulated
// in a lock-free way by all operations.
// NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
// that will blow up when GC starts moving objects.
lock mutex // protects the following fields
fd uintptr
closing bool
everr bool // marks event scanning error happened
user uint32 // user settable cookie
rseq uintptr // protects from stale read timers
rg uintptr // pdReady, pdWait, G waiting for read or nil
rt timer // read deadline timer (set if rt.f != nil)
rd int64 // read deadline
wseq uintptr // protects from stale write timers
wg uintptr // pdReady, pdWait, G waiting for write or nil
wt timer // write deadline timer
wd int64 // write deadline
}
runtime.pollDesc
包含自身类型的一个指针,用来保存下一个 runtime.pollDesc
的地址,以此来实现链表,可以减少数据结构的大小,所有的 runtime.pollDesc
保存在 runtime.pollCache
结构中,定义如下:
type pollCache struct {
lock mutex
first *pollDesc
// PollDesc objects must be type-stable,
// because we can get ready notification from epoll/kqueue
// after the descriptor is closed/reused.
// Stale notifications are detected using seq variable,
// seq is incremented when deadlines are changed or descriptor is reused.
}
net.Listen
调用 net.Listen
之后,底层会通过 Linux 的系统调用 socket
方法创建一个 fd 分配给 listener,并用以来初始化 listener 的 netFD
,接着调用 netFD 的 listenStream
方法完成对 socket 的 bind&listen 操作以及对 netFD
的初始化(主要是对 netFD 里的 pollDesc 的初始化),相关源码如下:
// 调用 linux 系统调用 socket 创建 listener fd 并设置为为阻塞 I/O
s, err := socketFunc(family, sotype|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC, proto)
// On Linux the SOCK_NONBLOCK and SOCK_CLOEXEC flags were
// introduced in 2.6.27 kernel and on FreeBSD both flags were
// introduced in 10 kernel. If we get an EINVAL error on Linux
// or EPROTONOSUPPORT error on FreeBSD, fall back to using
// socket without them.
socketFunc func(int, int, int) (int, error) = syscall.Socket
// 用上面创建的 listener fd 初始化 listener netFD
if fd, err = newFD(s, family, sotype, net); err != nil {
poll.CloseFunc(s)
return nil, err
}
// 对 listener fd 进行 bind&listen 操作,并且调用 init 方法完成初始化
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
// ...
// 完成绑定操作
if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}
// 完成监听操作
if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
return os.NewSyscallError("listen", err)
}
// 调用 init,内部会调用 poll.FD.Init,最后调用 pollDesc.init
if err = fd.init(); err != nil {
return err
}
lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
}
// 使用 sync.Once 来确保一个 listener 只持有一个 epoll 实例
var serverInit sync.Once
// netFD.init 会调用 poll.FD.Init 并最终调用到 pollDesc.init,
// 它会创建 epoll 实例并把 listener fd 加入监听队列
func (pd *pollDesc) init(fd *FD) error {
// runtime_pollServerInit 内部调用了 netpollinit 来创建 epoll 实例
serverInit.Do(runtime_pollServerInit)
// runtime_pollOpen 内部调用了 netpollopen 来将 listener fd 注册到
// epoll 实例中,另外,它会初始化一个 pollDesc 并返回
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
if ctx != 0 {
runtime_pollUnblock(ctx)
runtime_pollClose(ctx)
}
return syscall.Errno(errno)
}
// 把真正初始化完成的 pollDesc 实例赋值给当前的 pollDesc 代表自身的指针,
// 后续使用直接通过该指针操作
pd.runtimeCtx = ctx
return nil
}
var (
// 全局唯一的 epoll fd,只在 listener fd 初始化之时被指定一次
epfd int32 = -1 // epoll descriptor
)
// netpollinit 会创建一个 epoll 实例,然后把 epoll fd 赋值给 epfd,
// 后续 listener 以及它 accept 的所有 sockets 有关 epoll 的操作都是基于这个全局的 epfd
func netpollinit() {
epfd = epollcreate1(_EPOLL_CLOEXEC)
if epfd >= 0 {
return
}
epfd = epollcreate(1024)
if epfd >= 0 {
closeonexec(epfd)
return
}
println("runtime: epollcreate failed with", -epfd)
throw("runtime: netpollinit failed")
}
// netpollopen 会被 runtime_pollOpen 调用,注册 fd 到 epoll 实例,
// 同时会利用万能指针把 pollDesc 保存到 epollevent 的一个 8 位的字节数组 data 里
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
我们前面提到的 epoll 的三个基本调用,Go 在源码里实现了对那三个调用的封装:
#include <sys/epoll.h>
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
// Go 对上面三个调用的封装
func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(block bool) gList
netFD 就是通过这三个封装来对 epoll 进行创建实例、注册 fd 和等待事件操作的。
Listener.Accept()
netpoll
accept socket 的工作流程如下:
- 服务端的 netFD 在
listen
时会创建 epoll 的实例,并将 listenerFD 加入 epoll 的事件队列 - netFD 在
accept
时将返回的 connFD 也加入 epoll 的事件队列 - netFD 在读写时出现
syscall.EAGAIN
错误,通过 pollDesc 的waitRead
方法将当前的 goroutine park 住,直到 ready,从 pollDesc 的waitRead
中返回
Listener.Accept()
接收来自客户端的新连接,具体还是调用 netFD.accept
方法来完成这个功能:
// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
c, err := l.accept()
if err != nil {
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}
func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
tc := newTCPConn(fd)
if ln.lc.KeepAlive >= 0 {
setKeepAlive(fd, true)
ka := ln.lc.KeepAlive
if ln.lc.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(fd, ka)
}
return tc, nil
}
func (fd *netFD) accept() (netfd *netFD, err error) {
// 调用 poll.FD 的 Accept 方法接受新的 socket 连接,返回 socket 的 fd
d, rsa, errcall, err := fd.pfd.Accept()
if err != nil {
if errcall != "" {
err = wrapSyscallError(errcall, err)
}
return nil, err
}
// 以 socket fd 构造一个新的 netFD,代表这个新的 socket
if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
poll.CloseFunc(d)
return nil, err
}
// 调用 netFD 的 init 方法完成初始化
if err = netfd.init(); err != nil {
fd.Close()
return nil, err
}
lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
return netfd, nil
}
netFD.accept
方法里会再调用 poll.FD.Accept
,最后会使用 Linux 的系统调用 accept
来完成新连接的接收,并且会把 accept 的 socket 设置成非阻塞 I/O 模式:
// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
if err := fd.readLock(); err != nil {
return -1, nil, "", err
}
defer fd.readUnlock()
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return -1, nil, "", err
}
for {
// 使用 linux 系统调用 accept 接收新连接,创建对应的 socket
s, rsa, errcall, err := accept(fd.Sysfd)
// 因为 listener fd 在创建的时候已经设置成非阻塞的了,
// 所以 accept 方法会直接返回,不管有没有新连接到来;如果 err == nil 则表示正常建立新连接,直接返回
if err == nil {
return s, rsa, "", err
}
// 如果 err != nil,则判断 err == syscall.EAGAIN,符合条件则进入 pollDesc.waitRead 方法
switch err {
case syscall.EAGAIN:
if fd.pd.pollable() {
// 如果当前没有发生期待的 I/O 事件,那么 waitRead 会通过 park goroutine 让逻辑 block 在这里
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
case syscall.ECONNABORTED:
// This means that a socket on the listen
// queue was closed before we Accept()ed it;
// it's a silly error, so try again.
continue
}
return -1, nil, errcall, err
}
}
// 使用 linux 的 accept 系统调用接收新连接并把这个 socket fd 设置成非阻塞 I/O
ns, sa, err := Accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)
// On Linux the accept4 system call was introduced in 2.6.28
// kernel and on FreeBSD it was introduced in 10 kernel. If we
// get an ENOSYS error on both Linux and FreeBSD, or EINVAL
// error on Linux, fall back to using accept.
// Accept4Func is used to hook the accept4 call.
var Accept4Func func(int, int) (int, syscall.Sockaddr, error) = syscall.Accept4
pollDesc.waitRead
方法主要负责检测当前这个 pollDesc 的上层 netFD 对应的 fd 是否有『期待的』I/O 事件发生,如果有就直接返回,否则就 park 住当前的 goroutine 并持续等待直至对应的 fd 上发生可读/可写或者其他『期待的』I/O 事件为止,然后它就会返回到外层的 for 循环,让 goroutine 继续执行逻辑。
poll.FD.Accept() 返回之后,会构造一个对应这个新 socket 的 netFD,然后调用 init() 方法完成初始化,这个 init 过程和前面 net.Listen() 是一样的,调用链:netFD.init() --> poll.FD.Init() --> poll.pollDesc.init(),最终又会走到这里:
var serverInit sync.Once
func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
if ctx != 0 {
runtime_pollUnblock(ctx)
runtime_pollClose(ctx)
}
return syscall.Errno(errno)
}
pd.runtimeCtx = ctx
return nil
}
然后把这个 socket fd 注册到 listener 的 epoll 实例的事件队列中去,等待 I/O 事件。
Conn.Read/Conn.Write
我们先来看看 Conn.Read
方法是如何实现的,原理其实和 Listener.Accept
是一样的,具体调用链还是首先调用 conn 的 netFD.Read
,然后内部再调用 poll.FD.Read
,最后使用 Linux 的系统调用 read: syscall.Read
完成数据读取:
// Implementation of the Conn interface.
// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}
func (fd *netFD) Read(p []byte) (n int, err error) {
n, err = fd.pfd.Read(p)
runtime.KeepAlive(fd)
return n, wrapSyscallError("read", err)
}
// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
if len(p) == 0 {
// If the caller wanted a zero byte read, return immediately
// without trying (but after acquiring the readLock).
// Otherwise syscall.Read returns 0, nil which looks like
// io.EOF.
// TODO(bradfitz): make it wait for readability? (Issue 15735)
return 0, nil
}
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return 0, err
}
if fd.IsStream && len(p) > maxRW {
p = p[:maxRW]
}
for {
// 尝试从该 socket 读取数据,因为 socket 在被 listener accept 的时候设置成
// 了非阻塞 I/O,所以这里同样也是直接返回,不管有没有可读的数据
n, err := syscall.Read(fd.Sysfd, p)
if err != nil {
n = 0
// err == syscall.EAGAIN 表示当前没有期待的 I/O 事件发生,也就是 socket 不可读
if err == syscall.EAGAIN && fd.pd.pollable() {
// 如果当前没有发生期待的 I/O 事件,那么 waitRead
// 会通过 park goroutine 让逻辑 block 在这里
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
// On MacOS we can see EINTR here if the user
// pressed ^Z. See issue #22838.
if runtime.GOOS == "darwin" && err == syscall.EINTR {
continue
}
}
err = fd.eofError(n, err)
return n, err
}
}
conn.Write
和 conn.Read
的原理是一致的,它也是通过类似 pollDesc.waitRead
的 pollDesc.waitWrite
来 park 住 goroutine 直至期待的 I/O 事件发生才返回,而 pollDesc.waitWrite
的内部实现原理和 pollDesc.waitRead
是一样的,都是基于 runtime_pollWait
,这里就不再赘述。
pollDesc.waitRead
pollDesc.waitRead
内部调用了 runtime_pollWait
来达成无 I/O 事件时 park 住 goroutine 的目的:
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
err := netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
// As for now only Solaris, illumos, and AIX use level-triggered IO.
if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
netpollarm(pd, mode)
}
// 进入 netpollblock 并且判断是否有期待的 I/O 事件发生,
// 这里的 for 循环是为了一直等到 io ready
for !netpollblock(pd, int32(mode), false) {
err = netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
// Can happen if timeout has fired and unblocked us,
// but before we had a chance to run, timeout has been reset.
// Pretend it has not happened and retry.
}
return 0
}
// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
// gpp 保存的是 goroutine 的数据结构 g,这里会根据 mode 的值决定是 rg 还是 wg
// 后面调用 gopark 之后,会把当前的 goroutine 的抽象数据结构 g 存入 gpp 这个指针
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
// set the gpp semaphore to WAIT
// 这个 for 循环是为了等待 io ready 或者 io wait
for {
old := *gpp
// gpp == pdReady 表示此时已有期待的 I/O 事件发生,
// 可以直接返回 unblock 当前 goroutine 并执行响应的 I/O 操作
if old == pdReady {
*gpp = 0
return true
}
if old != 0 {
throw("runtime: double wait")
}
// 如果没有期待的 I/O 事件发生,则通过原子操作把 gpp 的值置为 pdWait 并退出 for 循环
if atomic.Casuintptr(gpp, 0, pdWait) {
break
}
}
// need to recheck error states after setting gpp to WAIT
// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
// do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
// waitio 此时是 false,netpollcheckerr 方法会检查当前 pollDesc 对应的 fd 是否是正常的,
// 通常来说 netpollcheckerr(pd, mode) == 0 是成立的,所以这里会执行 gopark
// 把当前 goroutine 给 park 住,直至对应的 fd 上发生可读/可写或者其他『期待的』I/O 事件为止,
// 然后 unpark 返回,在 gopark 内部会把当前 goroutine 的抽象数据结构 g 存入
// gpp(pollDesc.rg/pollDesc.wg) 指针里,以便在后面的 netpoll 函数取出 pollDesc 之后,
// 把 g 添加到链表里返回,接着重新调度 goroutine
if waitio || netpollcheckerr(pd, mode) == 0 {
// 注册 netpollblockcommit 回调给 gopark,在 gopark 内部会执行它,保存当前 goroutine 到 gpp
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// be careful to not lose concurrent READY notification
old := atomic.Xchguintptr(gpp, 0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}
// gopark 会停住当前的 goroutine 并且调用传递进来的回调函数 unlockf,从上面的源码我们可以知道这个函数是
// netpollblockcommit
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
if reason != waitReasonSleep {
checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
}
mp := acquirem()
gp := mp.curg
status := readgstatus(gp)
if status != _Grunning && status != _Gscanrunning {
throw("gopark: bad g status")
}
mp.waitlock = lock
mp.waitunlockf = unlockf
gp.waitreason = reason
mp.waittraceev = traceEv
mp.waittraceskip = traceskip
releasem(mp)
// can't do anything that might move the G between Ms here.
// gopark 最终会调用 park_m,在这个函数内部会调用 unlockf,也就是 netpollblockcommit,
// 然后会把当前的 goroutine,也就是 g 数据结构保存到 pollDesc 的 rg 或者 wg 指针里
mcall(park_m)
}
// park continuation on g0.
func park_m(gp *g) {
_g_ := getg()
if trace.enabled {
traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
}
casgstatus(gp, _Grunning, _Gwaiting)
dropg()
if fn := _g_.m.waitunlockf; fn != nil {
// 调用 netpollblockcommit,把当前的 goroutine,
// 也就是 g 数据结构保存到 pollDesc 的 rg 或者 wg 指针里
ok := fn(gp, _g_.m.waitlock)
_g_.m.waitunlockf = nil
_g_.m.waitlock = nil
if !ok {
if trace.enabled {
traceGoUnpark(gp, 2)
}
casgstatus(gp, _Gwaiting, _Grunnable)
execute(gp, true) // Schedule it back, never returns.
}
}
schedule()
}
// netpollblockcommit 在 gopark 函数里被调用
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
// 通过原子操作把当前 goroutine 抽象的数据结构 g,也就是这里的参数 gp 存入 gpp 指针,
// 此时 gpp 的值是 pollDesc 的 rg 或者 wg 指针
r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
if r {
// Bump the count of goroutines waiting for the poller.
// The scheduler uses this to decide whether to block
// waiting for the poller if there is nothing else to do.
atomic.Xadd(&netpollWaiters, 1)
}
return r
}
netpoll
前面已经从源码的角度分析完了 netpoll 是如何通过 park goroutine 从而达到阻塞 Accept/Read/Write 的效果,而通过调用 gopark,goroutine 会被放置在某个等待队列中(如 channel 的 waitq ,此时 G 的状态由 _Grunning
为 _Gwaitting
),因此 G 必须被手动唤醒(通过 goready ),否则会丢失任务,应用层阻塞通常使用这种方式。
所以,最后还有一个非常关键的问题是:当 I/O 事件发生之后,netpoll 是通过什么方式唤醒那些在 I/O wait 的 goroutine 的?答案是通过 epoll_wait
,在 Go 源码中的 src/runtime/netpoll_epoll.go
文件中有一个 func netpoll(block bool) gList
方法,它会内部调用 epoll_wait
获取就绪的 fd 列表,并将每个 fd 对应的 goroutine 添加到链表返回:
// polls for ready network connections
// returns list of goroutines that become runnable
func netpoll(block bool) gList {
if epfd == -1 {
return gList{}
}
waitms := int32(-1)
// 是否以阻塞模式调用 epoll_wait
if !block {
waitms = 0
}
var events [128]epollevent
retry:
// 获取就绪的 fd 列表
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
if n != -_EINTR {
println("runtime: epollwait on fd", epfd, "failed with", -n)
throw("runtime: netpoll failed")
}
goto retry
}
// toRun 是一个 g 的链表,存储要恢复的 goroutines,最后返回给调用方
var toRun gList
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
var mode int32
// 判断发生的事件类型,读类型或者写类型
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
// 取出保存在 epollevent 里的 pollDesc
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.everr = false
if ev.events == _EPOLLERR {
pd.everr = true
}
// 调用 netpollready,传入就绪 fd 的 pollDesc,把 fd 对应的 goroutine 添加到链表 toRun 中
netpollready(&toRun, pd, mode)
}
}
if block && toRun.empty() {
goto retry
}
return toRun
}
// netpollready 调用 netpollunblock 返回就绪 fd 对应的 goroutine 的抽象数据结构 g
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
}
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
}
// netpollunblock 会依据传入的 mode 决定从 pollDesc 的 rg 或者 wg 取出当时 gopark 之时存入的
// goroutine 抽象数据结构 g 并返回
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
// mode == 'r' 代表当时 gopark 是为了等待读事件,而 mode == 'w' 则代表是等待写事件
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
for {
// 取出 gpp 存储的 g
old := *gpp
if old == pdReady {
return nil
}
if old == 0 && !ioready {
// Only set READY for ioready. runtime_pollWait
// will check for timeout/cancel before waiting.
return nil
}
var new uintptr
if ioready {
new = pdReady
}
// 重置 pollDesc 的 rg 或者 wg
if atomic.Casuintptr(gpp, old, new) {
if old == pdReady || old == pdWait {
old = 0
}
// 通过万能指针还原成 g 并返回
return (*g)(unsafe.Pointer(old))
}
}
}
而 Go 在多种场景下都可能会调用 netpoll
检查文件描述符状态。寻找到 I/O 就绪的 socket fd,并找到这些 socket fd 对应的轮询器中附带的信息,根据这些信息将之前等待这些 socket fd 就绪的 goroutine 状态修改为 _Grunnable
。执行完 netpoll
之后,会返回一个就绪 fd 列表对应的 goroutine 列表,接下来将就绪的 goroutine 加入到调度队列中,等待调度运行。
首先,在 Go runtime scheduler 正常调度 goroutine 之时就有可能会调用 netpoll
获取到已就绪的 fd 对应的 goroutine 来调度执行:
// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
// ...
if gp == nil {
gp, inheritTime = findrunnable() // blocks until work is available
}
// ...
}
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
// ...
// Poll network.
// This netpoll is only an optimization before we resort to stealing.
// We can safely skip it if there are no waiters or a thread is blocked
// in netpoll already. If there is any kind of logical race with that
// blocked thread (e.g. it has already returned from netpoll, but does
// not set lastpoll yet), this thread will do blocking netpoll below
// anyway.
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
if list := netpoll(false); !list.empty() { // non-blocking
gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
}
// ...
}
Go scheduler 的核心方法 schedule
里会调用一个叫 findrunable()
的方法获取可运行的 goroutine 来执行,而在 findrunable()
方法里就调用了 netpoll
获取已就绪的 fd 列表对应的 goroutine 列表。
另外, sysmon
监控线程也可能会调用到 netpoll
:
// Always runs without a P, so write barriers are not allowed.
//
//go:nowritebarrierrec
func sysmon() {
// ...
now := nanotime()
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
// 以非阻塞的方式调用 netpoll 获取就绪 fd 列表
list := netpoll(false) // non-blocking - returns list of goroutines
if !list.empty() {
// Need to decrement number of idle locked M's
// (pretending that one more is running) before injectglist.
// Otherwise it can lead to the following situation:
// injectglist grabs all P's but before it starts M's to run the P's,
// another M returns from syscall, finishes running its G,
// observes that there is no work to do and no other running M's
// and reports deadlock.
incidlelocked(-1)
// 将其插入调度器的runnable列表中(全局),等待被调度执行
injectglist(&list)
incidlelocked(1)
}
}
// retake P's blocked in syscalls
// and preempt long running G's
if retake(now) != 0 {
idle = 0
} else {
idle++
}
// check if we need to force a GC
if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {
lock(&forcegc.lock)
forcegc.idle = 0
var list gList
list.push(forcegc.g)
injectglist(&list)
unlock(&forcegc.lock)
}
if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {
lasttrace = now
schedtrace(debug.scheddetail > 0)
}
}
}
Go runtime 在程序启动的时候会创建一个独立的 M 作为监控线程,叫 sysmon
,这个线程为系统级的 daemon 线程,无需 P 即可运行, sysmon
每 20us~10ms 运行一次。 sysmon
中以轮询的方式执行以下操作(如上面的代码所示):
- 以非阻塞的方式调用
runtime.netpoll
,从中找出能从网络 I/O 中唤醒的 G,并调用injectglist
,将其插入调度器的 runnable 列表中(全局),调度触发时,有可能从这个全局 runnable 列表获取 G。然后再循环调用startm
,直到所有 P 都不处于_Pidle
状态。 - 调用
retake
,抢占长时间处于_Psyscall
状态的 P。
综上,Go 借助于 epoll/kqueue/iocp 和 runtime scheduler 等的帮助,设计出了自己的 I/O 多路复用 netpoll,成功地让 Listener.Accept
/ conn.Read
/ conn.Write
等方法从开发者的角度看来是同步模式。
参考文献
标签:pollDesc,return,err,nil,netpoll,fd,Go,runtime From: https://www.cnblogs.com/JujunWang/p/16909265.html