首页 > 其他分享 >celery架构介绍和基本使用

celery架构介绍和基本使用

时间:2022-11-23 15:33:11浏览次数:60  
标签:task 架构 worker redis 介绍 celery 任务 执行

一、 Celery架构介绍

  • 分布式异步任务框架,可以支持大量任务的并发执行(celery服务主要为其他项目服务提供异步解决任务需求的)
  • 可以不依赖任何服务器,通过自身命令,启动服务,内部支持socket。
  • 不支持Windows,目前装一个第三方支持包,让它在Windows上运行。
  • 官方中文文档:http://docs.jinkan.org/docs/celery/

  1. 使用场景

  • 异步执行:解决耗时任务,将耗时操作任务提交给celery去异步执行,比如发送短信/邮件、消息推送、音频处理等等(区分同步任务)
  • 延迟执行:解决延迟任务,比如延迟推送消息,订单30分钟内未支付,订单自动取消等
  • 定时执行:解决周期任务,比如每天数据统计,每天定时发送邮件等

  2. 组成部分

 

  • broker:任务中间件,用户提交的任务,存在这个里面(redis缓存数据库,rabbitmq是专业的消息队列)
  • worker:任务执行者,消费者,真正执行任务的进程(可能有多个worker)
  • backend:任务结果存储,任务执行后的结果。(也可能是完成的任务,也可能是没有完成的结果,抛异常的结果等,它们放在redis,rebbitmq,甚至是关系型数据库中) 图例:

 

 

 

二 、Celery的简单使用

 1. 安装

pip install celery==5.1.2

2. 创建任务文件:celery_task.py

from celery import Celery
import time

# backend='redis://:密码@127.0.0.1:6379/1'   如果有密码,固定写法
# 任务提交到1,worker执行完之后,结果存到2
broker = 'redis://127.0.0.1:6379/1'  # redis地址   6379端口的第一个库
backend = 'redis://127.0.0.1:6379/2'  # redis地址  6379端口的第二个库


# 1. 实例化得到一个对象
app = Celery(__name__, broker=broker, backend=backend)


# 2. 写任务
# 使用装饰器包裹任务(函数)
@app.task()
def add(a, b):
    time.sleep(2)   # time睡两秒
    return a+b

3. 提交任务:celery_submit.py

 ① 同步执行,直接得到结果

'''
普通的同步任务:
定义两个变量,得到一个对象,用对象一装饰就装饰了一个任务,下面可以写一堆任务。接下来再把任务提交到broker消息中间件中,再调函数同步执行
'''
import celery_task

res = celery_task.add(2, 3)     # 普通的同步任务,同步执行任务。
print(res)      # 5

 

 ② 异步任务

  • 第一步,提交 (使用任务名.apply_async(参数))  
    结果是任务id号,唯一标识这个任务
  • 第二步,让worker执行 -> 结果存到redis
  • 第三步,查看任务执行结果

异步提交任务:

import celery_task

# 有参数传参数,没有参数就不用传。可以使用两种方式传
# res = celery_task.add.apply_async(args=[2,3])   # 按位置传参
res = celery_task.add.apply_async(kwargs={'a':2, 'b':3})    # 按关键字传参
print(res)  # cb745b27-b848-4229-8e1c-408d7343fb83  结果是任务id号,唯一标识这个任务

 

在redis的db1中可以查看到celery的任务ID号:

 

 4. 执行任务:通过worker命令启动执行

① 非Windows系统
  • 5.x之前:celery worker -A celery_task -l info
  • 5.x以后:celery -A celery_task worker  -l info
② Windows系统
  • 安装eventlet
    • 通过命令安装 pip install eventlet
    • 通过pycharm  
  • 启动命令:
    • 5.x之前:celery worker -A celery_task -l info
    • 5.x以后:celery -A celery_task worker -l info

  

 

 

 启动后执行:

 

 在redis中查看到执行结果:

 

 5. 查看任务结果:celery_result.py

 

# 查看任务结果
from celery_task import app

from celery.result import AsyncResult

id = 'cb745b27-b848-4229-8e1c-408d7343fb83'
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app) # 实例化等到一个对象,id就是上面的那个id,app就是刚刚celery_task中实例化得到的app
    if a.successful():  # 如果成功
        print('任务执行成功了')
        result = a.get()  # 拿到异步任务执行的结果
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')

 

 

标签:task,架构,worker,redis,介绍,celery,任务,执行
From: https://www.cnblogs.com/tully/p/16916855.html

相关文章