首页 > 其他分享 >(转)Golang网络开发系列(二)—— net包

(转)Golang网络开发系列(二)—— net包

时间:2023-03-07 22:46:37浏览次数:59  
标签:系列 err nil Golang fd error return net

原文:https://zhuanlan.zhihu.com/p/575280551

这篇文章我们将开始学习net包。因为我们大多是从net.Listen开始写一个tcp server的,这篇文章我们就从上到下去分析,直到遇到internal/poll.FD为止。

net.Listen()将返回一个net.Listener接口:

type Listener interface {
	// Accept waits for and returns the next connection to the listener.
	Accept() (Conn, error)

	// Close closes the listener.
	// Any blocked Accept operations will be unblocked and return errors.
	Close() error

	// Addr returns the listener's network address.
	Addr() Addr
}

查看Listen函数的代码:

func Listen(network, address string) (Listener, error) {
	var lc ListenConfig
	return lc.Listen(context.Background(), network, address)
} 

type ListenConfig struct {
	// If Control is not nil, it is called after creating the network
	// connection but before binding it to the operating system.
	//
	// Network and address parameters passed to Control method are not
	// necessarily the ones passed to Listen. For example, passing "tcp" to
	// Listen will cause the Control function to be called with "tcp4" or "tcp6".
	Control func(network, address string, c syscall.RawConn) error

	// KeepAlive specifies the keep-alive period for network
	// connections accepted by this listener.
	// If zero, keep-alives are enabled if supported by the protocol
	// and operating system. Network protocols or operating systems
	// that do not support keep-alives ignore this field.
	// If negative, keep-alives are disabled.
	KeepAlive time.Duration
}

func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
	addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)
	if err != nil {
		return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: nil, Err: err}
	}
	sl := &sysListener{
		ListenConfig: *lc,
		network:      network,
		address:      address,
	}
	var l Listener
	la := addrs.first(isIPv4)
	switch la := la.(type) {
	case *TCPAddr:
		l, err = sl.listenTCP(ctx, la)
	case *UnixAddr:
		l, err = sl.listenUnix(ctx, la)
	default:
		return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}}
	}
	if err != nil {
		return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: err} // l is non-nil interface containing nil pointer
	}
	return l, nil
}

Listen函数内部其实是实例化了一个ListenConfig对象,调用的是ListenConfig对象的Listen()方法。其内部创建了对象 sysListener,并调用了sysListener.listenTCP() 以TCP为例:

func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
	fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
	if err != nil {
		return nil, err
	}
	return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}

调用 internetSocket() 返回一个 fd,fd是net.netFD实例,最终返回的是一个net.TCPListener对象,这个对象实现了net.Listener接口。net.netFD与我们关注的internal/poll.FD更接近,我们看一下:

net.netFD

type netFD struct {
	pfd poll.FD

	// immutable until Close
	family      int
	sotype      int
	isConnected bool // handshake completed or use of association with peer
	net         string
	laddr       Addr
	raddr       Addr
}

不出所料,netFD封装了poll.fd,并且提供了其他几个只读字段。

ReadXXX/WriteXXX

各种ReadXXX/WriteXXX方法,都是调用pfd对应的方法实现的,代码很简单。

func (fd *netFD) Read(p []byte) (n int, err error) {
	n, err = fd.pfd.Read(p)
	runtime.KeepAlive(fd)
	return n, wrapSyscallError(readSyscallName, err)
}

func (fd *netFD) Write(p []byte) (nn int, err error) {
	nn, err = fd.pfd.Write(p)
	runtime.KeepAlive(fd)
	return nn, wrapSyscallError(writeSyscallName, err)
}

func (fd *netFD) SetDeadline(t time.Time) error {
	return fd.pfd.SetDeadline(t)
}

func (fd *netFD) SetReadDeadline(t time.Time) error {
	return fd.pfd.SetReadDeadline(t)
}

func (fd *netFD) SetWriteDeadline(t time.Time) error {
	return fd.pfd.SetWriteDeadline(t)
}

accept()

