首页 > 其他分享 >go network poller 一

go network poller 一

时间:2023-12-04 17:03:38浏览次数:42  
标签:协程 network epoll syscall mode pd go poller Socket

网络基础

协议架构

tcp链接

假如需要开发者去实现一套新的网络协议(例如 redis 的resp), 是基于TCP的, 那tcp这层的协议,是否需要开发者自己去实现?

这层如果自己实现, 其实很复杂, 会涉及很多算法相关.

因此, 出现了 socket 对传输层进行了抽象, 开发者不需要关注传输层具体的实现, 使用socket提供的接口, socket内部会实现,比如三次握手, 四次挥手.

Socket

  很多系统都提供 Socket 作为 TCP(也有UDP) 网络连接的抽象,

  Linux-> Internet domain socket -> SOCK STREAM

  Linux 中 Socket 以 “文件描述符〞FD 作为标识

每建立一次连接接, sever都会创建一个新的 socket 专门和client通信, 原来监听的socket还是一直处于监听状态.

socket 之前如何进行通信, 假设client 给 server发送消息, sever 没有回复, 那client是阻塞, 还是先执行别的工作, 这就是IO模型的范畴了.

IO模型

阻塞

非阻塞

多路复用

阻塞

当read中没有数据过来, 线程(sever或者client都一样)会阻塞等待.

同步读写Socket时,线程陷入内核态

当读写成功后,切换回用户态,继续执行

优点:开发难度小,代码简单

缺点:内核态切换开销大

非阻塞

如果暂时无法收发数据,会返回错误

应用会不断轮询,直到Socket可以读写

优点:不会陷入内核态,自由度高

缺点:需要自旋轮询

多路复用

大厂面试经常喜欢问的 epoll就是 多路复用的一种实现, 还有 select, poll 都是, 这三个主要的区别: 在返回数据时候的读取方式.

业务把关注的事件注册到 event poll池中, 当epoll接受到对应事件后,通知业务.

注册多个Socket事件

调用epool,当有事件发生,返回

优点:提供了事件列表,不需要查询轮询各个Scoket

缺点:开发难度大,逻辑复杂

epoll 在 Mac: kqueue ; Windows: IOCP 

小结

操作系统提供了Socket作为TCP和UDP通信的抽象

IO模型指的是操作Socket的方案

阻塞模型最利于业务编写,但是性能差

多路复用性能好,但业务编写麻烦

go socket的实现

实现方法

  1. 对系统的 epoll进行封装, 因为go是可以在多个系统上运行的, 底层需要对平台差异化封装.
  1. 如果用户去调用 封装好的 epoll方法, 需要开发者执行去关注 epoll的状态, 了解epoll的大致实现原理,肯定知道还需要去 解析epoll的返回, 然后 分析那些 socket 有事件来了. 这些工作都是通用性的,所以,go把这层一起封装在了它的 网络层中.
  1. 然后,go为了更加简化使用, 采用了 阻塞的思想, 一个协程对应一个 socket连接 , 当epoll 没有对应事件返回时候, 就休眠等待, 有事件来了,就唤醒处理. 这样就简化了上层开发者的使用.

在底层使用操作系统的多路复用10

在协程层次使用阻塞模型

阻塞协程时,休眠协程

具体抽象 go代码相关

多路复用器

各个系统的多路复用都有以下功能: 系统原生

1. 新建多路复用器 epoll create()
2. 往多路复用器里插入需要监听的事件 epoll ctl()
3. 查询发生了什么事件 epoll wait()

Go Network Poller 多路复用器的抽象

epoll create() -> netpollinit()  // 新建

epoll cti0 -> netpollopen()   // 插入事件

epoll wait -> netooll()        // 查询

netpollinit 新建多路复用器

func netpollinit() {
	var errno uintptr
    // 新建了 epoll 系统底层实现 
	epfd, errno = syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
	 // 这里的 epfd是个全局变量,初始化之后, 在操作时候就用这个 epfd

    // 新建管道 Linux的管道,用于多线程多进程间的通信, 用于结束这个epoll
	r, w, errpipe := nonblockingPipe()
	
	ev := syscall.EpollEvent{
		Events: syscall.EPOLLIN,
	}

	*(**uintptr)(unsafe.Pointer(&ev.Data)) = &netpollBreakRd
     // 管道中加入了事件 
	errno = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, r, &ev)
	
	netpollBreakRd = uintptr(r)
	netpollBreakWr = uintptr(w)
}

