首页 > 其他分享 >Celery快速入门的教程

Celery快速入门的教程

时间:2024-03-29 17:57:27浏览次数:15  
标签:celery 教程 task 入门 app sms send Celery print

1、快速入门

1.1、config.py【redis和rabbitmq配置代码】

from celery import Celery

broker = 'amqp://celery:[email protected]:5672/celery_host'  # rabbitMQ
backend = 'redis://:[email protected]:6379/2'  # redis地址
# 1 实例化得到celery对象
app = Celery(__name__, backend=backend, broker=broker)

1.2、tasks.py【异步的任务代码】

from config import app

# 相加的任务
@app.task()
def add(a, b):
    import time
    time.sleep(2)
    return a + b

1.3、调用的代码【main.py】

# 启动的命令
# windows:
# pip3 install eventlet
# celery -A tasks worker -l info -P eventlet


import tasks
from config import app
from celery.result import AsyncResult


def sync_test():
    # 同步代码
    res = tasks.add(2, 3)  # 普通的同步任务,同步执行任务
    print(res)


def async_test() -> str:
    # 异步代码
    return tasks.add.apply_async(kwargs={'a': 2, 'b': 3})


def get_result(task_id):
    # 查看任务执行结果
    a = AsyncResult(id=task_id, app=app)
    if a.successful():
        print('任务执行成功了')
        result = a.get()  # 异步任务执行的结果
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')


if __name__ == '__main__':
    #1、同步任务
    # sync_test()

    #2、开启任务
    # task_id = async_test()
    # print(task_id) # aff61b49-bbeb-4a71-9332-76b1e7e74c93
    get_result('aff61b49-bbeb-4a71-9332-76b1e7e74c93')

2、多任务创建运行

2.1、目录结构

├── celery_task
│   ├── compute_task.py
│   ├── __init__.py
│   ├── main.py
│   ├── order_task.py
│   └── sms_send_task.py
├── config.py

2.2、任务代码

2.2.1、任务1【celery_task/compute_task.py】

from config import app


@app.task()
def add(a, b):
    return a + b

2.2.2、任务2【celery_task/sms_send_task.py】

import time

from config import app


# 发送短信任务
@app.task()
def send_sms(phone, code):
    time.sleep(3)  # 模拟发送短信延迟
    print('短信发送成功,手机号是:%s,验证码是:%s' % (phone, code))
    return '短信发送成功'

2.2.3、任务3【celery_task/order_task.py 】

from config import app


# 生成订单任务
@app.task()
def make_order():
    with open(r'C:\order.txt', 'a', encoding='utf-8') as f:
        f.write('生成一条订单\n')
    return True

2.4、celery配置代码【config.py】

from celery import Celery

broker = 'amqp://celery:[email protected]:5672/celery_host'  # rabbitMQ
backend = 'redis://:[email protected]:6379/2'  # redis地址
# 1 实例化得到celery对象

app = Celery(__name__, backend=backend, broker=broker, include=[
    'celery_task.compute_task',
    'celery_task.order_task',
    'celery_task.sms_send_task'
])
app.conf.update({'broker_connection_retry_on_startup': True})

2.5、运行worker

celeryProject>celery -A config worker -l info -P eventlet

2.6、运行任务

from celery_task import compute_task, order_task, sms_send_task
from config import app


def send_task():
    # 提交一个发送短信任务
    res = sms_send_task.send_sms.apply_async(args=['18972370000', '8888'])
    print('sms_send_task:', res)
    # 提交一个生成订单任务
    res = order_task.make_order.apply_async()
    print('order_task', res)

    res = compute_task.add.apply_async(args=[1, 1])
    print('compute_task', res)


def get_result(task_id):
    from celery.result import AsyncResult
    a = AsyncResult(id=task_id, app=app)
    # print(app.conf)
    if a.successful():
        print('任务执行成功了')
        result = a.get()  # 异步任务执行的结果
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')


if __name__ == '__main__':
    # send_task()
    task_dict = {'sms_send_task': '587808ea-f968-42ad-ab42-b47bbc0b9940',
                 'order_task': '943c4a74-5b33-4352-a8fa-27d9efd7524b',
                 'compute_task': '30b66a71-71b1-42bd-8eb0-cd1afddf177c'}
    for _, task_id in task_dict.items():
        print(get_result(task_id))

3、延迟任务

3.1、任务代码【sms_send_task.py】

from config import app


# 发送短信任务
@app.task()
def send_sms(phone, code):
    print('短信发送成功,手机号是:%s,验证码是:%s' % (phone, code))
    return '短信发送成功'

3.2、调用代码

def delay_task():
    # 添加延迟任务方式一:
    from datetime import datetime, timedelta
    # datetime.utcnow()  获取当前的utc时间
    eta = datetime.utcnow() + timedelta(seconds=50)  # 50s后的utc时间
    # 10s后,发送短信
    res = sms_send_task.send_sms.apply_async(args=('12345566650', '8888'), eta=eta)
    print(res)
    # 使用第二种方式执行异步任务(两者传参不同;不写时间,就表示立即执行):
    res = sms_send_task.send_sms.delay('12345566600', '9999')
    print(res)


