首页 > 系统相关 >python多线程与多进程开发实践及填坑记(1)

python多线程与多进程开发实践及填坑记(1)

时间:2024-07-05 18:00:06浏览次数:24  
标签:坑记 python app rabbitmq queue 进程 多线程 data

1. 需求分析

1.1. 概述

基于Flask、Pika、Multiprocessing、Thread搭建一个架构,完成多线程、多进程工作。具体需求如下:

  1. 并行计算任务:使用multiprocessing模块实现并行计算任务,提高计算效率、计算能力。
  2. 消息侦听任务:使用threading模块完成RabbitMQ消息队列的侦听任务,将接收到的数据放入multiprocessing.Queue中,以便并行计算任务处理。
  3. Web服务:使用Flask框架实现Web API服务,提供启停消息侦听任务、启停并行计算任务以及动态调整参数的功能。
  4. 任务交互:通过multiprocessing.Queue实现消息侦听任务与并行计算任务之间的资源交互。
  5. 非阻塞运行:使用threading模块非阻塞地运行Flask Web服务。

1.2. 多线程与多进程

在Python环境中,多线程和多进程的区别主要体现在并发模型、资源利用、数据共享、以及适用场景等方面。

1.2.1. 并发模型

多线程 (Threading):

  • 并发性: 多线程是一种并发模型,多个线程共享同一进程的资源和内存空间,能够在单个进程内并发执行。
  • GIL 限制: 由于 Python 的全局解释器锁 (Global Interpreter Lock, GIL),同一时刻只有一个线程在执行 Python 字节码。这限制了多线程在 CPU 密集型任务中的并行性。
  • 适用任务: 适用于 I/O 密集型任务,如网络请求、文件操作等。

多进程 (Multiprocessing):

  • 并行性: 多进程是一种并行模型,每个进程都有独立的内存空间和资源,可以在多个 CPU 核心上并行执行。
  • 无 GIL 限制: 每个进程有自己的 Python 解释器和 GIL,能够真正实现并行计算,充分利用多核 CPU。
  • 适用任务: 适用于 CPU 密集型任务,如计算密集型数据处理。

1.2.2. 资源利用

多线程:

  • 内存利用: 线程共享同一进程的内存空间,内存开销较小。创建和销毁线程的成本较低。
  • CPU 利用: 由于 GIL 限制,多线程在 Python 中无法充分利用多核 CPU,尤其是在 CPU 密集型任务中。

多进程:

  • 内存利用: 每个进程有独立的内存空间,内存开销较大。创建和销毁进程的成本较高。
  • CPU 利用: 无 GIL 限制,可以充分利用多核 CPU,适合并行处理 CPU 密集型任务。

1.2.3. 数据共享和通信

多线程:

  • 数据共享: 线程共享同一进程的全局变量和内存空间,数据共享容易。
  • 同步机制: 由于共享内存,线程之间需要使用锁 (Lock)、条件变量 (Condition) 等同步机制来避免竞争条件和数据不一致。

多进程:

  • 数据共享: 进程间不共享内存,数据共享复杂。需要使用进程间通信 (IPC) 机制,如管道 (Pipe)、队列 (Queue)、共享内存 (Shared Memory) 等。
  • 同步机制: 使用 IPC 机制进行数据传递和同步。

1.2.4. 适用场景

多线程:

  • I/O 密集型任务: 例如网络爬虫、文件读写、数据库操作等。这些任务在等待 I/O 操作完成时可以切换到其他线程继续执行,提高效率。
  • 轻量级任务: 由于线程的创建和销毁成本较低,适合处理大量短时任务。

多进程:

  • CPU 密集型任务: 例如科学计算、图像处理、大数据分析等。这些任务需要大量计算资源,多进程可以充分利用多核 CPU 并行处理。
  • 隔离性要求高的任务: 进程间独立运行,互不影响,适合需要高隔离性的任务。

1.2.5. 常见问题与解决

多线程:

  • 死锁: 当多个线程互相等待对方释放锁时,会出现死锁情况。需要设计合理的锁机制或使用死锁检测工具。
  • GIL 限制: 对于 CPU 密集型任务,GIL 限制了多线程的并行性。可以通过多进程绕过 GIL 实现并行计算。

多进程:

  • 高内存消耗: 每个进程有独立的内存空间,内存使用较高。可以通过共享内存或减少进程数量优化内存使用。
  • 进程间通信复杂: 需要使用 IPC 机制进行进程间数据传递,设计和实现较为复杂。可以使用 Python 的 multiprocessing 模块提供的队列、管道等工具简化实现。

