首页 > 系统相关 >python 多进程并发:生产者+多消费者模式

python 多进程并发:生产者+多消费者模式

时间:2024-03-21 16:22:22浏览次数:18  
标签:__ 队列 消费者 python self 生产者 并发 name

多任务场景中,为了节省大量子任务串行执行的耗时,通常采用并发方式充分利用 cpu 和 内存来节省整体任务运行时间。

对于多任务并发,常见的做法自然是抽象出功能函数,借助 multiprocess 类在主进程中并发出多个子进程,或者构建进程池,将任务构造好后丢入进程池中来实现并发。这种方式对于逻辑结构较为简单的场景来说实现方便,稳定性也有保证,但是对于逻辑更为复杂的场景来说,会造成代码耦合,不够优雅~

其实从设计模式的角度来看,很多多任务场景都可以抽象成 生产者-消费者 模型,各个具体任务的区别无非采用 多生产者、多消费者、单生产者+单消费者、单生产者+多消费者、多生产者+单消费者、多生产者+多消费者中的哪一种模式。

现在常用的基于消息队列的分布式任务调度/消费系统,其实从宏观上来看就可以把它理解成一个 单/多生产者+多消费者 模式。这种模式下,生产者构建具体任务,并将构建好的任务放入任务队列中,然后并发的多个消费者都从任务队列中取任务来执行,消息队列如果放在单台机器上,那就是普通的多任务并发,放在多台机器上,就具有了分布式的雏形。

1 单生产者+多消费者

由于工作场景需要,需要通过 python 实现一个 单生产者+多消费者 模式,其中通过 Queue 实现消息共享来串联起生产和消费。特此记录下,既方便自己查询,也能够以飨来者。由于 python 全局锁 GIL 的原因,导致在 python 中线程并不能完全的实现并行,只能实现并发,所以代码中通过多进程的方式来实现并行。除此之外,对生产者和消费者模块的抽象是通过继承 Process 类并重写 run() 函数的方式来实现的。

首先看极简的抽象代码:

# 导入相关依赖
import time
import random
import multiprocessing
from multiprocessing import JoinableQueue

生产者:

class Producer(multiprocessing.Process):
    """
    生产者:制作淀粉肠
    """
    def __init__(self, t_name, queue):
        multiprocessing.Process.__init__(self, name=t_name)
        self.queue = queue			# 任务队列
        self.t_name = t_name
    
    def run(self):
        # 生产任务(生产 20 根淀粉肠)
        for i in range(20):
            print(self.t_name, "生产了一根淀粉肠", i)
            self.queue.put(i)   			# 把淀粉肠放入队列
        print("所有淀粉肠生产完毕~")
        self.queue.join()
        print("队列被拿完了~")

消费者:

def func(name, var):
    """
    具体的消费任务逻辑
    """
    print(name, "吃了一根淀粉肠", var)
    time.sleep(random.randint(5, 10))     # 模拟耗时操作
    return 'success'
    

class Consumer(multiprocessing.Process):
    """
    消费者:消费淀粉肠
    """
    def __init__(self, t_name, queue):
        multiprocessing.Process.__init__(self, name=t_name)
        self.queue = queue		# 任务队列
        self.t_name = t_name

    def run (self):
        while True:
            var = self.queue.get()
            func(self.name, var)        # 模拟耗时消费操作
            self.queue.task_done()      # 发送一次信号,证明一个数据已经被取走

主函数:

if __name__ == '__main__':
    workers = 16        # 消费者数量

    # 任务队列(设定最大长度为 16,队列满时生产者会等待,直到队列有空间再往里写新的任务)
    q = JoinableQueue(maxsize=16)

    # 生产者
    producer = Producer("生产者", q)
    producer.start()

    # 多个消费者
    for i in range(workers):
        consumer = Consumer(f"消费者 {i}", q)
        consumer.daemon = True                  # 将消费者进程设置为主进程的守护进程,这样当主进程结束后其会自动终止
        consumer.start()

    # 调用 join,作用是等到生产者进程执行结束后再让主进程继续往下执行(这样就能保证所有任务都被构造)
    producer.join()     

注意,上面代码使用了 JoinableQueue 而不是一般的 Queue,主要是因为 JoinableQueue 具有 q.task_done() 函数,结合 JoinableQueue.join() 能帮我们实现监控队列中所有任务都消费完后的情况,避免主进程结束引起消费者守护进程结束而导致队列中有些任务没有被消费完。读者可以自己注释 Productor 中的 self.queue.join() 然后看看打印结果比较下生产者和消费者的输出数量。

2 消费者结束逻辑优化

上面代码实现通过在消费者中通过 while True 循环持续监听队列,只有当主进程结束之后消费者进程才会结束。如果是一次性任务或者是长时监听场景自然没什么问题。但如果是定时调度场景,并且任务种类是动态变化的,主进程不会结束但会反复调用生产者和消费者,此时部分消费者进程可能会无法及时关闭进而形成僵尸进程。

为此,需要设计一种方式来使得消费者能在任务拿完之后退出消费。

方案:在生产者构造完所有任务后,再次向队列中放入 N(等于消费者进程数)个特殊的标识(例如 None),然后消费者从队列中拿取时,一但拿到的是这个特殊标识,就退出循环。

修改后的消费者:

