首页 > 编程语言 >Python 全栈系列256 异步任务与队列消息控制(填坑)

Python 全栈系列256 异步任务与队列消息控制(填坑)

时间:2024-06-22 16:00:11浏览次数:13  
标签:Python resp worker 全栈 msg 填坑 test data id

说明

每个创新都会伴随着一系列的改变。

在使用celery进行异步任务后,产生的一个问题恰好也是因为异步产生的。

内容

1 问题描述

我有一个队列 stream1, 对应的worker1需要周期性的获取数据,对输入的数据进行模式识别后分流。worker1我设施为10秒运行一次。然后我就发现输出队列的数据大约是6~7倍于原始数据。

2 分析

在同步执行的状态下,前面 一个任务没有结束,后面的任务即使到了执行时间也会错过。这个在APS任务里是非常明确的。但由于Celery执行的Worker是异步的,这意味着即使前一个任务没有完成,后一个任务还是会如期启动,另开一个线头。

Worker1之前的模式是采用xrange方式获取数据,在处理完成后才将消息删除。

由于模式识别的过程比较复杂,层层过滤,所以单个worker执行的时间超过了60秒。这样在这批消息删除之前,每次启动的worker都取到了相同的数据,处理后也会输出到结果队列。

3 解决办法

理论上,每次worker的取数应该是采用xfetch比较合理,但是对应的,xfetch会因为worker的中断导致消息残留。所以就要有另一些worker来进行残余消息的检测和处理。结果就是 xfetch worker + residual worker配合,显得麻烦。

过去在同步状态下,我就偷懒,只用一个worker进行xrange,这样只有消息被真实消费才会删除。

xfetch是支持多个worker并行的,而xrange则智能支持单个worker。

所以,本次要做的事就是把xfetch + residual 模式搞一下,以后该用什么模式就什么模式。

4 实践

为每个worker提供一种获取残余消息(residual)的办法,每个小时执行一次即可。普通的worker(fetch)一般是秒级,或者分钟级执行的。

当前的QManager是架在RedisAgent服务上封装的对象,这个对象极大简化了平时的操作。不过之前,并没有完全将QManager与RedisAgent的参数对接,采用了较为简单的方式。

本次需要做的是先使用RedisAgent完成对应的任务,然后将QManager进行升级。

构造测试队列

test_list = [{'doc_id':1, 'content':'first'}, {'doc_id':2, 'content':'ss'}]
qm.ensure_group('test.test.test')
qm.parrallel_write_msg('test.test.test', test_list)

{'status': True, 'msg': 'ok,add 2 of 2  messages'}

获取消息

qm.xfetch('test.test.test', count=1)
{'data': [{'_msg_id': '1718984345178-0', 'doc_id': '1', 'content': 'first'}],
 'status': True,
 'msg': 'ok'}
  • 1 判断是否有延误消息

两个关键参数,一个是队列名称,一个是延误时间。如果不写延误时间,就是看所有的延误。

resp = req.post('http://172.17.0.1:24118/get_pending_msg/',json = {'stream_name':'test.test.test' , 'idle_seconds':20}).json()

resp = req.post('http://172.17.0.1:24118/get_pending_msg/',json = {'stream_name':'test.test.test','idle_seconds':None }).json()

{'status': True,
 'msg': 'ok',
 'data': [['1718984345178-0', 'consumer1', 36675032, 1]]}

延误时间的最大作用是避免获取短时间内超时的任务(如果任务本身就需要很长时间)

如果data字段长度不为0,那么就会有延误消息,获取最小和最大的id即可。

  • 2 根据起止id获取数据
delay_data = resp['data'] 
start_id = delay_data[0][0]
end_id = delay_data[-1][0]

resp = req.post('http://172.17.0.1:24118/xrange/',json = {'stream_name':'test.test.test' , 
                                                          'start_id': start_id,
                                                          'end_id':end_id}).json()
{'status': True,
 'msg': 'ok',
 'data': [{'_msg_id': '1718984345178-0', 'doc_id': '1', 'content': 'first'},
  {'_msg_id': '1718984345178-1', 'doc_id': '2', 'content': 'ss'}]}

所以相应低,修改QMananger(version1.3)的xrange方法,并增加xpending方法

xrange

...
    # 批量获取数据
    def xrange(self, stream_name, count = None, start_id = '-' , end_id ='+'):
        cur_count = count or self.batch_size 

        recs_resp = req.post(self.redis_agent_host + 'xrange/',
                json ={'connection_hash':self.redis_connection_hash, 
                        'stream_name':stream_name,
                        'count':cur_count,
                        'start_id':start_id,
                        'end_id':end_id
                        }).json()
        return recs_resp

xpending。原来的接口似乎有点小bug:如果队列没有延误,接口查询会失败

...
    def xpending(self, stream_name,count = None, idle_seconds = 3600):
        cur_count = count or self.batch_size 
        # 1 确认是否有延误消息:没有延误消息的情况接口会报错
        try:
            resp = req.post(self.redis_agent_host + 'get_pending_msg/',
                    json ={'stream_name': stream_name, 
                        'idle_seconds': idle_seconds
                    }).json()
            # 如果没有数据,直接返回(标准格式)
            if len(resp['data']) == 0:
                print('No Pending')
                return resp 
        except:
            return {'status':True, 'msg':'query pending fail', 'data':[]}
            
        # 2 获取被延误的消息
        min_id = resp['data'][0][0]
        max_id = resp['data'][-1][0]
        return self.xrange(stream_name, count = cur_count, start_id = min_id, end_id = max_id)

