首页 > 其他分享 >celery

celery

时间:2023-06-29 21:23:08浏览次数:42  
标签:task app celery 任务 print import

celery介绍架构和安装:

  celery :分布式的异步任务框架,主要用来做:

   - 异步任务
    - 延迟任务
    - 定时任务---》如果只想做定时任务,可以不使用celery,有别的选择

  celery 框架,原理

1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
2)celery服务为为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

  人是一个独立运行的服务 | 医院也是一个独立运行的服务
  正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
  人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求

  celery架构

消息中间件(broker):消息队列:可以使用redis,rabbitmq
  #提交的任务在消息中间件里存着但是没有执行 任务执行单元(worker):真正的执行 提交的任务 任务执行结果存储(banckend):可以使用mysql,redis

 

Celery 官网:http://www.celeryproject.org/

Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html

Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/

     安装celery

pip install Celery

  celery不支持win,所以想再win上运行,需要额外安装eventlet

windows系统需要eventlet支持:pip3 install eventlet
Linux与MacOS直接执行:
    3.x,4.x版本:celery worker -A demo -l info
    5.x版本:     celery -A demo worker -l info -P eventlet

 

celery执行异步任务

  1 在虚拟环境中装celery和eventlet

pip install Celery
pip3 install eventlet

  2 写个py文件(demo.py),实例化得到app对象,注册任务

from celery import Celery
import time

broker = 'redis://127.0.0.1:6379/1'  # 消息中间件
backend = 'redis://127.0.0.1:6379/2'  # 结果存储

app = Celery(__name__, broker=broker, backend=backend)


@app.task  # 加了app.task就成为了celery的任务
def add(a, b):
    print(a + b,'-----------')
    time.sleep(1)
    return a + b

  3 启动worker(worker监听消息队列,等待别人提交任务,如果没有就卡再这)

celery -A demo worker -l info -P eventlet    #这条命名需要在环境变量下执行才会生效

  4 别人 提交任务(add_task.py),提交完成会返回一个id号,后期使用id号查询,至于这个任务有没有被执行,取决于worker有没有启动

from demo import add

# 异步使用------>提交任务到消息中间件中,并没有执行,返回的格式是一个id号
res = add.delay(999, 999)
print(res)
# 输出结果:28c75c45-1ee8-4eb9-8d35-38e316c5d858

  5 提交任务的人,再查看结果(get_result.py)

from demo import app
# celery的包下
from celery.result import AsyncResult

id = '98655b23-883e-4376-b5db-cc90e48323cd'
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():  # 正常执行完成
        result = a.get()  # 任务返回的结果
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')

 

 

包结构celery:

   1 新建包:celery_task

   2 再包下新建 celery.py 必须叫它,里面实例化得到app对象

from celery import Celery

broker = 'redis://127.0.0.1:6379/1'  # 消息中间件
backend = 'redis://127.0.0.1:6379/2'  # 结果存储

app = Celery(__name__, broker=broker, backend=backend,
             include=['celery_task.course_task', 'celery_task.home_task', 'celery_task.user_task'])    #include表示管理的任务

  3 新建任务py文件:user_task.py   course_task.py  home_task.py

    home_task.py中:

import time
from .celery import app


@app.task
def send_sms(mobile, code):
    time.sleep(2)
    print('%s手机号发送成功,验证码是%s' % (mobile, code))
    return True

  4 启动worker,以包启动,来到包所在路径下

 celery -A 包名 worker -l info -P eventlet
 celery -A celery_task worker -l info -P eventlet

  5 其它程序,导入任务,提交任务即可

from celery_task.home_task import send_sms

res = send_sms.delay('16655145917', '1234')
print(res)

  6 其它程序,查询结果

from celery_task.celery import app
# celery的包下
from celery.result import AsyncResult

id = '51a669a3-c96c-4f8c-a5fc-e1b8e2189ed0'
if __name__ == '__main__':
   a = AsyncResult(id=id, app=app)
   if a.successful():  # 正常执行完成
      result = a.get()  # 任务返回的结果
      print(result)
   elif a.failed():
      print('任务失败')
   elif a.status == 'PENDING':
      print('任务等待中被执行')
   elif a.status == 'RETRY':
      print('任务异常后正在重试')
   elif a.status == 'STARTED':
      print('任务已经开始被执行')

 

celery执行延迟任务:

from datetime import timedelta
#timedelta的解释

timedelta类型为对象类型
  print(timedelta(seconds=2))   #输出结果为:0:00:02
  print(timedelta(minutes=3))   #输出结果为:0:03:00
  print(timedelta(hours=5))     #输出结果为:5:00:00
  print(timedelta(hours=3,minutes=5))    #输出结果为:3:05:00

  延迟任务实现:

from datetime import timedelta, datetime

eta=datetime.utcnow()+timedelta(seconds=5)
#函数.apply_async(kwargs={'mobile':'1896334234','code':8888},eta=时间对象) # res=send_sms.apply_async(args=['16655145917','1234'],eta=eta) res=send_sms.apply_async(kwargs={'mobile':'16655144907','code':'1234'},eta=eta) print(res)

 

celery执行定时任务:

 

  1.在celery.py中配置:

# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
from datetime import timedelta
from celery.schedules import crontab
# 定时任务配置
app.conf.beat_schedule = {
    'send_sms': {
        'task': 'celery_task.home_task.send_sms',
        'schedule': timedelta(seconds=5),    #每隔5秒执行一次
        'args': ('1822344343', 8888),
    },
    'add_course': {
        'task': 'celery_task.course_task.add_coures',
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'schedule': crontab(hour=11, minute=38),  # 每天11点35,执行
        'args': (),
    }
}

  2.启动beat,启动worker

