首页 > 系统相关 >信号量(Semaphore),事件Event(了解),队列补充,进程池和线程池(重点),协程理论,Greenlet,Gevent模块,asynico模块

信号量(Semaphore),事件Event(了解),队列补充,进程池和线程池(重点),协程理论,Greenlet,Gevent模块,asynico模块

时间:2024-06-02 17:21:53浏览次数:14  
标签:__ 协程 url asynico 模块 time print def

Ⅰ 信号量(Semaphore)

【一】什么是信号量

信号量Semahpore(同线程一样)

  • 互斥锁 : 允许在同一时刻只能有一个线程或进程同资源进行修改
  • 信号量 : 允许指定数量的进程或线程对资源进行修改

【二】例子

  • 比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去
  • 互斥锁 对整个厕所进行加锁,对一个坑进行加锁 ---> 每一次只能一个人用
  • 信号量 对整个厕所进行加锁,限制只有三个人能一起用 ---> 下一个人想用只能等上一个人结束才能用

【三】示例

  • 如果指定信号量为3,那么来一个人获得一把锁,计数加1,当计数等于3时,后面的人均需要等待。
  • 一旦释放,就有人可以获得一把锁

信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念

from multiprocessing import Process, Semaphore
import time
import random


def go_wc(sem, user):
    # 【1】对信号量加锁,一个进程占用则锁池 -1
    sem.acquire()

    print(f' {user} 占到一个茅坑')
    # 模拟每个人拉屎速度不一样,0代表有的人蹲下就起来了
    time.sleep(random.randint(0, 3))

    # 【2】释放信号量锁
    sem.release()
    print(f'{user} 释放结束')


def main():
    print(f'main process start .... ')
    # 【一】创建一个信号量的池子 和互斥锁一样要先有一把锁来进行限制
    sem = Semaphore(3)
    # 用来存储所有的子进程
    p_l = [Process(target=go_wc, args=(sem, f'用户_{i}:>>>> ',)) for i in range(10)]
    # 启动多进程
    [i.start() for i in p_l]
    # 等待所有子进程执行完毕
    [i.join() for i in p_l]

    print(f'main process end .... ')


if __name__ == '__main__':
    main()
    
# main process start .... 
#  用户_7:>>>>  占到一个茅坑
#  用户_2:>>>>  占到一个茅坑
#  用户_1:>>>>  占到一个茅坑
# 用户_1:>>>>  释放结束
#  用户_4:>>>>  占到一个茅坑
# 用户_4:>>>>  释放结束用户_2:>>>>  释放结束
#  用户_8:>>>>  占到一个茅坑 用户_0:>>>>  占到一个茅坑
# 
# 
# 用户_8:>>>>  释放结束
#  用户_5:>>>>  占到一个茅坑
# 用户_7:>>>>  释放结束
#  用户_9:>>>>  占到一个茅坑
# 用户_5:>>>>  释放结束用户_0:>>>>  释放结束
#  用户_6:>>>>  占到一个茅坑 用户_3:>>>>  占到一个茅坑
# 
# 
# 用户_3:>>>>  释放结束
# 用户_6:>>>>  释放结束
# 用户_9:>>>>  释放结束
# main process end .... 

Ⅱ 事件Event(了解)

  • Event(同线程一样)

【一】语法引入

from multiprocessing import  Event  

【二】事件处理方法

  • python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。
  • 事件处理的机制:
    • 全局定义了一个“Flag”
    • 如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞
    • 如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
    • clear:
      • 将“Flag”设置为False
    • set:
    • 将“Flag”设置为True
from multiprocessing import Process, Event, Manager
import time
import random

# 能给你的python解释器终端显示的内容上色
color_red = '\033[31m'
color_reset = '\033[0m'
color_green = '\033[32m'


# 定义车辆行为函数,它会根据事件对象(event)的状态来决定是否等待或通过路口
def car(event, n):
    '''
    模拟汽车行为
    :param event: 事件对象,用于同步操作,红绿灯状态的标识
    :param n: 第几辆车的标识
    :return: 无返回值
    '''

    # 创建一个无限循环直到车辆离开
    while True:
        # 如果事件未设置,表示红灯 False
        if not event.is_set():
            # 红灯亮时,车辆等待,并输出提示信息
            print(f'{color_red}红灯亮{color_reset},car{n} :>>> 正在等待 ... ')
            # 阻塞当前进程,等待event被设置(绿灯)
            event.wait()
            # 当event被设置后,打印绿灯信息
            print(f'{color_green}车 {n} :>>>> 看见绿灯亮了{color_reset}')
            # 模拟通过路口需要的时间
            time.sleep(random.randint(3, 6))
            # 防止在sleep期间event状态改变,再次检查状态
            if not event.is_set():
                continue
            # 通过路口
            print(f'car {n} :>>>> 安全通行')
            # 退出循环
            break


# 定义 警车 行为函数, 警车 在红灯时等待一秒后直接通过
def police_car(event, n):
    '''
    模拟 警车 行为
    :param event: 事件对象,用于同步操作,红绿灯状态的标识
    :param n: 第几辆车的标识
    :return: 无返回值
    '''
    while True:
        # 判断是否为红灯
        if not event.is_set():
            print(f'{color_red}红灯亮{color_reset},car{n} :>>> 正在等待 ... ')
            #  等待1秒,不完全遵守交通规则
            event.wait(1)
            print(f'灯的是{event.is_set()},警车走了,car{n}')
            # 通过后立即结束循环
            break


# 定义交通灯控制函数,周期性切换红绿灯状态
def traffic_lights(event, interval):
    '''
    模拟 交通灯 行为
    :param event: 事件对象,用于同步操作,红绿灯状态的标识
    :param interval: 间隔(比如10秒)改变信号灯
    :return: 无返回值
    '''
    # 无限循环,持续控制交通灯状态
    while True:
        # 按照给定间隔(比如10秒)改变信号灯
        time.sleep(interval)
        # 如果当前是绿灯
        if event.is_set():
            # 切换到红灯状态
            # event.is_set() ---->False
            event.clear()
        else:
            # 如果当前是红灯,则切换到绿灯状态
            # event.is_set() ----> True
            event.set()


def main():
    # 初始化事件对象,初始状态为清除(即红灯)
    event = Event()
    # 使用Manager创建一个可跨进程共享的Event对象
    # manager = Manager()
    # event = manager.Event()

    # 创建并启动多个 警车/car 的进程   运行那个 打开下面代码就行
    for i in range(5):
        police_car_process = Process(target=police_car, args=(event, i))
        police_car_process.start()
    # for i in range(5):
    #     police_car_process = Process(target=car, args=(event, i))
    #     police_car_process.start()


    # 启动交通灯控制进程
    # 交通灯变化周期为10秒
    traffic_lights_process = Process(target=traffic_lights, args=(event, 10))
    traffic_lights_process.start()

    # 打印分割线,表明程序运行开始
    print(' ------------交通开始------------- ')


if __name__ == '__main__':
    main()

'''
 ------------交通开始------------- 
红灯亮,car0 :>>> 正在等待 ... 
红灯亮,car1 :>>> 正在等待 ... 
红灯亮,car2 :>>> 正在等待 ... 
红灯亮,car4 :>>> 正在等待 ... 
红灯亮,car3 :>>> 正在等待 ... 
车 1 :>>>> 看见绿灯亮了
车 2 :>>>> 看见绿灯亮了
车 3 :>>>> 看见绿灯亮了
车 4 :>>>> 看见绿灯亮了
车 0 :>>>> 看见绿灯亮了
car 0 :>>>> 安全通行
car 2 :>>>> 安全通行car 1 :>>>> 安全通行

car 4 :>>>> 安全通行
car 3 :>>>> 安全通行

##############################################

 ------------交通开始------------- 
红灯亮,car0 :>>> 正在等待 ... 
红灯亮,car2 :>>> 正在等待 ... 
红灯亮,car3 :>>> 正在等待 ... 
红灯亮,car1 :>>> 正在等待 ... 
红灯亮,car4 :>>> 正在等待 ... 
灯的是False,警车走了,car1
灯的是False,警车走了,car0
灯的是False,警车走了,car3
灯的是False,警车走了,car2
灯的是False,警车走了,car4
'''

