首页 > 其他分享 >luffy学习-10

luffy学习-10

时间:2022-11-16 22:27:34浏览次数:55  
标签:10 task celery res app py 学习 任务 luffy

一、celery介绍及快速使用

1. celery:翻译过来叫芹菜,它是一个  分布式的异步任务   框架
2. celery有什么用
   1. 完成异步任务:可以提高项目的并发量,之前开启线程做,现在使用celery做
   2. 完成延迟任务
   3. 完成定时任务
3. 架构
   消息中间件:broker 提交的任务(函数)都放在这里,celery本身不提供消息中间件,需要借助于第三方:redis,rabbitmq
   任务执行单元:worker,真正执行任务的地方,一个个进程,执行函数
   结果存储:backend,函数return的结果存储在这里,celery本身不提供结果存储,借助于第三方:redis,数据库,rabbitmq

1、celery官网:ttp://www.celeryproject.org/

2、介绍:celery(芹菜)是一个资金很少的项目,所以我们不支持微软Windows。 请不要打开任何与该平台相关的问题 

3、celery是独立的服务(我自己单整,跟别人没关系,跟django没有必然的联系)

  • 可以不依赖任何服务器,通过自身命令,启动服务
  • celery服务为其他项目服务提供异步解决任务需求的

注意:会有两个服务同时运行,一个是项目服务,一个celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

比喻说哈

人是一个独立运行的服务(django)
医院也是一个独立运行的服务(celery)

正常情况下,人可以完成所有健康情况的动作,不需要医院的参与,但当人生病时,就会被医院接收,解决人的生病问题。
人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求

老规矩先安装:pip3 install celery

使用步骤

  • 写一个main.py实例化得到app对象,写函数、加了装饰器 @task才能是celery的任务、注册成celery的任务

同步任务

#####main.py#####

from celery import Celery
import time

backend = 'redis://127.0.0.1:6379/1'  # 任务结果
broker = 'redis://127.0.0.1:6379/0'  # 中间件
app = Celery('test', backend=backend, broker=broker)


# 写任务,任务就是函数,加个装饰器,变成celery的任务
@app.task
def add(a, b):
    time.sleep(2)  # 假设任务耗时比较久
    print(a + b)
    return a + b


#####s1.py#####
from main import add

print('hello world')
# 执行同步任务
# res=add(3,4)
# print(res)
  • 在别的程序中:提交任务——>提交到 broker
    • add.delay(1,2)——>放进来需要传的参数
  • 启动 worker,从 broker中取出任务执行,执行完放到 backend
  • Windows
    • 切记cd到路径下,不要整错了 
    • celery worker -A main -l info -P eventlet  # 4.x及之前用这个
    • celery -A main worker -l info -P eventlet  # 5.x及之后用这个
  • 林、mac
    • celery worker -A main -l info
    • celery -A main worker -l info
  • 在backend中查看任务执行的结果(两种)
    • 直接看

 

    • 通过代码查看

get_result.py

# 查询执行完的结果
from main import app

from celery.result import AsyncResult

id = '68df8669-1f1d-48cc-a0ca-075dc7da2a3a'  # 任务的id号
if __name__ == '__main__':
    res = AsyncResult(id=id, app=app)
    if res.successful():
        result = res.get()  # 80
        print(result)
    elif res.failed():
        print('任务失败')
    elif res.status == 'PENDING':
        print('任务等待中被执行')
    elif res.status == 'RETRY':
        print('任务异常后正在重试')
    elif res.status == 'STARTED':
        print('任务已经开始被执行')

二、celery包结构

老规矩:这一串封装写成一个celery的包,以后在任意项目中,想用的话直接把包导过去使用即可

项目目录

  celery_task

    -__init__.py

    -celery.py

    -user_task.py

    -user_task.py

  add_task.py

  get_result.py

使用步骤

  • 新建包:celery_task
  • 在包里新建一个:celery.py(py文件必须叫这个)
  • 在里面写app的初始化
  • 在包里新建user_task.py    编写用户相关任务
  • 在包里新建home_task.py 编写首页相关任务
  • 其他程序,提交任务,调用.delay
  • 启动worke——>它先启动后启动都可以,有任务就运行没有任务就等待,(要在包所在的目录下)
    • celery -A celery_task worker -l info -P eventlet
  • 查看任务执行的结果了

celery_task/celery.py

一定不要忘了include

from celery import Celery

backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/0'
# 一定不要忘了include
app = Celery(__name__, broker=broker, backend=backend,include=['celery_task.home_task','celery_task.user_task'])

