首页 > 其他分享 >luffy项目(十)

luffy项目(十)

时间:2022-11-16 20:22:35浏览次数:36  
标签:task 项目 res app celery 任务 luffy import

今日内容概要

  • celery快速使用
  • celery包结构
  • celery异步任务,延迟任务,定时任务
  • django中使用celery
  • 秒杀逻辑
  • 双写一致性

今日内容详细

celery快速使用

1 celery官网:http://www.celeryproject.org/
2 介绍:Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform
3 celery是独立的服务
    1)可以不依赖任何服务器,通过自身命令,启动服务
    2)celery服务为其他项目服务提供异步解决任务需求的
    注意:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,ceclery就会在需要时异步完成项目的需求
"""
人是一个独立运行的服务(django) | 医院也是一个独立运行的服务(celery)
    正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
    人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
"""

使用步骤

-安装:pip3 install celery
-写一个main.py :实例化得到app对象,写函数,任务,注册成celery的任务
    from celery import Celery
    backend='redis://127.0.0.1:6379/1' # 结果存储
    broker='redis://127.0.0.1:6379/0'  # 消息中间件
    app=Celery('main',broker=broker,backend=backend)

    # 写任务,任务就是函数,加个装饰器,变成celery的任务
    @app.task
    def add(a, b):
        time.sleep(2)  # 假设任务耗时比较久
        print(a + b)
        return a + b
-再别的程序中:提交任务--->提交到broker中去了
    add.delay(3,4)

-启动worker,从broker中取任务执行,执行完放到backend中
    win:
        celery worker -A main -l info -P eventlet  # 4.x及之前用这个 
        celery -A main worker -l info -P eventlet  # 5.x及之后用这个
    lin,mac:
        celery worker -A main -l info  # 4.x及之前用这个 
        celery -A main worker -l info  # 5.x及之后用这个
-在backend中查看任务执行结果
    直接在图形化界面看
    通过代码查看
    from main import app
        from celery.result import AsyncResult
        id = '7bef14a0-f83e-4901-b4d8-e47aaac09b39'  # 表示你提交任务唯一的uuid
        if __name__ == '__main__':
            res = AsyncResult(id=id, app=app)
            if res.successful():
                result = res.get()  #7
                print(result)
            elif res.failed():
                print('任务失败')
            elif res.status == 'PENDING':
                print('任务等待中被执行')
            elif res.status == 'RETRY':
                print('任务异常后正在重试')
            elif res.status == 'STARTED':
                print('任务已经开始被执行')

celery包结构

写一个celery的包,以后在任意项目中,想使用celery,把包copy进去,导入使用即可

目录

celery_task
    -__init__.py
    -celery.py
    -user_task.py
    -home_task.py
add_task.py
get_result.py

使用步骤

-新建包:celery_task
-在包先新建一个 celery.py
-在里面写app的初始化
-在包里新建user_task.py 编写用户相关任务 
-在包里新建home_task.py 编写首页相关任务 
-其它程序,提交任务
-启动worker --->它可以先启动,在提交任务之前--->包所在的目录下
    celery -A celery_task worker -l info -P eventlet
-查看任务执行的结果

celery_task/celery.py

from celery import Celery

backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/0'
# 一定不要忘了include
app = Celery(__name__, broker=broker, backend=backend,include=['celery_task.home_task','celery_task.user_task'])

celery_task/home_task.py

from .celery import app
@app.task
def add(a, b):
    time.sleep(3)
    print('计算结果是:%s' % (a + b))
    return a + b

celery_task/user_task.py

import time
from .celery import app
@app.task
def send_sms(mobile, code):
    time.sleep(1)
    print('短信发送成功:%s,验证吗是%s' % (mobile, code))
    return True

add_task.py

from celery_task.user_task import send_sms
# 提交了一个发送短信异步任务
res=send_sms.delay('18723345455','9999')
print(res)  # 672237ce-c941-415e-9145-f31f90b94627

# 任务执行,要启动worker
# 查看任务执行的结果

get_result.py

# 查询执行完的结果
from celery_task.celery import app
from celery.result import AsyncResult

id = '672237ce-c941-415e-9145-f31f90b94627'
if __name__ == '__main__':
    res = AsyncResult(id=id, app=app)
    if res.successful():
        result = res.get()  #7
        print(result)
    elif res.failed():
        print('任务失败')
    elif res.status == 'PENDING':
        print('任务等待中被执行')
    elif res.status == 'RETRY':
        print('任务异常后正在重试')
    elif res.status == 'STARTED':
        print('任务已经开始被执行')

celery异步任务,延迟任务,定时任务

