我正在将 Redis 异步客户端与 Celery 一起使用,但在两者之间的集成方面遇到了一些问题。上下文是我需要删除 redis.Redis 实例(在构造函数中创建)以便关闭连接(该对象有一个 close 方法,但当 asyncio 事件循环关闭时我无法使用它,heal_client 方法仅在这些情况下才会使用。 我的代码如下:
import redis.asyncio as redis
from app.infra.services.notifications import INotifier
from app.schemas.notification import NotificationMessage
from app.config import settings
class RedisNotifier(INotifier):
def __init__(self):
self.redis_client = redis.Redis(
host=settings.REDIS_PUBSUB_HOST,
port=settings.REDIS_PUBSUB_PORT,
password=settings.REDIS_PUBSUB_PASSWORD,
)
async def send_notification(self, room: str, notification: NotificationMessage) -> None:
await self.redis_client.publish(room, notification.json())
def heal_client(self) -> None:
import gc
del self.redis_client
gc.collect()
self.redis_client.close()
self.redis_client = redis.Redis(
host=settings.REDIS_PUBSUB_HOST,
port=settings.REDIS_PUBSUB_PORT,
password=settings.REDIS_PUBSUB_PASSWORD,
)
return None
redis_notifier = RedisNotifier()
# NOTIFIER is a global constant. Changing its value here will change the
# notifier used throughout the entire project.
NOTIFIER = redis_notifier
我的问题是调用heal_client方法是否总是保证redis.Redis对象(在构造函数中创建)将被立即删除? 这就是我通常使用该方法的方式:
@celery_app.task(
retry_backoff=True,
max_retries=5,
acks_late=True,
name="charlie-update-session", # bind=True, max_retries=2, acks_late=True,
)
def update_session(username: str, current_id: int, type_action: str):
'''
To avoid unexpected behaivor in the redis asynchronous client for notifications, it is
necessary that all tasks use the same event loop. Celery prefork with concurrency 1
guarantees a single OS Thread per worker, so we can use the same event loop associated
with the Thread.
'''
log.info(f"[INFO]: A process of Update session was initiated. User: {username}")
start = timeit.default_timer()
loop = get_event_loop()
if loop.is_closed():
loop = new_event_loop()
set_event_loop(loop)
if isinstance(NOTIFIER, RedisNotifier):
NOTIFIER.heal_client()
print(id(loop))
task = loop.create_task(
execute_update_data(
username=username, current_id=current_id, type_action=type_action
)
)
loop.run_until_complete(task)
stop = timeit.default_timer()
不,
heal_client
方法中的代码不能保证
redis.Redis
对象会被立即删除。以下是原因和解决方法:
为什么不能保证立即删除:
-
Python 的垃圾回收机制:
Python 使用垃圾回收机制来自动管理内存。
del self.redis_client
语句只是删除了self.redis_client
对对象的引用,而当一个对象没有任何引用时,它才会被垃圾回收机制标记为可回收。 -
gc.collect()
的局限性:gc.collect()
强制进行垃圾回收,但这只是一个建议,Python 解释器不一定立即执行。 -
异步操作:
redis.Redis.close()
是一个异步操作,它不会立即关闭连接,而是在事件循环的下一个周期中执行。
潜在问题:
-
资源泄漏:
如果
redis.Redis
对象没有及时关闭,可能会导致 Redis 服务器上的连接泄漏。 -
程序崩溃:
在
heal_client
方法中,你试图在删除对象后调用self.redis_client.close()
,这会导致AttributeError
错误。
解决方法:
-
使用
async with
语句: 最佳实践是在异步函数中使用async with
语句来创建和管理异步客户端连接。这将确保连接在函数结束时被正确关闭,即使出现异常。
python
async def send_notification(self, room: str, notification: NotificationMessage) -> None:
async with redis.Redis(
host=settings.REDIS_PUBSUB_HOST,
port=settings.REDIS_PUBSUB_PORT,
password=settings.REDIS_PUBSUB_PASSWORD,
) as redis_client:
await redis_client.publish(room, notification.json())
-
在事件循环关闭时关闭连接:
如果你需要在事件循环关闭时手动关闭连接,可以使用
loop.run_until_complete(self.redis_client.close())
。
```python async def close_connections(self): await self.redis_client.close()
loop = asyncio.get_event_loop() try: # ... your code ... finally: loop.run_until_complete(close_connections()) loop.close() ```
改进后的
RedisNotifier
类:
import redis.asyncio as redis
class RedisNotifier:
def __init__(self):
self.redis_host = settings.REDIS_PUBSUB_HOST
self.redis_port = settings.REDIS_PUBSUB_PORT
self.redis_password = settings.REDIS_PUBSUB_PASSWORD
async def send_notification(self, room: str, notification: NotificationMessage) -> None:
async with redis.Redis(
host=self.redis_host,
port=self.redis_port,
password=self.redis_password,
) as redis_client:
await redis_client.publish(room, notification.json())
async def close_connection(self):
# You might need to adjust this depending on how you're managing the connection
if hasattr(self, '_redis_client'):
await self._redis_client.close()
在
update_session
任务中:
async def execute_update_data(...):
# ... your code ...
@celery_app.task(...)
def update_session(...):
# ... your code ...
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(execute_update_data(...))
finally:
if isinstance(NOTIFIER, RedisNotifier):
loop.run_until_complete(NOTIFIER.close_connection())
loop.close()
通过使用
async with
或在事件循环关闭时显式关闭连接,可以确保及时释放资源并避免潜在问题。