因为net.netFD是对internal/poll.FD的封装,可以猜想accept()的实现会调用internal/poll.FD的Accept()。

 func (fd *netFD) accept() (netfd *netFD, err error) {
        // d是新连接的描述符,rsa是对端地址。
	d, rsa, errcall, err := fd.pfd.Accept()
	if err != nil {
		if errcall != "" {
			err = wrapSyscallError(errcall, err)
		}
		return nil, err
	}
        // 和我们在internal/poll.FD中预测的一样,Accept()返回的d,被传入net.netFD中,
	if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
		poll.CloseFunc(d)
		return nil, err
	}
        // netfd就是server端连接对象,这里进行初始化
	if err = netfd.init(); err != nil {
		netfd.Close()
		return nil, err
	}
	lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
	netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
	return netfd, nil
}

accept()最终返回了一个新的net.netFD,这个对象代表新连接的服务端对象。

这里涉及net.netFD的创建和初始化,我们顺便看一下:

func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
	ret := &netFD{
		pfd: poll.FD{
			Sysfd:         sysfd,
			IsStream:      sotype == syscall.SOCK_STREAM,
			ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW,
		},
		family: family,
		sotype: sotype,
		net:    net,
	}
	return ret, nil
}

func (fd *netFD) init() error {
	return fd.pfd.Init(fd.net, true)
}

在创建netFD的同时,也创建了internal/poll.FD。这里初始化了net.netFD的4个字段,还有3个字段没有初始化:laddr、raddr、isConnected。在accept()中,通过

netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))

完成了laddr、raddr的初始化。isConnected默认值为false,因为刚创建的net.netFD还没有连接。

listenStream()

以TCP的listen为例。

func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
	var err error
	if err = setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil {
		return err
	}
	var lsa syscall.Sockaddr
	if lsa, err = laddr.sockaddr(fd.family); err != nil {
		return err
	}
	if ctrlFn != nil {
		c, err := newRawConn(fd)
		if err != nil {
			return err
		}
		if err := ctrlFn(fd.ctrlNetwork(), laddr.String(), c); err != nil {
			return err
		}
	}
	if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
		return os.NewSyscallError("bind", err)
	}
	if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
		return os.NewSyscallError("listen", err)
	}
	if err = fd.init(); err != nil {
		return err
	}
	lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
        // 这时仅仅是本地端口和socket的绑定,还没有客户端连接对象,因此第二个参数为nil。
	fd.setAddr(fd.addrFunc()(lsa), nil)
	return nil
}

通过调用syscall.Bind()完成端口和socket的关联,再调用fd.init()完成epoll的监控。

socket 和 sysSocket

func sysSocket(family, sotype, proto int) (int, error) {
	// See ../syscall/exec_unix.go for description of ForkLock.
	syscall.ForkLock.RLock()
	s, err := socketFunc(family, sotype, proto)
	if err == nil {
		syscall.CloseOnExec(s)
	}
	syscall.ForkLock.RUnlock()
	if err != nil {
		return -1, os.NewSyscallError("socket", err)
	}
	if err = syscall.SetNonblock(s, true); err != nil {
		poll.CloseFunc(s)
		return -1, os.NewSyscallError("setnonblock", err)
	}
	return s, nil
}

sysSocket通过syscall创建了一个socket文件s,并且设置其为非阻塞模式:syscall.SetNonblock(s, true)

func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
	s, err := sysSocket(family, sotype, proto)
	if err != nil {
		return nil, err
	}
	if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
		poll.CloseFunc(s)
		return nil, err
	}
	if fd, err = newFD(s, family, sotype, net); err != nil {
		poll.CloseFunc(s)
		return nil, err
	}

	if laddr != nil && raddr == nil {
		switch sotype {
		case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
			if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
				fd.Close()
				return nil, err
			}
			return fd, nil
		case syscall.SOCK_DGRAM:
			if err := fd.listenDatagram(laddr, ctrlFn); err != nil {
				fd.Close()
				return nil, err
			}
			return fd, nil
		}
	}
	if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil {
		fd.Close()
		return nil, err
	}
	return fd, nil
}

