首页 > 其他分享 >celery

celery

时间:2023-03-08 21:15:36浏览次数:42  
标签:task app py celery 任务 print

celery

介绍

是一个python的框架。

主要用来解决 定时任务 异步任务 延迟任务 的框架

官方文档

https://docs.celeryq.dev/en/stable/

运行原理

1 可以不依赖任何服务器,通过自身命令,启动服务
2 celery服务为为其他项目服务提供异步解决任务需求的
注: 会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,ceTery就会在需要时异步完成项目的需求
人(django)是一个独立运行的服务  医院(celery)也是一个独立运行的服务
	正常情况下,人可以完成所有健康情况的动作,不需要医院的参与:但当人生病时,就会被医院接收,解决人生病问题
	人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求

celery架构

任务中间件 Broker, 其他服务提交的异步任务,放在中间件里面排队。

​ 任务中间件是需要借助于第三方redis

任务执行单元 worker 执行异步任务的进程

​ celery 提供的

结果存储 backend 在项目中就是函数的返回结果,存到backend中

​ 也需要借助第三方 redis, mysql

安装

pip install celery
windos启动worker需要下载pip install eventlet 这个来支持

快速使用

第一步

新建main,py设置中间件与执行结果库,写任务

from celery import Celery
"""
参数
中间件  broker
执行结果 backend 
"""

# 中间件 用本地的redis 1库,提交的任务放在里面
broker  ='redis://127.0.0.1:6379/1'
# 执行结果 用本地的redis 2库,执行完的结果放在里面
backend ='redis://127.0.0.1:6379/2'
# 实例化得到对象,并起个名字可以__name__获得这个文件的名字
app = Celery('test',broker=broker,backend=backend)

# 写任务 需要使用装饰器的形式
@app.task
def add(a,b):
    import time
    time.sleep(3)
    return a+b

提交任务 s1.py

# 导入任务提交使用
from main import add
# 同步调用
# res = add(5,6)
# print(res)

# 异步调用
res = add.delay(2,3)
print(res)  # 4ebd35bc-4dc4-4c96-9af7-81649ce607f7 返回了该任务的id值可以根据这个拿到任务结果
"这个时候任务还没有执行,worker没启动,他才是真正干活的人"

启动worker

windos启动worker需要下载pip install eventlet 这个来支持
# 启动worker命令
"""需要切换到任务所在目录下
	main 任务文件名
"""
4.x之前版本
celery worker -A main -l info -P eventlet
4.x之后
celery  -A main worker -l info -P eventlet
mac
celery  -A main  worker -l info
"""敲完命令会一直运行,只要提交任务就会自动执行"""

获取执行结果s2.py

# 把main的里面实例对象app导进来
from main import app
# 导入异步结果模块
from celery.result import AsyncResult
# 放入提交任务得到的id
id='4ebd35bc-4dc4-4c96-9af7-81649ce607f7'
if __name__=='__main__':
    # 实例化得到对象,传id与执行app
    a=AsyncResult(id=id,app=app)
    # 判断是否成功
    if a.successful():
        # 拿到结果
        result = a.get()
        print(result)
    # 执行失败
    elif a.failed():
        print('任务失败')
    # 执行等待
    elif a.status == 'PENDING':
        print('任务等待着被执行')
    elif a.status == 'RETRY':
        print('任务异常正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')

celery包的使用

celery_task  	# celery包
   __init__.py # 包文件
   celery.py   # celery连接和配置相关文件,且名字必须交celery.py
   tasks.py    # 所有任务函数
add_task.py  	# 添加任务
get_result.py   # 获取结果
    

第一步

新建一个celery_task包

第二步

在包的下面新建celery.py存放

from celery import Celery

# 中间件 用本地的redis 1库,提交的任务放在里面
broker  ='redis://127.0.0.1:6379/1'
# 执行结果 用本地的redis 2库,执行完的结果放在里面
backend ='redis://127.0.0.1:6379/2'
# 实例化得到对象,并起个名字可以__name__获得这个文件的名字   include 指定管理的任务
app = Celery('test',broker=broker,backend=backend,include=['celery_task.order_task','celery_task.user_task'])


