首页 > 其他分享 >异步消息队列Celery

异步消息队列Celery

时间:2024-03-22 17:31:25浏览次数:31  
标签:异步 task celery 队列 py Celery result print import

1. 什么是Celery

4.4.0 | Celery 中文手册 (celerycn.io)

1.1 介绍

Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。

Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

消息中间件

Celety本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成,包括RabbitMQ、Redis等等。

任务执行单元

Worker是Celery提供的任务执行单元,Worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store 用来存储Worker执行的任务结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等,另外Celery还支持不同的并发和系列化的手段。

  • 并发: Prefork、Eventlet、 gevent、 threads/single threaded

  • 序列化:pickle、json、yaml、msgpack、zlib、bzip2 compression、Cryptographic message signing等等

1.2 使用场景

celery是一个强大的 分布式任务队列的异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。

  • 异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

  • 定时任务:定时执行某件事情,比如每天数据统计

1.3 Celery的优点

Simple(简单)Celery 使用和维护都非常简单,并且不需要配置文件。

  • Highly Available(高可用):woker和client会在网络连接丢失或者失败时,自动进行重试。并且有的brokers 也支持“双主”或者“主/从”的方式实现高可用。

  • Fast(快速):单个的Celery进程每分钟可以处理百万级的任务,并且只需要毫秒级的往返延迟(使用 RabbitMQ, librabbitmq, 和优化设置时)

  • Flexible(灵活):Celery几乎每个部分都可以扩展使用,自定义池实现、序列化、压缩方案、日志记录、调度器、消费者、生产者、broker传输等等。

2. celery 的基本使用

2.1 处理异步任务

消费者

import celery
import time
​
broker = "redis://192.168.1.200:6379/1"
backend = "redis://192.168.1.200:6379/2"
​
app = celery.Celery("test", backend=backend, broker=broker)
​
​
@app.task
def send_email(name):
    print(f"向{name}发送邮件...")
    time.sleep(5)
    print(f"向{name}发送邮件完成")
    return "ok"
​
​
"""
celery worker -A celery_task -l info -P gevent
"""
生成者

import time
​
from celery_task import send_email
from celery.result import AsyncResult
from celery_task import app
​
task_list = []
# 发布任务
result = send_email.delay("小明")
print(result.id)
task_list.append(result.id)
​
result = send_email.delay("小红")
print(result.id)
task_list.append(result.id)
​
# 异步监控任务
while task_list:
    task_id = task_list[0]
    async_result = AsyncResult(id=task_id, app=app)
    if async_result.successful():
        result = async_result.get()
        # 会从数据库中删除
        # async_result.forget()
        print(result)
        # 任务执行完成从任务列表中移除
        task_list.remove(task_id)
    elif async_result.failed():
        print("执行失败")
    elif async_result.status == "PENDING":
        print("任务等待被执行")
    elif async_result.status == "RETRY":
        print("任务异常正在重试")
    elif async_result.status == "STARTED":
        print("任务正在执行")
    else:
        print("任务其他的状态")
    time.sleep(1)

2.2 多任务执行

文件结构

| application.py
│ product_task.py
├─celery_task
│  │  task1.py
│  │  task2.py

执行任务

#  task1.py
import time
from application import app
​
​
@app.task
def send_email(name):
    print(f"向{name}发送邮件...")
    time.sleep(5)
    print(f"向{name}发送邮件完成")
    return "ok"
# task2.py
import time
from application import app
​
​
@app.task
def send_smg(name):
    print(f"向{name}发送短信...")
    time.sleep(5)
    print(f"向{name}发送短信完成")
    return "ok"
应用
# application.py
import celery
​
broker = "redis://192.168.1.200:6379/1"
backend = "redis://192.168.1.200:6379/2"
​
app = celery.Celery("test", backend=backend, broker=broker,
                    include=["celery_task.task1", "celery_task.task2"])
​
​
"""
celery  -A application worker -l info -P gevent
"""
​

发布任务

