首页 > 其他分享 >【2022-11-16】luffy项目实战(九)

【2022-11-16】luffy项目实战(九)

时间:2022-11-16 22:39:02浏览次数:55  
标签:11 task 16 res app py celery 2022 import

一、celery介绍

Celery是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具。

它是一个任务队列,专注于实时处理,同时还支持任务调度。

# celery是独立的服务	

    1. 可以不依赖任何服务器,通过自身命令,启动服务
    2. celery服务为为其他项目服务提供异步解决任务需求的
    注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

    人是一个独立运行的服务(django) | 医院也是一个独立运行的服务(celery)
    正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
    人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求


# celery 的优点

简单:celery的 配置和使用还是比较简单的, 非常容易使用和维护和不需要配置文件

高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务

如果连接丢失或发生故障,worker和client 将自动重试,并且一些代理通过主/主或主/副本复制方式支持HA。

快速:一个单进程的celery每分钟可处理上百万个任务

灵活: 几乎celery的各个组件都可以被扩展及自定制

# celery有什么用?

    1. 完成异步任务:可以提高项目的并发量,之前开启线程做,现在使用celery做
    2. 完成延迟任务
    3. 完成定时任务
    
# celery的架构

    消息中间件:broker 提交的任务(函数)都放在这里,celery本身不提供消息中间件,需要借助于第三方:redis,rabbitmq
    任务执行单元:worker,真正执行任务的地方,一个个进程,执行函数
    结果存储:backend,函数return的结果存储在这里,celery本身不提供结果存储,借助于第三方:redis,数据库,rabbitmq
  
# 官方地址:https://docs.celeryq.dev/en/latest/getting-started/first-steps-with-celery.html

二、celery快速使用

# 安装celery
	pip install celery
    
# 安装eventlet
	pip install eventlet

2.1 test_celery/main.py

import time

from celery import Celery

backend = 'redis://127.0.0.1:6379/1'  # 结果存储
broker = 'redis://127.0.0.1:6379/2'  # 消息中间件
app = Celery('test', backend=backend, broker=broker)  # 实例化得到app对象


@app.task()   # 写个函数,并使用装饰器装饰为celery的任务
def add(x, y):
    time.sleep(3)
    print(x + y)
    return x + y

2.2 test_celery/test1.py

from main import add

print('hello word')

res = add.delay(4, 4)
print(res)

2.3 test_celery/get_result.py

from main import app
from celery.result import AsyncResult

task_id = '1256a2d2-777c-4838-924f-f40ebe36ae2f'    # 执行任务,需要有任务id
if __name__ == '__main__':
    res = AsyncResult(id=task_id, app=app)
    if res.successful():
        result = res.get()  # 7
        print(result)
    elif res.failed():
        print('任务失败')
    elif res.status == 'PENDING':
        print('任务等待中被执行')
    elif res.status == 'RETRY':
        print('任务异常后正在重试')
    elif res.status == 'STARTED':
        print('任务已经开始被执行')

2.4 启动worker测试

   启动worker,从broker中取任务执行,执行完放到backend中
    	win:    
            celery worker -A main -l info -P eventlet  # 4.x及之前用这个 
            celery -A main worker -l info -P eventlet  # 5.x及之后用这个
        lin,mac: 
            celery worker -A main -l info
        	celery -A main worker -l info
   再backend中查看任务执行的结果

三、celery的包结构

project
    ├── celery_task  		# celery包
    │   ├── __init__.py 	# 包文件
    │   ├── celery.py   	# celery连接和配置相关文件,且名字必须叫celery.py
    │   └── user_task.py	# 任务函数
    |	└── home_task.py
    ├── add_task.py  		# 添加任务
    └── get_result.py   	# 获取结果

3.1 celery_task/celery.py

from celery import Celery

backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/0'

app = Celery(__name__, broker=broker, backend=backend, include=['celery_task.home_task', 'celery_task.user_task'])

3.2 celery_task/user_task.py

import time
from .celery import app


@app.task
def send_sms(mobile, code):
    time.sleep(3)
    print('短信发送成功:%s,验证码是%s' % (mobile, code))
    return True

3.3 celery_task/home_task.py

from .celery import app
import time


