首页 > 其他分享 >celery 的高级使用

celery 的高级使用

时间:2023-11-21 09:45:05浏览次数:28  
标签:task res app 高级 celery 缓存 使用 import

celery包结构

celery_task # celery包
    celery.py # celery连接和配置相关文件,且名字必须叫celery.py
    home_tasks.py # home app的异步任务
    user_task.py # user app 的异步任务
    get_result.py # 查看异步任务
    add_task.py #启动异步任务
    __init__.py # 包文件
    
### 1 创建 celery_task  包,包内部有celery.py和一堆task-->['celery_task.home_task','celery_task.user_task']

### 2 celery.py
from celery import Celery
broker = 'redis://127.0.0.1:6379/1'  # 消息中间件 redis
backend = 'redis://127.0.0.1:6379/2'  # 结果存,用redis
app = Celery('app', broker=broker, backend=backend,include=['celery_task.home_task','celery_task.user_task'])

###3 每个task,写自己相关的任务

### 4 启动worker
 celery -A celery_task worker -l info -P eventlet
    
### 5 提交任务
from celery_task.home_task import add
res=add.delay(3,4)
print(res)

### 6 查看结果
from celery_task.celery import app
from celery.result import AsyncResult

id = 'e31441d9-e9a6-4d70-9a66-a9227a6bc273'
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():
        result = a.get()
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')

使用celery异步发送框架

user_task.py

from .celery import app
from libs.tx_sms.sms import send_sms


# 使用celery异步框架提交验证码
@app.task
def sms_send(mobile, code):
    print('sms_send:>>>',mobile,code)
    response = send_sms(mobile, code)
    print(response)
    if response:
        return f"手机号{mobile}发送成功,验证码为{code}"
    else:
        return f"手机号{mobile}验证码发送失败,请重新尝试"

get_test.py

from celery_task.celery import app
from celery.result import AsyncResult

id = ''
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():
        result = a.get()
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')

views.py

    @action(methods=["POST"], detail=False)
    def send_sms(self, request, *args, **kwargs):
        """
            短信验证码接口
            路由:# http://127.0.0.1:8000/api/v1/user/mobile/send_sms/
        """
        mobile = request.data.get("mobile")
        code = sms.get_code(6)
        cache.set('cache_mobile_%s' % mobile, code, timeout=120)
        # 异步发送短信
        response = send_sms_celery.delay(mobile,code)
        print(response)
        return APIResponse(msg="短信已发送")

celery延迟任务和定时任务

异步任务

# 使用celery异步框架提交验证码
@app.task
def sms_send(self, mobile, code, ):
    response = sms.send_sms(mobile, code)
    if response:
        return f"手机号{mobile}发送成功,验证码为{code}"
    else:
        return f"手机号{mobile}验证码发送失败,请重新尝试"

延迟任务

@app.task
def send_sms(phone):
    time.sleep(2)
    return f'{phone}发送短信成功'

## 提交延迟任务   apply_async
# 添加延迟任务
from datetime import datetime, timedelta

print(datetime.utcnow())  # utc 时间,跟咱们差8个小时
# eta 就是 10s 后的实际
eta = datetime.utcnow() + timedelta(seconds=30)
res = send_sms.apply_async(args=(18953675221,), eta=eta)
print(res)

定时任务

#### 1 在celery.py中
from celery import Celery
from datetime import timedelta
from celery.schedules import crontab

broker = 'redis://127.0.0.1:6379/1'  # 消息中间件 redis
backend = 'redis://127.0.0.1:6379/2'  # 结果存储 redis

app = Celery(__name__, broker=broker, backend=backend, include=["celery_task.home_tasks", "celery_task.user_task"])
# 设置时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

app.conf.beat_schedule = {
    'add': {
        'task': 'celery_task.home_task.add',
        'schedule': timedelta(seconds=3),
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': (5, 6),
    },

}


###2 启动worker
celery -A celery_task worker -l info -P eventlet
###3 启动beat(它来定时提交任务)
celery -A celery_task beat -l info

在Django中使用celery