归纳:

新建了一个底层 epoll对象  epfd

新建一个pipe管道用于中断Epoll

将“管道有数据到达〞 事件注册在Epoll中

netpollopen() 插入事件

func netpollopen(fd uintptr, pd *pollDesc) uintptr {
    // fd 是socket的对象  pd是 socket 和协程的对应关系

	var ev syscall.EpollEvent
	ev.Events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLET

	tp := taggedPointerPack(unsafe.Pointer(pd), pd.fdseq.Load())
	*(*taggedPointer)(unsafe.Pointer(&ev.Data)) = tp
    //  上面这两行代码 是在把 ev.Data 和  pd 联系起来,  后面epoll 返回时候, 就能直接和pd的协程联系起来

    // 将fd要关注的事件, 注册到 epoll中, 调用的是 EpollCtl 和上面也对上了
	return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, int32(fd), &ev)
}


type pollDesc struct {
	_     sys.NotInHeap
	link  *pollDesc      // in pollcache, protected by pollcache.lock
	fd    uintptr        //  socket ID
	fdseq atomic.Uintptr // protects against stale pollDesc

	atomicInfo atomic.Uint32 // atomic pollInfo
    // 想要读的协程
	rg atomic.Uintptr // pdReady, pdWait, G waiting for read or pdNil
    // 想要写的协程
	wg atomic.Uintptr // pdReady, pdWait, G waiting for write or pdNil

	lock    mutex // protects the following fields
	closing bool
	user    uint32    // user settable cookie
	rseq    uintptr   // protects from stale read timers
	rt      timer     // read deadline timer (set if rt.f != nil)
	rd      int64     // read deadline (a nanotime in the future, -1 when expired)
	wseq    uintptr   // protects from stale write timers
	wt      timer     // write deadline timer
	wd      int64     // write deadline (a nanotime in the future, -1 when expired)
	self    *pollDesc // storage for indirect interface. See (*pollDesc).makeArg.
}

归纳:

传入一个Socket的FD,和pollDesc指针

pollDesc指针是Socket相关详细信息

pollIDesc中记录了哪个协程休眠在等待此Socket

将Socket可读、可写、断开事件注册到Epoll中

netpoll 查询

这里有必要讲下 epoll的使用的一些特点, 能更好理解go中的实现.

epoll_wait 是阻塞函数, 但是可以传一个 超时时间, 超过时间后就先返回, 不阻塞了。

int epoll_wait(int epfd,struct epoll_event * events, int maxevents,int timeout)

epfd epoll的id ,就是创建时候的 epfd.
epoll_event 是传回有事件的fd和even的记录
timeout 是 超时时间
返回值 是指示 epoll_event 中,前多个元素,是有值的, 避免遍历.

// 超时时间 delay
 func netpoll(delay int64) gList {

	var events [128]syscall.EpollEvent
retry:
  // 去调了系统的 EpollWait 然后 看参数, 第二个参数里面是包含了 有事件的even
	n, errno := syscall.EpollWait(epfd, events[:], int32(len(events)), waitms)
  // n 就表示 events 前多少个是有值的
	var toRun gList
	for i := int32(0); i < n; i++ {
	        var mode int32
		if ev.Events&(syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
			mode += 'r'
		}
		if ev.Events&(syscall.EPOLLOUT|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
			mode += 'w'
		}
		if mode != 0 {
			tp := *(*taggedPointer)(unsafe.Pointer(&ev.Data))
			pd := (*pollDesc)(tp.pointer())
			tag := tp.tag()
          // 通过ev.Data  和 pd关联了起来  参入事件的时候, 有讲过这个
			if pd.fdseq.Load() == tag {
				pd.setEventErr(ev.Events == syscall.EPOLLERR, tag)
				netpollready(&toRun, pd, mode) // 通过pd ,获取到了对应的 协程队列
			}
          }
	}
  return toRun // 返回的是 关注这个事件的协程队列
}

归纳:

调用epoll wait(),查询有哪些事件发生

