首页 > 编程语言 >并发编程之Gevent模块

并发编程之Gevent模块

时间:2024-03-03 11:48:24浏览次数:35  
标签:__ spawn 编程 并发 Gevent gevent time print import

Gevent的介绍

greenlet已经实现了协程,但是这个还要人工切换,这里介绍一个比greenlet更强大而且能够自动切换任务的第三方库,那就是gevent。

gevent内部封装的greenlet,其原理是当一个greenlet遇到IO操作时,,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。

由于IO操作非常耗时,经常是程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。

安装

pip install gevent

Gevent的使用

代码示例:

import gevent


def work(n):
    for i in range(n):
        # 获取当前协程
        print(gevent.getcurrent(), i)
	return 'ok'

g1 = gevent.spawn(work, 5)
g2 = gevent.spawn(work, 5)
g3 = gevent.spawn(work, 5)
g1.join()
g2.join()
g3.join()
# 或者上述三步合作一步:gevent.joinall([g1,g2,g3])
# 拿到work的返回值
result = g1.value

print(result)
# ok

结果:

<Greenlet at 0x16be40925c0: work(5)> 0
<Greenlet at 0x16be40925c0: work(5)> 1
<Greenlet at 0x16be40925c0: work(5)> 2
<Greenlet at 0x16be40925c0: work(5)> 3
<Greenlet at 0x16be40925c0: work(5)> 4
<Greenlet at 0x16be44609a0: work(5)> 0
<Greenlet at 0x16be44609a0: work(5)> 1
<Greenlet at 0x16be44609a0: work(5)> 2
<Greenlet at 0x16be44609a0: work(5)> 3
<Greenlet at 0x16be44609a0: work(5)> 4
<Greenlet at 0x16be4460860: work(5)> 0
<Greenlet at 0x16be4460860: work(5)> 1
<Greenlet at 0x16be4460860: work(5)> 2
<Greenlet at 0x16be4460860: work(5)> 3
<Greenlet at 0x16be4460860: work(5)> 4

可以看到,3个greenlet是依次运行而不是交替运行

Gevent切换执行

遇到IO阻塞时会自动切换任务

import gevent

def work(n):
    for i in range(n):
        # 获取当前协程
        print(gevent.getcurrent(), i)
        #用来模拟一个耗时操作,注意不是time模块中的sleep
        gevent.sleep(1)

g1 = gevent.spawn(work, 5)
g2 = gevent.spawn(work, 5)
g3 = gevent.spawn(work, 5)
g1.join()
g2.join()
g3.join()

结果:

<Greenlet at 0x7fa70ffa1c30: f(5)> 0
<Greenlet at 0x7fa70ffa1870: f(5)> 0
<Greenlet at 0x7fa70ffa1eb0: f(5)> 0
<Greenlet at 0x7fa70ffa1c30: f(5)> 1
<Greenlet at 0x7fa70ffa1870: f(5)> 1
<Greenlet at 0x7fa70ffa1eb0: f(5)> 1
<Greenlet at 0x7fa70ffa1c30: f(5)> 2
<Greenlet at 0x7fa70ffa1870: f(5)> 2
<Greenlet at 0x7fa70ffa1eb0: f(5)> 2
<Greenlet at 0x7fa70ffa1c30: f(5)> 3
<Greenlet at 0x7fa70ffa1870: f(5)> 3
<Greenlet at 0x7fa70ffa1eb0: f(5)> 3
<Greenlet at 0x7fa70ffa1c30: f(5)> 4
<Greenlet at 0x7fa70ffa1870: f(5)> 4
<Greenlet at 0x7fa70ffa1eb0: f(5)> 4

猴子补丁(兼容其他IO)

而time.sleep(2)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码,打补丁,就可以识别了。

from gevent import monkey;monkey.patch_all()必须放到被打补丁者的前面,如time,socket模块之前,或者我们干脆记忆成:要用gevent,需要将from gevent import monkey; monkey.patch_all()放到文件的开头。

from gevent import monkey

monkey.patch_all() # # 打补丁,让gevent框架识别耗时操作,比如:time.sleep,网络请求延时

import threading
import gevent
import time


def work(n):
    for i in range(n):
        # 获取当前协程
        print(gevent.getcurrent(), i)
        print(threading.current_thread())
        # 用来模拟一个耗时操作,注意不是time模块中的sleep
        time.sleep(1)


if __name__ == '__main__':
    g1 = gevent.spawn(work, 3)
    g2 = gevent.spawn(work, 3)
    g3 = gevent.spawn(work, 3)
    g1.join()
    g2.join()
    g3.join()
    print(threading.current_thread())

