前言
Celery是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具。它是一个任务队列,专注于实时处理,同时还支持任务调度。
可以使用的场景如下:
- 异步发邮件,这个时候 只需要提交任务给celery 就可以了.之后 由worker 进行发邮件的操作 .
- 跑批接口的任务,需要耗时比较长,这个时候 也可以做成异步任务 .
- 定时调度任务等
Celery 简介
Celery 扮演生产者和消费者的角色,先了解一下什么是生产者消费者模式。
该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据。
接下来需要弄清楚几个问题,谁生产数据(Task),谁是中间件(Broker),谁来消费数据(Worker),消费完之后运行结果(backend)在哪?
celery 的5个角色
- Task 就是任务,有异步任务(Async Task)和定时任务(Celery Beat)
- Broker 中间人,接收生产者发来的消息即Task,将任务存入队列。任务的消费者是Worker。Celery 本身不提供队列服务,推荐用Redis或RabbitMQ实现队列服务。
- Worker 执行任务的单元,它实时监控消息队列,如果有任务就获取任务并执行它。
- Beat 定时任务调度器,根据配置定时将任务发送给Broker。
- Backend 用于存储任务的执行结果。
如下图所示
celery的demo
安装命令: pip install celery
from celery import Celery # import redis # 创建一个Celery应用程序 #redis 设置了密码 app = Celery('mycelery', backend='redis://:[email protected]', broker='redis://:[email protected]') #redis未设置密码 # app = Celery('my_app', backend='redis://150.158.47.37', broker='redis://150.158.47.37')
#@app.task 将函数转为celery任务
#@shared_task
:这个装饰器与@task
类似,但是它创建的任务是共享的,可以在多个应用程序中使用
@app.task def add_numbers(x, y): return x + y if __name__ == '__main__':
#delay 触发任务 res = add_numbers.delay(3, 5) print(res) print(res.get()) # 等待结果并将其打印出来
task任务装饰器:
-
@task
:这是最基本的任务装饰器,用于将一个函数转换为Celery任务。它可以带有参数来配置任务的行为。 -
@shared_task
:这个装饰器与@task
类似,但是它创建的任务是共享的,可以在多个应用程序中使用。 -
@task(bind=True)
:当您需要访问任务实例本身时,可以使用此装饰器。通过指定bind=True
参数,任务函数的第一个参数将变成任务实例自身。 -
@task(name='custom_name')
:通过name
参数,您可以为任务指定一个自定义的名字,而不是默认的函数名。 -
@task(queue='custom_queue')
:使用queue
参数可以将任务分配给特定的队列,而不是默认队列。
常用的几个属性
- res.task_id 任务id唯一的,可以根据id拿到结果
- res.status 任务状态:PENDING、STARTED、RETRY、FAILURE、SUCCESS
- res.get() 任务运行结果,必须要任务状态是'SUCCESS',才会有运行结果
- r.successful() 返回布尔值,执行成功返回True
- r.ready() # 返回布尔值, 任务执行完成, 返回 True, 否则返回 False.
- r.wait() # 等待任务完成, 返回任务执行结果.
- r.result # 任务执行结果.
- r.state # 和res.status一样,任务状态:PENDING, START, SUCCESS
标签:Celery,task,简介,redis,celery,任务,res,安装 From: https://www.cnblogs.com/7dao/p/17676907.html