首页 > 系统相关 >Nginx 的多进程异步模型要了解下么

Nginx 的多进程异步模型要了解下么

时间:2023-03-22 22:32:38浏览次数:47  
标签:异步 socket 模型 sock Nginx 进程 接字 conn struct


Ngnix 是互联网企业使用最为广泛的高性能的反向代理服务器,为何叫反向代理之前也讲过。它可以为后端分布式服务提供负载均衡的功能,它可以将后端多个服务地址聚合为单个地址来对外提供服务。它正是采用了多进程 PreForking 异步模型达到了业界公认的高并发的处理能力。

说实话使用java语言通过编码层面去实现多进程几乎奢想(找不到相关资料),接下来将使用Python 来体验下多进程的魅力!

Nginx是在 RPC 技术的基础上构建出来的,Nginx 和后端服务之间的交互在本质上也可以理解为 RPC 数据交互。一般的RPC 的服务器模型,从简单变化到复杂,从经典变化到现代。

Nginx 的多进程异步模型要了解下么_子进程

在Python 中有3个非常重要内置的库,socket、struct 和 json,她们提供 RPC 服务所具备基本的功能。

socket

网络通信功能

支持 tcp/udp

send和sendall区别

在网络状况良好的情况下,这两个方法几乎没有区别。但是需要特别注意的是send方法有可能只会发送了部分内容,它通过返回值来指示实际发出去了多少内容。而sendall方法是对send方法的封装,它考虑了这个情况,如果第一次send方法发送不完全,就会尝试第二次第三次循环发送直到全部内容都发送出去了或者中间出了错误才会返回。

struct 

字节转换功能

二进制解码编码库,用于将各种不同的类型的字段编码成二进制字节串,类似于 java 的 bytebuffer 可以将各种不同类型的字段内容编码成 byte 数组。

json

消息序列化功能

类似于 java的fastjson,但它序列化性能不高

thread

用户可以使用 thread 库创建原生的线程

os

os.fork() 创建子进程函数

它封装了 glibc 提供的 fork 函数,fork 调用将生成一个子进程,所以这个函数会在父子进程同时返回。在父进程的返回结果是一个整数值,这个值是子进程的进程号,父进程可以使用该进程号来控制子进程的运行。fork 在子进程的返回结果是零。如果 fork 返回值小于零,一般意味着操作系统资源不足,无法创建进程。

子进程创建后,父进程拥有的很多操作系统资源,子进程也会持有。比如套接字和文件描述符,它们本质上都是对操作系统内核对象的一个引用。如果子进程不需要某些引用,一定要即时关闭它,避免操作系统资源得不到释放导致资源泄露。

# 伪代码
# socket client
sock = socket.socket()  # 创建一个套接字
sock.connect()  # 连接远程服务器
sock.recv() # 读
sock.send()  # 尽可能地写
sock.sendall()  # 完全写
sock.close()  # 关闭

# socket server
sock = socket.socket()  # 创建一个服务器套接字
sock.bind()  # 绑定端口
sock.listen()  # 监听连接
sock.accept()  # 接受新连接
sock.close()  # 关闭服务器套接字

# struct
# 将一个整数编码成 4 个字节的字符串
value_in_bytes = struct.pack("I", 1024)  
# 将一个4字节的字符串解码成一个整数,面有个逗号,不是笔误。
# unpack 返回的是一个列表,它可以将一个很长的字节串解码成一系列的对象。value 取这个列表的第一个对象。
value, = struct.unpack("I", value_in_bytes) 

# json
raw = json.dumps({"hello": "world"})  # 序列化
po = json.loads(raw)  # 反序列化

# thread
def something_heavy(params):
    pass
thread.start_new_thread(something_heavy, (params,))

# os.fork
pid = os.fork()
if pid > 0:
    # in parent process
if pid == 0:
    # in child process
if pid < 0:
    # fork error

单线程同步模型

每次只能处理一个客户端连接,其它连接必须等到前面的连接关闭了才能得到服务器的处理。慢!

多线程同步模型

可以并行处理多个客户端,每来一个新连接,则开启一个新的线程单独进行处理。每个线程都是同步读写客户端连接。不能充分利用多核(ython 的 GIL 致使单个进程只能占满一个 CPU 核心)!

多进程同步模型

子进程创建容易,销毁难。当子进程退出后,父进程需要使用 waitpid 系统调用收割子进程,否则子进程将成为僵尸进程,僵尸进程会持续占据操作系统的资源直到父进程退出后被 init 进程接管收割后才会消失释放资源。收割子进程的逻辑处理有一定的复杂度,涉及到非常精细的信号控制逻辑。进程要比线程更加吃资源(如果来一个连接就开一个进程,当连接比较多时,操作系统的调度压力也就会比较大)!

PreForking同步模型

