首页 > 编程语言 >[python]多线程快速入门

[python]多线程快速入门

时间:2024-10-29 19:58:43浏览次数:6  
标签:__ 10 入门 thread python self threading time 多线程

前言

线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位。由于CPython的GIL限制,多线程实际为单线程,大多只用来处理IO密集型任务。

Python一般用标准库threading来进行多线程编程。

基本使用

  • 方式1,创建threading.Thread类的示例
import threading
import time

def task1(counter: int):
    print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
    num = counter
    while num > 0:
        time.sleep(3)
        num -= 1
    print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")

if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    # 创建三个线程
    t1 = threading.Thread(target=task1, args=(7,))
    t2 = threading.Thread(target=task1, args=(5,))
    t3 = threading.Thread(target=task1, args=(3,))

    # 启动线程
    t1.start()
    t2.start()
    t3.start()

    # join() 用于阻塞主线程, 等待子线程执行完毕
    t1.join()
    t2.join()
    t3.join()
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")

执行输出示例

main thread: MainThread, start time: 2024-10-26 12:42:37
thread: Thread-1 (task1), args: 7, start time: 2024-10-26 12:42:37
thread: Thread-2 (task1), args: 5, start time: 2024-10-26 12:42:37
thread: Thread-3 (task1), args: 3, start time: 2024-10-26 12:42:37
thread: Thread-3 (task1), args: 3, end time: 2024-10-26 12:42:46
thread: Thread-2 (task1), args: 5, end time: 2024-10-26 12:42:52
thread: Thread-1 (task1), args: 7, end time: 2024-10-26 12:42:58
main thread: MainThread, end time: 2024-10-26 12:42:58
  • 方式2,继承threading.Thread类,重写run()__init__()方法
import threading
import time

class MyThread(threading.Thread):
    def __init__(self, counter: int):
        super().__init__()
        self.counter = counter

    def run(self):
        print(f"thread: {threading.current_thread().name}, args: {self.counter}, start time: {time.strftime('%F %T')}")
        num = self.counter
        while num > 0:
            time.sleep(3)
            num -= 1
        print(f"thread: {threading.current_thread().name}, args: {self.counter}, end time: {time.strftime('%F %T')}")

if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    # 创建三个线程
    t1 = MyThread(7)
    t2 = MyThread(5)
    t3 = MyThread(3)

    # 启动线程
    t1.start()
    t2.start()
    t3.start()

    # join() 用于阻塞主线程, 等待子线程执行完毕
    t1.join()
    t2.join()
    t3.join()
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")

继承threading.Thread类也可以写成这样,调用外部函数。

import threading
import time

def task1(counter: int):
    print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
    num = counter
    while num > 0:
        time.sleep(3)
        num -= 1
    print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")

class MyThread(threading.Thread):
    def __init__(self, target, args: tuple):
        super().__init__()
        self.target = target
        self.args = args
    
    def run(self):
        self.target(*self.args)

if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    # 创建三个线程
    t1 = MyThread(target=task1, args=(7,))
    t2 = MyThread(target=task1, args=(5,))
    t3 = MyThread(target=task1, args=(3,))

    # 启动线程
    t1.start()
    t2.start()
    t3.start()

    # join() 用于阻塞主线程, 等待子线程执行完毕
    t1.join()
    t2.join()
    t3.join()
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")

多线程同步

如果多个线程共同对某个数据修改,则可能出现不可预料的后果,这时候就需要某些同步机制。比如如下代码,结果是随机的(个人电脑用python3.13实测结果都是0,而低版本的python3.6运行结果的确是随机的)

import threading
import time

num = 0

def task1(counter: int):
    print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
    global num
    for _ in range(100000000):
        num = num + counter
        num = num - counter
    print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")

if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    # 创建三个线程
    t1 = threading.Thread(target=task1, args=(7,))
    t2 = threading.Thread(target=task1, args=(5,))
    t3 = threading.Thread(target=task1, args=(3,))
    t4 = threading.Thread(target=task1, args=(6,))
    t5 = threading.Thread(target=task1, args=(8,))

    # 启动线程
    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t5.start()

    # join() 用于阻塞主线程, 等待子线程执行完毕
    t1.join()
    t2.join()
    t3.join()
    t4.join()
    t5.join()
    
    print(f"num: {num}")
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")