【补充】打印到控制台的文本颜色

  • 在Python中,可以使用ANSI转义码来设置打印到控制台的文本颜色。

  • \033[是一个开始 ANSI 序列的转义字符,之后的数字和字母组合定义了具体的样式改变。

  • 以下是对\033[设置]的一个详细解释及示例:

【1】基本语法

ANSI 转义序列的一般形式为:\033[属性;...;属性m,其中:

  • \033[ 是开始标记。
  • 属性之间用分号;隔开,可以指定多个属性(如前景色、背景色)。
  • m 表示设置结束。

【2】常用属性

  • 前景色:30-37 代表不同的颜色,加上40可以使颜色变为亮色系。
    • 30: 黑色
    • 31: 红色
    • 32: 绿色
    • 33: 黄色
    • 34: 蓝色
    • 35: 紫红色(洋红)
    • 36: 青蓝色(青色)
    • 37: 白色
  • 背景色:40-47,与前景色类似,但用于背景。
  • 重置:0,用于清除所有属性设置,回到默认状态。

【3】示例

  • 使用ANSI转义码来改变打印文本的颜色和背景色:
print("\033[31m这是红色文本\033[0m")  # 红色前景色
print("\033[32m这是绿色文本\033[0m")  # 绿色前景色
print("\033[1;33m这是加粗的黄色文本\033[0m")  # 加粗并设置黄色前景色
print("\033[44m这是蓝色背景文本\033[0m")  # 蓝色背景色
print("\033[34;47m这是蓝色前景色和白色背景色的文本\033[0m")  # 蓝色前景和白色背景

image-20240524205809513

【4】注意事项

  • 并非所有的终端都支持ANSI转义码,特别是在一些老版本或非常轻量级的终端中。

  • 在Windows的命令提示符中,默认不支持ANSI转义码,但自从Windows 10开始,通过启用某些设置或使用第三方工具(如ANSICON)可以使其支持。

  • 如果你想让你的脚本更具有跨平台兼容性,可以考虑使用像colorama这样的库,它可以在不支持ANSI转义的平台上模拟这些功能。

Ⅲ 队列补充

【一】语法引入

import queue
q = queue.Queue(maxsize)
# maxsize 如果不给默认值,这个队列的容量就是无限大
from queue import Queue, LifoQueue, PriorityQueue  

【二】方法介绍

  • maxsize 不给默认值,这个队列的容量就是无限大

  • 放:

    • q.put() 向队列中插入数据
    • q.put_nowait() 向队列中插入数据,如果队列满了返回False
    • q.get() 从队列中获取数据
    • q.get_nowait() 从队列中获取数据,如果队列没有数据返回False
  • q.empty() 判断当前队列是否空了

  • q.full() 判断当前队列是否满了

  • q.qsize() 获取当前队列中存在的数据量

【1】Queue 先进先出

from queue import Queue


queue_normal = Queue(3)
queue_normal.put(1)
queue_normal.put(2)
queue_normal.put(3)

print(queue_normal.get())
print(queue_normal.get())
print(queue_normal.get())

# 1
# 2
# 3

【2】LifoQueue 后进先出

from queue import LifoQueue

queue_lifo = LifoQueue(3)

queue_lifo.put(1)
queue_lifo.put(2)
queue_lifo.put(3)

print(queue_lifo.get())
print(queue_lifo.get())
print(queue_lifo.get())

# 3
# 2
# 1

【3】PriorityQueue : 根据优先级数字越小的先出


from queue import PriorityQueue


queue_priority = PriorityQueue(3)
# 可以给放进队列的元素设置优先级:数字越小优先级越高!
queue_priority.put((50, 111))
queue_priority.put((0, 222))
queue_priority.put((100, 333))

print(queue_priority.get())
print(queue_priority.get())
print(queue_priority.get())


# (0, 222)
# (50, 111)
# (100, 333)

Ⅳ 进程池和线程池(重点)

【一】回顾TCP实现并发的效果的原理

  • tcp服务端实现并发
    • 多进程:来一个客户端就开一个进程
    • 多线程:来一个客户端就开一个线程

补充:

  • 服务端必备三要素:
    • 1.全天24小时不断提供服务
    • 2.固定的ip和port
    • 3.自持高并发

【1】服务端

from socket import *
from threading import Thread


def server_create(IP, PORT):
    server = socket()

    server.bind((IP, PORT))

    server.listen(5)

    while True:
        conn, addr = server.accept()
        t = Thread(target=conn_communication, args=(conn,))


def conn_communication(conn):
    while True:
        try:
            data = conn.recv(1024)
            if len(data) == 0:
                break
            print(data.decode('utf8'))
            conn.send(data.upper())
        except Exception as e:
            print(e)
            break
    conn.close()


def main():
    IP = '127.0.0.1'
    PORT = 8086
    t = Thread(target=server_create, args=(IP, PORT,))
    t.start()


if __name__ == '__main__':
    main()

【2】客户端

from socket import *


def client_create(IP, PORT):
    client = socket()

    client.connect((IP, PORT))

    while True:
        word = b'hello world'
        client.send(word)

        msg = client.recv(1024)
        print(msg)


if __name__ == '__main__':
    IP = '127.0.0.1'
    PORT = 8086

    client_create(IP, PORT)
  • 电脑资源是有限的
  • 无论是开设进程还是开设线程,都需要消耗资源
  • 只不过开始线程消耗的资源比开始进程相对少一些

【二】什么是池

  • 池是用来保证计算机硬件安全的情况下最大限度的利用计算机
  • 池降低了程序的运行效率,但是保证了计算机硬件的安全,从而保证程序的正常运行

【三】措施

  • 池:
    • 保证计算机硬件安全的情况下提升程序的运行效率
  • 进程池:
    • 提前创建好固定数量的进程 后续反复使用这些进程(好比永久的合同工)
  • 线程池
    • 提前创建好固定数量的线程 后续反复使用这些线程

如果超出池子里最大的进程或线程 则原地排队等待

  • 强调:
    • 进程池和线程池其实是降低了程序的运行效率 但是保证了数据的安全

【四】线程池

【1】语法引入

from concurrent.futures import ThreadPoolExecutor

# 线程池
pool = ThreadPoolExecutor()  # 线程池默认开设当前计算机 cpu 个数五倍数的线程数

pool = ThreadPoolExecutor(5) # 线程池个数默认是CPU个数的5倍
'''上面代码执行之后立即会创建五个等待工作的线程'''

 pool.submit(task, i).add_done_callback(func)
    '''add_done_callback只要任务有结果了 就会自动调用括号内的函数处理'''

【2】原理

  • 池子造出来后 里面固定存在五个线程
  • 这五个线程不会存在出现重复创建和销毁的过程

【3】优点

  • 避免了重复创建五个线程的资源开销

【4】使用方法

(1)任务的提交方式

  • 同步:提交任务之后原地等待任务的返回结果,期间不做任何事
  • 异步:提交任务之后不等待任务的返回结果,继续执行代码

【2】代码演示

from concurrent.futures import ThreadPoolExecutor
import time
from threading import current_thread
import os

# 线程池
# pool = ThreadPoolExecutor(5)  # 线程池线程数默认是CPU个数的五倍 也可以自定义
'''上面的代码执行之后就会立刻创建五个等待工作的线程'''


def task(n):
    time.sleep(2)
    print(n)
    # print(current_thread().name) # 查看有几个线程
    return '任务的执行结果:%s'%n**2


def func(*args, **kwargs):
    # print(args, kwargs)
    print(args[0].result())

for i in range(20):
    # res = pool.submit(task, i)  # 朝线程池中提交任务(异步)
    # print(res.result())  # 同步提交(获取任务的返回值)
    '''不应该自己主动等待结果 应该让异步提交自动提醒>>>:异步回调机制'''
    pool.submit(task, i).add_done_callback(func)
    """add_done_callback只要任务有结果了 就会自动调用括号内的函数处理"""

   
"""
任务的提交方式
    同步
        提交任务之后原地等待任务的结果
    异步
        提交任务之后不原地等待任务的结果 结果通过反馈机制自动获取
"""

【五】进程池

【1】语法引入

from concurrent.futures import ProcessPoolExecutor

# 线程池
pool = ProcessPoolExecutor()  # 进程池进程数默认是CPU个数 也可以自定义

pool = ProcessPoolExecutor(5)  # 进程池进程数默认是CPU个数 也可以自定义
'''上面代码执行之后立即会创建五个等待工作的进程'''

 pool.submit(task, i).add_done_callback(func)
    '''add_done_callback只要任务有结果了 就会自动调用括号内的函数处理'''

【2】原理

  • 池子造出来后 里面固定存在五个进程
  • 这五个进程不会存在出现重复创建和销毁的过程

【3】优点

  • 避免了重复创建五个进程的资源开销

【4】使用方法

(1)任务的提交方式

  • 同步:提交任务之后原地等待任务的返回结果,期间不做任何事
  • 异步:提交任务之后不等待任务的返回结果,继续执行代码

【2】代码演示

from concurrent.futures import ProcessPoolExecutor
import time
from threading import current_thread
import os


# 进程池
pool = ProcessPoolExecutor(5)  # 进程池进程数默认是CPU个数 也可以自定义
'''上面的代码执行之后就会立刻创建五个等待工作的进程'''


def task(n):
    time.sleep(2)
    print(n)
    # print(os.getpid())
    return '任务的执行结果:%s'%n**2


def func(future):
    # print(os.getpid()) # 查看进程池个数
    print(future.result())

def main():
    for i in range(20):
        # res = pool.submit(task, i)  # 朝线程池中提交任务(异步)
        # print(res.result())  # 同步提交(获取任务的返回值)
        '''不应该自己主动等待结果 应该让异步提交自动提醒>>>:异步回调机制'''
        # pool.submit(task, i).add_done_callback(func)
        """add_done_callback只要任务有结果了 就会自动调用括号内的函数处理"""

        pool.submit(task, i).add_done_callback(func)

if __name__ == '__main__':
    main()

【六】进程池,线程池模板推导

# 【一】池的概念
# 池就是用来保证计算机硬件安全的情况下最大限度的利用计算机

# 【二】线程池
# 【1】引入模块
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
import random
import os

# 【2】创建池子
# 在后台的空间中就已经开辟好了五个池子,就等到子线程去用这个池子
# 避免了资源的重复开设和销毁
# poll = ProcessPoolExecutor(5)
poll = ThreadPoolExecutor(5)

'''

def work_add(result):
    return result + result


# 【3】定义多线程任务
def work(name):
    print(f'{name} is starting ... ')
    sleep_time = random.randint(1, 4)
    print(f'{name} start sleeping for {sleep_time}s')
    time.sleep(sleep_time)
    print(f'{name} end sleeping for {sleep_time}s')
    print(f'{name} is ending ... ')
    result = name * name
    result = work_add(result)
    print(f'{name} 的 result ::>>>> {result} ')
    
# 同步调教 : 提交任务之后原地等待结果,不做任何事
def main_thread():
    print(f'main process start ... ')
    for i in range(1, 3):
        poll.submit(work, i)
    print(f'main process end ... ')

'''

'''
def work_add(result):
    return result + result


# 【3】定义多线程任务
def work(name):
    print(f'{name} is starting  pid {os.getpid()} ppid {os.getppid()}... ')
    sleep_time = random.randint(1, 4)
    print(f'{name} start sleeping for {sleep_time}s')
    time.sleep(sleep_time)
    print(f'{name} end sleeping for {sleep_time}s')
    print(f'{name} is ending ... ')
    return name * name


# 异步提交 : 提交任务之后不需要原地等待结果,去做其他事
def main():
    task_list = []
    print(f'main process start pid {os.getpid()} ppid {os.getppid()}... ')
    for i in range(1, 5):
        task_obj = poll.submit(work, i)
        task_list.append(task_obj)
    # 所有子线程结束后才能获取到所有的结果
    poll.shutdown()
    # 逐个获取每一个对象的结果
    for task_obj in task_list:
        print(task_obj.result())
    print(f'main process end ... ')
'''


def work_add(result):
    return result + result


# 【3】定义多线程任务
def work(name):
    print(f'{name} is starting  pid {os.getpid()} ppid {os.getppid()}... ')
    sleep_time = random.randint(1, 4)
    print(f'{name} start sleeping for {sleep_time}s')
    time.sleep(sleep_time)
    print(f'{name} end sleeping for {sleep_time}s')
    print(f'{name} is ending ... ')
    return name * name


def call_back(n):
    print(f'call_back :>> n :>>> {n}')
    # <Future at 0x2a83d694f10 state=finished returned int>
    print(f'call_back :>> n.result() :>>> {n.result()}')
    # call_back :>> n.result() :>>> 16


def main():
    print(f'main process start pid {os.getpid()} ppid {os.getppid()}... ')
    for i in range(1, 5):
        # task_obj = poll.submit(work, i)
        # 没有加 add_done_callback 时候的 task_obj:  <Future at 0x1c21ab89e50 state=running>
        # 增加一个 add_done_callback : 子线程任务结束后自动调用异步回到函数 call_back
        poll.submit(work, i).add_done_callback(call_back)
        # 加了 add_done_callback  时候的 task_obj : None
    # 所有子线程结束后才能获取到所有的结果
    poll.shutdown()
    print(f'main process end ... ')


if __name__ == '__main__':
    main()

Ⅴ 协程理论

进程 --> 线程(进程下的进程) --> 协程(线程下的线程)

【一】基于单线程来实现并发

【1】并发的本质

  • 本节的主题是基于单线程来实现并发
    • 即只用一个主线程(很明显可利用的cpu只有一个)情况下实现并发
    • 为此我们需要先回顾下并发的本质:
      • 切换+保存状态
  • CPU正在运行一个任务
    • 会在两种情况下切走去执行其他的任务(切换由操作系统强制控制)
    • 一种情况是该任务发生了阻塞
    • 另外一种情况是该任务计算的时间过长或有一个优先级更高的程序替代了它。

【2】yield关键字

  • 其中第二种情况并不能提升效率
  • 只是为了让cpu能够雨露均沾
  • 实现看起来所有任务都被“同时”执行的效果
  • 如果多个任务都是纯计算的
  • 这种切换反而会降低效率。
  • 为此我们可以基于yield来验证
    • yield本身就是一种在单线程下可以保存任务运行状态的方法
yield可以保存状态
	yield的状态保存与操作系统的保存线程状态很像,但是yield是代码级别控制的,更轻量级
    让你看起来其实就是在并行运行
send可以把一个函数的结果传给另外一个函数
	以此实现单线程内程序之间的切换
  • 单纯地切换反而会降低运行效率

(1)串行执行

import time


def func1():
    for i in range(10000000):
        i + 1


def func2():
    for i in range(10000000):
        i + 1


start = time.time()
func1()
func2()
stop = time.time()
print(stop - start)
# 0.9035823345184326

(2)基于yield并发执行

# 生成器的创建方式 :
# 元组生成式
# print((i for i in range(6)))
# <generator object <genexpr> at 0x00000277BCD2AAC0>
# 函数 + yield 的返回值

def foo():
    while True:
        yield
print(foo())
import time

def func1():
    while True:
        print(f'回来了')
        yield


def func2():
    g = func1()
    # g :生成器对象
    # 在这里负责图片链接的抓取,抓取到一个图片链接
    for i in range(10000000):
        print(f'开始算了')
        i + 1
        next(g)  # 发给生成器 send 生成器负责下载


def main():
    start = time.time()
    func2()
    stop = time.time()
    print(stop - start)


if __name__ == '__main__':
    main()

#  1.3168463706970215

【3】实现遇到IO自动切换

  • 第一种情况的切换。
  • 在任务一遇到IO情况下
  • 切到任务二去执行
  • 这样就可以利用任务一阻塞的时间完成任务二的计算
  • 效率的提升就在于此。
  • yield不能检测IO
    • 实现遇到IO自动切换
import time


def func1():
    while True:
        print('func1')
        yield


def func2():
    g = func1()
    for i in range(5):
        i + 1
        next(g)
        time.sleep(3)
        print('func2')

def main():
    start = time.time()
    func2()
    stop = time.time()
    print(stop - start)

if __name__ == '__main__':
    main()

【二】协程介绍

【1】什么是协程

  • 是单线程下的并发,又称微线程,纤程。英文名Coroutine。

  • 一句话说明什么是线程:

    • 协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。
  • 需要强调的是:

    • python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)
    • 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)
  • 对比操作系统控制线程的切换,用户在单线程内控制协程的切换

【2】优点

  • 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
  • 单线程内就可以实现并发的效果,最大限度地利用cpu
  • 应用程序级别速度要远远高于操作系统的切换

【3】缺点

  • 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程
  • 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程(多个任务一旦有一个阻塞没有切,整个线程都阻塞在原地,该线程内的其他的任务都不能执行了)

【4】总结

  • 1.必须在只有一个单线程里实现并发
  • 2.修改共享数据不需加锁
  • 3.用户程序里自己保存多个控制流的上下文栈
  • 4.附加:一个协程遇到IO操作自动切换到其它协程(如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制))
