首页 > 其他分享 >监控Celery不一定非要使用Flower

监控Celery不一定非要使用Flower

时间:2024-01-30 10:15:56浏览次数:28  
标签:Flower app inspect worker Celery 任务 result 监控

运维平台中有许多的周期/定时/异步任务,例如证书扫描、数据备份、日志清理、线上作业等等,这些任务的执行都是借助于Celery来完成的。任务多了之后就会遇到一系列的问题,例如我之前写过的将任务分多队列来解决生产环境下的任务优先级问题,除此之外还要经常关注队列的状态以及任务的运行情况,为了方便查看任务以及Worker的运行情况,我在后台中添加了队列状态这个功能

这个功能参考了Celery知名监控工具Flower的实现,之所以没有直接使用Flower,主要有几方面的考虑,其一是Flower的页面风格与我们本身的平台风格差异较大,其二是Flower要单独启动进程或者是去看代码深度集成,比较麻烦,其三也是想要更加深入的了解下Celery,毕竟项目用了它,了解不深遇到问题不好解决。基于以上几点考虑,就自己实现了Celery监控功能的开发。好在并不复杂,Celery本身提供了一组API,可以查询任务队列的状态、执行结果等信息。借助于这些API就能完成比较详细的监控,Celery的API主要有两个

inspect

app.control.inspect(): 这个方法返回一个Inspect对象,可以使用它来获取任务队列、工作节点等的信息。例如inspect().active()可以获取当前活动的任务列表,inspect().registered()可以获取已注册的任务列表,不指定worker的情况下查看全部worker的数据,如果指定worker则查看对应worker的数据

具体的用法如下:

from celery import Celery

app = Celery('your_celery_app_name')

# 检查工作节点的在线状态
worker_status = app.control.inspect([worker]).ping()

# 返回工作节点的统计信息,如活动任务数、完成任务数等
worker_stats = app.control.inspect([worker]).stats()

# 返回活动任务的信息
active_tasks = app.control.inspect([worker]).active()

# 返回已注册任务的信息
registered_tasks = app.control.inspect([worker]).registered()

# 返回计划中的任务的信息
scheduled_tasks = app.control.inspect([worker]).scheduled()

# 返回已预订任务的信息
reserved_tasks = app.control.inspect([worker]).reserved()

# 返回已撤销任务的信息
revoked_tasks = app.control.inspect([worker]).revoked()

# 返回活动队列的信息
active_queues = app.control.inspect([worker]).active_queues()

# 查询worker的配置信息
worker_conf = app.control.inspect([worker]).conf()

# 返回工作节点的报告信息
worker_reports = app.control.inspect([worker]).report()

# 查询特定任务的信息
task_info = app.control.inspect([worker]).query_task(task_id)

通过inspect可以查看celery整体以及单个worker的相关信息,例如首页的状态就是通过ping来实现的

一个card表示一个worker,点击card可以查看worker的详细信息,例如Pool标签的数据就来自于stats,Register标签的数据就来自于registered,Tasks标签就分别展示了activescheduledreservedrevoked的任务数据

通过这些信息可以全面掌握Celery的运行状态以及每个Worker的运行情况,做到对Celery的全局掌控

AsyncResult

AsyncResult: 这个类可以查询特定任务的状态和结果。通过任务的ID创建一个AsyncResult对象,并使用其方法来获取任务的状态、结果等信息。例如AsyncResult(task_id).state可以获取任务的状态,AsyncResult(task_id).result可以获取任务的执行结果

具体的用法如下:

from celery import Celery

app = Celery('your_celery_app_name')

result = app.AsyncResult(task_id)

# 获取任务状态
state = result.state

# 获取任务结果
result = result.result

# 返回一个布尔值,检查任务是否已经完成
is_ready = result.ready()

# 返回一个布尔值,检查任务是否成功完成
is_successful = result.successful()

# 返回一个布尔值,检查任务是否执行失败
is_failed = result.failed()

# 返回一个字符串,获取任务的错误追溯信息
traceback = result.traceback

# 返回一个AsyncResult对象,获取任务的父任务
parent_task = result.parent

# 返回一个列表,包含任务的子任务的AsyncResult对象,获取任务的子任务
child_tasks = result.children

# 返回一个字典,获取任务的其他信息
info = result.info

# 获取任务的结果,可以指定超时时间和是否向上传播异常
result = result.get(timeout=10, propagate=False)

# 忘记任务,将任务从结果存储中删除。一旦任务被遗忘,将无法查询其状态和结果
result.forget()

通过AsyncResult可以获取到任务执行的相关信息,对任务执行过程和结果都有很好的把控。不过这需要任务的ID,任务ID通常可以通过任务执行时获取,ops_coffee_auto_notify_task.delay()异步执行任务后返回的就是任务的ID。但对于任务的话我通常会单独记录任务执行的过程和状态,而不依赖Celery的结果记录,所以对于AsyncResult的需求并不强