celery_task/home_task.py

from .celery import app
@app.task
def add(a, b):
    time.sleep(5)
    print('计算结果是:%s' % (a + b))
    return a + b

celery_task/user_task.py

import time
from .celery import app
@app.task
def send_sms(mobile, code):
    time.sleep(1)
    print('短信发送成功:%s,验证吗是%s' % (mobile, code))
    return True

add_task.py

from celery_task.user_task import send_sms
# 提交了一个发送短信异步任务
res=send_sms.delay('15647697835','1234')
print(res)  

get_result.py

# 查询执行完的结果
from celery_task.celery import app

from celery.result import AsyncResult

id = '68df8669-1f1d-48cc-a0ca-075dc7da2a3a'
if __name__ == '__main__':
    res = AsyncResult(id=id, app=app)
    if res.successful():
        result = res.get()  #7
        print(result)
    elif res.failed():
        print('任务失败')
    elif res.status == 'PENDING':
        print('任务等待中被执行')
    elif res.status == 'RETRY':
        print('任务异常后正在重试')
    elif res.status == 'STARTED':
        print('任务已经开始被执行')

三、celery异步任务、延迟任务、定时任务

 异步任务

  • 任务.delay(参数,参数)

延迟任务

  • 任务.apply_async(args=[参数,参数],eta=时间对象(utc时间))

定时任务

1 app的配置文件中配置 
app.conf.beat_schedule = {
    'send_sms_task': {
        'task': 'celery_task.user_task.send_sms',
        'schedule': timedelta(seconds=5),
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': ('15647697835', '1234'),
    },
    'add_task': {
        'task': 'celery_task.home_task.add',
        'schedule': crontab(hour=12, minute=10, day_of_week=3),  # 每周三12:10
        'args': (10, 20),
    }
}
-2 启动worker :干活的人
celery -A celery_task worker -l info -P eventlet
-3 启动beat :提交任务的人
celery -A celery_task beat -l info

四、django中使用celery 

# 使用步骤:
	1 把写好的包复制到项目路径下
    2 在包内的celery.py 的上面加入代码
        import os
        os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
        import django
        django.setup()
    3 在django的视图类中,导入,提交任务
    4 启动worker,beat

 五、秒杀逻辑 

# 前端:
	1 秒杀按钮
    2 事件:向后端秒杀接口发送请求,发送完立马起了一个定时任务,每个5s,向后端查看一下是否秒杀成功,如果秒杀没成功,定时任务继续执行,如果秒杀成功了,清空定时任务,弹窗告诉他
    handleClick() {
      this.$axios.get(this.$settings.BASE_URL + 'userinfo/seckill/').then(res => {
        if (res.data.code == 100) {
          let task_id = res.data.id
          this.$message({
            message: res.data.msg,
            type: 'error'
          });
          // 起个定时任务,每隔5s向后端查询一下是否秒杀成功
          let t = setInterval(() => {
            this.$axios.get(this.$settings.BASE_URL + 'userinfo/get_result/?id=' + task_id).then(
                res => {
                  if (res.data.code == 100 || res.data.code == 101) {  //秒杀结束了,要么成功,要么失败了
                    alert(res.data.msg)
                    // 销毁掉定时任务
                    clearInterval(t)
                  } else if (res.data.code == 102) {
                    //什么事都不干
                  }
                }
            )
          }, 5000)


        }
      })
    }
    
    
# 后端:
	1 秒杀接口
    	提交秒杀任务
        def seckill(request):
            # 提交秒杀任务
            res = seckill_task.delay()
            return JsonResponse({'code': 100, 'msg': '正在排队', 'id': str(res)})
    2 查询是否秒杀成功的接口
    	根据用户传入的id,查询任务是否成功
        def get_result(request):
            task_id = request.GET.get('id')
            res = AsyncResult(id=task_id, app=app)
            if res.successful():
                result = res.get()  # 7
                return JsonResponse({'code': 100, 'msg': str(result)})
            elif res.failed():
                print('任务失败')
                return JsonResponse({'code': 101, 'msg': '秒杀失败'})
            elif res.status == 'PENDING':
                print('任务等待中被执行')
                return JsonResponse({'code': 102, 'msg': '还在排队'})

 六、双写一致性

给首页轮播图接口+缓存

好处:

  • 提交了接口的响应速度
  • 提高并发量
