在 FastAPI 中,你可以使用装饰器来实现登录认证。以下是一个示例,演示如何创建一个自定义的登录认证装饰器,以确保只有授权的用户可以访问某些接口:
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from passlib.context import CryptContext
from datetime import datetime, timedelta
from jose import JWTError, jwt
from typing import Optional
app = FastAPI()
# 用户数据库(模拟)
fake_users_db = {
"user1": {
"username": "user1",
"password_hash": "$2b$12$H2xj5hqWiRT.I1bXVevv4uZI1IopTAvw63i3kg0utS6B3id5hMkCG", # 密码是 "password123"
}
}
# 密码哈希工具
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# 令牌配置
SECRET_KEY = "your_secret_key" # 需要替换为安全的密钥
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# OAuth2密码验证工具
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# 用户模型
class User(BaseModel):
username: str
# 用户认证令牌模型
class Token(BaseModel):
access_token: str
token_type: str
# 用户登录表单
class UserInDB(BaseModel):
username: str
password_hash: str
# 用户登录请求模型
class UserLoginRequest(BaseModel):
username: str
password: str
# 生成访问令牌
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
# 验证密码
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
# 获取用户信息
def get_user(username: str):
if username in fake_users_db:
user_dict = fake_users_db[username]
return User(**user_dict)
# 获取用户信息(用于令牌验证)
def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = get_user(username)
if user is None:
raise credentials_exception
return user
# 登录认证装饰器
def authenticate_user(username: str, password: str):
user = get_user(username)
if user is None or not verify_password(password, user.password_hash):
return False
return True
# 登录接口
@app.post("/login", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
username = form_data.username
password = form_data.password
if not authenticate_user(username, password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
# 受保护的资源,需要登录认证
@app.get("/protected-resource", response_model=User)
async def protected_resource(current_user: User = Depends(get_current_user)):
return current_user
在上述代码中,我们创建了一个自定义的登录认证装饰器 authenticate_user,它接受用户名和密码作为参数,并验证用户的凭据。如果用户验证成功,则返回 True,否则返回 False。接着,我们在登录接口 login_for_access_token 中使用这个装饰器来验证用户的凭据。如果用户凭据验证成功,我们生成了访问令牌,并将其返回给客户端。
受保护的资源接口 protected_resource 使用 Depends(get_current_user) 来进行登录认证,只有在用户成功登录后才能访问。这个接口返回当前登录用户的信息。
请注意,在实际应用中,你需要将用户的密码存储为安全的哈希值,而不是明文密码。示例中使用了密码哈希工具 passlib 来进行密码哈希和验证。此外,你还需要替换示例中的 SECRET_KEY 为一个真实的安全密钥,以确保令牌的安全性。
上述代码提供了一个简单的用户认证和令牌生成的示例,你可以根据自己的应用需求进行修改和扩展。
标签:username,登录,认证,access,token,user,str,password,装饰 From: https://www.cnblogs.com/yuezongke/p/17748299.html