首页 > 系统相关 >记一次python消费kafka进程持续消耗内存问题

记一次python消费kafka进程持续消耗内存问题

时间:2023-07-03 16:55:10浏览次数:43  
标签:__ python kafka gc 内存 线程 logger

前提:python写了一个kafka消费的脚本,脚本中消费kafka消息并将消费到的数据放在一个线程池中进行业务代码处理,使用supervisor管理这个脚本进程

遇到问题:这个进程占用的内存会越来越大,知道将机器内存消耗完

排查:网上找了一堆内存分析工具,好像都需要预埋代码,或者重新启动一个进程,全扯淡。

解决:读取kafka消息的代码添加一个计数器,没消费一个kafka消息加1,累计处理10000次请求,执行一次线程池重启和主动gc

import gc
from concurrent.futures import ThreadPoolExecutor

def process_message(args):
    pass

def consume_kafka_data(topic):
    kafka_servers = []
    consumer = KafkaConsumer(topic, bootstrap_servers=kafka_servers, auto_offset_reset='latest', enable_auto_commit=False)
    for message in consumer:
        yield message.value

if __name__ == "__main__":
    topic = ""
    executor = ThreadPoolExecutor(max_workers=10)
    for value in consume_kafka_data(topic):
        try:
            args = json.loads(value.decode('utf-8'))
            executor.submit(process_message, args)
            if index > 100000:
                index = 0
                logger.info("关闭线程池")
                executor.shutdown(wait=True)
                logger.info("gc 回收")
                gc.collect()
                logger.info("开启线程池")
                executor = ThreadPoolExecutor(max_workers=10)
            index += 1
        except Exception as exception:
            logger.exception(exception)

 

标签:__,python,kafka,gc,内存,线程,logger
From: https://www.cnblogs.com/gangdou/p/17523283.html

相关文章

  • python基础35 网络编程 软件开发架构和七层协议
    软件开发架构网络编程我们要基于网络来编写一款B/S或者C/S架构的软件,比如:ATM,我们写的只是ATM的单机版本,没有接入网络系统,别人无法访问到的目的以ATM为例,现在我们想把之前写的ATM系统编程基于网络传输的,别人如果想用,就必须把客户端下载到本地电脑上,已登录为例,用户把用......
  • Python之Mixins机制
    Mixins机制classVehicle:#交通工具passclassFlyMinix:"""将主类中有多个子类要继承的方法单独拿出来,重新定义一个类,将这个有给有需要的子类继承,在主类中不再写这个方法"""deffly(self):"""跟飞行相关的功能......
  • python 文件操作
    文件介绍文本文件可以使用记事本软件打开txt,py,md,json二进制文件不能使用记事本软件打开音频文件mp3视频文件mp4....图片png,jpg,gif,exe文件操作打开文件open()rwa是⽂本⽅式打开,适⽤于⽂本⽂件,会对⼆进制进⾏编码转换rbwbab是⼆进制......
  • python configparser模块的应用
    1、获取所有sectionimportconfigparser config=configparser.ConfigParser()config .read('文件所在的路径',encoding='utf-8')ret=config .sections()print(ret) 2、获取指定section下所有的键值对importconfigparser config=configparser.ConfigParser()......
  • python pytest 参数化的几种方式
    在pytest框架中,可以使用多种方式进行参数化测试。以下是一些常见的参数化方式及其示例:使用@pytest.mark.parametrize装饰器:可以使用pytest提供的@pytest.mark.parametrize装饰器来指定参数化测试的参数。下面是一个示例:[email protected]("num,expecte......
  • Python 转 Byte
    Python转Byte:详细介绍Python是流行的编程语言之一,拥有广泛的用途。在Python中,我们经常需要将数据从一种格式转换为另一种格式。其中一个转换的方式是将Python对象转换为字节数据,或者反过来。这篇文章将深入介绍Python转换为字节数据。什么是Byte?在计算机科学中,字......
  • python pytest框架文件结构
    pytest框架的文件结构相对灵活,没有强制要求特定的文件结构。然而,以下是一种常见的pytest框架文件结构示例:测试文件:测试文件是编写测试用例的主要部分,通常以test_开头,并以.py为扩展名。例如,test_example.py。测试函数:在测试文件中,测试用例通常以函数的形式存在。测试函数的命......
  • 关于python 跨域处理方式详解
    关于Python跨域处理方式详解跨域是指在浏览器中,一个网页的脚本试图访问另一个网页的脚本时,由于浏览器的同源策略,会出现跨域问题。Python作为一种常用的后端语言,也需要处理跨域问题。本文将详细讲解Python跨域处理的方式。什么是跨域在浏览器中,同源策略是一种安全机制,它限制了一......
  • Python错误:selenium自带click方法点击不到元素
    问题描述:selenium自带click方法,有的时候不好用,元素定位到了,但是就是点不上。 解决办法:原因分析:点击不到元素!解决办法:(1).selenium自带的click()方法:fromseleniumimportwebdriverel=driver.find_element(By.ID,ID)#找到元素el.click()#执行点击缺陷:不稳定......
  • python 实现 ctrl + 左键 单击刷新网页
    importsysfrompywinauto.applicationimportApplicationimportmouseimportkeyboardimporttimeclassApp(object):def__init__(self,pid):app=Application(backend='uia').connect(process=int(pid))self.win=app.top_w......