首页 > 数据库 >FastAPI系列:异步redis

FastAPI系列:异步redis

时间:2024-02-28 19:12:26浏览次数:34  
标签:异步 lock app await redis FastAPI aioredis event

aioredis official website

Install

pip install aioredis

Connect to redis

from fastapi import FastAPI
import aioredis

app = FastAPI()

@app.on_event('startup')
async def startup_event():
    # 创建的是线程池对象,默认返回的结果为bytes类型,设置decode_responses表示解码为字符串
    app.state.redis_client  = aioredis.from_url('redis://xxx.xxx.xxx.xx/0',encoding="utf-8", decode_responses=True)
    # 通过aioredis.redis创建单个对象
    app.state.redis_client = aioredis.redis(host='xxx.xxx.xxx.xx')
    # 通过aioredis.redis创建连接池对象
    pool = ConnectionPool(host='xxx.xxx.xxx.xx', encoding="utf-8", decode_responses=True)
    app.state.redis_client = aioredis.redis(connection_pool=pool)


@app.on_event('shutdown')
async def shutdown_event():
    app.state.redis_client.close()

使用

@app.get('/index')
async def index():
    key = 'liuwei'
    # 设置键值对
    await app.state.redis_client.set(key, '123456')
    
    # 读取
    result = await app.state.redis_client.get(key)
    print(result)
    
    # 设置过期时间
    key2 = 'david'
    await app.state.redis_client.setex(name=key2, value='123', time=10)
    return {'msg': 'ok'}

# 管道操作
@app.get('/index2')
async def index2(request: Request):
    # 创建管道
    async with request.app.state.redis_client.pipeline(transaction=True) as pipe:
        # 批量进行管道操作
        ok1, ok2 = await (pipe.set('xiaoxiao', '测试数据').set('xiaoxiao2','测试数据2').execute())
    
    async with request.app.state.redis_client.pipeline(transaction=True) as pipe:
        cache1, cache2 = await (pipe.get('xiaoxiao').get('xiaoxiao2').execute())
        print(cache1, cache2)
        
    return {'msg': 'ok'}


if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app='main:app', host='0.0.0.0', port=8001, reload=True)

遇到的问题

1.python3.11版本,aioredis 2.0.1版本,redis 7.x版本
启动连接时会报一个TypeError: duplicate base class TimeoutError的错误
问了Copilot,说是兼容性问题,在 Python3.11 中,asyncio.TimeoutError 被移动到了 asyncio.exceptions 模块中,而 aioredis 库没有及时更新以适应这个变化。
所以我们找到aioredis目录下的exceptions.py文件,定位到14行代码
class TimeoutError(asyncio.TimeoutError, builtins.TimeoutError, RedisError):
    pass

所以我们修改为如下代码,即可运行
class TimeoutError(asyncio.exceptions.TimeoutError, RedisError):
    pass

发布订阅

from fastapi import FastAPI, Request
import asyncio
import async_timeout
import aioredis
from aioredis.client import Redis, PubSub

app = FastAPI()

# 定义事件消息模型
from pydantic import BaseModel

class MessageEvent(BaseModel):
    username: str
    message: dict

async def reader(channel: PubSub):
    while True:
        try:
            async with async_timeout.timeout(1):
                # 执行接收订阅消息
                message = await channel.get_message(ignore_subscribe_messages=True)
                if message is not None:
                    print(message)  # {'type': 'message', 'pattern': None, 'channel': 'channel:1', 'data': '{"username": "jack", "message": {"msg": "\\u5728startup_event\\u53d1\\u5e03\\u7684\\u4e8b\\u4ef6\\u6d88\\u606f"}}'}
                    # parse_raw将字符串转换为json
                    message_event = MessageEvent.parse_raw(message['data'])
                    print('订阅接收到的消息为:', message_event)
                await asyncio.sleep(0.01)
        except asyncio.TimeoutError:
            pass
        
@app.on_event('startup')
async def startup_event():
    #创建redis对象
    redis: Redis = aioredis.from_url('redis://139.196.220.98/0', encoding="utf-8", decode_responses=True)
    app.state.redis = redis
    #创建消息发布定义对象,获取发布订阅对象
    pubsub = redis.pubsub()
    #把当前的发布对象添加到全局app上下文中 
    app.state.pubsub = pubsub
    #把发布方法添加到全局app上下文中
    app.state.publish = redis.publish
    #开始订阅相关频道
    await pubsub.subscribe('channel:1', 'channel:2')
    #消息模型的创建
    event = MessageEvent(username='jack', message={'msg':'在startup_event发布的事件消息'})
    #把消息发布到channel:1频道上
    await redis.publish(channel='channel:1', message=event.json())
    #执行消息订阅循环监听
    asyncio.create_task(reader(pubsub))
    
@app.on_event('shutdown')
async def shutdown_event():
    pass
    #解除相关频道订阅
    app.state.pubsub.unsubscribe('channel:1','channel:2')
    #关闭redis连接
    app.state.redis.close()


@app.get('/index')
async def get(re:Request):
    #手动执行其他消息的发布
    event = MessageEvent(username='jack', message={'msg': '我是来自api发布的消息'})
    await re.app.state.publish(channel='channel:1', message=event.json())
    return {'msg': 'ok'}

分布式锁

import asyncio
import aioredis
from aioredis.lock import Lock