1.2.6. 总结

  • 多线程: 适合 I/O 密集型任务,内存使用高效,但受 GIL 限制。
  • 多进程: 适合 CPU 密集型任务,可以充分利用多核 CPU,但内存消耗较大,进程间通信复杂。

根据任务性质选择合适的并发模型,可以提高程序的效率和性能。

2. 我初步实现多线程多进程核心代码

2.1. 程序结构及代码说明

程序结构原理图:
在这里插入图片描述
函数说明:

序号名称说明备注
1compute_result计算函数,例如你的优化算法需要并行处理
2send_result_to_rabbitmq发送消息到RabbitMQ
3consume_from_rabbitmq_and_enqueue侦听RabbltMQ消息
4worker工作进程函数
5start_listening启动消息侦听函数Web API
6stop_listening停止消息侦听函数Web API
7start_worker启动工作函数Web API
8stop_worker停止工作函数Web API

2.2. 示例代码

from flask import Flask, jsonify
from threading import Thread, Event
import multiprocessing
import pika
import json
from loguru import logger

# 创建一个事件来控制侦听
stop_event = Event()

# 定义web服务
app = Flask(__name__)

# 假设这是你的计算函数  
def compute_result(data, pso_params):  
    # 进行计算逻辑,这里简化为返回数据本身
    return {"result": data}

# 发送结果到RabbitMQ的函数  
def send_result_to_rabbitmq(channel, exchange_name, queue_name, routing_key, result):  
    try:
        channel.queue_declare(queue=queue_name, durable=True)
        # 初始化交换机
        channel.exchange_declare(exchange=exchange_name, exchange_type='direct', durable=True)
        channel.queue_bind(queue=queue_name, exchange=exchange_name, routing_key=routing_key)
        channel.basic_publish(
            exchange=exchange_name,
            routing_key=routing_key,
            body=json.dumps(result, ensure_ascii=False),
            properties=pika.BasicProperties(
                delivery_mode=2,  # make message persistent
            )
        )
        logger.info(f"发送结果消息:{result} 到RabbitMQ")
    except pika.exceptions.AMQPError as e:
        logger.error(f"Error sending result to RabbitMQ: {e}")
        raise 

# 从RabbitMQ接收数据并放入队列的函数(生产者)  
def consume_from_rabbitmq_and_enqueue(rabbitmq_params, rabbitmq_queue, data_queue):
    try:
        rabbitmq_connection = pika.BlockingConnection(rabbitmq_params)
    except pika.exceptions.AMQPError as e:
        logger.error(f"Error connecting to RabbitMQ in main process: {e}")
        exit(1)
        
    try:
        channel = rabbitmq_connection.channel()
        channel.queue_declare(queue=rabbitmq_queue, durable=True)
        
        def callback(ch, method, properties, body):
            try:
                data_queue.put(body.decode('utf-8'))
                logger.info(f"接收到消息:{body.decode('utf-8')}")
            except Exception as e:
                logger.error(f"Error putting message into data_queue: {e}")

        channel.basic_consume(queue=rabbitmq_queue, on_message_callback=callback, auto_ack=True)
        
        try:
            while not stop_event.is_set():
                channel.connection.process_data_events(time_limit=1)  # 非阻塞地处理事件  
                # 注意:这里使用了process_data_events()而不是start_consuming(),因为我们需要非阻塞地运行  
        except KeyboardInterrupt:
            channel.stop_consuming()
        finally:
            rabbitmq_connection.close()
    except pika.exceptions.AMQPError as e:
        logger.error(f"Error consuming from RabbitMQ: {e}")
        raise

# 工作进程函数  
def worker(data_queue, rabbitmq_params, target_exchange, target_queue, routing_key, pso_params): 
    logger.info('Worker started') 
    while True:  
        try:  
            data = data_queue.get() 
            print(f'data_queue.get() is {data}') 
            if data is None:  
                break  
              
            result = compute_result(data, pso_params)  
            try:
                workerconnection = pika.BlockingConnection(rabbitmq_params)
                channel = workerconnection.channel()
            except pika.exceptions.AMQPError as e:
                logger.error(f"Error connecting to RabbitMQ in worker: {e}")
                return                          
            send_result_to_rabbitmq(channel, target_exchange, target_queue, routing_key, result)  
        except Exception as e:  
            print(f"An error occurred: {e}")  
    logger.info('Worker finished')

