首页 > 编程语言 >带有最小间隔时间的队列读取实现 —— 最小等待时间的队列 —— Python编程

带有最小间隔时间的队列读取实现 —— 最小等待时间的队列 —— Python编程

时间:2023-11-28 13:07:19浏览次数:47  
标签:队列 32 最小 range 等待时间 time print import data

带有最小间隔时间的队列读取实现 —— 最小等待时间的队列 —— Python编程_等待时间

 (注:照片源自免费网站,地址:https://www.freepik.com/photos/angry-panda/13

 

 

==================================================

 

 

事情起源是最近在看一个TensorFlow的代码,是 TensorFlow实现了一个最小等待时间的队列,解释一下就是一个进程阻塞在一个队列上等待数据的读取,但是这个阻塞是有条件的,需要满足两个条件中的任意一个即可解除阻塞:一个是读取到了N个数据后;另一个是在阻塞了T秒后如果读取到的数据不为零。满足这两个条件,那么阻塞在队列上的读取进程即可恢复执行。

 

理论上这个设计是可以提高运算效率的,但是个人认为这个设计在实际使用中作用是十分有效的,但是这个功能既然TensorFlow有提供而其他框架并没有提供,那么我们是否可以手动实现一个呢,或者是python版本呢。

 

根据前面的描述我们可以知道这个设计需要在满足下面两个条件时接触阻塞:

1. 读取到最少N个数据;

2. 读取到一个以上数据,并且阻塞用时达到阈值T;

 

 

 

对于这个实现,考虑过使用多进程、多线程、异步,后来想想还是用多线程实现大致的逻辑,具体如下(consumer函数没有实现,不能运行,只有逻辑):

from multiprocessing import Process, Queue
import threading
from threading import Thread
import numpy as np
import time
import asyncio


q = Queue(maxsize=128)


def produce(q):
    while True:
        q.push( np.random.rand(128, 128) )
        time.sleep(np.random.randint(10)/1000)


for _ in range(10):
    p = Process(target=produce, args=(q))
    p.start()


data = []
def consumer():
    pass

flag = False
lock_2 = threading.Lock()
def timer(lock):
    while True:
        time.sleep(0.1)
        lock2.acquire()
        if not flag:
            if not data:
                data2 = data[:32]
                data = data[32:]
            else:
                data.append(q.get())

                data2 = data[:32]
                data = data[32:]
            consumer(data)
        else:
            flag = False
        lock2.release()


t = Thread(target=timer, args=(lock,))
t.run()


while True:
    lock2.acquire()
    data.append(q.get())
    if len(data) < 32:
        lock2.release()
        continue
    else:
        flag = True
        data2 = data[:32]
        data = data[32:]
        consumer(data2)
        lock2.release()

 

 

 

后来想想刚才的实现好像很没有必要,完全可以用更简单的实现,具体如下:

主要逻辑:(N为32,最小等待时间为1秒)

q = Queue(maxsize=128)

# while True:
for i in range(1000000):
    for _ in range(32):
        try:
            data.append(q.get(block=False, timeout=1.0/32))
        except queue.Empty:
            print("Empty!!!")
    if len(data) == 0:
        data.append(q.get())

    print("time: ", i)
    consumer(data)

 

不过很不幸的是这个代码并不能正常运行,原因是queue对象在block为False情况下timeout只能是正整数,如果是小于零的数则视为0.

可以运行的代码

from multiprocessing import Process, Queue
import threading
from threading import Thread
import numpy as np
import time
import queue


q = Queue(maxsize=128)


def produce(q):
    while True:
        q.put( np.random.rand(128, 128) )
        time.sleep(np.random.randint(100)/1000)


for _ in range(10):
    p = Process(target=produce, args=(q, ))
    p.start()


data = []
def consumer(data):
    print("长度:", len(data))
    data.clear()
    time.sleep(0.1)

"""
while True:
    print(q.qsize())
"""

# while True:
for i in range(3600):
    a_t = time.time()
    for _ in range(32):
    #for _ in range(64):
        data.append(q.get())
        if time.time() - a_t > 0.1:
            break

    # print("time: ", i)
    consumer(data)

 

 

当for _ in range(32)时,运行如下:

带有最小间隔时间的队列读取实现 —— 最小等待时间的队列 —— Python编程_多线程_02

 

 

 

当for _ in range(64)时,运行如下:

带有最小间隔时间的队列读取实现 —— 最小等待时间的队列 —— Python编程_等待时间_03

 

 

--------------------------------------------------------

 

 

由于上面的代码都是基于假设的produce方法和consumer方法,在实际应用中我们可以通过调整参数的方式来寻找更高效的运行方式,具体的修改参数有两处,为:

带有最小间隔时间的队列读取实现 —— 最小等待时间的队列 —— Python编程_等待时间_04

 

 

具体调整的位置为第6行的range(32)以及第9行的0.1,通过这两个数值的修改可以对具体的product和consumer进行性能提升。

 

具体问题时需要根据具体表现出的吞吐率来对这两个数值进行调整。

 

 

range数值代表我们预设的希望读取的数据量;0.1处的数值代表我们的最小容忍时间/等待时间。

 

 

==================================================

 

 

对上面的代码做些修改,加入了吞吐率的计算:

from multiprocessing import Process, Queue
import threading
from threading import Thread
import numpy as np
import time
import queue


q = Queue(maxsize=128)


def produce(q):
    while True:
        q.put( np.random.rand(128, 128) )
        time.sleep(np.random.randint(100)/1000)


for _ in range(10):
    p = Process(target=produce, args=(q, ))
    p.start()


data = []
def consumer(data):
    l = len(data)
    #print("长度:", len(data))
    data.clear()
    time.sleep(0.1)
    return l

"""
while True:
    print(q.qsize())
"""

s = 0
# while True:
b_t = time.time()
for i in range(36):
    a_t = time.time()
    #for _ in range(32):
    for _ in range(16):
        data.append(q.get())
        if time.time() - a_t > 0.1:
            break

    # print("time: ", i)
    s += consumer(data)
b = time.time() - b_t
print(b)
print(s)
print("吞吐率", s/b)

 

带有最小间隔时间的队列读取实现 —— 最小等待时间的队列 —— Python编程_多线程_05

 

 

 

 

在保持最小间隔时间0.1不变的情况下:

当range(32)和range(64)时吞吐率大致为200个每一秒,在range(16)时则为155个每一秒。

 

 

 

 

===============================================

 

标签:队列,32,最小,range,等待时间,time,print,import,data
From: https://blog.51cto.com/u_15642578/8598599

相关文章

  • Python用偏最小二乘回归Partial Least Squares,PLS分析桃子近红外光谱数据可视化
    全文链接:https://tecdat.cn/?p=34376原文出处:拓端数据部落公众号PLS,即偏最小二乘(PartialLeastSquares),是一种广泛使用的回归技术,用于帮助客户分析近红外光谱数据。如果您对近红外光谱学有所了解,您肯定知道近红外光谱是一种次级方法,需要将近红外数据校准到所要测量的参数的主要......
  • 支持修改键值的优先队列(以C++,Java为例)
    #include<queue>#include<functional>template<typenameT1,typenameT2>classmutable_priority_queue;template<typenameT1,typenameT2>classmutable_priority_queue{private:std::function<bool(conststd::pair<T1,T......
  • 一些Linux下系统安全软件的最小可用知识(目前1个软件)
    ClamAV使用概述ClamAV杀毒是Linux平台最受欢迎的杀毒软件,ClamAV属于免费开源产品,支持多种平台。ClamAV是基于病毒扫描的命令行工具,但同时也有支持图形界面的ClamTK工具。ClamAV主要用于邮件服务器扫描邮件。它有多种接口从邮件服务器扫描邮件,支持文件格式有如:ZIP、RAR、TAR......
  • 力扣907. 子数组的最小值之和(单调栈)
    给定一个整数数组 arr,找到 min(b) 的总和,其中 b 的范围为 arr 的每个(连续)子数组。由于答案可能很大,因此 返回答案模 10^9+7 。 示例1:输入:arr=[3,1,2,4]输出:17解释:子数组为[3],[1],[2],[4],[3,1],[1,2],[2,4],[3,1,2],[1,2,4],[3,1,2,4]。最小值为3,1,2,4,1,1,2,1,1,1,和......
  • 最小生成树(Kruskal和Prim算法)
    最小生成树(Kruskal和Prim算法)部分资料来源于:最小生成树(Kruskal算法)_kruskal算法求最小生成树-CSDN博客、【算法】最小生成树——Prim和Kruskal算法-CSDN博客关于图的几个概念定义:连通图:在无向图中,若任意两个顶点vi与vj都有路径相通,则称该无向图为连通图。强连通图:在有向图中,若......
  • 907. 子数组的最小值之和(贡献法,单调栈,前后缀分解)
     题目不难,但是涉及到的知识点很丰富。classSolution:defsumSubarrayMins(self,arr:List[int])->int:MOD=10**9+7n=len(arr)pre=[-1]*nsuf=[n]*nstk=[]foriinrange(n):w......
  • 队列
    一、算法描述本篇文章讲述的数据结构是,队列,数组模拟队列,也不是循环队列。队列的结构,完全就是学校食堂排队打饭的那个队列。一个队头,一个队尾,从队头出,从队尾进,排队打饭也是这样hhh。//用数组模拟的队列定义如下:inthh,tt;intq[N];/* hh表示队头,tt表示队尾(我习惯于表示队尾......
  • 12_二叉树的最小深度
    二叉树的最小深度给定一个二叉树,找出其最小深度。最小深度是从根节点到最近叶子节点的最短路径上的节点数量。说明:叶子节点是指没有子节点的节点。示例1:输入:root=[3,9,20,null,null,15,7]输出:2示例2:输入:root=[2,null,3,null,4,null,5,null,6]输出:5【思路】当遍......
  • PTA-ch7b-5 : 最小工期
    最小工期一个项目由若干个任务组成,任务之间有先后依赖顺序。项目经理需要设置一系列里程碑,在每个里程碑节点处检查任务的完成情况,并启动后续的任务。现给定一个项目中各个任务之间的关系,请你计算出这个项目的最早完工时间。输入格式:首先第一行给出两个正整数:项目里程碑的数量N......
  • 队列(最基本队列,标准队列 2个,双端队列,单调队列)
    2023-11-26最基本队列:一次性使用的classQueue01{//最基本队列,一次性的,数组模拟,先进先出//功能:入队,出队,判满,判空,显示队头,显示队列privateint[]queue;privateintfront=-1;//指向第一个元素前一个位置privateinttail=-1;//指向最后一个元素p......