PreForking 是通过预先产生多个子进程,共同对服务器套接字进行竞争性的 accept,当一个连接到来时,每个子进程都有机会拿到这个连接,但是最终只会有一个进程能 accept 成功返回拿到连接。子进程拿到连接后,进程内部可以继续使用单线程或者多线程同步的形式对连接进行处理。

如果并行的连接数超过了 prefork 进程的数量,那么后来的客户端请求将会阻塞,因为正在处理连接的子进程是没有机会去调用 accept 来获取新连接的。为了不阻塞新的客户端,我们可以将子进程的单线程同步模型改成多线程同步模型即可。

prefork 之后,父进程创建的服务套接字引用,每个子进程也会继承一份,它们共同指向了操作系统内核的套接字对象,共享了同一份连接监听队列。子进程和父进程一样都可以对服务套接字进行 accept 调用,从共享的监听队列中摘取一个新连接进行处理。

# prefork.py

import os
import json
import struct
import socket


def handle_conn(conn, addr, handlers):
    print addr, "comes"
    while True:
        length_prefix = conn.recv(4)
        if not length_prefix:
            print addr, "bye"
            conn.close()
            break  # 关闭连接,继续处理下一个连接
        length, = struct.unpack("I", length_prefix)
        body = conn.recv(length)
        request = json.loads(body)
        in_ = request['in']
        params = request['params']
        print in_, params
        handler = handlers[in_]
        handler(conn, params)


def loop(sock, handlers):
    while True:
        conn, addr = sock.accept()
        handle_conn(conn, addr, handlers)


def ping(conn, params):
    send_result(conn, "pong", params)


def send_result(conn, out, result):
    response = json.dumps({"out": out, "result": result})
    length_prefix = struct.pack("I", len(response))
    conn.sendall(length_prefix)
    conn.sendall(response)


def prefork(n):
    for i in range(n):
        pid = os.fork()
        if pid < 0:  # fork error
            return
        if pid > 0:  # parent process
            continue
        if pid == 0:
            break  # child process


if __name__ == '__main__':
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(("localhost", 8080))
    sock.listen(1)
    prefork(10)  # 好戏在这里,开启了 10 个子进程
    handlers = {
        "ping": ping
    }
    loop(sock, handlers)

Nginx 多进程并发模型

我们知道Nginx是一个多进程并发模型,它的 Master 进程在绑定监听地址端口后 fork 出了多个 Slave 进程共同竞争处理这个服务端套接字接收到的很多客户端连接。这多个 Slave 进程会共享同一个处于操作系统内核态的套接字队列,操作系统的网络模块在处理完三次握手后就会将套接字塞进这个队列。这是一个生产者消费者模型,生产者是操作系统的网络模块,消费者是多个 Slave 进程,队列中的对象是客户端套接字。

这种模型在负载均衡上有一个缺点,那就是套接字分配不均匀,形成了类似于贫富分化的局面,也就是「闲者愈闲,忙者愈忙」的状态(此时通过监控就可以发现)。这是因为当多个进程竞争同一个套接字队列时,操作系统采用了 LIFO 的策略,最后一个来 accept 的进程最优先拿到 套接字。越是繁忙的进程越是有更多的机会调用 accept,它能拿到的套接字也就越多。

 

Nginx 的多进程异步模型要了解下么_父进程_02

Node Cluster 为了解决负载均衡问题,它采用了不同的策略。它也是多进程并发模型,Master 进程会 fork 出多个子进程来处理客户端套接字。但是不存在竞争问题,因为负责 accept 套接字的只能是 Master 进程,Slave 进程只负责处理客户端套接字请求。那就存在一个问题,Master 进程拿到的客户端套接字如何传递给 Slave 进程。

神奇的 sendmsg 登场了。它是操作系统提供的系统调用,可以在不同的进程之间传递文件描述符。sendmsg 会搭乘一个特殊的「管道」将 Master 进程的套接字描述符传递到 Slave 进程,Slave 进程通过 recvmsg 系统调用从这个「管道」中将描述符取出来。这个「管道」比较特殊,它是 Unix 域套接字。普通的套接字可以跨机器传输消息,Unix 域套接字只能在同一个机器的不同进程之间传递消息。同管道一样,Unix 域套接字也分为有名套接字和无名套接字,有名套接字会在文件系统指定一个路径名,无关进程之间都可以通过这个路径来访问 Unix 域套接字。而无名套接字一般用于父子进程之间,父进程会通过 socketpair 调用来创建套接字,然后 fork 出来子进程,这样子进程也会同时持有这个套接字的引用。后续父子进程就可以通过这个套接字互相通信。

Nginx 的多进程异步模型要了解下么_套接字_03

Nginx 的多进程异步模型要了解下么_套接字_04

注意这里的传递描述符,本质上不是传递,而是复制。父进程的描述符并不会在 sendmsg 自动关闭自动消失,子进程收到的描述符和父进程的描述符也不是同一个整数值。但是父子进程的描述符都会指向同一个内核套接字对象。有了描述符的传递能力,父进程就可以将 accept 到的客户端套接字轮流传递给多个 Slave 进程,负载均衡的目标就可以顺利实现了。