import time
import random
from multiprocessing import Process
from threading import Thread


def work(name):
    print(f'{name} start ... ')
    time.sleep(random.randint(1, 3))
    print(f'{name} end ... ')
    main(cls=Thread)


def main(cls):
    task_list = [cls(target=work, args=(i,)) for i in range(10)]
    [task.start() for task in task_list]
    [task.join() for task in task_list]


def main_process():
    main(cls=Process)


if __name__ == '__main__':
    main_process()

# 千万不要不信,去尝试上面的代码运行取来,否则后果很严重

Ⅵ Greenlet

  • 如果我们在单个线程内有20个任务
    • 要想实现在多个任务之间切换
    • 使用yield生成器的方式过于麻烦(需要先得到初始化一次的生成器,然后再调用send。。。非常麻烦
    • 而使用greenlet模块可以非常简单地实现这20个任务直接的切换

【一】安装

pip3 install greenlet

【二】使用

from greenlet import greenlet
import time
import random
import os

# 定义协程函数
def work(name):
    print(f'{name} start .... ')
    sleep_time = random.randint(1, 4)
    print(f'{name} start sleeping {sleep_time}s')
    time.sleep(sleep_time)
    print(f'{name} end sleeping {sleep_time}s')
    print(f'{name} end .... ')


# 开启协程
# 串行运行 : 印象效率
def main():
    for i in range(5):
        g = greenlet(work)
        g.switch(i)

if __name__ == '__main__':
    main()

【3】单纯切换降低效率

  • 单纯的切换(在没有io的情况下或者没有重复开辟内存空间的操作),反而会降低程序的执行速度
def add_number_normal():
    res = 1
    for i in range(1, 100000):
        res += i


def modify_number_normal():
    res = 1
    for i in range(1, 100000):
        res *= i


def main_normal():
    start_time = time.time()
    add_number_normal()
    modify_number_normal()
    end_time = time.time()
    print(f'总耗时 :>>>> {end_time - start_time}s')


def add_number_greenlet(modify_number_greenlet_two, modify_number_greenlet_one):
    res = 1
    for i in range(1, 100000):
        res += i
        modify_number_greenlet_two.switch(modify_number_greenlet_two, modify_number_greenlet_one)


def modify_number_greenlet(add_number_greenlet_one, modify_number_greenlet_two):
    res = 1
    for i in range(1, 100000):
        res *= i
        add_number_greenlet_one.switch(add_number_greenlet_one, modify_number_greenlet_two)


def main():
    start_time = time.time()
    # 创建两个 greenlet 对象
    add_number_greenlet_one = greenlet(add_number_greenlet)
    modify_number_greenlet_two = greenlet(modify_number_greenlet)

    # 把两个 greenlet 对象作为参数传递给 函数
    add_number_greenlet_one.switch(modify_number_greenlet_two, add_number_greenlet_one)
    modify_number_greenlet_two.switch(add_number_greenlet_one, modify_number_greenlet_two)
    end_time = time.time()
    print(f'总耗时 :>>>> {end_time - start_time}s')


if __name__ == '__main__':
    # main_normal()
    # 总耗时 :>>>> 3
    main()
    # 总耗时 :>>>> 2

# greenlet 模块能够帮助我们实现进程或线程之间的切换
# 一旦遇到IO阻塞就开始切换

【三】小结

  • greenlet只是提供了一种比generator更加便捷的切换方式

    • 当切到一个任务执行时如果遇到io
    • 那就原地阻塞,仍然是没有解决遇到IO自动切换来提升效率的问题。
  • 单线程里的这20个任务的代码通常会既有计算操作又有阻塞操作

    • 我们完全可以在执行任务1时遇到阻塞
    • 就利用阻塞的时间去执行任务2。。。。
    • 如此,才能提高效率,这就用到了Gevent模块。

Ⅶ Gevent模块

【一】介绍

  • Gevent 是一个第三方库
  • 可以轻松通过gevent实现并发同步或异步编程
  • 在gevent中用到的主要模式是Greenlet
  • 它是以C扩展模块形式接入Python的轻量级协程。
  • Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

【二】安装

pip install gevent 

【三】使用

import gevent


def func(*args, **kwargs):
    print(args)  # (1, 2, 3)
    print(kwargs)  # {'x': 4, 'y': 5}
    return 'ok'


def func2():
    ...


# 创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的
g1 = gevent.spawn(func, 1, 2, 3, x=4, y=5)

# 创建一个协程对象g2
g2 = gevent.spawn(func2)

g1.join()  # 等待g1结束

g2.join()  # 等待g2结束

# 或者上述两步合作一步:gevent.joinall([g1,g2])

# 拿到func1的返回值
result = g1.value

print(result)
# ok

【四】遇到IO阻塞时会自动切换任务

import gevent


def eat(name):
    print('%s eat 1' % name)
    gevent.sleep(2)
    print('%s eat 2' % name)


def play(name):
    print('%s play 1' % name)
    gevent.sleep(1)
    print('%s play 2' % name)


g1 = gevent.spawn(eat, 'silence')
g2 = gevent.spawn(play, name='mark')
g1.join()
g2.join()

# 或者gevent.joinall([g1,g2])

print('主')

# silence eat 1
# mark play 1
# mark play 2
# silence eat 2
# 主
  • 上例gevent.sleep(2)模拟的是gevent可以识别的io阻塞

【五】兼容其他IO

# 如果你想兼容其他的IO阻塞 需要打补丁
# 补丁叫猴子补丁
from gevent import monkey

monkey.patch_all()
  • 而time.sleep(2)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码,打补丁,就可以识别了

  • from gevent import monkey;monkey.patch_all()必须放到被打补丁者的前面

    • 如time,socket模块之前
  • **或者我们干脆记忆成:要用gevent,需要将from gevent import monkey; **

  • monkey.patch_all()放到文件的开头

from gevent import monkey
import gevent
import time
import threading

monkey.patch_all()




def eat():
    print('eat food 1')
    time.sleep(2)
    print(f"eat 中的 :>>>> {threading.current_thread()}")
    print('eat food 2')


def play():
    print('play 1')
    time.sleep(1)
    print(f"play 中的 :>>>> {threading.current_thread()}")
    print('play 2')

start_time = time.time()
g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)
gevent.joinall([g1, g2])