async def redis_lock():
    # 创建客户端
    r = aioredis.from_url('redis://xxx.xxx.xxx.xx/0', encoding="utf-8", decode_responses=True)
    
    # 定义获取锁对象,设置锁的超时时间
    """
    redis:客户端对象
    lock_name:锁名称
    timeout:锁过期时间,单位秒
    sleep:表示锁被某个客户端对象拥有而其他客户端想获取锁时,每次循环迭代检测锁状态的休眠时间,默认为0.1
    blocking_timeout:表示客户端在阻塞状态下尝试获取锁需要花费的时间,为None时表示无限制
    lock_class:表示锁定实现类
    thread_local: 表示当前锁的令牌是否存储在本地线程中,默认为True
    
    """
    def get_lock(redis, lock_name, timeout=10, sleep=0.2, blocking_timeout=None, lock_class=Lock, thread_local=True):
        return redis.lock(name=lock_name, timeout=timeout, sleep=sleep, blocking_timeout=blocking_timeout, lock_class=lock_class, thread_local=thread_local)
    
    # 实例化一个锁对象
    lock = get_lock(redis=r, lock_name='xiaozhong')
    # blocking为False, 则不再阻塞,直接返回结果
    lock_acquire = await lock.acquire(blocking=False) # blocking=False非阻塞
    if lock_acquire:
        #开始上锁
        is_locked = await lock.locked()
        if is_locked:
            print('执行业务逻辑处理')
            #锁的token
            token = lock.local.token
            #表示当前页面所需时间
            await asyncio.sleep(15)
            #判断当前锁是否是自己的锁
            await lock.owned()
            #增加过期时间
            await lock.extend(10)
            #表示获取锁当前的过期时间
            await r.pttl(name='jack')
            print('客户端锁的签名:', token)
            await lock.reacquire()
            #锁的释放
            await lock.release()
    
if __name__ == '__main__':
    asyncio.run(redis_lock())
    
    

标签:异步,lock,app,await,redis,FastAPI,aioredis,event
From: https://www.cnblogs.com/weiweivip666/p/18041474

相关文章

  • FastAPI系列:fastapi定制的数据库操作库sqlmodel
    官网sqlmodel安装#安装sqlmodel会自动安装pydantic和sqlalchemypipinstallsqlmodel使用#步骤1,创建sqlmodel引擎fromsqlmodelimportcreate_engine#driver://用户名:密码@ip/数据库engine=create_engine("mysql+mysqldb://root:123456@localhost/api")#步骤......
  • FastAPI系列:自定义认证
    fromtypingimportOptional,TuplefromfastapiimportFastAPI,RequestfrompydanticimportBaseModel#通过starlette.authentication导入AuthenticationBackendfromstarlette.authenticationimportAuthenticationBackend,AuthenticationError,AuthCredentials,S......
  • FastAPI系列:依赖注入
    函数式依赖项fromfastapiimportFastAPIfromfastapiimportQuery,Dependsfromfastapi.exceptionsimportHTTPExceptionapp=FastAPI()defusername_check(username:str=Query(...)):ifusername!='zhong':raiseHTTPException(status_code......
  • FastAPI系列:环境配置读取
    依赖包pipinstallpython-dotenv使用#.env文件ADMIN_EMAIL="deadpool@example.com"APP_NAME="ChimichangApp"#config.pyfrompydantic_settingsimportBaseSettingsclassSettings(BaseSettings):app_name:str="AwesomeAPI"......
  • FastAPI系列:后台任务进程
    注:后台任务应附加到响应中,并且仅在发送响应后运行用于将单个后台任务添加到响应中fromfastapiimportFastAPIfromfastapi.responsesimportJSONResponsefromstarlette.backgroundimportBackgroundTaskfrompydanticimportBaseModelapp=FastAPI()classUser(B......
  • FastAPI系列:中间件
    中间件介绍中间件是一个函数,它在每个请求被特定的路径操作处理之前,以及在每个响应返回之前工作装饰器版中间件1.必须使用装饰器@app.middleware("http"),且middleware_type必须为http2.中间件参数:request,call_next,且call_next它将接收request作为参数@app.middleware("h......
  • FastAPI系列:模型用法
    模型基本用法frompydanticimportBaseModelclassItem(BaseModel):#通过继承BaseModelname:strprice:floatis_offer:Union[bool,None]=None常用的模型属性和方法dict()#将数据模型的字段和值封装成字典json()#将数据模型的字段和值封装成json格......
  • FastAPI系列:上传文件File和UploadFile
    上传文件#file仅适用于小文件@app.post("/files/")asyncdefcreate_file(file:bytes|None=File(default=None)):ifnotfile:return{"message":"Nofilesent"}else:return{"file_size":len(file)}......
  • FastAPI系列:路径参数额外校验Path
    路径参数额外校验PathfromfastapiimportPathapp=FastAPI()@app.get('/items/{item_id}')asyncdefread_items(item_id:str=Path(default=None,max_length=3,min_length=1,title='theidofitemtoget')):"""def......
  • FastAPI系列:查询字符串参数
    单个查询字符串@app.get('/index/{username}')defindex(username:str,id:int):#id为查询字符串?id=5return{"message":"success","username":username,"id":id}可选的查询字符串参数@app.get('/items/{item_id}......