Lock-锁

使用互斥锁可以在一个线程访问数据时,拒绝其它线程访问,直到解锁。threading.Thread中的Lock()Rlock()可以提供锁功能。

import threading
import time

num = 0

mutex = threading.Lock()

def task1(counter: int):
    print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
    global num
    mutex.acquire()
    for _ in range(100000):
        num = num + counter
        num = num - counter
    mutex.release()
    print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")

if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    # 创建三个线程
    t1 = threading.Thread(target=task1, args=(7,))
    t2 = threading.Thread(target=task1, args=(5,))
    t3 = threading.Thread(target=task1, args=(3,))

    # 启动线程
    t1.start()
    t2.start()
    t3.start()

    # join() 用于阻塞主线程, 等待子线程执行完毕
    t1.join()
    t2.join()
    t3.join()
    
    print(f"num: {num}")
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")

Semaphore-信号量

互斥锁是只允许一个线程访问共享数据,而信号量是同时允许一定数量的线程访问共享数据。比如银行有5个窗口,允许同时有5个人办理业务,后面的人只能等待,待柜台有空闲才可以进入。

import threading
import time
from random import randint

semaphore = threading.BoundedSemaphore(5)

def business(name: str):
    semaphore.acquire()
    print(f"{time.strftime('%F %T')} {name} is handling")
    time.sleep(randint(3, 10))
    print(f"{time.strftime('%F %T')} {name} is done")
    semaphore.release()

if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    threads = []
    for i in range(10):
        t = threading.Thread(target=business, args=(f"thread-{i}",))
        threads.append(t)

    for t in threads:
        t.start()

    for t in threads:
        t.join()
    
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")

执行输出

main thread: MainThread, start time: 2024-10-26 17:40:10
2024-10-26 17:40:10 thread-0 is handling
2024-10-26 17:40:10 thread-1 is handling
2024-10-26 17:40:10 thread-2 is handling
2024-10-26 17:40:10 thread-3 is handling
2024-10-26 17:40:10 thread-4 is handling
2024-10-26 17:40:15 thread-2 is done
2024-10-26 17:40:15 thread-5 is handling
2024-10-26 17:40:16 thread-0 is done
2024-10-26 17:40:16 thread-6 is handling
2024-10-26 17:40:19 thread-3 is done
2024-10-26 17:40:19 thread-4 is done
2024-10-26 17:40:19 thread-7 is handling
2024-10-26 17:40:19 thread-8 is handling
2024-10-26 17:40:20 thread-1 is done
2024-10-26 17:40:20 thread-9 is handling
2024-10-26 17:40:21 thread-6 is done
2024-10-26 17:40:23 thread-7 is done
2024-10-26 17:40:24 thread-5 is done
2024-10-26 17:40:24 thread-8 is done
2024-10-26 17:40:30 thread-9 is done
main thread: MainThread, end time: 2024-10-26 17:40:30

Condition-条件对象

Condition对象能让一个线程A停下来,等待其他线程,其他线程通知后线程A继续运行。

import threading
import time
import random

class Employee(threading.Thread):
    def __init__(self, username: str, cond: threading.Condition):
        self.username = username
        self.cond = cond
        super().__init__()

    def run(self):
        with self.cond:
            print(f"{time.strftime('%F %T')} {self.username} 到达公司")
            self.cond.wait()  # 等待通知
            print(f"{time.strftime('%F %T')} {self.username} 开始工作")
            time.sleep(random.randint(1, 5))
            print(f"{time.strftime('%F %T')} {self.username} 工作完成")

class Boss(threading.Thread):
    def __init__(self, username: str, cond: threading.Condition):
        self.username = username
        self.cond = cond
        super().__init__()

    def run(self):
        with self.cond:
            print(f"{time.strftime('%F %T')} {self.username} 发出通知")
            self.cond.notify_all()  # 通知所有线程
        time.sleep(2)

