首页 > 其他分享 >Celery 的详解

Celery 的详解

时间:2023-04-26 23:25:57浏览次数:44  
标签:celery tasks app py Celery 任务 详解

(一) Celery 的使用

1. 概述

Celery 是一个基于分布式消息传递的任务队列,用于异步处理任务和定时任务等。它可以让你将耗时的任务放到后台处理,从而不会阻塞 Web 应用程序的主线程。(例如:发送电子邮件、生成报表、爬虫、定时任务等)

Celery 可以与多种消息代理(如 RabbitMQ、Redis 等)配合使用,并且提供了许多高级特性,例如任务调度、定时任务、任务重试、任务结果存储等。

使用 Celery,你可以将复杂的任务分解成多个小任务,然后将它们分配给多个工作进程或者多个机器来处理。这样可以提高任务的执行效率,减少响应时间,并且可以更好地管理任务和作业。

Celery 是使用 python 编写的分布式任务调度框架,因此非常适合与 Python Web 框架(如 Django、Flask 等)一起使用。

2. Celery 的结构图

image


image

从上图可以看出,Celery 的主要组件构成包括:

  1. Task
    任务,是实际需要执行的代码逻辑。
  2. Beat
    定时任务调度器,用于定时触发任务的执行。
  3. Broker
    任务队列的消息代理,消息代理是一个中间件,它负责接收任务请求并将其分发给可用的工作进程(用于存储和传递任务消息)。常用的 Broker 有 RabbitMQ、Redis、Amazon SQS 等。
  4. Worker
    任务执行者,Celery Worker 是一个独立的进程,它负责从 Broker 中接收任务请求,并执行这些任务。
  5. Result Backend
    结果后端,用于存储任务执行结果,常用的有 Redis、MongoDB、数据库等。

3. Celery 的异步任务

(1) 项目的目录结构:

├─celery_demo
│  ├─app.py
│  └─tasks.py

(2) 安装 Celery

可以使用 pip 命令安装 Celery,如下所示:

pip install celery

# 另外,如果要使用 redis 作为 Broker 的话,还需要安装 redis 模块
pip install redis

(3) 创建任务

在 Celery 中,任务是最基本的单位,你需要定义一个任务并且为其编写处理函数,如下所示:

# tasks.py 文件
from celery import Celery

# 连接 redis 的规则: "redis://用户名:密码@主机:端口/Redis默认的n号数据库"
app = Celery('tasks',
            broker='redis://:123456@localhost:6379/0',
            backend='redis://:123456@localhost:6379/0')

@app.task
def add(x, y):
    return x + y

@app.task
def print_task():
    print("this is a test case.")
    return "this is a test case."

(4) 启动 Celery worker

在任务定义好以后,你需要启动 Celery worker 来执行这些任务,如下所示:

celery -A tasks worker --loglevel=info

命令说明:

  • -A 参数用于指定 celery 的应用程序
  • tasks 为文件名,即 tasks.py 文件
  • --loglevel 参数用于指定日志级别

可通过 celery worker --helpcelery --help 查看更详细的命令说明

(5) 调用任务

在需要执行任务的地方,你可以通过如下方式调用任务:

# app.py 文件
from tasks import add, print_task

# 调用 add 命令
result = add.delay(2, 3)
print(result.get(timeout=5))

# 调用 print_task 命令
result = print_task.delay()
print(result.get(timeout=5))
  • 这里的 add.delay(2, 3) 是异步调用任务的方式,它会立即返回一个 AsyncResult 对象,你可以通过 result.get() 方法来获取任务的执行结果。
  • 使用 python app.py 运行程序,即可看到程序结果的打印,在 redis 中也会有处理结果的存储。
  • 以上是 Celery 的基本使用步骤,还可以根据自己的需求来设置 Celery 的配置参数,比如消息队列的地址、任务的并发数等。
  • 额外说明:
    我在 win10 上启动了 redis 作为消息代理,redis 接收到了任务请求并将其分发给可用的工作进程,但是在存储结果时,总是连不上 redis,一直报连接超时的错误;后面换成了在云服务器 ubuntu 上执行,没有出现过这个问题,推荐使用云服务器或 linux 系统去做测试

