首页 > 其他分享 >flask整合rabbitMQ插件的方式

flask整合rabbitMQ插件的方式

时间:2023-10-25 18:00:57浏览次数:35  
标签:插件 flask self rabbitMQ queue connection def app channel



文章目录

  • 二、Python-flask-rabbitMQ-插件方式整合
  • 引言
  • 具体步骤
  • 1 安装依赖:
  • 2 编写实体类:
  • 3 编写消费者和生产者:
  • 4 初始化消费者和生产者:
  • 5 其他地方使用生产者


二、Python-flask-rabbitMQ-插件方式整合

引言

当今互联网应用的高并发场景下,如何保证可靠的消息传递和高效的处理成为了一项重要的挑战。在这种情况下,RabbitMQ作为一种可靠的消息队列系统,被广泛应用于各个领域。

本文将介绍如何使用RabbitMQ插件整合Flask框架,实现并发性生产者和消费者的兼容。通过这种方式,我们可以利用RabbitMQ的优势来确保消息的可靠传递,并提高应用程序的处理能力。

首先,我们需要在Flask应用程序中引入RabbitMQ插件。可以使用Pika库来连接和操作RabbitMQ。通过创建一个连接池管理连接对象,我们可以避免频繁地创建和销毁连接,提高效率。

同时,为了处理高并发的生产者,我们可以使用批量发送消息的方式来减少通道创建和消息发布的开销。通过设置缓冲区来收集一定数量或一定时间间隔内的消息,然后批量发送,可以更有效地利用资源。

此外,对于高并发的消费者,我们可以考虑使用异步的方式来处理消息。通过将消息发送任务交给后台线程或异步任务队列处理,可以避免请求的堵塞,提高应用程序的并发能力。

通过以上的优化方案,我们可以在Flask应用程序中充分利用RabbitMQ的功能,并且兼容高并发的生产者和消费者。这将帮助我们构建更可靠、高效的消息队列系统,应对日益增长的并发访问需求。

总之,使用RabbitMQ插件整合Flask框架,并采用优化方案来兼容并发性生产者和消费者,是构建可靠、高效消息传递系统的关键一步。通过这种方式,我们能够更好地应对高并发场景下的挑战,提升应用程序的性能和稳定性。

具体步骤

1 安装依赖:

使用pip安装pika库:

pip install pika

2 编写实体类:

from queue import Queue
from threading import Lock

import pika


# 定义交换机类型的枚举值
class ExchangeType:
    DEFAULT = 'default'
    DIRECT = "direct"
    FANOUT = "fanout"
    TOPIC = 'topic'


class RabbitMQ:
    def __init__(self, host='localhost', port=5672, username='guest', password='guest', pool_size=10):
        self.credentials = pika.PlainCredentials(username, password)
        self.parameters = pika.ConnectionParameters(host=host, port=port, credentials=self.credentials)
        self.connection_pool = Queue(pool_size)  # 连接池,存储连接和信道
        self.lock = Lock()  # 互斥锁,用于对连接池的访问进行同步

        for _ in range(pool_size):
            connection = self._create_connection()
            channel = connection.channel()
            self.connection_pool.put((connection, channel))

    def _create_connection(self):
        return pika.BlockingConnection(self.parameters)

    def get_channel(self):
        with self.lock:
            connection, channel = self.connection_pool.get()  # 从连接池获取连接和信道
        return connection, channel

    def release_channel(self, connection, channel):
        with self.lock:
            self.connection_pool.put((connection, channel))  # 将连接和信道放回连接池

    def send_message(self, exchange, routing_key, message, exchange_type=ExchangeType.DEFAULT):
        connection, channel = self.get_channel()
        try:
            channel.exchange_declare(exchange=exchange, exchange_type=exchange_type)  # 声明交换机并指定类型
            channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message)  # 发布消息
        finally:
            self.release_channel(connection, channel)

    def receive_messages(self, queue, callback):
        connection, channel = self.get_channel()
        try:
            channel.queue_declare(queue=queue, durable=True)  # 声明队列并标记为持久化
            # channel.queue_purge(queue=queue)  # 清空队列,以防之前的非持久化消息残留
            channel.basic_qos(prefetch_count=10)  # 每次从 RabbitMQ 获取 10 条消息
            channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=False)  # 消费消息并设置回调函数
            channel.start_consuming()  # 开始消费消息
        finally:
            self.release_channel(connection, channel)

3 编写消费者和生产者:

def connect_mq(app):
    # 初始化 RabbitMQ 实例
    rabbitmq = RabbitMQ(host=app.config['RABBITMQ_HOST'], port=5672, username='guest', password='guest')


    # 在应用上下文中注册 RabbitMQ 实例
    app.config['RABBITMQ'] = rabbitmq
    # consume_mq(app)
    #
    thread = threading.Thread(target=consume_mq, args=(app,))

    # 启动线程
    thread.start()

def consume_mq(app):
    # 启动消费者程序,开始接收和处理消息
    def callback(ch, method, properties, body):
        try:
            print(f"消息队列内容 {body.decode()}")
            # 处理rabbitMQ内容
            to_transcribe(body.decode())

        except Exception as e:
            print(str(e))
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # 启动消费者程序,开始接收和处理消息
    with app.app_context():
        rabbitmq = current_app.config['RABBITMQ']
        rabbitmq.receive_messages('audio_queue', callback)

