首页 > 其他分享 >今日内容 celery的使用,秒杀逻辑

今日内容 celery的使用,秒杀逻辑

时间:2022-11-16 21:11:07浏览次数:83  
标签:逻辑 task res app celery 任务 秒杀 result

  • celery的使用

1.celery官网:http://www.celeryproject.org/
2.介绍:Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform(没钱,不支持windows)

  1. celery是独立的服务
1)可以不依赖任何服务器,通过自身命令,启动服务
2)celery服务为为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

人是一个独立运行的服务(django) | 医院也是一个独立运行的服务(celery)
	正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
	人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求

安装:pip3 install celery

使用步骤:

1.写一个main.py :实例化得到app对象,写函数,任务,注册成celery的任务,再别的程序中:提交任务--->提交到broker中去了

2.启动worker,从broker中取任务执行,执行完放到backend中
命令:win

celery worker -A main -l info -P eventlet 4.X版本及之前用这个
celery -A main worker -l info -P eventlet 5.X版本及之后用这个

Linux,mac:
celery worker -A main -l info
celery -A main worker -l info

3.在backend中查看任务执行的结果
通过代码查看:

        from main import app
        from celery.result import AsyncResult
        id = '7bef14a0-f83e-4901-b4d8-e47aaac09b39'
        if __name__ == '__main__':
            res = AsyncResult(id=id, app=app)
            if res.successful():
                result = res.get()  #7
                print(result)
            elif res.failed():
                print('任务失败')
            elif res.status == 'PENDING':
                print('任务等待中被执行')
            elif res.status == 'RETRY':
                print('任务异常后正在重试')
            elif res.status == 'STARTED':
                print('任务已经开始被执行')
  • celery包结构

写一个celery的包,以后在任意项目中,想用直接把包copy进去导入使用即可

项目:
        celery_task
            -__init__.py
            -celery.py
            -user_task.py
            -home_task.py
        add_task.py
        get_result.py

使用步骤

    -新建包:celery_task
    -在包先新建一个 celery.py
    -在里面写app的初始化
    -在包里新建user_task.py 编写用户相关任务 
    -在包里新建home_task.py 编写首页相关任务 
    -其它程序,提交任务
    -启动worker ---》它可以先启动,在提交任务之前-->包所在的目录下
    	celery -A celery_task worker -l info -P eventlet
    -查看任务执行的结果了

celery_task/celery.py

from celery import Celery

backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/0'
# 一定不要忘了include
app = Celery(__name__, broker=broker, backend=backend,include=['celery_task.home_task','celery_task.user_task'])

celery_task/home_task.py

from .celery import app
@app.task
def add(a, b):
    time.sleep(3)
    print('计算结果是:%s' % (a + b))
    return a + b

celery_task/user_task.py

import time
from .celery import app
@app.task
def send_sms(mobile, code):
    time.sleep(1)
    print('短信发送成功:%s,验证吗是%s' % (mobile, code))
    return True

add_task.py

from celery_task.user_task import send_sms
# 提交了一个发送短信异步任务
res=send_sms.delay('18723345455','9999')
print(res)  # 672237ce-c941-415e-9145-f31f90b94627

# 任务执行,要启动worker

# 查看任务执行的结果

get_result.py

# 查询执行完的结果
from celery_task.celery import app

from celery.result import AsyncResult

id = '672237ce-c941-415e-9145-f31f90b94627'
if __name__ == '__main__':
    res = AsyncResult(id=id, app=app)
    if res.successful():
        result = res.get()  #7
        print(result)
    elif res.failed():
        print('任务失败')
    elif res.status == 'PENDING':
        print('任务等待中被执行')
    elif res.status == 'RETRY':
        print('任务异常后正在重试')
    elif res.status == 'STARTED':
        print('任务已经开始被执行')
  • celery异步,延迟,定时任务

# 异步任务
	任务.delay(参数,参数)
