首页 > 其他分享 >Celery

Celery

时间:2023-03-05 12:45:16浏览次数:28  
标签:celery task Celery 任务 tasks my

Celery

  • Celery 是基于Python实现的模块, 用于执行异步定时周期任务的

  • celery组成结构

    #   1.app 应用 任务
    #   2.缓存 存放任务的 Broker - Backend 缓存任务和缓存任务结果 redis or RabbitMQ
    #   3.工人 Worker 执行任务
    

1.Celery简单示例

  • s1.py

    from celery import Celery
    
    # 创建一个Celery实例,这就是我们用户的应用app
    my_task = Celery('tasks', backend='redis://127.0.0.1:6379', broker='redis://127.0.0.1:6379')
    
    # 为应用创建任务,my_func1
    import time
    @my_task.task
    def my_func1(a, b, i):
        time.sleep(i)
        return a + b
    15180400904
    
  • s2.py

    from s1 import my_func1
    
    # 将任务交给Celery的Worker执行
    # 4.0版本之后不支持window,需要下载模块eventlet
    # celery worker -A s1 -l INFO -P eventlet  版本celery 4.4.7, async为python3.7关键字,在4.4.7版本中已修复
    # celery worker -A s1 -c 4 -l INFO -P eventlet  #  创建一个worker组,里面有4个员工,只能执行s1中的任务,打印 -l 中的info信息
    #     c:几个员工  -l INFO: 打印log中INFO信息  -P 更改任务处理的方式,eventlet方式运行
    
    res_id_list = []
    for i in range(10):
        # 发布任务
        res = my_func1.delay(2, 3, i)  
        # print(res)
        res_id_list.append(res)
    
    print(res_id_list)
    # 任务发布出去了,将发布任务的id存入列表(实际上是一个对象)
    # 在等待任务完成前  result.get()处于堵塞状态
    # time.sleep()设定堵塞延时 待任务完成,result.get()也完成,此时将会将结果打印
    for result in res_id_list:
        print(f'{result}的值为{result.get()}')
    
  • 监听任务:celery worker -A s1 -c 4 -l INFO -P eventlet

image-20210831091151873

  • 发布任务

image-20210831091533415

  • worker监听到发布的任务并执行

    image-20210831091838800

2.Celery项目目录

  • 在实际项目中我们运用的Celery是有规则的

image-20210831094810287

要满足这样的条件才可以哦,目录celery_tasks这个名字可以随意起,但是一定要注意在这个目录下一定要有一个celery.py这个文件
  • celery.py

    from celery import Celery 
    
    # 创建一个Celery实例
    my_task = Celery('my_tasks', broker='redis://127.0.0.1:6379',
                     backend='redis://127.0.0.1:6379',
                     include=['celery_tasks.task_one']  # include 这个参数适用于寻找目录中所有的task
                     )
    
  • task_one.py

    from Celery_task.celery import my_task
    
    # 为我们的Celery实例创建任务
    @my_task.task
    def task_one(a, b):
        return a * b
    
  • my_celery.py

    from Celery_tasks.task_one import task_one
    
    # 发布任务
    res = task_one.delay(3, 2)
    print(res)
    
  • 启动Worker

    启动Worker的时候无需再使用文件启动,直接启动你的celery_tasks目录就行了
    它会自动找到celery文件并开始执行,这也就是为什么要创建celery名的文件
    celery worker -A celery_tasks -l INFO -P eventlet
    

    image-20210831094923090

    image-20210831094752633

3.Celery定时任务

  • celery.py