@app.route('/startlistening', methods=['GET'])
def start_listening():
    if stop_event.is_set():
        stop_event.clear()
    if not hasattr(app, 'pika_thread') or not app.pika_thread.is_alive():
        app.pika_thread = Thread(target=consume_from_rabbitmq_and_enqueue, args=(rabbitmq_params, rabbitmq_queue, data_queue))
        app.pika_thread.start()
        return jsonify({'status': 'listening'}), 200
    else:
        return jsonify({'status': 'already listening'}), 200

@app.route('/stoplistening', methods=['GET'])
def stop_listening():
    if hasattr(app, 'pika_thread') and app.pika_thread.is_alive():
        stop_event.set()
        app.pika_thread.join()
        del app.pika_thread
        return jsonify({'status': 'stopped'}), 200  
    else:
        return jsonify({'status': 'not running'}), 400

@app.route('/startworking', methods=['GET'])
def start_worker():
    if len(processes) == 0: 
        for _ in range(3):  
            p = multiprocessing.Process(target=worker, args=(data_queue, rabbitmq_params, target_exchange, target_queue, routing_key, pso_params))  
            p.start()  
            processes.append(p)   
            print(f'process id = {p.pid}')
        return jsonify({'status': 'started working'}), 200
    else:
        return jsonify({'status': 'already working'}), 202

@app.route('/stopworking', methods=['GET'])
def stop_worker():
    for p in processes:
        data_queue.put(None)
    for p in processes:
        p.join()
    processes.clear() 
    return jsonify({'status': 'stopped working'}), 200

if __name__ == "__main__":
    rabbitmq_queue = 'energyStorageStrategy.queue'
    target_queue = 'energyStorageStrategy.queue.typc-fpd-tysh'
    target_exchange = 'energyStorageStrategy.direct'
    routing_key = 'typc-fpd-tysh'
    pso_params = {}  # 假设你的PSO参数

    credentials = pika.PlainCredentials('rabbit', '****')  # mq用户名和密码
    rabbitmq_params = pika.ConnectionParameters('192.168.*.*', port=5671, virtual_host='/typc-fpd-dev', credentials=credentials)

    # 创建一个multiprocessing.Queue用于进程间通信  
    data_queue = multiprocessing.Queue()  

    # 创建工作进程列表
    processes = [] 

    print(' [*] Waiting for messages. To exit press CTRL+C')

    flask_thread = Thread(target=lambda: app.run(host='0.0.0.0', port=5002, debug=True))
    flask_thread.start()
    
    start_worker()
    start_listening()

3. 代码中问题及其他

3.1. 重新启动侦听线程失败

程序没有报错,但是,没有启动侦听服务线程。
详见后续文章,主题是消息侦听与线程。

3.2. 重新启动工作进行报错

AttributeError: Can’t get attribute ‘worker’ on <module ‘main’ (built-in)>
详见后续文章。

3.3. Flask 应用上下文之外操作问题

问题描述:

site-packages\werkzeug\local.py", line 508, in _get_current_object
    raise RuntimeError(unbound_message) from None
RuntimeError: Working outside of application context.

问题解析:
这个错误 RuntimeError: Working outside of application context. 通常发生在 Flask 应用中,当你尝试在 Flask 应用上下文之外执行某些操作时。在 Flask 中,应用上下文是一个用于存储当前应用相关信息的对象,比如配置和 URL 映射。当你调用 jsonify 或其他依赖于应用上下文的函数时,必须确保你处于应用上下文中。

在你的代码中,这个错误很可能是因为 start_listening 函数被设计为在 Flask 路由之外的地方被调用,或者是在 Flask 请求处理流程之外被调用。

解决办法:

    with app.app_context():
        start_worker()
        start_listening()

使用 app.app_context() 来手动创建一个应用上下文。

3.4. Linux环境防火墙

在Linux环境中,别忘了打开防火墙的端口:

root@DeepLearning pvet]# sudo iptables -I INPUT -p tcp --dport 5003 -j ACCEPT

4. 总结

  1. 模块化:将不同的功能模块化,便于维护和扩展。
  2. 多进程与多线程结合:使用multiprocessing实现并行计算任务,使用threading实现RabbitMQ消息侦听和Flask Web服务的非阻塞运行。
  3. 进程间通信:通过multiprocessing.Queue实现消息侦听任务与并行计算任务之间的资源交互。
  4. 事件控制:通过threading.Event控制消息侦听任务的启停。