# 延迟
	任务.apply_async(args=[参数,参数],eta=时间对象(utc时间))
# 定时
    # 定时任务配置
        -1 app的配置文件中配置 
        	app.conf.beat_schedule = {
                'send_sms_task': {
                    'task': 'celery_task.user_task.send_sms',
                    'schedule': timedelta(seconds=5),
                    # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
                    'args': ('1897334444', '7777'),
                },
                'add_task': {
                    'task': 'celery_task.home_task.add',
                    'schedule': crontab(hour=12, minute=10, day_of_week=3),  # 每周一早八点
                    'args': (10, 20),
                }
            }
        -2 启动worker :干活的人
        	celery -A celery_task worker -l info -P eventlet
        -3 启动beat :提交任务的人
        	celery -A celery_task beat -l info

  • django中使用celery

使用步骤:
  1 把写好的包复制到项目路径下
  2 在包内的celery.py 的上面加入代码
  3 在django的视图类中,导入,提交任务
  4 启动worker,beat

        import os
        os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
        import django
        django.setup()
  • 秒杀逻辑

前端
1.秒杀按钮

2.事件:向后端秒杀接口发送请求,发送完立马起了一个定时任务,每个5s,向后端查看一下是否秒杀成功,如果秒杀没成功,定时任务继续执行,如果秒杀成功了,清空定时任务,弹窗告诉他

    handleClick() {
      this.$axios.get(this.$settings.BASE_URL + 'userinfo/seckill/').then(res => {
        if (res.data.code == 100) {
          let task_id = res.data.id
          this.$message({
            message: res.data.msg,
            type: 'error'
          });
          // 起个定时任务,每隔5s向后端查询一下是否秒杀成功
          let t = setInterval(() => {
            this.$axios.get(this.$settings.BASE_URL + 'userinfo/get_result/?id=' + task_id).then(
                res => {
                  if (res.data.code == 100 || res.data.code == 101) {  //秒杀结束了,要么成功,要么失败了
                    alert(res.data.msg)
                    // 销毁掉定时任务
                    clearInterval(t)
                  } else if (res.data.code == 102) {
                    //什么事都不干
                  }
                }
            )
          }, 5000)


        }
      })
    }

后端
1.秒杀接口,提交秒杀任务

        def seckill(request):
            # 提交秒杀任务
            res = seckill_task.delay()
            return JsonResponse({'code': 100, 'msg': '正在排队', 'id': str(res)})

2.查询是否秒杀成功的接口,根据用户传入的id,查询任务是否成功

        def get_result(request):
            task_id = request.GET.get('id')
            res = AsyncResult(id=task_id, app=app)
            if res.successful():
                result = res.get()  # 7
                return JsonResponse({'code': 100, 'msg': str(result)})
            elif res.failed():
                print('任务失败')
                return JsonResponse({'code': 101, 'msg': '秒杀失败'})
            elif res.status == 'PENDING':
                print('任务等待中被执行')
                return JsonResponse({'code': 102, 'msg': '还在排队'})
  • 双写一致性

1.接口加缓存

# 首页轮播图接口,加缓存
# 提交了接口的响应速度
# 提高并发量
class BannerView(GenericViewSet, CommonListModelMixin):
    queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT]
    serializer_class = BannerSerializer

    def list(self, request, *args, **kwargs):
        result = cache.get('banner_list')
        if result:  # 缓存里有
            print('走了缓存,速度很快')
            return APIResponse(result=result)
        else:
            # 去数据库拿
            print('走了数据库,速度慢')
            res = super().list(request, *args, **kwargs)
            result = res.data.get('result')  # {code:100,msg:成功,result:[{},{}]}
            cache.set('banner_list', result)
            return res

  因为加了缓存,如果mysql数据变了,由于请求的都是缓存的数据,会导致mysql和redis的数据不一致

