首页 > 其他分享 >RBMQ案例二:工作队列模式

RBMQ案例二:工作队列模式

时间:2023-02-05 15:55:26浏览次数:30  
标签:pika RBMQ 队列 worker queue 案例 消息 channel

 

 

工作队列模式

工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务而不得不等待它完成。相反,我们安排任务稍后完成。我们将任务封装 为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当您运行许多工人时,任务将在他们之间共享。

循环调度

使用任务队列的优势之一是能够轻松并行化工作。如果我们正在积压工作,我们只需添加更多的工作人员,这样就可以轻松扩展。 首先,让我们尝试同时运行两个worker.py脚本。他们都将从队列中获取消息,但究竟如何呢?让我们来看看。 您需要打开三个控制台。两个将运行worker2.py 脚本。这些控制台将是我们的两个消费者 - C1 和 C2。  

公平派遣

您可能已经注意到,调度仍然不能完全按照我们的意愿进行。比如有两个worker的情况,当奇数消息都重,偶数消息都轻时,一个worker会一直很忙,另一个worker几乎不做任何工作。好吧,RabbitMQ 对此一无所知,仍然会均匀地分发消息。 发生这种情况是因为 RabbitMQ 只是在消息进入队列时分派消息。它不会查看消费者未确认消息的数量。它只是盲目地将每条第 n 条消息发送给第 n 个消费者。 为了打败它,我们可以使用带有prefetch_count=1设置的Channel#basic_qos通道方法 。这使用basic.qos协议方法来告诉 RabbitMQ 一次不要给一个 worker 一个以上的消息。或者,换句话说,在 worker 处理并确认前一条消息之前,不要向它发送新消息。相反,它将把它分派给下一个还不忙的工人。  

一、代码--消息生产者 new_task.py

#!/usr/bin/env python
import time

import pika
import json
import datetime

# 产生消息的入口,生产queue消息


def producer():
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(virtual_host='/melon_demo', host='82.156.19.94', port=5672,
                                  credentials=pika.PlainCredentials('guest', 'guest')))
    channel = connection.channel()
    channel.queue_declare(queue='task_queue', durable=True)
    ## 循环生成100条消息
    for i in range(200):
        message = json.dumps({'id': "80000%s" % i, "amount": 100 * i, "name": "melon", "createtime": str(datetime.datetime.now())})
        channel.basic_publish(exchange='', routing_key='task_queue', body=message,
                              properties=pika.BasicProperties(delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE))
        time.sleep(1)
        print(" [x] Sent %r" % message)
    connection.close()

producer()

 

二、代码 消息的消费者: worker.py

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(
pika.ConnectionParameters(virtual_host='/melon_demo', host='82.156.19.94', port=5672,
                          credentials=pika.PlainCredentials('guest', 'guest')))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print("[x] Received %r" % body.decode())
    time.sleep(1)
    time.sleep(body.count(b'.'))
    print("[x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()

 

三、消息的消费

 

 

 

 

 

 

 

标签:pika,RBMQ,队列,worker,queue,案例,消息,channel
From: https://www.cnblogs.com/1314520xh/p/17093482.html

相关文章

  • Spring2 - 入门案例
    Spring基本操作导入依赖在pom.xml中添加依赖添加依赖:<dependencies><!--springcontext依赖--><!--当你引入SpringContext依赖之后,表示将Spring的基础依......
  • 找了几个 Solon 的商业落地项目案例!
    Solon是啥?是一个高效的Java应用开发框架:更快、更小、更简单。(代码仓库:https://gitee.com/noear/solon)提倡:克制、简洁、开放、生态启动快5~10倍;qps高2~3倍;运......
  • 4.5栈、队列以及环形缓冲区
    栈和队列,都可以不通过指定地址和索引来对数组的元素进行读写。需要临时保存计算过程中的数据、连接在计算机上的设备或者输入输出的数据时,都可以通过这些方法来使用内存。......
  • System V消息队列
    概述:消息队列是进程通信的一种方式。头文件:#include<sys/types.h>#include<sys/msg.h>一、创建或打开一个消息队列intmsgget(key_tkey,intmsgflg);参数key......
  • Pandas 人口密度案例分析
    fromturtleimportleftimportpandasaspd"""需求:1.导入文件,查看原始数据2.将人口数据和各州简称数据进行合并3.将合并的数据中重复的abbreviation列进行删除......
  • 【八大数据排序法】快速排序法的图形理解和案例实现 | C++
    第十八章快速排序法:::hljs-center目录第十八章快速排序法●前言●认识排序●一、快速排序法是什么?1.简要介绍2.具体情况3.算法分析●二、案例实现1.......
  • wireshark 抓包整理———— 从一个小案例开始 [一]
    前言前面已经有抓包系列了,简单写一下wireshark的抓包系列,共36节,18个理论小栗子,36个实战栗子。正文这个例子是<<wireshark分析就这么简单>>的一个例子。这个例子是这样......
  • python基础:文件内光标移动案例(了解)、计算机硬盘修改数据的原理(了解,为了文件内容修改作
    目录一、文件内光标移动案例(了解)二、计算机硬盘修改数据的原理(了解,为了文件内容修改作解释)三、文件内容修改四、函数1、概念讲解2、语法结构3、函数的定义与调用4、函数的......
  • C语言学习 指针: 案例: 交换两个变量的值
    第一版:1#include<stdio.h>2#include<stdlib.h>3#include<string.h>4#include<io_utils.h>56voidSwap(inta,intb){7inttemp=a;8a=b;......
  • 数组-链表-栈-队列(下)
    LeetCode66.加一(模板题)给定一个由整数组成的非空数组所表示的非负整数,在该数的基础上加一。最高位数字存放在数组的首位,数组中每个元素只存储单个数字。你可以......