if __name__ == '__main__':
    delay_task()

3.3、观察命令打印

4、定时任务

4.1、定时的任务代码【sms_send_task.py】

from config import app


# 发送短信任务
@app.task()
def send_sms(phone, code):
    print('短信发送成功,手机号是:%s,验证码是:%s' % (phone, code))
    return '短信发送成功'

4.2、配置定时任务【config.py】

from celery import Celery

broker = 'amqp://celery:[email protected]:5672/celery_host'  # rabbitMQ
backend = 'redis://:[email protected]:6379/2'  # redis地址
# 1 实例化得到celery对象

app = Celery(__name__, backend=backend, broker=broker, include=[
    'celery_task.compute_task',
    'celery_task.order_task',
    'celery_task.sms_send_task'
])
app.conf.update({'broker_connection_retry_on_startup': True})

###修改celery的配置信息    app.conf整个celery的配置信息
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 配置定时任务
from datetime import timedelta
from celery.schedules import crontab

app.conf.beat_schedule = {
    'send_sms_every_3_seconds': {
        'task': 'celery_task.sms_send_task.send_sms',  # 指定执行的是哪个任务
        'schedule': timedelta(seconds=3),
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': ('18953675221', '8888'),
    }
}

4.3、启动worker

celeryProject> celery -A config worker -l info -P eventlet

4.4、启动beat

celeryProject> celery -A config  beat -l info

 

标签:celery,教程,task,入门,app,sms,send,Celery,print
From: https://www.cnblogs.com/ygbh/p/18104108

相关文章

  • Visual Studio Code & Python教程1简介
    1简介VisualStudioCode是一款开源代码编辑器,可免费使用,完全支持Python编程语言的开发。它具有一些有用的功能,例如与世界各地的其他程序员进行实时协作。本章旨在介绍VSCode,帮助您了解其开发过程及其不同组件。我们将讨论为什么它可能是最受欢迎的代码编辑器,了解它的功能,并讨......
  • 2.java openCV4.x 入门-hello OpenCV
    专栏简介......
  • 前端 Typescript 入门
    前端Typescript入门Antdesignvue4.x基于vue3,示例默认是TypeScript。比如table组件管理。vue3官网介绍也使用了TypeScript,例如:响应式API:核心华为的鸿蒙OS(HarmonyOS)开发中也可以使用TypeScript本篇目的用于对TS进行扫盲Tip:ts路线图ts是什么TS是TypeScript的......
  • C++从入门到精通——函数重载
    函数重载前言一、函数重载概念二、函数重载的分类参数类型不同的函数重载参数个数不同的函数重载参数类型顺序不同的函数重载三、函数重载的具体代码展示main.cpp四、为什么为什么C++支持函数重载,而C语言不支持函数重载呢前言函数重载是指在同一个作用域内,可以定......
  • C++从入门到精通——缺省参数
    缺省参数前言一、缺省参数概念二、缺省参数分类位置参数的缺省参数全缺省参数半缺省参数关键字参数的缺省参数函数指针的缺省参数`lambda`表达式三、缺省参数的具体代码展示main.cpp前言缺省参数是在函数定义时指定的默认值,当调用函数时未提供该参数的值时,将使......
  • JavaScript快速入门笔记之七(String:字符串类型、RegExp:正则表达式)
    JavaScript快速入门笔记之七(String:字符串类型、RegExp:正则表达式)String:字符串类型什么是字符串?底层本质:一串字符组成的只读字符数组包装类型:临时封装原始类型数据,并提供对数据操作方法的对象——类型名和原始类型名相同!StringNumberBoolean何时使用:不必手动创建!......
  • DICE模型教程
    原文链接:DICE模型教程https://mp.weixin.qq.com/s?__biz=MzUzNTczMDMxMg==&mid=2247599474&idx=6&sn=bd716d5719ddd8bd6c565daa0f361b72&chksm=fa820495cdf58d83360a402cb2a05042e0f13e6d84d96ee36708e4c5ce90fa124ad9a30b9717&token=1105644014&lang=zh_CN#......
  • 针对框架以外的页面路由配置教程
    一.找到项目中的routes.js//学生注册页面路由constAoYuStudentRegister=[{path:'/MarketAoYuStudentRegister',name:'学生信息填写',component:_import('system/AoYuStudentRegister')}]path:‘/MarketAoYuStudentRegister’:表示当用户访问/M......
  • podman 入门实战
     一入编程深似海,从此节操是路人。最近使用podman,就想着写一篇总结性的笔记,以备后续参考。就如同写代码,不写注释,过了一段时间可能会想这是我写的吗?不会吧,还要理一下逻辑才能读懂,不利于后期维护。感觉整体体验下来,镜像获取、容器创建、容器监控、容器移除,和docker差不多,感觉可......
  • 大模型自学教程
    GitHub2K+星、B站播放量超30万,大模型入门看这本书就够了!(文末送书)_应用_ChatGPT_语言  llm-course,狂飙13.5KStar,GitHub上最全的开源大模型教程-知乎大模型教程-哔哩哔哩_Bilibili......