(6) 任务执行完成后,redis 中存储的结果

image

4. Celery 的定时任务

Celery 中,你可以使用 beat 模块来设置定时任务。beat 模块是 Celery 的一个独立的组件,它可以独立于 worker 进程运行,它会定期读取你的任务配置,并根据配置调度任务,定时地发送任务到 Broker(消息队列) 中等待 worker 进程执行。

(1) 项目的目录结构:

├─myapp
│  ├─celery_demo
│  │  ├─__init__.py                 # 入口文件
│  │  ├─celeryconfig.py             # celery 的配置文件
│  │  └─tasks.py                    # celery 注册任务的文件

(2) 在 celeryconfig.py 文件中,添加以下配置信息:

# celeryconfig.py

from datetime import timedelta

task_serializer = 'json'
accept_content = ['json']
result_serializer = 'json'
enable_utc = True

broker_url = 'redis://:123456@localhost:6379/0'
result_backend = 'redis://:123456@localhost:6379/0'
timezone = 'Asia/Shanghai'
beat_schedule = {
    'add-every-30-seconds': {
        'task': 'celery_demo.tasks.add',
        'schedule': timedelta(seconds=30),
        'args': (16, 16),
    },
    'task2': {
        'task': 'celery_demo.tasks.print_task',
        'schedule': timedelta(minutes=1),
        'args': (),
    },
}

在上述代码中,我们定义了 Celery 的配置信息。这些配置信息包括代理设置、任务设置和定时任务设置。在定时任务设置中,我们定义了两个定时任务,分别是 add-every-30-secondstask2,并指定了任务的名称、调度时间和参数。

  • add-every-30-seconds 任务为每 30 秒执行一次
  • task2 任务为每 1 分钟执行一次

(3) 在 tasks.py 文件中,添加以下内容:

# tasks.py

from celery_demo import app


@app.task
def add(x, y):
    print("x + y = {}".format(x + y))
    return x + y

@app.task
def print_task():
    print("this is a test case.")
    return "this is a test case."

在上述代码中,我们定义了两个任务,分别是 addprint_task,并指定了任务的逻辑和返回结果。

(4) 最后,在 __init__.py 文件中,添加以下内容:

# __init__.py

from celery import Celery

app = Celery('celery_demo')
app.config_from_object('celery_demo.celeryconfig')

from . import tasks

在上述代码中,我们创建了一个名为 appCelery 应用程序,并从 celeryconfig.py 文件中加载配置信息。

(5) 使用命令来启动 Celery worker 和 beat 服务

通过运行以下命令来启动 Celery worker 和 beat 服务(在 myapp 目录下执行以下命令):

# 1. 先启动 Celery worker 服务
celery -A celery_demo worker --loglevel=INFO

# 2. 再启动 beat 服务
celery -A celery_demo beat --loglevel=INFO

这将会启动一个 Celery workerbeat 服务,并开始执行定时任务。

结果示意图:
image
image
image

(二) Celery 和 Flask 的结合使用

1. 项目的文件结构

文件结构如下:

├─project
│  ├─ app.py
│  └─ tasks.py

2. 项目中各个文件的内容

app.py 文件的内容如下:

from flask import Flask
from tasks import add

app = Flask(__name__)

@app.route('/')
def index():
    result = add.delay(16, 16)
    return "Task ID: {}".format(result.id)

if __name__ == '__main__':
    app.run(host='0.0.0.0', debug=True, port=5000)

tasks.py 文件的内容如下:

from celery import Celery


celery_app = Celery('tasks',
                broker = 'redis://:123456@localhost:6379/0',
                backend = 'redis://:123456@localhost:6379/0',
            )


@celery_app.task
def add(x, y):
    print("x + y = {}".format(x + y))
    return x + y

3. 安装相关的依赖

pip install flask
pip install celery
pip install redis

4. 使用命令来启动 Celery worker 和 Flask 服务

在命令行中执行以下命令启动 Celery worker:

celery -A tasks worker --loglevel=info

最后,在命令行中执行以下命令启动 Flask 应用:

python app.py

在浏览器中访问 http://localhost:5000/ 就可以看到任务 ID 了。

