说在前头:本文话糙理不糙,用大白话说明协程的核心思想,
协程,指的是单个线程里执行多个并发任务,一个协程对应一个任务,重点来了,!!!协程是用户空间的概念,也就是说不管你一个线程里有多少个协程,在操作系统看来,你就是一个单线程,只要你有一处代码阻塞了,那么os就会挂起整个线程,所以说这多个任务中一旦有一个调用了阻塞io阻塞了线程,那么整个线程都将阻塞在原地,该线程内的其余的任务都不能执行了即协程就会得不到切换。所以解决这个问题的关键,就是系统必须找到一种方法,可以阻止该线程对应的所有代码块中所有会使线程阻塞的阻塞io操作。换句话说就是,必须有一个管理员,可以检测到该线程里的所有io操作,一旦线程里的某处代码发起了io操作,那么在这个io操作实际执行之前,这个调度器就会得到通知,然后把所有的io操作都替换成统一的非阻塞io,然后再向操作系统发起非阻塞io,因为是非阻塞io,所以线程不会被挂起,也就是说线程依旧是就绪的,但是当前线程正在执行的代码块已经执行到阻塞io操作了,也就是说这块代码块应该是阻塞的,是不能继续运行的,但线程此刻是可以运行的,所以为了使得可运行的线程继续运行,这个管理员就必须用另一段代码块来替换掉当前的代码块,从而使得当前线程可以从新的代码块处开始运行,这样就实现了多个代码块的并发,这样就实现了同一个线程中,一个代码块的阻塞并不会影响到该线程的继续运行,所以这个管理员就是理解协程的关键,这里我们说的一个代码块,就是一个协程,这里说的管理员,就是协程调度器(python里的事件循环,go里面的协程调度器),用一个可继续运行的代码块替换掉阻塞的代码块,就是我们说的协程切换,这里我们说的io操作是一个泛指,凡是可能引起阻塞的操作我们都可以认为是一个io操作,比如sleep、网络请求、磁盘读写。我们前面说了,管理员也就是协程调度器必须在协程执行io操作的时候得到通知,所以要实现这个目的,解决办法之一就是提供了一套专门的io语法(python就是这样的,await),只要你协程用这种io语法来进行io操作,那么我协程调度器就能感知到这个操作,那么当你这个io操作进行的时候,协程调度器就能接管这次io,就能在发起io后,协程调度器还可以选择一个新的协程在线程中运行。当然,如果我们偏偏不用他提供的专门的io语法,而是绕过这个语法,绕过这个调度器,直接向操作系统发起阻塞io,那么此时协程调度器就无法感知到这次阻塞io了,因为协程是用户空间的概念,不管你一个线程里有多少个协程,在操作系统看来,你就是一个单线程,一个协程阻塞了,那么在操作系统看来就是你整个线程阻塞了,那么原先分配在此线程上运行的其他线程就都不会得到运行了。举个例子,在python中你可以把await搭配aiohttp理解成系统提供的一套专门io语法,你用这套io语法,那么一个协程发起io的时候不会阻塞掉这个线程,await aiohttp这套语法对于协程来说是阻塞的,但是对于协程调度器来说实际是非阻塞,但是如果你不用这套io语法,而是绕过这套语法,用一套协程调度器检测不到的io语法来执行阻塞io,比如你python中直接用request库来执行io操作,那么协程调度器(这里即python里的事件循环)就无法感知到这次io,就无法接管这次io,因为python是每个线程都有自己的协程队列和协程调度器(即事件循环),那么这次request阻塞io发起以后,这个线程就阻塞了,这样同样在这个线程运行的的协程调度器就无法继续运行,因为线程阻塞掉了,运行不了,这样其他协程就无法运行了,所以如果要不阻塞当前线程,那么这次request阻塞io就必须丢到一个新的线程去执行,这样阻塞也是阻塞新线程,而不会阻塞当前的线程,所以什么时候用协程实现,什么时候应该丢到线程池来处理就很清楚了,即如果本次阻塞io操作,协程调度器可以感知,那么就用协程,如果本次阻塞io操作,协程调度器无法感知,那么就新建一个线程或者线程池,然后把该任务包装一下丢到新线程或者新线程池,也就是说如果我们用的是协程友好的库即非阻塞的io库如python的aiohttp,那么我们就能在协程中使用,如果是python request之类的阻塞库,那么就丢到新线程或者线程池。
再次举例,以python为例,python是每个线程都有一个事件循环,即每个线程都有一个自己的协程调度器,协程a、协程b、事件循环x都在同一个线程t上运行,python中协程a(即一个通过async定义的函数)调用一个async函数b,此时只是返回一个协程对象,并不会真正运行,当协程a执行await b的时候对协程a来说他这里相当于一个同步io,但实际上await是协程调度器指定的io语法,当协程a使用await发起同步io的时候,协程调度器就会得到通知,就会挂起协程a,然后把协程b添加到线程对应的事件循环中,然后从线程t的协程等待队列中选取一个可运行的协程运行,当协程b运行完毕后,协程a就变为可运行,这样协程a下一次就有机会运行,如果协程a直接使用request包里的api,因为这些都是真正的阻塞io,绕过了协程调度器即协程调度器无法感知到此次io,那么这次阻塞io执行时整个线程都会被阻塞,直到io完成。同时python的协程适用于非阻塞io密集型任务,不适合计算密集型任务,因为python中只有通过await时才会主动让出cpu,计算密集型任务会长时间霸占线程使得其他协程得不到运行。
以go为例,go里面是全局的协程调度器,同时是抢占式调度,所以go里面的协程可以同时适用于计算密集型和io密集型。问:如果go的一个任务t是真正的阻塞io,这个任务被分配给了线程a,那么任务t会阻塞掉整个线程吗?答案是不会。在 Go 中,如果一个 Goroutine (即go里面的协程)执行了真正的阻塞 I/O 操作(比如系统调用),这个 Goroutine 会阻塞所在的操作系统线程。但 Go 语言的调度器能够很好地应对这种情况,不会导致整个线程阻塞,其他 Goroutines 可以继续执行。当一个 Goroutine 进行阻塞的 I/O 操作或系统调用时(比如文件读取、网络请求),Go 运行时会检测到该Goroutine 所在的线程阻塞。Go 的调度器会创建或重新分配一个新的线程,让原本将在这个被阻塞线程上运行的其他 Goroutines 继续在新的线程上执行,这样这个引起阻塞的io操作在其他Goroutines看来实际发出的是非阻塞io操作。这样,Go 运行时能避免因为一个协程阻塞 I/O 操作而导致该线程的所有的协程被阻塞即如果一个线程被阻塞,调度器会将剩余的 Goroutines 分配到其他线程执行。
再次笔记:python是每个线程都有一个单独的协程调度器,相当于一个局部的协程调度器,而go是有一个全局的协程调度器,只要使用指定的语法,协程调度器就能接管io并自动切换到另一个协程,如果可以直接发起阻塞io并且可以让协程调度器无法感知到该次io操作,那么该操作就能阻塞该协程所在的线程,也就是预先分配给该线程的所有协程都会被同步阻塞,只有阻塞io返回,该线程才能继续运行。
最后再次强调:协程,是用户空间的概念,协程调度器也是用户空间的概念,操作系统看到的就是一个单线程,如果你这个线程发出一个阻塞io,那么os就会立即阻塞并挂起这个线程,所以协程调度器的核心就是让这个线程不发出阻塞io,只要不发出阻塞io,那么这个线程就是可运行的,至于线程里运行什么代码块,这就是协程调度器该控制的事情,所以,再次强调,协程是用户空间线程的一种实现,操作系统是看不到,由用户自己来管理协程(比如go语言系统、python语言系统)。
备注:用户空间和内核空间
备注:io语法:io语法只是语法,其底层有自己的实现,提供给协程的io语法可以有阻塞io、非阻塞io,但是其底层实现,必须做到对于其他协程来说是非阻塞的
备注:统一替换成非阻塞io并不一定是指统一替换成非阻塞io,而是说协程发起的这次io,在其他协程看来你这个io是一次非阻塞io,并不会阻塞掉线程,从而其他协程可以继续运行。这里的不会阻塞掉线程的意思是,在其他协程看来,你这个线程没有阻塞,其他可就绪的协程是可以继续运行的,至于底层是怎么实现的,他不管,比如说底层可以是统一把所有io替换成非阻塞io(python中就可以这样操作,比如把所有的同步阻塞库request操作替换成非阻塞的aiohttp并搭配await来用,这样await在发起调用的协程来说是阻塞的,但是对于其他协程来说却是非阻塞的),也可以是底层创建一个新的线程,然后把所有其他协程都移到这个新线程上面去运行,留下这个引发线程阻塞的协程继续在旧线程运行(go的协程就是这么搞的),这样在其他协程看来,你这个线程是没有阻塞的
测试代码:
import asyncio
import threading
import time
# 只要线程没有阻塞,那么这个函数差不多每隔async_sleep_time 秒就会打印一条信息
# 如果线程被阻塞,那么就不会打印,可以通过i的值来查看
async def forever_loop_print(async_sleep_time: int = 1, info: str = None):
i: int = 0
if info is None:
info = "forever_loop_print"
while True:
print(f"{info}:i={i}")
await asyncio.sleep(async_sleep_time)
i = i + 1
async def do_net_io_async(idx: int, sleep_seconds: int):
print(f"net_io_async start and sleep : idx={idx},sleep_seconds={sleep_seconds}")
# 指定的io语法,通过这个语法,协程调度器可以感知到这次io,所以协程调度器可以接管这次io,从而底层实际发起非阻塞io,
# 从而本线程不会被阻塞,这样本线程对应的协程调度器可以继续运行,就可以从就绪协程列表中选择一个新的协程,
# 然后把本线程的使用权从协程调度器移交给这个新的协程,从而协程调度器可以实现协程切换,并使在本线程继续运行
# 备注1:asyncio.sleep是协程方式的sleep,和await搭配使用
# 备注2:这里的io语法指的是await+协程方式的非阻塞sleep
await asyncio.sleep(sleep_seconds)
print(f"net_io_async end and wakeup: idx={idx},sleep_seconds={sleep_seconds}")
async def do_net_io_block(idx: int, sleep_seconds: int):
print(f"net_io_block start and sleep : idx={idx},sleep_seconds={sleep_seconds}")
# 非指定的io语法,协程调度器无法感知到本次io,所以可以理解为这次io绕过了协程调度器,直接向os发起阻塞io
# os收到阻塞io就会把整个线程阻塞,这样同样在本线程运行的协程调度器和其他协程就都没有机会运行了,
# 因为整个线程都被阻塞了,所以只有等待本次io结束,协程调度器才有可能继续运行
# python是每个线程都有自己的协程队列和协程调度器(事件循环)
# 备注1:这里的io语法指的是time.sleep这样的阻塞io
time.sleep(sleep_seconds)
print(f"net_io_block end and wakeup: idx={idx},sleep_seconds={sleep_seconds}")
async def do_net_io(idx: int, sleep_seconds: int, block: bool) -> float:
if block is True:
start = time.time()
# 用协程的方式创建三个协程,只不过每个协程里执行的都是阻塞io
# asyncio.create_task时就会把协程加到线程的协程队列里
# 当然,因为task1/task2/task3都放到了协程队列,所以哪个先运行这个是不确定的
# !!但是可以肯定的是,在某一个task(假设是task1) do_net_io_block的时候整个线程都是阻塞的,
# 其他协程如task2、task3都是没有机会运行的
task1 = asyncio.create_task(do_net_io_block(idx, sleep_seconds))
idx += 1
task2 = asyncio.create_task(do_net_io_block(idx, sleep_seconds))
idx += 1
task3 = asyncio.create_task(do_net_io_block(idx, sleep_seconds))
await task1
await task2
await task3
end = time.time()
r = end - start
print(f"do_net_io_block: costs={r} seconds")
return r
else:
start = time.time()
# 用协程的方式创建三个协程,只不过每个协程里执行的都是协程方式的非阻塞io
# asyncio.create_task时就会把协程加到线程的协程队列里
# !!在task1 do_net_io_async的时候task2、task3是有机会运行的
task1 = asyncio.create_task(do_net_io_async(idx, sleep_seconds))
idx += 1
task2 = asyncio.create_task(do_net_io_async(idx, sleep_seconds))
idx += 1
task3 = asyncio.create_task(do_net_io_async(idx, sleep_seconds))
await task1
await task2
await task3
end = time.time()
r = end - start
print(f"do_net_io_async: costs={r} seconds")
return r
async def test_1():
print("******************start test_1*****************")
# 第一个async方式,应该在1个sleep_seconds左右
await do_net_io(idx=0, sleep_seconds=5, block=False)
# 第二个async方式,应该不会低于3个sleep_seconds
await do_net_io(idx=0, sleep_seconds=5, block=True)
print("-------------------end test_1------------------")
async def test_2():
print("******************start test_2*****************")
unstoptask = asyncio.create_task(forever_loop_print(async_sleep_time=1, info="forever_loop_print"))
# 在await asynctask sleep期间 unstoptask会持续打印info
asynctask = asyncio.create_task(do_net_io(idx=0, sleep_seconds=5, block=False))
await asynctask
# 在await blocktask sleep期间,unstoptask会停止打印info,因为整个线程都会被阻塞
blocktask = asyncio.create_task(do_net_io(idx=0, sleep_seconds=5, block=True))
await blocktask
# 在asyncio.sleep时,unstoptask会继续打印
await asyncio.sleep(5)
print("-------------------end test_2------------------")
if __name__ == '__main__':
# asyncio.run会等到test_1运行结束才会继续往下运行,所以可以看成是同步调用
asyncio.run(test_1())
print("\n\n\n")
asyncio.run(test_2())
测试结果:
******************start test_1*****************
net_io_async start and sleep : idx=0,sleep_seconds=5
net_io_async start and sleep : idx=1,sleep_seconds=5
net_io_async start and sleep : idx=2,sleep_seconds=5
net_io_async end and wakeup: idx=0,sleep_seconds=5
net_io_async end and wakeup: idx=1,sleep_seconds=5
net_io_async end and wakeup: idx=2,sleep_seconds=5
do_net_io_async: costs=5.003515005111694 seconds
net_io_block start and sleep : idx=0,sleep_seconds=5
net_io_block end and wakeup: idx=0,sleep_seconds=5
net_io_block start and sleep : idx=1,sleep_seconds=5
net_io_block end and wakeup: idx=1,sleep_seconds=5
net_io_block start and sleep : idx=2,sleep_seconds=5
net_io_block end and wakeup: idx=2,sleep_seconds=5
do_net_io_block: costs=15.026322603225708 seconds
-------------------end test_1------------------
******************start test_2*****************
forever_loop_print:i=0
net_io_async start and sleep : idx=0,sleep_seconds=5
net_io_async start and sleep : idx=1,sleep_seconds=5
net_io_async start and sleep : idx=2,sleep_seconds=5
forever_loop_print:i=1
forever_loop_print:i=2
forever_loop_print:i=3
forever_loop_print:i=4
net_io_async end and wakeup: idx=1,sleep_seconds=5
net_io_async end and wakeup: idx=2,sleep_seconds=5
net_io_async end and wakeup: idx=0,sleep_seconds=5
do_net_io_async: costs=4.998783111572266 seconds
net_io_block start and sleep : idx=0,sleep_seconds=5
net_io_block end and wakeup: idx=0,sleep_seconds=5
net_io_block start and sleep : idx=1,sleep_seconds=5
net_io_block end and wakeup: idx=1,sleep_seconds=5
net_io_block start and sleep : idx=2,sleep_seconds=5
net_io_block end and wakeup: idx=2,sleep_seconds=5
do_net_io_block: costs=15.02980637550354 seconds
forever_loop_print:i=5
forever_loop_print:i=6
forever_loop_print:i=7
forever_loop_print:i=8
forever_loop_print:i=9
-------------------end test_2------------------
Process finished with exit code 0
标签:协程,一文,seconds,阻塞,----,线程,io,sleep
From: https://blog.csdn.net/qq_35263706/article/details/142139274