class Consumer(multiprocessing.Process):
    """
    消费者:消费淀粉肠
    """
    def __init__(self, t_name, queue):
        multiprocessing.Process.__init__(self, name=t_name)
        self.queue = queue		    # 任务队列
        self.t_name = t_name

    def run (self):
        while True:
            var = self.queue.get()      # 从队列中取出一根淀粉肠
            if var is None:
                break
            else:
                func(self.name, var)        # 模拟耗时消费操作
                self.queue.task_done()      # 发送一次信号,证明一个数据已经被取走

主函数:

if __name__ == '__main__':
    workers = 16        # 消费者数量

    # 任务队列(设定最大长度为 16,队列满时生产者会等待,直到队列有空间再往里写新的任务)
    q = JoinableQueue(maxsize=16)

    # 生产者
    producer  = Producer("生产者", q)
    producer.start()

    # 多个消费者
    consumer_list = []
    for i in range(workers):
        consumer = Consumer(f"消费者 {i}", q)
        # consumer.daemon = True                  # 将消费者进程设置为主进程的守护进程,这样当主进程结束后其会自动终止
        consumer.start()
        consumer_list.append(consumer)

    # 调用 join,作用是等到生产者进程执行结束后再让主进程继续往下执行(这样就能保证所有任务都被构造)
    producer.join()   

    # 写入特殊标识
    for i in range(workers):
        q.put(None)
    for consumer in consumer_list:
        consumer.join()

    # 验证各子进程状态
    print('-' * 90)
    print('productor:', producer.is_alive())
    for i, consumer in enumerate(consumer_list):
        print(f'consumer {i}:', consumer.is_alive())
    print('-' * 90)

其余部分代码和上面相同。


参考:

标签:__,队列,消费者,python,self,生产者,并发,name
From: https://www.cnblogs.com/zishu/p/18087651

相关文章

  • python 异常捕获、断言(assert 、finally) 与日志(loguru.logger)
    异常捕获常见的异常类型代码执行顺序从上到下依次运行的,如果出错了,后面的代码不会出错。--所以要对异常做处理。常见的异常的类型,不需要记;平时写代码的时候经常会报错,积累常见错误,排查问题。常见异常的报错的类型:NameError,IndexError,KeyError,ValueError,ZeroDivisionE......
  • python 之 垃圾回收机制(Garbage Collector,简称 GC)
    垃圾回收机制有三种,主要采用引用计数机制为主,标记-清除和分代回收机制为辅的策略。其中,标记-清除机制用来解决计数引用带来的循环引用而无法释放内存的问题,分代回收机制是为提升垃圾回收的效率。1.引用计数:Python中的每个对象都有一个引用计数,每当对象被引用时,其引用......
  • python 函数(解包、互相调用、作用域、函数的封装、内置函数:eval()、zip()、open())
    函数解包"""1、函数的注释:参数和返回值在注释里可以自动添加显示,只需手动加说明。2、函数的解包【拆包】:函数的参数要传递数据有多个值的时候,中间步骤拿到数据保存在元组或者列表或者字典里。-传递参数的时候加一个*或者**解包-一次拿到元组列表字典的......
  • 身份证ocr,python身份证识别ocr接口代码,实名认证接口
    基于文字识别技术产物的身份证识别接口现已成熟,通过手机、电脑或者摄像头终端设备拍照或者上传身份证图片即可实现身份证照片上文字的识别,从而提取到身份证信息。翔云除了提供身份证识别接口外,还完善了实名认证接口方案,搭配翔云身份证实名认证接口可谓是效率翻倍。身份证......
  • 基于Python3的数据结构与算法 - 17 哈希表
    一、哈希表哈希表是一个通过哈希函数来计算数据存储位置的数据结构,通常支持如下操作:insert(key,value):插入键值对(key,value)。get(key):如果存在键值对为key的键值对则返回其value,否则返回空值。delete(key):删除键为key的键值对。1.直接寻址法当关键字的全域U比较小......
  • 通俗易懂解释python和anaconda和pytorch以及pycharm之间的关系
    Python:Python就像是一门编程语言的工具箱,你可以把它看作是一种通用的编程语言,就像是一把多功能的工具刀。你可以使用Python来编写各种类型的程序,就像使用工具刀来制作各种不同的手工艺品一样。Anaconda:Anaconda就像是一个装有不同种类工具的大工具箱。这个工具箱里包括了Py......
  • 学会Python有哪些可以做的兼职?所有途径全在这里了...
    可以干的兼职有好多,主要围绕Python的应用方向来。自媒体现在很多搞技术的都开始进入自媒体领域,比如微信公众号、知乎、B站、抖音、小红书等。这些平台上只要你有流量,你就可以通过广告、播放量、带货等方式赚钱。当然了,自媒体需要积累,如果能够忍受前期0收入0阅读阶段,不断......
  • python statlic lib embedding
    pythonstaticlib因为默认没有编译内置库,因此需要配置setup.local文件,把内置库编译到staticlib。参考:https://wiki.python.org/moin/BuildStatically。(./configure--disable-shared即可)注意是Setup.local,不是Setup.dist*static*#GNUreadline.UnlikepreviousPythoni......
  • Python统计初步
    文章目录基本统计特征区间统计PandaspandasGUIPython科学计算:数组......
  • 2024. 1华为od机试C卷【传递悄悄话】Python
    题目给定一个二叉树,每个节点上站着一个人,节点数字表示父节点到该节点传递悄悄话需要花费的时间。初始时,根节点所在位置的人有一个悄悄话想要传递给其他人,求二叉树所有节点上的人都接收到悄悄话花费的时间。输入描述0920-1-1157-1-1-1-132注:-1表示空节点输出......