首页 > 其他分享 >celery的使用

celery的使用

时间:2023-07-24 15:46:24浏览次数:29  
标签:multi app args celery result 使用 import

前言

官方文档
中文官方文档
基于 python 开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理,但本身不提供消息服务。

Celery 通过消息机制进行通信,通常使用中间人(Broker)作为客户端和职程(Worker)调节。

  • 中间件 (Broker):接收和发送消息,通常以独立的服务形式出现。常用 Redis 或 RabbitMQ

使用

启动方式

  1. 直接运行文件 python app.py
示例
from celery import Celery

broker = ...
backend = ...
app = Celery('tasks', broker=broker, backend=backend)


if __name__ == '__main__':
    args = ['worker', '--loglevel=INFO']
    # app.worker_main(argv=args)
    app.start(args)
  1. 命令行启动
示例
# module/app.py
celery worker -A module.app -l INFO -c 4

# 后台启动
celery multi start worker -A module.app -l INFO
  • -c:进程数
  • -l:日志级别
  • 重启:celery multi restart ...
  • 停止:celery multi stop ...
  • 等待任务完成再停止:celery multi stopwait ...

基础使用

单文件任务

创建实例
import time

import celery

# 'amqp://USER:PASSWORD@HOST:PORT/QUEUE'        # RabbitMQ
# 'redis://PASSWORD@HOST:PORT/DB'               # redis
broker = 'amqp://guest:[email protected]'
backend = 'redis://localhost:6379/1'

# 创建实例
# tasks:项目名
app = celery.Celery('tasks', backend=backend, broker=broker)


@app.task
def add(*args):
    return sum(args)


@app.task
def multi(*args):
    result = 1
    for i in args:
        result *= i
    return result


if __name__ == '__main__':
    args = ['worker', '--loglevel=INFO']
    # app.worker_main(argv=args)
    app.start(args)


生产者(发布消息)
from app import add, multi

result_add = add.delay(2, 3)		# delay()是apply_async()的快捷方法
print(f'add的任务id:{result_add.id}')

result_multi = multi.delay(2, 3)
print(f'multi的任务id:{result_multi.id}')
任务结果校验
import time

from celery.result import AsyncResult

from app import app

result_id = ''		# 填入某个任务的id
async_result = AsyncResult(id=result_id, app=app)

while True:
    if async_result.successful():
        result = async_result.get()
        print(f'结果:{result}')
        # result.forget()     # 将结果删除。执行完成后,结果不会自动删除
        # result.revoke(terminate=True)   # 无论现在是什么时候,都终止
        # result.revoke(terminate=False)   # 如果任务还没开始执行,则终止
        break
    elif async_result.failed():
        print('执行失败')
        break
    elif async_result.status == 'PENDING':
        print('等待执行')
    elif async_result.status == 'RETRY':
        print('重试中')
    elif async_result.status == 'STARTED':
        print('执行中')
    time.sleep(3)

多任务

通过 include 参数指定任务模块

创建实例
# module/app.py
from celery import Celery

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/1'

app = Celery(
    'celery',
    broker=broker,
    backend=backend,
    include=[
        'task01',
        'task02',
    ]
)


if __name__ == '__main__':
    args = ['worker', '--loglevel=INFO']
    app.start(args)
任务1
# module/task01.py
from app import app


@app.task
def add(*args):
    return sum(args)
任务2
# module/task02.py
from app import app


@app.task
def multi(x):
    return x * x
生产者(producer.py)
import task01, task02

result_add = task01.add.delay(3)
result_multi= task02.multi.delay(3)
print(f'add的任务id:{result_add.id}')
print(f'multi的任务id:{result_multi.id}')

定时任务

创建实例
# 参考“基础使用”
import time

import celery


broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://localhost:6379/1'

app = celery.Celery('tasks', backend=backend, broker=broker)


@app.task(ignore_result=True)     # 不需要结果
def send_email(name):
    print(f'Start send email to {name}')
    time.sleep(5)
    print(f'Over')
    return {'code': 200}
生产者
import datetime

from app import add


now = datetime.datetime.now()
send_time = now + datetime.timedelta(seconds=10)
send_time = datetime.datetime.strftime(send_time, '%Y-%m-%d %H:%M:%S')
print(f'now:{now}')
result = send_email.apply_async(args=['abc', ], eta=send_time)
print(result.id)

