首页 > 编程语言 >Python 之线程池

Python 之线程池

时间:2024-09-04 10:03:26浏览次数:17  
标签:__ Python threading Future 线程 printed test

Python 之线程池

系统启动一个新线程的成本是比较高的,因为它涉及与操作系统的交互。在这种情形下,使用线程池可以很好地提升性能,尤其是当程序中需要创建大量生存期很短暂的线程时,更应该考虑使用线程池。

线程池在系统启动时即创建大量空闲的线程,程序只要将一个函数提交给线程池,线程池就会启动一个空闲的线程来执行它。当该函数执行结束后,该线程并不会死亡,而是再次返回到线程池中变成空闲状态,等待执行下一个函数。

此外,使用线程池可以有效地控制系统中并发线程的数量。当系统中包含有大量的并发线程时,会导致系统性能急剧下降,甚至导致 Python 解释器崩溃,而线程池的最大线程数参数可以控制系统中并发线程的数量不超过此数。

一、线程池的使用

线程池的基类是 concurrent.futures 模块中的 Executor,Executor 提供了两个子类,即 ThreadPoolExecutor 和 ProcessPoolExecutor,其中 ThreadPoolExecutor 用于创建线程池,而 ProcessPoolExecutor 用于创建进程池。

如果使用线程池/进程池来管理并发编程,那么只要将相应的 task 函数提交给线程池/进程池,剩下的事情就由线程池/进程池来搞定。

Exectuor 提供了如下常用方法:

  • submit(fn, args, **kwargs):将 fn 函数提交给线程池。args 代表传给 fn 函数的参数,*kwargs 代表以关键字参数的形式为 fn 函数传入参数。
  • map(func, *iterables, timeout=None, chunksize=1):该函数类似于全局函数 map(func, *iterables),只是该函数将会启动多个线程,以异步方式立即对 iterables 执行 map 处理。
    shutdown(wait=True):关闭线程池。

程序将 task 函数提交(submit)给线程池后,submit 方法会返回一个 Future 对象,Future 类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步方式执行,因此,线程执行的函数相当于一个“将来完成”的任务,所以 Python 使用 Future 来代表。

Future 提供了如下方法:

  • cancel():取消该 Future 代表的线程任务。如果该任务正在执行,不可取消,则该方法返回 False;否则,程序会取消该任务,并返回 True。
  • cancelled():返回 Future 代表的线程任务是否被成功取消。
  • running():如果该 Future 代表的线程任务正在执行、不可被取消,该方法返回 True。
  • done():如果该 Funture 代表的线程任务被成功取消或执行完成,则该方法返回 True。
  • result(timeout=None):获取该 Future 代表的线程任务最后返回的结果。如果 Future 代表的线程任务还未完成,该方法将会阻塞当前线程,其中 timeout 参数指定最多阻塞多少秒。
  • exception(timeout=None):获取该 Future 代表的线程任务所引发的异常。如果该任务成功完成,没有异常,则该方法返回 None。
  • add_done_callback(fn):为该 Future 代表的线程任务注册一个“回调函数”,当该任务成功完成时,程序会自动触发该 fn 函数。

在用完一个线程池后,应该调用该线程池的 shutdown() 方法,该方法将启动线程池的关闭序列。调用 shutdown() 方法后的线程池不再接收新任务,但会将以前所有的已提交任务执行完成。当线程池中的所有任务都执行完成后,该线程池中的所有线程都会死亡。

使用线程池来执行线程任务的步骤如下:

  • 调用 ThreadPoolExecutor 类的构造器创建一个线程池。
  • 定义一个普通函数作为线程任务。
  • 调用 ThreadPoolExecutor 对象的 submit() 方法来提交线程任务。
  • 当不想提交任何任务时,调用 ThreadPoolExecutor 对象的 shutdown() 方法来关闭线程池。

1. 使用 submit 方法提交任务

下面程序示范了如何使用线程池来执行线程任务:

def test(value1, value2=None):
    print("%s threading is printed %s, %s"%(threading.current_thread().name, value1, value2))
    time.sleep(2)
    return 'finished'

def test_result(future):
    print(future.result())

if __name__ == "__main__":
    import numpy as np
    from concurrent.futures import ThreadPoolExecutor
    threadPool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="test_")
    for i in range(0,10):
        future = threadPool.submit(test, i,i+1)

    threadPool.shutdown(wait=True)

结果如下:

结果:

test__0 threading is printed 0, 1
test__1 threading is printed 1, 2
test__2 threading is printed 2, 3
test__3 threading is printed 3, 4
test__1 threading is printed 4, 5
test__0 threading is printed 5, 6
test__3 threading is printed 6, 7

1.2 使用 map 方法提交任务

二、获取执行结果

2.1 使用 Future dd_done_callback() 回调函数来获取返回值

前面程序调用了 Future 的 result() 方法来获取线程任务的运回值,但该方法会阻塞当前主线程,只有等到钱程任务完成后,result() 方法的阻塞才会被解除。

