首页 > 其他分享 >celery介绍

celery介绍

时间:2024-05-16 22:43:59浏览次数:28  
标签:task app 介绍 celery 任务 result import

接口缓存

1 查询所有接口(带过滤)--》每次都要去查询,性能不高

2 一旦查出来,下次还用这个数据的话--数据存放在缓存中---》直接给

3 首页轮播图

class BannerView(GenericViewSet, APIListModelMixin):
    queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[
               0:settings.BANNER_COUNT]  # 过滤排序
    serializer_class = Bannerserializer

    # 接口缓存
    def list(self, request, *args, **kwargs):
        banner_list = cache.get('banner_list')
        if not banner_list:
            # 缓存中没有数据,走数据库
            res = super().list(request, *args, **kwargs)
            banner_list = res.data.get('results')  # 列表类型
            # 放到django的缓存中
            cache.set('banner_list', banner_list)
        return APIResponse(results=banner_list)

1.1带缓存的mixin类

 1 class CacheAPIListModelMixin(ListModelMixin):
 2     cache_key = None
 3 
 4     def list(self, request, *args, **kwargs):
 5         assert self.cache_key, APIException('如果继承CacheAPIListModelMixin,必须要加类属性')
 6         # 1  先去缓存中取,根据key
 7         results = cache.get(self.cache_key)
 8         if not results:
 9             logger.info('走了数据库----')
10             res = super().list(request, *args, **kwargs)
11             results = res.data
12             # 2  存入缓存中
13             cache.set(self.cache_key, results)
14         return APIResponse(results=results)

视图类 home.view

1 from utils.common_mixin import CacheAPIListModelMixin
2 
3 
4 class BannerView(GenericViewSet, CacheAPIListModelMixin):
5     queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT]
6     serializer_class = Bannerserializer
7     cache_key = 'banner_list'

.

加入缓存后存在问题

# 1 数据从缓存拿--》如果mysql中数据改了--》缓存中一直有数据---》不会做自动更新---》缓存中的数据和mysql中的数据--》不一致了

# 2 缓存不一致;双写一致性

# 3 解决这个问题:
    -1 增,改,删 mysql--》删缓存
    -2 增,改,删 mysql--》更新缓存
    -3 定时更新缓存--》会有延迟

celery介绍

 1  0。 celery:分布式异步任务框架,celery 芹菜,吉祥物是芹菜
 2 
 3  1。 celery 是一个灵活且可靠的,处理大量消息的分布式系统,可以在多个节点之间处理某个任务
 4 
 5  2。 celery 是一个专注于实时处理的任务队列,支持任务调度
 6 
 7  3。 celery 是开源的,有很多的使用者
 8 
 9  4。 celery 完全基于 Python 语言编写
10 
11  5。 所以 celery 本质上是一个分布式的异步任务调度框架,类似于 Apache 的 airflow
12 
13  6。 celery 只是用来调度任务的,但它本身并不具备存储任务的功能,而调度任务的时候肯定是要把任务存起来的。
  因此要使用 celery 的话,还需要搭配一些具备存储、访问功能的工具,比如:消息队列、Redis缓存、数据库等等。官方推荐的是消息队列 RabbitMQ,我们使用 Redis 14 15 16 ### celery能做什么### 17 1 定时任务 18 2 异步任务 19 3 延迟任务

1.2celery 使用场景

1 异步任务
    -一些耗时的操作可以交给celery异步执行,而不用等着程序处理完才知道结果。
    -视频转码、邮件发送、消息推送等等
2 定时任务
    -定时推送消息、定时爬取数据、定时统计数据等

3 延迟任务
    -提交任务后,等待一段时间再执行某个任务

1.3 Celery官网

# 1 开源地址(源码)
https://github.com/celery/celery
    
# 2  官网
https://docs.celeryq.dev/en/stable/
    
# 3 最新版本
Celery (5.4)

#4 python支持
Celery version 5.3 runs on
Python ❨3.8, 3.9, 3.10, 3.11❩