# 异步任务
    任务.delay(参数,参数)
# 延迟任务
    任务.apply_async(args=[参数,参数], eta=时间对象(utc时间))
    # eta 时间对象 5s后的时间对象
    from datetime import datetime, timedelta

    # 得到5s后的utc时间,celery默认使用utc时间
    eta = datetime.utcnow() + timedelta(seconds=5)
    # local_eta = datetime.now() + timedelta(seconds=5)  东八区时间
    res=add.apply_async(args=(200, 50), eta=eta)
    print(res)
# 定时任务
    -1 app的配置文件中配置 
        app.conf.beat_schedule = {
            'send_sms_task': {
            'task': 'celery_task.user_task.send_sms',
            'schedule': timedelta(seconds=5),
            # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
            'args': ('1897334444', '7777'),
            },
            'add_task': {
                'task': 'celery_task.home_task.add',
                'schedule': crontab(hour=12, minute=10, day_of_week=3),  # 每周一早八点
                'args': (10, 20),
            }
        }
    -2 启动worker :干活的人
        celery -A celery_task worker -l info -P eventlet
    -3 启动beat :提交任务的人
        celery -A celery_task beat -l info

django中使用celery

# 使用步骤
1 把写好的包复制到项目路径下
2 在包内的celery.py的上面加入代码
    import os
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
    import django
    django.setup()
3 在django的视图类中导入,提交任务
4 启动worker,beat

# 注意:
1 task可以写到不同的app中,注意在celery.py 中include的时候,路径要对
2 可以把celery运行在多台机器上--->完整的项目copy到机器上--->启动worker不启动djagno

秒杀逻辑

事件:向后端秒杀接口发送请求,发送完立马起了一个定时任务,每个5s,向后端查看一下是否秒杀成功,如果秒杀没成功,定时任务继续执行,如果秒杀成功了,清空定时任务,弹窗告诉他

前端

<template>
  <div>
    <img src="" alt="" height="300px" width="300px">
    <br>
    <el-button type="danger" plain @click.once="handleClick">秒杀</el-button>
  </div>
</template>

<script>
export default {
  name: "Seckill",
  methods: {
    handleClick() {
      this.$axios.get(this.$settings.BASE_URL + 'userinfo/seckill/').then(res => {
        if (res.data.code == 100) {
          let task_id = res.data.id
          this.$message({
            message: res.data.msg,
            type: 'error'
          });
          // 起个定时任务,每隔5s向后端查询一下是否秒杀成功
          let t = setInterval(() => {
            this.$axios.get(this.$settings.BASE_URL + 'userinfo/get_result/?id=' + task_id).then(
                res => {
                  if (res.data.code == 100 || res.data.code == 101) {  //秒杀结束了,要么成功,要么失败了
                    alert(res.data.msg)
                    // 销毁掉定时任务
                    clearInterval(t)
                  } else if (res.data.code == 102) {
                    //什么事都不干
                  }
                }
            )
          }, 5000)
        }
      })
    }
  }
}
</script>
<style scoped>
</style>

后端

1 编写秒杀任务
import time
from .celery import app
import random

@app.task
def seckill_task():
    time.sleep(6)  # 模拟秒杀需要3s
    res = random.choice([True, False])
    if res:
        return '秒杀成功'
    else:
        return '很遗憾,您没有秒到'
2 编写秒杀接口(提交秒杀任务)
from celery_task.user_task import seckill_task
from celery_task.celery import app
from celery.result import AsyncResult
from django.http import JsonResponse

def seckill(request):
    # 提交秒杀任务
    res = seckill_task.delay()
    return JsonResponse({'code': 100, 'msg': '正在排队', 'id': str(res)})

3 查询是否秒杀成功的接口(根据传入的id,查询任务是否成功)
def get_result(request):
    task_id = request.GET.get('id')
    res = AsyncResult(id=task_id, app=app)
    if res.successful():
        result = res.get()  # 7
        return JsonResponse({'code': 100, 'msg': str(result)})
    elif res.failed():
        print('任务失败')
        return JsonResponse({'code': 101, 'msg': '秒杀失败'})
    elif res.status == 'PENDING':
        print('任务等待中被执行')
        return JsonResponse({'code': 102, 'msg': '还在排队'})

双写一致性

# 首页轮播图接口,加缓存,提交了接口的响应速度,提高并发量
# 加了缓存,如果mysql数据变了,由于请求的都是缓存的数据,导致mysql和redis的数据不一致
# 双写一致性问题
    -1 修改mysql数据库,删除缓存  【缓存的修改是在后】
    -2 修改数据库,修改缓存    【缓存的修改是在后】
    -3 定时更新缓存   --->针对于实时性不是很高的接口适合定时更新