根据Socket相关的pollDesc信息,返回哪些协程可以唤醒

Go Network Poller 如何工作

收发数据

场景 1:Socket 已经可读写

  1. netpoll 方法需要循环执行,这样才能及时获得那些事件有返回了, 才能通知对应的协程.
    谁来调这个方法, 入口方法放到了 gcStart , 因为 go会保证一段时间,一定会调下 gc, 在讲 go 内存,垃圾回收时候讲过.

再来看 netpoll()netpollready() 的实现:

func netpollready(toRun *gList, pd *pollDesc, mode int32) {
	var rg, wg *g
	if mode == 'r' || mode == 'r'+'w' { // 可读
		rg = netpollunblock(pd, 'r', true)
	}
	if mode == 'w' || mode == 'r'+'w' { // 可写
		wg = netpollunblock(pd, 'w', true)
	}
     // 通过上面的方法,标记好了 pdReady之后,
	if rg != nil {
		toRun.push(rg) //把对应的协程,加入到一个可执行的 队列中,一个链表
	}
	if wg != nil {
		toRun.push(wg)
	}
}

func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
	gpp := &pd.rg // 默认是 可读协程
	if mode == 'w' { //如果是 可写 w, 就取可写的协程
		gpp = &pd.wg
	}
    // 下面的代码就是把 对应的 rg 或者 wg 这个字段改成了  pdReady, 标记下有事件来了.
	for {
        // 这里的old值 , 第一次取可能是一个等待协程的地址 ,通过下面的情景能看到
		old := gpp.Load()
		if old == pdReady {
			return nil
		}
		if old == pdNil && !ioready {
			return nil
		}
		var new uintptr
		if ioready {
			new = pdReady
		}
		if gpp.CompareAndSwap(old, new) { // 如果值相等,就写入新的值
			if old == pdWait {
				old = pdNil
			}
			return (*g)(unsafe.Pointer(old)) // 当old为一个协程的地址,这里返回的就是一个协程
		}
	}
}

上面方法就是把 pollDesc 的 rg或者wg置为  pdReady

当协程去调用 poll_runtime_pollWait() 方法 询问时候, 发现已经是 ready状态,就开始进行 读写

//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
// 这种方式 可以调用声明包中的小写方法
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
errcode := netpollcheckerr(pd, int32(mode))

for !netpollblock(pd, int32(mode), false) { // 这里就是判断这个 pd的rg或者wg 是否是 ready状态
	errcode = netpollcheckerr(pd, int32(mode))
	if errcode != pollNoError {
		return errcode
	}
}
return pollNoError

}

归纳:

runtime 循环调用 netpoll() 方法 (g0协程 的gcStart 垃圾回收开始方法)

发现Socket可读写时,给对应的rg或者wg置为pdReady(1)

协程调用poll_runtime_pollWait()

判断rg或者wg已经置为pdReady(1),返向0

场景 2:Socket 暂时无法读写

需要深入看上面那个判断 pdrg是否为ready状态的方法
当协程去调用 poll_runtime_pollWait()

会跳转到:

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	for {
		
		if gpp.CompareAndSwap(pdReady, pdNil) {
			return true
		}
		if gpp.CompareAndSwap(pdNil, pdWait) {
			break
		}
		if v := gpp.Load(); v != pdReady && v != pdNil {
			throw("runtime: double wait")
		}
	}

	// 如果不是ready ,那么协程就会休眠等待,  并把协程的地址,赋值给 pd的rg或wg,在`netpollblockcommit`中
	if waitio || netpollcheckerr(pd, mode) == pollNoError {
		gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceBlockNet, 5)
	}
	// be careful to not lose concurrent pdReady notification
	old := gpp.Swap(pdNil)
	if old > pdWait {
		throw("runtime: corrupted polldesc")
	}
	return old == pdReady
}

归纳:

runtime 循环调用netpoll()方法 (g0协程)
协程调用poll_runtime_pollWait()
发现对应的rg或者wg为0
给对应的rg或者wg置为协程地址
休眠等待

当有事件来的时候, netpoll 中返回了 对应事件的 glist, 然后,runtime就会通知这些协程开始工作.

runtime 循环调用 netpoll() 方法 (g0协程)
发现Socket可读写时,给对应的查看对应的rg或者wg
若为协程地址,返回协程地址
调度器开始调度对应协程

