首页 > 编程语言 >学习python-89

学习python-89

时间:2022-11-17 22:36:43浏览次数:45  
标签:celery task python res py 学习 任务 89 result

今日学习内容

celery介绍

它是一个异步任务提交的框架

作用

  1. 完成异步任务:提高项目的并发量。之前开启线程做,现在用celery做。
  2. 完成延迟任务
  3. 完成定时任务

架构

  • 消息中间件
    • broker提交任务也就是函数都放在消息中间件中,celery本身不提供消息中间件,需要借助于第三方的redis、rabbitmq。
  • 任务执行单位
    • worker,真正执行任务的地方,一个个进程执行函数。
  • 结果存储
    • backend,函数return的结果都在这里,celery本身不提供结果存储,借助于第三方redis、数据库、rabbitmq。

img

image

image

celery快速使用

  • celery官网:http://www.celeryproject.org/

  • 介绍:Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform

  • celery 是独立的服务

    1. 可以不依赖任何服务器,通过自身命令,启动服务

    2. celery服务为其他服务提供异步解决任务需求的

      • 注意:会有两个服务同时运行,一个是项目服务,一个celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要异步完成项目的需求。

      • 比如:人相当于一个独立运行的服务(django),医院也是一个独立运行服务(celery)

        正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但人生病时,就会被医院接收,解决人生病问题。

        人生病的处理方案交给医院来解决,所有人不生病时,医院正常独立运行,人生病时,医院就来解决人生病的需求。

  • 安装pip3 install celery

  • 使用步骤

    1. 写一个main.py:实例化得到app对象,写函数,任务,注册成celery的任务

    2. 别的程序中:提交任务---》提交到broker中去

    3. 启动worker从broker中取出任务执行,执行完放到backend中

      • win:
            celery worker -A main -l info -P eventlet  # 4.x及之前用这个 
            celery -A main worker -l info -P eventlet  # 5.x及之后用这个
            
        lin, mac:
            celery worker -A main -l info
            celery -A main worker -l info
            
        注意:main 是我们当前跑项目的包名 celery_task
        
        对于 执行 worker命令 需要在该项目文件夹下执行。
        
      • backend中查看任务执行的结果

      • 直接在redis desktop manager

      • from main import app
        from celery.result import AsyncResult
        id = '7bef14a0-f83e-4901-b4d8-e47aaac09b39'
        if __name__ == '__main__':
            res = AsyncResult(id=id, app=app)
            if res.successful():
                result = res.get()
                print(result)
            elif res.failed():
                print('任务失败')
            elif res.status == 'PENDING':
                print('任务等待中被执行')
            elif res.status == 'RETRY':
                print('任务异常后正在重试')
            elif res.status == 'STARTED':
                print('任务已经开始被执行') 
        

celery包结构

写一个celery的包,在任意项目中,把包copy进去,导入使用即可。

项目:

celery_task
	-__init__.py
    -celery.py
    -user_task.py
    -home_task.py
add_task.py
get_result.py

使用步骤:

  • 新建包:celery_task

  • 包里新建一个 celery.py

  • 包里写app的初始化

  • 包里新建user_task.py 编写用户相关任务

  • 包里新建 home_task.py 编写首页相关任务

  • 其他程序,提交任务

  • 启动 worker ---》它可以先启动,在提交任务之前 ---》包所在的目录下

    celery -A celery_task worker -l info -P eventlet

  • 查看任务执行的结果

celery_task/celery.py

from celery import Celery

backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/0'
# 一定不要忘了include
app = Celery(__name__, broker=broker, backend=backend,include=['celery_task.home_task','celery_task.user_task'])

celery_task/home_task.py

from .celery import app
@app.task
def add(a, b):
    time.sleep(3)
    print('计算结果是:%s' % (a + b))
    return a + b

celery_task/user_task.py

import time
from .celery import app
@app.task
def send_sms(mobile, code):
    time.sleep(1)
    print('短信发送成功:%s,验证吗是%s' % (mobile, code))
    return True

add_task.py

from celery_task.user_task import send_sms
# 提交了一个发送短信异步任务
res=send_sms.delay('18723345455','9999')
print(res)  # 672237ce-c941-415e-9145-f31f90b94627