Python3.5+才内置了sendmsg 和 recvmsg 方法,此时它就可以实现Node Cluster 的并发模型。

# buffers: 表示需要传递的消息内容
# ancdata:描述符是通过ancdata 参数传递,它是一个三元组的列表,三元组的第一个参数表示网络协议栈级别 level,第二个参数表示辅助数据的类型 type,第三个参数才是携带的数据,level=SOL_SOCKET 表示传递的数据处于 TCP 协议层级,type=SCM_RIGHTS 就表示携带的数据是文件描述符。我们传递的描述符 fd 是一个整数,需要使用 struct 包将它序列化成二进制。
socket.sendmsg(buffers[, ancdata[, flags[, address]]])
#伪代码
def send_fds(sock, fd):
    return sock.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack("i", fd))])

#提供消息体的长度和辅助数据的长度参数
msg, ancdata, flags, addr = socket.recvmsg(bufsize[, ancbufsize[, flags]])
#伪代码
bufsize = 1  # 消息内容的长度
ancbufsize = socket.CMSG_LEN(struct.calcsize('i'))  # 辅助数据的长度
msg, ancdata, flags, addr = socket.recvmsg(bufsize, ancbufsize) # 收取消息
level, type, fd_bytes = ancdata[0] # 取第一个元祖,注意发送消息时我们传递的是一个三元组的列表
fd = struct.unpack('i', fd_bytes) # 反序列化
import os
import json
import struct
import socket


def handle_conn(conn, addr, handlers):
    print(addr, "comes")
    while True:
        # 简单起见,这里就没有使用循环读取了
        length_prefix = conn.recv(4)
        if not length_prefix:
            print(addr, "bye")
            conn.close()
            break  # 关闭连接,继续处理下一个连接
        length, = struct.unpack("I", length_prefix)
        body = conn.recv(length)
        request = json.loads(body)
        in_ = request['in']
        params = request['params']
        print(in_, params)
        handler = handlers[in_]
        handler(conn, params)


def loop_slave(pr, handlers):
    while True:
        bufsize = 1
        ancsize = socket.CMSG_LEN(struct.calcsize('i'))
        msg, ancdata, flags, addr = pr.recvmsg(bufsize, ancsize)
        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
        fd = struct.unpack('i', cmsg_data)[0]
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd)
        handle_conn(sock, sock.getpeername(), handlers)


def ping(conn, params):
    send_result(conn, "pong", params)


def send_result(conn, out, result):
    response = json.dumps({"out": out, "result": result}).encode('utf-8')
    length_prefix = struct.pack("I", len(response))
    conn.sendall(length_prefix)
    conn.sendall(response)


def loop_master(serv_sock, pws):
    idx = 0
    while True:
        sock, addr = serv_sock.accept()
        pw = pws[idx % len(pws)]
        # 消息数据,whatever
        msg = [b'x']
        # 辅助数据,携带描述符
        ancdata = [(
            socket.SOL_SOCKET,
            socket.SCM_RIGHTS,
            struct.pack('i', sock.fileno()))]
        pw.sendmsg(msg, ancdata)
        sock.close()  # 关闭引用
        idx += 1


def prefork(serv_sock, n):
    pws = []
    for i in range(n):
        # 开辟父子进程通信「管道」
        pr, pw = socket.socketpair()
        pid = os.fork()
        if pid < 0:  # fork error
            return pws
        if pid > 0:
            # 父进程
            pr.close()  # 父进程不用读
            pws.append(pw)
            continue
        if pid == 0:
            # 子进程
            serv_sock.close()  # 关闭引用
            pw.close()  # 子进程不用写
            return pr
    return pws


if __name__ == '__main__':
    serv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serv_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    serv_sock.bind(("localhost", 8080))
    serv_sock.listen(1)
    pws_or_pr = prefork(serv_sock, 10)
    if hasattr(pws_or_pr, '__len__'):
        if pws_or_pr:
            loop_master(serv_sock, pws_or_pr)
        else:
            # fork 全部失败,没有子进程,Game Over
            serv_sock.close()
    else:
        handlers = {
            "ping": ping
        }
        loop_slave(pws_or_pr, handlers)

父进程使用 fork 调用创建了多个子进程,然后又使用 socketpair 调用为每一个子进程都创建一个无名套接字用来传递描述符。父进程使用 roundrobin 策略平均分配接收到的客户端套接字。子进程接收到的是一个描述符整数,需要将描述符包装成套接字对象后方可读写。

当然了,上面不论是单进程还是多进程都是同步模型,其实Python 也是支持异步模型的!

持续更新中

标签:异步,socket,模型,sock,Nginx,进程,接字,conn,struct
From: https://blog.51cto.com/alex/6143479

相关文章