@app.task
def add(a, b):
    time.sleep(3)
    print('计算结果是:%s' % (a + b))
    return a + b

3.4 add_task.py

from celery_task.user_task import send_sms

# 提交了一个发送短信异步任务
res = send_sms.delay('18723345455', '9999')
print(res)  # 672237ce-c941-415e-9145-f31f90b94627

# 任务执行,要启动worker  celery -A celery_task worker -l info -P eventlet

3.4 get_result.py

# 查询执行完的结果
from celery_task.celery import app

from celery.result import AsyncResult

task_id = '8bb50a78-7af8-4152-9d44-566432cb1277'    # 执行任务,需要有任务id

if __name__ == '__main__':
    res = AsyncResult(id=task_id, app=app)
    if res.successful():
        result = res.get()
        print(result)
    elif res.failed():
        print('任务失败')
    elif res.status == 'PENDING':
        print('任务等待中被执行')
    elif res.status == 'RETRY':
        print('任务异常后正在重试')
    elif res.status == 'STARTED':
        print('任务已经开始被执行')

四、celery异步任务、延迟任务、定时任务

# 添加异步任务

任务名.delay(参数,参数)

# 添加延迟任务

from datetime import datetime, timedelta
	
eta = datetime.utcnow() + timedelta(seconds=10)   # eta时间对象
res = add.apply_async(args=(200, 50), eta=eta)
print(res)

# 添加定时任务

1. 在celery.py中添加如下配置:

    from datetime import timedelta
    from celery.schedules import crontab

    # 时区
    app.conf.timezone = 'Asia/Shanghai'
    # 是否使用UTC
    app.conf.enable_utc = False

    # 定时任务

    app.conf.beat_schedule = {
        'send_sms_task': {
            'task': 'celery_task.user_task.send_sms',
            'schedule': timedelta(seconds=5),
            # 'schedule': crontab(hour=20, day_of_week=3),  # 每周三晚八点
            'args': ('1538680821', '7777'),
        },
        'add_task': {
            'task': 'celery_task.home_task.add',
            'schedule': crontab(hour=20, minute=20, day_of_week=3),  # 每周三晚八点
            'args': (10, 20),
        }
    }
    
2. 启动worker
	celery -A celery_task worker -l info -P eventlet

3. 启动beat
	celery -A celery_task beat -l info

五、Django集成celery

# 使用步骤:
    1. 把写好的celery_task包复制到项目路径下
    2. 在包内的celery.py中的最上面加入以下代码
        import os
        os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
        import django
        django.setup()
        
    3. 在视图类中写一个发送短信的任务函数
    from celery_task.user_task import send_sms
	def index(request):
        mobile = request.GET.get('mobile')
        res = send_sms.delay(mobile, '9999')
        print(res)
        return HttpResponse('短信发送成功')
    
    4. 在user_task.py中加入以下代码
    from .celery import app
    from libs.tx_sms import send_sms_by_phone
    from user.models import UserInfo

    @app.task
    def send_sms(mobile, code):
        send_sms_by_phone(mobile, code)
        user = UserInfo.objects.all().filter(mobile=mobile).first()
        print('给用户%s发送短信成功,手机号为:%s,验证码为%s' % (user.username, mobile, code))
        return True
    
    5. 把路由配置好,然后启动worker测试即可

六、秒杀逻辑

6.1 seckill.vue

<template>
  <div>
    <img
        src="https://gimg2.baidu.com/image_search/src=http%3A%2F%2Fpic.616pic.com%2Fys_bnew_img%2F00%2F65%2F81%2FEQq9UR90Hr.jpg"
        alt="" height="300px" width="300px">
    <el-button type="danger" plain @click.once="handlerClick">点击秒杀</el-button>
  </div>

</template>

<script>
export default {
  name: "Seckill",
  methods: {
    handlerClick() {
      this.$axios.get(this.$settings.BASE_URL + 'userlogin/seckill/').then(res => {
        if (res.data.code == 200) {
          let task_id = res.data.id
          this.$message({
            message: res.data.msg,
            type: 'error'
          });
          // 写一个定时任务,每隔5秒向后端查询一下是否秒杀成功
          let t = setInterval(() => {
            this.$axios.get(this.$settings.BASE_URL + 'userlogin/get_result/?id=' + task_id).then(
                res => {
                  if (res.data.code == 200 || res.data.code == 201) {
                    alert(res.data.msg)
                    // 销毁掉定时任务
                    clearInterval(t)
                  } else if (res.data.code == 202) {
                  }
                }
            )
          }, 5000)

        }
      })
    }
  }
}
</script>