if __name__ == "__main__":
    cond = threading.Condition()
    boss = Boss("老王", cond)
    
    employees = []
    for i in range(5):
        employees.append(Employee(f"员工{i}", cond))

    for employee in employees:
        employee.start()
    boss.start()

    boss.join()
    for employee in employees:
        employee.join()

执行输出

2024-10-26 21:16:20 员工0 到达公司
2024-10-26 21:16:20 员工1 到达公司
2024-10-26 21:16:20 员工2 到达公司
2024-10-26 21:16:20 员工3 到达公司
2024-10-26 21:16:20 员工4 到达公司
2024-10-26 21:16:20 老王 发出通知
2024-10-26 21:16:20 员工4 开始工作
2024-10-26 21:16:23 员工4 工作完成
2024-10-26 21:16:23 员工1 开始工作
2024-10-26 21:16:28 员工1 工作完成
2024-10-26 21:16:28 员工2 开始工作
2024-10-26 21:16:30 员工2 工作完成
2024-10-26 21:16:30 员工0 开始工作
2024-10-26 21:16:31 员工0 工作完成
2024-10-26 21:16:31 员工3 开始工作
2024-10-26 21:16:32 员工3 工作完成

Event-事件

在 Python 的 threading 模块中,Event 是一个线程同步原语,用于在多个线程之间进行简单的通信。Event 对象维护一个内部标志,线程可以使用 wait() 方法阻塞,直到另一个线程调用 set() 方法将标志设置为 True。一旦标志被设置为 True,所有等待的线程将被唤醒并继续执行。

Event 的主要方法

  1. set():将事件的内部标志设置为 True,并唤醒所有等待的线程。
  2. clear():将事件的内部标志设置为 False
  3. is_set():返回事件的内部标志是否为 True
  4. wait(timeout=None):如果事件的内部标志为 False,则阻塞当前线程,直到标志被设置为 True 或超时(如果指定了 timeout)。
import threading
import time
import random

class Employee(threading.Thread):
    def __init__(self, username: str, cond: threading.Event):
        self.username = username
        self.cond = cond
        super().__init__()

    def run(self):
        print(f"{time.strftime('%F %T')} {self.username} 到达公司")
        self.cond.wait()  # 等待事件标志为True
        print(f"{time.strftime('%F %T')} {self.username} 开始工作")
        time.sleep(random.randint(1, 5))
        print(f"{time.strftime('%F %T')} {self.username} 工作完成")

class Boss(threading.Thread):
    def __init__(self, username: str, cond: threading.Event):
        self.username = username
        self.cond = cond
        super().__init__()

    def run(self):
        print(f"{time.strftime('%F %T')} {self.username} 发出通知")
        self.cond.set()

if __name__ == "__main__":
    cond = threading.Event()
    boss = Boss("老王", cond)
    
    employees = []
    for i in range(5):
        employees.append(Employee(f"员工{i}", cond))

    for employee in employees:
        employee.start()
    boss.start()

    boss.join()
    for employee in employees:
        employee.join()

执行输出

2024-10-26 21:22:28 员工0 到达公司
2024-10-26 21:22:28 员工1 到达公司
2024-10-26 21:22:28 员工2 到达公司
2024-10-26 21:22:28 员工3 到达公司
2024-10-26 21:22:28 员工4 到达公司
2024-10-26 21:22:28 老王 发出通知
2024-10-26 21:22:28 员工0 开始工作
2024-10-26 21:22:28 员工1 开始工作
2024-10-26 21:22:28 员工3 开始工作
2024-10-26 21:22:28 员工4 开始工作
2024-10-26 21:22:28 员工2 开始工作
2024-10-26 21:22:30 员工3 工作完成
2024-10-26 21:22:31 员工4 工作完成
2024-10-26 21:22:31 员工2 工作完成
2024-10-26 21:22:32 员工0 工作完成
2024-10-26 21:22:32 员工1 工作完成

使用队列

Python的queue模块提供同步、线程安全的队列类。以下示例为使用queue实现的生产消费者模型

import threading
import time
import random
import queue


