首页 > 其他分享 >Celery的使用

Celery的使用

时间:2023-06-19 18:35:00浏览次数:49  
标签:celery tasks Celery result 使用 print import

celery是什么

Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。
Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。


消息中间件
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等
任务执行单元
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
任务结果存储
Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等
另外, Celery还支持不同的并发和序列化的手段
并发:Prefork, Eventlet, gevent, threads/single threaded
序列化:pickle, json, yaml, msgpack. zlib, bzip2 compression, Cryptographic message signing 等等

使用场景

celery是一个强大的 分布式任务队列的异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。
异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等
定时任务:定时执行某件事情,比如每天数据统计

安装

pip install -U Celery
或着:
sudo easy_install Celery

基本使用

单文件夹下使用

创建celery_test文件夹,pro写生产端,tasks写消费端,result用来获取请求结果

  • 创建消费端代码 tasks.py
import celery
import time

backend = "redis://:密码@ip:port/14"
broker = "redis://:密码@ip:port/15"
cel = celery.Celery('test', backend=backend, broker=broker)

# 使用装饰器 加载任务
@cel.task()
def send_sms(name):
    print("开始发送")
    time.sleep(3)
    return "发送成功%s" % name

在tasks.py的当前目录运行celery的worker
celery -A tasks worker -l info 5版本命令和之前略有不同
出现下面信息表示成功运行

  • 创建生产端代码 Pro.py
    运行后返回一个id值
from tasks import send_sms
ret = send_sms.delay("hahaha ")
print(ret)
  • 创建异步获取结果 result.py
from tasks import cel
from celery.result import AsyncResult
# id是pro运行后获取的id值,app是生产端的实例对象
asyncresult = AsyncResult(id="4b235142-1921-4eff-9e41-27886b273a8e",app=cel)
print(asyncresult.status)
ret = asyncresult.get()
asyncresult.forget()
print(ret)

多文件夹下使用

文件结构如下, mycelery用来保存celery实例的配置

  • mycelery.py 注意此时需要用include关键字,指定异步的任务路径
import celery
backend='redis://:password@ip:port/14'
broker='redis://:password@ip:port/15'
cel=celery.Celery('test',backend=backend,broker=broker,include=[
    'celery_tasks.task01',
    'celery_tasks.task02'
                      ])
# 时区
cel.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
cel.conf.enable_utc = False
  • task01/02.py
    写入任务函数,注意导包路径
from celery_tasks.mycelery import cel
import time

# 使用装饰器 加载任务
@cel.task()
def send_sms(name):
    print("短信开始发送")
    time.sleep(3)
    return "短信发送成功%s" % name
  • 启动worker,注意启动路径和参数里包含的相对路径celery_tasks.mycelery
    建议后续都从根路径启动workder,并用相对路径指定配置文件的位置,
    这里是在celery_test2的根路径下启动的worker
young_shi@MacBook-Air-2 celery_test2 % ls                                            
celery_tasks    pro.py          result.py
young_shi@MacBook-Air-2 celery_test2 % celery -A celery_tasks.mycelery worker -l info
  • 生产端代码
from celery_tasks.task01 import send_sms
result = send_sms.delay("yuan")
print(result.id)
result2 = send_sms.delay("alex")
print(result2.id)

celery执行定时任务

celey的delay方法可以异步执行,而定时任务要用到apply_async方法

  • 简单结构下的定时任务 在pro文件给apply_async添加延时
from celery_task import send_email
from datetime import datetime

# 方式一
# v1 = datetime(2020, 3, 11, 16, 19, 00)
# print(v1)
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# result = send_email.apply_async(args=["egon",], eta=v2)
# print(result.id)

# 方式二
ctime = datetime.now()
# 默认用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay

# 使用apply_async并设定时间
result = send_email.apply_async(args=["egon"], eta=task_time)
print(result.id)
  • 多任务结构中

标签:celery,tasks,Celery,result,使用,print,import
From: https://www.cnblogs.com/Young-shi/p/17491868.html

