楔子
在从头开始开发新的 IO 密集型应用程序时,asyncio 可能是首选技术,并且也要使用与 asyncio 搭配工作的非阻塞库,如 asyncpg、aiohttp 等等。然而我们工作的很大一部分可能是使用阻塞 IO 库管理现有的代码,例如对 HTTP 发请求的 requests,用于 PostgreSQL 数据库的 psycopg2,或其他阻塞库,我们可能还会遇到不支持异步库的情况。在这些情况下,是否有一种方法可以获得并发,并带来性能提升,同时仍然使用 asyncio API 呢?
多线程是这个问题的一种解决方案,由于阻塞 IO 会释放全局解释器锁,因此可以在单独的线程中同时运行 IO。与 multiprocessing 库非常相似,asyncio 也提供了一种利用线程池的方法,因此我们可以在仍然使用 asyncio API 的同时获得线程带来的优势,例如 gather 和 wait。
在本篇文章中,我们将学习如何使用多线程和 asyncio 在线程中运行阻塞 API,例如请求。此外,我们还将学习如何像进程那样同步共享数据,并研究更高级的锁,例如可重入锁和死锁。此外我们还将了解如何通过构建响应式 GUI 来运行 HTTP 压力测试,从而 asyncio 与同步代码结合起来。最后,我们再研究几个例外情况,在这些例外情况下,线程可用于 CPU 密集型工作。
threading 模块
Python 允许开发人员通过 threadig 模块创建和管理线程,该模块公开了 Thread 类,该类在实例化时接收一个函数,从而可以在单独的线程中运行它。Python 解释器在一个进程中运行单线程,这意味着即使代码在多个线程中运行,一次也只能运行一个 Python 字节码,因为全局解释器锁一次只允许一个线程执行代码。
这似乎是 Python 限制了我们使用多线程所带来的优势,但在少数情况下,全局解释器锁被释放,主要是在 IO 操作期间。这种情况下,Python 可以释放 GIL,因为在底层会通过低级操作系统调用来执行 IO。这些系统调用在 Python 解释器之外,这意味着在我们等待 IO 完成时不需要运行任何 Python 字节码。
为更好地了解如何在阻塞 IO 的上下文中创建和运行线程,我们重温一下该系列第三篇文章中的回显服务器示例。回顾一下,要处理多个连接,我们需要将套接字切换为非阻塞模式,并使用 IO 多路复用来监视套接字上的事件。但如果使用的是不能选择非阻塞套接字的遗留代码库怎么办?还能构建一个可以同时处理多个客户端的回显服务器吗?
由于套接字的 recv 和 sendall 是 IO 密集型方法,因此释放 GIL,我们应该能够在单独的线程中同时运行它们。这意味着可为每个连接的客户端创建一个线程,并在该线程中读取和写入数据。此模型是 Apache 等 Web 服务器中的常见范例,被称为 thread-per-connection 模型。让我们通过在主线程中等待连接,然后为每个连接的客户端创建一个线程来尝试这个想法。
from threading import Thread
import socket
def echo(conn: socket.socket):
while True:
data = conn.recv(1024)
print(f"收到数据 {data}, 现在发送")
conn.sendall(data + b"~")
with socket.socket() as server:
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("localhost", 9999))
server.listen()
while True:
conn, _ = server.accept()
thread = Thread(target=echo, args=(conn,))
thread.start()
在代码中,通过一个无限循环来监听服务器套接字上的连接。一旦有客户端进行连接,就创建一个新线程来运行 echo 函数。这意味着我们将在线程中调用echo,然后启动线程并再次循环,等待第二个连接。同时,在创建的线程中,一直循环监听来自客户端的数据,当得到数据时,会回显它。并且由于每个 recv 和 sendall 是在单独的线程中运行,因此这些操作不会相互阻塞,只会阻塞正在运行的线程。
这解决了多个客户端无法通过阻塞套接字同时连接的问题,尽管该方法存在一些线程独有的问题。如果在连接客户端时尝试使用 CTRL+C 终止该进程会发生什么?应用程序是否可以干净地关闭我们创建的线程呢?
事实证明,进程并没有干净地被关闭。如果你终止应用程序,应该会在 server.accept() 上看到一个 KeyboardInterrupt 异常,应用程序将挂起,因为后台线程一直将程序保持活动状态。此外,任何已经连接的客户端仍然可发送和接收消息。换句话说,这个异常只有主线程可以接收到,而主线程又会等待子线程运行完毕,所以最终结果就是:主线程抛异常,但程序不结束。
有几种方法可以处理这个问题,具体来说,我们可以使用守护线程(demon),或者设计自己的方式来取消或中断正在运行的线程。守护线程是一种用于长时间运行后台任务的特殊线程,这些线程不会阻止应用程序关闭。说白了,主线程执行完毕后不会立即退出,它会等待所有的子线程执行完毕,但子线程如果是守护线程,那么主线程就不会再等了。
由于当前 Python 的主线程不是守护线程,这意味着,如果将所有连接线程都设为守护线程,应用程序将在发生 KeyboardInterrupt 时终止。
# 创建线程的时候将 daemon 参数设置为 True
thread = Thread(target=echo, args=(conn,), daemon=True)
# 或者直接通过 thread.daemon = True 也可以
thread.start()
但设置为守护线程也有一个问题,就是当线程停止时我们无法运行任何清理或关闭逻辑,因为守护线程会突然终止。假设在关闭时,我们想向每个客户端写出服务器正在关闭的信息。有没有办法让异常中断线程时,可以干净地关闭套接字呢?如果调用套接字的 shutdown 方法,任何现有的对 recv 的调用都将返回 zero,并且 sendall 将抛出异常。如果从主线程调用 shutdown,这将会中断正在阻塞 recv 或 sendall 调用的客户端线程。然后,可在客户端线程中处理异常,并执行任何我们想要的清理逻辑。
为此,可以通过继承 Thread 类本身来创建线程,这与以前略有不同。我们可以定义一个 cancel 方法,在这个方法中来关闭客户端套接字。然后,对 recv 和 sendall 的调用将被中断,允许我们退出 while 循环并关闭线程。
from threading import Thread
import socket
class EchoThread(Thread):
def __init__(self, conn):
super().__init__()
self.conn = conn
def run(self):
try:
while True:
data = self.conn.recv(1024)
# 客户端断开连接时,返回 b""
if not data:
# BrokenPipeError 继承自 OSError
raise BrokenPipeError("连接断开")
print(f"收到数据: {data}")
self.conn.sendall(data + b"~")
except OSError:
print(f"线程 shutdown")
def close(self):
# 如果线程处于活跃状态,则关闭连接
if self.is_alive():
self.conn.sendall(b"shutdown")
# 不再和客户端进行数据的读取和写入,但是连接依旧保持着
self.conn.shutdown(socket.SHUT_RDWR)
# 所以再调用一个 close 方法
self.conn.close()
with socket.socket() as server:
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("localhost", 9999))
server.listen()
connection_threads = []
try:
while True:
conn, addr = server.accept()
thread = EchoThread(conn)
connection_threads.append(thread)
thread.start()
except KeyboardInterrupt:
print("主线程收到 Ctrl+C 引发的 KeyboardInterrupt")
for t in connection_threads:
# 在每个子线程上调用 close 方法,以便于在主线程收到中断时,关闭每个客户端连接
t.close()
解释一下代码,我们首先创建了一个继承自 Thread 的类 EchoThread,这个类用我们原来的 echo 函数的代码覆盖了 run 方法,但做了一些修改。首先将所有内容包装在一个 try catch 块中并拦截 OSError 异常,当关闭客户端套接字时,sendall 等方法会抛出此类异常。我们还检查来自 recv 的数据是否为空,数据为空有两种情况:客户端关闭连接,或当我们自己关闭连接时。这种情况下,我们抛出一个 BrokenPipeError(OSError 的子类),在 except 块中执行 print 语句,然后退出 run 方法,该方法会关闭线程。
然后我们还定义了一个 close 方法,此方法在关闭客户端连接之前,首先检查线程是否处于活动状态。如果线程都销毁了,那么里面的连接也就不存在了。
最后,在监听新传入连接的主循环中,我们拦截 KeyboardInterrupt 异常。一旦到异常,就在创建的每个线程上调用 close 方法。如果连接仍处于活动状态,将向客户端发送一条消息,并关闭连接。
总之,取消 Python 中正在运行的线程,通常是一个棘手的问题,并且取决于你试图处理的特定关闭情况。你需要特别注意线程不会阻塞应用程序退出,你需要确定在何处放置适当的中断点,从而退出线程。
通过 asyncio 使用线程
我们现在知道如何创建和管理多个线程来处理阻塞工作,这种方法的缺点是必须单独创建和跟踪线程。我们希望能够使用学到的所有基于异步的 API 来等待线程的结果,而无需自己管理。就像之前介绍的进程池一样,我们希望也可以使用线程池,以池的方式管理线程。在本文中,将介绍一个流行的阻塞 HTTP 客户端库,并了解如何使用线程和 asyncio 来并发运行 Web 请求。
requests 库
requests 库是一个流行的 Python HTTP 客户端库,自称为"人类的 HTTP",你可以在 https://requests.readthedocs.io/en/master/ 查看该库的最新文档。通过它,你可以像使用 aiohttp 一样向 Web 服务器发出 HTTP 请求。
import requests
def get_status_code(url: str) -> int:
response = requests.get(url)
return response.status_code
url = "https://www.example.com"
print(get_status_code(url))
print(get_status_code(url))
"""
200
200
"""
requests 库是阻塞的,这意味着每次调用 requests.get 都会阻止任何线程执行其他 Python 代码,直到请求完成。如果尝试在协程或任务中单独使用这个库,它将阻塞整个事件循环,直到请求完成。如果有一个需要 2 秒的 HTTP 请求,则应用程序除了等待这 2 秒之外什么也做不了。因此要在 asyncio 中正确使用这个库,必须在线程内运行这些阻塞操作。
线程池执行器
与进程池执行器非常相似,concurrent.futures 库提供了 Executor 抽象类的实现:ThreadPoolExecutor。线程池执行器和进程池类似,会维护一个线程池,然后可将任务提交到该线程池。
虽然默认情况下,进程池会为机器可用的每个 CPU 内核创建一个工作进程,但确定要创建多少个工作线程则有点复杂。
在后台,默认线程数的公式是 min(32, os.cpu_count() + 4),这会导致工作线程的数量上限为 32,数量下限为 5。上限设置为 32 以避免在具有大量 CPU 内核的机器上创建数量惊人的线程(因为线程的创建和维护将耗费资源),而下限设置为 5 则是因为在较小的 12 核机器上,仅启动几个线程不太可能提高性能,为 IO 密集型工作创建比可用 CPU 核数更多的线程通常是有意义的。例如,在 8 核机器上,通过上面的公式将创建 12 个线程。虽然只有 8个线程可以并发运行,但可让其他线程暂停等待 IO 完成,让操作在 IO完成时恢复它们。
import time
from concurrent.futures import ThreadPoolExecutor
import requests
def get_status_code(url: str) -> int:
response = requests.get(url)
return response.status_code
start = time.perf_counter()
get_status_code("https://www.baidu.com")
end = time.perf_counter()
print(f"1 个请求在 {end - start} 秒内完成")
start = time.perf_counter()
with ThreadPoolExecutor() as pool:
urls = ["https://www.baidu.com"] * 1000
results = list(pool.map(get_status_code, urls))
end = time.perf_counter()
print(f"1000 个请求在 {end - start} 秒内完成")
"""
1 个请求在 0.0374612 秒内完成
1000 个请求在 3.6260832 秒内完成
"""
对于我当前的 24 核心机器,使用默认线程数,此代码可以在 4 秒内执行完成。如果不使用线程池(比如通过以下代码执行相同的请求操作),所用时间将存在很大的差异:
import time
import requests
def get_status_code(url: str) -> int:
response = requests.get(url)
return response.status_code
start = time.perf_counter()
url = "https://www.baidu.com"
for _ in range(1000):
get_status_code(url)
end = time.perf_counter()
print(f"1000 个请求在 {end - start} 秒内完成")
"""
1000 个请求在 34.509156000000004 秒内完成
"""
运行此代码需要 30 秒以上的时间,这使得线程代码比同步代码快 10多倍,因此使用线程池给我们带来了相当大的性能提升。
虽然使用线程池带来了不小的性能提升,但你可能还记得在使用 aiohttp 的时候,我们能够在不到 1 秒的时间内并发执行 1000 个请求,为什么这个线程版本比之前慢这么多?请记住,工作线程的最大数量是有限制的,因此可通过在创建线程池时传入 max_workers = 1000 来解决这个问题。
但即便这么做了,也不会有协程代码获得的性能提升大,这是因为线程相关的资源开销较大。线程是在操作系统级别创建的,创建起来比协程消耗更多的资源。此外,线程在操作系统级别具有上下文切换成本,在上下文切换发生时,保存和恢复线程状态会消耗掉使用线程获得的一些性能增益。
确定用于特定问题的线程数量时,最好从小处着手(将 CPU 内核数加上较小的数字作为起点),对其进行测试和基准测试,再逐渐增加线程数量。通常会找到一个最佳位置,此后无论添加多少线程,运行时间都会停滞不前,甚至可能会带来性能下降。相对于要发出的请求而言,这个最佳位置通常是一个相当小的数字(明确地说,为 1000 个请求创建 1000 个线程可能不会带来最佳的资源利用率)。
使用 asyncio 的线程池执行器
使用带有异步事件循环的线程池执行器与使用 ProcessPoolExecutor 没有太大区别,这就是利用抽象 Executor 基类的美妙之处,因为我们只需要更改一行代码就可以使用相同的代码来运行线程或进程。让我们修改运行 1000 个 HTTP 请求的示例,从而使用 asyncio.gather 而不是 pool.map。
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
import requests
def get_status_code(url: str) -> int:
response = requests.get(url)
return response.status_code
async def main():
loop = asyncio.get_running_loop()
with ThreadPoolExecutor() as pool:
tasks = [loop.run_in_executor(pool, get_status_code, "https://www.baidu.com")
for _ in range(1000)]
results = await asyncio.gather(*tasks)
print(all(map(lambda x: x == 200, results)))
start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print(f"1000 个请求在 {end - start} 秒内完成")
"""
True
1000 个请求在 3.7066948 秒内完成
"""
我们像以前一样创建线程池,但不是使用 map,而是通过使用 loop.run_in_executor 调用 get_status_code 函数来创建任务列表。一旦有了任务列表,就可以使用 asyncio.gather 或我们之前学习的任何其他 asyncio.API 等待它们完成。
在后台,loop.run_in_executor 调用线程池执行器的 submit 方法,这会将传入的每个函数放入一个队列中。然后池中的工作线程从队列中取出相关函数,运行每个工作项直到它完成。与使用没有 asyncio 的线程池相比,这种方法在性能上并不会产生任何优势,但是当我们等待 await asyncio.gather 完成时,其他代码可以运行。
默认执行器
阅读 asyncio 文档,你可能会注意到 run_in_executor 方法的 executor 参数可以设置为 None,这种情况下,run_in_executor 将使用事件循环的默认执行器。什么是默认执行器呢?将其视为整个应用程序的可重用独立执行器即可。默认执行器将始终默认为 ThreadPoolExecutor,除非使用 loop.set_default_executor 方法设置自定义执行器,这意味着可以简化上面的代码。
import time
import asyncio
import requests
def get_status_code(url: str) -> int:
response = requests.get(url)
return response.status_code
async def main():
loop = asyncio.get_running_loop()
tasks = [loop.run_in_executor(None, get_status_code, "https://www.baidu.com")
for _ in range(1000)]
results = await asyncio.gather(*tasks)
print(all(map(lambda x: x == 200, results)))
start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print(f"1000 个请求在 {end - start} 秒内完成")
"""
True
1000 个请求在 3.7066948 秒内完成
"""
在代码中,我们不再创建自己的 ThreadPoolExecutor,而是将 None 作为执行器传入。第一次调用 run_in_executor 时,asyncio 为我们创建并缓存了一个默认的线程池执行器。对 run_in_executor 的每个后续调用都重用之前创建的默认执行器,这意味着该执行程序将作为后续事件循环的全局执行程序。这个池的关闭也和之前看到的不同,以前当退出带有块的上下文管理器时,创建的线程池执行器将关闭。当使用默认执行器时,它直到事件循环关闭时才会被关闭,这通常发生在应用程序完成时。想要使用线程时,使用默认的线程池执行器可以简化编码和程序执行,但我们还能不能让这一切变得更简单呢?
在 Python 3.9 中,引入 asyncio.to_thread 可以进一步简化将任务放在默认线程池执行程序中,它接收一个在线程中运行的函数和一组要传递给该函数的参数。然后它在默认线程池执行器和当前事件循环中运行带有参数的函数,这让我们可以进一步简化线程代码。
async def main():
tasks = [asyncio.to_thread(get_status_code, "https://www.baidu.com")
for _ in range(1000)]
results = await asyncio.gather(*tasks)
print(all(map(lambda x: x == 200, results)))
到目前为止,我们看到了如何在线程内运行阻塞代码。将线程与 asyncio 相结合的强大之处在于,可以在等待线程完成时运行其他代码。但要了解如何在线程运行时运行其他代码,我们需要了解一下关于锁的问题。
锁、共享数据和死锁
就像 multiprocessing 代码一样,当共享数据时,多线程代码也容易受到竞态条件的影响,因为我们不能控制执行顺序。每当你有两个线程或进程可以修改共享的非线程安全数据时,你都需要使用锁来确保正确的同步访问。从概念上讲,这与我们采用 multiprocessing 的方法没有什么不同,但线程的内存模型稍微改变了这种方法。
回顾一下,在使用 multiprocessing 时,默认情况下,我们创建的进程不共享内存。这意味着需要创建特殊的共享内存对象(Value、Array 等等),并正确地初始化它们,以便每个进程都可对该对象进行读写。但由于线程可以访问它们与父进程相同的内存,所以不再需要额外的共享操作,线程可以直接访问共享变量。
这稍微简化了一些,但是因为不会使用内置锁的共享 Value 对象,所以需要自行创建它们。要实现这一点,需要使用线程模块的 Lock 实现,从线程模块导入 Lock,然后围绕关键代码段调用它的 acquire 和 release 方法,或者在上下文管理器中使用它。
import threading
import asyncio
from concurrent.futures import ThreadPoolExecutor
import requests
lock = threading.Lock()
counter: int = 0
def get_status_code(url: str) -> int:
global counter
response = requests.get(url)
with lock:
counter += 1
return response.status_code
async def reporter(request_count: int):
while counter < request_count:
print(f"完成请求 {counter}/{request_count}")
await asyncio.sleep(0.5)
async def main():
loop = asyncio.get_running_loop()
with ThreadPoolExecutor() as pool:
request_count = 1000
# 将 reporter() 任务加入事件循环
report_task = asyncio.create_task(reporter(request_count))
tasks = [loop.run_in_executor(pool, get_status_code, "https://www.baidu.com")
for _ in range(1000)]
results = await asyncio.gather(*tasks)
await report_task
print(all(map(lambda x: x == 200, results)))
asyncio.run(main())
"""
完成请求 0/1000
完成请求 122/1000
完成请求 267/1000
完成请求 392/1000
完成请求 543/1000
完成请求 639/1000
完成请求 793/1000
完成请求 914/1000
True
"""
我们创建了一个全局计数器变量,以及一个 lock,以便在临界区同步对它的访问。在 get_status_code 函数中,增加计数器时获取锁。然后在主协程中启动了一个报告后台任务,该任务输出每 500 毫秒完成了多少请求。
现在知道了多线程和 multithreading 锁的基础知识,但是关于锁还有很多东西需要学习。接下来,我们将了解一下可重入(reentrancy)的概念。
可重入锁
简单的锁可以很好地协调多个线程对共享变量的访问,但当一个线程试图获取它已经取得的锁时会发生什么呢?这是安全的吗?由于同一个线程已经获得锁,这应该没问题,因为根据定义这是单线程的,因此是线程安全的。
虽然这种访问应该没问题,但它确实会导致当前使用的锁出现问题。为了说明这点,假设有一个递归求和函数,它接收一个整数列表并生成列表的总和。让我们尝试用一个普通的锁来实现它,看看会发生什么。
from threading import Lock, Thread
from typing import List
lock = Lock()
def sum_list(int_list: List[int]) -> int:
print(f"等待获取锁")
with lock:
print(f"获取到锁")
if len(int_list) == 0:
print(f"计算完成")
return 0
else:
print(f"计算剩余的 {len(int_list)} 个元素的和")
return int_list[0] + sum_list(int_list[1:])
thread = Thread(target=sum_list, args=([1, 2, 3, 4],))
thread.start()
thread.join()
"""
等待获取锁
获取到锁
计算剩余的 4 个元素的和
等待获取锁
"""
如果以上 4 条信息后,应用程序将永远挂起,那为什么会发生这种情况?如果运行这个程序,第一次完全可以获得 lock,然后递归地对列表的其余部分调用 sum_list。这将导致尝试第二次获取 lock,这是代码挂起的地方,因为已经获得了锁,所以会永远阻塞第二次获得锁的行为。这也意味着永远不会退出第一个 with 块,也不能释放锁,我们在等一个永远不会释放的锁。
说白了,递归会多次调用 sum_list,第一次调用触发第二次调用,第二次触发第三次,第三次触发第四次。然后第四次调用返回之后,第三次调用再返回,然后第二次返回,第一次返回,整个递归结束。
而第二次尝试获取锁时,发现锁已经被获取了,什么时候获取的呢?显然是第一次递归时获取的。而第一次递归什么时候将锁释放掉呢?显然要等到第二次返回的时候,而第二次递归要想执行,需要先获得锁,于是就陷入了死胡同。
from threading import Lock
lock = Lock()
lock.acquire()
# 程序会阻塞在这里
lock.acquire()
由于这个递归来自产生它的同一个线程,因此多次获得锁应该不是问题,因为这不会导致竞态条件。所以为了支持这些用例,线程库提供了可重入锁。可重入锁是一种特殊类型的锁,同一个线程可以多次获得这种锁,从而允许该线程"重新进入"关键部分。线程模块在 RLock 类中提供了可重入锁,只需要修改两行代码,我们就可以用上面的代码来解决这个问题。
from threading import RLock
lock = RLock()
如果修改这些行,代码将正常工作,并且单个线程将能多次获得锁。在内部,可重入锁通过保持递归计数来工作。每次从第一个获得锁的线程获得锁时,计数就会增加,而每次释放锁时,计数就会减少。当计数为 0 时,锁最终被释放,以便其他线程可以获取它。
from threading import Lock
from typing import List
class IntListThreadSafe:
def __init__(self, wrapped_list: List[int]):
self._lock = Lock()
self._inner_list = wrapped_list
def indices_of(self, to_find: int) -> List[int]:
"""
找到列表中所有值等于 to_find 的元素的索引
"""
with self._lock:
return [index for index, value in enumerate(self._inner_list) if value == to_find]
def find_and_replace(self, to_place: int, replace_with: int):
"""
将列表中值为 to_place 的元素全部换成 replace_with
"""
with self._lock:
indices = self.indices_of(to_place)
for index in indices:
self._inner_list[index] = replace_with
threadsafe_list = IntListThreadSafe([1, 2, 1, 2, 1])
threadsafe_list.find_and_replace(1, 2)
如果另一个线程在 indices_of 调用期间修改了列表,可能会获得错误的返回值,因此需要在搜索匹配的索引之前获取锁。出于同样的原因,find_and_replace 方法也必须获取锁。然而对于普通锁,当调用 find_and_replace 时,程序最终会永远挂起。因为 find_and_replace 方法首先获取锁,然后调用另一个方法,该方法尝试获取相同的锁,所以会一直阻塞。
这种情况下,我们应该使用 RLock 解决此问题,因为对 find_and_replace 的调用将始终从同一线程获取锁,所以这说明了何时需要使用可重入锁的一般准则。如果你正在开发个线程安全的类,其中包含获取锁的方法 A 和也需要获取锁并调用方法 A 的方法 B,那么你可能需要将 Lock(不可重入锁)换成 RLock(可重入锁)。
需要说明的是,不管是 Lock 还是 RLock,都可以实现互斥多个线程,保证同时只有一个线程能进入临界区。但 Lock 对于自己也是如此,即使是同一线程连续获得锁时,也会阻塞。而 RLock 不会,它是可重入锁,同一个线程可以获取多次,内部会有一个引用计数。
死锁
你可能熟悉新闻政治谈判中的僵局概念,即一方向另一方提出要求,另一方提出反要求。双方在下一步问题上存在分歧,谈判陷入僵局。计算机科学中的某些概念与之相似,因为我们达到了一个没有解决方案的共享资源争用状态,应用程序将永远挂起。
上面看到的不可重入锁可能导致程序永远挂起,这就是死锁的一个例子,这种情况下,我们会陷入与自己的谈判停滞不前的状态,要求获得一个永远不会释放的锁。而当两个线程使用多个锁时,也会出现这种情况。
假设线程 A 请求线程 B 获得的锁,而线程 B 正在请求 A 获得的锁,将陷入挂起和死锁。这种情况下,使用可重入锁也无济于事,因为有多个线程卡在等待另一个线程持有的资源上。
A:你先松手
B:不,你先松手
A:你松我就松
B:你松我就松
让我们看看如何在代码中创建这种类型的死锁,我们将创建两个锁,锁 A 和 B。需要具有获取这两个锁的两个方法:一种方法是先获取锁 A,然后获取锁 B;另一种方法是先获取锁 B,然后获取锁 A。
from threading import Lock, Thread
import time
lock_a = Lock()
lock_b = Lock()
def a():
with lock_a:
print("在函数 a 中获取 lock_a")
time.sleep(1)
with lock_b:
print("在函数 a 中获取 lock_b")
def b():
with lock_b:
print("在函数 b 中获取 lock_b")
time.sleep(1)
with lock_a:
print("在函数 b 中获取 lock_a")
t1 = Thread(target=a)
t2 = Thread(target=b)
t1.start()
t2.start()
t1.join()
t2.join()
"""
在函数 a 中获取 lock_a
在函数 b 中获取 lock_b
"""
首先调用函数 a 并获取锁 lock_a,然后引入了人为的延迟,让函数 b 有机会获取锁 lock_b,这将处于函数 a 持有锁 lock_a,而函数 b 持有锁 lock_b 的状态。接下来,函数 a 尝试获取锁 lock_b,但函数 b 持有该锁。同时函数 b 尝试获取锁 lock_a,但函数 a 持有它,挂起等待释放它的锁。两个方法都被挂起,彼此等待释放资源,程序陷入了死锁。
如何处理这种情况?一种解决方案是所谓的鸵鸟算法,它以鸵鸟在感觉到危险时将头伸入沙子的情况命名(尽管鸵鸟实际上并没有这种行为)。使用这种策略,我们忽略了问题,并设计了一种策略来在遇到问题时重新启动应用程序。这种方法背后的驱动理念是,如果问题很少出现,那么投资修复是不值得的。如果从上面的代码中去掉 sleep,将很少看到死锁发生,因为它依赖于一个非常特定的操作序列。但这并不是直正的修复,也不是理想的解决方案,而是用于很少发生死锁时的一种策略。
但是有一个简单的解决方法,将两个方法中的锁,更改为始终以相同的顺序获取。例如函数 a 和 b 都可以先获取锁 lock_a,然后获取锁 lock_b。于是问题很简单就解决了,因为我们永远不会按照可能发生死锁的顺序来获取锁。另一种选择是重构锁,这样我们就只使用一个锁而不是两个锁,一个锁不可能出现死锁(不包括我们前面看到的可重入的死锁)。总之,在处理需要获取的多个锁时,请事先考虑:"我是否以一致的顺序获取这些锁?有没有一种方法可以重构它,使它只使用一个锁?"
现在已经看到了如何使用 asyncio 有效地使用线程,并研究了更复杂的锁定场景。接下来,让我们看看如何使用线程将 asyncio 集成到现有的同步应用程序中,这些应用程序可能无法使用 asyncio 顺利运行。
单线程中的事件循环
我们主要关注于构建完全使用协程和 asyncio 自下而上实现的应用程序,当遇到那些不适合单线程并发模型的工作时,就在线程或进程中运行它。但并不是所有的应用程序都适合这个范例,如果在一个现有的同步应用程序中工作,并且希望使用 asyncio 该怎么办?
我们常遇到的这种情况是构建桌面用户界面,构建 GUI 的框架通常会运行自己的事件循环,而事件循环会阻塞主线程,这意味着任何长时间运行的操作都可能导致用户界面挂起。此外,这个 UI 事件循环也会阻止我们创建异步事件循环。下面我们来学习如何使用多线程同时将通多个事件循环,通过 tkinter 构建一个用户界面来展示。
Tkinter
Tkinter 是默认 Python 安装中提供的独立于平台的桌面 GUI 工具包,Tkinter 是 Tk interface 的缩写,是用 tcl 语言编写的低级 TK GUI 工具包的接口。随着 Tkinter Python 库的创建,它已经成长为 Python 开发人员构建桌面用户界面的流行方式。
Tkinter 带有一组小部件,例如标签、文本框和按钮,可将它们放置在桌面窗口中。当与小部件交互时,例如输入文本或按下按钮,可以触发一个函数来执行代码,至于响应用户操作而运行的代码可以是更新另一个小部件或触发另一个操作。
Tkinter 和其他许多 GUI 库通过自己的事件循环绘制小部件,并处理小部件交互。事件循环不断地重新绘制应用程序、处理事件并检查是否有任何代码应该运行,以响应小部件事件。为了熟悉 Tkinter 及其事件循环,让我们创建一个简单的 Hello world 应用程序,上面带有 Say hello 按钮,当点击该按钮时,将在控制台输出 Hello there!。
import tkinter
from tkinter import ttk
window = tkinter.Tk()
window.title("hello world app")
window.geometry("200x100")
def say_hello():
print("Hello there!")
hello_button = ttk.Button(window, text="Say hello", command=say_hello)
hello_button.pack()
window.mainloop()
此代码首先创建一个 Tkinter 窗口,并设置应用程序标题和窗口大小。然后在窗口上放置一个按钮,并将其命令设置为 say_hello 函数。当用户按下此按钮时,say_hello 函数将执行,输出消息。然后调用 window.mainloop() 来启动 Tk 事件循环,运行应用程序。
这里要注意的一件事是应用程序将阻塞在 window.mainloop() 处,在 mainloop 内部,此方法运行 Tk 事件循环。这是一个无限循环,它将检查窗口事件并不断重绘窗口,直到我们关闭它。Tk 事件循环与 asynio 事件循环有一些相似之处,例如如果尝试在按钮的命令中运行阻塞工作会发生什么?比如使用 time.sleep(10) 为 say_hello 函数添加10 秒的延迟,就会看到一个问题:应用程序将冻结 10 秒。
此时关都关不掉,只能强制杀死。所以与 asyncio 非常相似,Tkinter 在其事件循环中运行所有内容。这意味着如果有一个长时间运行的操作,例如发出一个 Web 请求或加载一个大型文件,将阻塞 Tk 事件循环,直到该操作完成。
所以整个过程就是事件循环不停地检测相关事件,一旦事件发生就执行绑定的操作,当操作执行完毕后再将执行权交给事件循环,继续响应事件。但某个操作如果耗时过长,执行权一直不交给事件循环(单线程),那么就无法响应后续的命令。而对用户的影响就是 UI 将挂起,并变得无响应。用户不能点击任何按钮,我们不能更新任何带有状态或进度的小部件,并且操作系统可能会显示个转轮以指示应用程序正在挂起。这显然是一个不受欢迎的、反应迟钝的用户界面。说白了就是如果操作能瞬间完成,那么事件循环就可以很快地响应下一个点击事件(或者说命令),但操作太慢了,导致事件循环拿不到执行权,从而无法响应用户的后续操作。
那么如何解决呢?很容易想到,如果可以发出不阻塞 Tk 事件循环的异步请求,就可以避免这个问题。但实际情况比看起来更棘手,因为 Tkinter 不支持 asyncio,而且你无法传入协程以在单击按钮时运行。而尝试在同一个线程中同时运行两个事件循环也是行不通的,Tkinter 和 asyncio 都是单线程的,所以这个想法就试图在同一个线程中同时运行两个无限循环一样,这无法实现。如果在Tkinter 事件循环之前启动 asyncio 事件循环,则 asyncio 事件循环将阻止 Tkinter 事件循环的运行,反之亦然。
那么问题来了,有没有办法在单线程应用程序中同时运行 asyncio 应用程序呢?事实上,可通过在单独的线程中运行 asyncio 事件循环来组合这两个事件循环,从而创建一个可以正常运行的应用程序。让我们看看如何使用一个应用程序来执行此操作,该应用程序将使用进度条来响应长时间运行任务的状态。
使用 asyncio 和线程构建响应式 UI
首先让我们介绍一下应用程序,并勾勒出一个基本的 UI。我们将构建一个 URI 压力测试应用程序,此应用程序将接收一个 URL 和发送请求的次数,当按下提交按钮时,将使用 aiohttp 尽快发送 Web 请求,向选择的 Web 服务器提供预定义的负载。由于这可能需要很长时间,我们将添加一个进度条来可视化测试中的进度。每完成 1% 的总请求,我们将更新进度条以显示进度。此外,如果用户愿意,可以提供取消请求的按钮。UI 将包含一些小部件,包括用于输入 URL 的文本框、用于设定请求次数的文本框、开始按钮和进度条。
现在已经勾勒出了 UI 结构,然后需要考虑如何让两个事件循环并行运行。基本思想是在主线程中运行 Tkinter 事件循环,并在单独的线程中运行 asyncio 事件循环,然后当用户单击提交时,我们将向 asyncio 事件循环提交一个协程来运行压力测试。随着压力测试的运行,将从 asyncio 事件循环发出命令返回到 Tkinter 事件循环,从而更新进度条。
这种新架构包括跨线程通信,这种情况下,我们需要注意竞态条件,特别是因为异步事件循环不是线程安全的。但 Tkinter 在设计时考虑了线程安全,因此从单独的线调用它的问题更少(至少在 Python 3.x 中是这样,稍后会进行更详细的介绍)。
我们可能很想使用 asyncio.run 从 Tkinter 提交协程,但是这个函数会阻塞,直到传入的协程完成,并导致 Tkinter 应用程序挂起。需要一个函数,它可以将协程提交到事件循环而不会阻塞。有一些新的 asyncio 函数需要学习,它们既是非阻塞的,又有内置的线程安全性,可以正确提交此类任务。第一个是异步事件循环中名为 call_soon_threadsafe 的方法,这个函数接收一个 Python 函数(不是协程),并安排它在 asyncio 事件循环的下一次迭代中以线程安全的方式执行它。第二个函数是 asyncio.run_coroutine_threadsafe,这个函数接收一个协程,会安排它以线程安全的方式运行。然后该函数会返回一个 future,我们可以使用它来访问协程的结果。重要但又令人困惑的是,这个 future 不是 asyncio 的 future,而是来自 concurrent.futures 模块中的 future。因为 asyncio 的 future 不是线程安全的,但 concurrent.futures 中的 future 是线程安全的,并且这两个 future 具有相同的功能。
让我们开始定义并实现一些类,从而根据上面描述的内容构建压力测试应用程序。首先构建压力测试类,该类将负责启动和停止一项压力测试,并跟踪已完成的请求数量。构造函数将接收一个 URL、一个异步事件循环、要发出的请求数量,以及一个进度更新器回调。当想要触发进度条更新时,会调用这个回调,开始实现 UI 时,此回调将触发对进度条的更新。在内部,将计算一个刷新率,这是执行回调的频率,我们会将这个比率设置为计划发送的总请求数的 1%。
import asyncio
from asyncio import AbstractEventLoop
from concurrent.futures import Future
from typing import Callable, Optional
from aiohttp import ClientSession
class StressTest:
def __init__(self, loop: AbstractEventLoop,
url: str, total_requests: int,
callback: Callable[[int, int], None]):
self._completed_requests: int = 0
self._load_test_future: Optional[Future] = None
self._loop = loop
self._url = url
self._total_requests = total_requests
self._callback = callback
self._refresh_rate = total_requests // 100
def start(self):
# 开始发出请求,并存储 future,以便以后可以在需要时取消
future = asyncio.run_coroutine_threadsafe(self._make_requests(), self._loop)
self._load_test_future = future
def cancel(self):
if self._load_test_future is not None:
# 如果想取消,在负载测试 future 时调用 cancel 函数
# 然后在内部调用 future 的 cancel 方法将 future 给取消掉
# 一旦取消了,future 里面的 self._make_request() 就不再执行了
self._loop.call_soon_threadsafe(self._load_test_future.cancel)
async def _get_url(self, session: ClientSession, url: str):
try:
await session.get(url)
except Exception as e:
print(e)
self._completed_requests += 1
# self._refresh_rate 为总请求数 // 100
# 所以每完成 self._refresh_rate 次请求,就代表完成了总请求次数的 1%
# 一旦完成了 1% 的请求,使用已完成的请求数和总请求数调用回调
if self._completed_requests % self._refresh_rate == 0 \
or self._completed_requests == self._total_requests:
self._callback(self._completed_requests, self._total_requests)
async def _make_requests(self):
async with ClientSession() as session:
requests = [self._get_url(session, self._url) for _ in range(self._total_requests)]
await asyncio.gather(*requests)
在 start 方法中,通过 run_coroutine_threadsafe 调用 self._make_requests 方法,它将开始 asyncio 事件循环上发出请求,还在 self.load_test_fuure 中跟踪返回的 future,跟踪这个 future 可以让我们在 cancel 方法中取消负载测试。在 self._make_requests 方法中,我们创建了一个列表协程来发出所有 web 请求,并将它们传递给 asyncio.gather 以运行它们。而发送请求是通过调用 self._get_url 协程实现的,并且每完成一个请求,就递增一次 self._completed_requests 计数器,并在必要时使用已完成的请求总数调用回调。可通过简单地实例化它,并调用 start 方法来使用这个类,也可通调用 cancel 方法来取消执行。
需要注意,尽管来自多个协程的更新发生在 self._completed_requests 计数器周围,并且我们没有使用任何锁。但请记住,asyncio 是单线程的,并且 asyncio 事件循环在任何给定时间只运行一段 Python代码,所以它是协程安全的。
接下来,让我们实现 Tkinter GUI 来使用这个负载测试器类,为了代码简洁,我们将直接继承 TK 类,并在构造函数中初始化小部件。当用户单击开始按钮时,我们将创建一个新的 StressTest 实例并启动它。
import asyncio
from asyncio import AbstractEventLoop
from concurrent.futures import Future
from typing import Callable, Optional
from threading import Thread
from queue import Queue
from tkinter import Tk, Label, Entry, ttk
from aiohttp import ClientSession
class StressTest:
def __init__(self, loop: AbstractEventLoop,
url: str, total_requests: int,
callback: Callable[[int, int], None]):
self._completed_requests: int = 0
self._load_test_future: Optional[Future] = None
self._loop = loop
self._url = url
self._total_requests = total_requests
self._callback = callback
self._refresh_rate = total_requests // 100
def start(self):
# 开始发出请求,并存储 future,以便以后可以在需要时取消
future = asyncio.run_coroutine_threadsafe(self._make_requests(), self._loop)
self._load_test_future = future
def cancel(self):
if self._load_test_future is not None:
self._loop.call_soon_threadsafe(self._load_test_future.cancel)
async def _get_url(self, session: ClientSession, url: str):
try:
await session.get(url)
except Exception as e:
print(e)
self._completed_requests += 1
if self._completed_requests % self._refresh_rate == 0 \
or self._completed_requests == self._total_requests:
self._callback(self._completed_requests, self._total_requests)
async def _make_requests(self):
async with ClientSession() as session:
requests = [self._get_url(session, self._url) for _ in range(self._total_requests)]
await asyncio.gather(*requests)
class LoadTester(Tk):
def __init__(self, loop, *args, **kwargs):
super().__init__(*args, **kwargs)
self._queue = Queue()
self._refresh_ms = 25
self._loop = loop
self._load_test: Optional[StressTest] = None
# 以下是一些窗口设置,不用太关注
self.title("URL Requester")
self._url_label = Label(self, text="URL:")
self._url_label.grid(column=0, row=0)
self._url_field = Entry(self, width=50)
self._url_field.grid(column=1, row=0)
self._request_label = Label(self, text="请求数:")
self._request_label.grid(column=0, row=1)
self._request_field = Entry(self, width=50)
self._request_field.grid(column=1, row=1)
# 当被单击时,提交按钮将调用 _start方法
self._submit = ttk.Button(self, text="提交", command=self._start)
self._submit.grid(column=2, row=1)
self._pb_label = Label(self, text="进度:")
self._pb_label.grid(column=0, row=3)
self._pb = ttk.Progressbar(self, orient="horizontal", length=200, mode="determinate")
self._pb.grid(column=1, row=3, columnspan=2)
def _update_bar(self, pct: int):
"""
更新进度条方法,将进度条设置为从 0 到 100 的百分比完成值
这个方法应该只在主线程中调用
"""
if pct == 100:
self._load_test = None
self._submit["text"] = "提交"
else:
self._pb["value"] = pct
self.after(self._refresh_ms, self._poll_queue)
def _queue_update(self, completed_requests: int, total_requests: int):
"""
传递给压力测试的回调函数,它将进度更新添加到队列中
"""
self._queue.put(int(completed_requests / total_requests * 100))
def _poll_queue(self):
if not self._queue.empty():
percent_complete = self._queue.get()
self._update_bar(percent_complete)
else:
if self._load_test:
self.after(self._refresh_ms, self._poll_queue)
def _start(self):
if self._load_test is None:
self._submit["text"] = "Cancel"
test = StressTest(self._loop, self._url_field.get(),
int(self._request_field.get()),
self._queue_update)
self.after(self._refresh_ms, self._poll_queue)
test.start()
self._load_test = test
else:
self._load_test.cancel()
self._load_test = None
self._submit["text"] = "提交"
class ThreadedEventLoop(Thread):
def __init__(self, loop: AbstractEventLoop):
super().__init__()
self._loop = loop
self.daemon = True
def run(self):
self._loop.run_forever()
loop = asyncio.new_event_loop()
asyncio_thread = ThreadedEventLoop(loop)
asyncio_thread.start()
app = LoadTester(loop)
app.mainloop()
首先定义一个继承自 Thread 的类 ThreadedEventLoop 来运行事件循环,在这个类的构造函数中,接收一个事件循环,并将线程设置为守护线程。将线程设置为守护进程是因为异步事件循环将在该线程中阻塞并永远运行,如果在非守护模式下运行,这种类型的无限循环将阻止 GUI 应用程序关闭。在线程的 run 方法中,调用事件循环的 run_forever 方法,它实际上只是启动事件循环,一直运行下去,直到停止事件循环。
一旦创建了这个类,就用 new_event_loop 方法创建一个新的 asyncio 事件循环。然后创建一个 ThreadedEventLoop 实例,将刚创建的循环传入并启动它。这将创建个新线程,其中运行事件循环。最后,创建 LoadTester 应用程序的一个实例,并调用 mainloop 方法,启动 Tkinter 事件循环。
现在我们已经了解了如何将线程用于各种 IO 密集型工作负载,但对于 CPU 密集型的工作负载呢?回顾一下,GIL 阻止在线程中同时运行 Python 字节码,但是有一些值得注意的例外情况,让我们可以在线程中执行一些 CPU 密集型操作。
使用线程执行 CPU 密集型工作
全局解释器锁是 Python 中的一个棘手同题,一般情况下,多线程只对阻塞 IO 工作有意义,因为 IO 会释放 GIL,在大多数情况下都是如此。但当库的大部分工作是在低级的 C 代码中完成时,也可以将 GIL 释放掉。像有一些著名的库,例如 hashlib 和 numpy,它们在纯 C 中执行 CPU 密集型工作时会释放 GIL,这使我们能够用多线程来提高某些 CPU 密集型工作负载的性能。
如果想释放 GIL 并避免发生并发错误,那么正在运行的代码必须是不和 Python 对象交互的纯 C 代码,此时多线程是可以并行的。
我们来看两个例子。
多线程与 hashlib
自古以来,数据安全从未像现在这样重要,确保数据不被黑客读取是避免泄露客户敏感数据(如密码、其他可用于识别或伤害客户的信息)的关键。
散列算法通过获取一段输入数据,并创建一段对人类来说不可读和不可恢复(如果算法是安全的)的新数据来解决这个问题。例如,密码 password 可能被散列成一个看完来更像 a12bc21df 的字符串。虽然没有人可以读取或恢复输入数据,但我们仍能检查一条数据是否与哈希值匹配,这对于在登录时验证用户密码或检查数据是否被篡改等场景中非常有用。
当今有许多不同的哈希算法,例如 SHA512、BLAKE2 和 scrypt,但 SHA 不是存储密码的最佳选择,因为它容易受到暴力攻击。其中一些算法在 Python 的 hashlib 库中实现,该库中的许多函数在散列大于 2048 字节的数据时释放 GIL,因此多线程是提高该库性能的一种方法。此外,用于散列密码的 scrypt 函数总是释放GIL。
让我们了解一个假设场景,看看多线程何时可能为 hashlib 带来性能提升。想象一下你刚刚开始在一家公司担任首席软件架构师,经理为你分配了第一个 bug,让你开始学习公司的开发过程--登录系统的一个小问题。为了调试这个问题,你开始查看一些数据库表,但令你惊讶的是,你发现所有客户的密码都以明文形式存储。这意味着,如果数据库遭到入侵,攻击者可以获取所有客户的密码,并以他们的身份登录,从而可能暴露敏感数据,例如保存的信用卡号。你将此问题告诉了你的经理,他要求你尽快找到问题的解决方案。
使用 scrypt 算法对明文密码进行哈希处理是解决此类问题的很好方法,它是安全的,并且原始密码是不可恢复的,因为它引入了 salt。salt 是一个随机数,可确保为密码获得的哈希值是唯一的。为了使用 scrypt 进行测试,我们快速编写一个同步脚本来创建随机密码,并对它们进行哈希处理,以了解需要运行多长时间。在本例中,我们将测试 10000 个随机密码。
import hashlib, os, string, time, random
def random_password(length: int) -> bytes:
samples = string.ascii_letters
return "".join(random.choice(samples) for _ in range(length)).encode("utf-8")
passwords = [random_password(10) for _ in range(10000)]
def hash_password(password: bytes):
salt = os.urandom(16)
return hashlib.scrypt(password, salt=salt, n=2048, p=1, r=8)
start = time.perf_counter()
for password in passwords:
hash_password(password)
end = time.perf_counter()
print(end - start)
"""
27.922882899999998
"""
首先编写一个函数来创建随机大小写密码,然后使用它创建 10000 个随机密码,每个密码 10个字符。然后使用 scrypt 函数对每个密码进行哈希处理,这里我们忽略一些细节(scrypt 函数的 n、p 和 r 参数),但这些参数可用于调优哈希的安全性和内存/CPU 的使用情况。
在我当前的 i7-13700k 上运行此代码,只需 28 秒就可以完成,这个运行时间还算是不错的。但问题是你的用户数据众多,你需要哈希 100000000 个密码,根据这个测试推算,你大概需要 20 多天才能完成这个任务。为了缩短这个时间,你可能会将数据集分开,在多台机器上运行这个过程。虽然这是一个办法,但是不是有些浪费资源呢?因为我们的机器都是多核的,目前只用了一个核,可不可以通过多线程来解决这个问题呢?来试一下。
import hashlib
import os
import random
import string
import time
from concurrent.futures import ThreadPoolExecutor
import asyncio
def random_password(length: int) -> bytes:
samples = string.ascii_letters
return "".join(random.choice(samples) for _ in range(length)).encode("utf-8")
passwords = [random_password(10) for _ in range(10000)]
def hash_password(password: bytes):
salt = os.urandom(16)
return hashlib.scrypt(password, salt=salt, n=2048, p=1, r=8)
async def main():
loop = asyncio.get_running_loop()
tasks = []
with ThreadPoolExecutor() as pool:
for password in passwords:
tasks.append(loop.run_in_executor(pool, hash_password, password))
results = await asyncio.gather(*tasks)
print(len(results))
start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print(end - start)
"""
10000
2.5447689000000002
"""
这种方法需要创建一个线程池执行器,并为希望进行散外计算的每个密码创建一个任务。由于 hashlib 释放了 GIL,我们实现了不错的性能提升。这段代码只需要是 2.5 秒,而不是之前的 28 秒。这样运行时间大幅度减少,我们再在不同的机器上同时运行这个应用程序,从而进一步减少运行时间,或者使用一个具有更多 CPU 核心的机器。
多线程与 NumPy
NumPy 是一个非常流行的 Python 库,广泛用于数据科学和机器学习项目,它含许多数学函数,其性能往往优于普通的 Python 数组。性能的提高是因为底层库的大部分是用 C 和 Fortran 实现的,这两种低级语言比 Python 性能更好。
因为这个库的许多操作都在 Python 之外的低级代码中,这为 NumPy 释放 GIL 并允许我们对一些代码进行多线程处理提供了机会。这里需要注意的是,这个功能没有详细的说明文档,但通常可以安全地假设矩阵运算可能是多线程的,从而获得性能份势。也就是说,根据 NumPy 函数的实现方式,性能提升可能很大也可能很小。如果代码直接调用 C 函数并释放 GIL,则有可能获得更大的性能提升。如果调用中使用很多 Python 代码,那么性能提升将会较小。鉴于这没有详细的文档可供参考,你可能需要通过多次尝试来找到合适的设置。另外,你需要评估与性能提升相比,这些额外的设置所带来的成本是否合算。
为了在实际看到这一点,我们创建一个包含 50 行 30 亿个数据点的大型矩阵,任务是求出每一行的平均值。NumPy 有一个平均值函数 mean,可以用来计算平均值。这个函数有一个 axis 参数,它让我们可以计算一个轴上的所有平均值,而不必编写循环。将 axis 设置为 0 表示计算每一列的平均值,将 axis 设置为 1 表示计算每一行的平均值,显然这里我们要设置为 1。
import numpy as np
import time
data_points = 3000000000
rows = 50
# 创建数组
array = np.arange(data_points).reshape(rows, -1)
start = time.perf_counter()
# 计算平均值
res = np.mean(array, axis=1)
end = time.perf_counter()
print(end - start)
"""
2.8423732
"""
下面让我们稍微调整一下这段代码,使用线程进行计算。在单独的线程中运行每一行的均值,并使用 asyncio.gather 等待所有行的均值。
import numpy as np
import time
from concurrent.futures import ThreadPoolExecutor
import asyncio
def mean_for_row(arr, row):
return np.mean(arr[row])
data_points = 3000000000
rows = 50
# 创建数组
array = np.arange(data_points).reshape(rows, -1)
async def main():
loop = asyncio.get_running_loop()
with ThreadPoolExecutor() as pool:
tasks = []
for i in range(rows):
tasks.append(loop.run_in_executor(pool, mean_for_row, array, i))
result = await asyncio.gather(*tasks)
start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print(end - start)
"""
0.9795692999999996
"""
首先创建一个 mean_for_row 函数,它计算一行的平均值。由于在单独的线程中计算每一行的均值,因此不需要再指定 axis 参数。然后创建一个带有线程池执行器的主协程,并创建一个任务来计算每一行的平均值,等待所有计算完成。
我们看运行时间从 2.84 秒变成了 0.97 秒,性能提升了约 3 倍。某些情况下,多线程可以帮我们使用 Numpy。但请记住,在尝试使用线程化或 multiprocessing 来提高性能之前,应该尽可能对 NumPy 代码进行向量化,这意味着避免使用 Python 循环或 NumPy 的 apply_along_axis(只是隐藏了 Python 循环)这样的函数。在 NumPy 中,通过将尽可能多的计算库的底层实现,通常会得到更好的性能。
小结
在本篇文章中,我们学习了以下内容:
- 如何使用线程模块运行 IO 密集型工作;
- 如何在应用程序关闭时干净地终止线程;
- 如何使用线程池执行器将工作分配给线程池,这允许使用像 gather 这样的 asyncio API 来等待线程的结果;
- 如何使用现有的阻塞 IO API,例如请求,并在线程中运行它们,并使用线程池和 asyncio 来提升性能;
- 如何使用线程模块中的锁来避免竞态条件,此外我们还学习了如何使用可重入锁来避免死锁;
- 如何在单独的线程中运行 asyncio 事件循环,并以线程安全的方式向其发送协程,这让我们可以使用 Tkinter 等框架构建响应式用户界面;
- 如何为 hashlib 和 numpy 使用多线程,低级库有时会释放 GIL,这让我们可以使用线程处理 CPU 密集型工作;