class Producer(threading.Thread):
    """多线程生产者类."""

    def __init__(
        self, tname: str, channel: queue.Queue, done: threading.Event
    ):
        self.tname = tname
        self.channel = channel
        self.done = done
        super().__init__()

    def run(self) -> None:
        """Method representing the thread's activity."""

        while True:
            if self.done.is_set():
                print(
                    f"{time.strftime('%F %T')} {self.tname} 收到停止信号事件"
                )
                break
            if self.channel.full():
                print(
                    f"{time.strftime('%F %T')} {self.tname} report: 队列已满, 全部停止生产"
                )
                self.done.set()
            else:
                num = random.randint(100, 1000)
                self.channel.put(f"{self.tname}-{num}")
                print(
                    f"{time.strftime('%F %T')} {self.tname} 生成数据 {num}, queue size: {self.channel.qsize()}"
                )
                time.sleep(random.randint(1, 5))


class Consumer(threading.Thread):
    """多线程消费者类."""

    def __init__(
        self, tname: str, channel: queue.Queue, done: threading.Event
    ):
        self.tname = tname
        self.channel = channel
        self.done = done
        self.counter = 0
        super().__init__()

    def run(self) -> None:
        """Method representing the thread's activity."""
        while True:
            if self.done.is_set():
                print(
                    f"{time.strftime('%F %T')} {self.tname} 收到停止信号事件"
                )
                break
            if self.counter >= 3:
                print(
                    f"{time.strftime('%F %T')} {self.tname} report: 全部停止消费"
                )
                self.done.set()
                continue
            if self.channel.empty():
                print(
                    f"{time.strftime('%F %T')} {self.tname} report: 队列为空, counter: {self.counter}"
                )
                self.counter += 1
                time.sleep(1)
                continue
            else:
                data = self.channel.get()
                print(
                    f"{time.strftime('%F %T')} {self.tname} 消费数据 {data}, queue size: {self.channel.qsize()}"
                )
                time.sleep(random.randint(1, 5))
                self.counter = 0


if __name__ == "__main__":
    done_p = threading.Event()
    done_c = threading.Event()
    channel = queue.Queue(30)
    threads_producer = []
    threads_consumer = []

    for i in range(8):
        threads_producer.append(Producer(f"producer-{i}", channel, done_p))

    for i in range(6):
        threads_consumer.append(Consumer(f"consumer-{i}", channel, done_c))

    for t in threads_producer:
        t.start()

    for t in threads_consumer:
        t.start()

    for t in threads_producer:
        t.join()

    for t in threads_consumer:
        t.join()

线程池

在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或其他更多资源。在多线程程序中,生成一个新线程之后销毁,然后再创建一个,这种方式就很低效。池化多线程,也就是线程池就为此而生。

将任务添加到线程池中,线程池会自动指定一个空闲的线程去执行任务,当超过最大线程数时,任务需要等待有新的空闲线程才会被执行。Python一般可以使用multiprocessing模块中的Pool来创建线程池。

import time

from multiprocessing.dummy import Pool as ThreadPool

def foo(n):
    time.sleep(2)


if __name__ == "__main__":
    start = time.time()
    for n in range(5):
        foo(n)
    print("single thread time: ", time.time() - start)

    start = time.time()
    t_pool = ThreadPool(processes=5)  # 创建线程池, 指定池中的线程数为5(默认为CPU数)
    rst = t_pool.map(foo, range(5))  # 使用map为每个元素应用到foo函数
    t_pool.close()  # 阻止任何新的任务提交到线程池
    t_pool.join()  # 等待所有已提交的任务完成
    print("thread pool time: ", time.time() - start)

线程池执行器

python的内置模块concurrent.futures提供了ThreadPoolExecutor类。这个类结合了线程和队列的优势,可以用来平行执行任务。

import time
from random import randint
from concurrent.futures import ThreadPoolExecutor

def foo() -> None:
    time.sleep(2)
    return randint(1,100)

if __name__ == "__main__":
    start = time.time()
    futures = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        for n in range(10):
            futures.append(executor.submit(foo))  # Fan out
            
    for future in futures:  # Fan in
        print(future.result())
    print("thread pool executor time: ", time.time() - start)

执行输出

44
19
86
48
35
74
59
99
58
53
thread pool executor time:  4.001955032348633