print(threading.current_thread())

print('主')
print(f"总耗时 :>>> {time.time() - start_time}s")
# 总耗时 :>>> 2.026843547821045s
# eat food 1
# play 1
# play 中的 :>>>> <_DummyThread(Dummy-1, started daemon 1978834756736)>
# play 2
# eat 中的 :>>>> <_DummyThread(Dummy-2, started daemon 1978834750816)>
# eat food 2
# <_MainThread(MainThread, started 1978808969472)>
# 主
# 总好受 :>>> 2.026843547821045s
  • 我们可以用threading.current_thread()来查看每个g1和g2,查看的结果为DummyThread-n,即假线程

【六】Gevent之同步与异步

from gevent import spawn, joinall, monkey

monkey.patch_all()

import time


def timer(func):
    def inner(*args, **kwargs):
        start = time.time()
        res = func(*args, **kwargs)
        print(f'当前程序 {func.__name__} 总耗时 :>>>> {time.time() - start} s')
        return res

    return inner


def task(pid):
    """
    Some non-deterministic task
    """
    time.sleep(0.5)
    print('Task %s done' % pid)


@timer
def synchronous():
    for i in range(10):
        task(i)


@timer
def asynchronous():
    g_l = [spawn(task, i) for i in range(10)]
    joinall(g_l)


if __name__ == '__main__':
    print('Synchronous:')
    synchronous()

    print('Asynchronous:')
    asynchronous()

    # Synchronous:
    # Task 0 done
    # Task 1 done
    # Task 2 done
    # Task 3 done
    # Task 4 done
    # Task 5 done
    # Task 6 done
    # Task 7 done
    # Task 8 done
    # Task 9 done
    # 当前程序 synchronous 总耗时 :>>>> 5.034381151199341 s
    # Asynchronous:
    # Task 0 done
    # Task 1 done
    # Task 2 done
    # Task 3 done
    # Task 4 done
    # Task 5 done
    # Task 6 done
    # Task 7 done
    # Task 8 done
    # Task 9 done
    # 当前程序 asynchronous 总耗时 :>>>> 0.504889726638794 s

# 上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。
# 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,后者阻塞当前流程,并执行所有给定的greenlet。
# 执行流程只会在 所有greenlet执行完后才会继续向下走。

【七】Gevent之应用举例一

  • 协程应用:爬虫
from gevent import monkey

monkey.patch_all()
import gevent
import requests
import time


def get_page(url):
    print(f"当前正在获取 :>>>> {url}")
    response = requests.get(url=url)
    if response.status_code == 200:
        print(f'当前响应数据总长度 :>>>> {len(response.text)} 当前链接 :>>>> {url}')


def main_gevent():
    start_time = time.time()
    url_list = ['https://www.python.org/', 'https://www.jd.com/', 'https://www.baidu.com/']
    gevent.joinall([gevent.spawn(get_page, url) for url in url_list])
    stop_time = time.time()
    print(f'总耗时 :>>>> {stop_time - start_time}s')


# main_gevent()
# 当前正在获取 :>>>> https://www.python.org/
# 当前正在获取 :>>>> https://www.jd.com/
# 当前正在获取 :>>>> https://www.baidu.com/
# 当前响应数据总长度 :>>>> 221581 当前链接 :>>>> https://www.jd.com/
# 当前响应数据总长度 :>>>> 2443 当前链接 :>>>> https://www.baidu.com/
# 当前响应数据总长度 :>>>> 50464 当前链接 :>>>> https://www.python.org/
# 总耗时 :>>>> 0.42029809951782227s

def main_normal():
    start_time = time.time()
    url_list = ['https://www.python.org/', 'https://www.jd.com/', 'https://www.baidu.com/']
    for url in url_list:
        get_page(url=url)
    stop_time = time.time()
    print(f'总耗时 :>>>> {stop_time - start_time}s')


