首页 > 编程问答 >如何将优先级任务添加到celery队列而不禁用获取

如何将优先级任务添加到celery队列而不禁用获取

时间:2024-08-01 07:48:35浏览次数:10  
标签:python python-3.x rabbitmq celery fastapi

我有一个 celery 工作线程,并发度设置为 1 ,它从 RabbitMQ 获取任务。我想创建一个在单个并发设置中只有一个队列的系统,因此,所有任务都将添加到主队列中。 关于任务 - 它只是一个循环,我们用

更新状态。| ||并行地我有两个服务。 task.update_state() Celery-beat

@c_app.task(bind=True)
def task(self):
    n = 20
    for i in range(0, n):
        self.update_state(state='PROGRESS', meta={'done': i, 'total': n})
        print('working')
        time.sleep(1)
    return n

服务,它创建 1000 个任务(数量作为示例)。

  • FastAP I 服务,它提供两个端点:
  • 创建任务具有 TOP 优先级并将其添加到主队列 获取有关活动任务和计划任务的实际信息(通过使用
    • 因此,可以询问 FastAPI: inspect() 当前活动任务进度 -|| |还有多少任务-

问题

  • :如何将具有更高优先级的任务添加到已经将任务安排给工作人员的队列中? inspect().active()
  • 这是我尝试过的: inspect().scheduled()

Celery 配置: 这是我的 Celery-Beat 任务,用于添加大量低优先级任务:

这是我的 FastAPI 端点,用于添加高优先级任务,正如我所料,应该在之后立即执行当前任务正在完成。

“current_progress”端点的“核心”返回当前进度和计划任务:

from celery.schedules import crontab
from kombu import Queue, Exchange

broker_url = 'amqp://guest:guest@localhost//'
result_backend = 'db+postgresql://admin:root@localhost/celery_test_db'

worker_concurrency = 1
timezone = 'Europe/Moscow'
enable_utc = False
result_extended = True

beat_schedule = {
    'add-5-tasks-every-month': {
        'task': 'celery_app.tasks.add_5_tasks',
        'options': {'queue': 'celery_q'},
        'schedule': 20.0
    },
}

broker_transport_options = {'queue_order_strategy': 'priority'}
task_queues = (
    Queue("celery_q", Exchange("celery_q"), routing_key="celery_q", queue_arguments={'x-max-priority': 9}),
)

问题

@c_app.task
def add_5_tasks():
    for _ in range(800):
        task.apply_async(countdown=1, queue='celery_q', priority=1)

:优先级没有按我的预期工作。|| |只有当我在配置中添加这些设置时它才有效:

@f_app.post("/add-task/")
def add_task():
    task_ = task.apply_async(priority=9, countdown=1, queue='celery_q')
    print('Task added with high priority:', task_.id)
    return {'task_id': task_.id,
            'message': 'Task added with high priority'}

但是,它会导致

i = c_app.control.inspect()
scheduled = i.scheduled()
reserved = i.reserved()
active = i.active()

变得绝对无用,因为我们连续只获取一个任务,所以工作人员认为我们在计划中只有一个任务。因此,我们可以在 主要问题
中看到单个任务,而不是任务列表:如何启用优先级并从

worker_prefetch_multiplier = 1
task_acks_late = True

获取有关计划任务的所有信息? inspect().scheduled() to become absolutely useless, as we fetch only one task in a row, so worker thinks that we have only one task in a schedule. So, instead of list of tasks we can see a single task in inspect().scheduled()

MAIN QUESTION : How to enable prioritisation and get all info about scheduled tasks from inspect().scheduled() ?


你遇到的问题是 Celery 在使用 worker_prefetch_multiplier = 1 时,只会一次性获取一个任务。这意味着,即使你将高优先级的任务添加到队列中,工作进程在完成当前任务之前也不会获取它。这也会影响 inspect().scheduled() 的结果,因为它只能看到当前预取的任务。

为了解决这个问题,你可以尝试以下几种方法:

方法一:增加 worker_prefetch_multiplier 的值

你可以尝试增加 worker_prefetch_multiplier 的值,例如将其设置为 2 或 3。这样,工作进程会一次性预取多个任务,从而更快地获取高优先级的任务。

优点:

  • 实现简单
  • 能在一定程度上提高高优先级任务的处理速度

缺点:

  • 可能会增加工作进程的内存占用
  • 如果高优先级任务数量很多,仍然可能出现延迟

方法二:使用单独的队列处理高优先级任务

你可以创建一个专门用于处理高优先级任务的队列。将 worker_prefetch_multiplier 设置为 1,并将 FastAPI 端点配置为将高优先级任务发送到这个队列。

优点:

  • 能确保高优先级任务得到及时处理
  • 不会影响 inspect().scheduled() 的结果

缺点:

  • 需要维护两个队列
  • 需要修改代码以支持不同的队列

方法三:使用信号通知工作进程

你可以在 FastAPI 端点添加高优先级任务后,使用 Celery 的信号机制通知工作进程。工作进程收到信号后,可以立即停止当前任务并开始处理高优先级任务。

优点:

  • 能最快速地处理高优先级任务
  • 不需要修改 worker_prefetch_multiplier 的值

缺点:

  • 实现比较复杂
  • 可能会影响低优先级任务的处理

示例代码 (方法二):

# Celery 配置
broker_transport_options = {'queue_order_strategy': 'priority'}
task_queues = (
    Queue("celery_q", Exchange("celery_q"), routing_key="celery_q", queue_arguments={'x-max-priority': 9}),
    Queue("high_priority_q", Exchange("celery_q"), routing_key="high_priority_q", queue_arguments={'x-max-priority': 9}),
)

# FastAPI 端点
@f_app.post("/add-task/")
def add_task():
    task_ = task.apply_async(priority=9, countdown=1, queue='high_priority_q')
    print('Task added with high priority:', task_.id)
    return {'task_id': task_.id,
            'message': 'Task added with high priority'}

# Celery worker 启动命令
celery -A your_app worker -Q celery_q,high_priority_q 

选择哪种方法取决于你的具体需求和应用程序的负载情况。建议你根据自己的情况进行测试,选择最适合你的方案。

标签:python,python-3.x,rabbitmq,celery,fastapi
From: 78815656

相关文章

  • ffmpeg python 导致死锁
    我在使用ffmpegpython处理相机帧时遇到问题。我使用process.communicate()的第一种方法效果很好,但存在延迟问题。process=(ffmpeg.input('pipe:',format='rawvideo',pix_fmt='rgb24',s='{}x{}'.format(width,height))......
  • 将 HTTP 分块编码数据流代码片段从 Node.js 转换为 Python
    我有一个Node.js客户端代码,它将请求发送到HTTP服务器,然后连续接收分块编码数据。这是带有一些流量数据输出的Node.js代码。consthttp=require('http');constoptions={hostname:'...',path:'...',port:...,...};constreq=http.request(......
  • vsc python 调试器和 pylance 无法识别已安装的包
    我最近使用snowflake-connector-python在我的虚拟环境中安装了pipinstallsnowflake-connector-python[pandas]==2.7.6,当我在激活虚拟环境的情况下从命令行运行我的脚本时,它工作正常。我设置了与VSC解释器相同的虚拟环境,但尝试运行python调试器会引发异常......
  • 如何从python读取matlab持续时间对象
    我创建一个matlab持续时间对象并将其保存到.mat文件:timeend=seconds(123);save('time.mat',timeend,'-v7.3');然后我从python读取它:withh5py.File('time.mat','r')asf:var=f['timeend'][:]print(list(var))......
  • 通过 python 连接到 Snowflake 时出错“UnpicklingError: invalid load key, '\x00'
    我在使用snowflake.connector.connect通过python连接到snowflake时遇到以下错误importsnowflake.connector#pipinstallsnowflake-connector-python#iamgettingtheenvfrom.envfileistoredlocallycnx=snowflake.connector.connect(user=os.getenv('USER'),pass......
  • Python Selenium 单击 webdriverwait 与 find_element
    我无法理解这两个代码块之间的区别。发送点击在webdriverwait和find_elements中都有效。代码1fromseleniumimportwebdriverfromselenium.webdriver.common.byimportByfromselenium.webdriver.support.uiimportWebDriverWaitfromselenium.webdriver.suppo......
  • Python 问题 如何创建在 PDF 中注册为剪切线的专色?
    我正在开发一个项目,需要我在图像周围创建一条剪切线,但在任何RIP程序(例如Versaworks或Flexi)上将其注册为实际剪切线时遇到困难。我尝试了很多不同的方法python库可以帮助解决这个问题,但我无法让它工作。我希望它像我们在Illustrator中所做的那样,创建一条名为CutConto......
  • 使用Python时如何避免`setattr`(和`getattr`)?以及是否有必要避免
    如果我想向协议缓冲区中的字段添加一个在编译时未知的值,我目前正在做setattr我通常不喜欢使用setattr,因为它看起来不太安全。但是当我知道该对象是protobuf时,我认为这很好,因为我设置它的值必须是protobuf允许的类型。所以也许它并不是真的不安全?让我举......
  • Java sshtools 生成的 EDDSA 签名与 Python 的 pycryptome 生成的签名不匹配
    我有一个python库,它使用pycryptodomelibrary使用openssh格式的ED25519私钥使用Ed25519算法对数据进行签名。然后需要使用sshtools库和相应的公钥在Java应用程序中验证签名。但是签名验证失败。约束:从文件中读取私钥/公钥很重要。我无法......
  • Elastic python请求超时错误:池达到最大大小,不允许更多连接
    我正在使用Elasticsearchpython模块。我正在尝试像这样建立到服务器的连接es=Elasticsearch([config.endpoint],api_key=config.key,request_timeout=config.request_timeout)服务器连接,然后我尝试执行丰富策略。es.enr......