#5 Django支持
Celery 5.3.x supports Django 2.2 LTS or newer versions. Please use Celery 5.2.x for versions older than Django 2.2 or Celery 4.4.x if your Django version is older than 1.11

1.4 celery架构

# Celery 架构,它采用典型的生产者-消费者模式,主要由以下部分组成:

# 1 Celery Beat,任务调度器,Beat 进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。

# 2 Producer:需要在队列中进行的任务,一般由用户、触发器或其他操作将任务入队,然后交由 workers 进行处理。调用了 Celery 提供的 API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。

# 3 Broker,即消息中间件,在这指任务队列本身,Celery 扮演生产者和消费者的角色,brokers 就是生产者和消费者存放/获取产品的地方(队列)。

# 4 Celery Worker,执行任务的消费者,从队列中取出任务并执行。通常会在多台服务器运行多个消费者来提高执行效率。

# 5 Result Backend:任务处理完后保存状态信息和结果,以供查询。Celery 默认已支持 Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy 等方式。



实际应用中,用户从 Web 前端发起一个请求,我们只需要将请求所要处理的任务丢入任务队列 broker 中,由空闲的 worker 去处理任务即可,处理的结果会暂存在后台数据库 backend 中。
我们可以在一台机器或多台机器上同时起多个 worker 进程来实现分布式地并行处理任务。

 。

celery快速使用

1.1安装

# 0 创建Python项目

# 1 创建虚拟环境

# 2 安装celery pip install celery

# 3 安装redis(消息队列和结果存储使用redis) pip install redis

# 4 安装eventlet(win 平台,如果是mac,linux不需要) pip install eventlet

1.2快速使用

celery_demo.py--主文件

main.py

 1 from celery import Celery
 2 import time
 3 
 4 # 创建一个Celery实例
 5 broker = 'redis://127.0.0.1:6379/1'
 6 backend = 'redis://127.0.0.1:6379/2'
 7 app = Celery('celery_test', broker=broker, backend=backend)
 8 
 9 
10 # 定义任务
11 @app.task
12 def add(n, m):
13     time.sleep(2)
14     print('n+m的结果:%s' % (n + m))
15     return n + m
16 
17 
18 @app.task
19 def send_email(mail='1922517453@qq.com'):
20     print('模拟发送延迟--开始')
21     time.sleep(2)
22     print('模拟发送延迟--结束')
23     return '邮件发送成功:%s' % mail

add_task.py--提交异步任务

# 提交任务文件
from main import add, send_email

# 同步执行任务
# res = add(3, 4)
# print(res)

# 2. 异步执行任务,提交到broker消息队列
res = add.delay(3, 4)
print(res)  # 返回任务id号 02358487-658e-4815-8f23-2da1ac919898

1.3查看提交的任务

 

1.4让worker执行任务

1 # 通过命令启动worker
2 # win启动
3 celery -A main worker -l info -P eventlet
4 # mac linux
5 celery -A main worker -l info 

 

1.5 结果存储查看结果


# 1 直接看redis 有数据

# 2 通过代码,拿到结果

from main import app from celery.result import AsyncResult id = '5a8d1d68-1963-4771-9889-839bd954a37b' if __name__ == '__main__': result = AsyncResult(id=id, app=app) if result.successful(): result = result.get() print(result) elif result.failed(): print('任务失败') elif result.status == 'PENDING': print('任务等待中被执行') elif result.status == 'RETRY': print('任务异常后正在重试') elif result.status == 'STARTED': print('任务已经开始被执行')

celery包结构

 

 1 celery.py
 2 
 3 from celery import Celery
 4 import time
 5 broker = 'redis://127.0.0.1:6379/1'
 6 backend = 'redis://127.0.0.1:6379/2'
 7 # 1 创建app对象
 8 app = Celery('app', broker=broker, backend=backend,include=['celery_task.order_task','celery_task.user_task'])
 9 
