首页 > 其他分享 >celery的使用与接口加缓存的双写一致性

celery的使用与接口加缓存的双写一致性

时间:2022-11-16 22:22:14浏览次数:38  
标签:task res app celery 缓存 print import 双写

celery快速使用

1.celery是独立的访问
-官网
http://www.celeryproject.org/
'''
1)可以不依赖任何服务器,通过自身命令,启动服务
2)celery服务为为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

人是一个独立运行的服务(django) | 医院也是一个独立运行的服务(celery)
	正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
	人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
'''
2.安装
pip3 install celery

3.使用
3.1写一个main.py,实例化得到app对象,写函数,任务,注册成celery的任务
from celery import Celery
backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/0'
app = Celery('test', backend=backend, broker=broker)
@app.task
def add(a, b):
    time.sleep(2)
    print(a)
    return a + b

3.2在其他文件中提交任务到broker中
from main import add
print('1')
# 执行同步任务
# res = add(3, 4)
# print(res)
# 1	 3	7

# 执行异步任务
res = add.delay(3, 4)
print(res)

3.3启动worker,从broker中取任务执行,执行完放到backend中
'''
win:    
	celery worker -A main -l info -P eventlet  # 4.x及之前用这个 
	celery -A main worker -l info -P eventlet  # 5.x及之后用这个
lin,mac: 
    celery worker -A main -l info
    celery -A main worker -l info
'''
3.4再backend中查看任务执行的结果(可以通过代码查看)
from main import app
from celery.result import AsyncResult

id = '7bef14a0-f83e-4901-b4d8-e47aaac09b39'
if __name__ == '__main__':
    res = AsyncResult(id=id, app=app)
    if res.successful():
        result = res.get()  # 7
        print(result)
    elif res.failed():
        print('任务失败')
    elif res.status == 'PENDING':
        print('任务等待中被执行')
    elif res.status == 'RETRY':
        print('任务异常后正在重试')
    elif res.status == 'STARTED':
        print('任务已经开始被执行')

celery包结构

1.写一个celery的包,再任意项目中,把包copy进去,导入使用即可
celery_task
	__init__.py
	celery.py
	user_task.py
	home_task.py
add_task.py
get_result.py

2.使用
-新建包	celery_task
-在包下新建一个	celery.py
-在里面写app的初始化
-在包里新建user_task.py编写用户相关任务 
-在包里新建home_task.py编写首页相关任务 
-其它程序,提交任务
-启动worker ---》它可以先启动,在提交任务之前-->包所在的目录下
celery -A celery_task worker -l info -P eventlet
-查看任务执行的结果了

celery_task/celery.py

from celery import Celery

backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/0'
app = Celery('test', backend=backend, broker=broker, include=['celery_task.home_task', 'celery_task.user_task'])

celery_task/home_task.py

import time
from .celery import app

@app.task
def add(a, b):
    time.sleep(2)
    print(f'计算结果为{a + b}')
    return a + b

celery_task/user_task.py

import time
from .celery import app

@app.task
def send_sms(mobile, code):
    time.sleep(1)
    print(f'短信发送成功{mobile},验证码{code}')
    return True

add_task.py

from celery_task.user_task import send_sms

# 提交了一个发送短信异步任务
res = send_sms.delay('15976725488', '6666')
print(res)  # 13c2208a-07fd-4f27-9691-ea96836a6f66
# 任务执行,要启动worker

get_result.py

from celery_task.celery import app
from celery.result import AsyncResult

id = '13c2208a-07fd-4f27-9691-ea96836a6f66'
if __name__ == '__main__':
    res = AsyncResult(id=id, app=app)
    if res.successful():
        result = res.get()  # 7
        print(result)
    elif res.failed():
        print('任务失败')
    elif res.status == 'PENDING':
        print('任务等待中被执行')
    elif res.status == 'RETRY':
        print('任务异常后正在重试')
    elif res.status == 'STARTED':
        print('任务已经开始被执行')

celery异步任务,延迟任务,定时任务

1.异步任务
res = add.delay(3, 4)
print(res)

2.提交延时任务
from celery_task.home_task import add
from datetime import datetime, timedelta