这种架构设计能够满足需求,并且具有较好的扩展性和可维护性。如果有更多具体的需求或优化,可以在此基础上进一步完善。

标签:坑记,python,app,rabbitmq,queue,进程,多线程,data
From: https://blog.csdn.net/xiaoyw/article/details/140129785

相关文章

  • Python异步编程技术详解:async、await、yield和anext
    Python异步编程技术详解:async、await、yield和anext1.async和await2.yield3.anext4.StopAsyncIteration5.综合示例:异步聊天机器人总结异步编程是Python中一种强大的并发编程模式,可以显著提高I/O密集型应用的性能。本文将详细介绍Python中的几种重要的异步编......
  • Python金融时间序列模型ARIMA 和GARCH 在股票市场预测应用|附代码数据
    原文链接:http://tecdat.cn/?p=24407最近我们被客户要求撰写关于金融时间序列模型的研究报告,包括一些图形和统计输出。这篇文章讨论了自回归综合移动平均模型(ARIMA)和自回归条件异方差模型(GARCH)及其在股票市场预测中的应用 ( 点击文末“阅读原文”获取完整代码数据******......
  • 还在为如何学Python而犯难吗?在这里分享下最全的Python学习路线!
    先是Python基础知识的看完基础的,那就来进阶的,前面的是入门,现在的也就是作为一个程序员的基础最后在来个爬虫的觉得有用的朋友,记得点赞哟,如果觉得还看得不过瘾,可以私信我,我这里有Python爬虫系统教学的免费直播观看名额或者还想看其他知识点的,可以在下面留言。......
  • Python学习篇:流程控制详细介绍(四)
    目录1前言2条件判断2.1基本语法2.2使用示例2.3注意事项3循环3.1for循环3.2while循环3.3循环控制语句 4异常处理 4.1基本语法4.2使用示例 4.3注意事项 1前言Python中的流程控制是编程中非常重要的一部分,它允许你根据条件、循环或其他因素来......
  • 超详细Python教程——函数和模块的使用
    函数和模块的使用在讲解本章节的内容之前,我们先来研究一道数学题,请说出下面的方程有多少组正整数解。事实上,上面的问题等同于将8个苹果分成四组每组至少一个苹果有多少种方案。想到这一点问题的答案就呼之欲出了。可以用Python的程序来计算出这个值,代码如下所示。"""......
  • 一个用来画拉氏图的简单Python脚本
    技术背景关于拉氏图的更多介绍,可以参考下这篇博客,这里简单引述一部分内容:Ramachandranplot(拉氏图)是由G.N.Ramachandran等人于1963年开发的,用来描述蛋白质结构中氨基酸残基二面角\(\psi\)和\(\phi\)是否在合理区域的一种可视化方法。同时也可以反映出该蛋白质的构象是否合理......
  • Python多线程-线程池ThreadPoolExecutor
    1.线程池不是线程数量越多,程序的执行效率就越快。线程也是一个对象,是需要占用资源的,线程数量过多的话肯定会消耗过多的资源,同时线程间的上下文切换也是一笔不小的开销,所以有时候开辟过多的线程不但不会提高程序的执行效率,反而会适得其反使程序变慢,得不偿失。为了防止无尽的线程......
  • Python的垃圾回收机制
    Python的垃圾回收机制引入计数器为主,标记清除和分代回收为辅1.引入计数器环状双向链表refchain在python程序中创建的任何对象都会放在refchain链表中当python程序运行时,会根据数据类型的不同找到其对应的结构体,根据结构体中的字段来进行创建相关的数据,然后将对象添加到rec......
  • python中logging
    Python的logging模块是一个用于记录应用程序日志消息的标准模块。它非常强大且灵活,允许你记录各种级别的日志消息,并配置输出格式、日志的存储位置以及处理日志的不同方式。以下是logging模块的基本介绍和用法。defcreate_logger(log_file):log_format='%(asctime)s......
  • 为什么现在的AI编程师都是用Python来编程?
    前言: 在当今AI大火的时节,涌入了一大批AI编程师,和AI训练师!显而易见他们都是用的Python语言来编程的。当然AI也给我们的工作带来了很多便利,比如AI绘画,写文章,视频剪辑,脚本创做等等方面现在都可以来用AI来协助我高效完成工作。那么我们来看看现在的AI编程师为什么都用Python语言......