# product_task.py
import time
​
from celery_task.task1 import send_email
from celery_task.task2 import send_smg
from celery.result import AsyncResult
from application import app
​
task_list = []
# 发布任务
result = send_email.delay("小明")
print(result.id)
task_list.append(result.id)
​
result = send_smg.delay("小红")
print(result.id)
task_list.append(result.id)
​
# 异步监控任务
while task_list:
    task_id = task_list[0]
    async_result = AsyncResult(id=task_id, app=app)
    if async_result.successful():
        result = async_result.get()
        # 会从数据库中删除
        # async_result.forget()
        print(result)
        # 任务执行完成从任务列表中移除
        task_list.remove(task_id)
    elif async_result.failed():
        print("执行失败")
    elif async_result.status == "PENDING":
        print("任务等待被执行")
    elif async_result.status == "RETRY":
        print("任务异常正在重试")
    elif async_result.status == "STARTED":
        print("任务正在执行")
    else:
        print("任务其他的状态")
    time.sleep(1)

3.3 定时任务

发布延时任务

文件结构

| application.py
│ product_task.py
├─celery_task
│  │  task1.py
│  │  task2.py

执行任务

#  task1.py
import time
from application import app
​
​
@app.task
def send_email(name):
    print(f"向{name}发送邮件...")
    time.sleep(5)
    print(f"向{name}发送邮件完成")
    return "ok"
​
# task2.py
import time
from application import app
​
​
@app.task
def send_smg(name):
    print(f"向{name}发送短信...")
    time.sleep(5)
    print(f"向{name}发送短信完成")
    return "ok"
应用
# application.py
import celery
​
broker = "redis://192.168.1.200:6379/1"
backend = "redis://192.168.1.200:6379/2"
​
app = celery.Celery("test", backend=backend, broker=broker,
                    include=["celery_task.task1", "celery_task.task2"])
​
​
"""
celery  -A application worker -l info -P gevent
"""
发布任务
# product_task.py
import time
from datetime import datetime, timedelta
from celery.result import AsyncResult
from celery_task.task1 import send_email
from celery_task.task2 import send_smg
from application import app
​
task_list = []
now_time = datetime.now()
now_stamp = datetime.utcfromtimestamp(now_time.timestamp())
# 发布定时任务, 延时10秒钟执行
result = send_email.apply_async(args=["小明"], eta=now_stamp + timedelta(seconds=10))
print(result.id)
task_list.append(result.id)
​
result = send_smg.apply_async(args=["小红"], eta=now_stamp + timedelta(seconds=10))
print(result.id)
task_list.append(result.id)
​
# 异步监控任务
while task_list:
    task_id = task_list[0]
    async_result = AsyncResult(id=task_id, app=app)
    if async_result.successful():
        result = async_result.get()
        # 会从数据库中删除
        # async_result.forget()
        print(result)
        # 任务执行完成从任务列表中移除
        task_list.remove(task_id)
    elif async_result.failed():
        print("执行失败")
    elif async_result.status == "PENDING":
        print("任务等待被执行")
    elif async_result.status == "RETRY":
        print("任务异常正在重试")
    elif async_result.status == "STARTED":
        print("任务正在执行")
    else:
        print("任务其他的状态")
    time.sleep(1)

使用beat执行定时任务

文件结构

| application.py
│ product_task.py
├─celery_task
│  │  task1.py
│  │  task2.py

执行任务

#  task1.py
import time
from application import app
​
​
@app.task
def send_email(name):
    print(f"向{name}发送邮件...")
    time.sleep(5)
    print(f"向{name}发送邮件完成")
    return "ok"
​
# task2.py
import time
from application import app
​
​
@app.task
def send_smg(name):
    print(f"向{name}发送短信...")
    time.sleep(5)
    print(f"向{name}发送短信完成")
    return "ok"
​

应用

# application.py
import celery
​
broker = "redis://192.168.1.200:6379/1"
backend = "redis://192.168.1.200:6379/2"
​
app = celery.Celery("test", backend=backend, broker=broker,
                    include=["celery_task.task1", "celery_task.task2"])
​
​
"""
celery  -A application worker -l info -P gevent
"""

发布任务

# product_task.py
from celery.schedules import crontab
​
​
from application import app
​
# 发布定时任务
app.conf.beat_schedule = {
    # 10秒执行一次
    "each10s_task": {
        "task": "celery_task.task1.send_email",
        "schedule": 10,
        "args": ("小明", )
    },
    # 一分钟执行一次
    "each1m_task": {
        "task": "celery_task.task2.send_smg",
        "schedule": crontab(minute="*/1"),
        "args": ("小红", )
    }
}
​
"""
celery  -A product_task beat
"""
crotab语法规则

crontab的语法规则格式:

代表意义分钟小时日期月份命令
数字范围0~590~231~311~120~7需要执行的命令

