首页 > 编程语言 >python之I/O多路复用

python之I/O多路复用

时间:2024-09-26 09:50:30浏览次数:12  
标签:socket 多路复用 python server 描述符 print message select

python  IO多路复用

一、多路复用概念:

监听多个描述符(文件描述符(windows下暂不支持)、网络描述符)的状态,如果描述符状态改变 则会被内核修改标志位,进而被进程获取进而进行读写操作

 

二、多路复用两种触发方式:

水平触发(Level Triggered):

    将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,但是会增加消耗

边缘触发(Edge Triggered):

    只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,理论上边缘触发的性能要更高一些,但是代码实现相当复杂。

 

三、 阻塞/非阻塞 模式:

阻塞: 如果阻塞模式则等待数据
非阻塞:   如果非阻塞模式有数据返回数据、无数据直接返回报错

 

四、 I/O模型:

同步I/O: 一问一答 等待数据(阻塞模式)或 不管有没有数据都返回(非阻塞模式)
异步I/O: 用户进程问完之后干别的处理结果出来之后告知用户进程

 

 

五、 selec/poll/epoll

相同点和不同点图解

python之I/O多路复用_描述符

sellect、poll、epoll三者的区别 

select 
select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。

select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点,事实上从现在看来,这也是它所剩不多的优点之一。

select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。

另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()会对所有socket进行一次线性扫描,所以这也浪费了一定的开销。

poll 
poll在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。

poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

另外,select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。

epoll 
直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。

epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。

epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。

另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

 

使用 select : 
在python中,select函数是一个对底层操作系统的直接访问的接口。它用来监控sockets、files和pipes,等待IO完成(Waiting for I/O completion)。当有可读、可写或是异常事件产生时,select可以很容易的监控到。 
select.select(rlist, wlist, xlist[, timeout]) 传递三个参数,一个为输入而观察的文件对象列表,一个为输出而观察的文件对象列表和一个观察错误异常的文件列表。第四个是一个可选参数,表示超时秒数。其返回3个tuple,每个tuple都是一个准备好的对象列表,它和前边的参数是一样的顺序。下面,主要结合代码,简单说说select的使用。 
Server端程序: 
1、该程序主要是利用socket进行通信,接收客户端发送过来的数据,然后再发还给客户端。 
2、首先建立一个TCP/IP socket,并将其设为非阻塞,然后进行bind和listen。 
3、通过select函数获取到三种文件列表,分别对每个列表的每个元素进行轮询,对不同socket进行不同的处理,最外层循环直到inputs列表为空为止 
4、当设置timeout参数时,如果发生了超时,select函数会返回三个空列表。 

使用select示例如下:

server端:

import select
import socket
import Queue
 
#create a socket
server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.setblocking(False)
#set option reused
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR  , 1)
 
server_address= ('192.168.1.102',10001)
server.bind(server_address)
 
server.listen(10)
 
#sockets from which we except to read
inputs = [server]
 
#sockets from which we expect to write
outputs = []
 
#Outgoing message queues (socket:Queue)
message_queues = {}
 
#A optional parameter for select is TIMEOUT
timeout = 20
 
while inputs:
    print "waiting for next event"
    readable , writable , exceptional = select.select(inputs, outputs, inputs, timeout)
 
    # When timeout reached , select return three empty lists
    if not (readable or writable or exceptional) :
        print "Time out ! "
        break;    
    for s in readable :
        if s is server:
            # A "readable" socket is ready to accept a connection
            connection, client_address = s.accept()
            print "    connection from ", client_address
            connection.setblocking(0)
            inputs.append(connection)
            message_queues[connection] = Queue.Queue()
        else:
            data = s.recv(1024)
            if data :
                print " received " , data , "from ",s.getpeername()
                message_queues[s].put(data)
                # Add output channel for response    
                if s not in outputs:
                    outputs.append(s)
            else:
                #Interpret empty result as closed connection
                print "  closing", client_address
                if s in outputs :
                    outputs.remove(s)
                inputs.remove(s)
                s.close()
                #remove message queue 
                del message_queues[s]
    for s in writable:
        try:
            next_msg = message_queues[s].get_nowait()
        except Queue.Empty:
            print " " , s.getpeername() , 'queue empty'
            outputs.remove(s)
        else:
            print " sending " , next_msg , " to ", s.getpeername()
            s.send(next_msg)
     
    for s in exceptional:
        print " exception condition on ", s.getpeername()
        #stop listening for input on the connection
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()
        #Remove message queue
        del message_queues[s]

View Code

client端:

import socket
 
messages = ["This is the message" ,
            "It will be sent" ,
            "in parts "]
 
print "Connect to the server"
 
server_address = ("192.168.1.102",10001)
 
#Create a TCP/IP sock
 
socks = []
 
for i in range(10):
    socks.append(socket.socket(socket.AF_INET,socket.SOCK_STREAM))
 
for s in socks:
    s.connect(server_address)
 
counter = 0
for message in messages :
    #Sending message from different sockets
    for s in socks:
        counter+=1
        print "  %s sending %s" % (s.getpeername(),message+" version "+str(counter))
        s.send(message+" version "+str(counter))
    #Read responses on both sockets
    for s in socks:
        data = s.recv(1024)
        print " %s received %s" % (s.getpeername(),data)
        if not data:
            print "closing socket ",s.getpeername()
            s.close()