4 初始化消费者和生产者:

def create_app():
    app = Flask(__name__)
    connect_mq_v1(app)

5 其他地方使用生产者

class MessageHandler:
    """处理存放音频,将所有的任务都放在MQ里面"""

    def __init__(self, dir_name,uuid_str, back_url, file_url, request_type, file_name, *args, **kwargs):

        # 文件夹名称
        self.dir_name = dir_name
        # 文件名称
        self.file_name = file_name
        # 文件上传类型
        self.request_type = request_type
        # 文件存储位置
        self.file_url = file_url
        # 客户端回调地址
        self.back_url = back_url
        # 唯一标识
        self.uuid_str = uuid_str

    def send(self):
        """
        :param content_type:队列类型
        :param rpc:MQ对象
        :return:
        """
        try:
            # 发送消息队列
            # rpc.send_expire(body=json.dumps(self.to_json()), exchange='audio_queue', key='audio_queue')
            rabbitmq = current_app.config['RABBITMQ']
            rabbitmq.send_message('audio_queue', 'audio_queue', json.dumps(self.to_json()))
            print("发送消息到mq成功,用于存放音频信息")
        except Exception as e:
            print(f"发送消息到mq服务失败,请检查, {e}")

    def to_json(self):
        _dict = self.__dict__
        return _dict
# 将请求体和uuid放到rabbitMQ中
MessageHandler(**dates).send()


标签:插件,flask,self,rabbitMQ,queue,connection,def,app,channel
From: https://blog.51cto.com/u_15854304/8023643

相关文章

  • Vue日历插件
    <template><divclass="page"><divclass="calendar"><divstyle="display:flex;justify-content:space-between;align-items:center;border:1pxsol......
  • windows安装rabbitMq
    这里安装的版本为erlang: V12.3rabbitMq:3.10.18注意:需要找对应的版本 下载与安装erlang原因:RabbitMQ服务端代码是使用并发式语言Erlang编写的,安装RabbitMQ的前提是安装Erlang。下载地址:http://www.erlang.org/downloads  这里的otp显示26.1.2   双击启动,点n......
  • RabbitMQ简介和安装
    一、RabbitMQ是什么?RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控制、负载均衡等特性,使得RabbitMQ拥有更加广泛的应用场景。RabbitMQ跟Erlang和AMQP有关。下面简单介绍一下Erlang和AMQP。Erlang是一门动态类型的函数式编程语言,它......
  • RabbitMQ通讯方式
    RabbitMQ提供了七种通讯方式,可以去官方查看:https://rabbitmq.com/getstarted.html一、RabbitMQ提供的通讯方式其中通讯方式如下:HelloWorld!:为了入门操作提供的方式Workqueues:一个队列被多个消费者消费Publish/Subscribe:手动创建Exchange(FANOUT)Routing:手动创建Exchange(DIRECT)Topi......
  • flask 学习之解决flask migrate时报Can't locate revision identified
    错误原因:数据库已经和flask建立的连接,但是项目中migrations/versions下没有迁移版本文件,但是数据库中alembic_version中有版本号,当数据迁移时没有找到对应的迁移脚本文件,解决方案:手动创建迁移脚本:flaskdbrevision-m  然后将数据库中的alembic_version中的version_num改成......
  • Flask后端开发(一)-基础知识和前期准备
    目录1.背景介绍1.1.项目背景1.2.项目难点1.3.项目环境2.flask后端开发实现的功能3.flask部署和前后端对接3.1.flask运行配置和服务器部署3.2.flask前后端传参4.后端测试工具4.1.工具介绍4.2.工具使用后记1.背景介绍1.1.项目背景就是前几个月临时接手了一个后端项......
  • cpp: vs 2022 文件头注释插件
    下载插件安装DoxygenCommentshttps://marketplace.visualstudio.com/items?itemName=FinnGegenmantel.doxygenComments 在工具--选项 进行设置 ......
  • RabbitMq---延时队列
    应对场景订单创建后--->开始锁库存而如果之后的服务出现异常,在订单创建的簇点会自动设置逻辑来处理但是之后解锁库存则需要延时队列来解决采用定时任务检查的话有以下问题:时效性(存在较大的时间误差):即使订单支付倒计时30min但是可能你正好在定时任务检查之后的1min时创......
  • rabbitmq 安装步骤
    背景:RabbitMQ由Erlang语言开发的,所以安装RabbitMQ之前,要先安装Erlang首先需要确认需要安装的RabbitMQ版本,例如3.7.8确认与3.7.8对应的Erlang版本,访问https://www.rabbitmq.com/which-erlang.html可确认RabbitMQ3.7.8对应的ERlang的最低版本与最高版本分别为:19.3和21.x,由此......
  • centos安装rabbitmq
    centos安装rabbitmq 官网地址:https://www.rabbitmq.com/download.html安装rabbitmq需要依赖erlang语言,erlang安装包:https://github.com/rabbitmq/erlang-rpm/releasesrabbitmq安装包:https://github.com/rabbitmq/rabbitmq-server/releases 1、上传到/usr/local/so......