首页 > 其他分享 >celery笔记三之task和task的调用

celery笔记三之task和task的调用

时间:2023-06-02 21:00:45浏览次数:46  
标签:重试 task tasks1 笔记 celery add retry div

本文首发于公众号:Hunter后端 原文链接:celery笔记三之task和task的调用

这一篇笔记介绍 task 和 task 的调用。

以下是本篇笔记目录:

  1. 基础的 task 定义方式
  2. 日志处理
  3. 任务重试
  4. 忽略任务运行结果
  5. task 的调用

1、基础的 task 定义方式

前面两篇笔记中介绍了最简单的定义方式,使用 @app.task 作为装饰器:

@app.task
def add(x, y):
    return x + y

如果是在 Django 系统中使用 celery,需要定义一个延时任务或者周期定时任务,可以使用 @shared_task 来修饰

from celery import shared_task

@shared_task
def add(x, y):
    return x + y

在 Django 系统中使用 celery 的方式会在接下来的几篇笔记中介绍道。

多个装饰器

如果是 celery 的任务和其他装饰器一起联用,记得将 celery 的装饰器放在最后使用,也就是列表的最前面:

@app.task
@decorator1
@decorator2
def add(x, y):
    return x + y

task名称

每个 task 都有一个唯一的名称用来标识这个 task,如果我们在定义的时候不指定,系统会为我们默认一个名称,这些名称会在 celery 的 worker 启动的时候被系统扫描然后输出一个列表展示。

还是上一篇笔记中我们定义的两个 task,我们给其中一个指定 name:

#tasks1.py
from .celery import app


@app.task(name="tasks1.add")
def add(x, y):
    return x + y

可以观察在 celery 的 worker 启动的时候,会有一个输出:

[tasks]
  . proj.tasks2.mul
  . tasks1.add

可以看到这个地方,系统就会使用我们定义的 name 了。

2、日志处理

我们可以在启动 worker 的时候指定日志的输出,定义格式如下:

celery -A proj worker -l INFO --logfile=/Users/hunter/python/celery_log/celery.log

在 task 中的定义可以使用 celery 中方法:

from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

也可以直接使用 logging 模块:

import logging

logger1 = logging.getLogger(__name__)

直接在 task 中输出:

@app.task(name="tasks1.add")
def add(x, y):
    logger.info("this is from logger")
    return x + y

然后在 worker 启动时指定的日志文件就会有我们打印出的日志内容:

[2022-07-24 16:28:33,210: INFO/ForkPoolWorker-7] tasks1.add[4db4b0fc-c6ca-472a-8847-ae42e0a7959a]: this is from logger
[2022-07-24 16:28:33,224: INFO/ForkPoolWorker-7] Task tasks1.add[4db4b0fc-c6ca-472a-8847-ae42e0a7959a] succeeded in 0.016244667931459844s: 3

3、任务重试

对于一个 task,我们可以对其设置 retry 参数来指定其在任务执行失败后会重试几次,以及隔多长时间重试。

比如对于下面的 div() 函数,我们来输入除数为 0 的情况查看重试的功能。

当然,这里我们是故意输入参数错误,在实际的项目中可能会是其他的原因造成任务失败,比如数据库连接失败等

任务重试的参数也都在 @app.task() 中定义:

# tasks1.py

@app.task(autoretry_for=(Exception, ),  default_retry_delay=10, retry_kwargs={'max_retries': 5})
def div(x, y):
    return x / y

在这里,autoretry_for 表示的是某种报错情况下重试,我们定义的 Exception 表示任何错误都重试。

如果只是想在某种特定的 exception 情况下重试,将那种 exception 的值替换 Exception 即可。

default_retry_delay 表示重试间隔时长,默认值是 3 * 60s,即三分钟,是以秒为单位,这里我们设置的是 10s。

retry_kwargs 是一个 dict,其中有一个 max_retries 参数,表示的是最大重试次数,我们定为 5

然后可以尝试调用这个延时任务:

from proj.tasks1 import div
div.delay(1, 0)

然后可以看到在日志文件会有如下输出:

[2022-07-24 16:59:35,653: INFO/ForkPoolWorker-7] Task proj.tasks1.div[1f65c410-1b2a-4127-9d83-a84b1ad9dd2c] retry: Retry in 10s: ZeroDivisionError('division by zero',)

且每隔 10s 执行一次,一共执行 5 次,5次之后还是不成功则会报错。

retry_backoff 和 retry_backoff_max