但平常需要开关系统内置的周期任务,此时就需要知道系统任务列表,通过获取PeriodicTask表的数据即可,同时通过修改PeriodicTask表的enabled字段值来达到开启或暂停的目的

inspect更关注Celery的Worker运行情况,而AsyncResult则更关注于Celery的任务运行状态,通过两者的配合就能更好的掌握周期/定时/异步任务的执行情况了,通过监控功能的开发也对Celery有了更深的了解,同时运维自动化平台也变得更加完善

标签:Flower,app,inspect,worker,Celery,任务,result,监控
From: https://www.cnblogs.com/37Y37/p/17995858

相关文章

  • python自定义装饰器,实现轮询监控数据库,并且根据字段内容变更打印相应日志或结束循环
    使用方法可以写个whileTrue的循环监控数据,再分别定义每个状态要做什么事情注意:循环中不要写breake不要return,换成yield,把函数作为一个生成器,由装饰器控制循环代码示例importfunctoolsimporttimefromutils.log_settingimportloggerfromconfigimportsetti......
  • 专栏:数据库、中间件的监控一网打尽
    前言对于数据库、中间件的监控,目前社区里最为完善的就是Prometheus生态的各个Exporter,不过这些Exporter比较分散,不好管理,如果有很多目标实例需要监控,就要部署很多个Exporter,要是能有一个大一统的Exporter,具备所有这些Exporter的能力就好了。还真有,而且还不止一个,一个是 G......
  • 专栏:数据库、中间件的监控一网打尽
    前言对于数据库、中间件的监控,目前社区里最为完善的就是Prometheus生态的各个Exporter,不过这些Exporter比较分散,不好管理,如果有很多目标实例需要监控,就要部署很多个Exporter,要是能有一个大一统的Exporter,具备所有这些Exporter的能力就好了。还真有,而且还不止一个,一个是......
  • celery实现异步任务、定时任务和延迟任务
    1.异步任务任务名.delay(传参数)2.延迟任务fromdatetimeimportdatetime,timedelta#atetime.utcnow()当前utc时间#当前时间+15秒eta=datetime.utcnow()+timedelta(seconds=15)#取消订单任务res=cancel_order.apply_async(args=['10001',],eta=eta)#15s......
  • luffy_09 celery+1
    前倾回顾:#1redis是什么非关系型数据库(存数据)key-value形式存储5大数据类型:字符串,hash,列表,集合,有序集合纯内存存储#2为什么这么快?-纯内存-io多路复用网络模型-数据操作单线程,避免了线程切换#3python链接redis......
  • celery的快速使用
    celery的快速使用1.安装celery模块pipinstallcelery 2.书写demo.py文件fromceleryimportCeleryimporttimebroker='redis://127.0.0.1:6379/1'backend='redis://127.0.0.1:6379/2'app=Celery('app',broker=broker,backend=backend)......
  • 如何使用保留可探测字段参数的方法解决视频监控管理平台EasyCVR无法启动的问题
    有用户反馈,在使用EasyCVR时出现启动失败,服务无法使用的情况。收到用户反馈后,技术人员立即开展解决,以下为解决步骤:注:此解决方法为保留hardware_version可被探测的字段参数。1、首先查看报错日志:2、由上图可见,报错为LocalMachineCheckError!本地机器检查错误!随后检查配置文件,是否因......
  • 如何使用保留可探测字段参数的方法解决视频监控管理平台EasyCVR无法启动的问题
    安防视频监控/视频集中存储/云存储/磁盘阵列EasyCVR平台可拓展性强、视频能力灵活、部署轻快,可支持的主流标准协议有国标GB28181、RTSP/Onvif、RTMP等,以及支持厂家私有协议与SDK接入,包括海康Ehome、海大宇等设备的SDK等。平台既具备传统安防视频监控的能力,也具备接入AI智能分析的......
  • AI智能视频监控系统自动检测、识别工服工装,准确率更高!
    工服工装的识别在很多行业场景中都具有重要的意义,如公共场所监控、工地管理、人员安全、明厨亮灶、医疗卫生等,员工穿戴特定的工服工装已然成为各个行业的安全要求与标准。随着AI+人工智能技术的快速发展,工服工装识别技术得以应用于实际项目现场中,实现自动化、高效化的管理。方......
  • 安防视频监控平台LntonAIServer视频算法分析平台算法检测明烟明火预警
    今天,我要讲述的是一个关于智慧与安全的故事,它发生在LntonAIServer视频算法分析平台上,一个能够精准检测明烟和明火预警的高科技平台。想象一下,当夜幕降临,城市的喧嚣渐渐平息,大多数人沉浸在甜美的梦乡时,LntonAIServer却在无声中坚守着它的岗位。它的眼睛不眨不闭,通过高......