from celery import Celery
my_task = Celery('my_tasks', broker='redis://127.0.0.1:6379',
                   backend='redis://127.0.0.1:6379',
                   include=['celery_tasks.task_one', 'celery_tasks.task_two']
                   )
  • task.two.py

    from .celery import my_task
    
    
    @my_task.task
    def task_two():
        return "task_two - 完成定时任务"
    
  • my_celery.py

    from celery_tasks import task_one, task_two
    
    import time, datetime
    # 定时任务我们不在使用delay这个方法了,delay是立即交给 task 去执行
    # 现在我们使用apply_async定时执行
    
    now_time = time.time()
    # 将当前的东八区时间改为 UTC时间 注意这里一定是UTC时间,没有其他说法
    utc_now = datetime.datetime.utcfromtimestamp(now_time)
    # 为当前时间增加 1 天
    add_time = datetime.timedelta(days=1)
    action_time = utc_now + add_time
    # action_time 就是当前时间未来1天之后的时间
    # 现在我们使用apply_async定时执行
    res = task_two.task_two.apply_async(args=(), eta=action_time)
    print(res)
    
    
  • 发布任务后,启动worker:celery worker -A celery_tasks -l INFO -P eventlet

    image-20210831100009394

4.Celery周期任务

  • celery.py

    from celery import Celery
    from celery.schedules import crontab  # 做时间转换用的
    
    my_task = Celery('my_tasks', broker='redis://127.0.0.1:6379',
                     backend='redis://127.0.0.1:6379',
                     include=['celery_tasks.task_one', 'celery_tasks.task_two']
                     )
    
    # 对beat任务生产做一个配置
    my_task.conf.beat_schedule = {
            "each10s_task": {
                "task": "task": "celery_tasks.task_one.task_one",  # 具体哪个任务
                "schedule": 10,  # 每10秒钟执行一次
                "args": (10, 10)  # 函数接收的参数值
            },
            "each1m_task": {
                "task": "celery_tasks.task_two.task_two",
                "schedule": crontab(minute=1),  # 每一分钟执行一次
            },
    }
    
  • #以上配置完成之后,还有一点非常重要
    # 不能直接创建Worker了,因为我们要执行周期任务,所以首先要先有一个任务的生产方beat
    # 该生产方用于每隔一段时间创建一个任务然后交给worker去执行
    # celery beat -A Celery_task
    # celery worker -A Celery_task -l INFO -P eventlet
    
  • image-20210831102420750

    image-20210831102645658

标签:celery,task,Celery,任务,tasks,my
From: https://www.cnblogs.com/WWW-ZZZ/p/17180220.html

相关文章

  • 转载 Celery入门
    原文:https://www.cnblogs.com/pyedu/p/12461819.html一、什么是Celery1.1、celery是什么Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步......
  • celery学习
     原文:https://www.bbsmax.com/A/gVdnwBZXzW/Celery对象核心的对象就是Celery了,初始化方法:classCelery(object):def__init__(self,main=None,loader=None,back......
  • celery
     celery,实现了异步的分布式任务队列。中间是两个队列,左边是执行任务,将任务提交到中间的队列中,右边是worker,能自动发现队列中的任务。左边调用任务,右边worker就会拿到这......
  • django中使用celery,模拟商品秒杀。
    Celery是Python开发的简单、灵活可靠的、处理大量消息的分布式任务调度模块 安装:pipinstallcelery#安装celery库pipinstallredis#celery依赖于......
  • Celery ValueError: not enough values to unpack (expected 3, got 0)
    一:报错描述a.window10b.pycharm+python3.6.6+django2.2+restframework3.13.1+celery5.1.2c.在调用的时候报的错,启动并没有报错d.错误截图二.错误原因没抄到,wi......
  • celery僵死导致jumpserver提示 连接WebSocket失败
    celery僵死导致jumpserver提示连接WebSocket失败Celery的任务监控位于堡垒机”作业中心“下的”任务监控“中,点击打开新的页面如下图所示刷新页面这里的status状态一......
  • Python-celery介绍与快速上手
    1.celery介绍:  celery是一个基于Python开发的模块,可以帮助我们在开发过程中,对任务进行分发和处理。            详细介绍取自:Python之celery的简介与使......
  • Celery
    一、什么是Celery1.1、celery是什么Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。Celery的架构由三......
  • python分布式框架celery(二)
    一、什么是Celery1.1、celery是什么Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。Celery的架构由三......
  • celery架构介绍和基本使用
    一、Celery架构介绍分布式异步任务框架,可以支持大量任务的并发执行(celery服务主要为其他项目服务提供异步解决任务需求的)可以不依赖任何服务器,通过自身命令,启动服务,内......