socket() 则调用sysSocket(),把socket文件描述符放入net.netFD:fd, err =newFD(s, family, sotype, net) ,并且根据参数,可能会调用fd.listenStream 或 fd.listenDatagram 或 fd.dial(ctx, laddr, raddr, ctrlFn),也就是说,当完成net.socket()调用时,已经调用了listen或dial。

func (fd *netFD) dial(){
    // ... ...
    if raddr != nil {
	if rsa, err = raddr.sockaddr(fd.family); err != nil {
		return err
	}
	if crsa, err = fd.connect(ctx, lsa, rsa); err != nil {
		return err
	}
	fd.isConnected = true
    } else {
	if err := fd.init(); err != nil {
		return err
	}
    }
    // ... ...
} 

调用dial()时,如果 fd.connect()成功,则fd.isConnected = true。也就是,fd.dial()成功时。

 

这时,再次回到 sysListener.listenTCP():

func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
	fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
	if err != nil {
		return nil, err
	}
	return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
} 

listenTCP()是通过调用internetSocket()得到net.netFD的。

func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
	if (runtime.GOOS == "aix" || runtime.GOOS == "windows" || runtime.GOOS == "openbsd") && mode == "dial" && raddr.isWildcard() {
		raddr = raddr.toLocal(net)
	}
	family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
	return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)
}

核心代码正是通过socket()完成的。调用 internetSocket()时,第4个参数raddr = nil,传入socket中,即只有laddr,没有raddr,即在socket()代码中,走的时if laddr != nil && raddr == nil 这个分支。

TCPListener

已经了解了net.netFD,接下来看看listenTCP()返回的TCPListener对象。

我们看看Accept() Conn是如何实现的。

Accept()

func (l *TCPListener) Accept() (Conn, error) {
	if !l.ok() {
		return nil, syscall.EINVAL
	}
	c, err := l.accept()
	if err != nil {
		return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
	}
	return c, nil
}

TCPListener提供了内部实现:

func (ln *TCPListener) accept() (*TCPConn, error) {
        // ln.fd.accept()是个阻塞操作,代码会停在这里,直到有连接出现。新连接被封装成fd
	fd, err := ln.fd.accept()
	if err != nil {
		return nil, err
	}
        // fd 被传入 TCPConn,最终返回了一个TCPConn对象,这离我们理解的Conn更近一步。
	tc := newTCPConn(fd)
	if ln.lc.KeepAlive >= 0 {
		setKeepAlive(fd, true)
		ka := ln.lc.KeepAlive
		if ln.lc.KeepAlive == 0 {
			ka = defaultTCPKeepAlive
		}
		setKeepAlivePeriod(fd, ka)
	}
	return tc, nil
}

至此我们发现了Accept()返回的Conn和 accept()返回的TCPConn。

Conn

type Conn interface {
	Read(b []byte) (n int, err error)
	Write(b []byte) (n int, err error)
	Close() error
	LocalAddr() Addr
	RemoteAddr() Addr
	SetDeadline(t time.Time) error
	SetReadDeadline(t time.Time) error
	SetWriteDeadline(t time.Time) error
}

Conn代表了任一连接对象,而TCPConn一定要实现这个接口:

type TCPConn struct {
	conn
}

根据搜索到的TCPConn方法,TCPConn自己并没有实现Conn接口,因为TCPConn继承了net.conn,可以猜想net.conn会实现net.Conn接口:

net.conn

type conn struct {
	fd *netFD
}

net.conn确实实现了net.Conn接口。

到目前为止,我们有以下类图:

总结

  1. 我们调用net.Listen()监听一个TCP端口,就得到一个net.Listener接口对象,其内部变量是一个TCPListener对象;
  2. 接口我们调用net.Listener接口对象的Accept()方法,当有新连接时,Accept()返回一个net.Conn接口对象,其内部变量是一个net.TCPConn对象。

这个模式可以扩展到UDP、Unix网络、IP网络。(UDP用不到Listener。)

我们提取各种网络的公共逻辑,于是可以实现一个listener工厂:sysListener。