# 通用方案

#1 把咱们之前写的包,放到项目路径下
#2 提交异步或延迟任务,导入直接提交即可
#3 只要启动worker,这些任务就会被执行
#4 如果要使用django中的东西(配置文件,缓存,orm。。。),都需要在celery.py中写
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
如果不写就会报错
#5 使用django内置东西的任务
@app.task
def cache_demo(key, value):
    cache.set(key, value)
    return '缓存成功'

接口缓存

# 1 首页获取轮播图接口--->去数据库查的--->每次来到首页,都要查一次---->轮播图变得很慢
# 2 咱们把轮播图数据,放到redis,做缓存
	-以后:只要缓存中有,就从缓存中拿(redis)
    -如果缓存中没有,从数据库查,查完再放到缓存中

# 3 查询所有的接口,都可以加缓存

轮播图接口加入缓存

from rest_framework.mixins import ListModelMixin
from django.core.cache import cache
from utils.common_response import APIResponse


class ListMixin(ListModelMixin):
    def list(self, request, *args, **kwargs):
        # 先从缓存中查看有没有banner
        banner_list = cache.get("banner")
        # 如果没有调用父类从数据库中查,并保存到缓存
        if not banner_list:
            banner = super().list(request, *args, **kwargs)
            cache.set("banner", banner.data)
            return APIResponse(result=banner.data)
        # 如果有直接返回出去
        return APIResponse(result=banner_list)

使用装饰器完成


双写一致性

# 1 接口加缓存---->mysql数据改了--->缓存数据没动--->数据不一致了
	- 有的数据,必须一致(缓存删除和修改,要在修改数据后)
        -修改,插入数据(mysql),删除缓存
        -修改,插入数据(mysql),修改缓存
        ----不合理的方案----
        删除缓存,再改数据
        
    - 有的数据,可以容忍--->实时性要求没有那么高
    	-定时更新  --->每隔5分钟更新一次
        
# 2 高级名:  双写一致性
当修改了数据库中的数据同时也需要修改缓存中的数据,如何保证数据库中和缓存中的数据一致,这就是双写一致性
解决方案:
    -改数据,删除缓存,该缓存
    -定时更新
        
# celery:发送短信,改成异步
# celery:定时更新轮播图缓存

django缓存过期时间

cache.set(self.cache_key, res.data, None)   # 永不过期

cache.set(self.cache_key, res.data)   #  不写5分钟过期

通过定时更新缓存,实现双写一致性

#####1 任务
from django.core.cache import cache
from home.models import Banner
from home.serializer import BannerSerializer
from settings.user_settings import BASE_URL, BANNER_COUNT
from celery_task.celery import app


@app.task
def update_banner():
    # 查询banner所有的数据
    banner = Banner.objects.filter(is_delete=False, is_show=True).order_by("orders")[:BANNER_COUNT]
    # 因为拿到的是Query对象,所以需要序列化
    serializer = BannerSerializer(banner, many=True)
    # 通过序列化循环出来序列化的数据
    for ser in serializer.data:
        ser["img"] = BASE_URL + "/" + ser["img"]
    cache.set('banner_list', ser.data)
    return '更新成功'

### 2 celery.py 
app.conf.beat_schedule = {
    'add': {
        'task': 'celery_task.home_task.update_banner',
        'schedule': timedelta(seconds=3),
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': (),
    },

}

### 3 启动worker
celery -A celery_task worker -l info -P eventlet

##  4 启动beat
celery -A celery_task beat -l info

异步秒杀逻辑

# 1 分析流程--->提高了 并发量
	同步流程:用户在前端,点击秒杀按钮---->提交请求到后端---->[扣减库存,生成订单]假设耗时--->同步操作-->10s钟处理完成,秒杀成功--->返回给前端--->如果秒杀人数过多,同步操作,不能承载更多人同时秒杀
    异步流程:用户在前端,点击秒杀按钮---->提交请求到后端---->提交一个任务[扣减库存,生成订单]假设耗时--->异步操作-->10s钟处理完成,秒杀成功--->前端再发请求查询--->如果秒杀人数过多,异步操作,10s内能承载非常多用户操作

