首页 > 其他分享 >celery异步执行任务

celery异步执行任务

时间:2023-03-09 18:56:52浏览次数:37  
标签:异步 app redis celery 任务 执行

celery介绍和安装

Celery 的作用:
-1 异步任务
-2 定时任务
-3 延迟任务

celery的运行原理:
1)可以不依赖任何服务器,通过自身命令,启动服务
2)celery服务为为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

人是一个独立运行的服务 | 医院也是一个独立运行的服务
正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求

celery架构(Broker,backend 都用redis)

  • 1 任务中间件 Broker(中间件),其他服务提交的异步任务,放在里面排队
    -需要借助于第三方 redis rabbitmq
  • 2 任务执行单元 worker 真正执行异步任务的进程
    -celery提供的
  • 3 结果存储 backend 结果存储,函数的返回结果,存到 backend中
    -需要借助于第三方:redis,mysql

使用场景 :
异步执行:解决耗时任务
延迟执行:解决延迟任务
定时执行:解决周期(周期)任务

celery快速使用

第一步:安装celery模块
pip install celery

第二步:
在scripts文件夹中 创建 celery_m文件夹
在celery_m文件夹下创建 main.py文件

from celery import Celery
# 提交的异步任务在这里面 指定redis地址和库
broker = 'redis://127.0.0.1:6379/2'
# 完成的结果放在这里面  指定redis地址和库
backend = 'redis://127.0.0.1:6379/3'

app = Celery(main='celery_text', backend=backend, broker=broker)
# 生成一个对象

@app.task
def add(a, b):
    import time
    time.sleep(3)
    return a + b
  
  
# 给需要用到异步执行的方法装上装饰器


第三步:

提交异步任务
可以再任意文件中执行
from main import add
把带异步执行的方法导入

# 异步调用
res = add.delay(2,3)
# 放入异步中待执行
print(res)
# 得到一个任务id 已经把这个任务提交到了异步待执行状态
# 9eb94989-ed8a-4d70-a349-658d026aa419




第四步:
开启异步任务执行 所有放在异步中的任务都会执行
在pycharm终端开启 路径要在 celery_m文件夹下
'''执行该命令需要到celery_mk这一级目录'''
执行命令
celery  -A celery  worker -l info




第五步:
根据待执行任务id 查看结果

# 查看异步提交的结果
from main import app
 # 导入我们创建的Celery对象 名叫app
from celery.result import AsyncResult
id = '0af2910f-f8aa-4f70-9c4e-bf09b95c8672'
# 放入异步是就会返回一个id给你, 根据id来查看异步执行的结果
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():  
        result = a.get() 
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')

celery包结构

celery_pack         # celery包
    ├── __init__.py # 包文件
    ├── celery.py   # celery连接和配置相关文件,且名字必须交celery.py
    └── tasks.py    # 所有任务函数
├── add_task.py  	# 添加任务
└── get_result.py   # 获取结果

执行异步任务/延迟任务

celery.py---创建app

# 1)创建app + 任务

# 2)启动celery(app)服务:
MAC:
命令:celery -A celery_pack  worker -l info
要在celery_pack同级目录下执行命令/或上级目录

windows:
pip3 install eventlet
celery -A celery_pack  worker -l info -P eventlet

# 3)添加任务:手动添加,要自定义添加任务的脚本,右键执行脚本

# 4)获取结果:手动获取,要自定义获取任务的脚本,右键执行脚本


from celery import Celery
broker = 'redis://127.0.0.1:6379/1'
# 设置提交任务到redis的哪个库
backend = 'redis://127.0.0.1:6379/2'
# 设置结果存到到redis的哪个库
app = Celery(broker=broker, backend=backend, include=['celery_pack.tasks'])

include=[需要管理的任务函数文件]
tasks.py--设置任务
统一存放编写需要用到celery服务的方法  

from .celery import app
import time


@app.task
def add(n, m):
    time.sleep(10)
    return n + m

@app.task
def low(n, m):
    print(n)
    print(m)
    print('n-m的结果:%s' % (n - m))
    return n - m

add_task.py--提交任务文件可单独放置

导入设置好的任务 通过.delay方法提交到worker中执行
得到一个任务id返回值,可以用此返回值查询异步worker执行好的结果

需要在父级路径下执行 
celery -A celery_pack  worker -l info
# 开始worker服务


#1 异步任务   
任务.delay(参数) 
# 延迟任务
任务.apply_async(args=[参数],eta=时间对象)


from celery_pack.tasks import add
# 传入需要异步执行的方法


# 添加立即执行任务
d1 = add.delay()
print(d1)


'''
设置延迟任务 
生成一个时间对象,执行.apply_async方法,需要参数传入 args=(200, 50)中,
eta= 过期时间 配置一个时间对象 
'''
# 添加延迟任务
eg:
from datetime import datetime, timedelta
time_obj=datetime.utcnow() + timedelta(seconds=30)
# 生成一个时间对象,延迟30秒执行(timedelta(seconds=30)) 

任务.apply_async(args=(200, 50), eta=time_obj)

该任务将会在运行后延迟30执行

get_result.py--获取结果

from scripts.celery_pack.celery import app

from celery.result import AsyncResult

id = '1937598a-f62a-4682-b93c-8a97a7e6e8fd'
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():
        result = a.get()
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')

执行定时任务

celery.py文件中 ---添加定时任务配置

第一步:

from celery import Celery

broker = 'redis://127.0.0.1:6379/2'
# 设置提交任务到redis的哪个库
backend = 'redis://127.0.0.1:6379/3'
# 设置结果存到到redis的哪个库
app = Celery(broker=broker, backend=backend, include=['celery_pack.tasks'])



# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

# 任务的定时配置
from celery.schedules import crontab
app.conf.beat_schedule = {
    'task_1': {
        'task': 'celery_pack.tasks.bobo',
        # 'schedule': timedelta(seconds=3),
        # 时间对象 每3秒执行一次
        'schedule': crontab(hour=23,minute=58),  # 每天晚上23点58分执行
        # 'args': (300, 150),
    },
    'task_2': {
        'task': 'celery_pack.tasks.bobobo',
         # 需要执行的函数
        # 'schedule': timedelta(seconds=3),
        # 时间对象 每3秒执行一次
        'schedule': crontab(hour=8, day_of_week=1) # 每周一早八点执行
        # 'args': (300, 150),
    }
}

第二步:

tasks.py中 编写定时任务

from .celery import app


@app.task
def bobo():
    print('我执行了')
    return '定时任务'


@app.task
def bobobo():
    print('我执行了')
    return '定时任务'

第三步:

确定开始启动定时任务

切记 要在celery_pack包所在的父级路径下执行

1.启动beat
celery -A celery_pack beat -l info


2.启动worker
celery -A celery_pack worker -l info

# 只要任务的代码有改动都需要重启worker

django中使用celery

celery.py中

from celery import Celery
import os

broker = 'redis://127.0.0.1:6379/2'
# 设置提交任务到redis的哪个库
backend = 'redis://127.0.0.1:6379/3'
# 设置结果存到到redis的哪个库
app = Celery(broker=broker, backend=backend, include=['celery_pack.tasks'])

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'LufeiApi.settings.dev')
如果要在django中使用就一定要加这句话

tasks.py中

from .celery import app

from LufeiApi.apps.user.models import User

@app.task
def get_username(user_id):
    user_obj = User.objects.filter(pk=user_id).first()
    return user_obj.username
  
  
把需要异步执行的任务 编写到这里面

标签:异步,app,redis,celery,任务,执行
From: https://www.cnblogs.com/moongodnnn/p/17201063.html

相关文章