首页 > 其他分享 >Django 集成 celery 与执行结果监控

Django 集成 celery 与执行结果监控

时间:2024-07-31 17:19:56浏览次数:15  
标签:集成 beat py django celery 任务 import Django

Django 集成 celery 与执行结果监控

celery 架构图

一、搭建 Django 项目

  • 环境模块安装
    pip install django==3.2
    pip install celery
    pip install redis
    # window 环境下需要安装eventlet
    pip install eventlet
    
  • 创建 django 项目
    django-admin startproject django_celery
    
  • 修改 settings 配置文件
    # 1-修改代码文件,程序自动重启 
    # 2-Django程序出现异常时,向前端显示详细的错误追踪信息
    DEBUG = True
    
    # 允许访问的主机
    ALLOWED_HOSTS = ['*']
    # 语言
    # LANGUAGE_CODE = 'en-us'
    LANGUAGE_CODE = 'zh-hans'
    
    # 时区
    # TIME_ZONE = 'UTC'
    TIME_ZONE = 'Asia/Shanghai'
    
  • 配置路由与视图并测试项目
    • 在工程同名项目文件夹中创建 views 视图文件,并创建视图函数

      # django_celery/django_celery/views.py
      
      from django.http import HttpResponse
      
      
      def test_celery(request):
          return HttpResponse('邮件发送成功')
      
      
    • urls文件中设置路由映射

      # django_celery/django_celery/urls.py
      
      from django.contrib import admin
      from django.urls import path
      from . import views
      
      urlpatterns = [
          path('admin/', admin.site.urls),
      
          path('celery/', views.test_celery)
      ]
      
    • 运行测试

      • 启动项目

        python manage.py runserver 8080
        
      • 访问测试

        http://127.0.0.1:8080/celery/
        

二、在项目中集成 celery

Celery官方参考文档

  • 版本说明
    Celery 5.3.x支持Django 2.2 LTS或更新版本
    
  • 在工程同名子应用中创建 celery.py 模块,定义 celery 实例
    # django_celery\django_celery\celery.py
    
    import os
    from celery import Celery
    
    # 1. 导入django配置; 后续便于使用django组件功能
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_celery.settings')
    
    # 2. 实例化celery对象
    app = Celery('django_celery')
    
    # 3. 从django配置文件中导入celery相关的配置; 如果添加 namespace='CELERY' 参数,则celery的配置参数需要添加 "CELERY_" 的前缀
    app.config_from_object('django.conf:settings')
    
    # 4. 在django所有应用中自动搜索 tasks.py 文件, 作为任务文件
    app.autodiscover_tasks()
    
    
  • django 项目的配置文件 settings 中配置 celery 相关配置
    # ---------------- celery 相关配置 ----------------------
    # Broker配置,使用Redis作为消息中间件
    BROKER_URL = 'redis://127.0.0.1:6379/1'
    # BACKEND配置,使用Redis作为结果仓库
    RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
    
    # 指定 Celery 能够接受的内容类型列表
    ACCEPT_CONTENT = ['json']
    # 任务将以json格式进行序列化
    TASK_SERIALIZER = 'json'
    # 结果以json格式进行序列化存储或传输
    RESULT_SERIALIZER = 'json'
    
    # 任务结果过期时间(单位:秒)
    TASK_RESULT_EXPIRES = 60 * 60 * 24
    # 时区配置
    TIMEZONE = "Asia/Shanghai"
    
    # 配置定时任务
    import datetime
    CELERYBEAT_SCHEDULE = {
        # 自定义周期执行的任务
        'add-every-10-seconds': {
            # 执行的任务
            'task': 'django_celery.tasks.add',
            # 执行时间设置
            # 'schedule': 1.0,  # 1秒执行一次
            # 'schedule': crontab(minute='*/1'),  # 1分钟执行 * 次
            # 'schedule': crontab(month_of_year="4", day_of_month="11", hour="8", minute="42"), # 每年4月11号,8点42分执行
            'schedule': datetime.timedelta(seconds=10),  # 每10秒执行一次
            'args': (123, 321)
        }
    }
    
  • 在工程同名子应用的 __init__ 文件中配置celery; 确保了Django启动时加载了celery应用
    # django_celery\django_celery\__init__.py
    
    from .celery import app as celery_app
    
    __all__ = ('celery_app',)
    
  • django 应用中定义task任务
    # django_celery/django_celery/tasks.py
    
    import time
    from celery import shared_task
    
    
    # 定义任务函数
    @shared_task
    def send_email(user):
        time.sleep(2)
        return f'发送邮件成功:{user}'
    
    
    @shared_task
    def add(a, b):
        time.sleep(2)
        return a + b
    
    
  • 启动 worker 、启动 beat (需要先配置定时任务,并在项目目录下启动)
    • 在终端中启动 worker

      celery -A django_celery worker -l info -P eventlet
      
    • 在终端中启动 beat

      celery -A django_celery beat -l info
      
  • 在视图文件中发布 celery 任务
    # django_celery/django_celery/views.py
    
    from django.http import HttpResponse
    from django_celery.tasks import send_email
    
    
    def test_celery(request):
        async_result = send_email.delay(request.GET.get('user'))
        return HttpResponse(f'邮件发送成功,id = {async_result.id}')
    
    
  • 访问测试
    http://127.0.0.1:8080/celery/?user=18888888888@163.com
    