大体的机构如下:

标签:协程,network,epoll,syscall,mode,pd,go,poller,Socket
From: https://www.cnblogs.com/studyios/p/17875360.html

相关文章

  • win10 访问 ubuntu 虚拟机 上的Django web 服务 操作 和 问题解决
    虚拟机版本VMware16proubuntu版本 Ubuntu22.04.1LTS 第一步:虚拟机设置NATEdit>VirtualNetworkEditor修改配置更改DHCP设置要注意ip地址要用在虚拟机Ubuntu系统中的网段范围 在NAT添加端口转发 查看ubuntu防火墙sudoufwstatus Status:ina......
  • go-carbon v2.2.14 发布,轻量级、语义化、对开发者友好的 Golang 时间处理库
    carbon是一个轻量级、语义化、对开发者友好的golang时间处理库,支持链式调用。目前已被awesome-go收录,如果您觉得不错,请给个star吧github.com/golang-module/carbongitee.com/golang-module/carbon安装使用Golang版本大于等于1.16//使用github库goget-ugithu......
  • 用Gogs私有化部署git服务
    Gogs官网1.下载安装gitee源码地址girhub源码地址go写的gogs客户端源码安装方法参考,二进制下载安装方法参考安装过程可参考首次安装可手动启动,cmd命令行下切换到gogs.exe所在目录,执行“e:/gogs/gogs.exeweb”,windows下不能直接不带目录而执行"gogs.exeweb",貌似会找不到目......
  • Golang使用kcp
    安装goget-ugithub.com/xtaci/kcp-goimport("fmt""github.com/xtaci/kcp-go""golang.org/x/net/ipv4""golang.org/x/net/ipv6""net")//KCP服务器funcserver(){//创建一个UDP连接u......
  • Django学习(一) 之 环境搭建
    写在前面最近比较迷AI绘图,那就上个图吧,我感觉还挺好看的。可能会有人说,之前不一致分享的是flask吗,怎么突然改到django了?这个问题问得好,开发环境遇到了一些小困难!不过django,真的是很流行,一点都不过时,这您放心好了!不多说,直接看效果吧!环境搭建1、当前环境版本python==3.9.10django==......
  • install goldendict-ng-git
    manjaro升级后,goldendict不能启动了,重新安装总是出错,原来是官方的版本不支持qt6了,查阅archlinux文档,获知goldendic-ng-git支持qt6,安装步骤如下,需要的同学可以参看。1.获取PKGBUILD文件,gitclone https://aur.archlinux.org/goldendict-ng-git.git2.安装必须的依赖,可能有......
  • go 内存管理
    协程栈go栈的位置1.Go协程栈位于Go-堆内存上2.Go堆内存位于操作系统虚拟内存上go栈的工作流程以main.main为出发点要记录runtime.main的栈基地址记录a和b的局部变量值开辟一个空间记录sum函数的返回值记录b和a的值,这里是为了方便sum在执行时候,......
  • MongoDB 各种复杂查询彻底弄明白
    查询语法db.collection.find(query,projection)query :可选,使用查询操作符指定查询条件projection :可选,使用投影操作符指定返回的键。查询时返回文档中所有键值,只需省略该参数即可(默认省略)。querymongoDB的query就好比MySQL中where后的内容。我们知道where后可以跟很多条件语句......
  • 自定义精美商品分类列表组件 侧边栏商品分类组件 category组件(适配vue3)
    随着技术的发展,开发的复杂度也越来越高,传统开发方式将一个系统做成了整块应用,经常出现的情况就是一个小小的改动或者一个小功能的增加可能会引起整体逻辑的修改,造成牵一发而动全身。通过组件化开发,可以有效实现单独开发,单独维护,而且他们之间可以随意的进行组合。大大提升开发效率......
  • GO富集分析图
    1.GO富集分析输入数据输入文件需要有3列信息:ONTOLOGY:GO分类,BP/CC/MFTerm:GO名称Count:富集在每个Term上基因数目ONTOLOGY ID Term GeneRatio BgRatio pvalue p.adjust qvalue geneID CountBP GO:0032496 responsetolipopolysaccharide 37/163 330/18670 1.5814603638303......