10 ------------------------------------------------
11 
12 user_tasks.py
13 
14 from .celery import app
15 import time
16 # 用户相关任务
17 @app.task
18 def add(x,y):
19     time.sleep(3)
20     return x+y
21 
22 --------------------------------------------------
23 order_task.py
24 
25 from .celery import app
26 # 订单相关任务
27 # 下单成功,发送短信
28 @app.task
29 def send_sms(mobile,code):
30     print(f'手机号:{mobile},下单成功')
31     return True
32 
33 -------------------------------------

4.2 提交任务add_task.py

1 from celery_task.order_task import send_sms
2 
3 # 1  同步调用
4 # res=send_sms('111111',888)
5 # print(res)
6 
7 # 2 提交到任务队列被 worker执行
8 res = send_sms.delay('18896982015', 9999)
9 print(res)  # 282fd101-1047-427e-a4d6-867c36ec70ee

查看任务结果get_result.py

 

from celery_task.celery import app
from celery.result import AsyncResult

id = '282fd101-1047-427e-a4d6-867c36ec70ee'
if __name__ == '__main__':
    result = AsyncResult(id=id, app=app)
    if result.successful():
        result = result.get()
        print(result)
    elif result.failed():
        print('任务失败')
    elif result.status == 'PENDING':
        print('任务等待中被执行')
    elif result.status == 'RETRY':
        print('任务异常后正在重试')
    elif result.status == 'STARTED':
        print('任务已经开始被执行')

启动worker

# 包所在目录下,启动worker

  -celery -A celery_task worker -l debug -P eventlet

 

执行异步--延迟--定时

 1 # 1 刚刚学的是:异步任务
 2     任务名.delay()
 3     
 4 # 2 延迟任务
 5     任务名.apply_async
 6     from celery_task.user_task import add
 7     from datetime import datetime, timedelta
 8     # datetime.utcnow() 取utc时间---》默认使用utc时间--》修改配置文件做中国时区
 9     eta=datetime.utcnow() + timedelta(seconds=30)
10     res=add.apply_async(args=[5,6],eta=eta)
11     
12 # 3 定时任务:配置文件
13     3.1 配置文件  在celery.py里面操作
14     # 时区
15     app.conf.timezone = 'Asia/Shanghai'
16     # 是否使用UTC
17     app.conf.enable_utc = False
18 
19     # 任务的定时配置
20     from datetime import timedelta
21     from celery.schedules import crontab
22     app.conf.beat_schedule = {
23         'add-task': {
24             'task': 'celery_task.user_task.add',
25             'schedule': timedelta(seconds=3),
26             # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
27             'args': (300, 150),
28         },
29         'send-sms-task': {
30             'task': 'celery_task.order_task.send_sms',
31             # 'schedule': timedelta(seconds=30),
32             'schedule': crontab(hour=11,minute=20),  # 每天11点20执行
33             'args': ('189232222',888),
34         },
35     }
36     3.2 启动beat
37     celery -A celery_task beat -l debug
38     3.3 启动worker

django中使用celery

通用方案

 1 # 1 把之前写的celery包,放到项目根路径中
 2 
 3 # 2 在视图函数中提交任务
 4 # from libs.tx_sms import get_code,send_sms as sms
 5 from celery_task.order_task import send_sms as sms1
 6 from celery_task.user_task import add
 7 class CeleryView(APIView):
 8     def get(self, request, *args, **kwargs):
 9         ## 1异步发送短信
10         mobile=request.query_params.get('mobile')
11         code=get_code()
12         # 使用celery做异步,提交任务
13         res=sms1.delay(mobile,code)
14         return APIResponse(msg=f'短信已发送,{str(res)}')
15 
16 
17         ## 2 异步计算
18         # x=request.query_params.get('x')
19         # y=request.query_params.get('y')
20         # # res=add(x,y)# -->请求要等:3s多
21         # res=add.delay(x,y)# -->请求直接返回
22         # return APIResponse(msg=str(res))
23         
24 # 3 启动worker
25 # 4 运行django,正常使用接口即可
26 
27 ======================================================
28 # 必须加这行代码,以后才能用django的配置
29 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffytest.settings.dev')