第三步

在celery_task下新建user_task.py存放用户任务

# 编写任务
from .celery import app
import time
@app.task
def send_sms(phone,code):
    print("给%s发送短信成功,验证码为:%s" % (phone, code))
    time.sleep(2)
    return True

在celery_task下新建order_task.py 存放订单任务

from .celery import app
import time
@app.task
def add(a, b):
    print('-----', a + b)
    time.sleep(2)
    return a + b

第四步

提交任务和包没关系在外面创建

send_sms_task.py

from celery_task.user_task import send_sms
# 同步调用
# res = send_sms('18888888',8888)

# 异步调用
res = send_sms.delay('18888888',8888)
print(res) #f6e913b7-da1a-462c-b5f4-6a8211d1d565

第五步

启动worker

到包的同级目录下执行命令

celery  -A celery_task worker -l info -P eventlet

第六步

获取结果

包的同级目录下新建get_result.py

# 把main的里面实例对象app导进来
from celery_task.celery import app
# 导入异步结果模块
from celery.result import AsyncResult
# 放入提交任务得到的id
id='f6e913b7-da1a-462c-b5f4-6a8211d1d565'
if __name__=='__main__':
    # 实例化得到对象,传id与执行app
    a=AsyncResult(id=id,app=app)
    # 判断是否成功
    if a.successful():
        # 拿到结果
        result = a.get()
        print(result)
    # 执行失败
    elif a.failed():
        print('任务失败')
    # 执行等待
    elif a.status == 'PENDING':
        print('任务等待着被执行')
    elif a.status == 'RETRY':
        print('任务异常正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')

标签:task,app,py,celery,任务,print
From: https://www.cnblogs.com/LiaJi/p/17196262.html

相关文章

  • 路飞-day9——redis之列表类型、redis之hash类型、redis其他方法、redis管道、django
    目录一、redis之列表(List)类型二、redis之hash(字典)类型三、redis其他方法(所有类型通用的方法)四、redis管道五、django中使用redis方式一方式二方案一方案二六、celery介绍......
  • celery
    celery介绍和安装#Celery是什么-翻译过来是芹菜的意思,跟芹菜没有关系-框架:服务,python的框架,跟django无关-能用来做什么-1异步任务......
  • Python实战项目-9 Redis/celery-基础使用
    Redis介绍与安装Redis->缓存数据库【大部分时间用来做缓存,不仅仅可以做缓存】也是称为非关系型数据库,区别与Mysql关系型数据库-noSql:泛指非关系型数据库,notonlySql......
  • redis之列表、redis之hash、redis其他操作、redis 管道、django中使用redis、celery介
    目录1redis之列表2redis之hash3redis其他操作4redis管道5django中使用redis5celery介绍和安装6celery快速使用7celery包结构#1登录注册前端 -登录 -手......
  • celery task 异常捕获的两种方式.(sentry, mail)
    捕获celerytask异常的两种方式,sentry,mailsentry(sentry记录异常.)importsentry_sdkfromsentry_sdk.integrations.celeryimportCeleryIntegrationsentry_......
  • 记- django通过celery beat results实现定时任务
    1.实验环境python版本:3.7.8django版本:3.2.15celery版本:5.2.7django-celery版本:3.2.1django-celery-beat版本:2.4.0django-celery-results版本:2.4.0django-redis版本......
  • Celery
    CeleryCelery是基于Python实现的模块,用于执行异步定时周期任务的celery组成结构#1.app应用任务#2.缓存存放任务的Broker-Backend缓存任务和缓存任......
  • 转载 Celery入门
    原文:https://www.cnblogs.com/pyedu/p/12461819.html一、什么是Celery1.1、celery是什么Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步......
  • celery学习
     原文:https://www.bbsmax.com/A/gVdnwBZXzW/Celery对象核心的对象就是Celery了,初始化方法:classCelery(object):def__init__(self,main=None,loader=None,back......
  • celery
     celery,实现了异步的分布式任务队列。中间是两个队列,左边是执行任务,将任务提交到中间的队列中,右边是worker,能自动发现队列中的任务。左边调用任务,右边worker就会拿到这......