废话不说先上代码
# 项目是转发服务器,针对新手
from typing import Optional
from flask import Flask, request
import redis, queue, time, requests
app = Flask(__name__)
# 从连接池获取连接对象
redis_uri = 'redis://:[email protected]:6379/0'
redis_0 = redis.from_url(redis_uri)
class MyQueue(queue.Queue):
def __init__(self):
super().__init__()
self.set = set() # 存放队列的任务,作用仅仅是看下是否有任务在列队中
def put(self, item, block: bool = ..., timeout: Optional[float] = ..., key=None) -> None:
if key and not self.job_exists(key): # 如果key是空的,或者任务已存在,跳过
super(MyQueue, self).put(item, block, timeout)
self.set.add(key)
self.get()
def job_exists(self, key):
if key in self.set:
return True
return False
def get(self, block: bool = ..., timeout: Optional[float] = ...):
if len(self.set):
key = self.set.pop()
ret = super(MyQueue, self).get() # 执行
if isinstance(ret, str):
redis_0.set(key, ret, ex=5 * 60)
elif not ret:
redis_0.delete(key)
else:
redis_0.set(key, ret.text, ex=5 * 60)
my_queue = MyQueue()
def get_cache(func):
def wrapper(*args, **kwargs):
count = 0
flag = True
url = request.args.get('url')
if not url:
return "参数错误"
if not url.startswith('http://') and not url.startswith('https://'):
return '路由错误'
key = url.replace('^^', '&')
if not key:
return "未传参"
redis_value = redis_0.get(key)
if redis_value and redis_value != b'wait':
return redis_value.decode()
elif not redis_value:
redis_0.set(key, 'wait', ex=5 * 60)
func(args, kwargs)
while flag:
if not redis_0.get(key):
return '请求异常'
if redis_0.get(key) != b'wait':
return redis_0.get(key).decode()
time.sleep(0.1)
count += 1
if count >= 50:
redis_0.delete(key)
flag = False
return wrapper
@app.route('/cache')
@get_cache
def hello_world(*args, **kwargs): # put application's code here
url = request.args.get('url')
key = url.replace('^^', '&')
my_queue.put(_req(url=key, method='get'), key=str(key))
def _req(url, method, params=None, payload=None, **kwargs):
if method.lower() == 'get':
try:
headers = {"Authorization": "Basic aWNnb286c20yOTA2"}
return requests.get(url=url, params=params, headers=headers)
except requests.exceptions.MissingSchema:
return "参数错误"
if __name__ == '__main__':
app.run(host='0.0.0.0', port=6808)
正文
1. 思路
转发服务器,说白了就是帮忙把请求转发到指定位置。
公司有很多服务器,转发到指定供应商,供应商不想设置这么多服务器的权限。
因此公司决定制作一个转发服务器,专门转发到供应商的请求,部署5份。
另外供应商觉得请求过于频繁,要减少请求频率,并且减少瞬时请求次数
- 如上需求,我们拆分一下:
- 这个服务器负责转发(确认使用requests)
- 这个服务器只有5份,也就是效率要高(决定flask的关键)
- 减少请求频率的功能(确认使用redis)
- 减少瞬时请求次数的功能(使用任务队列queue)
2. 实战
2.1 安装:
既然决定flask、redis、requests,那么就安装:
pip install flask
pip install redis
pip install requests
2.2 编写代码
首先当然是导入包,然后初始化
# !/usr/bin/env python
# -*-coding:utf-8 -*-
"""
@File : app.py
@Time :2022/9/21 下午4:20
@author :umbrella
@email : [email protected]
@version :python 3.8
@Description:
"""
from flask import Flask, request
import redis, queue, requests
app = Flask(__name__)
# 从连接池获取连接对象
redis_uri = 'redis://:[email protected]:6379/0'
redis_0 = redis.from_url(redis_uri)
开始编写接口
这里要注意,本次需求都是get的请求
本次设计中,使用http://127.0.0.1/hello_world?url=转发
但是如果http://127.0.0.1/test?word=hello&from=umbrella来转发的话,会解析为
url=http://127.0.0.1/test?word=hello,from=umbrella两个参数,导致失败
因此使用^^来替换&,在这个项目中,又替换回来
@app.route('/hello_world')
def hello_world(*args, **kwargs): # put application's code here
url = request.args.get('url')
key = url.replace('^^', '&') # 替换回&
return requests.get(url=url)
if __name__ == '__main__':
app.run()
一个flask转发服务器就完成了,可以运行后访问试试:
python app.py
缓存功能就是把得到的数据缓存到redis中,然后下次请求从redis获取,那么我会选择做一个装饰器
key:当然是url的值
value:返回的值
def get_cache(func):
def wrapper(*args, **kwargs):
url = request.args.get('url')
if not url:
return "参数错误"
if not url.startswith('http://') and not url.startswith('https://'):
return '路由错误'
key = url.replace('^^', '&')
if not key:
return "未传参"
redis_value = redis_0.get(key)
if redis_value:
return redis_value.decode()
elif not redis_value:
func(args, kwargs)
return wrapper
然后调用装饰器
@app.route('/hello_world')
@get_cache
def hello_world(*args, **kwargs): ...
完成缓存和获取缓存功能,第二次请求的速度高了不少,还剩最后一个需求:队列
首先分析功能点:
请求进入队列时,要判断之前是否有相同的请求在队列中
如果有:不去转发,直接等待之前的任务完成,然后从redis获取
如果没有:转发
因为队列不好查找任务,所以我们在请求进来的时候,把任务放到一个set中,这样就好判断了
另外任务如果在请求中,此时set已经没有了,需要设置一个状态保证下一个任务不去请求,我选择redis存个wait
# 重写类
class MyQueue(queue.Queue):
def __init__(self):
super().__init__()
self.set = set() # 放任务的set
def put(self, item, block: bool = ..., timeout: Optional[float] = ..., key=None) -> None:
if key and not self.job_exists(key): # 如果有任务了,就不进来了,直接到装饰器去等待
super(MyQueue, self).put(item, block, timeout)
self.set.add(key)
self.get()
def job_exists(self, key):
if key in self.set:
return True
return False
def get(self, block: bool = ..., timeout: Optional[float] = ...):
if len(self.set):
key = self.set.pop()
ret = super(MyQueue, self).get() # 缓存请求的结果
if isinstance(ret, str):
redis_0.set(key, ret, ex=5 * 60)
elif not ret:
redis_0.delete(key)
else:
redis_0.set(key, ret.text, ex=5 * 60)
my_queue = MyQueue()
# 修改装饰器:
def get_cache(func):
def wrapper(*args, **kwargs):
count = 0
flag = True
url = request.args.get('url')
if not url:
return "参数错误"
if not url.startswith('http://') and not url.startswith('https://'):
return '路由错误'
key = url.replace('^^', '&')
if not key:
return "未传参"
redis_value = redis_0.get(key)
if redis_value and redis_value != b'wait': # 如果获取得到结果,并且不是进行中,那么返回
return redis_value.decode()
elif not redis_value:
redis_0.set(key, 'wait', ex=5 * 60) # 1. 开始请求之前,设置个wait
func(args, kwargs)
while flag: # 如果是wait,那么从redis获取结果,直到成功
if not redis_0.get(key):
return '请求异常'
if redis_0.get(key) != b'wait':
return redis_0.get(key).decode()
time.sleep(0.1) # 防止性能问题
count += 1
if count >= 50: # 如果获取50次,还没拿到结果,那么删除key
redis_0.delete(key)
flag = False
return wrapper
# 修改主函数
@app.route('/cache')
@get_cache
def hello_world(*args, **kwargs): # put application's code here
url = request.args.get('url')
key = url.replace('^^', '&')
my_queue.put(_req(url=key, method='get'), key=str(key))
def _req(url, method, params=None, payload=None, **kwargs):
if method.lower() == 'get':
try:
return requests.get(url=url, params=params, headers=headers)
except requests.exceptions.MissingSchema:
return "参数错误"
结语
程序还有一些些问题,仅供练手,非正式项目
标签:return,get,队列,self,redis,url,key,转发,服务器 From: https://www.cnblogs.com/zaxl932946/p/16721861.html