三、Admin 后台管理添加定时任务(进阶用法)

  • 背景说明
    为了解决我们每次添加一个定时任务都要 修改setting任务配置,重启 worker 和 beat 的问题
    
  • 安装 django-celery-beat 模块
    pip install django-celery-beat
    
  • settings 配置文件中注册应用,配置调度器
    • 注册应用

      INSTALLED_APPS = [
      	...
          'django_celery_beat',
      ]
      
    • 配置调度器

      # ---------------- celery 相关配置 ----------------------
      # 配置调度器; 一般与 CELERYBEAT_SCHEDULE配置项 二选其一使用
      CELERYBEAT_SCHEDULER = 'django_celery_beat.schedulers.DatabaseScheduler'
      
  • 数据库迁移
    • 为什么要进行数据库迁移

      django-celery-beat 应用会创建数据表,用来保存定时任务;该应用读取表数据执行定时任务
      
    • settings 中配置数据库

      DATABASES = {
          'default': {
              'ENGINE': 'django.db.backends.mysql',
              'HOST': '127.0.0.1',
              'PORT': 3306,
              'USER': 'root',
              'PASSWORD': 'mysql',
              'NAME': 'django_celery'
          }
      }
      
    • 安装 MySQL 数据库驱动程序

      pip install PyMySQL
      
    • Django的工程同名子目录的__init__.py文件中添加如下语句, 作用是让DjangoORM的能以MySQLdb 的的方式来调用PyMySQL

      from pymysql import install_as_MySQLdb
      
      install_as_MySQLdb()
      
    • 进行数据库迁移, 生成定时任务需要的数据表

      python manage.py makemigrations
      python manage.py migrate
      
    • 生成的与 celery 相关的数据表

  • 使用Admin 后台管理插入数据(定时任务相关)
    • 通过脚本命令创建管理员账户

      python manage.py createsuperuser
      
    • 使用 simpleui 插件美化 Admin simpleui 使用文档

      • 安装插件

        pip install django-simpleui
        
      • settings 配置文件中注册应用

        # 注意:simpleui应该注册在admin之前
        INSTALLED_APPS = [
            'simpleui',
            'django.contrib.admin',
            ...
            'django_celery_beat',
        ]
        
  • 手动添加任务
    • 启动 workerbeat

      # 这里建议注释掉CELERYBEAT_SCHEDULE={...}自动添加任务的配置项, 我们现在使用admin手动添加任务
      celery -A django_celery worker -l info -P eventlet
      celery -A django_celery beat -l info
      
    • 增加周期性任务

四、Admin 后台管理查看任务运行情况

  • 背景说明
    在控制台监控任务执行情况或者去redis中査看,还不是很方便,最好是能够通过web界面看到任务的执行情况, 如有多少任务在执行,有多少任务执行失败了等。
    
  • 安装 django-celery-results 模块
    • 模块工作原理

      将任务执行结果写入数据库,通过web界面显示; 通过插件可以使用Django的orm作为结果存储,这样的好处在于我们可以直接通过django的数据査看到任务状态。
      同时为可以制定更多的操作
      
    • 安装模块

      pip install django-celery-results
      
  • settings 配置文件中注册应用,修改结果存储位置
    • 注册应用

      INSTALLED_APPS = [
      	...
          'django_celery_beat',
          'django_celery_results',
      ]
      
    • 修改结果存储位置

      # 之前异步任务的执行结果存储在redis中, 修改为存储到数据库中; 因为存储在数据库中我们可以通过admin查看
      # RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
      # 使用django ORM 作为结果存储
      CELERY_RESULT_BACKEND = 'django-db'
      
  • 数据库迁移
    • 执行迁移命令

      python manage.py makemigrations
      python manage.py migrate
      
    • 生成的与 celery_results 相关的数据表

  • Admin 后台查看异步任务执行结果

  • 时区错误解决
    • 错误描述

      ValueError: Database returned an invalid datetime value. Are time zone definitions for your database installed?
      
    • 解决方案

      • 为MySQL安装时区数据

        mysql_tzinfo_to_sql /usr/share/zoneinfo | mysql -u root -p mysql
        
      • 重启数据库

        net stop MySQL  # 先停止服务
        net start MySQL  # 再启动服务
        
      • 确认 Django 时区配置

        LANGUAGE_CODE = 'zh-hans'
        TIME_ZONE = 'Asia/Shanghai'
        USE_I18N = True
        USE_L10N = True
        USE_TZ = False 
        

