首页 > 其他分享 >ThreadPoolExecutor的使用

ThreadPoolExecutor的使用

时间:2023-05-14 22:23:35浏览次数:44  
标签:函数 print 任务 Future 线程 使用 action ThreadPoolExecutor

线程池的基类是 concurrent.futures 模块中的 Executor,Executor 提供了两个子类,即 ThreadPoolExecutor 和 ProcessPoolExecutor,其中 ThreadPoolExecutor 用于创建线程池,而 ProcessPoolExecutor 用于创建进程池。

如果使用线程池/进程池来管理并发编程,那么只要将相应的 task 函数提交给线程池/进程池,剩下的事情就由线程池/进程池来搞定。

ThreadPoolExecutor(max_workers=2, thread_name_prefix='test_thread')

Exectuor 提供了如下常用方法:

  • submit(fn, *args, **kwargs):将 fn 函数提交给线程池。*args 代表传给 fn 函数的参数,*kwargs 代表以关键字参数的形式为 fn 函数传入参数。
  • map(func, *iterables, timeout=None, chunksize=1):该函数类似于全局函数 map(func, *iterables),只是该函数将会启动多个线程,以异步方式立即对 iterables 执行 map 处理。
  • shutdown(wait=True):关闭线程池。

程序将 task 函数提交(submit)给线程池后,submit 方法会返回一个 Future 对象,Future 类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步方式执行,因此,线程执行的函数相当于一个“将来完成”的任务,所以 Python 使用 Future 来代表。
Future 提供了如下方法:
  • cancel():取消该 Future 代表的线程任务。如果该任务正在执行,不可取消,则该方法返回 False;否则,程序会取消该任务,并返回 True。
  • cancelled():返回 Future 代表的线程任务是否被成功取消。
  • running():如果该 Future 代表的线程任务正在执行、不可被取消,该方法返回 True。
  • done():如果该 Funture 代表的线程任务被成功取消或执行完成,则该方法返回 True。
  • result(timeout=None):获取该 Future 代表的线程任务最后返回的结果。如果 Future 代表的线程任务还未完成,该方法将会阻塞当前线程,其中 timeout 参数指定最多阻塞多少秒。
  • exception(timeout=None):获取该 Future 代表的线程任务所引发的异常。如果该任务成功完成,没有异常,则该方法返回 None。
  • add_done_callback(fn):为该 Future 代表的线程任务注册一个“回调函数”,当该任务成功完成时,程序会自动触发该 fn 函数。


在用完一个线程池后,应该调用该线程池的 shutdown() 方法,该方法将启动线程池的关闭序列。调用 shutdown() 方法后的线程池不再接收新任务,但会将以前所有的已提交任务执行完成。当线程池中的所有任务都执行完成后,该线程池中的所有线程都会死亡。

使用线程池来执行线程任务的步骤如下:

  1. 调用 ThreadPoolExecutor 类的构造器创建一个线程池。
  2. 定义一个普通函数作为线程任务。
  3. 调用 ThreadPoolExecutor 对象的 submit() 方法来提交线程任务。
  4. 当不想提交任何任务时,调用 ThreadPoolExecutor 对象的 shutdown() 方法来关闭线程池。


下面程序示范了如何使用线程池来执行线程任务:

from concurrent.futures import ThreadPoolExecutor
import threading
import time

# 定义一个准备作为线程任务的函数
def action(max):
    my_sum = 0
    for i in range(max):
        print(threading.current_thread().name + '  ' + str(i))
        my_sum += i
    return my_sum
# 创建一个包含2条线程的线程池
pool = ThreadPoolExecutor(max_workers=2)
# 向线程池提交一个task, 50会作为action()函数的参数
future1 = pool.submit(action, 50)
# 向线程池再提交一个task, 100会作为action()函数的参数
future2 = pool.submit(action, 100)
# 判断future1代表的任务是否结束
print(future1.done())
time.sleep(3)
# 判断future2代表的任务是否结束
print(future2.done())
# 查看future1代表的任务返回的结果
print(future1.result())
# 查看future2代表的任务返回的结果
print(future2.result())
# 关闭线程池
pool.shutdown()

上面程序中,第 13 行代码创建了一个包含两个线程的线程池,接下来的两行代码只要将 action() 函数提交(submit)给线程池,该线程池就会负责启动线程来执行 action() 函数。这种启动线程的方法既优雅,又具有更高的效率。

当程序把 action() 函数提交给线程池时,submit() 方法会返回该任务所对应的 Future 对象,程序立即判断 futurel 的 done() 方法,该方法将会返回 False(表明此时该任务还未完成)。接下来主程序暂停 3 秒,然后判断 future2 的 done() 方法,如果此时该任务已经完成,那么该方法将会返回 True。

程序最后通过 Future 的 result() 方法来获取两个异步任务返回的结果。