main_normal()
# 当前正在获取 :>>>> https://www.python.org/
# 当前响应数据总长度 :>>>> 50464 当前链接 :>>>> https://www.python.org/
# 当前正在获取 :>>>> https://www.jd.com/
# 当前响应数据总长度 :>>>> 218539 当前链接 :>>>> https://www.jd.com/
# 当前正在获取 :>>>> https://www.baidu.com/
# 当前响应数据总长度 :>>>> 2443 当前链接 :>>>> https://www.baidu.com/
# 总耗时 :>>>> 0.8800139427185059s

Ⅷ asynico模块

  • 基于 async 和 await 关键字的协程可以实现异步编程,这也是目前 Python 异步相关的主流技术。在这里我们主要介绍一下实现异步的模块:asyncio 模块

【一】asynico模块

  • asyncio 模块是 Python 中实现异步的一个模块,该模块在 Python3.4 的时候发布
  • async 和 await 关键字在 Python3.5 中引入。
  • 因此,想要使用asyncio模块,建议 Python 解释器的版本不要低于 Python3.5 。

【二】事件循环

  • 所谓的事件循环,我们可以把它当作是一个 while 循环,这个 while 循环在循环生命周期内运行并执行一些任务,在特定的条件下结束循环。
  • 在编写程序的时候可以通过如下代码来获取和创建事件循环:
import asyncio

loop = asyncio.get_event_loop()

【三】协程对象和协程函数

【1】什么是协程函数

带了 asynic 关键的函数就叫协程函数

  • 首先我们来看一下协程函数

    • 什么是协程函数呢?
  • 直白的讲,定义为如下形式的函数

  • 我们可以称之为协程函数,如下代码所示:

# 使用 async 声明的函数就是协程函数
async def fn():
    pass

【2】什么是协程对象

  • 知道了什么是协程函数

    • 接下来我们再来看一下什么是协程对象
    • 所谓的协程对象就是调用协程函数之后返回的对象
    • 我们称之为 协程对象,如下代码所示:
# 使用 async 声明的函数就是协程函数
async def fn():
    pass


# 调用携程函数得到的对象就是协程对象
res = fn()
print(res) # <coroutine object fn at 0x1029684a0>
  • 注意事项:调用协程函数时,函数内部的代码不会执行,只是会返回一个协程对象!

【四】协程函数应用

【1】基本应用

  • 在编写程序的时候,如果想要执行协程函数内部的代码,通过 函数名() 调用函数是不可以的,需要 事件循环协程对象 配合才能实现,如下代码所示:
import time


def timer(fucn):
    def inner(*args, **kwargs):
        start = time.time()
        result = fucn(*args, **kwargs)
        print(f'总耗时 {time.time() - start}s')
        return result

    return inner


# 【二】基本使用
import asyncio


# 定义协程函数

async def modify(name):
    print(f'这是modify函数内部 :>>>> {name}')

    return name * name


def modify_one(name):
    print(f'这是modify函数内部 :>>>> {name}')

    return name * name


async def add(name):
    print(f'这是add函数内部 :>>>> {name}')
    # 这个等价于 gevent.sleep(1) 约等于= time.sleep(1)
    # await asyncio.sleep(2)
    # res = await modify(name=name)
    res = modify_one(name=name)
    return res + res


# 开启协程
# 【1】方式一
def main_one():
    # (1)调用协程函数获取得到协程对象
    task_list = [add(i) for i in range(5)]

    # (2)创建一个事件循环
    loop = asyncio.get_event_loop()

    # (3)将上面的任务提交给事件循环
    # run 运行
    # until 直至
    # complete 完成
    # run_until_complete 返回的结果就是当前协程函数返回值
    res = [loop.run_until_complete(task) for task in task_list]
    print(res)


# 【2】方式二
@timer
def main_two():
    # (1)调用协程函数获取得到协程对象
    task_list = [add(i) for i in range(5)]

    # (2)将协程对象交给 run 运行
    res = [asyncio.run(task) for task in task_list]
    print(res)


# 【补充】在gevent内部要用 gevent.sleep()
# async 内部 也要用 asyncio.sleep()
# 如果我想要等待另一个函数的返回值,拿到另一个函数的返回值进行处理
# await 等待函数返回值


if __name__ == '__main__':
    main_two()
  • 这个过程可以简单理解为:

    • 协程函数 当做任务添加到 事件循环 的任务列表
    • 然后事件循环检测列表中的协程函数 是否已准备就绪(默认可理解为就绪状态)
    • 如果准备就绪则执行其内部代码。

【2】await关键字

  • await 是一个 只能 在协程函数中使用的关键字,用于当协程函数遇到IO操作的时候挂起当前协程(任务),
  • 当前协程挂起过程中,事件循环可以去执行其他的协程(任务)
  • 当前协程IO处理完成时,可以再次切换回来执行 await 之后的代码

(1)实例1

import asyncio


async def fn():
    print('协程函数内部的代码')

    # 遇到IO操作之后挂起当前协程(任务),等IO操作完成之后再继续往下执行。
    # 当前协程挂起时,事件循环可以去执行其他协程(任务)
    response = await asyncio.sleep(2)  # 模拟遇到了IO操作

    print(f'IO请求结束,结果为:{response}')


def main():
    # 调用协程函数,返回一个协程对象
    res = fn()

    # 执行协程函数
    asyncio.run(res)


if __name__ == '__main__':
    main()

    '''
    运行结果:
    协程函数内部的代码
    IO请求结束,结果为:None
    '''

(2)实例2

import asyncio


async def other_tasks():
    print('start')
    await asyncio.sleep(2)  # 模拟遇到了IO操作
    print('end')
    return '返回值'


async def fn():
    print('协程函数内部的代码')

    # 遇到IO操作之后挂起当前协程(任务),等IO操作完成之后再继续往下执行。
    # 当前协程挂起时,事件循环可以去执行其他协程(任务)
    response = await other_tasks()  # 模拟执行其他协程任务

    print(f'IO请求结束,结果为:{response}')


def main():
    # 调用协程函数,返回一个协程对象
    res = fn()

    # 执行协程函数
    asyncio.run(res)


if __name__ == '__main__':
    main()

    '''
    运行结果:
    协程函数内部的代码
    start
    end
    IO请求结束,结果为:返回值
    '''

(3)实例3

import asyncio
import time


async def other_tasks():
    print('start')
    await asyncio.sleep(2)  # 模拟遇到了IO操作
    print('end')
    return '返回值'


async def fn():
    print('协程函数内部的代码')

    # 遇到IO操作之后挂起当前协程(任务),等IO操作完成之后再继续往下执行。
    # 当前协程挂起时,事件循环可以去执行其他协程(任务)
    respnse1 = await other_tasks()
    print(f'IO请求结束,结果为:{respnse1}')

    respnse2 = await other_tasks()
    print(f'IO请求结束,结果为:{respnse2}')