五、通过 Flower 监控 celery 运行情况

  • Flower 基本介绍
    Flower 是一个用于监控和管理 celery 集群的开源 web 应用程序。它提供有关 celery workers 和tasks状态的实时信息; 如果不想通django的管理界面监控任务的执行,可以通过F1ower插件来进行任务的监控。Flower的界面更加丰富,可以监控的信息更全
    
  • Flower 功能说明
    1. 实时监控 celery 的 Events 
    	- 查看任务进度和历史记录
    	- 查看任务详细信息(参数、开始时间、运行时间等)
        
    2. 远程操作
      	- 查看workers状态和统计数据
      	- 关闭并重新启动workers实例
      	- 控制工作池大小和自动缩放设置
      	- 查看和修改工作实例消耗的队列
      	- 查看当前正在运行的任务
      	- 查看计划任务(预计到达时间/倒计时)
      	- 查看保留和撤销的任务
      	- 应用时间和速率限制
      	- 撤销或终止任务
        
    3. Broker 监控
      	- 查看所有celery 队列的统计信息
    
  • 将异步任务运行结果修改回 redis 存储
    # BACKEND配置,使用Redis作为结果仓库
    RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
    # 使用django ORM 作为结果存储
    # CELERY_RESULT_BACKEND = 'django-db'
    
  • 安装 flower
    pip install flower
    
  • 启动 flower
    celery -A django_celery flower --port-5555
    
  • 启动 workerbeat
    # 这里建议注释掉CELERYBEAT_SCHEDULE={...}自动添加任务的配置项, 我们现在使用admin手动添加任务
    celery -A django_celery worker -l info -P eventlet
    celery -A django_celery beat -l info
    
  • 通过浏览器访问存储结果
    http://127.0.0.1:5555/
    