ThreadPoolExecutor类的最大优点在于:如果调用者通过submit方法把某项任务交给它执行,那么会获得一个与该任务相对应的Future实例,当调用者在这个实例上通过result方法获取执行结果时,ThreadPoolExecutor会把它在执行任务的过程中所遇到的异常自动抛给调用者。而ThreadPoolExecutor类的缺点是IO并行能力不高,即便把max_worker设为100,也无法高效处理任务。更高需求的IO任务可以考虑换异步协程方案。

参考

  • 郑征《Python自动化运维快速入门》清华大学出版社
  • Brett Slatkin《Effective Python》(2nd) 机械工业出版社

标签:__,10,入门,thread,python,self,threading,time,多线程
From: https://www.cnblogs.com/XY-Heruo/p/18514316

相关文章

  • python+flask框架的基于微信小程序的校园跑腿系统服务端视频8(开题+程序+论文) 计算机
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容选题背景随着移动互联网技术的快速发展,微信小程序作为一种轻量级的应用形式,已广泛应用于各类服务场景。在校园环境中,学生对于便捷生活服务的需求日......
  • python+flask框架的基于微信小程序的校园互助平台服务端视频8(开题+程序+论文) 计算机
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容选题背景随着移动互联网技术的飞速发展,微信小程序作为一种轻量级的应用形式,已经深入到人们生活的方方面面。在校园环境中,学生之间经常存在各种互助......
  • python+flask框架的基于微信小程序的校园跳蚤市场管理系统的设计与实现服务端视频8(开
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容选题背景随着移动互联网技术的快速发展,微信小程序作为一种轻量级的应用形式,在校园生活中的应用日益广泛。校园跳蚤市场作为学生二手物品交易的重要......
  • 【Python入门】7天速成Python桌面应用开发高手,WxPython vs PyQt:谁更胜一筹?
    ......
  • 深度学习入门笔记——DataLoader的使用
    如何使用数据集DataSet?在介绍DataLoader之前,需要先了解数据集DataSet的使用。Pytorch中集成了很多已经处理好的数据集,在pytorch的torchvision、torchtext等模块有一些典型的数据集,可以通过配置来下载使用。以CIFAR10数据集为例,文档已经描述的很清晰了,其中要注意的就是transform......
  • Python Web 前后端分离 后台管理系统 Django+vue(完整代码)
    1.前后端分离的架构在前后端分离的架构中,前端和后端分别作为独立的项目进行开发和部署。前端项目通过API与后端项目进行通信。前端:使用Vue.js构建用户界面,调用后端提供的RESTfulAPI获取和发送数据。后端:使用Django构建API,处理业务逻辑和数据存储2.创建django项目及vue项目......
  • python+flask框架的基于微信小程序的考勤出勤管理系统(开题+程序+论文) 计算机毕业设
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容选题背景关于考勤出勤管理系统的研究,现有研究主要以传统企业考勤系统、高校教务管理系统中的考勤模块为主,这些系统多采用PC端或专用的考勤设备,虽然......
  • Python控制结构
    在编程的世界里,控制结构就像是一位指挥家,能够引导程序按顺序演奏出美妙的乐章。今天,我们将深入探讨Python中的条件判断和循环结构,当然,不会忘记用幽默的方式来让这些概念更易懂!条件判断条件判断是控制程序流的重要组成部分。我们常用的 if-elif-else 结构就像是生活中的选......
  • 16.网工入门篇--------介绍下网络服务及应用
    一、网络服务的概念网络服务是指通过网络提供的软件功能或设施,它允许不同的设备和用户在网络环境中进行信息交换、资源共享和协作。这些服务基于各种网络协议,以实现高效、可靠的通信。二、常见网络服务类型(一)文件传输服务FTP(文件传输协议)原理:FTP是一种用于在网络上进......
  • 基于Python星载气溶胶数据处理与反演分析
    Python作为一种强大且易于学习的编程语言,已广泛应用于数据科学和大气科学领域,Python凭借其强大的数据处理能力,可以高效处理海量的气溶胶数据。例如,通过Pandas库,研究人员可以进行高效的数据清洗、整理和分析;NumPy库则提供了强大的数值计算功能,能够快速进行各种数学和统计运算;Car......