首页 > 其他分享 >【11.0】Fastapi的OAuth2.0的授权模式

【11.0】Fastapi的OAuth2.0的授权模式

时间:2023-10-01 15:45:41浏览次数:38  
标签:username OAuth2.0 return Fastapi 11.0 用户 token user password

【一】OAuth2.0的授权模式

  • 授权码授权模式(Authorization Code Grant)

  • 隐式授权模式(Implicit Grant)

  • 密码授权模式(Resource Owner Password Credentials Grant)

    image-20230930150928916

  • 客户端凭证授权模式(Client Credentials Grant)

【二】密码授权模式

【1】FastAPI 的 OAuth2PasswordBearer说明

  • OAuth2PasswordBearer是接收URL作为参数的一个类:
    • 客户端会向该URL发送username和password参数,然后得到一个Token值
  • OAuth2PasswordBearer并不会创建相应的URL路径操作,只是指明客户端用来请求Token的URL地址
  • 当请求到来的时候,FastAPI会检查请求的Authorization头信息,如果没有找到Authorization头信息,或者头信息的内容不是Bearer token,它会返回401状态码(UNAUTHORIZED)

【2】定义Token请求地址

# 请求Token的URL地址 http://127.0.0.1:8000/chapter06/token
oauth2_schema = OAuth2PasswordBearer(tokenUrl="/chapter06/token")

【3】定义获取token视图

@app06.get("/oauth2_password_bearer")
async def oauth2_password_bearer(token: str = Depends(oauth2_schema)):
    return {"token": token}

【4】定义登陆视图

from typing import Optional

from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from starlette import status

app06 = APIRouter()

"""OAuth2 密码模式和 FastAPI 的 OAuth2PasswordBearer"""

"""
OAuth2PasswordBearer是接收URL作为参数的一个类:客户端会向该URL发送username和password参数,然后得到一个Token值
OAuth2PasswordBearer并不会创建相应的URL路径操作,只是指明客户端用来请求Token的URL地址
当请求到来的时候,FastAPI会检查请求的Authorization头信息,如果没有找到Authorization头信息,或者头信息的内容不是Bearer token,它会返回401状态码(UNAUTHORIZED)
"""

# 请求Token的URL地址 http://127.0.0.1:8000/chapter06/token
oauth2_schema = OAuth2PasswordBearer(tokenUrl="/chapter06/token")


@app06.get("/oauth2_password_bearer")
async def oauth2_password_bearer(token: str = Depends(oauth2_schema)):
    return {"token": token}


"""基于 Password 和 Bearer token 的 OAuth2 认证"""

# 模拟数据库
fake_users_db = {
    "john snow": {
        "username": "john snow",
        "full_name": "John Snow",
        "email": "[email protected]",
        "hashed_password": "fakehashedsecret",
        # 模拟权限 : 未激活用户 无权限
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "[email protected]",
        "hashed_password": "fakehashedsecret2",
        # 模拟权限 : 激活用户,有权限
        "disabled": True,
    },
}


# 模拟加密密码操作
def fake_hash_password(password: str):
    return "fakehashed" + password


# 创建用户模型类
class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


# 创建 用户登入传入的数据类
class UserInDB(User):
    hashed_password: str


# 定义登陆视图
@app06.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    # 根据输入的用户名 从数据库中获取到 用户数据
    user_dict = fake_users_db.get(form_data.username)
    # 用户不存在
    if not user_dict:
        # 抛出异常 , 用户不存在
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect username or password")
    # 用户存在,将 查询到的用互数据进行校验
    user = UserInDB(**user_dict)
    # 校验 输入的密码是否正确 —(先对密码进行加密,再比对)
    hashed_password = fake_hash_password(form_data.password)
    # 如果 加密后的密码 不等于 数据库查询到的用户对应的密码
    if not hashed_password == user.hashed_password:
        # 抛出异常,登陆失败
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect username or password")
    # 用户登陆成功 返回用户名和token等信息
    return {"access_token": user.username, "token_type": "bearer"}


# 模拟校验用户信息视图
def get_user(db, username: str):
    '''

    :param db: 数据库对象
    :param username: token字符串
    :return:
    '''
    # 判断当前token字符串是否存在于数据库当中
    if username in db:
        # 存在数据库当中,取出详细的用户信息返回
        user_dict = db[username]
        return UserInDB(**user_dict)


# 验证token
def fake_decode_token(token: str):
    # 传入 token 字符串
    # 从数据库中查询 当前登录对象
    user = get_user(fake_users_db, token)
    # 返回校验通过后的用户对象
    return user