六、任务成功/失败监控告警

  • settings 中配置邮件发送相关的参数
    # ---------------- 邮件发送相关配置 ----------------------
    # smtp.163.com 或者 smtp.qq.com
    EMAIL_HOST = 'smtp.163.com'
    EMAIL_PORT = 465
    # 邮件发送的账号
    EMAIL_HOST_USER = 'YourEmail@163.com'
    EMAIL_HOST_PASSWORD = 'YourAuthorizationPassword'
    # 接收方显示的发件人
    DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
    # 使用SSL
    EMAIL_USE_SSL = True
    
  • Django 应用中找到需要进行任务监控的应用,编辑应用的tasks.py 文件
    import time
    from celery import shared_task
    from celery import Task
    from django.core import mail
    from django.conf import settings
    
    
    # 定义异步任务执行结果的追踪类
    class CrawlTaskTracker(Task):
    
        def on_success(self, retval, task_id, args, kwargs):
            email_content = f'【爬虫任务成功】 任务ID: {task_id}, 任务参数: {args}'
            mail.send_mail('celery监控告警-成功', email_content, settings.EMAIL_HOST_USER, [settings.EMAIL_HOST_USER])
    
        def on_failure(self, exc, task_id, args, kwargs, einfo):
            email_content = f'【爬虫任务失败】 任务ID: {task_id}, 任务参数: {args}, 请前往flower查看失败原因!!!'
            mail.send_mail('celery监控告警-失败', email_content, settings.EMAIL_HOST_USER, [settings.EMAIL_HOST_USER])
    
        def on_retry(self, exc, task_id, args, kwargs, einfo):
            print('【任务重试】任务ID: {task_id}, 任务参数: {args}')
    
    
    # 将任务结果追踪类绑定到异步任务(tracker为CrawlTaskTracker的对象)
    @shared_task(bind=True, base=CrawlTaskTracker)
    def crawl_cnblogs(tracker):
        print('开始爬虫任务!')
        time.sleep(3)
        return True
    
    
  • 启动 flowerworkerbeat
    # 注意:修改任务需要重启这些工具
    celery -A django_celery flower --port-5555
    celery -A django_celery worker -l info -P eventlet
    celery -A django_celery beat -l info
    
  • Admin 中添加并启动异步爬虫任务(crawl_cnblogs
  • 查看邮件消息收发情况

六、附

  • 附1:文件结构目录
    django_celery
        ├───db.sqlite3
        ├───manage.py
        └───django_celery
            ├───__init__.py
            ├───settings.py
            ├───celery.py
            ├───tasks.py
            ├───urls.py
            ├───views.py
            ├───wsgi.py
            └───asgi.py
    
  • 附2:核心运行环境
    celery==5.4.0
    Django==3.2
    django-celery-beat==2.6.0
    django-celery-results==2.5.1
    django-simpleui==2024.4.1
    django-timezone-field==7.0
    eventlet==0.36.1
    flower==2.0.1
    PyMySQL==1.1.1
    redis==5.0.7
    

标签:集成,beat,py,django,celery,任务,import,Django
From: https://www.cnblogs.com/cs-songbai/p/18335042

相关文章

  • 计算机毕业设计django/flask导师双选指导系统python+vue
    通过分析企业对于本科生导师指导平台的需求,创建了一个计算机管理本科生导师指导平台的方案。文章介绍了本科生导师指导平台的系统分析部分,包括可行性分析等,系统设计部分主要介绍了系统功能设计和数据库设计。 Python版本:python3.7+前端:vue.js+elementui框架:django/flask都......
  • VSCode:Python 虚拟环境未在集成终端中自动激活
    我最近安装了VSCode,并注意到当我打开集成终端时,Python虚拟环境不会自动激活。从此链接中VSCode内提供的信息:https://github.com/microsoft/vscode-python/wiki/Activate-Environments-in-Terminal-Using-Environment-Variables看来Python扩展可能不会对......
  • C#银行卡ocr识别接口的简单集成方式
    银行卡识别接口是指:以文字识别技术为基础衍生的银行卡卡面信息识别接口,该接口可以快速、精准的将银行卡卡面上包含银行卡号、卡类型、银行名称等文字信息提取成功,以帮助需要支付的平台进行银行卡身份的快速核验。企业又该如何快速的对银行卡识别接口进行集成?可以选择翔云......
  • Springboot集成Flyway详解
    1、背景随着项目的增多,各个项目的版本之间存在差异,因此在升级时,维护项目版本和最新版本之间增量的sql脚本成为一个严重的问题,非常耗时耗力,因此引入一个数据库变更管理工具迫在眉睫。目前比较常用的有flyway和liquibase,liquibase使用xml文件来定义和管理数据库脚本,不依赖于具体的......
  • express开发api指南--集成ORM(prisma)
    9.1安装依赖#全局安装prismacnpminstallprisma-g#添加开发依赖cnpminstallprisma--save-dev#创建Prisma架构文件模板,设置PrismaORM项目npxprismainit#安装并生成PrismaClientcnpminstall@prisma/client-gcnpminstall@prisma/client--save9.2连......
  • 自定义的基于线程的监控如何影响 celery 任务的启动时间?
    我使用Flask和celery来构建后端api。为了防止任务运行时间过长,我实现了一个自定义的基于线程的监视类来监视任务,并在任务运行时间过长时停止它们。这是我的实现。importosimportsysfromflaskimportFlaskfromceleryimportCelery,Taskimportloggingf......
  • 搭建 STM32 网关服务器的全流程:集成嵌入式 C++、TCP/IP 通信、Flash 存储及 JWT 认证(
    引言随着物联网(IoT)技术的快速发展,基于STM32的服务器(类似网关)在数据采集、设备控制等方面的应用越来越广泛。本文将介绍搭建一个基于STM32的服务器所需的技术栈,以及详细的搭建步骤和代码示例。技术栈介绍在搭建基于STM32的服务器时,我们需要用到以下技术栈和组件:1.硬......
  • HarmonyOS 集成 Flutter 问题记录
    1、DevEco-Studio升级到DevEco-StudioNEXTDeveloperBeta25.0.3版本之后报错:>hvigorERROR:Schemavalidatefailed.Detail:Pleasecheckthefollowingfields.{instancePath:'modules[2].srcPath',keyword:'pattern',params:{pa......
  • 计算机毕业设计django+vue“爱宠”宠物用品商店【开题+论文+程序】
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景随着社会经济的快速发展和人们生活水平的不断提升,宠物已成为越来越多家庭的重要成员,宠物经济的繁荣也随之而来。然而,传统的宠物用品购买方......
  • Springboot3集成knife4j,swagger实现在线接口文档
    什么是knife4jknife4j是一个集Swagger2和OpenAPI3为一体的增强解决方案,帮助开发者快速聚合使用OpenAPI规范,快速生成API文档,并且提供一些额外的功能,比如:API文档生成:可以根据Controller和方法上的注解自动生成Markdown格式的API文档在线访问API:可以在knife4j的页面直接访问......