读者可以自己运行此代码查看运行结果,这里不再演示。

当程序使用 Future 的 result() 方法来获取结果时,该方法会阻塞当前线程,如果没有指定 timeout 参数,当前线程将一直处于阻塞状态,直到 Future 代表的任务返回。

获取执行结果

前面程序调用了 Future 的 result() 方法来获取线程任务的运回值,但该方法会阻塞当前主线程,只有等到钱程任务完成后,result() 方法的阻塞才会被解除。

如果程序不希望直接调用 result() 方法阻塞线程,则可通过 Future 的 add_done_callback() 方法来添加回调函数,该回调函数形如 fn(future)。当线程任务完成后,程序会自动触发该回调函数,并将对应的 Future 对象作为参数传给该回调函数。

下面程序使用 add_done_callback() 方法来获取线程任务的返回值:

from concurrent.futures import ThreadPoolExecutor
import threading
import time

# 定义一个准备作为线程任务的函数
def action(max):
    my_sum = 0
    for i in range(max):
        print(threading.current_thread().name + '  ' + str(i))
        my_sum += i
    return my_sum
# 创建一个包含2条线程的线程池
with ThreadPoolExecutor(max_workers=2) as pool:
    # 向线程池提交一个task, 50会作为action()函数的参数
    future1 = pool.submit(action, 50)
    # 向线程池再提交一个task, 100会作为action()函数的参数
    future2 = pool.submit(action, 100)
    def get_result(future):
        print(future.result())
    # 为future1添加线程完成的回调函数
    future1.add_done_callback(get_result)
    # 为future2添加线程完成的回调函数
    future2.add_done_callback(get_result)
    print('--------------')

上面主程序分别为 future1、future2 添加了同一个回调函数,该回调函数会在线程任务结束时获取其返回值。

主程序的最后一行代码打印了一条横线。由于程序并未直接调用 future1、future2 的 result() 方法,因此主线程不会被阻塞,可以立即看到输出主线程打印出的横线。接下来将会看到两个新线程并发执行,当线程任务执行完成后,get_result() 函数被触发,输出线程任务的返回值。

另外,由于线程池实现了上下文管理协议(Context Manage Protocol),因此,程序可以使用 with 语句来管理线程池,这样即可避免手动关闭线程池,如上面的程序所示。

此外,Exectuor 还提供了一个 map(func, *iterables, timeout=None, chunksize=1) 方法,该方法的功能类似于全局函数 map(),区别在于线程池的 map() 方法会为 iterables 的每个元素启动一个线程,以并发方式来执行 func 函数。这种方式相当于启动 len(iterables) 个线程,井收集每个线程的执行结果。

例如,如下程序使用 Executor 的 map() 方法来启动线程,并收集线程任务的返回值:

from concurrent.futures import ThreadPoolExecutor
import threading
import time

# 定义一个准备作为线程任务的函数
def action(max):
    my_sum = 0
    for i in range(max):
        print(threading.current_thread().name + '  ' + str(i))
        my_sum += i
    return my_sum
# 创建一个包含4条线程的线程池
with ThreadPoolExecutor(max_workers=4) as pool:
    # 使用线程执行map计算
    # 后面元组有3个元素,因此程序启动3条线程来执行action函数
    results = pool.map(action, (50, 100, 150))
    print('--------------')
    for r in results:
        print(r)

上面程序使用 map() 方法来启动 3 个线程(该程序的线程池包含 4 个线程,如果继续使用只包含两个线程的线程池,此时将有一个任务处于等待状态,必须等其中一个任务完成,线程空闲出来才会获得执行的机会),map() 方法的返回值将会收集每个线程任务的返回结果。

运行上面程序,同样可以看到 3 个线程并发执行的结果,最后通过 results 可以看到 3 个线程任务的返回结果。

通过上面程序可以看出,使用 map() 方法来启动线程,并收集线程的执行结果,不仅具有代码简单的优点,而且虽然程序会以并发方式来执行 action() 函数,但最后收集的 action() 函数的执行结果,依然与传入参数的结果保持一致。也就是说,上面 results 的第一个元素是 action(50) 的结果,第二个元素是 action(100) 的结果,第三个元素是 action(150) 的结果。

import time
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed, wait


def async_add(max):
    sum = 0
    for i in range(max):
        sum += i
    time.sleep(1)
    print(f'{threading.current_thread().name} execute finished, result is {sum}')
    return sum
    

def call_back(future):
    print(f'future state: {future._state}')
    print('in call_back func')
    
     



