首页 > 其他分享 >路飞项目,celery异步任务框架,介绍和安装,快速使用,包结构

路飞项目,celery异步任务框架,介绍和安装,快速使用,包结构

时间:2023-03-11 20:34:06浏览次数:60  
标签:异步 task res app celery 路飞 任务 import

celery介绍和安装

celery是什么

翻译过来就是 芹菜 的意思,跟芹菜没有关系

框架:服务,python的框架,跟django无关

能用来做什么

异步任务

定时任务

延迟任务

理解celery的运行原理

1.可以不依赖任何服务器,通过自身命令,启动服务

2.celcry服务为其他项目提供异步解决任务需求的会有两个服务为同时运行,一个是项目服务,一个是celery服务为,项目服务将需要异步处理的任务交给celery服务,celery就会再需要使用异步完成项目的需求

例如:人是一个独立的服务|医院也是一个独立运行的服务

正常情况下,人可以完成所有健康情况的动作,不需要医院的参与,但当人生病时,就会被医院接收,解决人生病问题

人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求

celery架构(broker,backend,都用redis)

1.任务中间件broker,其他服务提交的异步任务,放在里面需要排队

需要借助于第三方redis rabbitmq

2.任务执行单元 worker 真正执行异步任务的进程

celery提供的

3.结果存储 backend 结果存储,函数的返回结果,存到backend中

需要借助于第三方redis,mysql

使用场景

异步执行:解决耗时任务

延迟执行:解决延迟任务

定时执行:解决周期任务

celery快速使用

安装模块安装完成,会有一个可执行文件celery

pip install celery

windwos还有额外安装一个模块

pip install eventlet

1.第一步新建一个main.py

from celery import Celery
# 提交的异步任务,放在里面
broker = 'redis://127.0.0.1:6379/1'
# 执行完的结果,放在这里
backend = 'redis://127.0.0.1:6379/2'
app = Celery('test', broker=broker, backend=backend)
@app.task
def add(a, b):
    import time
    time.sleep(3)
    print('------',a + b)
    return a + b

image-20230309212034720

放在一起就可以了

image-20230309212920548

2.其他程序,提交任务add.py

res = add.delay(5,6)   #原来add的参数,直接放在delay中传入即可
print(res)  # f150d8a5-c955-478d-9343-f3b60d0d5bdb

3.启动worker

启动worker命令 win需要安装eventlet

win:

​ 4.x之前版本

​ celery worker -A main(与新建的一样) -l info -P eventlet

​ 4.x之后

​ celery -A main worker -l info -P eventlet

mac:

​ celery -A main worker -l info

4.worker会执行消息中间件中的任务,把结果存起来

5.我们要看执行结果,拿到执行的结果

from main import app
from celery.result import AsyncResult
id = '51611be7-4914-4bd2-992d-749008e9c1a6'
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():  # 执行完了
        result = a.get()  #
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')

celery包结构

project
    ├── celery_task  	# celery包
    │   ├── __init__.py # 包文件
    │   ├── celery.py   # celery连接和配置相关文件,且名字必须交celery.py
    │   └── tasks.py    # 所有任务函数
    ├── add_task.py  	# 添加任务
    └── get_result.py   # 获取结果

1.新建包celery_taks(自己起什么就行)

再包下新建【必须叫celery】的py文件celery.py写代码

celery.py

from celery import Celery
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery('test', broker=broker, backend=backend, include=['celery_task.order_task', 'celery_task.user_task'])

2.再包内部,写task,任务异步任务

order_task.py

from .celery import app
import time
@app.task
def add(a, b):
    print('-----', a + b)
    time.sleep(2)
    return a + b

user_task.py

from .celery import app
import time
@app.task
def send_sms(phone, code):
    print("给%s发送短信成功,验证码为:%s" % (phone, code))
    time.sleep(2)
    return True

3.启动worker,包所在目录下

celery  -A celery_task(就是自己起的包名)  worker -l info -P eventlet

4.其他程序,提交任务,被提交到中间件,等待worker执行,因为worker启动了,就会被worker执行

from celery_task import send_sms
res=send_sms.delay('1999999', 8888)
print(res)  # 7d39033c-4cc7-4af2-8d78-e62c277db183

5.worke执行完毕,结果存到backend中

6.查看结果

from celery_task import app
from celery.result import AsyncResult
id = '7d39033c-4cc7-4af2-8d78-e62c277db183'
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():  # 执行完了
        result = a.get()  #
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')

image-20230309224752551

celery执行异步任务,延迟任务,定时任务

异步任务

任务.delay(参数)

延迟任务

任务.apply_async(args=[参数],eta=时间对象)

如果没有修改时区,需要使用utc时间

定时任务

需要启动beat 和 worker

beat 定时提交任务的进程》配置在

app.conf.beat_schedule的任务

worker 执行任务的

使用步骤

第一步:在celery的py文件中写入
# 第一步:在celery的py文件中写入
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False


