【一】OAuth2.0的授权模式
-
授权码授权模式(Authorization Code Grant)
-
隐式授权模式(Implicit Grant)
-
密码授权模式(Resource Owner Password Credentials Grant)
-
客户端凭证授权模式(Client Credentials Grant)
【二】密码授权模式
【1】FastAPI 的 OAuth2PasswordBearer说明
- OAuth2PasswordBearer是接收URL作为参数的一个类:
- 客户端会向该URL发送username和password参数,然后得到一个Token值
- OAuth2PasswordBearer并不会创建相应的URL路径操作,只是指明客户端用来请求Token的URL地址
- 当请求到来的时候,FastAPI会检查请求的Authorization头信息,如果没有找到Authorization头信息,或者头信息的内容不是Bearer token,它会返回401状态码(UNAUTHORIZED)
【2】定义Token请求地址
# 请求Token的URL地址 http://127.0.0.1:8000/chapter06/token
oauth2_schema = OAuth2PasswordBearer(tokenUrl="/chapter06/token")
【3】定义获取token视图
@app06.get("/oauth2_password_bearer")
async def oauth2_password_bearer(token: str = Depends(oauth2_schema)):
return {"token": token}
【4】定义登陆视图
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from starlette import status
app06 = APIRouter()
"""OAuth2 密码模式和 FastAPI 的 OAuth2PasswordBearer"""
"""
OAuth2PasswordBearer是接收URL作为参数的一个类:客户端会向该URL发送username和password参数,然后得到一个Token值
OAuth2PasswordBearer并不会创建相应的URL路径操作,只是指明客户端用来请求Token的URL地址
当请求到来的时候,FastAPI会检查请求的Authorization头信息,如果没有找到Authorization头信息,或者头信息的内容不是Bearer token,它会返回401状态码(UNAUTHORIZED)
"""
# 请求Token的URL地址 http://127.0.0.1:8000/chapter06/token
oauth2_schema = OAuth2PasswordBearer(tokenUrl="/chapter06/token")
@app06.get("/oauth2_password_bearer")
async def oauth2_password_bearer(token: str = Depends(oauth2_schema)):
return {"token": token}
"""基于 Password 和 Bearer token 的 OAuth2 认证"""
# 模拟数据库
fake_users_db = {
"john snow": {
"username": "john snow",
"full_name": "John Snow",
"email": "[email protected]",
"hashed_password": "fakehashedsecret",
# 模拟权限 : 未激活用户 无权限
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Wonderson",
"email": "[email protected]",
"hashed_password": "fakehashedsecret2",
# 模拟权限 : 激活用户,有权限
"disabled": True,
},
}
# 模拟加密密码操作
def fake_hash_password(password: str):
return "fakehashed" + password
# 创建用户模型类
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
# 创建 用户登入传入的数据类
class UserInDB(User):
hashed_password: str
# 定义登陆视图
@app06.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
# 根据输入的用户名 从数据库中获取到 用户数据
user_dict = fake_users_db.get(form_data.username)
# 用户不存在
if not user_dict:
# 抛出异常 , 用户不存在
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect username or password")
# 用户存在,将 查询到的用互数据进行校验
user = UserInDB(**user_dict)
# 校验 输入的密码是否正确 —(先对密码进行加密,再比对)
hashed_password = fake_hash_password(form_data.password)
# 如果 加密后的密码 不等于 数据库查询到的用户对应的密码
if not hashed_password == user.hashed_password:
# 抛出异常,登陆失败
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect username or password")
# 用户登陆成功 返回用户名和token等信息
return {"access_token": user.username, "token_type": "bearer"}
# 模拟校验用户信息视图
def get_user(db, username: str):
'''
:param db: 数据库对象
:param username: token字符串
:return:
'''
# 判断当前token字符串是否存在于数据库当中
if username in db:
# 存在数据库当中,取出详细的用户信息返回
user_dict = db[username]
return UserInDB(**user_dict)
# 验证token
def fake_decode_token(token: str):
# 传入 token 字符串
# 从数据库中查询 当前登录对象
user = get_user(fake_users_db, token)
# 返回校验通过后的用户对象
return user
# 获取当前用户
async def get_current_user(token: str = Depends(oauth2_schema)):
# 根据当前用户传入的 用户名和密码 自动签发token
# 校验签发的token是否正确
user = fake_decode_token(token)
# token不正确
if not user:
# 抛出异常
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
# OAuth2的规范,如果认证失败,请求头中返回“WWW-Authenticate”
headers={"WWW-Authenticate": "Bearer"},
)
# token正确,返回用户信息
return user
# 获取激活用户信息
async def get_current_active_user(current_user: User = Depends(get_current_user)):
# 依赖于上一步 用户已经登陆,并且已经是当前登录的用户对象
# 校验 disabled 字段,校验是否处于激活状态
if current_user.disabled:
# 未激活抛出异常
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user")
# 激活返回用户对象
return current_user
# 定义视图
@app06.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
# 只有当前用户是登陆且激活的用户才能被返回
return current_user
【5】发起请求
(1)Oauth2 密码模式认证
- 点击 锁 弹出表单提示框,输入用户名和密码
- 登陆成功
- 点击 close 我们可以发现 原来的锁 锁上了
- 表示我们已经获取到了 token
- 直接发起请求 也能正确的响应我们的数据
(2)登陆
- 用户一:john snow
- 用户二:alice
- 登陆失败
(3)获取激活用户的信息
-
先登录激活用户
- 正确响应信息
-
登陆未激活用户
- 提示未认证
【三】JWT认证(JSON Web Tokens)
【1】简单引入
- 详细的JWT认证请见我其他博客文章
【2】定义视图
from datetime import datetime, timedelta
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
app06 = APIRouter()
"""OAuth2 密码模式和 FastAPI 的 OAuth2PasswordBearer"""
"""
OAuth2PasswordBearer是接收URL作为参数的一个类:客户端会向该URL发送username和password参数,然后得到一个Token值
OAuth2PasswordBearer并不会创建相应的URL路径操作,只是指明客户端用来请求Token的URL地址
当请求到来的时候,FastAPI会检查请求的Authorization头信息,如果没有找到Authorization头信息,或者头信息的内容不是Bearer token,它会返回401状态码(UNAUTHORIZED)
"""
# 请求Token的URL地址 http://127.0.0.1:8000/chapter06/token
oauth2_schema = OAuth2PasswordBearer(tokenUrl="/chapter06/token")
@app06.get("/oauth2_password_bearer")
async def oauth2_password_bearer(token: str = Depends(oauth2_schema)):
return {"token": token}
"""基于 Password 和 Bearer token 的 OAuth2 认证"""
# 模拟数据库
fake_users_db = {
"john snow": {
"username": "john snow",
"full_name": "John Snow",
"email": "[email protected]",
"hashed_password": "fakehashedsecret",
# 模拟权限 : 未激活用户 无权限
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Wonderson",
"email": "[email protected]",
"hashed_password": "fakehashedsecret2",
# 模拟权限 : 激活用户,有权限
"disabled": True,
},
}
# 模拟加密密码操作
def fake_hash_password(password: str):
return "fakehashed" + password
# 创建用户模型类
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
# 创建 用户登入传入的数据类
class UserInDB(User):
hashed_password: str
# 定义登陆视图
@app06.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
# 根据输入的用户名 从数据库中获取到 用户数据
user_dict = fake_users_db.get(form_data.username)
# 用户不存在
if not user_dict:
# 抛出异常 , 用户不存在
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect username or password")
# 用户存在,将 查询到的用互数据进行校验
user = UserInDB(**user_dict)
# 校验 输入的密码是否正确 —(先对密码进行加密,再比对)
hashed_password = fake_hash_password(form_data.password)
# 如果 加密后的密码 不等于 数据库查询到的用户对应的密码
if not hashed_password == user.hashed_password:
# 抛出异常,登陆失败
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect username or password")
# 用户登陆成功 返回用户名和token等信息
return {"access_token": user.username, "token_type": "bearer"}
# 模拟校验用户信息视图
def get_user(db, username: str):
'''
:param db: 数据库对象
:param username: token字符串
:return:
'''
# 判断当前token字符串是否存在于数据库当中
if username in db:
# 存在数据库当中,取出详细的用户信息返回
user_dict = db[username]
return UserInDB(**user_dict)
# 验证token
def fake_decode_token(token: str):
# 传入 token 字符串
# 从数据库中查询 当前登录对象
user = get_user(fake_users_db, token)
# 返回校验通过后的用户对象
return user
# 获取当前用户
async def get_current_user(token: str = Depends(oauth2_schema)):
# 根据当前用户传入的 用户名和密码 自动签发token
# 校验签发的token是否正确
user = fake_decode_token(token)
# token不正确
if not user:
# 抛出异常
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
# OAuth2的规范,如果认证失败,请求头中返回“WWW-Authenticate”
headers={"WWW-Authenticate": "Bearer"},
)
# token正确,返回用户信息
return user
# 获取激活用户信息
async def get_current_active_user(current_user: User = Depends(get_current_user)):
# 依赖于上一步 用户已经登陆,并且已经是当前登录的用户对象
# 校验 disabled 字段,校验是否处于激活状态
if current_user.disabled:
# 未激活抛出异常
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user")
# 激活返回用户对象
return current_user
# 定义视图
@app06.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
# 只有当前用户是登陆且激活的用户才能被返回
return current_user
"""OAuth2 with Password (and hashing), Bearer with JWT tokens 开发基于JSON Web Tokens的认证"""
# 模拟数据库
fake_users_db.update({
"john snow": {
"username": "john snow",
"full_name": "John Snow",
"email": "[email protected]",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
}
})
####配置
# 生成秘钥
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" # 生成密钥 openssl rand -hex 32
ALGORITHM = "HS256" # 算法
ACCESS_TOKEN_EXPIRE_MINUTES = 30 # 访问令牌过期分钟
# 模拟 Token 库
class Token(BaseModel):
"""返回给用户的Token"""
access_token: str
token_type: str
# 加密密码的方法
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# 签发 token 的方法
oauth2_schema = OAuth2PasswordBearer(tokenUrl="/chapter06/jwt/token")
# 校验密码
def verity_password(plain_password: str, hashed_password: str):
"""对密码进行校验"""
return pwd_context.verify(plain_password, hashed_password)
# 获取签发认证后的用户
def jwt_get_user(db, username: str):
# 判断用户是否存在数据库中
if username in db:
# 存在数据库中则返回用户信息
user_dict = db[username]
# 返回用户对象
return UserInDB(**user_dict)
# JWT 认证用户
def jwt_authenticate_user(db, username: str, password: str):
# 首先获取到签发认证的用户
user = jwt_get_user(db=db, username=username)
if not user:
# 签发不成功,返回失败
return False
# 签发成功则校验密码
if not verity_password(plain_password=password, hashed_password=user.hashed_password):
return False
# 密码正确返回用户对象
return user
# 签发token
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
# 先将 数据 copy 一份
to_encode = data.copy()
# 过期时间存在则取出
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
# 默认时间 15 分钟
expire = datetime.utcnow() + timedelta(minutes=15)
# 更新过期时间
to_encode.update({"exp": expire})
# 加密签发token
# claims 原始数据
# key 秘钥
# algorithm 算法
encoded_jwt = jwt.encode(claims=to_encode, key=SECRET_KEY, algorithm=ALGORITHM)
# 返回签发好的 token
return encoded_jwt
# 定义视图,登录签发 token , 定义响应数据体 Token
@app06.post("/jwt/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
# 认证用户
user = jwt_authenticate_user(db=fake_users_db, username=form_data.username, password=form_data.password)
if not user:
# 认证不通过则抛出异常
raise HTTPException(
status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
# 认证过期时间
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
# 签发token
access_token = create_access_token(
# 默认的参数 是 sub
data={"sub": user.username}, expires_delta=access_token_expires
)
# 返回签发成功的token
return {"access_token": access_token, "token_type": "bearer"}
# 创建依赖 --- 获取当前登录的用户
async def jwt_get_current_user(token: str = Depends(oauth2_schema)):
# 定义一个异常对象,方便下面多次调用
credentials_exception = HTTPException(
status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
# 对 token 进行解码
payload = jwt.decode(token=token, key=SECRET_KEY, algorithms=[ALGORITHM])
# 获取到 解码后的数据中的 用户名
username = payload.get("sub")
# 用户名不存在
if username is None:
# 抛出异常
raise credentials_exception
except JWTError:
# JWT 认证失败 则抛出异常
raise credentials_exception
# 获取到签发认证成功后的用户对象
user = jwt_get_user(db=fake_users_db, username=username)
if user is None:
raise credentials_exception
return user
# 创建依赖 --- 获取当前登录的激活后的用户
async def jwt_get_current_active_user(current_user: User = Depends(jwt_get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user")
return current_user
# 定义登陆视图
@app06.get("/jwt/users/me")
async def jwt_read_users_me(current_user: User = Depends(jwt_get_current_active_user)):
return current_user
【3】发起请求
(1)登陆签发token
- 正确登录
- 失败登录
(2)登陆用户获取信息
- 未登录认证
-
认证 激活 用户
- 获取当前用户信息
-
认证 未激活 用户
- 原理同上