结果:

<Greenlet at 0x1fa4a1a3e20: work(3)> 0
<_DummyThread(Dummy-1, started daemon 2174496685600)>
<Greenlet at 0x1fa4a2b1120: work(3)> 0
<_DummyThread(Dummy-2, started daemon 2174497788192)>
<Greenlet at 0x1fa4a2c13a0: work(3)> 0
<_DummyThread(Dummy-3, started daemon 2174497854368)>
<Greenlet at 0x1fa4a1a3e20: work(3)> 1
<_DummyThread(Dummy-1, started daemon 2174496685600)>
<Greenlet at 0x1fa4a2b1120: work(3)> 1
<_DummyThread(Dummy-2, started daemon 2174497788192)>
<Greenlet at 0x1fa4a2c13a0: work(3)> 1
<_DummyThread(Dummy-3, started daemon 2174497854368)>
<Greenlet at 0x1fa4a1a3e20: work(3)> 2
<_DummyThread(Dummy-1, started daemon 2174496685600)>
<Greenlet at 0x1fa4a2b1120: work(3)> 2
<_DummyThread(Dummy-2, started daemon 2174497788192)>
<Greenlet at 0x1fa4a2c13a0: work(3)> 2
<_DummyThread(Dummy-3, started daemon 2174497854368)>
<_MainThread(MainThread, started 2174463857152)>

我们可以用threading.current_thread()来查看每个g1、g2和g3,查看的结果为DummyThread-n,即假线程

注意

当前程序是一个死循环并且还能有耗时操作,就不需要加上join方法了,因为程序需要一直运行不会退出。

代码示例:

import gevent
import time
from gevent import monkey

# 打补丁,让gevent框架识别耗时操作,比如:time.sleep,网络请求延时
monkey.patch_all()


# 任务1
def work1(num):
    for i in range(num):
        print("work1....")
        time.sleep(0.2)
        # gevent.sleep(0.2)

# 任务1
def work2(num):
    for i in range(num):
        print("work2....")
        time.sleep(0.2)
        # gevent.sleep(0.2)



if __name__ == '__main__':
    # 创建协程指定对应的任务
    g1 = gevent.spawn(work1, 3)
    g2 = gevent.spawn(work2, 3)

    while True:
        print("主线程中执行")
        time.sleep(0.5)

结果:

主线程中执行
work1....
work2....
work1....
work2....
work1....
work2....
主线程中执行
主线程中执行
主线程中执行
..省略..

Gevent之同步与异步

from gevent import spawn,joinall,monkey;monkey.patch_all()

import time
def task(pid):
    """
    Some non-deterministic task
    """
    time.sleep(0.5)
    print('Task %s done' % pid)


def synchronous():
    for i in range(10):
        task(i)

def asynchronous():
    g_l=[spawn(task,i) for i in range(10)]
    joinall(g_l)

if __name__ == '__main__':
    print('Synchronous:')
    synchronous()

    print('Asynchronous:')
    asynchronous()
#上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,后者阻塞当前流程,并执行所有给定的greenlet。执行流程只会在 所有greenlet执行完后才会继续向下走。

Gevent之应用举例一

协程应用:爬虫

from gevent import monkey;monkey.patch_all()
import gevent
import requests
import time

def get_page(url):
    print('GET: %s' %url)
    response=requests.get(url)
    if response.status_code == 200:
        print('%d bytes received from %s' %(len(response.text),url))


start_time=time.time()
gevent.joinall([
    gevent.spawn(get_page,'https://www.python.org/'),
    gevent.spawn(get_page,'https://www.yahoo.com/'),
    gevent.spawn(get_page,'https://github.com/'),
])
stop_time=time.time()
print('run time is %s' %(stop_time-start_time))

Gevent之应用举例二

通过gevent实现单线程下的socket并发(from gevent import monkey;monkey.patch_all()一定要放到导入socket模块之前,否则gevent无法识别socket的阻塞。

服务端:

from gevent import monkey;monkey.patch_all()
from socket import *
import gevent

#如果不想用money.patch_all()打补丁,可以用gevent自带的socket
# from gevent import socket
# s=socket.socket()

def server(server_ip,port):
    s=socket(AF_INET,SOCK_STREAM)
    s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    s.bind((server_ip,port))
    s.listen(5)
    while True:
        conn,addr=s.accept()
        gevent.spawn(talk,conn,addr)

def talk(conn,addr):
    try:
        while True:
            res=conn.recv(1024)
            print('client %s:%s msg: %s' %(addr[0],addr[1],res))
            conn.send(res.upper())
    except Exception as e:
        print(e)
    finally:
        conn.close()

if __name__ == '__main__':
    server('127.0.0.1',8080)

客户端:

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))