View Code

使用Poll:

Server端:

import socket
import select 
import Queue
 
# Create a TCP/IP socket, and then bind and listen
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_address = ("192.168.1.102", 10001)
 
print  "Starting up on %s port %s" % server_address
server.bind(server_address)
server.listen(5)
message_queues = {}
#The timeout value is represented in milliseconds, instead of seconds.
timeout = 1000
# Create a limit for the event
READ_ONLY = ( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR)
READ_WRITE = (READ_ONLY|select.POLLOUT)
# Set up the poller
poller = select.poll()
poller.register(server,READ_ONLY)
#Map file descriptors to socket objects
fd_to_socket = {server.fileno():server,}
while True:
    print "Waiting for the next event"
    events = poller.poll(timeout)
    print "*"*20
    print len(events)
    print events
    print "*"*20
    for fd ,flag in  events:
        s = fd_to_socket[fd]
        if flag & (select.POLLIN | select.POLLPRI) :
            if s is server :
                # A readable socket is ready to accept a connection
                connection , client_address = s.accept()
                print " Connection " , client_address
                connection.setblocking(False)
                 
                fd_to_socket[connection.fileno()] = connection
                poller.register(connection,READ_ONLY)
                 
                #Give the connection a queue to send data
                message_queues[connection]  = Queue.Queue()
            else :
                data = s.recv(1024)
                if data:
                    # A readable client socket has data
                    print "  received %s from %s " % (data, s.getpeername())
                    message_queues[s].put(data)
                    poller.modify(s,READ_WRITE)
                else :
                    # Close the connection
                    print "  closing" , s.getpeername()
                    # Stop listening for input on the connection
                    poller.unregister(s)
                    s.close()
                    del message_queues[s]
        elif flag & select.POLLHUP :
            #A client that "hang up" , to be closed.
            print " Closing ", s.getpeername() ,"(HUP)"
            poller.unregister(s)
            s.close()
        elif flag & select.POLLOUT :
            #Socket is ready to send data , if there is any to send
            try:
                next_msg = message_queues[s].get_nowait()
            except Queue.Empty:
                # No messages waiting so stop checking
                print s.getpeername() , " queue empty"
                poller.modify(s,READ_ONLY)
            else :
                print " sending %s to %s" % (next_msg , s.getpeername())
                s.send(next_msg)
        elif flag & select.POLLERR:
            #Any events with POLLERR cause the server to close the socket
            print "  exception on" , s.getpeername()
            poller.unregister(s)
            s.close()
            del message_queues[s]

View Code

"一劳永逸" 的话,有是有的,而 "一劳永逸" 的事却极少



标签:socket,多路复用,python,server,描述符,print,message,select
From: https://blog.51cto.com/u_8901540/12117502

相关文章

  • python画图|横向填充两条线之间的区域
    前述已经学习了《python画图|竖向填充两条线之间的区域》,点击下方链接直达:python画图|竖向填充两条线之间的区域-CSDN博客现在我们尝试更换填充方向,转向横向填充。【1】官网教程首先点击官网链接,直达教程:https://matplotlib.org/stable/gallery/lines_bars_and_markers/fi......
  • 【Python】Python中的进制转换操作
    【Python】Python中的进制转换操作1.十进制转其他进制转二进制使用bin()函数可以将十进制整数转换为二进制字符串,前缀为'0b'。decimal_number=10binary_string=bin(decimal_number)print(binary_string)#输出:0b1010转八进制使用oct()函数可以将十进......
  • 【鸟类识别系统】+计算机毕设项目+卷积神经网络算法+人工智能+深度学习+模型训练+Pyth
    一、介绍鸟类识别系统。本系统采用Python作为主要开发语言,通过使用加利福利亚大学开源的200种鸟类图像作为数据集。使用TensorFlow搭建ResNet50卷积神经网络算法模型,然后进行模型的迭代训练,得到一个识别精度较高的模型,然后在保存为本地的H5格式文件。在使用Django开发Web网页端操作......
  • 2024.9.24 Python与C++面试八股文
    1.externextern关键字用于在多个文件中引用同一个全局变量的声明在一个头文件中,如果这个变量声明了,但是在cpp文件中没找到他的定义,那么编译就会报错,但是如果加了extern,编译器就不会给头文件报错,而是给cpp文件报错,如果函数没定义的话。或者定义出错的话。2.关于反复调用简......
  • Python笔记
    Python笔记(大数据方向)一、基本数据类型1、数字类型1.1、整型(int)i=100t=type(i)print(i,t)1.2、浮点型(float)f=12.14t=type(i)print(f,t)1.3、布尔型(False,True)b=Truet=type(b)print(b,t)2、字符串使用单引号将若干个字符括起来的序列,叫做字符串a1='这是......
  • Python中Cache的使用
    文章目录一、缓存的基础概念二、基础使用三、进阶使用四、外部缓存工具五、缓存的注意事项一、缓存的基础概念缓存(Cache)是一种在应用程序中提升性能的技术,它通过将一些数据临时存储在快速访问的存储介质(如内存)中,以减少数据的重复计算或重复读取。通常,缓存用于存储......