前倾回顾:
# 1 redis 是什么 非关系型数据库(存数据) key-value形式存储 5大数据类型:字符串,hash,列表,集合,有序集合 纯内存存储 # 2 为什么这么快? - 纯内存 - io多路复用网络模型 - 数据操作单线程,避免了线程切换 # 3 python 链接redis - 普通方式: conn=redis.Redis(链接参数) - 连接池[必须单例]: (django中使用数据库连接池) -创建池 - 从池就拿链接 -连接池和半连接池有什么关系? -连接池: -半连接池: -信号量 信号 Semaphore signal # 4 python 的字符串操作 -1 缓存 -qs/单个对象---》ser.data --->[{},{}]/{}---》Response(data=ser.data)--》json格式字符串 - res=super().list()【Response的对象】--->res.data--->cache.set('banner_list',res.data)---》redis--》字符串---》pickle -2 计数器(访问量,日活,月活) -3 短信验证码存储--》过期时间 -4 常用api---》常用方法 set get ....conn.get 这些方法,对应着redis的命令 conn.get get conn.set set -5 所有 1 set(name, value, ex=None, px=None, nx=False, xx=False) 2 setnx(name, value) 3 psetex(name, time_ms, value) 4 mset(*args, **kwargs) 5 get(name) 6 mget(keys, *args) 7 getset(name, value) 8 getrange(key, start, end) 9 setrange(name, offset, value) 10 setbit(name, offset, value) 11 getbit(name, offset) 12 bitcount(key, start=None, end=None) 13 bitop(operation, dest, *keys) 14 strlen(name) 15 incr(self, name, amount=1) #incrby 16 incrbyfloat(self, name, amount=1.0) 17 decr(self, name, amount=1) 18 append(key, value) # 5 hash操作 -1 缓存,计数器---》hash类型能做的,一般都用字符串顶替它 -2 常用api 1 hset(name, key, value) 2 hmset(name, mapping) # 弃用了 3 hget(name,key) 4 hmget(name, keys, *args) 5 hgetall(name) 6 hlen(name) 7 hkeys(name) 8 hvals(name) 9 hexists(name, key) 10 hdel(name,*keys) 11 hincrby(name, key, amount=1) 12 hincrbyfloat(name, key, amount=1.0) 13 hscan(name, cursor=0, match=None, count=None) 14 hscan_iter(name, match=None, count=None) # 取所有 # 6 列表 -作用:队列,栈,消息队列[分布式]--blopop阻塞式, 有顺序可重复 -常用api 1 lpush(name, values) 2 rpush(name, values) 表示从右向左操作 3 lpushx(name, value) 4 rpushx(name, value) 表示从右向左操作 5 llen(name) 6 linsert(name, where, refvalue, value) 7 r.lset(name, index, value) 8 r.lrem(name, num,value) 9 lpop(name) 10 rpop(name) 表示从右向左操作 11 lindex(name, index) 12 lrange(name, start, end) # 通过它取所有 13 ltrim(name, start, end) 14 rpoplpush(src, dst) 15 blpop(keys, timeout) 16 r.brpop(keys, timeout),从右向左获取数据 17 brpoplpush(src, dst, timeout=0) # 7 去列表所有数据---》做成生成器 # 由于redis类库中没有提供对列表元素的增量迭代,如果想要循环name对应的列表的所有元素,那么就需要(lrange): # 1、获取name对应的所有列表 # 2、循环列表 # 但是,如果列表非常大,那么就有可能在第一步时就将程序的内容撑爆,所有有必要自定义一个增量迭代的功能: import redis conn=redis.Redis(host='127.0.0.1',port=6379) def scan_list(name,count=2): index=0 while True: (列表的key,0,1) (列表的key,2,3) data_list=conn.lrange(name,index,count+index-1) if not data_list: return index+=count for item in data_list: yield item print(conn.lrange('test',0,100)) for item in scan_list('test',5): print('---') print(item) # 8 redis 其他操作 delete(*names) exists(name) keys(pattern='*') expire(name ,time) rename(src, dst) move(name, db)) randomkey() type(name) # 9 django中使用redis -通用方案: -pool.py--->得到单例的POOL -在视图函数中---》导入,使用:conn = redis.Redis(connection_pool=POOL): -第三方: -安装 -配置文件配置:带池 -导入使用 from django_redis import get_redis_connection conn = get_redis_connection() -django 的缓存--》缓存到redis中 cache.set(key,value,3) # 10 接口加缓存
今日内容
celery介绍
# celery 是什么
-翻译过来是芹菜
-官网:https://docs.celeryq.dev/en/stable/
-吉祥物:芹菜
-分布式的异步任务框架
- 分布式:一个任务,拆成多个任务在不同机器上做
- 异步任务:后台执行,不阻塞主任务
- 框架:集成到项目中
# 能干的事
-1 异步任务
-异步发邮件,短信,通知
-2 延迟任务
-延迟 几秒 再执行某个任务
-订单提交后,延迟半小时,把订单取消
-3 定时任务
-每隔多长事件 执行某个任务
- 定时更新缓存
celery架构
# django 是一个服务
# celery 是一个服务
-命令启动,就能提供服务
# 三个模块
1 broker:消息中间件,消息队列,任务中间件
-存储任务(函数):发送短信任务,统计在线人数。。。
-redis,reabbitmq 存储
-字符串形式,能把任务表示出来即可
函数名,函数参数,函数位置
2 worker:任务执行单元
-从消息队列[broker--》redis]---》取出任务执行--->程序(进程)
3 backend:结果存储 Result Stores
-任务执行完成后的结果存储在这里
-redis存储,关系型数据库。。
# 执行流程
1 其他程序---》提交任务(函数)---》任务序列化后存到celery的broker中
-redis 0
2 接下来:worker执行---》从broker中取任务--》执行
3 任务执行完后,把结果存到 bancked中
-redis 1
# 注意:
celery和其他程序是 独立运行的
1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
2)celery服务为为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务(django),一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求
人(django)是一个独立运行的服务 | 医院(celery)也是一个独立运行的服务
正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
celery包结构
#### 包结构 目录如下 ####### -celery_task -celery.py -user_task.py -order_task.py -goods_task.py -其他程序中提交任务 add_task_package.py -其他程序中查询结果 get_result_package.py ###具体步骤 ##################### 1 celery.py ############################## from celery import Celery #####1 实例化得到对象####### broker = 'redis://127.0.0.1:6379/1' backend = 'redis://127.0.0.1:6379/2' app = Celery('app', broker=broker, backend=backend,include=['celery_task.order_task','celery_task.user_task']) #######2 写任务 ##########以后各种类型任务,单独写在py文件中 ###########################2 order_task.py user_task.py ############################## # order_task.py import time from .celery import app @app.task def cancel_order(order_id): time.sleep(2) return '订单:%s取消成功' % order_id ###user_task.py import time from .celery import app @app.task def send_sms(phone, code): time.sleep(1) return '手机号:%s,发送验证码:%s,成功' % (phone, code) ######## 3 其他程序,提交任务 ############################## from celery_task.user_task import send_sms res=send_sms.delay('1893424323',8888) print(res) ##### 4 启动worker########## 在包的一层,执行包,不需要具体到某个py文件了 ###win运行: pip3 install eventlet # -A celery_demo 包名----》因为他会去包下找 celery.py 中得app执行 celery -A celery_task worker -l info -P eventlet ####非win运行:mac linux celery -A celery_task worker -l info #####5 查询结果##### # 使用代码,查询结果 from celery_task.celery import app from celery.result import AsyncResult id = '46b26c73-62ae-403c-ba62-e469f2f8c69f' if __name__ == '__main__': a = AsyncResult(id=id, app=app) if a.successful(): result = a.get() # hello world print(result) elif a.failed(): print('任务失败') elif a.status == 'PENDING': print('任务等待中被执行') elif a.status == 'RETRY': print('任务异常后正在重试') elif a.status == 'STARTED': print('任务已经开始被执行')
celery实现异步任务,定时任务,延迟任务
###1 异步任务 任务名.delay(传参数) ### 2 延迟任务---》延迟多长事件干事 from datetime import datetime, timedelta # atetime.utcnow() 当前utc时间 eta = datetime.utcnow() + timedelta(seconds=15) res = cancel_order.apply_async(args=['10001',], eta=eta) # 15s 后执行这个任务 print(res) #### 3 定时任务--》一定要启动beat ##3.1 在celery.py 中写 # 时区 app.conf.timezone = 'Asia/Shanghai' # 是否使用UTC app.conf.enable_utc = False # 任务的定时配置 from datetime import timedelta from celery.schedules import crontab app.conf.beat_schedule = { 'send_sms': { 'task': 'celery_task.user_task.send_sms', 'schedule': timedelta(seconds=3), # 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点 'args': ('1896388888', '6666'), } } ### 3.2 启动worker celery -A celery_task worker -l info -P eventlet # 启动beat---》每个一段时间,就提交任务 celery -A celery_task beat -l info ### 3.3 等待即可
django中使用celery
# 两种方案:
-通用方案--》自己封装
-django-celery--》app---》创建出一些表
# 自己封装的通用方案
1 把封装的包:celery_task 复制到项目中
2 在celery.py 中加入
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
3 写任务,启动worker
4 在django的视图类中,异步调用即可
详细
celery.py
from celery import Celery import os # 任务里使用django的东西:缓存,表模型。。。必须加入 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev') #####1 实例化得到对象####### broker = 'redis://127.0.0.1:6379/1' backend = 'redis://127.0.0.1:6379/2' app = Celery('app', broker=broker, backend=backend, include=['celery_task.order_task', 'celery_task.user_task']) #######2 写任务 ##########以后各种类型任务,单独写在py文件中 ##### 定时任务 # 时区 app.conf.timezone = 'Asia/Shanghai' # 是否使用UTC app.conf.enable_utc = False # 任务的定时配置 from datetime import timedelta from celery.schedules import crontab app.conf.beat_schedule = { }
user_task.py
import time from .celery import app from libs.send_sms import common_send_sms @app.task def send_sms(phone, code): res = common_send_sms(code, mobile=phone) if res: return '短信发送成功:%s' % phone else: return '短信发送失败:%s' % phone
views.py
def send(self, request): try: mobile = request.query_params['mobile'] code = get_code() # 生成验证码,要存一下 之前验证码放session中,现在放在缓存中 cache.set('sms_code_%s' % mobile, code, 61) # 放在缓存中,以手机号做区分 # 使用celery的异步发送短信 send_sms.delay(mobile, code) return APIResponse(msg='短信已发送') except MultiValueDictKeyError as e: raise APIException(detail='手机号必须携带') except Exception as e: # print(type(e)) raise APIException(detail=str(e))
双写一致性
# 轮播图加缓存
-出现问题:banner表中数据变了,缓存不会变
-mysql和redis数据不一致: mysql和redis双写一致性
# 双写一致性的解决方案
-1 mysql修改---》删缓存
-2 mysql修改---》改缓存
-3 定时更新---》每个5s,更新一次缓存
先删缓存,在更新mysql
先改缓存,再更新mysql
# 轮播图的接口---》使用定时更新,解决双写一致性问题
详细
############### home_task.py##################### import time from .celery import app from home.models import Banner from django.conf import settings from home.serializer import BannerSerializer from django.core.cache import cache @app.task def update_banner(): # 1 获取所有轮播图数据 queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT] # 2 序列化 ser = BannerSerializer(instance=queryset, many=True) # ser.data # 2.1 把服务端前缀拼接上 for item in ser.data: # media/banner/banner1.png, item['image'] = settings.BACKEND_URL + item['image'] # 3 放到缓存 cache.set('banner_list', ser.data) return '更新成功' ############celery.py####### app = Celery('app', broker=broker, backend=backend, include=['celery_task.order_task', 'celery_task.user_task','celery_task.home_task']) app.conf.beat_schedule = { 'update_banner': { 'task': 'celery_task.home_task.update_banner', # 'schedule': timedelta(seconds=5), 'schedule': timedelta(minutes=3), 'args': (), }, } ##### 启动worker celery -A celery_task worker -l info -P eventlet #####启动beat celery -A celery_task beat -l info ## 以后尽管改 mysql数据,最多3分钟就会更新到最新了###
异步线程 秒杀案例
秒杀功能
- 并发量要高:承载住很多用户同时操作
-订单表
-扣减库存
- 效率要高
# 同步秒杀
-假设秒杀需要10s钟,项目并发量是3,总共5个商品要秒杀
-10s内,只有3个人能进入到系统,并且开始秒杀
前端
const routes = [ { path: '/', name: 'home', component: HomeView }, { path: '/seckill', name: 'seckill', component: SeckillView }, ]
<template> <div> <Header></Header> <div style="padding: 50px;margin-left: 100px"> <h1>Go语言课程</h1> <img src="http://photo.liuqingzheng.top/2023%2002%2022%2021%2057%2011%20/image-20230222215707795.png" height="300px" width="300px"> <br> <el-button type="danger" @click="handleSeckill" v-loading.fullscreen.lock="fullscreenLoading">秒杀课程</el-button> </div> <br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br> <Footer></Footer> </div> </template> <script> import Header from '@/components/Header' import Footer from '@/components/Footer' export default { name: "SckillView", data() { return { fullscreenLoading: false, task_id: '', t: null, } }, methods: { // ##############同步秒杀############## // handleSeckill() { // this.fullscreenLoading = true; // this.$axios({ // url: '/user/seckill/seckill/', // method: 'POST', // data: { // course_id: '99' // } // }).then(res => { // this.fullscreenLoading = false; // this.$message.success(res.msg) // }).catch(res => { // this.fullscreenLoading = false; // this.$message.error(res) // }) // } // ##############同步秒杀############## // ##############异步秒杀############## handleSeckill() { this.fullscreenLoading = true; this.$axios({ url: '/user/seckill/seckill/', method: 'POST', data: { course_id: '99' } }).then(res => { // 在排队,转圈的,还需要继续显示 this.$message.success(res.msg) this.task_id = res.task_id // 继续发送请求---》查询是否秒杀成功:1 成功 2 没成功 3 秒杀任务还没执行 // 启动定时任务,没隔1s,向后端发送一次请求 this.t = setInterval(() => { this.$axios({ url: '/user/seckill/get_result/', method: 'get', params: { task_id: this.task_id } }).then(res => { // 100 成功,success : 1 成功 0 失败 2 还没开始 if (res.success == '1') { // 转圈框不显示 this.fullscreenLoading = false; // 停止定时任务 clearInterval(this.t) this.t = null this.$message.success(res.msg) } else if (res.success == '0') { // 转圈框不显示 this.fullscreenLoading = false; // 停止定时任务 clearInterval(this.t) this.t = null this.$message.error(res.msg) } else { // this.$message.error(res.msg) console.log(res.msg) } }) }, 1000) }).catch(res => { this.fullscreenLoading = false; this.$message.error(res) }) } }, components: { Header, Footer } } </script> <style scoped> </style>
后端
#### 秒杀功能 import random from celery_task.order_task import seckill from celery_task.celery import app from celery.result import AsyncResult class SeckillView(ViewSet): #### 同步操作,性能不高 # @action(methods=['POST'], detail=False) # def seckill(self, request, *args, **kwargs): # ''' # #1 取出传入的 课程id # #2 查询课程 是否还有剩余 1s # #2.1 有剩余,开始下单扣减库存 1s # #2.2,在订单表中生成一条记录 2s # #2.3 秒杀成功返回给前端 # #3 课程没有剩余,秒杀失败,返回给前端 # ''' # course_id = request.data.get('course_id') # # # print('根据课程id:%s,查询课程是否还有剩余,耗时3s' % course_id) # time.sleep(1) # res = random.choice([True, False]) # if res: # 库存够 # print('扣减库存,耗时3s') # time.sleep(1) # print('下单,耗时4s') # time.sleep(2) # return APIResponse(msg='恭喜您秒到了') # else: # return APIResponse(code=101, msg='库存不足,秒杀失败') # 异步提交任务 @action(methods=['POST'], detail=False) def seckill(self, request, *args, **kwargs): course_id = request.data.get('course_id') task_id = seckill.delay(course_id) return APIResponse(msg='您正在排队', task_id=str(task_id)) @action(methods=['GET'], detail=False) def get_result(self, request, *args, **kwargs): task_id = request.query_params.get('task_id') a = AsyncResult(id=task_id, app=app) if a.successful(): result = a.get() # True 和 False if result: return APIResponse(success='1', msg='秒杀成功') else: return APIResponse(success='0', msg='秒杀失败') elif a.status == 'PENDING': print('任务等待中被执行') return APIResponse(success='2', msg='任务等待中被执行') else: return APIResponse(success='3', msg='秒杀任务正在执行')
路由
router.register('seckill', SeckillView, 'seckill')
任务
import random import time @app.task def seckill(course_id): print('根据课程id:%s,查询课程是否还有剩余,耗时2s' % course_id) time.sleep(2) res = random.choice([True, False]) if res: # 库存够 print('扣减库存,耗时1s') time.sleep(1) print('下单,耗时2s') time.sleep(2) return True else: return False
# 1 redis 是什么
非关系型数据库(存数据)
key-value形式存储
5大数据类型:字符串,hash,列表,集合,有序集合
纯内存存储
# 2 为什么这么快?
- 纯内存
- io多路复用网络模型
- 数据操作单线程,避免了线程切换
# 3 python 链接redis
- 普通方式: conn=redis.Redis(链接参数)
- 连接池[必须单例]: (django中使用数据库连接池)
-创建池
- 从池就拿链接
-连接池和半连接池有什么关系?
-连接池:
-半连接池:
-信号量 信号
Semaphore signal
# 4 python 的字符串操作
-1 缓存
-qs/单个对象---》ser.data --->[{},{}]/{}---》Response(data=ser.data)--》json格式字符串
- res=super().list()【Response的对象】--->res.data--->cache.set('banner_list',res.data)---》redis--》字符串---》pickle
-2 计数器(访问量,日活,月活)
-3 短信验证码存储--》过期时间
-4 常用api---》常用方法
set get ....conn.get
这些方法,对应着redis的命令
conn.get get
conn.set set
-5 所有
1 set(name, value, ex=None, px=None, nx=False, xx=False)
2 setnx(name, value)
3 psetex(name, time_ms, value)
4 mset(*args, **kwargs)
5 get(name)
6 mget(keys, *args)
7 getset(name, value)
8 getrange(key, start, end)
9 setrange(name, offset, value)
10 setbit(name, offset, value)
11 getbit(name, offset)
12 bitcount(key, start=None, end=None)
13 bitop(operation, dest, *keys)
14 strlen(name)
15 incr(self, name, amount=1)
#incrby
16 incrbyfloat(self, name, amount=1.0)
17 decr(self, name, amount=1)
18 append(key, value)
# 5 hash操作
-1 缓存,计数器---》hash类型能做的,一般都用字符串顶替它
-2 常用api
1 hset(name, key, value)
2 hmset(name, mapping) # 弃用了
3 hget(name,key)
4 hmget(name, keys, *args)
5 hgetall(name)
6 hlen(name)
7 hkeys(name)
8 hvals(name)
9 hexists(name, key)
10 hdel(name,*keys)
11 hincrby(name, key, amount=1)
12 hincrbyfloat(name, key, amount=1.0)
13 hscan(name, cursor=0, match=None, count=None)
14 hscan_iter(name, match=None, count=None) # 取所有
# 6 列表
-作用:队列,栈,消息队列[分布式]--blopop阻塞式, 有顺序可重复
-常用api
1 lpush(name, values)
2 rpush(name, values) 表示从右向左操作
3 lpushx(name, value)
4 rpushx(name, value) 表示从右向左操作
5 llen(name)
6 linsert(name, where, refvalue, value)
7 r.lset(name, index, value)
8 r.lrem(name, num,value)
9 lpop(name)
10 rpop(name) 表示从右向左操作
11 lindex(name, index)
12 lrange(name, start, end) # 通过它取所有
13 ltrim(name, start, end)
14 rpoplpush(src, dst)
15 blpop(keys, timeout)
16 r.brpop(keys, timeout),从右向左获取数据
17 brpoplpush(src, dst, timeout=0)
# 7 去列表所有数据---》做成生成器
# 由于redis类库中没有提供对列表元素的增量迭代,如果想要循环name对应的列表的所有元素,那么就需要(lrange):
# 1、获取name对应的所有列表
# 2、循环列表
# 但是,如果列表非常大,那么就有可能在第一步时就将程序的内容撑爆,所有有必要自定义一个增量迭代的功能:
import redis
conn=redis.Redis(host='127.0.0.1',port=6379)
def scan_list(name,count=2):
index=0
while True:
(列表的key,0,1)
(列表的key,2,3)
data_list=conn.lrange(name,index,count+index-1)
if not data_list:
return
index+=count
for item in data_list:
yield item
print(conn.lrange('test',0,100))
for item in scan_list('test',5):
print('---')
print(item)
# 8 redis 其他操作
delete(*names)
exists(name)
keys(pattern='*')
expire(name ,time)
rename(src, dst)
move(name, db))
randomkey()
type(name)
# 9 django中使用redis
-通用方案:
-pool.py--->得到单例的POOL
-在视图函数中---》导入,使用:conn = redis.Redis(connection_pool=POOL):
-第三方:
-安装
-配置文件配置:带池
-导入使用
from django_redis import get_redis_connection
conn = get_redis_connection()
-django 的缓存--》缓存到redis中
cache.set(key,value,3)
# 10 接口加缓存标签:---,task,name,09,redis,celery,luffy,import From: https://www.cnblogs.com/wzh366/p/17995119