# 获取当前用户
async def get_current_user(token: str = Depends(oauth2_schema)):
    # 根据当前用户传入的 用户名和密码 自动签发token
    # 校验签发的token是否正确
    user = fake_decode_token(token)
    # token不正确
    if not user:
        # 抛出异常
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            # OAuth2的规范,如果认证失败,请求头中返回“WWW-Authenticate”
            headers={"WWW-Authenticate": "Bearer"},
        )
    # token正确,返回用户信息
    return user


# 获取激活用户信息
async def get_current_active_user(current_user: User = Depends(get_current_user)):
    # 依赖于上一步 用户已经登陆,并且已经是当前登录的用户对象
    # 校验 disabled 字段,校验是否处于激活状态
    if current_user.disabled:
        # 未激活抛出异常
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user")
    # 激活返回用户对象
    return current_user

# 定义视图
@app06.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    # 只有当前用户是登陆且激活的用户才能被返回
    return current_user

【5】发起请求

image-20230930153546169

(1)Oauth2 密码模式认证

  • 点击 锁 弹出表单提示框,输入用户名和密码

image-20230930153716154

  • 登陆成功

image-20230930153731021

  • 点击 close 我们可以发现 原来的锁 锁上了
    • 表示我们已经获取到了 token

image-20230930153822683

  • 直接发起请求 也能正确的响应我们的数据

image-20230930153921134

(2)登陆

  • 用户一:john snow

image-20230930154038041

  • 用户二:alice

image-20230930154148817

  • 登陆失败

image-20230930154228999

(3)获取激活用户的信息

image-20230930154704356

  • 先登录激活用户

    image-20230930154716334

    • 正确响应信息

    image-20230930154749030

  • 登陆未激活用户

    image-20230930154828142

    • 提示未认证

    image-20230930154848673

【三】JWT认证(JSON Web Tokens)

【1】简单引入

  • 详细的JWT认证请见我其他博客文章

image-20230930154924953

【2】定义视图

from datetime import datetime, timedelta
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

app06 = APIRouter()

"""OAuth2 密码模式和 FastAPI 的 OAuth2PasswordBearer"""

"""
OAuth2PasswordBearer是接收URL作为参数的一个类:客户端会向该URL发送username和password参数,然后得到一个Token值
OAuth2PasswordBearer并不会创建相应的URL路径操作,只是指明客户端用来请求Token的URL地址
当请求到来的时候,FastAPI会检查请求的Authorization头信息,如果没有找到Authorization头信息,或者头信息的内容不是Bearer token,它会返回401状态码(UNAUTHORIZED)
"""

# 请求Token的URL地址 http://127.0.0.1:8000/chapter06/token
oauth2_schema = OAuth2PasswordBearer(tokenUrl="/chapter06/token")


@app06.get("/oauth2_password_bearer")
async def oauth2_password_bearer(token: str = Depends(oauth2_schema)):
    return {"token": token}


"""基于 Password 和 Bearer token 的 OAuth2 认证"""

# 模拟数据库
fake_users_db = {
    "john snow": {
        "username": "john snow",
        "full_name": "John Snow",
        "email": "[email protected]",
        "hashed_password": "fakehashedsecret",
        # 模拟权限 : 未激活用户 无权限
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "[email protected]",
        "hashed_password": "fakehashedsecret2",
        # 模拟权限 : 激活用户,有权限
        "disabled": True,
    },
}


# 模拟加密密码操作
def fake_hash_password(password: str):
    return "fakehashed" + password


# 创建用户模型类
class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


# 创建 用户登入传入的数据类
class UserInDB(User):
    hashed_password: str


# 定义登陆视图
@app06.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    # 根据输入的用户名 从数据库中获取到 用户数据
    user_dict = fake_users_db.get(form_data.username)
    # 用户不存在
    if not user_dict:
        # 抛出异常 , 用户不存在
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect username or password")
    # 用户存在,将 查询到的用互数据进行校验
    user = UserInDB(**user_dict)
    # 校验 输入的密码是否正确 —(先对密码进行加密,再比对)
    hashed_password = fake_hash_password(form_data.password)
    # 如果 加密后的密码 不等于 数据库查询到的用户对应的密码
    if not hashed_password == user.hashed_password:
        # 抛出异常,登陆失败
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect username or password")
    # 用户登陆成功 返回用户名和token等信息
    return {"access_token": user.username, "token_type": "bearer"}


# 模拟校验用户信息视图
def get_user(db, username: str):
    '''

    :param db: 数据库对象
    :param username: token字符串
    :return:
    '''
    # 判断当前token字符串是否存在于数据库当中
    if username in db:
        # 存在数据库当中,取出详细的用户信息返回
        user_dict = db[username]
        return UserInDB(**user_dict)