进阶

TODO

标签:multi,app,args,celery,result,使用,import
From: https://www.cnblogs.com/FevolQ/p/17577383.html

相关文章

  • 端口未被占用,但是却提示端口无法使用
    端口未被占用,但是却提示端口无法使用​#问题记录#​环境:Windows10问题如下:Anattemptwasmadetoaccessasocketinawayforbiddenbyitsaccesspermissions。1.通过netstat-aon|findstr“3306”​并没有看到端口占用2.通过netshintipv4showdynamicporttcp......
  • Session的使用
     基本配置1.设置sessionsetting.pyapp部分 全局默认配置部分 SESSION_ENGINE="django.contrib.sessions.backends.db"#存储在数据库中SESSION_ENGINE="django.contrib.sessions.backends.file"SESSION_FILE_PATH='sess'#存储在文件中 将配置复制进自己......
  • AirNet使用笔记8
    摘要:SDD显示多监视源航迹;1、SDD同时显示多监视源航迹,在“DataSource”选择。.sdd_offline.conf.0不加点不是隐藏文件也行。[root@ACC-3conf]#more/home/cdatc/AirNet/bin/conf/.sdd_offline.conf.0B_OPS_IS_MAINTAIN=1......
  • 【Boost】Windows端使用 MSVC14.2 编译 Boost 并在 CMake 项目中使用
    Write2023.7.24关于boost在Windows下的使用gcc安装与CLion的配置,能够查到的英文资料都比较少,踩过坑后记录一下。MinGW安装BoostBoostDownload:https://www.boost.org/users/download/下载并解压在某个没有中文路径下的目录中在开始编译操作之前请把gcc添......
  • 编写高质量代码改善程序的157个建议:使用Dynamic来简化反射的实现
    概述最近在看《编写高质量代码改善C#程序的157个建议》。看到第15个建议的时候,结合平时使用的习惯发现有部分出入,没有对不对的说法,只是使用习惯有点区别,跟随着我们来看一看。第15条建议是:使用dynamic简化反射的使用。dynamic的确可以简化反射的使用,但是从性能上来说是有......
  • 阿贝云服务器使用的领悟
    推荐一下阿贝云的免费服务器,没有流量限制,一个机还免费给一个IP,免费可以续期。系统也是可以免费装windows(这比三丰要好,三丰装windows居然要给一块钱)。总结一下,很适合初学者和中小站长。赞,希望做的更好给大家分享一个可以永久免费试用的云电脑。 废话不多说,就是阿贝云这款免费虚......
  • 编码技巧 --- 使用dynamic简化反射
    合集-c#基础(7) 1.编码技巧---如何实现字符串运算表达式的计算07-122.编码技巧---同步锁对象的选定07-133.解读---yield关键字07-174.并发编程---信号量线程同步07-185.并发编程---为何要线程池化07-186.编码技巧---谨防闭包陷阱07-197.编码技巧---使用dyn......
  • vue项目使用vue-virtual-scroll-list虚拟滚动超多千万条数据 cv可用案例
    当我们有大量数据需要显示在列表中时,传统的滚动列表会将所有数据渲染到DOM中,导致性能下降和内存占用增加。虚拟滚动列表通过仅渲染当前视窗内可见的一小部分数据,动态地根据滚动位置更新列表内容,从而实现更高效的列表渲染。vue-virtual-scroll-list是一个用于Vue.js的虚拟滚动......
  • 使用mysqldump备份数据库时报错表不存在,提示信息Table 'mysql.engine_cost' doesn't e
    问题描述:使用mysqldump备份数据库时报错表不存在,提示信息Table'mysql.engine_cost'doesn'texist,如下所示:数据库:mysql5.7.211、异常重现[mysql@hisdb1~]$mysqldump-uroot-S/mysql/data/mysql.sock-P3306--max_allowed_packet=1G--master-data=2--single-transaction......
  • vs2022使用cocos2d-x4.0创建新项目
    cocos源码编译方法:cocos2d-x-4.0.zip解压到如:E:\cocos2d-x-4.0, 进入目录运行下setup.py设置cocos环境变量(这个要python2.x,要先安装python)创建个目录,如win32-build,进入win32-build,运行cmd,然后再运行cmake(cmake后面..表示运行上一级目录的cmake脚本)cmake..-G"Vi......