根据fastapi的示例,我们可以使用ouath2和json网络令牌为用户创建登录:
from datetime import datetime, timedelta, timezone
from typing import Annotated
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "[email protected]",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
}
}
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str | None = None
class User(BaseModel):
username: str
email: str | None = None
full_name: str | None = None
disabled: bool | None = None
class UserInDB(User):
hashed_password: str
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
app = FastAPI()
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: Annotated[User, Depends(get_current_user)]
):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token")
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
) -> Token:
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return Token(access_token=access_token, token_type="bearer")
@app.get("/users/me/", response_model=User)
async def read_users_me(
current_user: Annotated[User, Depends(get_current_active_user)]
):
return current_user
@app.get("/users/me/items/")
async def read_own_items(
current_user: Annotated[User, Depends(get_current_active_user)]
):
return [{"item_id": "Foo", "owner": current_user.username}]
但是我们可以验证令牌将在30分钟后过期。
ACCESS_TOKEN_EXPIRE_MINUTES = 30
我想要的是一个令牌,每次用户向该应用程序中的任何端点发出请求时,该令牌都会将其过期时间延长当前时间 + 10 分钟。因此,我可以始终保持一段用户不活动状态,即使用户正在积极使用该应用程序,也不会突然注销。有没有一种方法可以做到这一点,而无需将用户存储在数据库中,而仅使用存储在客户端 Web 浏览器中的 jwts 来实现?解决此类问题的最佳实践是什么?每次更新授权标头
要求 ?设置cookies?请告知。
编辑:这是我尝试使用中间件来修改标头,但它仍然不起作用。
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
# Check if the token needs to be refreshed
expiration_time = datetime.fromtimestamp(payload["exp"], tz=timezone.utc)
if expiration_time - datetime.now(tz=timezone.utc) < timedelta(minutes=REFRESH_INTERVAL_MINUTES):
# Refresh the token with a new expiration time
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
new_access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return user, new_access_token
return user , token
async def get_current_active_user(
current_user_and_token: Annotated[User, Depends(get_current_user)]
):
current_user,_ = current_user_and_token
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
你提出的延长 JWT 过期时间的想法(每次请求都延长10分钟)在概念上与 JWT 的无状态性质相冲突。JWT 的一大优势在于它们是自包含的,这意味着你不需要在每次请求时都去数据库或其他地方验证它们。如果你开始修改每个请求的过期时间,那么你实际上就引入了需要某种状态管理的机制,这与 JWT 的初衷背道而驰。
更好的解决方案:
-
使用刷新令牌:
-
颁发一个短期的访问令牌(例如 30 分钟)和一个长期刷新令牌(例如 1 周或更长)。
- 每次请求时,检查访问令牌是否过期。
- 如果过期,则使用刷新令牌请求新的访问令牌。
- 刷新令牌的验证需要访问数据库或其他安全存储,以确保其有效性。
-
这种方法平衡了安全性和用户体验。
-
使用滑动窗口过期时间:
-
在服务器端设置一个配置项,例如 10 分钟。
- 每次请求时,检查访问令牌是否过期。
- 如果过期,但距离上次颁发时间不超过配置的滑动窗口时间,则自动刷新访问令牌并将其返回给客户端。
- 如果超过滑动窗口时间,则要求用户重新登录。
- 这种方法不需要刷新令牌,但仍然需要服务器端的状态管理。
代码示例(使用刷新令牌):
from datetime import datetime, timedelta
# ... other imports ...
REFRESH_TOKEN_EXPIRE_MINUTES = 1440 # 24 hours
# ... other code ...
@app.post("/token")
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
) -> dict:
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
refresh_token_expires = timedelta(minutes=REFRESH_TOKEN_EXPIRE_MINUTES)
refresh_token = create_access_token(
data={"sub": user.username}, expires_delta=refresh_token_expires
)
return {"access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer"}
@app.post("/refresh_token")
async def refresh_access_token(refresh_token: str):
try:
payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid refresh token",
headers={"WWW-Authenticate": "Bearer"},
)
# 在这里,你应该验证刷新令牌是否在数据库或其他安全存储中
user = get_user(fake_users_db, username=username)
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid refresh token",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
请注意:
- 以上代码示例仅供参考,你需要根据自己的实际情况进行修改和完善。
- 为了确保安全性,你需要将刷新令牌存储在安全的地方,例如数据库中。
- 在刷新访问令牌时,你需要验证刷新令牌的有效性,例如检查它是否已被撤销。
希望这些信息能帮到你!
标签:python,jwt,fastapi From: 78205950