异步版本 rediss.py
from fastapi import FastAPI, Depends,APIRouter
import redis.asyncio as aioredis
import uvicorn
from contextlib import asynccontextmanager
app = FastAPI()
# Redis 连接池配置
REDIS_URL = "redis://192.168.252.128:6379/0"
@asynccontextmanager
async def lifespan(app: FastAPI):
# 初始化 Redis 客户端
app.state.redis = await aioredis.from_url(REDIS_URL)
yield
# 关闭 Redis 连接
await app.state.redis.close()
app.router.lifespan_context = lifespan
def get_redis_client():
return app.state.redis
router = APIRouter()
@router.get("/set/{key}/{value}")
async def set_key(key: str, value: str, redis: aioredis.Redis = Depends(get_redis_client)):
await redis.set(key, value)
return {"message": f"Key {key} set to {value}"}
@router.get("/get/{key}")
async def get_key(key: str, redis: aioredis.Redis = Depends(get_redis_client)):
value = await redis.get(key)
if value:
return {"key": key, "value": value.decode()}
else:
return {"message": f"Key {key} not found"}
app.include_router(router)
if __name__ == "__main__":
uvicorn.run("rediss:app", host="0.0.0.0", port=8089, reload=True, )
异步连接池版本
from fastapi import FastAPI, Depends, APIRouter
import aioredis
import uvicorn
from contextlib import asynccontextmanager
app = FastAPI()
# Redis 连接池配置
REDIS_URL = "redis://192.168.252.128:6379/0"
@asynccontextmanager
async def lifespan(app: FastAPI):
# 初始化 Redis 连接池
app.state.redis_pool = await aioredis.from_url(REDIS_URL)
try:
yield
finally:
# 关闭 Redis 连接池
app.state.redis_pool.close()
await app.state.redis_pool.wait_closed()
app.router.lifespan_context = lifespan
async def get_redis_client():
async with app.state.redis_pool as redis:
return redis
router = APIRouter()
@router.get("/set/{key}/{value}")
async def set_key(key: str, value: str, redis: aioredis.Redis = Depends(get_redis_client)):
await redis.set(key, value)
return {"message": f"Key {key} set to {value}"}
@router.get("/get/{key}")
async def get_key(key: str, redis: aioredis.Redis = Depends(get_redis_client)):
value = await redis.get(key)
if value:
return {"key": key, "value": value.decode()}
else:
return {"message": f"Key {key} not found"}
app.include_router(router)
if __name__ == "__main__":
uvicorn.run("rediss:app", host="0.0.0.0", port=8089, reload=True, )
标签:get,Fastapi,app,redis,value,key,使用,router
From: https://www.cnblogs.com/qcy-blog/p/18544076