# 给首页轮播图接口加入了缓存,出现了双写一致性问题,使用定时更新来解决双写一致性的问题【会存在不一致的情况,我们可以忽略】--->定时任务,celery的定时任务

接口加缓存

class BannerView(GenericViewSet, CommonListModelMixin):
    queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT]
    serializer_class = BannerSerializer

    def list(self, request, *args, **kwargs):
        result = cache.get('banner_list')
        if result:  # 缓存里有
            print('走了缓存,速度很快')
            return APIResponse(result=result)
        else:
            # 去数据库拿
            print('走了数据库,速度慢')
            res = super().list(request, *args, **kwargs)
            result = res.data.get('result')  # {code:100,msg:成功,result:[{},{}]}
            cache.set('banner_list', result)
            return res

celery定时任务实现双写一致性

home_task.py

from .celery import app
from home.models import Banner
from django.conf import settings
from home.serialzier import BannerSerializer
from django.core.cache import cache

@app.task
def update_banner():
    # 更新缓存
    # 查询出现在轮播图的数据
    queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT]
    ser = BannerSerializer(instance=queryset, many=True)
    # ser 中得图片,没有前面地址
    for item in ser.data:
        item['image'] = settings.HOST_URL + item['image']
    cache.set('banner_list', ser.data)
    return True

celery.py

from datetime import timedelta

app.conf.beat_schedule = {
    'update_banner': {
        'task': 'celery_task.home_task.update_banner',
        'schedule': timedelta(seconds=50),
        'args': (),
    }
}

标签:task,项目,res,app,celery,任务,luffy,import
From: https://www.cnblogs.com/wwjjll/p/16897380.html

相关文章

  • 223201062524赵中垚-软件工程基础Y- 实验二 结对项目报告
    沈阳航空航天大学软件工程基础实验报告实验名称:实验二实验题目:结对项目专业软件工程学号223201062524姓名赵中垚袁显利指导教师孟桂英成......
  • 使用 Nginx 如何部署 web 项目
    第一步:前往Nginx官方下载Nginx资源包,建议下载Stableversion(长期稳定版本)   第二步:将Nginx压缩包解压到本地目录中(D:\Tools)   第三步:进入到已经解压......
  • 如何开发一款即时通讯软件?看这六个项目就够了
     即时通讯软件(IM)发展到今天功能已经越来越齐全,我们的日常生活中不管是社交、网上购物还是工作都已经离不开即时通讯软件。今天小编就为大家推荐六个即时通讯的开源项目,分别......
  • 程序员必学的项目管理知识-敏捷开发
    敏捷开发的目的敏捷开发的目的是快速响应市场需求,举个例子,如果一个产品的开发周期为一年,如果等到尽善尽美再上线,那么时间周期是非常漫长的,在这个过程中,用户需求也会发生很......
  • 谷粒学院(一)项目介绍
    文章目录​​一、项目背景​​​​二、项目商业模式​​​​三、系统模块​​​​四、技术架构​​一、项目背景在线教育顾名思义,是以网络为介质的教学方式,通过网络,学员与教......
  • 前端项目开发规范
    工具配置各项配置显而易见,工具能辅助人发现很多潜在问题;非常必要引入依赖:husky、lint-staged、eslint、prettier,使得可以从流程上,保证项目的代码风格统一,规避部分错误,且......
  • 关于SpringBoot项目中Jar包使用外部配置文件的问题
     SpringBoot项目的jar包在有外部配置文件的情况下会优先调用外部配置文件外部配置文件的优先级依次是执行命令的目录下的config目录(jar包同一目录下的config目录)jar包......
  • eclipse导入项目时lombok包(@Data、@Slf4j...)注解无效
    1、找到maven仓库lombok的jar包位置,或者在lombok官网(https://www.projectlombok.org/download)下载lombok的jar包2、打开cmd,运行javaw-jarlombok.jar(lombok的jar包全......
  • 篇(12)-Asp.Net Core入门实战-在项目中加个应用层,为多层结构建立基础
    入门实战-在项目中加个应用层,为多层结构建立基础以上11篇的演练已经简单讲清楚了asp.netcore开发的一个(表)菜单管理的小功能,感兴趣的可以自行演练其他功能,演练熟悉即可。......
  • 在Vue项目中引入公共css样式
    通常,较为复用次数高的css样式,需要提取出来同一存放在assets资源文件夹下创建css文件夹在css文件夹下创建public.less文件 public.less.wrap{......