首页 > 其他分享 >celery安装和使用

celery安装和使用

时间:2023-07-05 15:44:36浏览次数:31  
标签:task app py celery 使用 print import 安装

安装

pip install celery

简单使用

安装完celery后,会生成一个可执行文件:celery,与pip命令在一个目录下,也就是scripts下。

注意:celery官方不支持在windows上启动,如果确需要在windows上启动服务,需要使用一个第三方模块:eventlet支持
pip install eventlet

启动命令

4.x之前版本
Linux:
celery worker -A 模块名 -l info

windows:
celery -A 模块名 worker -l info -P eventlet

4.x之后版本
Linux:
celery -A 模块名 worker -l info

windows:
celery -A 模块名 worker -l info -P eventlet

# 命令说明:
-A 指定模块名,也就是需要做分步式的模块
-l info  日志级别,info就是最普通的info级别
-P 只在windows环境下使用,linux不需要

测试

  1. 建一个celery_main.py文件
import celery
import time

# 提交的异步任务会放到broker中
broker = 'redis://192.168.0.2:6379/1'
# 执行完的结果,会被放到backend中
backend = 'redis://192.168.0.2:6379/2'

# 类实例化得到对象,第一个是指定任务名
app = celery.Celery('test', broker=broker, backend=backend)


@app.task
def add(a, b):
    # 模拟延迟
    time.sleep(5)
    return a + b
  1. 启动服务
# 先进入到celery_main.py目录下,再执行下面的启动命令
celery -A celery_main worker -l info -P eventlet
  1. 获取任务ID号
  • 创建了commit_process.py文件
# 提交任务
from celery_main import add

# 异步调用
res = add.delay(1, 2)
print(res)  # c71ca743-72d2-45d6-84f5-4b929398ad73
  1. 根据任务ID号获取执行结果
  • 创建了get_result.py文件
from celery_main import app

from celery.result import AsyncResult

id = 'fa773d73-0525-4260-8235-d0bc89d53476'
if __name__ == '__main__':
    task = AsyncResult(id=id, app=app)
    if task.successful():
        result = task.get()
        print(result)
    elif task.failed():
        print('任务失败')
    elif task.status == 'PENDING':
        print('任务等待中被执行')
    elif task.status == 'RETRY':
        print('任务异常后正在重试')
    elif task.status == 'STARTED':
        print('任务已经开始被执行')

celery包结构的使用

  • 创建一个celery的包,未来使用的时候,直接导入使用就可以了。

包架构封装结构

project
    ├── celery_task     # celery包
    │   ├── __init__.py # 包文件
    │   ├── celery.py   # celery连接和配置相关文件,且名字必须交celery.py
    │   └── tasks.py    # 所有任务函数
    ├── add_task.py     # 这个就是项目中的任务,以add_task为例
    └── get_result.py   # 获取结果

总结

  • 第一步,新建包celery_task,并且新建文件celery.py(必须叫这个名字)
  • celery.py
from celery import Celery

# 提交的异步任务会放到broker中
broker = 'redis://192.168.0.2:6379/1'
# 执行完的结果,会被放到backend中
backend = 'redis://192.168.0.2:6379/2'

# 类实例化得到对象,第一个是指定任务名
# include别忘了加,它是一个列表,下面的例子中的意思是celery_task/task.py
app = Celery('test', broker=broker, backend=backend, include=['celery_task.task'])
  • 第二步,在包内部,写你要写的任务内容,需要使用@app.task装饰
  • tasks.py
# 此文件下,是程序的各种任务
from .celery import app
import time


# 模拟任务
@app.task
def add(a, b):
    print('正在计算中')
    time.sleep(5)
    return a + b
  • 启动worker服务(此步骤可以放在第一步之后任意步骤,也就是建好celery.py后啥时候起都行)
cd scripts
# 注意要与celery_task目录在同级,不要进入到celery_task中执行
celery -A celery_task worker -l info -P eventlet
  • 第三步,提交任务,任务会被提交到中间件中,等待worker执行,worker启动了,就会被worker执行
  • add_task.py
from celery_task.task import add

res = add.delay(1, 2)
print(res)
  • 第四步,worker执行完,结果会放到backend中

  • 第五步,查看结果

  • get_result.py

from celery_task.celery import app

from celery.result import AsyncResult

