首页 > 编程语言 >RabbitMq 入门应用 提升性能 : 算法多阶段并行 (Python)

RabbitMq 入门应用 提升性能 : 算法多阶段并行 (Python)

时间:2024-09-26 15:46:03浏览次数:8  
标签:task pika Python RabbitMq queue 队列 rabbitmq channel 入门

大问题: 我们有一个算法,它可以被分为多个阶段进行(顺序不可颠倒),每个阶段的性能和资源要求不同(且不均衡程度比较高);

假设我们现在可以堆资源(较多的CPU和内存),如何将算法各个步骤拆分并进行性能均衡和实现,使得算法性能最大化以满足生产要求?

多进程:

由于算法有严格的顺序要求,如果是针对视频、自然语言等前后关联较强的数据,一次算法运行只能顺序处理一个问题;

比如一段视频只能读入并处理,多进程并不解决单个进程处理缓慢的问题;

同时Python自带的多进程数据共享堪称地狱(Manager)比较难以使用;

多线程:

多线程的数据共享度高(内存上并不分离),如果并行很容易出现争抢关键数据的情况,各种锁导致性能下降严重;

因此我们的选择的方法是异步+中间件(RabbitMq)

单纯地异步可以吗?但是Python/Java等语言天生对异步的支持就很差,想提升性能?想都别想!

我们只能像上面这样把算法拆开,在关键的节点处堆性能,减少算法从输入到输出的时延,同时避免单个进程导致的资源请求冲突问题。

那么怎么很方便地使用queue传递数据呢?中间件或者TCP只能传递bytes类型的数据?

对于Python而言有个非常好的方案:Pickle import pickle, pickle.dumps(),pickle.loads() 就足以解决大部分问题(而且足够快!),我们就能得到某些数据的bytes信息,然后直接把它丢进队列就好啦!

注意,对于某些指针对象,如高维 numpy.ndarray 的情形,如果它被封装在字典(dict) 或者 链表 (list) 里面,pickle 很可能只会序列化指针的地址(彻彻底底地浅拷贝),导致数据丢失,需要特别注意!

有关Rabbitmq我们需要掌握的技能

由于本篇Blog侧重工程开发和快速入门,我们只需要知道RabbitMq能够帮我们准确无误的传递消息(bytes);

RabbitMq在Linux上的安装 https://www.rabbitmq.com/docs/install-debian 里面这一块(无脑执行就可以了)

之后的启动,命令行输入: rabbitmq-server /etc/rabbitmq/rabbitmq-env.conf

/etc/rabbitmq/rabbitmq-env.conf 是默认的配置文件的位置,简单的操作我们不需要修改配置文件;

一些常用的操作:命令行输入

查看现在的队列的情况 (能看到现在队列里面消息的情况)

rabbitmqctl list_queues

删除队列

rabbitmqctl delete_queue $Your_queue_name

清空队列消息

rabbitmqctl purge_queue $Your_queue_name

详细内容请参考:

https://www.cnblogs.com/thomas-fan/p/15888556.html (比较齐全)

之后建议去看一看 HelloWorld和第一个工作模式(Worker),不过大部分的参数都不需要改动就能满足简单的需求

Python 使用

首先安装pika pip inistall pika 就可以

其实我们只要学会Hello World里面的内容就能做很多事情了!

newTask.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=pika.DeliveryMode.Persistent
    ))
print(f" [x] Sent {message}")
connection.close()

worker.py

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(f" [x] Received {body.decode()}")
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue',on_message_callback=callback)

channel.start_consuming()

这两个程序运行起来(其中 worker.py 多跑几个,然后不停地运行 newTask.py 发送带有 '.' 的消息 如 '123...' 你就能观察到队列的运行啦(多个进程分别处理消息)!)

程序中重点语句:

channel.basic_consume(queue='task_queue',on_message_callback=callback) 监听队列,当有消息产生的时候调用 callback函数! ‘task_queue’是监听的队列

channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=pika.DeliveryMode.Persistent
    ))

发消息,往 'task_queue' 里面发

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

启动和注册队列;

更加进阶的知识:

https://www.rabbitmq.com/tutorials/tutorial-two-python

https://www.cnblogs.com/guyuyun/p/14970592.html

总结:

例子中的消息是持久化的,消息和队列若不被处理和删除就会一直存在于队列当中

对于大部分的上手的调试工作,清除队列消息再重启或者在保留消息的情形下进入消息处理程序进行Debug都是非常方便且常用的方法!