前端页面

<template>
  <div class="home">
    <Header></Header>
    <div style="padding: 50px;margin-left: 100px">

      <h1>Go语言课程</h1>
      <img src="http://photo.liuqingzheng.top/2023%2002%2022%2021%2057%2011%20/image-20230222215707795.png"
           height="300px"
           width="300px">
      <br>
      <el-button type="danger" @click="handleSeckill">秒杀课程</el-button>
    </div>

    <br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br>
    <Footer></Footer>
  </div>
</template>
<script>
import Headers from "@/components/Headers.vue";
import Footer from "@/components/Footer.vue";

export default {
  name: "SeckillView",
  data() {
    return {
      seckill_id: '',
      t: ''
    }
  },
  methods: {
    handleSeckill() {
      this.$axios.post(this.$settings.BASE_URL + 'api/v1/home/seckill/seckill/', {
        course_id: 'go语言'
      }).then(res => {
        console.log(res.data)
        if (res.data.code === 100) {
          this.$message({
            message: res.data.msg,
            type: 'warning',
            duration: 1500
          });

          // 起一个定时任务,每隔2s向后端查询一次,看是否秒杀成功
          this.seckill_id = res.data.seckill_id
          this.t = setInterval(() => {
            this.$axios.get(this.$settings.BASE_URL + 'api/v1/home/seckill/get_seckill_result/?seckill_id=' + this.seckill_id).then(res => {
              if (res.data.code === 100 || res.data.code === 101) {
                alert(res.data.msg)
                clearInterval(this.t)
                this.t = null

              } else if (res.data.code === 102) { //秒杀逻辑还没开始执行
                this.$message('等待开始秒杀');
              } else if (res.data.code === 103) {
                this.$message('正在秒杀途中');
              }
            })

          }, 2000)

        } else {
          this.$message({
            message: '服务端异常,请联系系统管理员',
            type: 'warning',
            duration: 1500
          });
        }
      })
    }
  },
  components: {
    Headers, Footer
  }
}
</script>
<style scoped>

</style>

<style scoped>

</style>

router

import Seckills from "@/views/seckills.vue";

Vue.use(VueRouter)

const routes = [
  {
    path: '/seckill',
    name: 'seckills',
    component: Seckills
  }
]

views

class SeckillView(GenericViewSet):

    @action(methods=["POST"], detail=False)
    def seckill(self, request, *args, **kwargs):
        seckill_id = request.data.get("course_id")
        res = seckill_course.delay(seckill_id)
        return APIResponse(seckill_id=str(res), msg='秒杀任务已经提交')

    @action(methods=['GET'], detail=False)
    def get_seckill_result(self, request, *args, **kwargs):
        seckill_id = request.query_params.get('seckill_id')
        a = AsyncResult(id=seckill_id, app=app)
        if a.successful():
            result = a.get()
            if result:
                return APIResponse(msg='恭喜您,秒杀成功')
            else:
                return APIResponse(code=101, msg='很遗憾,您没有秒到')
        elif a.status == 'PENDING':
            print('任务等待中被执行')
            return APIResponse(code=102, msg='暂未轮到您')
        elif a.status == 'STARTED':
            print('任务已经开始被执行')
            return APIResponse(code=103, msg='正在秒杀,请稍后')
        else:
            return APIResponse(code=104, msg='服务端错误,秒杀失败')

异步任务

@app.task
def seckill_course(seckill_id):
    time.sleep(6)
    res = random.choices([106, 109])
    if res == "106":
        return True
    elif res == "109":
        return False
    else:
        return "系统错误,请联系管理员"

标签:task,res,app,高级,celery,缓存,使用,import
From: https://www.cnblogs.com/chao0308/p/17845560.html