多线程并发多个客户端:

from threading import Thread
from socket import *
import threading

def client(server_ip,port):
    c=socket(AF_INET,SOCK_STREAM) #套接字对象一定要加到函数内,即局部名称空间内,放在函数外则被所有线程共享,则大家公用一个套接字对象,那么客户端端口永远一样了
    c.connect((server_ip,port))

    count=0
    while True:
        c.send(('%s say hello %s' %(threading.current_thread().getName(),count)).encode('utf-8'))
        msg=c.recv(1024)
        print(msg.decode('utf-8'))
        count+=1
if __name__ == '__main__':
    for i in range(500):
        t=Thread(target=client,args=('127.0.0.1',8080))
        t.start()

标签:__,spawn,编程,并发,Gevent,gevent,time,print,import
From: https://www.cnblogs.com/xiao01/p/18049749

相关文章

  • 并发编程之IO模型
    引言Python的I/O模型分为同步(sync)和异步(async)两种:同步I/O模型是指,当一个线程在等待I/O操作完成时,它不能执行其他任务,需要一直等待I/O操作完成,直到接收到I/O操作的完成通知后才继续执行。异步I/O模型是指,当一个线程发起一个I/O操作后,不会等待I/O操作完成,而是直接执行其他任......
  • 并发编程补充:基于多线程实现并发的套接字通信
    服务端:fromsocketimport*fromthreadingimportThreaddefcommunicate(conn):whileTrue:try:data=conn.recv(1024)ifnotdata:breakconn.send(data.upper())exceptConnectionResetError:......
  • 并发编程补充:基于多进程实现并发的套接字通信
    服务端:frommultiprocessingimportProcessfromsocketimport*deftalk(conn):whileTrue:try:data=conn.recv(1024)ifnotdata:breakconn.send(data.upper())exceptConnectionResetError:......
  • 并发编程之定时器
    定时器定时器,指定n秒后执行某操作简易版:fromthreadingimportTimerdeftask(name):print('hello%s'%name)t=Timer(5,task,args=('xiao',))t.start()#helloxiao应用版:##验证码定时器fromthreadingimportTimerimportrandomclassCode:......
  • 并发编程之条件Condition
    条件Condition(了解)使得线程等待,只有满足某条件时,才释放n个线程importthreadingdefrun(n):con.acquire()con.wait()print("runthethread:%s"%n)con.release()if__name__=='__main__':con=threading.Condition()foriinra......
  • 「java.util.concurrent并发包」之 Unsafe
    一unsafe介绍Unsafe是位于sun.misc包下的一个类,主要提供一些用于执行低级别、不安全操作的方法,如直接访问系统内存资源、自主管理内存资源等,这些方法在提升Java运行效率、增强Java语言底层资源操作能力方面起到了很大的作用。但由于Unsafe类使Java语言拥有了类似C......
  • c++多线程编程
    c++线程库:<thread>创建线程:需要可调用的函数或者函数对象作为线程入口点例:std::threadthreadname(function_name,args...)在C++中,当使用std::thread创建线程并传递类的成员函数时,需要使用&来获取成员函数的地址,同时还需要传递对象的指针(或引用)作为第一个参数。......
  • Go语言的100个错误使用场景(55-60)|并发基础
    目录前言8.并发基础8.1混淆并发与并行的概念(#55)8.2认为并发总是更快(#56)8.3分不清何时使用互斥锁或channel(#57)8.4不理解竞态问题(#58)8.5不了解工作负载类型对并发性能的影响(#59)8.6不懂得使用Gocontexts(#60)小结前言大家好,这里是白泽。《Go语言的100个错误以及如何避免》......
  • 【STL和泛型编程】4. hashtable、unordered_set、unordered_map
    1.hashtable前置知识:【数据结构】3.跳表和散列 基本原理:将Key计算成一个数值,然后取余数得到它在表头中的位置table(篮子)里每个指针都指向一个链表(桶)来存储余数相同的值如果桶内的元素个数比篮子个数还多,则将篮子的大小扩充篮子是vector,数量是质数,初始为53,53扩充后为97......
  • 2024-03-01-Linux高级网络编程(6-原始套接字)
    6.原始套接字6.1TCPUDP回顾数据报式套接字(SOCK_DGRAM)无连接的socket,针对无连接的UDP服务可通过邮件模型来进行对比流式套接字(SOCK_STREAM)面向连接的socket,针对面向连接的TCP服务可通过电话模型来进行对比这两类套接字似乎涵盖了TCP/IP应用的全部TCP......