标签:task,pika,Python,RabbitMq,queue,队列,rabbitmq,channel,入门
From: https://www.cnblogs.com/zoutj-projects/p/18433570

相关文章

  • [Python手撕]判断二叉搜索树
    #Definitionforabinarytreenode.#classTreeNode:#def__init__(self,val=0,left=None,right=None):#self.val=val#self.left=left#self.right=rightclassSolution:defisValidBST(self,root:Optional[TreeNod......
  • 学Python要求学历吗?有什么好处?
    Python是一门非常不错的编程语言,学习Python你可以提高就业竞争力和工作机会,而且Python是一门全能语言,无论是否从事编程相关工作,学习它都可以带来许多的帮助,可谓是无所不能。那么想学Python有学历限制吗?以下是具体内容介绍。想学Python有学历限制吗?没有硬性学历要求,学......
  • python接口串口数据
    importtimeimportserialdeftest_receive():#配置串口参数port='/dev/ttyUSB0'#根据你的设备更改端口号baud_rate=9600#波特率bytesize=serial.EIGHTBITS#数据位parity=serial.PARITY_NONE#校验位stop_bits=serial.......
  • 基于python数据挖掘技术的线上招聘信息数据可视化分析系统 q3122-- Scrapy爬虫
    目录项目介绍实现功能截图技术栈Scrapy爬虫框架关键技术和使用的工具环境等的说明解决的思路开发流程爬虫核心代码展示系统设计论文书写大纲详细视频演示源码获取项目介绍基于数据挖掘技术的线上招聘信息分析系统旨在通过应用先进的数据分析方法,为求职者和招聘者提......
  • MyBatis-Plus的使用基础入门案例
    目录文章目录目录简介特性框架结构第一个案例准备工作初始化工程添加依赖完整的pom配置编写实体类编写Mapper修改启动类--扫描Mapper测试运行简介MyBatis-Plus(简称MP)是一个MyBatis的增强工具,在MyBatis的基础上只做增强不做改变,为简化开发、提高效率而生......
  • (免费源码)计算机毕业设计必看必学 原创定制程序 java、PHP、python、小程序、文案全套
    PHP校园点餐小程序摘 要随着我国经济迅速发展,人们对手机的需求越来越大,各种手机软件也都在被广泛应用,但是对于手机进行数据信息管理,对于手机的各种软件也是备受用户的喜爱,校园点餐小程序被用户普遍使用,为方便用户能够可以随时进行校园点餐小程序的数据信息管理,特开发了基......
  • (免费源码)计算机毕业设计必看必学 原创定制程序 java、PHP、python、小程序、文案全套
     springboot高校实验室管理系统摘要随着社会的发展,社会的方方面面都在利用信息化时代的优势。互联网的优势和普及使得各种系统的开发成为必需。本文以实际运用为开发背景,运用软件工程原理和开发方法,它主要是采springboot技术和mysql数据库来完成对系统的设计。整个开发过......
  • (免费源码)计算机毕业设计必看必学 原创定制程序 java、PHP、python、小程序、文案全套
     SpringBoot自然灾害应急救灾物资共享管理系统摘 要随着科学技术的飞速发展,社会的方方面面、各行各业都在努力与现代的先进技术接轨,通过科技手段来提高自身的优势。物资共享当然也不例外。自然灾害应急救灾物资共享管理系统是以实际运用为开发背景,运用软件工程原理和开发......
  • (免费源码)计算机毕业设计必看必学 原创定制程序 java、PHP、python、小程序、文案全套
    高校学生社团管理系统摘要随着计算机科学技术的日渐成熟,人们已经深刻地认识到了计算机在各个领域中发挥的功能的强大,计算机已经进入到了人类社会发展的各个领域,并且发挥着十分重要的作用。目前学校学生社团的管理是一项系统而复杂的工作,它需要一个团队互相配合、分工协作。......
  • (免费源码)计算机毕业设计必看必学 Ssm作业管理系统的设计与实现02334 原创定制程序 jav
    Ssm作业管理系统的设计与实现摘 要科技进步的飞速发展引起人们日常生活的巨大变化,电子信息技术的飞速发展使得电子信息技术的各个领域的应用水平得到普及和应用。信息时代的到来已成为不可阻挡的时尚潮流,人类发展的历史正进入一个新时代。在现实运用中,应用软件的工作规则和......