def timer(func):
    def inner(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        print(f'总耗时 :>>> {time.time() - start}s')
        return result

    return inner


@timer
def main():
    # 调用协程函数,返回一个协程对象
    cor_obj = fn()

    # 执行协程函数
    asyncio.run(cor_obj)


if __name__ == '__main__':
    main()

    '''
    运行结果:
    协程函数内部的代码
    start
    end
    IO请求结束,结果为:返回值
    start
    end
    IO请求结束,结果为:返回值
    '''

(4)小结

  • 上述的所有实例都只是创建了一个任务

    • 即:事件循环的任务列表中只有一个任务
    • 所以在IO等待时无法演示切换到其他任务效果。
  • 在程序中想要创建多个任务对象

    • 需要使用Task对象来实现。

【3】Task 对象

  • Tasks 用于并发调度协程

  • 通过 asyncio.create_task(协程对象) 的方式创建 Task 对象

  • 这样可以让协程加入事件循环中等待被调度执行。

  • 除了使用 asyncio.create_task() 函数以外

  • 还可以用低层级的loop.create_task() 或 ensure_future() 函数。并且不建议手动实例化 Task 对象。

  • 本质上是将协程对象封装成 Task 对象

  • 并将协程立即加入事件循环,同时追踪协程的状态。

  • 注意事项:

    • asyncio.create_task() 函数在 Python3.7 中被加入。
    • 在 Python3.7 之前,可以改用低层级的
    • asyncio.ensure_future() 函数。

(1)协程运行方式一

  • async.run() 运行协程
  • async.create_task()创建task
import time
import asyncio

def timer(func):
    def inner(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        print(f'总耗时 :>>> {time.time() - start}s')
        return result

    return inner


async def other_tasks():
    print('start')
    await asyncio.sleep(2)  # 模拟遇到了IO操作
    print('end')
    return '返回值'


async def fn():
    print('fn开始')

    # 创建协程,将协程封装到一个Task对象中并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
    task1 = asyncio.create_task(other_tasks())

    # 创建协程,将协程封装到一个Task对象中并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
    task2 = asyncio.create_task(other_tasks())

    print('fn结束')

    # 当执行某协程遇到IO操作时,会自动化切换执行其他任务。
    # 此处的await是等待相对应的协程全都执行完毕并获取结果
    response1 = await task1
    response2 = await task2
    print(response1, response2)

@timer
def main():
    asyncio.run(fn())


if __name__ == '__main__':
    main()
    
        '''
    运行结果:
    fn开始
    fn结束
    start
    start
    end
    end
    返回值 返回值
    总耗时 :>>> 2.013091802597046s
    '''

(2)协程运行方式二

import asyncio


async def other_tasks():
    print('start')
    await asyncio.sleep(2)  # 模拟遇到了IO操作
    print('end')
    return '返回值'


async def fn():
    print('fn开始')

    # 创建协程,将协程封装到一个Task对象中并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
    task_lis = [
        asyncio.create_task(other_tasks()),
        asyncio.create_task(other_tasks()),
    ]

    print('fn结束')
    # 当执行某协程遇到IO操作时,会自动化切换执行其他任务。
    # 此处的await是等待所有协程执行完毕,并将所有协程的返回值保存到done
    # 如果设置了timeout值,则意味着此处最多等待的秒,完成的协程返回值写入到done中,未完成则写到pending中。
    done, pending = await asyncio.wait(task_lis, timeout=None)

    print(f"done :>>>> {done}")
    print(f"pending :>>>> {pending}")


def main():
    asyncio.run(fn())


if __name__ == '__main__':
    main()

'''
fn开始
fn结束
start
start
end
end
done :>>>> {<Task finished name='Task-2' coro=<other_tasks() done, 
defined at E:\socket文件上传下载\socket\互斥锁.py:4> result='返回值'>,
 <Task finished name='Task-3' coro=<other_tasks() done,
  defined at E:\socket文件上传下载\socket\互斥锁.py:4> result='返回值'>}
pending :>>>> set()

'''   

(3)获取协程返回值

  • async.gather()获取返回值
import asyncio


async def other_tasks():
    print('start')
    await asyncio.sleep(2)  # 模拟遇到了IO操作
    print('end')
    return '返回值'


async def fn():
    print('fn开始')

    # 创建协程,将协程封装到一个Task对象中并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
    task_lis = [
        asyncio.create_task(other_tasks()),
        asyncio.create_task(other_tasks()),
    ]

    print('fn结束')
    # 当执行某协程遇到IO操作时,会自动化切换执行其他任务。
    # 此处的await是等待所有协程执行完毕,并将所有协程的返回值保存到done
    # 如果设置了timeout值,则意味着此处最多等待的秒,完成的协程返回值写入到done中,未完成则写到pending中。
    await asyncio.wait(task_lis, timeout=None)

    response = await asyncio.gather(task_lis[0], task_lis[1])  # 将task_lis作为参数传入gather,等异步任务都结束后返回结果列表

    print(f'response :>>>> {response}')


def main():
    asyncio.run(fn())


if __name__ == '__main__':
    main()

    '''
    fn开始
    fn结束
    start
    start
    end
    end
    response :>>>> ['返回值', '返回值']
    '''

【4】aiohtpp对象

  • 我们之前学习过爬虫最重要的模块requests,但它是阻塞式的发起请求,每次请求发起后需阻塞等待其返回响应,不能做其他的事情。

    • 本文要介绍的aiohttp可以理解成是和requests对应Python异步网络请求库,它是基于 asyncio 的异步模块,可用于实现异步爬虫,有点就是更快于 requests 的同步爬虫。
    • 安装方式,pip install aiohttp。
  • aiohttp是一个为Python提供异步HTTP 客户端/服务端编程,基于asyncio的异步库。

    • asyncio可以实现单线程并发IO操作,其实现了TCP、UDP、SSL等协议,
    • aiohttp就是基于asyncio实现的http框架。
import aiohttp
import asyncio

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get("http://httpbin.org/headers") as response:
            print(await response.text())

asyncio.run(main())

【五】异步迭代器

【1】什么是异步迭代器?

  • 实现了 aiter() 和 anext()方法的对象。
  • anext 必须返回一个 awaitable 对象。
  • async for会处理异步迭代器的 anext()方法所返回的可等待对象,直到其引发一个 StopAsyncIteration异常。

【2】什么是异步可迭代对象?

  • 可在 async for语句中被使用的对象。
  • 必须通过它的 aiter()方法返回一个 asynchronous iterator 。
import asyncio


class Reader:
    """ 自定义异步迭代器(同时也是异步可迭代对象) """

    def __init__(self):
        self.count = 0

    async def readline(self):
        self.count += 1
        if self.count == 100:
            return None
        return self.count

    def __aiter__(self):
        return self

    async def __anext__(self):
        val = await self.readline()
        if val is None:
            raise StopAsyncIteration
        return val


async def fn():
    # 创建异步可迭代对象

    async_iter = Reader()
    # async for 必须放在async def 函数内,否则语法错误。
    async for item in async_iter:
        print(item)


asyncio.run((fn()))

【六】异步上下文管理器

  • 此种对象通过定义 aenter() 和 aexit() 方法来对 async with 语句中的环境进行控制。
import asyncio


class AsyncContextManager:

    def __init__(self):
        self.conn = None

    async def do_something(self):
        # 异步操作数据库
        return 123

    async def __aenter__(self):
        # 异步链接数据库
        self.conn = await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # 异步关闭数据库链接
        await asyncio.sleep(1)


async def fn():
    async with AsyncContextManager() as f:
        res = await f.do_something()
        print(res)


asyncio.run(fn())

【七】小结

  • 在程序中只要看到 asyncawait 关键字
  • 其内部就是基于协程实现的异步编程
  • 这种异步编程是通过一个线程在IO等待时间去执行其他任务,从而实现并发。

以上就是异步编程的常见操作,更多内容请参考 Python 官方文档:https://docs.python.org/zh-cn/3.8/library/asyncio.html

Ⅸ 案例(比较进程,线程和协程的下载速度)

  • 用爬虫 爬数据对比

【一】多进程和多线程和正常

import os
import time
from multiprocessing import Process
from threading import Thread
# pip install fake-useragent
from fake_useragent import UserAgent
# pip install requests
import requests

# pip install lxml
from lxml import etree


class BaseSpider(object):
    def __init__(self):
        self.base_area = 'https://pic.netbian.com'
        # 根目录
        self.BASE_DIR = os.path.dirname(os.path.abspath(__file__))
        # 创建请求头
        self.headers = {
            'User-Agent': UserAgent().random
        }

    def __create_etree(self, page_source):
        return etree.HTML(page_source)

    # 获取页面源码数据
    def get_tree(self, url):
        # 对目标地址发起请求并获得响应
        response = requests.get(url=url, headers=self.headers)
        # 因为数据是乱码 所以要对数据进行编码
        response.encoding = 'gbk'
        # 返回当前页面的源码数据
        page_source = response.text
        # 返回当前页面的解析器对象
        tree = self.__create_etree(page_source=page_source)
        return tree

    def spider_all_category(self):
        category_dict = {}
        tree = self.get_tree(url=self.base_area)
        # 直接使用xpath语法
        a_list = tree.xpath('//*[@id="main"]/div[2]/a')
        for a in a_list:
            # //*[@id="main"]/div[2]/a[1]
            # <a href="/4kdongman/" title="4K动漫图片">4K动漫</a>
            # 如果是获取标签的属性 href / title ---> /@属性名
            # 如果是获取标签中间的文本 4K动漫 ---> /text()
            # 上面取到的都是在列表中
            title = a.xpath('./text()')[0]
            href = self.base_area + a.xpath('./@href')[0]
            category_dict[title] = href

        return category_dict

    def spider_all_detail_source(self, url):
        img_data_dict = {}
        tree = self.get_tree(url=url)
        li_list = tree.xpath('//*[@id="main"]/div[4]/ul/li')
        for li in li_list:
            detail_href = self.base_area + li.xpath('./a/@href')[0]
            tree = self.get_tree(url=detail_href)
            img_detail_url = self.base_area + tree.xpath('//*[@id="img"]/img/@src')[0]
            img_title_ = tree.xpath('//*[@id="img"]/img/@title')[0].split(' ')[0]
            img_title = ''
            for item in img_title_:
                if item.isdigit() or item.isspace() or item in ['*', '-', 'x', '+', '\\', '/']:
                    pass
                else:
                    img_title += item
            img_data_dict[img_title] = img_detail_url
        return img_data_dict

    def create_url_list(self, start_page: int, end_page: int, base_url: str):
        url_list = []
        for page in range(start_page, end_page):
            if page == 1:
                # https://pic.netbian.com/4kdongman/
                page_url = base_url
                url_list.append(page_url)
            else:
                # https://pic.netbian.com/4kdongman/index_2.html
                page_url = base_url + f'index_{page}.html'
                url_list.append(page_url)
        return url_list

    def download_image(self, img_url: str, img_title: str, category_input: str):
        file_dir = os.path.join(self.BASE_DIR, category_input)
        os.makedirs(file_dir, exist_ok=True)
        file_path = os.path.join(file_dir, img_title + '.png')
        response = requests.get(url=img_url, headers=self.headers)
        content = response.content
        with open(file_path, 'wb') as fp:
            fp.write(content)
        print(f'当前图片 :>>> 标题 {img_title} 下载成功! 链接 :>>>> {img_url}!')

    @staticmethod
    def show_time(func):
        def inner(*args, **kwargs):
            start_time = time.time()
            result = func(*args, **kwargs)
            end_time = time.time()
            print(f'总耗时 {func.__name__} :>>>> {end_time - start_time} s!')
            return result

        return inner

    def main_download_cls(self, cls, img_data_dict, category_input):
        task_list = []
        count = 0
        for title, url in img_data_dict.items():
            count += 1
            print(f'当前是第 {count} 张图片!')
            task = cls(
                target=self.download_image,
                args=(url, title, category_input)
            )
            task.start()
            task_list.append(task)
        for task in task_list:
            task.join()

    @show_time
    def main_download(self, start_page, end_page, category_href, category_input, func_id):
        # 构建所有目标地址
        target_url_list = self.create_url_list(start_page=int(start_page),
                                               end_page=int(end_page),
                                               base_url=category_href)
        print(f'当前所有目标地址构建完成 :>>>> {target_url_list}')
        print(f'------------------------------------------------')
        img_data_dict = {}
        # 请求每一页的图片详细连接
        for target_url in target_url_list:
            print(f'当前抓取图片首页地址连接为 :>>>> {target_url}')
            img_dict = self.spider_all_detail_source(url=target_url)
            img_data_dict.update(img_dict)
        print(f'当前所有图片连接构建完成 :>>>> {img_data_dict}')
        print(f'------------------------------------------------')
        print(f'开始下载 :>>>> ')
        # 下载每一张图片
        if func_id == '1':
            count = 0
            for title, url in img_data_dict.items():
                count += 1
                print(f'当前是第 {count} 张图片!')
                self.download_image(img_url=url, img_title=title, category_input=category_input)
            # 总耗时 main_download :>>>> 31.696726083755493 s!
            # 1 - 5 页 53 张
        elif func_id == '2':
            self.main_download_cls(Process, img_data_dict, category_input)
            # 总耗时 main_download :>>>> 25.603846788406372 s!
            # 1-5页 54 张
        elif func_id == '3':
            self.main_download_cls(Thread, img_data_dict, category_input)
            # 总耗时 main_download :>>>> 25.290791749954224 s!
            # 1-5页 68张
        print(f'下载结束 :>>>> ')


# 多进程和多线程的相关操作
class SpiderProcessThread(BaseSpider):
    def __init__(self):
        super().__init__()

    def main_chose_category_normal(self):
        # 获取所有的分类
        category_dict = self.spider_all_category()
        while True:
            # 遍历分类获取指定的分类下载
            for index, data in enumerate(category_dict.items(), start=1):
                index = str(index).rjust(len(str(index)) + 3, '0')
                category_title, category_href = data
                print(f'当前编号 :>>> {index} 分类 :>>>> {category_title}')
            # 用户输入下载的分类
            category_input = input("请输入下载的分类 :>>>> ").strip()
            if category_input not in category_dict.keys():
                print(f'当前分类 {category_input} 不存在')
                continue
            # 分类的主链接
            category_href = category_dict.get(category_input)
            start_page = input("请输入起始页码 :>>>> ").strip()
            end_page = input("请输入结束页码 :>>>> ").strip()
            # 起始页码和结束页码
            if not all([start_page.isdigit(), end_page.isdigit()]):
                print(f'页码有误!')
                continue
            func_id = input("请选择版本 ::>> ").strip()
            # 1 正常下载
            # 2 多进程下载
            # 3 多线程下载
            self.main_download(start_page, end_page, category_href, category_input, func_id=func_id)


if __name__ == '__main__':
    s = SpiderProcessThread()
    res = s.main_chose_category_normal()
    print(res)

【二】协程版本

import os
import time
from multiprocessing import Process
from threading import Thread

# pip install fake-useragent
from fake_useragent import UserAgent
# pip install requests
import requests

# pip install lxml
from lxml import etree

import asyncio
# pip install aiohttp
import aiohttp

import aiofiles


# pip install aiofiles

class BaseSpider(object):
    def __init__(self):
        self.base_area = 'https://pic.netbian.com'
        # 根目录
        self.BASE_DIR = os.path.dirname(os.path.abspath(__file__))
        # 创建请求头
        self.headers = {
            'User-Agent': UserAgent().random
        }

    async def __create_etree(self, page_source):
        return etree.HTML(page_source)

    # 获取页面源码数据
    async def get_tree(self, url):
        # 对目标地址发起请求并获得响应
        # async def main():
        #     async with aiohttp.ClientSession() as session:
        #         async with session.get("http://httpbin.org/headers") as response:
        #             print(await response.text())
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                # 因为数据是乱码 所以要对数据进行编码

                # 返回当前页面的源码数据
                page_source = await response.text(encoding='gbk')
                # 返回当前页面的解析器对象
                tree = await self.__create_etree(page_source=page_source)
                return tree

    async def spider_all_category(self):
        category_dict = {}
        tree = await asyncio.create_task(self.get_tree(url=self.base_area))
        print(tree)
        # 直接使用xpath语法
        a_list = tree.xpath('//*[@id="main"]/div[2]/a')
        for a in a_list:
            # //*[@id="main"]/div[2]/a[1]
            # <a href="/4kdongman/" title="4K动漫图片">4K动漫</a>
            # 如果是获取标签的属性 href / title ---> /@属性名
            # 如果是获取标签中间的文本 4K动漫 ---> /text()
            # 上面取到的都是在列表中
            title = a.xpath('./text()')[0]
            href = self.base_area + a.xpath('./@href')[0]
            category_dict[title] = href

        return category_dict

    async def spider_all_detail_source(self, url):
        img_data_dict = {}
        tree = await self.get_tree(url=url)
        li_list = tree.xpath('//*[@id="main"]/div[4]/ul/li')
        for li in li_list:
            detail_href = self.base_area + li.xpath('./a/@href')[0]
            tree = await self.get_tree(url=detail_href)
            img_detail_url = self.base_area + tree.xpath('//*[@id="img"]/img/@src')[0]
            img_title_ = tree.xpath('//*[@id="img"]/img/@title')[0].split(' ')[0]
            img_title = ''
            for item in img_title_:
                if item.isdigit() or item.isspace() or item in ['*', '-', 'x', '+', '\\', '/']:
                    pass
                else:
                    img_title += item
            img_data_dict[img_title] = img_detail_url
        return img_data_dict

    async def create_url_list(self, start_page: int, end_page: int, base_url: str):
        url_list = []
        for page in range(start_page, end_page):
            if page == 1:
                # https://pic.netbian.com/4kdongman/
                page_url = base_url
                url_list.append(page_url)
            else:
                # https://pic.netbian.com/4kdongman/index_2.html
                page_url = base_url + f'index_{page}.html'
                url_list.append(page_url)
        return url_list

    async def download_image(self, img_url: str, img_title: str, category_input: str):
        file_dir = os.path.join(self.BASE_DIR, category_input)
        os.makedirs(file_dir, exist_ok=True)
        file_path = os.path.join(file_dir, img_title + '.png')
        async with aiohttp.ClientSession() as session:
            async with session.get(img_url) as response:
                content = await response.read()
                async with aiofiles.open(file_path, mode='wb') as f:
                    await f.write(content)
        print(f'当前图片 :>>> 标题 {img_title} 下载成功! 链接 :>>>> {img_url}!')

    async def main_download(self, start_page, end_page, category_href, category_input, func_id):
        start_time = time.time()
        # 构建所有目标地址
        target_url_list = await self.create_url_list(start_page=int(start_page),
                                                     end_page=int(end_page),
                                                     base_url=category_href)
        print(f'当前所有目标地址构建完成 :>>>> {target_url_list}')
        print(f'------------------------------------------------')
        img_data_dict = {}
        # 请求每一页的图片详细连接
        for target_url in target_url_list:
            print(f'当前抓取图片首页地址连接为 :>>>> {target_url}')
            img_dict = await self.spider_all_detail_source(url=target_url)
            img_data_dict.update(img_dict)
        print(f'当前所有图片连接构建完成 :>>>> {img_data_dict}')
        print(f'------------------------------------------------')
        print(f'开始下载 :>>>> ')
        stat_download_time = time.time()
        # 下载每一张图片
        if func_id == '1':
            count = 0
            task_list = []
            for title, url in img_data_dict.items():
                count += 1
                print(f'当前是第 {count} 张图片!')
                task = asyncio.create_task(
                    self.download_image(img_url=url, img_title=title, category_input=category_input))
                task_list.append(task)
            # 等待任务完成
            await asyncio.wait(task_list)
        print(f'下载结束 :>>>> ')
        end_time = time.time()
        print(f'总耗时 :>>>> {end_time - start_time} s!')
        print(f'下载总耗时 :>>>> {end_time - stat_download_time} s!')


# 多进程和多线程的相关操作
class SpiderProcessThread(BaseSpider):
    def __init__(self):
        super().__init__()

    async def main_chose_category_normal(self):
        # 获取所有的分类
        category_dict = await self.spider_all_category()
        while True:
            # 遍历分类获取指定的分类下载
            for index, data in enumerate(category_dict.items(), start=1):
                index = str(index).rjust(len(str(index)) + 3, '0')
                category_title, category_href = data
                print(f'当前编号 :>>> {index} 分类 :>>>> {category_title}')
            # 用户输入下载的分类
            category_input = input("请输入下载的分类 :>>>> ").strip()
            if category_input not in category_dict.keys():
                print(f'当前分类 {category_input} 不存在')
                continue
            # 分类的主链接
            category_href = category_dict.get(category_input)
            start_page = input("请输入起始页码 :>>>> ").strip()
            end_page = input("请输入结束页码 :>>>> ").strip()
            # 起始页码和结束页码
            if not all([start_page.isdigit(), end_page.isdigit()]):
                print(f'页码有误!')
                continue
            func_id = input("请选择版本 ::>> ").strip()
            # 1 正常下载
            # 2 多进程下载
            # 3 多线程下载
            await self.main_download(start_page, end_page, category_href, category_input, func_id=func_id)


if __name__ == '__main__':
    s = SpiderProcessThread()
    res = s.main_chose_category_normal()
    asyncio.run(res)
import asyncio
import os
import time

from fake_useragent import UserAgent
import aiohttp
from lxml import etree

headers = {
    'User-Agent': UserAgent().random
}
BASE_DIR = os.path.dirname(__file__)


def create_file_name(path='img'):
    file_name_path = os.path.join(BASE_DIR, path)
    os.makedirs(file_name_path, exist_ok=True)
    return file_name_path


file_name_path = create_file_name()


async def create_url_list():
    url_list = []
    for i in range(1, 5):
        if i == 1:
            index_url = 'https://pic.netbian.com/4kdongman/'
            url_list.append(index_url)
        else:
            index_url = f'https://pic.netbian.com/4kdongman/index_{i}.html'
            url_list.append(index_url)
    return url_list


async def get_tree(page_text):
    tree = etree.HTML(page_text)
    return tree


async def get_page_text(tag_url, encoding='gbk'):
    async with aiohttp.ClientSession() as session:
        # 如果遇到 ssl error 这种错,一般都是 ssl=False
        async with session.get(url=tag_url, headers=headers, ssl=False) as response:
            page_text = await response.text(encoding='gbk')
    return page_text


async def spider_index_tree():
    tree_list = []
    url_list = await create_url_list()
    # url_list = ['https://pic.netbian.com/4kdongman/']
    for url in url_list:
        # 获取每一页的页面源码
        page_text = await get_page_text(tag_url=url)
        tree = await get_tree(page_text=page_text)
        tree_list.append(tree)
    return tree_list


async def get_tree_data(tree):
    img_data_list = []
    li_list = tree.xpath('//*[@id="main"]/div[4]/ul/li')
    # //*[@id="main"]/div[4]/ul/li[1]/a/img
    for li in li_list:
        # ./a/img
        img_title_ = li.xpath('./a/img/@alt')[0]
        img_title = ''
        for item in img_title_:
            if item.isdigit() or item.isspace() or item in ['*', '-', 'x', '+', '\\', '/']:
                pass
            else:
                img_title += item
        img_src = 'https://pic.netbian.com' + li.xpath('./a/img/@src')[0]
        img_data_list.append({'img_title': img_title, 'img_src': img_src})
    print(img_data_list)
    return img_data_list


async def spider_index_img_data():
    img_data_list = []
    tree_list = await spider_index_tree()
    for tree in tree_list:
        img_list = await get_tree_data(tree=tree)
        # [{},{}]
        img_data_list.extend(img_list)
    return img_data_list


async def download(img_src, img_title):
    async with aiohttp.ClientSession() as session:
        async with session.get(url=img_src, headers=headers, ssl=False) as response:
            data_all = await response.read()
            file_path = os.path.join(file_name_path, f'{img_title}.png')
            with open(file_path, mode='wb') as fp:
                fp.write(data_all)
            print(f"当前图片 :>>>> {img_title} 保存成功!")


async def main():
    img_data_list = await spider_index_img_data()
    print(len(img_data_list))
    # 创建Task对象列表
    task_list = [asyncio.create_task(download(img_src=img_data.get('img_src'), img_title=img_data.get('img_title'))) for
                 img_data in img_data_list]
    # 等待任务完成
    await asyncio.wait(task_list)


if __name__ == '__main__':
    start_time = time.time()
    # 启协程
    asyncio.run(main())
    print(f"总耗时 :>>>> {time.time() - start_time} s")

    # 总耗时 :>>>> 6.5860209465026855 s

标签:__,协程,url,asynico,模块,time,print,def
From: https://www.cnblogs.com/zyb123/p/18227329

相关文章

  • 电机控制系列模块解析(26)—— 参数辨识
    一、离线辨识参数辨识分为:离线辨识和在线辨识。在现代电机控制领域,准确掌握电机的各项电气和机械参数对于实现高效、精准的控制至关重要。离线辨识作为电机参数测量的一种重要手段,主要在电机未接入实际运行系统时进行,通过特定的测试信号和算法,辨识出电机的关键参数。本文将介......
  • NET工控,上位机,Modbus485网口/串口通讯(鸣志步进电机,鸣志伺服电机,松下伺服电机,华庆
    先上两个通用Modbus帮助类,下面这个是多线程不安全版,在多线程多电机同一端口通信下,可能造成步进电机丢步或者输出口无响应等,还有个多线程安全版,只是基于这个不安全版加上了LOCK,THISusingModbus.Device;usingSunny.UI;usingSystem;usingSystem.IO.Ports;usingSystem.Li......
  • Navicat, PDManer,PyMySQL模块,SQL注入问题,PyMySQL进阶之主动提交事务
    ⅠNavicat【一】Navicat介绍Navicat可以充当很多数据库软件的客户端提供了图形化界面能够让我们更加快速的操作数据库【1】介绍Navicat是一款功能强大且广泛使用的数据库管理工具,可用于连接和管理多种数据库系统,如MySQL、MariaDB、Oracle、PostgreSQL等。本文将详细......
  • YOLOv8改进 | 卷积模块 | 添加选择性内核SKConv【附完整代码一键运行】
    ......
  • PyTorch 的 torch.nn 模块学习
    torch.nn是PyTorch中专门用于构建和训练神经网络的模块。它的整体架构分为几个主要部分,每部分的原理、要点和使用场景如下:1.nn.Module原理和要点:nn.Module是所有神经网络组件的基类。任何神经网络模型都应该继承nn.Module,并实现其forward方法。使用场景:用于定义和......
  • DHT11温湿度模块的简单使用与代码(江科大代码风格)
    目录模块接线测量范围模块代码DTH11.hDHT11.c模块接线测量范围相对湿度:5%~95%RH温度:-20~60℃模块代码DTH11.h#ifndef_DHT11_H_#define_DHT11_H_#include"stm32f10x.h"//Deviceheader//上电后等待1秒才调用函数......
  • 嵌入式模块学习小记(未分类)
    L298N电机驱动板模块OutputA:接DC电机1或步进电机的A+和A-;OutputB:接DC电机2或步进电机的B+和B-;5VEnable:如果使用输入电源大于12V的电源,请将跳线帽移除。输入电源小于12V时短接可以提供5V电源输出;+5VPower:当输入电源小于12V时且5VEnable处于短接状态,可以提......
  • pymysql 模块演练代码
    importpymysqlfrompymysql.cursorsimportDictCursorconn=pymysql.connect(host='localhost',user='root',password='123456',database='day1',port=3306,cursorclass=DictCursor,connect_timeout=3)cursor=conn.cur......
  • BOSHIDA AC/DC电源模块:应用于工业自动化领域
    BOSHIDAAC/DC电源模块:应用于工业自动化领域AC/DC电源模块是一种用来将交流电转换为直流电的电源模块。它在工业自动化领域有着广泛的应用,可以为各种设备和系统提供稳定可靠的电力供应。 一,AC/DC电源模块在工业自动化领域中起到了稳定电源供应的作用。在工业自动化过程中,许多......
  • 低代码开发平台(Low-code Development Platform)的模块组成部分
    低代码开发平台(Low-codeDevelopmentPlatform)的模块组成部分主要包括以下几个方面:低代码开发平台的模块组成部分可以按照包含系统、模块、菜单组织操作行为等维度进行详细阐述。以下是从这些方面对平台模块组成部分的说明:包含系统低代码开发平台本身作为一个完整的系统,包含......