class BannerView(GenericViewSet, CommonListModelMixin):
    queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT]
    serializer_class = BannerSerializer

    def list(self, request, *args, **kwargs):
        result = cache.get('banner_list')
        if result:  # 缓存里有
            print('我在走缓存,速度变快了')
            return APIResponse(result=result)
        else:
            # 去数据库拿
            print('我在走数据库,速度变慢了')
            res = super().list(request, *args, **kwargs)
            result = res.data.get('result')  # {code:100,msg:成功,result:[{},{}]}
            cache.set('banner_list', result)
            return res

加了缓存,如果mysql数据变了,由于请求的都是缓存的数据,导致mysql和redis的数据不一致

双写一致性问题

  1. 修改mysql数据库,删除缓存  【缓存的修改是在后】
  2. 修改数据库,修改缓存    【缓存的修改是在后】
  3. 定时更新缓存   ---》针对于实时性不是很高的接口适合定时更新

给首页轮播图接口加入了缓存,出现了双写一致性问题,使用定时更新来解决双写一致性的问题【会存在不一致的情况,但是我们是定时,举例子最多可能就1个小时,1天这种的】——>定时任务,celery的定时任务

celery定时任务实现双写一致性

home_task.py

@app.task
def update_banner():
    # 更新缓存
    # 查询出现在轮播图的数据
    queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT]
    ser = BannerSerializer(instance=queryset, many=True)
    # ser 中得图片,没有前面地址
    for item in ser.data:
        item['image'] = settings.HOST_URL + item['image']
    cache.set('banner_list', ser.data)
    return True

celery,py

app.conf.beat_schedule = {
    'update_banner': {
        'task': 'celery_task.home_task.update_banner',
        'schedule': timedelta(seconds=50),
        'args': (),
    }
}
  • 启动django
  • 启动worker
  • 启动beat

第一次访问:查的数据库(慢点)放入了缓存,以后不管走多少次,也是走缓存

但是如果mysql数据改了,缓存可能不一致

我们引用定时任务定时更新,最终保持了一致

标签:10,task,celery,res,app,py,学习,任务,luffy
From: https://www.cnblogs.com/zzjjpp/p/16896547.html

相关文章

  • 1010002504-软件工程基础Y-实验一 吕书海个人项目报告模板
    《软件工程基础》上机实验报告撰写要求 一、 纸张与页面要求采用国际标准A4型打印纸或复印纸,纵向打印。封页和页面按照下面模板书写(正文为:小四宋体1.5倍行距)。图......
  • Java学习——11.16
    今天把狂神的Java基础看完了,但Java基础没完全看完。从网上看的狂神的评价都不好,就很无语!!!,对此还想了很久。也是自己学完,感觉没啥用(和鹏哥一个天上一个地下),太杂太乱了,明......
  • 第五章第6节: 2020.06.10 智能互联网之弹性容器云与Service Mesh【六】
                                                         ......
  • Spring Boot学习笔记(1/2)
    前言serverlet服务器端小程序,第一代javaweb开发技术,基于java实现了一套用于动态网站的APITomcat\Jetty\Undertow都是Servelet容器,用来管理Servelet类jsp在html页......
  • Spring MVC学习笔记(1/2)
    SpringMVCMVC框架SpringMVC常用组件SpringMVC项目浏览器发送一个请求,若请求地址与web.xml中配置的前端控制器(DispatcherServlet)的url-pattern相匹配,则该请求......
  • oled显示屏(128*64bit)使用——stm32学习总结
    正点原子oled显示屏教程,驱动程序有些缺陷:1.正点采用的取模方式:从上到下,再从左到右,纵向8点上高位。虽然正点原子提供了取模软件,但是软件的图像取模,没有自带滤波以及色阶选......
  • markdown的学习
    标题二级标题“#”号后面需要空格才能成功三级标题四级标题字体样式美化字体美化字体美化字体美化字体列表abcabc阿苏勒拥有青铜之血,实在是太NB了......
  • 缓冲区工作原理学习和攻击
    bufferoverflow基本的汇编语言MOVEAX,EBX:把EBX中存储的内容传给EAXADDEAX,EBX:把EAX和EBX相加,最终存到第一个变量EAX中PUSHEAX:入栈操作,ESP=ES......
  • Markdown学习
     标题:三级标题四级标题 字体Hello,World!Hello,World!Hello,World!Hello,World!  引用 走向人生巅峰 分割线  图片     超......
  • Datawahle — 2022年11月组队学习 — 李宏毅机器学习 — TASK02
    学习时间:2022年11月15-2022年11月16日学习内容:李宏毅机器学习视频P3和P4学习笔记 ......