# 得到10秒后的utc时间
eta = datetime.utcnow() + timedelta(seconds=10)
# args内是传给函数的参数,eta是时间对象
res = add.apply_async(args=(200, 50), eta=eta)
print(res)

3.定时任务
3.1启动worker(执行任务)
celery -A celery_task worker -l info -P eventlet
3.2启动beat(提交任务)
celery -A celery_task beat -l info
3.3在app的配置文件中配置
from celery.schedules import crontab
app.conf.beat_schedule = {
    'send_sms_task': {
        'task': 'celery_task.user_task.send_sms',
        'schedule': timedelta(seconds=5),  # 每5秒提交
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': ('1897334444', '7777'),
    },
    'add_task': {
        'task': 'celery_task.home_task.add',
        'schedule': crontab(hour=12, minute=10, day_of_week=3),  # 每周三12点10分提交
        'args': (10, 20),
    }
}

django中使用celery

1.把写好的包复制到项目路径下
2.在包内celery.py中加入代码
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
import django
django.setup()

3.在django的视图类中,导入,提交任务
from .celery import app
from libs.send_tx_sms import send_sms_by_phone
from apps.user.views import UserInfo
@app.task
def send_sms(mobile, code):
    send_sms_by_phone(mobile, code)
    user = UserInfo.objects.all().filter(mobile=mobile).filter()
    print(f'给{user.username}发送短信发送成功:{mobile},验证码{code}')
    return True
4.启动worker,beat

秒杀逻辑

1.秒杀事件
向后端秒杀接口发送请求,发送完立马起了一个定时任务,每个5s,向后端查看一下是否秒杀成功,如果秒杀没成功,定时任务继续执行,如果秒杀成功了,清空定时任务,弹窗告诉他

2.前端
<template>
  <div>
    <el-button type="danger" plain @click="handleClick">一键秒杀</el-button>
  </div>
</template>

<script>
export default {
  name: "Seckill",
  methods: {
    handleClick() {
      this.$axios.get(this.$settings.BASE_URL + 'userinfo/seckill/').then(res => {
        if (res.data.code == 100) {
          let task_id = res.data.id
          this.$message({
            message: res.data.msg,
            type: 'error'
          });
          // 起个定时事件,每隔5秒向后端查询一下是否秒杀成功
          let t = setInterval(() => {
            this.$axios.get(this.$settings.BASE_URL + 'userinfo/get_result/?id=' + task_id).then(res => {
                  // 秒杀结束了,要么成功,要么失败了
                  if (res.data.code == 100 || res.data.code == 101) {
                    alert(res.data.msg)
                    // 销毁掉定时任务
                    clearInterval(t)
                  } else if (res.data.code == 102) {
                    alert(res.data.msg)
                  }
                }
            )
          }, 5000)
        }
      })
    }
  }
}
</script>

3.后端
3.1秒杀接口
from celery_task.user_task import seckill_task
from django.http import JsonResponse
def seckill(request):
    res = seckill_task.delay()
    return JsonResponse({'code': 100, 'msg': '正在排队', 'id': str(res)})

3.2查询是否秒杀成功接口
from celery_task.celery import app
from celery.result import AsyncResult
def get_result(request):
    task_id = request.GET.get('id')
    res = AsyncResult(id=task_id, app=app)
    if res.successful():
        result = res.get()
        return JsonResponse({'code': 100, 'msg': str(result)})
    elif res.failed():
        return JsonResponse({'code': 101, 'msg': '秒杀失败'})
    elif res.status == 'PENDING':
        return JsonResponse({'code': 100, 'msg': '还在排队'})

3.3秒杀任务
@app.task
def seckill_task():
    time.sleep(3)  # 秒杀需要3秒
    res = random.choice([True, False])  # 可能成功,可能不成功
    if res:
        return '秒杀成功'
    return '很遗憾,你没有秒到'

双写一致性

接口加缓存

首页轮播图接口,加缓存,提交了接口的响应速度、并发量

from utils.response import APIResponse
from django.core.cache import cache


class BannerView(GenericViewSet, CommonListModelMixin):
    queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:BANNER_COUNT]
    serializer_class = BannerSerializer

    def list(self, request, *args, **kwargs):
        result = cache.get('banner_list')
        if result:
            print('走缓存,速度快')
            return APIResponse(result=result)
        else:
            print('走数据库,速度慢')
            res = super().list(request, *args, **kwargs)
            result = res.data.get('result')
            cache.set('banner_list', result)
            return res