id = '51400de7-cc62-455a-886f-43fead62a3c2'
if __name__ == '__main__':
    task = AsyncResult(id=id, app=app)
    if task.successful():
        result = task.get()
        print(result)
    elif task.failed():
        print('任务失败')
    elif task.status == 'PENDING':
        print('任务等待中被执行')
    elif task.status == 'RETRY':
        print('任务异常后正在重试')
    elif task.status == 'STARTED':
        print('任务已经开始被执行')

标签:task,app,py,celery,使用,print,import,安装
From: https://www.cnblogs.com/smyz/p/17525174.html

相关文章

  • VUE 2项目使用vue-json-excel导出数据
    记录一下后端返回的json数据转成excel导出这里外面使用的是vue-json-excel1.安装包npminstallvue-json-excel2.组件中使用<download-excelclass="btnbtn-default":data="json_data":fields="json_fields"worksheet="MyWorksheet"name=&......
  • faiss 使用记录
    importsysimportfaissimportnumpyasnpd=64nb=100nq=10np.random.seed(1234)xb=np.random.random((nb,d)).astype('float32')printxb[:2]xb[:,0]+=np.arange(nb).astype('float32')/1000#sys.exit()printxb[:2]xq=np......
  • 小程序使用echarts(二)
    一、在根据(一)进行使用时出现得问题 1、按照教程图表依旧不显示2、图表不跟随页面滑动二、解决办法(1)不显示1、应当引入插件2、查看父元素是否存在display:flex;属性;以及height:100%类似属性;也不能使用float(2)滑动1、不能存在(1)2中的display:flex;属性,以及图表的父元素不......
  • hadoop集群配置 CentOS7 JDK安装 卸载
    1JDK下载下载JDK8下载地址 2SSH访问需要安装JDK的服务器3检查是否有默认安装的JDKjava-version 4卸载自带JDK4.1检查系统安装的OpenJDKrpm-qa|grepopenjdk-i 4.2  删除以上四项openjdk的安装包并且检查后显示删除完毕rpm-e--nodeps需要删除的软......
  • MySQL数据库8.0.29-8.0.31版本使用 INSTANT 算法新增字段bug
    xxx下发MySQL数据库共性隐患排查通知,要求统一排查MySQL数据库8.0.29及以后版本使用INSTANT算法新增字段后期变更回滚可能导致数据库宕机的隐患,排查方法及整改方法详见下表和附件。请各分支()数据库运营人员集中排查隐患,及时整改。 隐患概述MySQL数据库8.0.29及以后版本......
  • 企业级logstash简单使用(ELK)
    企业级logstash简单使用(ELK)要使用logstash收集到Elasticsearch的方式,需确保logstash版本与es版本一致。由于我也是刚刚研究使用,所以本文暂不会出现原理性的东西。Logstash介绍Logstash是具有实时流水线能力的开源的数据收集引擎。Logstash可以动态统一不同来源的数据,并将数......
  • winscp 使用root身份登录
    一般root账户在服务器上会被禁止ssh,此时普通用户需要通过sudo执行管理员权限命令。如果需要使用winscp,会有普通用户权限不足的问题。本文提供一个方法可使用普通用户通过sudo提权使用winscp的方法首先确认自己的普通账户sudo能够免密码执行,然后在服务器上查看sftp-server程序......
  • Go Gin JWT token 使用
    packagemainimport("github.com/dgrijalva/jwt-go""log""fmt""errors""time")//一些常量var(TokenExpirederror......
  • 在asp.net core中使用vue3+vite(起)
    前言一开始是一个自用的应用,原本是用razor写的。最近有了点新想法,加点新功能,但是我接触的项目基本都是vue+api的前后端分离,用这razor写的是真不习惯,最后决定还是用习惯的vue重写。之前尝试过在.netcore里使用vue2+webpack,毕竟实际上就是把.vue翻译成了.js来用,一个站点就能跑。......
  • Beyond compare 下载及安装
    compare工具在生活中应用很广泛,可以文档比较,代码比较,方便的看到版本之间的不同之处应用场景一:论文(或其他文档)修改后和之前版本比对应用场景二:代码比对 1.下载:官方下载地址,记得选择汉化版,一步到位,根据自己的操作系统,选择对应的下载,以windows为例,点开windows下的下拉三角,选择ch......