原文:https://zhuanlan.zhihu.com/p/575280551
这篇文章我们将开始学习net包。因为我们大多是从net.Listen
开始写一个tcp server的,这篇文章我们就从上到下去分析,直到遇到internal/poll.FD
为止。
net.Listen()将返回一个net.Listener接口:
type Listener interface {
// Accept waits for and returns the next connection to the listener.
Accept() (Conn, error)
// Close closes the listener.
// Any blocked Accept operations will be unblocked and return errors.
Close() error
// Addr returns the listener's network address.
Addr() Addr
}
查看Listen
函数的代码:
func Listen(network, address string) (Listener, error) {
var lc ListenConfig
return lc.Listen(context.Background(), network, address)
}
type ListenConfig struct {
// If Control is not nil, it is called after creating the network
// connection but before binding it to the operating system.
//
// Network and address parameters passed to Control method are not
// necessarily the ones passed to Listen. For example, passing "tcp" to
// Listen will cause the Control function to be called with "tcp4" or "tcp6".
Control func(network, address string, c syscall.RawConn) error
// KeepAlive specifies the keep-alive period for network
// connections accepted by this listener.
// If zero, keep-alives are enabled if supported by the protocol
// and operating system. Network protocols or operating systems
// that do not support keep-alives ignore this field.
// If negative, keep-alives are disabled.
KeepAlive time.Duration
}
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)
if err != nil {
return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: nil, Err: err}
}
sl := &sysListener{
ListenConfig: *lc,
network: network,
address: address,
}
var l Listener
la := addrs.first(isIPv4)
switch la := la.(type) {
case *TCPAddr:
l, err = sl.listenTCP(ctx, la)
case *UnixAddr:
l, err = sl.listenUnix(ctx, la)
default:
return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}}
}
if err != nil {
return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: err} // l is non-nil interface containing nil pointer
}
return l, nil
}
Listen函数内部其实是实例化了一个ListenConfig对象,调用的是ListenConfig对象的Listen()方法。其内部创建了对象 sysListener,并调用了sysListener.listenTCP() 以TCP为例:
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
if err != nil {
return nil, err
}
return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}
调用 internetSocket() 返回一个 fd,fd是net.netFD实例,最终返回的是一个net.TCPListener对象,这个对象实现了net.Listener接口。net.netFD与我们关注的internal/poll.FD更接近,我们看一下:
net.netFD
type netFD struct {
pfd poll.FD
// immutable until Close
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}
不出所料,netFD封装了poll.fd,并且提供了其他几个只读字段。
ReadXXX/WriteXXX
各种ReadXXX/WriteXXX方法,都是调用pfd对应的方法实现的,代码很简单。
func (fd *netFD) Read(p []byte) (n int, err error) {
n, err = fd.pfd.Read(p)
runtime.KeepAlive(fd)
return n, wrapSyscallError(readSyscallName, err)
}
func (fd *netFD) Write(p []byte) (nn int, err error) {
nn, err = fd.pfd.Write(p)
runtime.KeepAlive(fd)
return nn, wrapSyscallError(writeSyscallName, err)
}
func (fd *netFD) SetDeadline(t time.Time) error {
return fd.pfd.SetDeadline(t)
}
func (fd *netFD) SetReadDeadline(t time.Time) error {
return fd.pfd.SetReadDeadline(t)
}
func (fd *netFD) SetWriteDeadline(t time.Time) error {
return fd.pfd.SetWriteDeadline(t)
}
accept()
因为net.netFD是对internal/poll.FD
的封装,可以猜想accept()的实现会调用internal/poll.FD
的Accept()。
func (fd *netFD) accept() (netfd *netFD, err error) {
// d是新连接的描述符,rsa是对端地址。
d, rsa, errcall, err := fd.pfd.Accept()
if err != nil {
if errcall != "" {
err = wrapSyscallError(errcall, err)
}
return nil, err
}
// 和我们在internal/poll.FD中预测的一样,Accept()返回的d,被传入net.netFD中,
if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
poll.CloseFunc(d)
return nil, err
}
// netfd就是server端连接对象,这里进行初始化
if err = netfd.init(); err != nil {
netfd.Close()
return nil, err
}
lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
return netfd, nil
}
accept()最终返回了一个新的net.netFD,这个对象代表新连接的服务端对象。
这里涉及net.netFD的创建和初始化,我们顺便看一下:
func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
ret := &netFD{
pfd: poll.FD{
Sysfd: sysfd,
IsStream: sotype == syscall.SOCK_STREAM,
ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW,
},
family: family,
sotype: sotype,
net: net,
}
return ret, nil
}
func (fd *netFD) init() error {
return fd.pfd.Init(fd.net, true)
}
在创建netFD的同时,也创建了internal/poll.FD。这里初始化了net.netFD的4个字段,还有3个字段没有初始化:laddr、raddr、isConnected。在accept()中,通过
netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
完成了laddr、raddr的初始化。isConnected默认值为false,因为刚创建的net.netFD还没有连接。
listenStream()
以TCP的listen为例。
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
var err error
if err = setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil {
return err
}
var lsa syscall.Sockaddr
if lsa, err = laddr.sockaddr(fd.family); err != nil {
return err
}
if ctrlFn != nil {
c, err := newRawConn(fd)
if err != nil {
return err
}
if err := ctrlFn(fd.ctrlNetwork(), laddr.String(), c); err != nil {
return err
}
}
if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}
if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
return os.NewSyscallError("listen", err)
}
if err = fd.init(); err != nil {
return err
}
lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
// 这时仅仅是本地端口和socket的绑定,还没有客户端连接对象,因此第二个参数为nil。
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
}
通过调用syscall.Bind()完成端口和socket的关联,再调用fd.init()完成epoll的监控。
socket 和 sysSocket
func sysSocket(family, sotype, proto int) (int, error) {
// See ../syscall/exec_unix.go for description of ForkLock.
syscall.ForkLock.RLock()
s, err := socketFunc(family, sotype, proto)
if err == nil {
syscall.CloseOnExec(s)
}
syscall.ForkLock.RUnlock()
if err != nil {
return -1, os.NewSyscallError("socket", err)
}
if err = syscall.SetNonblock(s, true); err != nil {
poll.CloseFunc(s)
return -1, os.NewSyscallError("setnonblock", err)
}
return s, nil
}
sysSocket通过syscall创建了一个socket文件s,并且设置其为非阻塞模式:syscall.SetNonblock(s, true)
。
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
s, err := sysSocket(family, sotype, proto)
if err != nil {
return nil, err
}
if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
poll.CloseFunc(s)
return nil, err
}
if fd, err = newFD(s, family, sotype, net); err != nil {
poll.CloseFunc(s)
return nil, err
}
if laddr != nil && raddr == nil {
switch sotype {
case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil
case syscall.SOCK_DGRAM:
if err := fd.listenDatagram(laddr, ctrlFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil
}
}
if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil
}
socket() 则调用sysSocket(),把socket文件描述符放入net.netFD:fd, err =newFD(s, family, sotype, net)
,并且根据参数,可能会调用fd.listenStream
或 fd.listenDatagram
或 fd.dial(ctx, laddr, raddr, ctrlFn)
,也就是说,当完成net.socket()调用时,已经调用了listen或dial。
func (fd *netFD) dial(){
// ... ...
if raddr != nil {
if rsa, err = raddr.sockaddr(fd.family); err != nil {
return err
}
if crsa, err = fd.connect(ctx, lsa, rsa); err != nil {
return err
}
fd.isConnected = true
} else {
if err := fd.init(); err != nil {
return err
}
}
// ... ...
}
调用dial()时,如果 fd.connect()成功,则fd.isConnected = true。也就是,fd.dial()成功时。
这时,再次回到 sysListener.listenTCP()
:
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
if err != nil {
return nil, err
}
return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}
listenTCP()是通过调用internetSocket()得到net.netFD的。
func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
if (runtime.GOOS == "aix" || runtime.GOOS == "windows" || runtime.GOOS == "openbsd") && mode == "dial" && raddr.isWildcard() {
raddr = raddr.toLocal(net)
}
family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)
}
核心代码正是通过socket()完成的。调用 internetSocket()时,第4个参数raddr = nil,传入socket中,即只有laddr,没有raddr,即在socket()代码中,走的时if laddr != nil && raddr == nil
这个分支。
TCPListener
已经了解了net.netFD
,接下来看看listenTCP()返回的TCPListener对象。
我们看看Accept() Conn是如何实现的。
Accept()
func (l *TCPListener) Accept() (Conn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
c, err := l.accept()
if err != nil {
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}
TCPListener提供了内部实现:
func (ln *TCPListener) accept() (*TCPConn, error) {
// ln.fd.accept()是个阻塞操作,代码会停在这里,直到有连接出现。新连接被封装成fd
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
// fd 被传入 TCPConn,最终返回了一个TCPConn对象,这离我们理解的Conn更近一步。
tc := newTCPConn(fd)
if ln.lc.KeepAlive >= 0 {
setKeepAlive(fd, true)
ka := ln.lc.KeepAlive
if ln.lc.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(fd, ka)
}
return tc, nil
}
至此我们发现了Accept()返回的Conn和 accept()返回的TCPConn。
Conn
type Conn interface {
Read(b []byte) (n int, err error)
Write(b []byte) (n int, err error)
Close() error
LocalAddr() Addr
RemoteAddr() Addr
SetDeadline(t time.Time) error
SetReadDeadline(t time.Time) error
SetWriteDeadline(t time.Time) error
}
Conn代表了任一连接对象,而TCPConn一定要实现这个接口:
type TCPConn struct {
conn
}
根据搜索到的TCPConn方法,TCPConn自己并没有实现Conn接口,因为TCPConn继承了net.conn,可以猜想net.conn会实现net.Conn接口:
net.conn
type conn struct {
fd *netFD
}
net.conn确实实现了net.Conn接口。
到目前为止,我们有以下类图:
总结
- 我们调用net.Listen()监听一个TCP端口,就得到一个net.Listener接口对象,其内部变量是一个TCPListener对象;
- 接口我们调用net.Listener接口对象的Accept()方法,当有新连接时,Accept()返回一个net.Conn接口对象,其内部变量是一个net.TCPConn对象。
这个模式可以扩展到UDP、Unix网络、IP网络。(UDP用不到Listener。)
我们提取各种网络的公共逻辑,于是可以实现一个listener工厂:sysListener。
sysListener
- sysListener.listenTCP() -->net.ListenTCP()
- sysListener.listenUDP() --> net.ListenUDP()
- sysListener.listenMulticastUDP() --> net.ListenMulticastUDP()
- sysListener.listenUnix() --> net.ListenUnix()
- sysListener.listenUnixgram() --> net.ListenUnixgram()
- sysListener.listenIP() --> net.ListenIP()
net.ListenPacket() 调用 sysListener.listenUDP()、sysListener.listenIP()、sysListener.listenUnixgram()
net.Listen() 调用 sysListener.listenTCP()、sysListener.listenUnix()
net.ListenMulticastUDP() 调用 sysListener.listenMulticastUDP()
因此我们有2种方式监听各种网络:
方式一:通过net.Listen()、net.ListenPacket() 和 net.ListenMulticastUDP()
方式二:通过net.ListenTCP()、net.ListenUDP()、net.ListenMulticastUDP()、net.ListenUnix()、net.ListenUnixgram()、net.ListenIP()
这一篇我们以TCP网络为例,理解了编程模型的底层实现,一直深入到internal/poll.FD,如何实现一个Conn等。我们可以用net包开发我们自己的网络应用了。
net包其实实现了几种常用协议,下一篇我们将看看如何使用net包实现http协议。
标签:系列,err,nil,Golang,fd,error,return,net From: https://www.cnblogs.com/liujiacai/p/17190004.html