首页 > 其他分享 >简单的队列转发服务器

简单的队列转发服务器

时间:2022-09-23 10:48:00浏览次数:49  
标签:return get 队列 self redis url key 转发 服务器

废话不说先上代码

# 项目是转发服务器,针对新手
from typing import Optional

from flask import Flask, request
import redis, queue, time, requests

app = Flask(__name__)
# 从连接池获取连接对象
redis_uri = 'redis://:[email protected]:6379/0'
redis_0 = redis.from_url(redis_uri)


class MyQueue(queue.Queue):
    def __init__(self):
        super().__init__()
        self.set = set()  # 存放队列的任务,作用仅仅是看下是否有任务在列队中

    def put(self, item, block: bool = ..., timeout: Optional[float] = ..., key=None) -> None:
        if key and not self.job_exists(key):  # 如果key是空的,或者任务已存在,跳过
            super(MyQueue, self).put(item, block, timeout)
            self.set.add(key)
            self.get()

    def job_exists(self, key):
        if key in self.set:
            return True
        return False

    def get(self, block: bool = ..., timeout: Optional[float] = ...):
        if len(self.set):
            key = self.set.pop()
            ret = super(MyQueue, self).get()  # 执行
            if isinstance(ret, str):
                redis_0.set(key, ret, ex=5 * 60)
            elif not ret:
                redis_0.delete(key)
            else:
                redis_0.set(key, ret.text, ex=5 * 60)


my_queue = MyQueue()


def get_cache(func):
    def wrapper(*args, **kwargs):
        count = 0
        flag = True
        url = request.args.get('url')
        if not url:
            return "参数错误"
        if not url.startswith('http://') and not url.startswith('https://'):
            return '路由错误'
        key = url.replace('^^', '&')
        if not key:
            return "未传参"
        redis_value = redis_0.get(key)
        if redis_value and redis_value != b'wait':
            return redis_value.decode()
        elif not redis_value:
            redis_0.set(key, 'wait', ex=5 * 60)
            func(args, kwargs)
        while flag:
            if not redis_0.get(key):
                return '请求异常'
            if redis_0.get(key) != b'wait':
                return redis_0.get(key).decode()
            time.sleep(0.1)
            count += 1
            if count >= 50:
                redis_0.delete(key)
                flag = False
    return wrapper


@app.route('/cache')
@get_cache
def hello_world(*args, **kwargs):  # put application's code here
    url = request.args.get('url')
    key = url.replace('^^', '&')
    my_queue.put(_req(url=key, method='get'), key=str(key))


def _req(url, method, params=None, payload=None, **kwargs):
    if method.lower() == 'get':
        try:
            headers = {"Authorization": "Basic aWNnb286c20yOTA2"}
            return requests.get(url=url, params=params, headers=headers)
        except requests.exceptions.MissingSchema:
            return "参数错误"


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=6808)

正文

1. 思路

转发服务器,说白了就是帮忙把请求转发到指定位置。

公司有很多服务器,转发到指定供应商,供应商不想设置这么多服务器的权限。

因此公司决定制作一个转发服务器,专门转发到供应商的请求,部署5份。

另外供应商觉得请求过于频繁,要减少请求频率,并且减少瞬时请求次数

  • 如上需求,我们拆分一下:
    1. 这个服务器负责转发(确认使用requests)
    2. 这个服务器只有5份,也就是效率要高(决定flask的关键)
    3. 减少请求频率的功能(确认使用redis)
    4. 减少瞬时请求次数的功能(使用任务队列queue)

2. 实战

2.1 安装:

既然决定flask、redis、requests,那么就安装:

pip install flask
pip install redis
pip install requests

2.2 编写代码

首先当然是导入包,然后初始化

# !/usr/bin/env python
# -*-coding:utf-8 -*-

"""
@File       : app.py
@Time       :2022/9/21 下午4:20
@author     :umbrella
@email      : [email protected]
@version    :python 3.8
@Description:
"""

