我有一个 celery 工作线程,并发度设置为 1 ,它从 RabbitMQ 获取任务。我想创建一个在单个并发设置中只有一个队列的系统,因此,所有任务都将添加到主队列中。 关于任务 - 它只是一个循环,我们用
更新状态。| ||并行地我有两个服务。
task.update_state()
Celery-beat
@c_app.task(bind=True)
def task(self):
n = 20
for i in range(0, n):
self.update_state(state='PROGRESS', meta={'done': i, 'total': n})
print('working')
time.sleep(1)
return n
服务,它创建 1000 个任务(数量作为示例)。
- FastAP I 服务,它提供两个端点:
-
创建任务具有 TOP 优先级并将其添加到主队列
获取有关活动任务和计划任务的实际信息(通过使用
- )
-
因此,可以询问 FastAPI:
inspect()
当前活动任务进度 -|| |还有多少任务-
问题
-
:如何将具有更高优先级的任务添加到已经将任务安排给工作人员的队列中?
inspect().active()
-
这是我尝试过的:
inspect().scheduled()
Celery 配置: 这是我的 Celery-Beat 任务,用于添加大量低优先级任务:
这是我的 FastAPI 端点,用于添加高优先级任务,正如我所料,应该在之后立即执行当前任务正在完成。
“current_progress”端点的“核心”返回当前进度和计划任务:
from celery.schedules import crontab
from kombu import Queue, Exchange
broker_url = 'amqp://guest:guest@localhost//'
result_backend = 'db+postgresql://admin:root@localhost/celery_test_db'
worker_concurrency = 1
timezone = 'Europe/Moscow'
enable_utc = False
result_extended = True
beat_schedule = {
'add-5-tasks-every-month': {
'task': 'celery_app.tasks.add_5_tasks',
'options': {'queue': 'celery_q'},
'schedule': 20.0
},
}
broker_transport_options = {'queue_order_strategy': 'priority'}
task_queues = (
Queue("celery_q", Exchange("celery_q"), routing_key="celery_q", queue_arguments={'x-max-priority': 9}),
)
问题
@c_app.task
def add_5_tasks():
for _ in range(800):
task.apply_async(countdown=1, queue='celery_q', priority=1)
:优先级没有按我的预期工作。|| |只有当我在配置中添加这些设置时它才有效:
@f_app.post("/add-task/")
def add_task():
task_ = task.apply_async(priority=9, countdown=1, queue='celery_q')
print('Task added with high priority:', task_.id)
return {'task_id': task_.id,
'message': 'Task added with high priority'}
但是,它会导致
i = c_app.control.inspect()
scheduled = i.scheduled()
reserved = i.reserved()
active = i.active()
变得绝对无用,因为我们连续只获取一个任务,所以工作人员认为我们在计划中只有一个任务。因此,我们可以在
主要问题
中看到单个任务,而不是任务列表:如何启用优先级并从
worker_prefetch_multiplier = 1
task_acks_late = True
获取有关计划任务的所有信息?
inspect().scheduled()
to become absolutely useless, as we fetch only one task in a row, so worker thinks that we have only one task in a schedule. So, instead of list of tasks we can see a single task in
inspect().scheduled()
MAIN QUESTION
: How to enable prioritisation and get all info about scheduled tasks from
inspect().scheduled()
?
你遇到的问题是 Celery 在使用
worker_prefetch_multiplier = 1
时,只会一次性获取一个任务。这意味着,即使你将高优先级的任务添加到队列中,工作进程在完成当前任务之前也不会获取它。这也会影响
inspect().scheduled()
的结果,因为它只能看到当前预取的任务。
为了解决这个问题,你可以尝试以下几种方法:
方法一:增加 worker_prefetch_multiplier 的值
你可以尝试增加
worker_prefetch_multiplier
的值,例如将其设置为 2 或 3。这样,工作进程会一次性预取多个任务,从而更快地获取高优先级的任务。
优点:
- 实现简单
- 能在一定程度上提高高优先级任务的处理速度
缺点:
- 可能会增加工作进程的内存占用
- 如果高优先级任务数量很多,仍然可能出现延迟
方法二:使用单独的队列处理高优先级任务
你可以创建一个专门用于处理高优先级任务的队列。将
worker_prefetch_multiplier
设置为 1,并将 FastAPI 端点配置为将高优先级任务发送到这个队列。
优点:
- 能确保高优先级任务得到及时处理
-
不会影响
inspect().scheduled()
的结果
缺点:
- 需要维护两个队列
- 需要修改代码以支持不同的队列
方法三:使用信号通知工作进程
你可以在 FastAPI 端点添加高优先级任务后,使用 Celery 的信号机制通知工作进程。工作进程收到信号后,可以立即停止当前任务并开始处理高优先级任务。
优点:
- 能最快速地处理高优先级任务
-
不需要修改
worker_prefetch_multiplier
的值
缺点:
- 实现比较复杂
- 可能会影响低优先级任务的处理
示例代码 (方法二):
# Celery 配置
broker_transport_options = {'queue_order_strategy': 'priority'}
task_queues = (
Queue("celery_q", Exchange("celery_q"), routing_key="celery_q", queue_arguments={'x-max-priority': 9}),
Queue("high_priority_q", Exchange("celery_q"), routing_key="high_priority_q", queue_arguments={'x-max-priority': 9}),
)
# FastAPI 端点
@f_app.post("/add-task/")
def add_task():
task_ = task.apply_async(priority=9, countdown=1, queue='high_priority_q')
print('Task added with high priority:', task_.id)
return {'task_id': task_.id,
'message': 'Task added with high priority'}
# Celery worker 启动命令
celery -A your_app worker -Q celery_q,high_priority_q
选择哪种方法取决于你的具体需求和应用程序的负载情况。建议你根据自己的情况进行测试,选择最适合你的方案。
标签:python,python-3.x,rabbitmq,celery,fastapi From: 78815656