周的数字为 0 或 7 时,都代表“星期天”的意思。

另外,还有一些辅助的字符,大概有下面这些:

  1. 每分钟定时执行一次规则: 每1分钟执行: */1 * * * 或者 * * * * 每5分钟执行: */5 * * * *

  2. 每小时定时执行一次规则: 每小时执行: 0 * * * *或者0 */1 * * * 每天上午7点执行:0 7 * * * 每天上午7点10分执行:10 7 * * *

  3. 每天定时执行一次规则: 每天执行 0 0 * * *

  4. 每周定时执行一次规则: 每周执行 0 0 * * 0

  5. 每月定时执行一次规则: 每月执行 0 0 1 * *

  6. 每年定时执行一次规则: 每年执行 0 0 1 1 *

  7. 其他例子 5 * * * * 指定每小时的第5分钟执行一次ls命令 30 5 * * * ls 指定每天的 5:30 执行ls命令 30 7 8 * * ls 指定每月8号的7:30分执行ls命令 30 5 8 6 * ls 指定每年的6月8日5:30执行ls命令 30 6 * * 0 ls 指定每星期日的6:30执行ls命令[注:0表示星期天,1表示星期1,以此类推,也可以用英文来表示,sun表示星期天,mon表示星期一等。] 30 3 10,20 * * ls 每月10号及20号的3:30执行ls命令[注:“,”用来连接多个不连续的时段] 25 8-11 * * * ls 每天8-11点的第25分钟执行ls命令[注:“-”用来连接连续的时段] */15 * * * * ls 每15分钟执行一次ls命令 [即每个小时的第0 15 30 45 60分钟执行ls命令 ] 30 6 */10 * * ls 每个月中,每隔10天6:30执行一次ls命令[即每月的1、11、21、31日是的6:30执行一次ls命令。 ]

4.3 配置文件配置参数

文件结构

| celeryApp.py │config.py |tasks.py

配置文件

# config.py
from kombu import Queue
from datetime import timedelta
​
broker_url = 'redis://192.168.1.200:6379/4'     # 使用Redis作为消息代理
​
result_backend = 'redis://192.168.1.200:6379/5'     # 把任务结果存在了Redis
​
task_serializer = 'msgpack'     # 任务序列化和反序列化使用msgpack方案
​
result_serializer = 'json'  # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON
​
result_expires = 60 * 60 * 24   # 任务过期时间
​
accept_content = ['json', 'msgpack']    # 指定接受的内容类型
​
​
# 使用beat进程自动生成任务
​
beat_schedule = {
​
    'print_datetime': {
        'task': 'proj.tasks.print_datetime',
        'schedule': timedelta(seconds=10),
        'args': ()
​
    },
    'add': {
            'task': 'proj.tasks.add',
            'schedule': timedelta(seconds=60),
            'args': (1, 3)
​
        }
}
​

应用

# celeryApp.py
from celery import Celery
​
​
app = Celery('celery', include=["proj.tasks"])
​
app.config_from_object("proj.config")
​
​
"""
1.启动worker任务
celery -A proj.celeryApp worker -l info -P gevent
2.启动beat 任务
celery -A proj.celeryApp beat
"""
​

任务

# tasks.py
​
import time
import datetime
​
from proj.celeryApp import app
​
​
@app.task
def add(x, y):
    time.sleep(10)
    print("{} + {} = {}".format(x, y, x+y))
    return x + y
​
​
@app.task
def print_datetime():
    print(datetime.datetime.now().strftime("%Y%m%d %H%M%S"))

4. 4 django 中使用celery

项目结构

│  manage.py
├─djang_demo
│  │  settings.py
│  │  urls.py
│  │  wsgi.py
└─my_celery
    │  config.py
    │  main.py
    │  __init__.py
    ├─sms
    │  │  tasks.py
# config.py
​
broker_url = 'redis://192.168.1.200:6379/4'     # 使用Redis作为消息代理
​
result_backend = 'redis://192.168.1.200:6379/5'     # 把任务结果存在了Redis
# main.py
import celery
​
app = celery.Celery("django")
app.config_from_object("my_celery.config")
​
# 加载任务
# 参数必须必须是一个列表,里面的每一个任务都是任务的路径名称
# app.autodiscover_tasks(["任务1","任务2"])
app.autodiscover_tasks(["my_celery.sms", ])
​
​
"""
celery -A my_celery.main worker -l info -P gevent
"""