celery定时任务实现双写一致性

1.在数据加到缓存中后,如果mysql数据变了,由于请求的都是缓存的数据,导致mysql和redis的数据不一致

2.双写一致性问题
1 修改mysql数据库,删除缓存	(缓存的修改是在后)
2 修改数据库,修改缓存	(缓存的修改是在后)
3 定时更新缓存	(针对于实时性不是很高的接口适合定时更新)
'给首页轮播图接口加入了缓存,出现了双写一致性问题,使用定时更新来解决双写一致性的问题【会存在不一致的情况,我们可以忽略】---》定时任务,celery的定时任务'
  • home_task.py
from celery import Celery
from datetime import timedelta
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
import django
django.setup()

backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/0'
app = Celery('test', backend=backend, broker=broker, include=['celery_task.home_task'])

from celery.schedules import crontab
app.conf.beat_schedule = {
    'update_banner': {
        'task': 'celery_task.home_task.update_banner',
        'schedule': timedelta(seconds=20),  # 20秒更新轮播图
        'args': (),
    }
}

  • celery.py
from .celery import app
from home.models import Banner
from django.conf import settings
from settings.common_settings import *
from django.core.cache import cache
from home.serializer import BannerSerializer


@app.task
def update_banner():
    queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[
               :BANNER_COUNT]
    ser = BannerSerializer(instance=queryset, many=True)
    for item in ser.data:
        item['image'] = settings.HOST_URL + item['image']
    cache.set('banner_list', ser.data)
    return True

标签:task,res,app,celery,缓存,print,import,双写
From: https://www.cnblogs.com/riuqi/p/16897765.html

相关文章

  • 今日内容 celery的使用,秒杀逻辑
    celery的使用1.celery官网:http://www.celeryproject.org/2.介绍:Celeryisaprojectwithminimalfunding,sowedon’tsupportMicrosoftWindows.Pleasedon’......
  • Django Celery RabbitMQ访问被拒绝(403) ACCESS_REFUSED
    报错代码:(403)ACCESS_REFUSED-LoginwasrefusedusingauthenticationmechanismPLAI(省略) 解决方案:    在rabbitmq中注册用户具体代码实现:列出用户rabbitm......
  • 浅谈HTTP缓存与CDN缓存的那点事
    HTTP缓存与CDN缓存一直是提升web性能的两大利器,合理的缓存配置可以降低带宽成本、减轻服务器压力、提升用户的体验。而不合理的缓存配置会导致资源界面无法及时更新,从而引发......
  • 分布式缓存(redis)
    分布式缓存--基于Redis集群解决单机Redis存在的问题单机的Redis存在四大问题:0.学习目标1.Redis持久化Redis有两种持久化方案:RDB持久化AOF持久化1.1.RDB持久化......
  • 今日内容,redis数据类型操作和celery介绍
    redis的使用一.redis字符串操作redis五大数据类型:字符串,hash,列表,集合,有序集合操作字符串的方法importredisconn=redis.Redis()#1set(name,value,ex=No......
  • luffy之redis操作和celery介绍
    一、redis字符串的操作#redis的vlaue有五大数据类型:字符串,列表,hash,集合,有序集合#关于字符串的操作有17种importredisconn=redis=Redis()1set(name,valu......
  • celery使用
    celery使用1.项目布局proj|______init__.py|____celery.py|____tasks.py''' pipinstallcelery==5.2.3(最好别用最新版本,目前最新5.2.7)pip......
  • celery使用报错记录
    celery使用报错记录1.ImportError:cannotimportname'current_app'from'celery'(D:\python37\lib\site-packages\celery_init_.py)PSC:\Users\lcx\Desktop\demo>......
  • vue keepAlive 不缓存
    详情参考:https://blog.csdn.net/weixin_44813666/article/details/120737881<keep-alive:include="cachedViews"exclude="Index"><router-view:key="k......
  • 浅谈包装类缓存池技术
    回顾包装类的装箱和拆箱publicclassIntegerTest{ publicstaticvoidmain(String[]args){ Integeri=newInteger(4);//手动装箱调用构造方法 Integera......