<style scoped>

</style>

6.2 index.js

import Vue from 'vue'
import VueRouter from 'vue-router'
import HomeView from '../views/HomeView.vue'
import Seckill from "@/views/Seckill";

Vue.use(VueRouter)

const routes = [
    {
        path: '/',
        name: 'home',
        component: HomeView
    },
    {
        path: '/seckill',
        name: 'seckill',
        component: Seckill
    },

]

const router = new VueRouter({
    mode: 'history',
    base: process.env.BASE_URL,
    routes
})

export default router

6.3 celery_task/user_task.py

import time
import random
from .celery import app

@app.task
def seckill_task():
    time.sleep(8)  # 模拟秒杀需要5秒
    res = random.choice([True, False])
    if res:
        return '恭喜您,秒杀成功'
    else:
        return '秒杀结束,很遗憾您没有秒杀到'

6.4 user/views.py

from celery_task.user_task import seckill_task

# 1.提交一个秒杀任务
def seckill(request):
    res = seckill_task.delay()
    return JsonResponse({'code': 200, 'msg': '秒杀进行中,您正在排队', 'id': str(res)})


# 2.查询是否秒杀成功

from celery.result import AsyncResult
from celery_task.user_task import app


def get_result(request):
    task_id = request.GET.get('id')
    res = AsyncResult(id=task_id, app=app)
    if res.successful():
        result = res.get()
        return JsonResponse({'code': 200, 'msg': str(result)})
    elif res.failed():
        print('任务失败')
        return JsonResponse({'code': 201, 'msg': '秒杀失败'})
    elif res.status == 'PENDING':
        print('任务等待中被执行')
        return JsonResponse({'code': 202, 'msg': '还在排队'})

七、双写一致性

7.1 接口加缓存

# 首页轮播图接口加缓存,提高了接口的响应速度和并发量

class BannerView(GenericViewSet, CommonListModelMixin):
    queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT]
    serializer_class = BannerSerializer

    def list(self, request, *args, **kwargs):
        result = cache.get('banner_list')
        if result:  # 去redis缓存拿
            return APIResponse(result=result)
        else:
            # 去mysql数据库拿
            res = super().list(request, *args, **kwargs)
            result = res.data.get('result')  # {code:100,msg:成功,result:[{},{}]}
            cache.set('banner_list', result)
            return res
# 增加缓存,如果mysql数据变了,那么由于请求的都是缓存的数据,就会导致mysql和redis的数据不一致,这就是双写一致性问题
# 双写一致性问题,解决方法
        1 修改mysql数据库,删除缓存,缓存的修改是在后
        2 修改数据库,修改缓存,缓存的修改是在后
        3 定时更新缓存,针对于实时性不是很高的接口适合定时更新
   

# 给首页轮播图接口加入了缓存,出现了双写一致性问题,使用定时更新来解决双写一致性的问题,起一个celery的定时任务

7.2 celery定时任务实现双写一致性

celery_task/home_task.py

from home.models import Banner
from home.serializer import BannerSerializer
from django.conf import settings
from django.core.cache import cache

@app.task
def update_banner():
    # 更新缓存
    # 查询出现在轮播图的数据
    queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT]
    ser = BannerSerializer(instance=queryset, many=True)
    for item in ser.data:
        item['image'] = settings.HOST_URL + item['image']
    cache.set('banner_list', ser.data)
    return True

celery.py

app.conf.beat_schedule = {
    'update_banner': {
        'task': 'celery_task.home_task.update_banner',
        'schedule': timedelta(seconds=50),
        'args': (),
    }
}

settings/dev.py

HOST_URL = 'http://127.0.0.1:8000'

标签:11,task,16,res,app,py,celery,2022,import
From: https://www.cnblogs.com/dy12138/p/16897780.html

相关文章