【一】信号量(Semahpore)
1)概念
互斥锁:允许在同一时刻只能有一个线程或进程同资源进行修改
信号量 : 允许指定数量的进程或线程对资源进行修改
2)示例
from multiprocessing import Process, Semaphore
import time
import random
def eg(sem, user):
# 对信号量加锁
sem.acquire()
print(f'{user}占到一个位置')
# 模拟延迟
time.sleep(random.randint(0, 3))
# 对信号量解锁
sem.release()
if __name__ == '__main__':
# 创建一个信号量的池子
sem = Semaphore(5)
# 用来存储所有的子进程
p_l = []
# 十三个子进程
for i in range(5):
# 【三】创建子进程
p = Process(target=eg, args=(sem, f'user {i}:>>>> ',))
# 【四】启动子进程
p.start()
# 【五】添加到进程列表
p_l.append(p)
# 等待所有子进程执行完毕
for i in p_l:
i.join()
# user 0:>>>> 占到一个位置
# user 2:>>>> 占到一个位置
# user 1:>>>> 占到一个位置
# user 3:>>>> 占到一个位置
# user 4:>>>> 占到一个位置
【二】事件(Event)
1)事件处理方法
-
python线程的事件用于主线程控制其他线程的执行
-
事件主要提供了三个方法 set、wait、clear。
-
事件处理的机制:
- 全局定义了一个“Flag”
-
- 如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞
- 如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
- clear:将“Flag”设置为False
- set:将“Flag”设置为True
2)示例
from multiprocessing import Process, Event, Manager
import time
import random
color_reset = '\033[0m'
color_red = '\033[31m'
color_green = '\033[32m'
# 定义车辆行为函数,它会根据事件对象(event)的状态来决定是否等待或通过路口
def car(event, n):
while True:
# 若事件未设置,表示红灯
if not event.is_set():
print(f'{color_red}红灯亮{color_reset},车{n}等待')
# 阻塞当前进程
event.wait()
# 当event被设置后,打印绿灯信息
print(f'{color_green}绿灯亮{color_reset},车{n}起步')
# 模拟通过路口需要的时间
time.sleep(random.randint(3, 6))
# 防止在sleep期间event状态改变,再次检查状态
if not event.is_set():
continue
# 通过路口
print(f'车{n}通行')
# 退出循环
break
# 定义 警车 行为函数, 警车 在红灯时等待一秒后直接通过
def police_car(event, n):
while True:
# 判断是否为红灯
if not event.is_set():
print(f'{color_red}红灯亮{color_reset},警车{n}等待')
# 等待1秒,不完全遵守交通规则
event.wait(1)
print(f'灯是{event.is_set()},警车{n}通行')
# 通过后立即结束循环
break
# 定义交通灯控制函数,周期性切换红绿灯状态
def traffic_lights(event, interval):
# 无限循环,持续控制交通灯状态
while True:
# 按照给定间隔(比如10秒)改变信号灯
time.sleep(interval)
# 如果当前是绿灯
if event.is_set():
# 切换到红灯状态
# event.is_set() ---->False
event.clear()
else:
# 如果当前是红灯,则切换到绿灯状态
# event.is_set() ----> True
event.set()
def main():
# 初始化事件对象,初始状态为清除(即红灯)
event = Event()
# 创建并启动多个 警车 进程
for i in range(3):
i += 1
police_car_process = Process(target=police_car, args=(event, i))
police_car_process.start()
for i in range(3):
i += 1
car_process = Process(target=car, args=(event, i))
car_process.start()
# 启动交通灯控制进程,交通灯变化周期为10秒
traffic_lights_process = Process(target=traffic_lights, args=(event, 5))
traffic_lights_process.start()
if __name__ == '__main__':
main()
# 红灯亮,警车1等待
# 红灯亮,警车2等待
# 红灯亮,警车3等待
# 红灯亮,车1等待
# 红灯亮,车2等待
# 红灯亮,车3等待
# 灯是False,警车1通行
# 灯是False,警车2通行
# 灯是False,警车3通行
# 绿灯亮,车1起步
# 绿灯亮,车2起步
# 绿灯亮,车3起步
# 车1通行
# 车2通行
# 红灯亮,车3等待
# 绿灯亮,车3起步
# 车3通行
【三】队列补充
from queue import Queue, LifoQueue, PriorityQueue
# 【一】正常队列
# maxsize过不给默认值,这个队列的容量就是无限大
queue = Queue(5)
# 放
queue.put(timeout=1)
queue.put_nowait()
# 取
queue.get(timeout=1)
queue.get_nowait()
# 判断队列是否为空
print(queue.empty())
# 判断是满的
print(queue.full())
# 获取当前队列中的个数
print(queue.qsize())
# 【一】Queue 先进先出
print(f'----------Queue-------------')
queue_normal = Queue(3)
queue_normal.put(1)
queue_normal.put(2)
queue_normal.put(3)
print(queue_normal.get())
print(queue_normal.get())
print(queue_normal.get())
# 1
# 2
# 3
# 【二】LifoQueue 后进先出
print(f'----------LifoQueue-------------')
queue_lifo = LifoQueue(3)
queue_lifo.put(1)
queue_lifo.put(2)
queue_lifo.put(3)
print(queue_lifo.get())
print(queue_lifo.get())
print(queue_lifo.get())
# 3
# 2
# 1
# 【三】PriorityQueue : 根据优先级数字越小的先出
print(f'----------PriorityQueue-------------')
queue_priority = PriorityQueue(3)
# 可以给放进队列的元素设置优先级:数字越小优先级越高!
queue_priority.put((50, 111))
queue_priority.put((0, 222))
queue_priority.put((100, 333))
print(queue_priority.get())
print(queue_priority.get())
print(queue_priority.get())
# (0, 222)
# (50, 111)
# (100, 333)
【四】进程池和线程池
1)池的概念
- 池就是用来保证计算机硬件安全的情况下最大限度的利用计算机
2)线程池
1.语法
from concurrent.futures import ThreadPoolExecutor
# 默认开设当前计算机 cpu 个数五倍数的线程数
# 可以指定线程总数
pool = ThreadPoolExecutor(5)
2.原理
- 池子造出来后 里面固定存在五个线程
- 这五个线程不会存在出现重复创建和销毁的过程
3.优点
- 避免了重复创建五个线程的资源开销
4.示例
- 同步:提交任务之后原地等待任务的返回结果,期间不做任何事
- 异步:提交任务之后不等待任务的返回结果,继续执行代码
同步提交
- 提交任务之后原地等待结果,不做任何事
from concurrent.futures import ThreadPoolExecutor
import time
# 构造线程池,指定线程总数
pool = ThreadPoolExecutor(5)
# 定义线程任务
def task(n):
print(n)
time.sleep(2)
if __name__ == '__main__':
# 向线程池中添加线程任务
pool.submit(task, 1)
# 异步提交 - 先执行子线程再打印主线程
print('this is a main task')
# 1
# this is a main task
异步提交
-
提交任务之后不需要原地等待结果,去做其他事
-
会先执行指定个数个进程,在回到主进程
from concurrent.futures import ThreadPoolExecutor
import time
# 构造线程池,指定线程总数
pool = ThreadPoolExecutor(5)
# 定义线程任务
def task(n):
print(n)
time.sleep(2)
if __name__ == '__main__':
for i in range(10):
pool.submit(task, i)
print('this is a main task')
# 0
# 1
# 2
# 3
# 4
# this is a main task
# 5
# 6
# 7
# 8
# 9
3)进程池
- 开设进程的进程 ID 号不会发生改变
【五】协调理论
- 基于单线程来实现并发
- 进程 --> 线程(进程下的进程) --> 协程(线程下的线程)
1)并发的本质
-
本节的主题是基于单线程来实现并发
-
- 即只用一个主线程(很明显可利用的cpu只有一个)情况下实现并发
- 为此我们需要先回顾下并发的本质: 切换+保存状态
-
CPU正在运行一个任务
-
- 会在两种情况下切走去执行其他的任务(切换由操作系统强制控制)
- 一种情况是该任务发生了阻塞
- 另外一种情况是该任务计算的时间过长或有一个优先级更高的程序替代了它。
2)yield关键字
保存当前的状态
下一次基于当前状态继续处理
让你看起来其实就是在并行运行
1.串行执行
import time
def func1():
for i in range(10000000):
i + 1
def func2():
for i in range(10000000):
i + 1
start = time.time()
func1()
func2()
stop = time.time()
print(stop - start)
# 0.5446224212646484
2.基于yield并发执行
import time
def func1():
while True:
yield
def func2():
g = func1()
for i in range(10000000):
i+1
next(g)
start=time.time()
func2()
stop=time.time()
print(stop-start)
# 0.6688270568847656
3)实现遇到IO自动切换
-
第一种情况的切换。
-
在任务一遇到IO情况下
-
切到任务二去执行
-
这样就可以利用任务一阻塞的时间完成任务二的计算
-
效率的提升就在于此。
-
yield不能检测IO
-
- 实现遇到IO自动切换
【六】协调介绍
1)概念
-
是单线程下的并发,又称微线程,纤程。英文名Coroutine。
-
一句话说明什么是线程:
-
- 协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。
-
需要强调的是:
-
- python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)
- 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)
-
对比操作系统控制线程的切换,用户在单线程内控制协程的切换
2)优点
- 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
- 单线程内就可以实现并发的效果,最大限度地利用cpu
- 应用程序级别速度要远远高于操作系统的切换
3)缺点
- 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程
- 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程(多个任务一旦有一个阻塞没有切,整个线程都阻塞在原地,该线程内的其他的任务都不能执行了)
4)总结
- 必须在只有一个单线程里实现并发
- 修改共享数据不需加锁
- 用户程序里自己保存多个控制流的上下文栈
- 附加:一个协程遇到IO操作自动切换到其它协程(如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制))
【七】Greenlet模块
1)介绍
-
如果我们在单个线程内有20个任务
-
- 要想实现在多个任务之间切换
- 使用yield生成器的方式过于麻烦(需要先得到初始化一次的生成器,然后再调用send)
- 而使用greenlet模块可以非常简单地实现这20个任务直接的切换
2)安装
pip install greenlet
3)小结
-
greenlet只是提供了一种比generator更加便捷的切换方式
-
- 当切到一个任务执行时如果遇到io
- 那就原地阻塞,仍然是没有解决遇到IO自动切换来提升效率的问题。
-
单线程里的这20个任务的代码通常会既有计算操作又有阻塞操作
-
- 我们完全可以在执行任务1时遇到阻塞
- 就利用阻塞的时间去执行任务2。。。。
- 如此,才能提高效率,这就用到了Gevent模块。
【八】Gevent模块
1)介绍
- Gevent 是一个第三方库
- 可以轻松通过gevent实现并发同步或异步编程
- 在gevent中用到的主要模式是Greenlet
- 它是以C扩展模块形式接入Python的轻量级协程。
- Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调
2)安装
pip install gevent
【九】asyncio模块
1)介绍
- asyncio 模块是 Python 中实现异步的一个模块,该模块在 Python3.4 的时候发布
- async 和 await 关键字在 Python3.5 中引入。
- 因此,想要使用asyncio模块,建议 Python 解释器的版本不要低于 Python3.5 。
2)协调函数和协调对象
1.协调函数
-
直白的讲,定义为如下形式的函数
# 使用 async 声明的函数就是协程函数 async def fn(): pass
2.协程对象
-
所谓的协程对象就是调用协程函数之后返回的对象
# 使用 async 声明的函数就是协程函数 async def fn(): pass # 调用携程函数得到的对象就是协程对象 res = fn() print(res) # <coroutine object fn at 0x1029684a0>
【十】协调函数基本应用
1)基本应用
import asyncio
async def fn():
print('协程函数内部的代码')
# 执行协程代码的方式一
def main_first():
# 调用协程函数,返回一个协程对象
res = fn()
# todo:1、创建一个事件循环
loop = asyncio.get_event_loop()
# todo:2、将协程当作任务提交到事件循环的任务列表中,协程执行完成之后终止
loop.run_until_complete(res)
# 执行协程代码的方式二
def main_second():
# 调用协程函数,返回一个协程对象
res = fn()
# 解析:第二种方式在本质上和第一种方式是相同的,其内部先创建事件循环,然后执行 run_until_complete
# 但是要注意:该方式只支持 Python3.7+ 的解释器,因为该方式在 Python3.7 加入的。
asyncio.run(res)
if __name__ == '__main__':
main_first()
main_second()
2)await关键字
-
await 是一个 只能 在协程函数中使用的关键字,用于当协程函数遇到IO操作的时候挂起当前协程(任务),
-
当前协程挂起过程中,事件循环可以去执行其他的协程(任务)
-
当前协程IO处理完成时,可以再次切换回来执行 await 之后的代码
import asyncio async def fn(): print('协程函数内部的代码') # 遇到IO操作之后挂起当前协程(任务),等IO操作完成之后再继续往下执行。 # 当前协程挂起时,事件循环可以去执行其他协程(任务) response = await asyncio.sleep(2) # 模拟遇到了IO操作 print(f'IO请求结束,结果为:{response}') def main(): # 调用协程函数,返回一个协程对象 res = fn() # 执行协程函数 asyncio.run(res) if __name__ == '__main__': main() # 协程函数内部的代码 # IO请求结束,结果为:None
3)小结
-
上述的实例只是创建了一个任务
-
- 即:事件循环的任务列表中只有一个任务
- 所以在IO等待时无法演示切换到其他任务效果。
-
在程序中想要创建多个任务对象
-
- 需要使用Task对象来实现。
4)Task 对象
1.介绍
-
Tasks 用于并发调度协程
-
通过 asyncio.create_task(协程对象) 的方式创建 Task 对象
-
这样可以让协程加入事件循环中等待被调度执行。
-
除了使用 asyncio.create_task() 函数以外
-
还可以用低层级的loop.create_task() 或 ensure_future() 函数。并且不建议手动实例化 Task 对象。
-
本质上是将协程对象封装成 Task 对象
-
并将协程立即加入事件循环,同时追踪协程的状态。
-
注意事项:
-
- asyncio.create_task() 函数在 Python3.7 中被加入。
- 在 Python3.7 之前,可以改用低层级的
- asyncio.ensure_future() 函数。
2.示例
import asyncio
async def other_tasks():
print('start')
await asyncio.sleep(2) # 模拟遇到了IO操作
print('end')
return '返回值'
async def fn():
print('fn开始')
# 创建协程,将协程封装到一个Task对象中并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
task1 = asyncio.create_task(other_tasks())
# 创建协程,将协程封装到一个Task对象中并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
task2 = asyncio.create_task(other_tasks())
print('fn结束')
# 当执行某协程遇到IO操作时,会自动化切换执行其他任务。
# 此处的await是等待相对应的协程全都执行完毕并获取结果
response1 = await task1
response2 = await task2
print(response1, response2)
def main():
asyncio.run(fn())
if __name__ == '__main__':
main()
# fn开始
# fn结束
# start
# start
# end
# end
# 返回值 返回值
5)aiohtpp对象
1.介绍
-
我们之前学习过爬虫最重要的模块requests,但它是阻塞式的发起请求,每次请求发起后需阻塞等待其返回响应,不能做其他的事情。
-
- 本文要介绍的aiohttp可以理解成是和requests对应Python异步网络请求库,它是基于 asyncio 的异步模块,可用于实现异步爬虫,有点就是更快于 requests 的同步爬虫。
- 安装方式,pip install aiohttp。
-
aiohttp是一个为Python提供异步HTTP 客户端/服务端编程,基于asyncio的异步库。
-
- asyncio可以实现单线程并发IO操作,其实现了TCP、UDP、SSL等协议,
- aiohttp就是基于asyncio实现的http框架。