# celery的配置文件#####
# 任务的定时配置
app.conf.beat_schedule = {
    'send_sms': {
        'task': 'celery_task.user_task.send_sms',
        # 'schedule': timedelta(seconds=3),  # 时间对象
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'schedule': crontab(hour=9, minute=43),  # 每天9点43
        'args': ('18888888', '6666'),
    },
}
第二步:启动beat

记得在包所在的目录运行

image-20230310172203727

celery -A celery_test beat -l info
第三步:启动worker

同样需要在包所在目录运行

celery -A celery_test worker -l info -P eventlet

image-20230310171455333

image-20230310171644267

注意点
1.启动命令的执行位置,如果是包结构,一定要在包这一层
2.include=['celery_task.order_task'],路径从包名下开始导入,因为我们在包这层执行的命令

django种使用celery

补充如果在公司中,只做定时任务,还有一个更简单的框架

APSchedule:https://blog.csdn.net/qq_41341757/article/details/118759836

使用步骤:

1.把我们写的包,复制到项目目录下

-luffy_api
    -celery_task #celery的包路径
    -luffy_api  #源代码路径

2.在使用提交异步任务的位置,导入使用即可

视图函数种,导入任务

任务.delay() # 提交任务

3.启动worker,如果有定时任务,启动beat

4.等待任务被worker执行

5.在视图函数中,查询任务执行的结果

秒杀功能

秒杀逻辑分析

1.前端秒杀按钮,用户点击,发送ajax请求到后端

2.视图函数》提交秒杀任务》借助于celery,提交到中间件中了

3.当次秒杀的请求,就回去了,携带则任务id号在前端

4.前端开启定时任务,每隔3秒钟带着任务,向后端发送请求,查看是否秒杀成功

5.后端情况

​ 1任务还在等待被执行》返回给前端,前端继续每隔3s发送一次请求

​ 2任务执行完了,秒杀成功了》返回给前端,恭喜您秒杀成功》关闭前端任务定时器

​ 3任务执行完了,秒杀失败了》返回给前端,秒杀失败了》关闭前端任务定时器

视图

from celery_task.sckill_task import sckill
from celery.result import AsyncResult
from celery_task.celery import app


class SckillView(GenericViewSet):
    """秒杀视图类"""

    def list(self, request, *args, **kwargs):
        """使用定时任务,不等待直接返回任务id"""
        user_id = request.query_params.get('id')
        #异步提交任务返回任务id号
        task_id = sckill.delay(user_id)
        return APIResponse(task_id=task_id.id)

    @action(methods=['GET'], detail=False)
    def get_result(self, request, *args, **kwargs):
        task_id = request.query_params.get('task_id')

        res = AsyncResult(id=task_id, app=app)
        if res.successful():  # 执行完了
            result = res.get()  #
            return APIResponse(data=result)
        elif res.failed():
            print('任务失败')
        elif res.status == 'PENDING':
            print('任务等待中被执行')
        elif res.status == 'RETRY':
            print('任务异常后正在重试')
        elif res.status == 'STARTED':
            print('任务已经开始被执行')
        return APIResponse(code=101, msg=res.status)

任务sckill.py

from .celery import app

import random
import time


@app.task
def sckill(id):
    print(f'用户{id},正在秒杀')
    time.sleep(random.choice([6, 7, 8, 9]))
    print(f'用户{id},秒杀成功')
    return random.choice([True, False])

celery.py

from celery import Celery

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
# 写了一个任务不要忘记在这里注册
app = Celery('test', broker=broker, backend=backend,
             include=[
                 'celery_task.order_task',
                 'celery_task.user_task',
                 'celery_task.sckill_task'
             ])

前端Sckill.vue

<template>
  <button @click="handlerSckill">点击开始秒杀</button>
</template>

<script>
export default {
  name: "Sckill",
  data() {
    return {
      task_id: '',
      t: ''
    }
  },
  methods: {
    handlerSckill() {
      this.$axios({
        url: this.$setting.BASE_URL + '/user/sckill/',
        method: 'get',
        params: {id: 1},
      }).then(res => {
        this.$message({
          message: '正在秒杀中,请稍等',
          duration: 2000,
          type: 'success'
        })
        this.task_id = res.data.task_id
        this.t = setInterval(() => {
          this.$axios({
            url: this.$setting.BASE_URL + '/user/sckill/get_result/',
            method: 'get',
            params: {task_id: this.task_id}
          }).then(res => {
            let msg
            let type
            if (res.data.code == 100) {
              if (res.data.data == true) {
                msg = '秒杀成功'
                type = 'success'

              } else {
                msg = '秒杀失败'
                type = 'error'
              }
              clearInterval(this.t)
            } else {
              msg = res.data.msg
              type = 'info'
            }

            this.$message({
              message: msg,
              duration: 1000,
              type: type
            })
          })
        }, 3000)
      })
    }
  }
}
</script>