if __name__ == '__main__':
    # 创建线程池
    pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix='test_thread')
    future1 = pool.submit(async_add, 20)
    print(future1.done())
    print(future1.result())
    future1.add_done_callback(call_back)

    for i in range(1, 15):
        pool.submit(async_add, i)

    # 也可以使用 with 语句创建线程池
    # with ThreadPoolExecutor(max_workers=3) as pool:
    #     for i in range(1, 14):
    #         pool.submit(async_add, i)

    # as_completed() 接收一个 future 列表
    # all_task = [pool.submit(async_add, (i)) for i in range(15)]
    # for future in as_completed(all_task):
    #     res = future.result()
    #     print(f"res: {res}")

    # 通过 pool 的 map 获取已经完成的 task 的值
    # for data in executor.map(async_add, [i for i in range(15)]):
    #     print(f"res: {data}")

    print(f'-----{threading.current_thread().name} execute finished-----')

 

标签:函数,print,任务,Future,线程,使用,action,ThreadPoolExecutor
From: https://www.cnblogs.com/hushaojun/p/17400405.html

相关文章

  • 使用4G通信模块和MQTT协议,完成物联网设备开发。
    使用4G通信模块和MQTT协议,完成物联网设备开发。(1)安装并使用4G模块通信模块,建立microPython开发环境;(2)使用提供的Demo开发例程,使用MQTT传输协议连接阿里或腾讯网站,完成物联网设备开发。(3)将温湿度信息上传到网站;(4)手机APP查看数一、这是我之前写关于阿里云怎么在线调试设备的。......
  • Mybatis-Plus使用技巧
    selectOne和selectListselectOne如果没有数据会得到nullselectList如果没有数据会得到长度为0的list自动填充任何使用wrapper的时候,自动填充都是失效的,必须带实体类,可以new一个更新字段为null默认情况:全局配置默认值为not_null,传递的参数中某个字段为null,则默认不会对为nu......
  • ThreadPoolExecutor获取线程池中已经运行完的任务结果
    方法一:使用as_completed函数fromconcurrent.futuresimportThreadPoolExecutor,as_completedfromtimeimportsleepdefmethod(times):sleep(times)print('sleep{}secondes'.format(times))returntimespool=ThreadPoolExecutor(max_wor......
  • Qt5 C++ 多线程工业气体标定 1)使用OPC 封装COM 2)C++调用OPC; 3
    Qt5C++多线程工业气体标定1)使用OPC封装COM2)C++调用OPC;3)使用经典界面;4)使用QT专业皮肤编程qss;5)C++链接PLC读写数据;6)赠送KEPSVR服务器;参数如下:-----------------------------1)编程语言:C++(11或以上);-----------------------------2)编程环境:QT5.14;-----------------------------......
  • Ubuntu下串口工具 PicoCOM 的使用和时间戳显示
    PICOCOMUbuntu下的串口软件,除了CuteCOM,screen,MiniCOM以外,还有一个和MiniCOM很像的PicoCOM.最近在调试CH340C串口的过程中,发现只有PicoCOM的连接Reset才能正常工作,因此单独记录一下.GitHub仓库https://github.com/npat-efault/picocom仓库的所有者Ni......
  • 深入理解计算机网络:使用Python和Socket编程实现TCP_IP协议族
    本文将介绍如何使用Python和Socket编程实现TCP/IP协议族。TCP/IP协议族是互联网上使用最广泛的协议族之一,TCP协议和IP协议是其最核心的两个协议。在本文中,我们将分别介绍TCP协议和IP协议的基本原理,并使用Python和Socket编程实现它们。TCP协议TCP协议是一种面向连接的、可靠的传输协......
  • c# can总线开发接口源代码,完美解决内存溢出,程序闪退问题,可放心使用。
    c#can总线开发接口源代码,完美解决内存溢出,程序闪退问题,可放心使用。另本示例中还有TCPClient,串口连接源代码,供给有需要的人学习。ID:4830648911771998......
  • c# mqtt高性能服务器端源代码。 你还在使用第三方服务软件吗?不如试试这
    c#mqtt高性能服务器端源代码。你还在使用第三方服务软件吗?不如试试这个开发框架,助你一臂之力,无限制,无全开源,无版权约束,全是自主开发。开源框架包括服务器和客户端,支持mqtt3.0及5.0。可嵌入到自己的服务系统及软件客户端中,不受第三方约束。你要问我稳定性如何?我能回答的是已经运行......
  • 微信使用SQLite?
    SQLite是一个被大家低估的数据库,但有些人认为它是一个不适合生产环境使用的玩具数据库。事实上,SQLite是一个非常可靠的数据库,它可以处理TB级的数据,但它没有网络层。接下来,本文将与大家共同探讨SQLite在过去一年中最新的SQL功能。SQLite“只是”一个库,它不是传统意义上的服......
  • Android 使用 camera2 拍照
    本文示例代码可见:Github-AndroidCamera2TakePhotocamera2基础从Android5.0开始,Google重新设计了相机功能的架构,并提供了camera2API,以取代已弃用的cameraAPI。android.hardware.camera2包是提供了用于连接Android设备和各个相机设备的API,这些API不仅大幅提高......