5. 执行结果示意图

image


image

(三) Celery 的官方文档

Celery 的英文文档
Celery 的中文文档
Celery 中文手册

标签:celery,tasks,app,py,Celery,任务,详解
From: https://www.cnblogs.com/wanghuizhao/p/17357694.html

相关文章

  • Ubuntu ettercap driftnet 详解
    工具:Ubuntuettercapdriftnetsudoaptinstallettercap-commonsudoaptinstalldriftnetettercapshiff->第一个->无线网卡名字ifconfigscanfforhosthostslist绑定192.168.0.1addtotarget2192.168.0.123addtotarget1mitm->arppoisoning勾......
  • MySQL主从复制详解
    主从复制原理+实操什么是MySQL主从复制?​MySQL主从复制是指数据可以从一个MySQL数据库服务器主节点复制到一个或多个从节点。MySQL默认采用异步复制方式,这样从节点不用一直访问主服务器来更新自己的数据,数据的更新可以在远程连接上进行,从节点可以复制主数据库中的所有数据库或......
  • 网卡配置文件详解
    #动态ip[root@qls~]#cat/etc/sysconfig/network-scripts/ifcfg-eth0TYPE="Ethernet"PROXY_METHOD="none"BROWSER_ONLY="no"BOOTPROTO="dhcp"DEFROUTE="yes"IPV4_FAILURE_FATAL="no"IPV6INIT="yes......
  • 一文详解多模态认知智能
    摘要:多模态认知智能是AI人工智能当前发展的主流趋势之一,其核心是以多模态知识的获取,表示与推理为主要内容的跨模态知识工程与认知智能,也是为了更好的处理多模态的数据,需要融合多种感知模态和智能处理技术。本文分享自华为云社区《GPT-4发布,AIGC时代的多模态还能走多远?系列之三:多......
  • P.22-认证配置详解、P.23-权限系统的作用、P.24-授权基本流程
    P.22-认证配置详解在SecurityConfig下进行修改@ConfigurationpublicclassSecurityConfigextendsWebSecurityConfigurerAdapter{//创建BCryptPasswordEncoder注入容器@BeanpublicPasswordEncoderpasswordEncoder(){returnnewBCryptPasswordEn......
  • MySQL索引详解
     DB哥MySQL高级教程-系统学习MySQL共149课时加我微信公众号免费学:DB哥文末有MySQL高级课程目录前言因为现在使用的mysql默认存储引擎是Innodb,所以本篇文章重点讲述Innodb下的索引,顺带简单讲述其他引擎。希望小伙伴们能通过这片文章对mysql的索引有更加清晰的认识,废话不多......
  • 排序算法之详解选择排序
    引入选择排序顾名思义是需要进行选择的,那么就要问题了,选择到底是选择什么呢?选择排序的选择是选择数组中未排序的数组中最小的值,将被选择的元素放在未排序数组的首位如果你对‘未排序数组’,‘选择’的概念不理解,那么你可以看看下面的图思路有了上面的一些基础之......
  • pig grunt shell详解
    输入 pig-xlocal 此时pig和本地的文件系统交互省略 “-xlocal”,pig和hdfs交互1、在pig中执行HDFS的命令grunt>fs-ls/Found5itemsdrwxr-xr-x -rootsupergroup     02013-01-3014:32/datadrwxr-xr-x -rootsupergroup     02......
  • centos linux系统安装详解
    打开vmware,版本差异区别不大选择创建新的虚拟机  选择典型,是默认选项不用改,点击下一步 选择稍后安装操作系统(默认选项不用改),点击下一步 选择linux,并且版本改为centos64位,点击下一步 虚拟机名称随便改,位置是指虚拟机的位置,点击浏览,自己选择位置,点击下一步 最大......
  • 进程间8种通信方式详解******************
    进程间的几种通信方式的比较和线程间的几种通信方式-8种百度经验有介绍8种1、无名管道(pipe):半双工的通信方式,单向流动,血缘关系(父子进程关系)的进程间使用。2、高级管道(popen):将另一个程序当做一个新的进程在当前程序进程中启动,则它算是当前程序的子进程3、有名管道(nemedpipe):半双工的......