from flask import Flask, request
import redis, queue, requests

app = Flask(__name__)
# 从连接池获取连接对象
redis_uri = 'redis://:[email protected]:6379/0'
redis_0 = redis.from_url(redis_uri)

开始编写接口

这里要注意,本次需求都是get的请求

本次设计中,使用http://127.0.0.1/hello_world?url=转发

但是如果http://127.0.0.1/test?word=hello&from=umbrella来转发的话,会解析为

url=http://127.0.0.1/test?word=hello,from=umbrella两个参数,导致失败

因此使用^^来替换&,在这个项目中,又替换回来

@app.route('/hello_world')
def hello_world(*args, **kwargs):  # put application's code here
    url = request.args.get('url')
    key = url.replace('^^', '&')  # 替换回&
    return requests.get(url=url)

if __name__ == '__main__':
    app.run()

一个flask转发服务器就完成了,可以运行后访问试试:

python app.py

缓存功能就是把得到的数据缓存到redis中,然后下次请求从redis获取,那么我会选择做一个装饰器

key:当然是url的值

value:返回的值

def get_cache(func):
    def wrapper(*args, **kwargs):
        url = request.args.get('url')
        if not url:
            return "参数错误"
        if not url.startswith('http://') and not url.startswith('https://'):
            return '路由错误'
        key = url.replace('^^', '&')
        if not key:
            return "未传参"
        redis_value = redis_0.get(key)
        if redis_value:
            return redis_value.decode()
        elif not redis_value:
            func(args, kwargs)
    return wrapper

然后调用装饰器

@app.route('/hello_world')
@get_cache
def hello_world(*args, **kwargs): ...

完成缓存和获取缓存功能,第二次请求的速度高了不少,还剩最后一个需求:队列

首先分析功能点:

请求进入队列时,要判断之前是否有相同的请求在队列中

如果有:不去转发,直接等待之前的任务完成,然后从redis获取

如果没有:转发

因为队列不好查找任务,所以我们在请求进来的时候,把任务放到一个set中,这样就好判断了

另外任务如果在请求中,此时set已经没有了,需要设置一个状态保证下一个任务不去请求,我选择redis存个wait

# 重写类
class MyQueue(queue.Queue):
    def __init__(self):
        super().__init__()
        self.set = set()  # 放任务的set

    def put(self, item, block: bool = ..., timeout: Optional[float] = ..., key=None) -> None:
        if key and not self.job_exists(key):  # 如果有任务了,就不进来了,直接到装饰器去等待
            super(MyQueue, self).put(item, block, timeout)
            self.set.add(key)
            self.get()

    def job_exists(self, key):
        if key in self.set:
            return True
        return False

    def get(self, block: bool = ..., timeout: Optional[float] = ...):
        if len(self.set):
            key = self.set.pop()
            ret = super(MyQueue, self).get()  # 缓存请求的结果
            if isinstance(ret, str):
                redis_0.set(key, ret, ex=5 * 60)
            elif not ret:
                redis_0.delete(key)
            else:
                redis_0.set(key, ret.text, ex=5 * 60)


my_queue = MyQueue()
# 修改装饰器:
def get_cache(func):
    def wrapper(*args, **kwargs):
        count = 0
        flag = True
        url = request.args.get('url')
        if not url:
            return "参数错误"
        if not url.startswith('http://') and not url.startswith('https://'):
            return '路由错误'
        key = url.replace('^^', '&')
        if not key:
            return "未传参"
        redis_value = redis_0.get(key)
        if redis_value and redis_value != b'wait':  # 如果获取得到结果,并且不是进行中,那么返回
            return redis_value.decode()
        elif not redis_value:
            redis_0.set(key, 'wait', ex=5 * 60)  # 1. 开始请求之前,设置个wait
            func(args, kwargs)
        while flag:  # 如果是wait,那么从redis获取结果,直到成功
            if not redis_0.get(key):
                return '请求异常'
            if redis_0.get(key) != b'wait':
                return redis_0.get(key).decode()
            time.sleep(0.1)  # 防止性能问题
            count += 1
            if count >= 50:  # 如果获取50次,还没拿到结果,那么删除key
                redis_0.delete(key)
                flag = False
    return wrapper