相关文章

  • Git使用教程(带你玩转GitHub)
    Git使用教程(理论实体结合体系版)下载安装:按照这个博客来就好Windows系统Git安装教程(详解Git安装过程)-学为所用-博客园(cnblogs.com)Git命令大全:Git大全-Gitee.com最小配置:在桌面右键点击GitBashHere进入命令行,GUI我们不常用。首先要设置你的用户名称和e-mail......
  • linux C语言 使用socket获取本机所有IP地址
    #include<stdio.h>#include<sys/ioctl.h>#include<net/if.h>#include<arpa/inet.h>/******************************************************函数功能:获取本机所有ip地址。*输入参数:*max_ip_num:ip_buf能存的最多ip个数;*输出参数:*ip_b......
  • 在.Net Core6中使用log4net组件写日志到本地文件的操作流程
    原文链接:https://blog.csdn.net/kevin860/article/details/1068810621.引用包: Log4Net  Microsoft.Extensions.Logging.Log4Net.AspNetCore Microsoft.Extensions.Logging //loggingbuilder.AddFilter该方法需要引入Microsoft.Extensions.Logging名称空间 ......
  • Instruments中常用Template的使用
     Instruments是苹果提供的Xcode套件,可用于分析iOS,MacOS程序的性能数据,进行性能提升。Instruments提供了很多类型的Template,用于特定场景的分析。这里选了3种常用的Template进行使用方法的讲解,对于其他Template的用法则用到时再了解吧,没必要一次把所有的类型都学习一遍。讲解的Te......
  • python 日志使用
    python日志使用日志基础教程日志是对软件执行时所发生事件的一种追踪方式。软件开发人员对他们的代码添加日志调用,借此来指示某事件的发生。一个事件通过一些包含变量数据的描述信息来描述(比如:每个事件发生时的数据都是不同的)。开发者还会区分事件的重要性,重要性也被称为等级......
  • 记录 Windows 下绿色版 PostgreSQL 部署使用
    使用官方的安装包,可能会在最后的步骤遇到各种有关服务运行的问题,绿色版就非常简单了,记录一下绿色版的下载部署。1、下载地址:https://www.enterprisedb.com/download-postgresql-binaries2、将文件解压到想放置的目录3、进入pgsql\bin目录,打开命令提示符执行以下命令::初......
  • POSTGRESQL 设置hugepage 可以让系统使用内存更有效率,防止OOM
    https://www.percona.com/blog/why-linux-hugepages-are-super-important-for-database-servers-a-case-with-postgresql/https://bbs.huaweicloud.com/blogs/detail/156799Hugepage是什么,基于LINUX系统,大页面对虚拟内存管理是有必要的。除标准的4KB页面之外,还进行内存中的大页面......
  • ABAP READ_TEXT, SAVE_TEXT函数使用,物料,利润中心等长文本批量读取及维护
    一.READ_TEXT函数读取长文本本文以利润中心长文本读取及维护为例子,事务码:KE521.查看长文本参数,输入KE52事务码查看图片12.点击书写按钮后,输入需要维护的长文本语言。查看图片23.点击转到->表头,其中文本名,语言,ID和文本对象为调用函数需要的参数查看图片34.READ_T......
  • QT sqlite 使用
    非常适合QT本地的数据落地,基本语法和oraclemysql有一定的差异,建议本地安装sqlite客户端,一边操作,一边开发下载页:http://www.sqlite.org/download.html1。删除数据deletefromin_store_code;2.  删除表droptablein_store_code;3。创建表 createtablein_store_co......
  • 使用NamedParameterJdbcTemplate指定命名参数
    在本文中,我们将介绍如何在连接到后端Postgres数据库的Spring启动应用程序中使用NamedParameterJdbcTemplate。我们将使用NamedParameterJdbcTemplate从PostgresDB插入,更新和删除员工。为了保持设计的合理性,我将dao,service和controller分开了。服务只是本文的一个转折点。概观Named......