<style scoped>

</style>

image-20230311153507903

django中使用celery

1把我们写的包,复制到项目下

-luffy_api
    -celery_task #celery的包路径
    celery.py  # 一定不要忘了一句话
    import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
-luffy_api  #源代码路径

2在使用提交异步任务的位置,导入使用即可

​ 视图函数中使用,导入任务

​ 任务.delay() 提交任务

3启动worker,如果有定时任务,启动beat

4等待任务被worker执行

5在视图函数中,查询任务执行的结果

重点celery中使用django,有时候,任务中会使用django的orm缓存,表模型。。。一定要加

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')

轮播图接口加缓存

1.网站首页被访问的频率很高瞬间1w个人在访问,首页的轮播图接口会执行1w次,1w次查询轮播图标的sql在执行,轮播图基本不变

2.想一种方式,让这1w个访问,效率更高一些,不查数据库了,直接走缓存》redis》效率高

3现在的轮播图逻辑变成了

1轮播图接口请求来了,先去缓存中看,如果有,直接返回缓存中的数据,如果没有,查数据库,然后把轮播图数据,放到redis中,缓存起来

1改接口

    # 加入缓存的轮播图接口
    def list(self, request, *args, **kwargs):
        # 查看缓存有没有数据,如果没有,再走数据库
        banner_list = cache.get('banner_list')
        if banner_list:
            print('走了缓存')
            return APIResponse(data=banner_list)
        else:  # 走数据库
            print('走了数据库')
            res = super().list(request, *args, **kwargs)
            # 把序列化后的数据存到缓存中,redis中
            cache.set('banner_list', res.data)
            return APIResponse(data=res.data)

2采用继承的方式

common_cache

from rest_framework.mixins import ListModelMixin
from django.core.cache import cache
from utils.common_response import APIResponse


class CacheListModelMixin(ListModelMixin):
    cache_key = None

    def list(self, request, *args, **kwargs):
        assert self.cache_key, Exception('没有设置缓存值')
        res = cache.get(self.cache_key)
        # 首先获取redis中缓存的数据如果获取到了直接返回
        if res:
            data = res
        else:
            # 如果获取不到查询数据库中的数据放到缓存中
            res = super().list(request, *args, **kwargs)
            cache.set(self.cache_key, res.data)
            data = res.data
        return APIResponse(data=data)

views.py使用

# 采用继承的方式进行缓存数据
class BannerView(GenericViewSet, CacheListModelMixin):
    queryset = Banner.objects.filter(is_delete=False, is_show=True).order_by('orders')
    serializer_class = BannerSerializer
    cache_key = 'banner_list'

双写一致性

加入缓存后,缓存中有数据,先去缓存中国拿,但是如果mysql中数据变了,缓存不会自动变化,出现数据不一致的问题

mysql和缓存数据库 数据不一直

双写一致性

写入mysql,redis没动,数据不一致存在问题

如何解决

1.修改数据,删除缓存
2.修改数据,更新缓存
3.定时更新缓存,实时性差

定时任务:celery

首页轮播图定时任务

第一步:在celery中配置定时任务

from celery import Celery
import os

# 因为要使用django的orm所有必须要加这个
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
# 写了一个任务不要忘记在这里注册
app = Celery('test', broker=broker, backend=backend,
             include=[
                 'celery_task.order_task',
                 'celery_task.user_task',
                 'celery_task.sckill_task',
                 'celery_task.banner_task'
             ])

from datetime import timedelta

# 第一步:在celery的py文件中写入
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

app.conf.beat_schedule = {
    'cache_banner': {
        'task': 'celery_task.banner_task.banner',
        'schedule': timedelta(seconds=3),  # 时间对象
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        # 'schedule': crontab(hour=9, minute=43),  # 每天9点43
        # 'args': ('18888888', '6666'),
    },
}

第二步启动worker,启动beat记得在包所在的目录启动

banner代码

from .celery import app
from home.models import Banner

from django.core.cache import cache
from home.serializer import BannerSerializer

from django.conf import settings


@app.task
def banner():
    # 获取轮播图数据
    banner_list = Banner.objects.filter(is_delete=False, is_show=True).order_by('orders')
    res = BannerSerializer(instance=banner_list, many=True)
    # 存到缓存中

    # 如果有request对象了他会在rest_framework>fields文件中的FileFields中的to_representation调用request.build_absolute_uri(url)
    # 对这个文件url进行修改,如果没有则不会修改
    # 这里因为是没有request对象所以不会自动帮我们进行拼接url地址要我们自己进行拼接
    # 所以这里需要我们自己进行拼接
    for data in res.data:
        data['image'] = settings.BASE_URL + data['image']
    cache.set('banner_list', res.data)
    return True

标签:异步,task,res,app,celery,路飞,任务,import
From: https://www.cnblogs.com/clever-cat/p/17206873.html

相关文章