# 验证token
def fake_decode_token(token: str):
    # 传入 token 字符串
    # 从数据库中查询 当前登录对象
    user = get_user(fake_users_db, token)
    # 返回校验通过后的用户对象
    return user


# 获取当前用户
async def get_current_user(token: str = Depends(oauth2_schema)):
    # 根据当前用户传入的 用户名和密码 自动签发token
    # 校验签发的token是否正确
    user = fake_decode_token(token)
    # token不正确
    if not user:
        # 抛出异常
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            # OAuth2的规范,如果认证失败,请求头中返回“WWW-Authenticate”
            headers={"WWW-Authenticate": "Bearer"},
        )
    # token正确,返回用户信息
    return user


# 获取激活用户信息
async def get_current_active_user(current_user: User = Depends(get_current_user)):
    # 依赖于上一步 用户已经登陆,并且已经是当前登录的用户对象
    # 校验 disabled 字段,校验是否处于激活状态
    if current_user.disabled:
        # 未激活抛出异常
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user")
    # 激活返回用户对象
    return current_user


# 定义视图
@app06.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    # 只有当前用户是登陆且激活的用户才能被返回
    return current_user


"""OAuth2 with Password (and hashing), Bearer with JWT tokens 开发基于JSON Web Tokens的认证"""
# 模拟数据库
fake_users_db.update({
    "john snow": {
        "username": "john snow",
        "full_name": "John Snow",
        "email": "[email protected]",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
})

####配置

# 生成秘钥
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"  # 生成密钥 openssl rand -hex 32
ALGORITHM = "HS256"  # 算法
ACCESS_TOKEN_EXPIRE_MINUTES = 30  # 访问令牌过期分钟


# 模拟 Token 库
class Token(BaseModel):
    """返回给用户的Token"""
    access_token: str
    token_type: str


# 加密密码的方法
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# 签发 token 的方法
oauth2_schema = OAuth2PasswordBearer(tokenUrl="/chapter06/jwt/token")


# 校验密码
def verity_password(plain_password: str, hashed_password: str):
    """对密码进行校验"""
    return pwd_context.verify(plain_password, hashed_password)


# 获取签发认证后的用户
def jwt_get_user(db, username: str):
    # 判断用户是否存在数据库中
    if username in db:
        # 存在数据库中则返回用户信息
        user_dict = db[username]
        # 返回用户对象
        return UserInDB(**user_dict)


# JWT 认证用户
def jwt_authenticate_user(db, username: str, password: str):
    # 首先获取到签发认证的用户
    user = jwt_get_user(db=db, username=username)
    if not user:
        # 签发不成功,返回失败
        return False
    # 签发成功则校验密码
    if not verity_password(plain_password=password, hashed_password=user.hashed_password):
        return False
    # 密码正确返回用户对象
    return user


# 签发token
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    # 先将 数据 copy 一份
    to_encode = data.copy()
    # 过期时间存在则取出
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        # 默认时间 15 分钟
        expire = datetime.utcnow() + timedelta(minutes=15)
    # 更新过期时间
    to_encode.update({"exp": expire})
    # 加密签发token
    # claims 原始数据
    # key 秘钥
    # algorithm 算法
    encoded_jwt = jwt.encode(claims=to_encode, key=SECRET_KEY, algorithm=ALGORITHM)
    # 返回签发好的 token
    return encoded_jwt


# 定义视图,登录签发 token , 定义响应数据体 Token
@app06.post("/jwt/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    # 认证用户
    user = jwt_authenticate_user(db=fake_users_db, username=form_data.username, password=form_data.password)
    if not user:
        # 认证不通过则抛出异常
        raise HTTPException(
            status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    # 认证过期时间
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # 签发token
    access_token = create_access_token(
        # 默认的参数 是 sub
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    # 返回签发成功的token
    return {"access_token": access_token, "token_type": "bearer"}


# 创建依赖 --- 获取当前登录的用户
async def jwt_get_current_user(token: str = Depends(oauth2_schema)):
    # 定义一个异常对象,方便下面多次调用
    credentials_exception = HTTPException(
        status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # 对 token 进行解码
        payload = jwt.decode(token=token, key=SECRET_KEY, algorithms=[ALGORITHM])
        # 获取到 解码后的数据中的 用户名
        username = payload.get("sub")
        # 用户名不存在
        if username is None:
            # 抛出异常
            raise credentials_exception
    except JWTError:
        # JWT 认证失败 则抛出异常
        raise credentials_exception

    # 获取到签发认证成功后的用户对象
    user = jwt_get_user(db=fake_users_db, username=username)
    if user is None:
        raise credentials_exception
    return user


# 创建依赖 --- 获取当前登录的激活后的用户
async def jwt_get_current_active_user(current_user: User = Depends(jwt_get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user")
    return current_user


# 定义登陆视图
@app06.get("/jwt/users/me")
async def jwt_read_users_me(current_user: User = Depends(jwt_get_current_active_user)):
    return current_user

【3】发起请求

(1)登陆签发token

  • 正确登录

image-20230930160738632

  • 失败登录

image-20230930160822526

(2)登陆用户获取信息

  • 未登录认证

image-20230930161015118

  • 认证 激活 用户

    image-20230930161033816

    • 获取当前用户信息

    image-20230930161131047

  • 认证 未激活 用户

    • 原理同上

标签:username,OAuth2.0,return,Fastapi,11.0,用户,token,user,password
From: https://www.cnblogs.com/dream-ze/p/17738897.html

相关文章

  • FastAPI学习-26 并发 async / await
    前言有关路径操作函数的asyncdef语法以及异步代码、并发和并行的一些背景知识async和await关键字如果你正在使用第三方库,它们会告诉你使用await关键字来调用它们,就像这样:results=awaitsome_library()然后,通过asyncdef声明你的路径操作函数:@app.get('/')asy......
  • FastAPI学习-22.response 异常处理 HTTPException
    前言某些情况下,需要向客户端返回错误提示。这里所谓的客户端包括前端浏览器、其他应用程序、物联网设备等。需要向客户端返回错误提示的场景主要如下:客户端没有执行操作的权限客户端没有访问资源的权限客户端要访问的项目不存在等等...遇到这些情况时,通常要返回 4XX(40......
  • FastAPI学习-23.异常处理器 exception_handler
    前言通常我们可以通过raise抛出一个HTTPException异常,请求参数不合法会抛出RequestValidationError异常,这是最常见的2种异常。HTTPException异常向客户端返回HTTP错误响应,可以使用 raise触发 HTTPException。fromfastapiimportFastAPI,HTTPExceptionapp=Fa......
  • FastAPI学习-24.自定义异常处理器 exception_handler
    前言添加自定义处理器,要使用 Starlette的异常工具。安装自定义异常处理器假设要触发的自定义异常叫作 UnicornException。且需要FastAPI实现全局处理该异常。此时,可以用 @app.exception_handler() 添加自定义异常控制器:fromfastapiimportFastAPI,Requestfromfa......
  • FastAPI学习-25.response_model 定义响应模型
    你可以在任意的_路径操作_中使用 response_model 参数来声明用于响应的模型:@app.get()@app.post()@app.put()@app.delete()fromtypingimportAny,List,UnionfromfastapiimportFastAPIfrompydanticimportBaseModelapp=FastAPI()classItem(BaseModel)......
  • 在线问诊 Python、FastAPI、Neo4j — 提供接口服务
    目录构建服务层接口路由层PostMan调用采用FastAPI搭建服务接口:https://www.cnblogs.com/vipsoft/p/17684079.htmlFastAPI文档:https://fastapi.tiangolo.com/zh/构建服务层qa_service.pyfromservice.question_classifierimport*fromservice.question_parserimpor......
  • fastapi+tortoise-orm+redis+celery 多worker数据库连接
    我用fastapi在写接口,数据库orm用的是tortoise-orm,接口的数据库操作是正常的。现在加入了celery,但是每个celery在执行任务时,不能获取到数据库连接我想要每个worker获得数据库连接,但是不要每个任务都去连接一次,并在每个worker结束时,断开连接,但是不能断开其他worker的数据库连接from......
  • 在线问诊 Python、FastAPI、Neo4j — 问题反馈
    目录查出节点拼接节点属性测试结果问答演示通过节点关系,找出对应的节点,获取节点属性值,并拼接成想要的结果。接上节生成的CQL#输入question_class={'args':{'看东西有时候清楚有时候不清楚':['symptom']},'question_types':['symptom_disease']}#输出[{'question_typ......
  • 在线问诊 Python、FastAPI、Neo4j — Cypher 生成
    目录构建节点字典构建CypherCQL语句Test这边只是为了测试,演示效果和思路,实际应用中,可以通过NLP构建CQL接上一篇的问题分类question="请问最近看东西有时候清楚有时候不清楚是怎么回事"#最终输出data={'args':{'看东西有时候清楚有时候不清楚':['symptom']},'questio......
  • 在线问诊 Python、FastAPI、Neo4j — 构建问题分类器
    目录构建字典数据构建Trie字典树按实体组装字典问题分析将问题进行分析,和系统已有的分类进行关联构建字典数据将构建的知识图片字典化,用于后面对问题的解析,下图为症状的字典,其它字典同理构建Trie字典树将建字典数据,组装集合cur_dir='/'.join(os.path.abspath(__file......