sysListener

  1. sysListener.listenTCP() -->net.ListenTCP()
  2. sysListener.listenUDP() --> net.ListenUDP()
  3. sysListener.listenMulticastUDP() --> net.ListenMulticastUDP()
  4. sysListener.listenUnix() --> net.ListenUnix()
  5. sysListener.listenUnixgram() --> net.ListenUnixgram()
  6. sysListener.listenIP() --> net.ListenIP()

net.ListenPacket() 调用 sysListener.listenUDP()、sysListener.listenIP()、sysListener.listenUnixgram()

net.Listen() 调用 sysListener.listenTCP()、sysListener.listenUnix()

net.ListenMulticastUDP() 调用 sysListener.listenMulticastUDP()

因此我们有2种方式监听各种网络:

方式一:通过net.Listen()、net.ListenPacket() 和 net.ListenMulticastUDP()

方式二:通过net.ListenTCP()、net.ListenUDP()、net.ListenMulticastUDP()、net.ListenUnix()、net.ListenUnixgram()、net.ListenIP()

这一篇我们以TCP网络为例,理解了编程模型的底层实现,一直深入到internal/poll.FD,如何实现一个Conn等。我们可以用net包开发我们自己的网络应用了。

net包其实实现了几种常用协议,下一篇我们将看看如何使用net包实现http协议。

标签:系列,err,nil,Golang,fd,error,return,net
From: https://www.cnblogs.com/liujiacai/p/17190004.html

相关文章

  • sealos 神奇功能 serverless kubernetes 之 cloud terminal
    何为serverlesskubernetes顾名思义,就是不需要安装直接打开网页就可以直接使用的kubernetes,是一个多租户共享kubernetes的租户模型,这样做的好处是对于用户的使用成本极......
  • 16.2 Kubernetes - Helm(Chart 结构)
    ChartHelm使用一种名为charts的包格式,一个chart是描述一组相关的Kubernetes资源的文件集合。单个chart既可以用于部署简单的应用,也可以是复杂的应用。Charts是......
  • AspNet Core MVC项目接入AdminLTE
    AdminLTE是一款基于jQuery和Bootstrap的流行的后台管理界面框架,通过使用AdminLET框架提供的现成的组件可以大幅的提高后台管理界面的开发速度和规范性。参考 http......
  • #yyds干货盘点#【愚公系列】2023年03月 .NET CORE工具案例-.NET Core使用MiniExcel
    前言1.MiniExcel的介绍MiniExcel是一个简单、高效避免OOM的.NET处理Excel查、写、填充数据工具。目前主流框架大多需要将数据全载入到内存方便操作,但这会导致内......
  • 《Spectral Partitioning Residual Network With Spatial Attention Mechanism for Hy
    论文作者:XiangrongZhang,ShouwangShang,XuTang,etal.论文发表年份:2021模型简称:SPRN发表期刊:IEEETransactionsonGeoscienceandRemoteSensing论文链接:Sci-Hub......
  • 基于DDD的golang实现
    女主宣言今天小编为大家分享基于DDD的golang实现,DDD即领域驱动设计,该模式也算是比较热门的话题了。希望通过本篇文章,大家能够掌握DDD模式,能对大家有所帮助。PS:丰富的一线......
  • 递归表达式系列
    递归函数:什么是递归函数:就是直接或者间接的调用自己"""递归:1.递推逐层寻找答案2.回溯根据最终的答案推导出最原始的答案3.递归函......
  • 深入解读.NET MAUI音乐播放器项目(三):界面交互
    UI设计的本质是对于产品的理解在界面中多种形式的映射,当需求和定位不同时,对相同的功能表达出了不同的界面和交互方式。作为播放器,界面可以是千差万别的。《番茄播放器》的......
  • golang 方法( method )
    1.方法的定义方法总是绑定对象实例,并隐式的将实例作为第一实参(receiver),receiver可以是基础类型,也可以是指针类型,这会关系到是否需要有可以修改对象实例的能力。2.......
  • golang 结构体(struct)
    1.结构体定义Golang没有类(class),Go语言的结构体(struct)和其它编程语言的类(class)有同等的地位,你可以理解Gelang是基于struct来实现OOP特性的。结构体由一系列命名的......