# 任务执行,要启动worker

# 查看任务执行的结果

get_result.py

# 查询执行完的结果
from celery_task.celery import app

from celery.result import AsyncResult

id = '672237ce-c941-415e-9145-f31f90b94627'
if __name__ == '__main__':
    res = AsyncResult(id=id, app=app)
    if res.successful():
        result = res.get()  #7
        print(result)
    elif res.failed():
        print('任务失败')
    elif res.status == 'PENDING':
        print('任务等待中被执行')
    elif res.status == 'RETRY':
        print('任务异常后正在重试')
    elif res.status == 'STARTED':
        print('任务已经开始被执行')

celery异步任务,延迟任务,定时任务

异步任务:
	任务.delay(参数, 参数)

延迟任务:
	任务.apply_async(args=[参数, 参数], eta=时间对象(utc时间))
定时任务:
	1.定时任务配置
    2.启动worker:真正干活的人
    	celery -A celery_task worker -l info -P eventlet
    3.启动beat:提交任务的人
    	celery -A celery_task beat -l info

django中使用celery

使用步骤

  1. 把写好的包赋值到项目路径下

  2. 把包内的celery.py的上面加入代码

    import os
            os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api1.settings.dev')
    import django
    django.setup()
    
  3. 在django视图类中导入,提交任务。

  4. 启动worker,beat

秒杀逻辑

前端

  1. 秒杀按钮

  2. 事件:想后端秒杀接口发送请求,发送完立马起了一个定时任务,每隔5s,向后端查看一下是否秒杀成功。如果秒杀没成功,定时任务继续执行,如果秒杀成功了,清空定时任务,弹窗提示。

     handleClick() {
          this.$axios.get(this.$settings.BASE_URL + 'userinfo/seckill/').then(res => {
            if (res.data.code == 100) {
              let task_id = res.data.id
              this.$message({
                message: res.data.msg,
                type: 'error'
              });
              // 起个定时任务,每隔5s向后端查询一下是否秒杀成功
              let t = setInterval(() => {
                this.$axios.get(this.$settings.BASE_URL + 'userinfo/get_result/?id=' + task_id).then(
                    res => {
                      if (res.data.code == 100 || res.data.code == 101) {  //秒杀结束了,要么成功,要么失败了
                        alert(res.data.msg)
                        // 销毁掉定时任务
                        clearInterval(t)
                      } else if (res.data.code == 102) {
                        //什么事都不干
                      }
                    }
                )
              }, 5000)
    
    
            }
          })
        }
       
    

后端

  1. 秒杀接口

    • 提交秒杀任务

      def seckill(request):
                  # 提交秒杀任务
                  res = seckill_task.delay()
                  return JsonResponse({'code': 100, 'msg': '正在排队', 'id': str(res)})
      
  2. 查询是否秒杀成功接口

    • 根据用户传入的id,查询任务是否成功

      def get_result(request):
                  task_id = request.GET.get('id')
                  res = AsyncResult(id=task_id, app=app)
                  if res.successful():
                      result = res.get()  # 7
                      return JsonResponse({'code': 100, 'msg': str(result)})
                  elif res.failed():
                      print('任务失败')
                      return JsonResponse({'code': 101, 'msg': '秒杀失败'})
                  elif res.status == 'PENDING':
                      print('任务等待中被执行')
                      return JsonResponse({'code': 102, 'msg': '还在排队'})
                  
      

双写一致性

接口加缓存

  • 首页轮播图接口,加缓存
  • 提交了接口的响应速度
  • 提高并发量
class BannerView(GenericViewSet, CommonListModelMixin):
    queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT]
    serializer_class = BannerSerializer

    def list(self, request, *args, **kwargs):
        result = cache.get('banner_list')
        if result:  # 缓存里有
            print('走了缓存,速度很快')
            return APIResponse(result=result)
        else:
            # 去数据库拿
            print('走了数据库,速度慢')
            res = super().list(request, *args, **kwargs)
            result = res.data.get('result')  # {code:100,msg:成功,result:[{},{}]}
            cache.set('banner_list', result)
            return res
  • 加了缓存,如果mysql数据变了,由于请求的是缓存的数据,导致mysql和redis的数据不一致。
  • 双写一致性问题
    1. 修改mysql数据库,删除缓存===》缓存的修改在后
    2. 修改数据库,修改缓存===》缓存的修改在后
    3. 定时更新缓存 ===》针对于实时性不是很高的接口适合定时更新
  • 给首页轮播图接口加入了缓存,出现了双写一致性问题,使用定时更新来解决双写一致性的问题
  • 如果存在不一致的情况,可以先忽略。执行celery的定时任务