celery  -A celery_task beat -l info 

  3.到了时间,beat进程负责提交任务到消息队列---》worker执行

 

django中使用celery:

  1 把写好的包,copy到项目根路径下

  2.在home_task.py中写任务

from .celery import app
@app.task
def add_banner():
    from home.models import Banner
    Banner.objects.create(title='测试',image='/1.png',link='3',info='xxx',orders=89)
    return '增加成功'

  3 在celery.py 中加载django配置

import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffy_api.settings.dev")

  4. 视图函数中,导入任务,提交即可

from rest_framework.views import APIView
from celery_task.home_task import add_banner
class CeleryView(APIView):
def get(self, request):
add_banner.delay()
return APIResponse()

  5. 启动worker,等待运行即可

celery -A celery_task worker -l info -P eventlet

  

接口缓存:

  给查询接口加入缓存

# 查询所有 轮播图接口
from rest_framework.viewsets import GenericViewSetfrom .serialzier import BannerSerializer
from .models import Banner
from utils.common_mixin import CommonListModelMixin as ListModelMixin
from django.conf import settings
from django.core.cache import cache
from utils.common_response import APIResponse


class BannerView(GenericViewSet, ListModelMixin):
    queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT]
    serializer_class = BannerSerializer

    def list(self, request, *args, **kwargs):

      '''
        1 先去缓存中查一下有没有数据
        2 如果有,直接返回,不走父类的list了(list在走数据库)
        3 如果没有,走父类list,查询数据库
        4 把返回的数据,放到缓存中
      '''

        data = cache.get('home_banner_list')
        if not data:
            print(111)
            res = super().list(request, *args, **kwargs)
            data = res.data.get('data')
            cache.set('home_banner_list', data)
        return APIResponse(data=data)

 

标签:task,app,celery,任务,print,import
From: https://www.cnblogs.com/Hao12345/p/17515230.html

相关文章

  • celery 之 celery介绍架构和安装、celery执行异步任务、包结构celery、celery执行
    目录一、celery介绍架构和安装1、celery:分布式的异步任务框架,主要用来做:2、celery框架,原理3、celery架构4、安装celery5、celery不支持win,所以想再win上运行,需要额外安装eventlet二、celery执行异步任务基本使用1再虚拟环境中装celery和eventlet2写个py文件,实例化得到app对象......
  • 11celery介绍架构和安装,celery执行异步任务,包结构celery,celery执行延迟任务和定时任务
    1celery介绍架构和安装#celery:分布式的异步任务框架,主要用来做: -异步任务-延迟任务-定时任务---》如果只想做定时任务,可以不使用celery,有别的选择#celery框架,原理1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)2)celery服务为为其他项目......
  • celery
    1Celery架构,介绍#Celery:芹菜(跟翻译没有任何关系),分布式异步任务框架,框架(跟其他web框架无关)#Celeryisaprojectwithminimalfunding,sowedon’tsupportMicrosoftWindows.Pleasedon’topenanyissuesrelatedtothatplatform.#架构 -broker:任务中间件,用户提......
  • celery介绍
    简介Celery是使用python编写的分布式任务调度框架。celery能做什么Celery是一个强大的分布式任务队列框架,它可以与Python应用程序一起使用,提供了异步任务处理和分布式消息传递的能力。以下是Celery框架的一些主要功能和用途:异步任务处理Celery可以将耗时的任务放......
  • 10redis列表操作,其他操作,redis管道,django中使用redis,django缓存,序列化json和pickle,cel
    字符串和字节转换的两种方式#字符串和字节转换的两种方式 -decode,encode-直接类型转换-bytes格式的16进制,2进制,10进制的显示#字符串需要用encode,bytes格式需要用decode,但是有时候忘了#可以直接进行强转b1=bytes(s,encoding='utf-8') print(......
  • celery笔记八之数据库操作定时任务
    本文首发于公众号:Hunter后端原文链接:celery笔记八之数据库操作定时任务前面我们介绍定时任务是在celery.py中的app.conf.beat_schedule定义,这一篇笔记我们介绍一下如何在Django系统中的表里来操作这些任务。依赖及migrate操作beat的启动表介绍手动操作定时任务1......
  • celery笔记七之周期/定时任务及crontab定义
    本文首发于公众号:Hunter后端原文链接:celery笔记七之周期/定时任务及crontab定义periodictask,即为周期,或者定时任务,比如说每天晚上零点零分需要运行一遍某个函数,或者每隔半小时运行一遍该函数,都是这种任务的范畴。在第一篇笔记的时候我们就介绍过celery的组件构成,其中有一......
  • celery笔记七之周期/定时任务及crontab定义
    本文首发于公众号:Hunter后端原文链接:celery笔记七之周期/定时任务及crontab定义periodictask,即为周期,或者定时任务,比如说每天晚上零点零分需要运行一遍某个函数,或者每隔半小时运行一遍该函数,都是这种任务的范畴。在第一篇笔记的时候我们就介绍过celery的组件构成,其中有一个......
  • celery 执行异步任务,延迟任务,定时任务
    celery执行异步任务,延迟任务,定时任务1异步任务 任务.delay(参数)2延迟任务 任务.app_async(args=[].eta=时间对象)#如果没有修改时区,需要使用utc时间3定时任务 需要启动beat和worker-beat定时提交任务进程---》配置在app.comf.beat_schedule的任务-worker......
  • celery封装与包结构
    celery封装与包结构project├──celery_task #celery包│├──__init__.py#包文件│├──celery.py#celery连接和配置相关文件,且名字必须交celery.py│└──tasks.py#所有任务函数├──add_task.py #添加任务......