相关文章

  • celery介绍
    celery介绍#celery是什么?分布式异步任务框架:第三方框架,celery翻译过来是芹菜,吉祥物就是芹菜项目中使用异步任务的场景,可以使用它之前做异步,如何做?异步发送短信--->开启多线程--->不便于管理#celery有什么作用? -执行异步任务-执行延迟任务-执行定......
  • cenots7使用GPG加密文件
    1、安装gpgyum-yinstallgnupg2、生成密钥对gpg--gen-key中文翻译后输入如下:请选择您要使用的密钥种类:(1)RSAandRSA(default)(2)DSAandElgamal(3)DSA(仅用于签名)(4)RSA(仅用于签名)您的选择?1RSA密钥长度应在1024位与4096位之间。您想要......
  • go使用context.withtimtout取消一个超时操作
    3 使用context.WithTimeout:package mainimport ("context""fmt""time")func main() {timeout := 5 * time.Secondctx, cancel := context.WithTimeout(context.Background(), timeout)defer cancel()done := make(chan bool)go func() {    // ......
  • 【Redis使用】一年多来redis使用笔记md文档,第(2)篇:命令和数据库操作
    Redis是一个高性能的key-value数据库。本文会让你知道:什么是nosql、Redis的特点、如何修改常用Redis配置、写出Redis中string类型数据的增删改查操作命令、写出Redis中hash类型数据的增删改查相关命令、说出Redis中list保存的数据类型、使用StrictRedis对象对string类型数据......
  • 【Flask使用】全知识md文档,4大部分60页第3篇:状态cookie和session保持
    本文的主要内容:flask视图&路由、虚拟环境安装、路由各种定义、状态保持、cookie、session、模板基本使用、过滤器&自定义过滤器、模板代码复用:宏、继承/包含、模板中特有变量和函数、Flask-WTF表单、CSRF、数据库操作、ORM、Flask-SQLAlchemy、增删改查操作、案例、蓝图、单元测......
  • 先锋版N100-N200-N305老版本(四个M.2转接板)使用教程
    N100-N200-I3-N305先锋版专用四个M.2接口转接板。4M.2-BIOS下载地址刷机前请确认是不是如下产品,其它主板或拓展板不对情况下勿刷!PS:拓展上面的拔码预留给USB切换的,本次版本不支持USB,所有拔码开关暂时用不上,请勿操作。刷BIOS教程,请安装Ventoy制作U盘启动盘工具,把下载到的BIOS复制......
  • 先锋版N100-N200-I3-N305三选一SATA+M.2 NGFF+mSATA拓展板使用教程
    先锋版N100-N200-I3-N305拓展板3选1功能使用教程3选1BIOS下载地址刷机前请确认是不是如下产品,其它拓展板勿刷!重要提示:3选1拓展版不支持NVMe协议的硬盘,接口不对会烧,请勿插PCIE(NVMe)协议的M.2,通电情况下严禁进行拔码操作,需要拔码换硬盘一定要关机掉拔电源下进行通电情况下严禁进行......
  • 先锋版N100-N200-N305新版本(四个M.2或5个M.2转接板)使用教程
    N100-N200-I3-N305先锋版专用支持五个M.2接口转接板5个M.2-BIOS下载地址刷机前请确认是不是如下产品,其它主板或转接板不对情况下勿刷!产品图片......
  • I2C器件AT24xx的使用
    AT24xx:“xx”代表存储容量,分别为“01”、“02”、“04”、“08”、“16”,代表1k、2k、4k、8k、16k。引脚描述:串行时钟(SCL):在时钟上升沿数据写入EEPROM,时钟下降沿EEPROM的数据被读出。串行数据(SDA):双向传输,引脚开漏,需要上拉电阻。器件、页地址(A2/A1/A0):器件引脚地址。写保护(WP):接地,芯......
  • Angular 依赖注入系统里 Injection token PLATFORM_ID 的使用场景
    Angular的依赖注入系统是它的核心特性之一,它使得我们可以轻松地在应用程序的各个部分共享和管理代码。在Angular的依赖注入系统中,InjectionToken是一个特别重要的概念。InjectionToken是一个用于参数类型的标记类,它可以用来在依赖注入器中注入特定的值。在这里,我们将重点讨论......