还有一个 retry_backoff 和 retry_backoff_max 参数,这两个参数是用于这种情况:如果你的 task 依赖另一个 service 服务,比如会调用其他系统的 API,然后这两个参数可以用于避免请求过多的占用服务。

retry_backoff 参数可以设置成一个 布尔型数据,为 True 的话,自动重试的时间间隔会成倍的增长

第一次重试是 1 s后 第二次是 2s 后 第三次是 4s 后 第四次是 8s 后 ...

如果 retry_backoff 参数是一个数字,比如是 3,那么后续的间隔时间则是 3 的倍数增长

第一次重试 3s 后 第二次是 6s 后 第三次是 12s 后 第四次是 24s 后

retry_backoff_max 是重试的最大的间隔时间,比如重试次数设置的很大,retry_backoff 的间隔时间重复达到了这个值之后就不再增大了。

这个值默认是 600s,也就是 10分钟。

我们看一下下面这个例子:

# tasks1.py

@app.task(autoretry_for=(Exception, ), retry_backoff=2, retry_backoff_max=40, retry_kwargs={'max_retries': 8})
def div(x, y):
    return x / y

关于重试的机制,理论上应该是按照我们前面列出来的重试时间间隔进行重试,但是如果我们这样直接运行 div.delay(),得出的间隔时间是不定的,是在 0 到 最大值之间得出的一个随机值。

这样产生的原因是因为还有一个 retry_jitter 参数,这个参数默认是 True,所以时间间隔会是一个随机值。

如果需要任务延时的间隔值是按照 retry_backoff 和 retry_backoff_max 两个设定值来运行,那么则需要将 retry_jitter 值设为 False。

# tasks1.py

@app.task(autoretry_for=(Exception, ), retry_backoff=2, retry_backoff_max=40, retry_jitter=False, retry_kwargs={'max_retries': 8})
def div(x, y):
    return x / y

然后运行 div 的延时任务,就可以看到延时任务按照规律的间隔时间重试了,以下是日志:

