首页 > 编程问答 >这段代码是否保证Python对象被立即删除?

这段代码是否保证Python对象被立即删除?

时间:2024-07-23 07:35:30浏览次数:8  
标签:python garbage-collection

我正在将 Redis 异步客户端与 Celery 一起使用,但在两者之间的集成方面遇到了一些问题。上下文是我需要删除 redis.Redis 实例(在构造函数中创建)以便关闭连接(该对象有一个 close 方法,但当 asyncio 事件循环关闭时我无法使用它,heal_client 方法仅在这些情况下才会使用。 我的代码如下:

import redis.asyncio as redis

from app.infra.services.notifications import INotifier
from app.schemas.notification import NotificationMessage
from app.config import settings

        
class RedisNotifier(INotifier):
    def __init__(self):
        self.redis_client = redis.Redis(
        host=settings.REDIS_PUBSUB_HOST,
        port=settings.REDIS_PUBSUB_PORT,
        password=settings.REDIS_PUBSUB_PASSWORD,
    )

    async def send_notification(self, room: str, notification: NotificationMessage) -> None:
        await self.redis_client.publish(room, notification.json())

    def heal_client(self) -> None:
        import gc
        
        del self.redis_client
        gc.collect()
        self.redis_client.close()
        self.redis_client = redis.Redis(
            host=settings.REDIS_PUBSUB_HOST,
            port=settings.REDIS_PUBSUB_PORT,
            password=settings.REDIS_PUBSUB_PASSWORD,
        )
        return None


redis_notifier = RedisNotifier()

# NOTIFIER is a global constant. Changing its value here will change the 
# notifier used throughout the entire project.
NOTIFIER = redis_notifier

我的问题是调用heal_client方法是否总是保证redis.Redis对象(在构造函数中创建)将被立即删除? 这就是我通常使用该方法的方式:

@celery_app.task(
    retry_backoff=True,
    max_retries=5,
    acks_late=True,
    name="charlie-update-session",  # bind=True, max_retries=2, acks_late=True,
)
def update_session(username: str, current_id: int, type_action: str):
    '''
    To avoid unexpected behaivor in the redis asynchronous client for notifications, it is
    necessary that all tasks use the same event loop. Celery prefork with concurrency 1
    guarantees a single OS Thread per worker, so we can use the same event loop associated
    with the Thread.
    '''
    log.info(f"[INFO]: A process of Update session was initiated. User: {username}")
    start = timeit.default_timer()
    loop = get_event_loop() 
    if loop.is_closed():
        loop = new_event_loop()
        set_event_loop(loop)
        if isinstance(NOTIFIER, RedisNotifier):
            NOTIFIER.heal_client()
    print(id(loop))
    task = loop.create_task(
        execute_update_data(
            username=username, current_id=current_id, type_action=type_action
        )
    )
    loop.run_until_complete(task)
    stop = timeit.default_timer()

不, heal_client 方法中的代码不能保证 redis.Redis 对象会被立即删除。以下是原因和解决方法:

为什么不能保证立即删除:

  1. Python 的垃圾回收机制: Python 使用垃圾回收机制来自动管理内存。 del self.redis_client 语句只是删除了 self.redis_client 对对象的引用,而当一个对象没有任何引用时,它才会被垃圾回收机制标记为可回收。
  2. gc.collect() 的局限性: gc.collect() 强制进行垃圾回收,但这只是一个建议,Python 解释器不一定立即执行。
  3. 异步操作: redis.Redis.close() 是一个异步操作,它不会立即关闭连接,而是在事件循环的下一个周期中执行。

潜在问题:

  • 资源泄漏: 如果 redis.Redis 对象没有及时关闭,可能会导致 Redis 服务器上的连接泄漏。
  • 程序崩溃: heal_client 方法中,你试图在删除对象后调用 self.redis_client.close() ,这会导致 AttributeError 错误。

解决方法:

  1. 使用 async with 语句: 最佳实践是在异步函数中使用 async with 语句来创建和管理异步客户端连接。这将确保连接在函数结束时被正确关闭,即使出现异常。

python async def send_notification(self, room: str, notification: NotificationMessage) -> None: async with redis.Redis( host=settings.REDIS_PUBSUB_HOST, port=settings.REDIS_PUBSUB_PORT, password=settings.REDIS_PUBSUB_PASSWORD, ) as redis_client: await redis_client.publish(room, notification.json())

  1. 在事件循环关闭时关闭连接: 如果你需要在事件循环关闭时手动关闭连接,可以使用 loop.run_until_complete(self.redis_client.close())

```python async def close_connections(self): await self.redis_client.close()

loop = asyncio.get_event_loop() try: # ... your code ... finally: loop.run_until_complete(close_connections()) loop.close() ```

改进后的 RedisNotifier 类:

import redis.asyncio as redis

class RedisNotifier:
    def __init__(self):
        self.redis_host = settings.REDIS_PUBSUB_HOST
        self.redis_port = settings.REDIS_PUBSUB_PORT
        self.redis_password = settings.REDIS_PUBSUB_PASSWORD

    async def send_notification(self, room: str, notification: NotificationMessage) -> None:
        async with redis.Redis(
            host=self.redis_host,
            port=self.redis_port,
            password=self.redis_password,
        ) as redis_client:
            await redis_client.publish(room, notification.json())

    async def close_connection(self):
        # You might need to adjust this depending on how you're managing the connection
        if hasattr(self, '_redis_client'): 
            await self._redis_client.close() 

update_session 任务中:

async def execute_update_data(...):
    # ... your code ...

@celery_app.task(...)
def update_session(...):
    # ... your code ...
    loop = asyncio.get_event_loop() 
    if loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(execute_update_data(...))
    finally:
        if isinstance(NOTIFIER, RedisNotifier):
            loop.run_until_complete(NOTIFIER.close_connection())
        loop.close()

通过使用 async with 或在事件循环关闭时显式关闭连接,可以确保及时释放资源并避免潜在问题。

标签:python,garbage-collection
From: 78781059

相关文章

  • python selenium 行为错误:AttributeError:“Context”对象没有属性“driver”
    我正在使用pythonselenium与Behavior包一起工作。这是代码:@given('theuserisontheloginpage')defstep_given_user_on_login_page(context):PATH='C:/Users/PycharmProjects/ui_test/chromedriver-win32/chromedriver.exe'context.driver=......
  • python 脚本中的路点用于处理大数据集
    我编写了一个脚本,将一堆来自api的请求写入csv文件。该api中有数千个请求,并且在结束循环/退出程序之前永远不会结束。如何合并航路点,以便如果再次发生停顿,它会自动继续前进并最终打印所有请求?我尝试了一些不起作用的方法,但我不知道下一步该怎么做。以下是使用航路点......
  • Python 中的 SSL 模块不可用(在 OSX 上)
    我在OSX10.13上的virtualenv中运行时遇到问题。我已经运行了pipinstall并且路径brewinstallopenssl指向/usr/local/include/openssl有谁知道如何解决这一问题?在我重新安装../opt/openssl/include/openssl使用python后,这种......
  • AWS Elastic Beanstalk chown PythonPath 错误
    我正在AWS的elasticbeanstalk上部署一个Web应用程序,遇到了同样的错误:[StageApplication].Stoprunningthecommand.Error:chown/var/app/staging/venv/bin/python:nosuchfileordirectory.我在我的环境配置中看到属性:PYTHONPATH:/var/......
  • Python:支持索引的内存对象数据库?
    我正在做一些数据整理,如果我可以将一堆字典放入内存数据库中,然后对其运行简单的查询,这会简单得多。例如,类似:people=db([{"name":"Joe","age":16},{"name":"Jane","favourite_color":"red"},])over_16=db.filter(age__g......
  • 如何构建一维数组的二维数组的特定 Python 结构?
    如何构建一维数组(即行向量)的二维数组的特定结构以满足特定我正在维护的遗留程序的结构?我可以在此结构中生成正确的内容all_measurements[:12]array([[0.,0.,0.,2.],[0.02,0.334,0.04,2.24],[0.04,0.668,0.08,2.48],...........
  • 如何使用 Python Flask 将新的咖啡馆(元素)添加到数据库(SQLite)?
    这是我的代码:@app.route("/add",methods=["POST"])defpost_new_cafe():new_cafe=Cafe(name=request.form.get("name"),map_url=request.form.get("map_url"),img_url=request.form.get("img......
  • 使用 tkinter 为 python 创建 GUI 时如何解决语法错误?
    我是一名Python初学者,决定使用tkinter制作一个小型GUI,该GUI接受用户(潜在餐馆)的3个输入,然后单击按钮后随机输出其中一家餐馆。我不断收到语法错误,并认为它与我的buttonfunc或调用它的命令有关。此代码尚未包含在GUI上输出餐厅的任何位置。任何帮助将不胜感激#Pyth......
  • 在 python 中打开 gnome 终端立即显示为僵尸
    作为背景,我正在编写一个脚本来训练多个pytorch模型。我有一个训练脚本,我希望能够在gnome终端中作为子进程运行。这样做的主要原因是我可以随时关注训练进度。如果我可能有多个GPU,我想在单独的窗口中多次运行我的训练脚本。为了实现这一点,我一直在使用popen。以下代码用于打......
  • python threading.Condition 的意外行为
    我正在尝试同步多个线程。我期望使用threading.Condition和threading.Barrier时的脚本输出大致相同,但事实并非如此。请解释一下为什么会发生这种情况。一般来说,我需要线程在一个无限循环中执行工作(一些IO操作),但是每个循环都是以主线程的权限开始的,而权限是仅在......