/Users/song/Code/redis_learn/le00.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import asyncio
from redis import Redis,AuthenticationError,TimeoutError
REDIS_OPEN: bool = False
REDIS_HOST: str = '127.0.0.1'
REDIS_PORT: int = 6379
REDIS_PASSWORD: str = ''
REDIS_DATABASE: int = 0
REDIS_TIMEOUT: int = 5
class RedisCli(Redis):
def __init__(self):
super(RedisCli, self).__init__(
host=REDIS_HOST,
port=REDIS_PORT,
password=REDIS_PASSWORD,
db=REDIS_DATABASE,
socket_timeout=REDIS_TIMEOUT,
decode_responses=True # 转码 utf-8
)
async def init_redis_connect(self):
"""
触发初始化连接
:return:
"""
try:
self.ping()
except TimeoutError:
print("连接redis超时")
sys.exit()
except AuthenticationError:
print("连接redis认证失败")
sys.exit()
except Exception as e:
print('连接redis异常 {}', e)
sys.exit()
async def conn_redis(redis_client):
await redis_client.init_redis_connect()
if __name__=="__main__":
# 创建redis连接对象
redis_client = RedisCli()
asyncio.run(conn_redis(redis_client))
redis_client.set('name','alice')
res= redis_client.get('name')
print(res)
# 关闭redis连接
redis_client.close()
```
# requirements.txt
async-timeout==4.0.2
redis==4.5.1
标签:__,python,redis,REDIS,init,client,连接
From: https://www.cnblogs.com/zhuoss/p/17113583.html