[2022-07-24 19:00:38,588: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 2s: ZeroDivisionError('division by zero',)
[2022-07-24 19:00:40,662: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:00:40,664: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 4s: ZeroDivisionError('division by zero',)
[2022-07-24 19:00:44,744: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:00:44,746: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 8s: ZeroDivisionError('division by zero',)
[2022-07-24 19:00:52,870: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:00:52,872: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 16s: ZeroDivisionError('division by zero',)
[2022-07-24 19:01:09,338: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:01:09,340: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 32s: ZeroDivisionError('division by zero',)
[2022-07-24 19:01:41,843: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:01:41,845: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 40s: ZeroDivisionError('division by zero',)
[2022-07-24 19:02:21,923: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:02:21,925: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 40s: ZeroDivisionError('division by zero',)
[2022-07-24 19:03:02,001: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:03:02,003: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 40s: ZeroDivisionError('division by zero',)

因为我们设置的重试间隔时间最大为 40s,所以这个地方延时间隔时间到了 40 之后,就不再往上继续增长了。

4、忽略任务运行结果

有时候延时任务的结果我们并不想保存,但是我们配置了 result_backend 参数,这个时候我们有三种方式不保存运行结果。

1.ignore_result=True 不保存任务运行的结果

@app.task(ignore_result=True)
def add(x, y):
    return x + y

2.app.conf 配置

也可以通过 app.conf 的配置来禁用结果的保存:

app.conf.update(
    task_ignore_result=True
)

3.执行单个任务的时候禁用

from proj.tasks1 import add
add.apply_async((1, 2), ignore_result=True)

apply_async() 函数的作用相当于是带参数的 delay(),或者 delay() 是简化版的 apply_async(),这个我们下面会介绍。

5、task 的调用

前面简单两个简单的调用方法,一个是 apply_async(),一个是 delay()。

简单来说就是 delay() 是不带参数执行的 apply_async()。

以下用 add() 函数为例介绍一下他们的用法:

delay()

纯粹的延时任务,只能如下操作:

add.delay(1, 2)

apply_async()

带参数的用法,add() 函数的参数用 () 包起来:

add.apply_async((1, 2))

也可以带其他参数,比如上面介绍的不保存运行结果:

add.apply_async((1, 2), ignore_result=True)

这个函数还可以指定延时的时间:

countdown参数

现在开始 10s 后开始运行:

add.apply_async((1, 2), countdown=10)

eta参数

也可以用 eta 参数来指定 10s 后运行:

from datetime import datetime, timedelta

now = datetime.now()
add.apply_async((1, 2), eta=now + timedelta(seconds=10))

expires参数

这个是用来设置过期的参数:

add.apply_async((1, 2), countdown=60, expires=120)

上面的参数表示,距现在60秒后开始执行,两分钟后过期

标签:重试,task,tasks1,笔记,celery,add,retry,div
From: https://blog.51cto.com/u_16061969/6404999

相关文章

  • vue学习笔记一
    VUE自学目录VUE自学一、vue核心的相关学习一,搭建vue开发环境1.下载vue.js2.下载扩展工具二,helloworld案例三,helloworld项目扩展四,模板语法五,数据绑定六,el和data的两种写法发七,理解MVVM模型八,数据代理1.Object.defineProperty2.什么是数据代理3.vue中的数据代理九,事件处理1.事件......
  • python学习笔记
    #一、切片操作#第一个位置代表区间左端点,第二个位置代表区间右端点,区间左闭右开,第三个位置代表步长print('--------------------------------------------------------------------')seq=[0,1,2,3,4,5,6,7,8,9]print(seq[1:4])print(seq[:5])print(seq[5:])p......
  • 07代码大全2阅读笔记
    第十章变量  隐式声明可能带来错误。 应该尽量避免隐式声明(问题:python中无法避免隐式声明);遵循良好的命名规则并经常检查。  变量初始化策略 在声明时初始化。 第一次使用时初始化。 理想情况下,在第一次使用时生命并初始化变量。 可能情况下使用final和const。......
  • 08代码大全2阅读笔记
    第十一章变量名的力量  变量名应该完全、准确地表示变量指代的事物。避免xxxxxx一类无意义变量名,避免dattmp一类泛泛的变量名。 长度:最佳为10~16个字符,8~20个字符也可以。 变量名与作用域: 短变量名,如itmp常常用作临时变量。有些人因为短变量名有风险,所以建议即使临......
  • 数据治理专业认证CDMP学习笔记(思维导图与知识点)- 第10章参考数据和主数据篇
    大家好,我是独孤风,一位曾经的港口煤炭工人,目前在某国企任大数据负责人,公众号大数据流动主理人。在最近的两年的时间里,因为公司的需求,还有大数据的发展趋势所在,我开始学习数据治理的相关知识。数据治理需要进行系统的学习才能真正掌握,也需要进行专业的考试认证才能证明自己在数据治理......
  • stm32笔记[7]-串口多字节收发
    主要功能使用AliosThingsDevkit开发板实现串口多字节收发,使用内部RTC时钟和AD转换.硬件平台开发板:AliosThingsDevkit芯片:stm32l496vgt6系统时钟:80MHzLED1:PB6(低电平有效)LED2:PE3(低电平有效)LED3:PD15(低电平有效)STLK_TX:PB10(LPUART1)STLK_RX:PB11(LPUA......
  • 学习笔记——软件、软件分类、软件测试、数据的形式与数制(进制之间的转化)
    2023-06-02一、软件1、软件包含的内容:程序、数据和文档①程序:一遍是由编程语言编写的,例如:C、C++、java、Python等。②数据:一般使用文件或者数据库来存储数据③文档:包括安装说明书、帮助文档、许可协议等。 2、软件分类2.1安装功能用途分类(1)系统软件例如:操作系统:Unix、W......
  • 三个博弈-巴什博奕、威佐夫博弈、尼姆博弈。acm博弈算法笔记HDU 2149,1850,1527
    博弈论(一)、acm博弈基础算法BashGame,NimGame和WythoffGame(即巴什博奕、尼姆博弈、威佐夫博弈)Bash  Game: 同余理论Nim   Game: 异或理论WythoffGame: 黄金分割(二)、三个博弈。1、巴什博奕。只有一堆n个物品,两个人轮流从这堆物品中取物, 规定每次至少取一个,......
  • 循环神经网络 RNN LSTM GRU 笔记
    文章目录1.神经网络基础2.RNN循环神经网络2.1背景与概念2.2RNN基本方法2.3拓展3.LSTM3.1概念3.2LSTM基本方法3.3原理解释4.GRU4.1概念与背景4.2GRU基本方法1.神经网络基础上图是一个简单的全连接神经网络结构,每一条连接线上都有一个权重,蕴含着网络学得的“能力”。......
  • opencv 读取图片 图片数据处理 笔记
    1.python中各类图片读取工具的比较参考Python各类图像库的图片读写方式总结可知,opencv的性能表现最好。opencvyyds相关文章收藏:Python-OpenCV:cv2.imread(),cv2.imshow(),cv2.imwrite()Python各类图像库的图片读写方式总结2.opencv常用函数2.1读取图片cv2.imread(filename,fla......