celery官方方案(新)

 

标签:task,app,介绍,celery,任务,result,import
From: https://www.cnblogs.com/liuliu1/p/18196897

相关文章

  • 鸿蒙HarmonyOS实战-Stage模型(服务卡片介绍和运行机制)
    ......
  • APS 介绍
    高级计划和调度软件的四个特点任务时间完成准确性 -了解完成任务需要多长时间非常重要。虽然您的设计和成本工程师将了解任务将花费多长时间,但这些时间总体上只是估计值。为了确保获得最佳结果,您需要通过车间数据收集系统的典型使用从车间收集实际数据。高级计划和调度(APS)软......
  • E-PS和E-SBAS技术介绍
    合成孔径雷达(InSAR)干涉叠加技术可以测量地表形变。SARscape提供了永久散射体(PS-InSAR)干涉测量和短基线(SBAS-InSAR)方法,PS方法测量PS点的形变,SBAS方法测量分布式散射体DS的形变。随着技术的发展,在这一领域取得了许多研究进展,SARscape5.7版本开始,提供了能够同时提取PS和DS测量值......
  • 主流原型工具介绍
    AxureRP设计方法:AxureRP提供了强大的交互设计功能,允许用户创建具有复杂交互的高保真原型。它支持脚本和动态内容,可以模拟真实应用的用户体验。适用场景:适合需要详细说明产品功能和交互的复杂项目。Sketch设计方法:Sketch以其简洁的界面和强大的矢量编辑能力而闻名,主要被用......
  • 原型设计工具介绍
    在原型设计领域,除了Axure和墨刀之外,还有其他一些主流的原型设计工具,如即时设计、AdobeXD、Figma等。下面将对这几种工具进行详细介绍和比较。Axure:Axure是一款功能强大的原型设计工具,主要特点包括:丰富的交互元素库:Axure提供了大量的交互元素和组件,用户可以快速创建复杂的交......
  • 主流原型设计工具介绍
    一、原型设计介绍产品原型可以概括的说是整个产品面市之前的一个框架设计。以网站注册作为例子,整个前期的交互设计流程图之后,就是原形开发的设计阶段,简单的来说是将页面的模块、元素、人机交互的形式,利用线框描述的方法,将产品脱离皮肤状态下更加具体跟生动的进行表达。原型设......
  • 主流原型设计工具介绍Axure RP,Sketch,墨刀
    AxureRP工具特点:AxureRP是一款功能强大的原型设计工具,它可以帮助用户创建交互式的网站和应用程序原型。它提供了多种预设组件和模板,用户可以根据自己的需求进行修改。此外,AxureRP还支持多人协作,方便团队内部沟通和合作。使用方法:(1)在AxureRP中创建新项目。(2)选择所需的组......
  • 软件开发与创新-原型设计工具Figma介绍
    Figma是什么?Figma是一个基于浏览器的协作式UI设计工具,从推出至今越来越受到UI设计师的青睐,也有很多的设计团队投入了Figma的怀抱,接下来我会带大家深入了解Figma,以及Figma都有什么优点。Figma能干什么?UI设计Figma是为UI设计而生的设计工具,除了有和Sketch一样基......
  • apisix~authz-keycloak插件介绍
    参考:https://apisix.apache.org/docs/apisix/plugins/authz-keycloak/kc插件源码梳理及原理说明如果只是进行keycloak颁发的token进行校验(签名校验和有效期校验),那么我们可以使用jwt-auth这个插件实现,并且已经对这个插件进行二次开发,支持jwt内容解析与向下请求头的传递。作用......
  • 原型设计工具介绍
    本次介绍两个原型设计工具:Axure与墨刀一、AxureAxure是一种功能强大的原型设计工具,被广泛用于创建高保真度的界面原型、用户流和交互动画。它具有丰富的交互组件库,用户可以轻松创建复杂的交互流程,包括状态转换、动态面板和条件逻辑等。Axure还支持团队协作和版本控制,多人可以......