# tasks.py
import time
from my_celery.main import app
​
@app.task
def send_email(name):
    print(f"向{name}发送邮件...")
    time.sleep(5)
    print(f"向{name}发送邮件完成")
    return "ok"

  1. 启动celery celery -A my_celery.main worker -l info -P gevent

  2. 启动django python manage runserver

  3. 访问127.0.0.1:8000/test/ 触发异步任务

使用可视化工具

pip install flower
celery -A application.app flower 
# 访问
http://127.0.0.1:5555/
​

标签:异步,task,celery,队列,py,Celery,result,print,import
From: https://blog.csdn.net/weixin_43413871/article/details/136824962

相关文章

  • 使用FastAPI+SQLAlchemy+Redis+Celery 编写一个完整的用户登录验证API
    使用PyQt5+FastAPI+SQLAlchemy+Redis+Celery做一个登录注册页(三)本文将介绍用PyQt5+FastAPI+SQLAlchemy+Redis+Celery做的一个登录注册页,使用邮箱接收验证码,本文介绍是前后端分离的实现方式,厚后端使用FastAPI+SQLAlchemy+Redis+Celery,你可以将PyQt5改为PySide2以获得更宽松......
  • C++ Stacks(堆栈) 和 Queues(队列)的基本用法
    一、栈1.栈的定义        栈(stack)是限定仅在表的一端进行插入和删除操作的线性表,允许插入和删除的一端称为栈顶(stacktop),另一端称为栈底(stackbottom),不含任何数据元素的栈称为空栈。        如图1-1所示,栈中有三个元素,插入元素(也称为入栈、进栈、压......
  • 求助!!!!Django+Celery异步执行神经网络时出错
    跪求大佬我使用了Django编写服务器,然后再配置celery用于执行异步程序。我的设想是,服务器接收机器学习训练请求,然后同步然后接收成功信息,随后异步执行机器学习训练。但是目前celery接收到任务以后不执行,图1-图4是我使用flower监控celery的窗口,图5-图6是异步代码(这里我同步执行过,没......
  • Celery在django中的应用
    Celery在Django中的应用这个文档描述了当前稳定的Celery版本(5.3)。有关开发文档,请访问这里。Django入门指南在Django中使用Celery注意:Celery的早期版本需要单独的库来与Django协同工作,但自3.1版本起就不再是这样了。现在Django已经得到了原生支持,因此这份文档只包含了集成Cele......
  • Debezium vs OGG vs Tapdata:如何实时同步 Oracle 数据到 Kafka 消息队列?
    随着信息时代的蓬勃发展,企业对实时数据处理的需求逐渐成为推动业务创新和发展的重要驱动力。在这个快速变化的环境中,许多企业选择将Oracle数据库同步到Kafka,以满足日益增长的实时数据处理需求。本文将深入探讨这一趋势的背后原因,并通过一个真实的客户案例来强调实时性在业务场......
  • 栈和队列章节课后习题答案集锦
    目录4.34.44.54.84.94.114.124.3#include<stdio.h>#include<stdlib.h>//定义栈的最大容量#defineMAX_SIZE100//定义栈结构typedefstruct{chardata[MAX_SIZE];inttop;}Stack;//初始化栈voidinitStack(Stack*s){s->top=-1;......
  • js 异步发展过程
    yieldreturnpromoisenext参数next方法可以带一个参数,该参数就会被当作上一个yield表达式的返回值next01_callbackhellletsayhello=function(name,callback){setTimeout(function(){console.log(name);callback();},1000);};sayhell......
  • STM32 HAL库基于F103系列之异步通信
    硬件资源串口1(PA9/PA10连接在板载USB转串口芯片CH340C上面) 原理图USB转串口硬件部分的原理图 程序设计USART/UART异步通信配置步骤1、配置串口工作参数  HAL_UART_Init()2,串口底层初始化  HAL_UART_MspInit()   配置GPIO、NVIC、CLOCK等3,开启串口异步接......
  • 【消息队列开发】 虚拟主机设计——操作队列
    文章目录......
  • 深入理解Redux:异步Action的实现与应用
    深入理解Redux:异步Action的实现与应用在Redux中,异步操作是常见的需求之一,例如在处理网络请求或定时任务时。为了实现异步操作,Redux提供了一种称为“异步action”的机制。在这篇博客中,我们将重点介绍Redux中异步action的使用方法。什么是异步Action?在Redux中,ac......