Note: 我们对正常执行的任务,感知/容忍的周期为分钟;对延误执行(补漏)的任务,感知/容忍的周期为小时。

来看改造后的QM

#  xfetch,但是此时已经无数据可取
qm.xfetch('test.test.test' )
{'status': True, 'msg': 'ok', 'data': []}
# xpending 此时有两条延误较长时间的消息
qm.xpending('test.test.test' , idle_seconds=3600)
{'status': True,
 'msg': 'ok',
 'data': [{'_msg_id': '1718984345178-0', 'doc_id': '1', 'content': 'first'},
  {'_msg_id': '1718984345178-1', 'doc_id': '2', 'content': 'ss'}]}
# 用xrange取出,处理
data_list = qm.xpending('test.test.test' , idle_seconds=3600)['data']
[{'_msg_id': '1718984345178-0', 'doc_id': '1', 'content': 'first'},
 {'_msg_id': '1718984345178-1', 'doc_id': '2', 'content': 'ss'}]

# 假设处理完,准备删除消息
data_msg_list = qm.extract_msg_id(data_list)
['1718984345178-0', '1718984345178-1']
qm.xdel('test.test.test', data_msg_list)
{'data': 2, 'status': True, 'msg': 'ok'}

# 再次使用xpending
qm.xpending('test.test.test' , idle_seconds=3600)
{'status': True, 'msg': 'no source data', 'data': []}

另外,xpending中,即使是把pending的消息处理掉了,仍然可以读到pending信息,所以每次会调用一下xrange查询一个不存在的区间,稍微有点浪费。不过考虑到这是补救型的操作,一个小时才运行一次,就没有关系了。

标签:Python,resp,worker,全栈,msg,填坑,test,data,id
From: https://blog.csdn.net/yukai08008/article/details/139870889

相关文章

  • Python—学生信息管理系统
    用Python来实现学生信息管理系统学生信息管理系统(Python)简介:基本信息管理和学生成绩管理。基本信息管理模块的主要功能有学生信息的添加、删除、修改、显示和学生数据的导入导出,学生成绩管理模块的主要功能有统计课程最高分、最低分和平均分。一、功能模块:基本信息管理......
  • 参考Python官网学习Python
    参考Python官网学习PythonPython教程—Python3.12.4文档Python:Python解释器易于扩展,使用C或C++(或其他C能调用的语言)即可为Python扩展新功能和数据类型。Python也可用作定制软件中的扩展程序语言。-------------------------------------------------------------......
  • Python 潮流周刊#57:Python 该采用日历版本吗?
    本周刊由Python猫出品,精心筛选国内外的250+信息源,为你挑选最值得分享的文章、教程、开源项目、软件工具、播客和视频、热门话题等内容。愿景:帮助所有读者精进Python技术,并增长职业和副业的收入。本期周刊分享了12篇文章,12个开源项目,赠书5本,全文2200字。以下是本期......
  • python3使用pyVmomi获取vCenter中虚拟机cpu/内存信息
    原创文档编写不易,未经许可请勿转载。文档中有疑问的可以邮件联系我。邮箱:[email protected]说明文章分享在Linux操作系统中安装python3环境,并通过pyVmomi获取vCenter中运行的虚拟机信息,最后把获取的虚拟机数据存储到数据库中。前三章为基础环境构建,第四章为脚本代码。环境软......
  • Python网络爬虫的实现,要想学编程,首先的把爬虫学好!
    爬虫实现1.爬虫初步实现(1)我们爬取中国化工市场机械网,以下为相关代码演示importrequestsfrombs4importBeautifulSoupres=requests.get(addresses[i])res.encoding='GB18030'#通过修改编码方式为GB18030,兼容网站编码(gb2312)#这里的'html.parser......
  • python: fucontion
     defsixteenFoo(k):"""十六進制:paramk::return:"""fstr=''ifk==10:fstr="A"elifk==11:fstr="B"elifk==12:fstr=&q......
  • .Net开发 ,Java开发 ,python开发 数据库开发,开发相关工具
    开发电脑安装相关工具软件Java开发Java编程、高级编程、面试题资料Python编程,网络编程,爬虫,面试,设计资料数据库mysql,sqlservice,sql优化资料C#语言、.Net开发、.Net高级编程资料......
  • 新闻管理与推荐系统Python+Django+协同过滤推荐算法+管理系统
    一、介绍新闻管理与推荐系统。本系统使用Python作为主要开发语言开发的一个新闻管理与推荐的网站平台。网站前端界面采用HTML、CSS、BootStrap等技术搭建界面。后端采用Django框架处理用户的逻辑请求,并将用户的相关行为数据保存在数据库中。通过Ajax技术实现前后端的数据通信。......
  • 一起学python,挑战年薪30w
     哈喽!各位小伙伴们大家好,以下是为大家准备的学习编程的方法和心得,让你挑战年薪30万不是梦!!!!!Python编程之旅:新手学习心得与方法探索踏入Python编程的世界,对我而言,既是一次全新的探索,也是一场与未来对话的冒险。在这个数字化飞速发展的时代,编程能力不再只是程序员的必备技能,它......
  • 【深度学习】python之人工智能应用篇——图像生成技术(二)
    说明:两篇文章根据应用场景代码示例区分,其他内容相同。图像生成技术(一):包含游戏角色项目实例代码、图像编辑和修复任务的示例代码和图像分类的Python代码示例图像生成技术(二):包含简化伪代码示例、使用GAN生成医学图像代码示例和使用GAN生成产品展示图代码示例图像生成是......