# 修改主函数
@app.route('/cache')
@get_cache
def hello_world(*args, **kwargs):  # put application's code here
    url = request.args.get('url')
    key = url.replace('^^', '&')
    my_queue.put(_req(url=key, method='get'), key=str(key))


def _req(url, method, params=None, payload=None, **kwargs):
    if method.lower() == 'get':
        try:
            return requests.get(url=url, params=params, headers=headers)
        except requests.exceptions.MissingSchema:
            return "参数错误"

结语

程序还有一些些问题,仅供练手,非正式项目

标签:return,get,队列,self,redis,url,key,转发,服务器
From: https://www.cnblogs.com/zaxl932946/p/16721861.html

相关文章

  • ssh端口转发
    前言有时候由于网络策略的限制,我们无法直接访问目标服务,但有个中间服务器两边互通且可以ssh访问,这时候就可以借助ssh实现端口转发。假设有以下场景:主机A:192.168.0.20,客......
  • BM42 用两个栈实现队列
    描述用两个栈来实现一个队列,使用n个元素来完成n次在队列尾部插入整数(push)和n次在队列头部删除整数(pop)的功能。队列中的元素为int类型。保证操作合法,即保证pop操......
  • MQ 消息队列时如何确保消息不丢失
    面试官在面试候选人时,如果发现候选人的简历中写了在项目中使用了MQ技术(如Kafka、RabbitMQ、RocketMQ),基本都会抛出一个问题:在使用MQ的时候,怎么确保消息100%不丢......
  • 无法从命令行或调试器启动服务,必须首先安装Windows服务(使用installutil.exe),然后用S
    window服务调试报错:无法从命令行或调试器启动服务,必须首先安装Windows服务(使用installutil.exe),然后用ServerExplorer、Windows服务器管理工具或NETSTART命令启动它 ......
  • 工作流服务器和图形化工作流配置管理应用
    开源的.Net工作流引擎Elsa初试——创建工作流服务器和图形化工作流配置管理应用 微软的WorkflowFoundation基于.NetFramework,并且没有向.NetCore迁移的计划。我们......
  • 内网时间服务器chrony
    时间服务器172.31.1.4客户端:172.31.1.2,172.31.1.3服务端:1.更改/etc/chrony.conf。其中,对于内网环境,必须要启用localstratum10。因为你需要将本机的时间,给客户端做时......
  • excel导出大数据量时服务器cpu过高tomcat卡死问题排查
      最近发现一套线上生产系统每周一都会出现宕机的现象,CPU很高,持续几分钟后tomcat直接卡死,系统无法登陆刷新无反应,重启后又回复正常,各种定位各种检查。最开始想到的是不是......
  • vue3+ts+elementui中的手动上传至服务器
    <el-uploadclass="inline"ref="uploadImgRef":http-request="uploadImg":auto-upload="false":accept="'.jpg,.png'"><template#trigger><el-butto......
  • 服务器里面运行一个项目用docker容器
    先建一个文件docker-compose.yml#使用说明V3.3.0#1.使用docker-compose宿主机不需要配置host来发现#2.无需修改源码,根目录docker-composeup即可#3.静静等......
  • 什么是NTP服务器?ntp时间服务器京准带你从入门到了解
    什么是NTP服务器?ntp时间服务器京准带你从入门到了解什么是NTP服务器?ntp时间服务器京准带你从入门到了解京准电子科技官微——ahjzsz1、NTP协议概述NTP最早由美国Delawa......