如果程序不希望直接调用 result() 方法阻塞线程,则可通过 Future 的 add_done_callback() 方法来添加回调函数,该回调函数形如 fn(future)。当线程任务完成后,程序会自动触发该回调函数,并将对应的 Future 对象作为参数传给该回调函数。
直接调用result函数结果

def test(value1, value2=None):
    print("%s threading is printed %s, %s"%(threading.current_thread().name, value1, value2))
    time.sleep(2)
    return 'finished'

def test_result(future):
    print(future.result())

if __name__ == "__main__":
    import numpy as np
    from concurrent.futures import ThreadPoolExecutor
    threadPool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="test_")
    for i in range(0,10):
        future = threadPool.submit(test, i,i+1)
#         future.add_done_callback(test_result)
        print(future.result())

    threadPool.shutdown(wait=True)
    print('main finished')
结果:

test__0 threading is printed 0, 1
finished
test__0 threading is printed 1, 2
finished
test__1 threading is printed 2, 3
finished

去掉上面注释部分,调用future.add_done_callback函数,注释掉第16行

test__0 threading is printed 0, 1
test__1 threading is printed 1, 2
test__2 threading is printed 2, 3
test__3 threading is printed 3, 4
finished
finished
finished
test__1 threading is printed 4, 5
test__0 threading is printed 5, 6
finished

标签:__,Python,threading,Future,线程,printed,test
From: https://www.cnblogs.com/alex-oos/p/18395921

相关文章

  • SSA(麻雀优化算法)+CNN+LSTM时间序列预测算法(python代码)
    1.SSA(SparrowSearchAlgorithm)简介:SSA是一种新兴的群体智能优化算法,模拟麻雀觅食行为。麻雀群体中的“发现者”负责寻找食物,并将信息传递给“追随者”,后者根据这一信息进行觅食。SSA通过这种合作机制寻找最优解。SSA在优化问题中可以视为一种元启发式算法,擅长在复杂搜索......
  • Python2数据传输测试脚本
    服务端#-*-coding:utf-8-*-importsocketimportthreadingHOST='0.0.0.0'PORT=12345defhandle_client(conn,addr):print"连接地址:",addrtry:whileTrue:data=conn.recv(1024000)......
  • A-计算机毕业设计定制:76114客户关系管理系统(免费领源码)可做计算机毕业设计JAVA、PHP
    摘 要 随着信息化时代的发展,各行各业都逐渐意识到客户关系管理的重要性。传统的客户管理方式已经无法满足日益增长的客户群体及复杂的业务需求。因此,客户关系管理系统应运而生,以提高服务质量、降低成本、促进营销活动,并实现客户与企业之间更紧密的互动。本文主要探讨如何......
  • python 常见OS基本操作
    python常见OS基本操作Python的os模块提供了与操作系统交互的各种功能。下面按照一些常见的使用场景,按顺序介绍os模块的一些基本操作及其示例代码。1.获取和改变当前工作目录首先,通常我们会获取当前的工作目录,然后可能会改变工作目录。python深色版本importos#获取......
  • 员工工作服穿戴AI识别 Python
    员工工作服穿戴AI识别系统是基于人工智能技术,员工工作服穿戴AI识别通过在工厂和电力场景内部安装摄像头,对员工的工作服穿戴情况进行实时监控。当员工的工作服穿戴不符合规范时,员工工作服穿戴AI识别将自动发出警报,及时通知现场管理人员进行处理。员工工作服穿戴AI识别24小时不间断运......
  • 一个C++的 线程基类
      #include<iostream>#include<thread>#include<mutex>#include<condition_variable>#include<atomic>classThreadBase{public:ThreadBase():thread_(nullptr),stopFlag_(false){}virtual~ThreadBase(){......
  • 二、并发编程与多线程-2.1、J.U.C和锁(下篇)
    2.1、J.U.C和锁(下篇)2.1.8、什么是可重入锁?它的作用是什么?答:在Java中,可重入锁是一种同步机制,它允许同一个线程多次获取同一个锁。当一个线程持有该锁时,它可以反复进入被该锁保护的代码块,而不会被阻塞。这种机制也被称为递归锁。比如synchronized锁和ReentrantLock锁都是可......
  • 多线程、任务、异步的区别
    Task和Thread的区别这是一个高频,深刻的问题,无论去哪都逃不过被询问这个问题。Task是基于Thread的,这是众所周知的。但是Task和Thread的联系如此简单和纯粹确实我没想到的。甚至只需要几十行代码就能呈现其原理。一个简单的模拟实例说明Task及其调度问题,这真是一篇好文章。任务体......
  • Python多态
    #1多态#指同一种行为具有不同的表现形式#1.1多态的前提#继承#重写#classAnimal:#  defspeak(self):#    print('动物')#classDog(Animal):#  defspeak(self):#    print('狗')##classCat(Animal):#  defspeak(se......
  • 【Python学习笔记】第1章 问答环节
    人们为什么使用Python软件质量:可读性、可维护性开发者生产效率:代码更少程序的可移植性:同样的代码在不同的操作系统中都可以运行标准库的支持:内置可移植的功能模块组件构成:轻松地与应用程序的其他部分通信享受乐趣:略软件质量追求代码简洁,可读性模块化、面向......