celery定时任务实现双写一致性

home_task.py

@app.task
def update_banner():
    # 更新缓存
    # 查询出现在轮播图的数据
    queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT]
    ser = BannerSerializer(instance=queryset, many=True)
    # ser 中得图片,没有前面地址
    for item in ser.data:
        item['image'] = settings.HOST_URL + item['image']
    cache.set('banner_list', ser.data)
    return True

celery.py

app.conf.beat_schedule = {
    'update_banner': {
        'task': 'celery_task.home_task.update_banner',
        'schedule': timedelta(seconds=50),
        'args': (),
    }
}

步骤:

  • 启动django=》启动worker=》启动beat
  • 第一次访问,先查数据库,然后放入缓存
  • 以后再访问,就先走缓存,一旦mysql数据改了,缓存可能就不一样了。
  • 定时更新可以始终保持一致。

标签:celery,task,python,res,py,学习,任务,89,result
From: https://www.cnblogs.com/bjyxxc/p/16901242.html

相关文章

  • 学习python-Day90
    一、课程板块相关分析以及创建建表思路:有个课程表通过类型字段区分划分不同课,所以这三种课的表字段是是不一样的。一种课设计一张表FreeCourse 免费课程Course 实......
  • JDBC学习笔记
    JDBC学习笔记一.前阶段0.0前章HTMLCSSJS负责结构,表现,行为服务端Tomcat有关的XML语言(可拓展性),可以自定义标签,用于写配置文件的服务器Tomcat的组件Servlet......
  • STA学习记录4-输入输出路径约束
    @目录1输入路径约束2输出路径约束参考1输入路径约束由于STA不能检查不受约束路径上的时序约束,因此需要约束所有路径来进行时序分析当然,如果存在一些输入控制信号,我们......
  • 【C++】如果你准备学习C++,并且有C语言的基础,我希望你能简单的过一遍知识点。
    相关视频——黑马程序员匠心之作|C++教程从0到1入门编程,学习编程不再难_哔哩哔哩_bilibili(1-83)我的小站——半生瓜のblog我知道这个视频早已经被很多人学习并且记录​笔记,......
  • C++学习------cstdint头文件的源码学习02---宏定义
    续接上文讲解常见整型的上下限这里事先定义了如下整形的上下限占用位数有符号/无符号上限下限8有符号127-1288无符号255016有符号32767-3276816无符号65535032有符号2147483......
  • std::move() 学习
    转自:https://stackoverflow.com/questions/28595117/why-can-we-use-stdmove-on-a-const-object1.右值int a = 10;左值是有固定的内存地址,&a即左值的地址,我们可以把&......
  • Selenium4+Python3系列(七) - Iframe、Select控件、交互式弹出框、执行JS、Cookie操作
    前言突然,想把所有之前未更新的常用Api操作、演示写出来,算是对API的一种完结吧。下面按照Api模块来做逐一介绍。一、iframe操作iframe识别:语法:driver.switch_to.fram......
  • 【SSL 1589】汉明距离(NTT)
    汉明距离题目链接:SSL1589题目大意给你两个字符串A,B,保证B的长度大于等于A,且两个字符串都只有0,1构成。然后问你B所有跟A相同长度的字符串中,跟A有不用字符......
  • Rust库学习-cipher(简单使用)
    介绍cipher是Rust的一个密码库实践Cargo.toml[dependencies]aes="0.8.2"base64="0.13.1"cipher="0.4.3"main.rsuseaes::cipher::{generic_array::Generic......
  • 面向高效网络渗透测试的强化学习
    一、本文的贡献本文提出并且评估了一个基于人工智能的PT系统————IAPTS,系统利用RL技术来学习和再现PT活动。该模块集成了工业框架,能够在未来类似的测试用例中捕获信息......