解决双写一致性问题:
   1 修改mysql数据库,删除缓存 【缓存的修改是在后】
   2 修改数据库,修改缓存 【缓存的修改是在后】
   3 定时更新缓存 ---》针对于实时性不是很高的接口适合定时更新

  给首页轮播图接口加入了缓存,出现了双写一致性问题,使用定时更新来解决双写一致性的问题【会存在不一致的情况,我们可以忽略】---》定时任务,celery的定时任务

2.celery定时任务实现双写一致性

home_task.py
@app.task
def update_banner():
    # 更新缓存
    # 查询出现在轮播图的数据
    queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT]
    ser = BannerSerializer(instance=queryset, many=True)
    # ser 中得图片,没有前面地址
    for item in ser.data:
        item['image'] = settings.HOST_URL + item['image']
    cache.set('banner_list', ser.data)
    return True
celery.py
app.conf.beat_schedule = {
    'update_banner': {
        'task': 'celery_task.home_task.update_banner',
        'schedule': timedelta(seconds=50),
        'args': (),
    }
}

标签:逻辑,task,res,app,celery,任务,秒杀,result
From: https://www.cnblogs.com/tai-yang77/p/16897521.html

相关文章

  • 云计算基础设施的逻辑架构
    按照云计算分布式的特点,云计算基础设施可以分布在不同的地域,形成多个逻辑隔离的区域数据中心,且各区域数据中心之间通过大带宽链路实现互联,并可纳入同一套云管理平台进行管理......
  • Django Celery RabbitMQ访问被拒绝(403) ACCESS_REFUSED
    报错代码:(403)ACCESS_REFUSED-LoginwasrefusedusingauthenticationmechanismPLAI(省略) 解决方案:    在rabbitmq中注册用户具体代码实现:列出用户rabbitm......
  • 三分钟梳理TDengine安装部署的逻辑
      ​小T导读:TDengine,是涛思数据面对高速增长的物联网大数据市场和技术挑战推出的创新性的大数据处理产品,除却读写性能、存储压缩能力强大之外,还有安装简单、操作难......
  • Nginx if语法不支持if条件的逻辑与&&逻辑或|| 运算 ,而且不支持if的嵌套语法。需要借
    条件判断Nginx语法不支持if条件的逻辑与&&逻辑或||运算,而且不支持if的嵌套语法。需要借助变量来实现嵌套语法或多条件判断location/{set$flag0;if($hos......
  • 思维分析逻辑 3 DAY
    目录指标分析指标选择原则指标体系建立步骤流量分析渠道分析(从哪来)常见渠道及渠道分类渠道推广过程渠道的指标渠道分析方法转化及价值分析(经过什么?产生什么价值?)漏斗分析功......
  • 今日内容,redis数据类型操作和celery介绍
    redis的使用一.redis字符串操作redis五大数据类型:字符串,hash,列表,集合,有序集合操作字符串的方法importredisconn=redis.Redis()#1set(name,value,ex=No......
  • luffy之redis操作和celery介绍
    一、redis字符串的操作#redis的vlaue有五大数据类型:字符串,列表,hash,集合,有序集合#关于字符串的操作有17种importredisconn=redis=Redis()1set(name,valu......
  • 业务数据切割显示的去重逻辑处理
    业务数据切割显示的去重逻辑处理业务背景:根据不同的数据来源封装到相同的VO对象,在后台界面上面按照不同的多个集合对象显示出来。按照不同的条件(比如结算时间等维度来切割......
  • 数据分享|R语言逻辑回归、Naive Bayes贝叶斯、决策树、随机森林算法预测心脏病|附代码
    全文链接:http://tecdat.cn/?p=23061这个数据集可以追溯到1988年,由四个数据库组成。克利夫兰、匈牙利、瑞士和长滩。"目标"字段是指病人是否有心脏病。它的数值为整数,0=无......
  • celery使用
    celery使用1.项目布局proj|______init__.py|____celery.py|____tasks.py''' pipinstallcelery==5.2.3(最好别用最新版本,目前最新5.2.7)pip......