首页 > 其他分享 >Celery包结构

Celery包结构

时间:2024-05-18 12:42:28浏览次数:19  
标签:celery task app py Celery 任务 import 结构

目录结构

项目名
    ├── celery_task  	# celery包
    │   ├── __init__.py # 包文件
    │   ├── celery.py   # celery连接和配置相关文件,且名字必须叫celery.py
    │   └── user_tasks.py    # 所有用户相关任务函数
    	└── order_tasks.py    # 所有订单相关任务函数
    ├── add_task.py  	# 添加任务
    └── get_result.py   # 获取结果

image-20240516172937784

Celery.py

用于创建celery对象 以及配置相关配置

include用于监控任务

Celery 需要知道所有可用的任务。如果你有多个定义任务的模块,使用 include 可以确保这些模块在 Celery worker 启动时被正确加载,从而确保任务可以被找到并执行。

from celery import Celery
import time
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
# 1 创建app对象
app = Celery('app', broker=broker, backend=backend,include=['celery_task.order_task','celery_task.user_task]

task.py

用于定义一些需要执行的任务函数

通常按照功能模块划分

我的例子里 用户相关的任务就被写在user.py,订单相关的任务就被写在order.py

# 以下我只是写了一些示例 没有任何意义
# 定义的任务函数需要用app.task装饰器来装饰

# user.py

from .celery import app
import time

@app.task
def add(x, y):
    time.sleep(2)
    return x + y




# order.py
from .celery import app
import time


@app.task
def send_sms(mobile, code):
    return f'下单成功,手机号{mobile},发送短信{code}成功'

提交任务add_task.py

通过执行这个py文件来提交任务到消息队列中

在这文件中导入想要提交的任务函数

然后对这个函数调用delay方法,有参数的话传入对应参数

from celery_task.order_task import send_sms

#1  同步调用
# res=send_sms('111111',888)
# print(res)

# 2 提交到任务队列被 worker执行
res=send_sms.delay('1895323234',9999)
# 返回值是任务的唯一id
print(res) #536bd01f-0100-4266-93c6-a12ebe7bbef1

查看任务结果get_result.py

在这个文件中定义任务的唯一id

在执行这个py文件,就可以查看任务的结果

from celery_task.celery import app
from celery.result import AsyncResult

id = '536bd01f-0100-4266-93c6-a12ebe7bbef1'
if __name__ == '__main__':
    result = AsyncResult(id=id, app=app)
    if result.successful():
        result = result.get()
        print(result)
    elif result.failed():
        print('任务失败')
    elif result.status == 'PENDING':
        print('任务等待中被执行')
    elif result.status == 'RETRY':
        print('任务异常后正在重试')
    elif result.status == 'STARTED':
        print('任务已经开始被执行')

启动worker

# 包所在目录下,启动worker
celery -A celery_task worker -l debug -P eventlet

延时任务

延时任务需要用apply_async来提交任务

from celery_task.user_task import add
from datetime import datetime, timedelta

# eta需要传一个时间对象
eta = datetime.utcnow() + timedelta(seconds=10)
res = add.apply_async(args=(2, 6), eta=eta)

定时任务

需要在celery.py中写配置文件

然后在启动一个beat 来提交任务到消息队列

from celery import Celery
import time

# 消息中间件
broker = 'redis://127.0.0.1:6379/1'
# 结果存储
backend = 'redis://127.0.0.1:6379/2'

# 1 创建app对象
app = Celery('app', broker=broker, backend=backend, include=['celery_task.order_task', 'celery_task.user_task'])


# 定时任务:配置文件
# 配置文件
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab

app.conf.beat_schedule = {
    'add-task': {
        'task': 'celery_task.user_task.add',
        'schedule': timedelta(seconds=3),
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': (300, 150),
    },
    'send-sms-task': {
        'task': 'celery_task.order_task.send_sms',
        # 'schedule': timedelta(seconds=30),
        'schedule': crontab(hour=11, minute=20),  # 每天11点20执行
        'args': ('189232222', 888),
    },
}


# 启动beat 定时提交任务到消息队列
celery -A celery_task beat -l debug
# 启动worker
celery -A celery_task worker -l debug -P eventlet

路飞项目轮播图定时更新

首先新在celery_task里新建一个任务文件home_task.py(因为轮播图是首页的功能)

然后定义更新轮播图任务

思路就是

拿到数据---》更新数据

from .celery import app
from django.core.cache import cache
from home.models import Banner
from home.common.serializer import BannerSerializer


@app.task
def update_banner():
    # 查询所有轮播图
    queryset = Banner.objects.filter(is_delete=False, is_show=True).order_by('orders')[:3]
    serializer = BannerSerializer(instance=queryset, many=True)
    # 拼接图片路径
    for banner in serializer.data:
        banner['image'] = 'http://127.0.0.1:8000/' + banner['image']
    # 更新缓存数据
    cache.set('banner_data', serializer.data)
    return '轮播图数据更新成功'

然后在celery.py里面更新配置文件

# 把home_task这个文件添加到监控列表
app = Celery('app', broker=broker, backend=backend, include=[
    'celery_task.order_task',
    'celery_task.user_task',
    'celery_task.home_task',
])

# 配置定时任务

# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab

app.conf.beat_schedule = {
    'update-banner-task': {
        'task': 'celery_task.home_task.update_banner',
        'schedule': timedelta(seconds=30),
        'args': (),
    },
}


# 启动beat 定时提交任务到消息队列
celery -A celery_task beat -l debug
# 启动worker
celery -A celery_task worker -l debug -P eventl

标签:celery,task,app,py,Celery,任务,import,结构
From: https://www.cnblogs.com/Hqqqq/p/18199222

相关文章

  • Celery快速使用
    安装#0创建Python项目#1创建虚拟环境#2安装celerypipinstallcelery#3安装redis(消息队列和结果存储使用redis)pipinstallredis#4安装eventlet(win平台,如果是mac,linux不需要)pipinstalleventlet快速使用celery_demo.py--主文件fromceleryimportCel......
  • 使用django_celery_beat在admin后台配置计划任务
    使用步骤安装包pipinstalldjango-celery-beatapp注册app注册INSTALLED_APPS=[....'django_celery_beat',]配置文件:屏蔽原来的调度器CELERY_BEAT_SCHEDULER='django_celery_beat.schedulers.DatabaseScheduler'设置时区LANGUAGE_CODE='z......
  • Flower 监控celery任务
    Flower监控celery任务如果不想通django的管理界面监控任务的执行,还可以通过Flower插件来进行任务的监控。Flower的界面更加丰富,可以监控的信息更全Flower是一个用于监控和管理Celery集群的开源Web应用程序。它提供有关Celeryworkers和tasks状态的实时信息功能【1】......
  • Celery admin监视任务
    在控制台监控任务执行情况,还不是很方便,最好是能够通过web界面看到任务的执行情况,如有多少任务在执行,有多少任务执行失败了等这个Celery也是可以做到了,就是将任务执行结果写到数据库中,通过web界面显示出来。这里要用到django-celery-results插件。通过插件可以使用Django的orm作......
  • 如何从多个文件夹内转移全部文件(忽略文件夹的结构)(进行复制)(再打包)
    首先,需要用到的这个工具:度娘网盘提取码:qwu2蓝奏云提取码:2r1z 04文件夹里面有只有1个名称为"1"的文件夹,“1”里面有“2”,“2”有“3”,“3”有“4”,从“1”开始,都有5个兔兔的图片,这是“1”里面的文件夹结构,现在要做的就是忽略文件夹结构,提取出全部的兔兔图片合并成一个压缩......
  • 数据结构学习笔记-有向图的度
    求有向图的度问题描述:已知有向图G用邻接矩阵存储,设计算法以分别求解顶点vi的入度、出度和度。【算法设计思想】出度的计算(getOutDegree)遍历法:通过遍历邻接矩阵中顶点vi所在行的所有元素来计算vi的出度。对于每个元素matrix[vi][j],如果其值不为0(表示存在从顶点vi到顶点......
  • celery异步框架
    celery介绍https://github.com/celery/celery/https://docs.celeryq.dev/en/stable/celery是一个分布式异步任务框架,是一个灵活且可靠的,处理大量消息的分布式系统,可以在多个节点之间处理某个任务,是一个专注于实时处理的任务队列,支持任务调度,所以celery本质上是一个分布式......
  • Python知识 | Python的数据结构有哪些?
    Python的数据结构有哪些?Python数据结构概览在Python中,数据结构是编程语言的基础,它们决定了数据如何组织和存储。Python的标准库提供了多种内置数据结构,包括:列表(List)列表是一种可变的序列,可以随时添加、删除或修改其元素。列表以方括号[]表示,元素可以是任何类型的数据。元组(T......
  • AD导入DXF画板框结构
    一、安装导入DXF插件1.确定当前软件是否支持导入DXFa.在PCB编辑窗口下,依次选择file>import…b.在importfile窗口下查看文件后缀名下拉菜单是否有DXF,我这里是有的。如果没有需要下载DXF插件;二、导入DXF结构文件至此DXF文件已成功导入PCB。 三、根据DXF画板框1.去......
  • 计算机体系结构-Booth乘法
    原理解释电路实现以Radix-4Booth编码为例,Booth乘法的核心是部分积的生成,需要生成\(N/2\)个部分积,每个部分积与\([X]_补\)有关,存在\(-X,-2X,+X,+2X,0\)这五种可能,其中减去\(X_{补}\)的操作可以认为是按位取反的\(X_{补}\)在末尾+1。为了硬件实现方便,可以将末位1操作提取出来,假......