首页 > 其他分享 >fastapi身份认证

fastapi身份认证

时间:2022-12-10 23:24:24浏览次数:72  
标签:username fastapi 认证 access token user str password 身份

官方文档

FastApi提供了OAuth2PasswordBearer类对OAuth2中的password授权模式提供了支持。

一、实现逻辑

  1. 创建OAuth2PasswordBearer实例并指明token url(认证用户获取token)。

    oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
    
  2. 将创建的OAuth2PasswordBearer作为依赖添加到需要用户认证才能访问的url。

    @app.get("/items/")
    async def read_items(token: str = Depends(oauth2_scheme)):
        return {"token": token}
    
  3. 实现tokenUrl端点

    • 认证用户

      def authenticate_user(fake_db, username, password):
          user = get_user(fake_db, username) (1)
          if not user:
              return False
          if not verify_password(password, user.hashed_password):  (2)
              return False
          return user
      

      这里使用passlibBcrypt生成和验证hash密码。

      $ pip install passlib[bcrypt]
      

      (1)通过用户名查询系统用户

      (2)验证密码

    • 生成token

      def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
          to_encode = data.copy()
          if expires_delta:
              expire = datetime.utcnow() + expires_delta
          else:
              expire = datetime.utcnow() + timedelta(minutes=15)
          to_encode.update({"exp": expire})
          encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) (1)
          return encoded_jwt
      

      这里使用python-josecryptography生成和校验token,

      $ pip install python-jose[cryptography]
      

      (1)SECRET_KEY为生成的秘钥,ALGORITHM为秘钥算法,这里为HS256。

      $ openssl rand -hex 32
      
    • 定义端点

      @app.post("/token")
      async def login(form_data: OAuth2PasswordRequestForm = Depends()):
          user = authenticate_user(fake_users_db, form_data.username, form_data.password)
          if not user:
              raise HTTPException(
                  status_code=status.HTTP_401_UNAUTHORIZED,
                  detail="Incorrect username or password",
                  headers={"WWW-Authenticate": "Bearer"},
              )
          access_token_expires = timedelta(minutes=15)
          access_token = create_access_token(data={"sub": user.username}, expires_delta=access_token_expires) 
          return {"access_token": access_token, "token_type": "bearer"}
      

二、OAuth2PasswordBearer

class OAuth2PasswordBearer(OAuth2):
    def __init__(
        self,
        tokenUrl: str,
        scheme_name: Optional[str] = None,
        scopes: Optional[Dict[str, str]] = None,
        description: Optional[str] = None,
        auto_error: bool = True,
    ):
        if not scopes:
            scopes = {}
        flows = OAuthFlowsModel(password={"tokenUrl": tokenUrl, "scopes": scopes})
        super().__init__(
            flows=flows,
            scheme_name=scheme_name,
            description=description,
            auto_error=auto_error,
        )

OAuth2PasswordBearer的tokenUrl属性定义了认证请求处理url,由开发者自行实现token url。

from fastapi import Depends, FastAPI
from fastapi.security import OAuth2PasswordBearer

app = FastAPI()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")  (1)


@app.get("/items/")
async def read_items(token: str = Depends(oauth2_scheme)):  (2)
    return {"token": token}

(1)创建OAuth2PasswordBearer实例并声明token url;

(2)通过Depends(oauth2_scheme)可以从请求中获取Bearer token,如果用户没有登录则将会返回401错误;

三、实现token url端点:

@app.post("/token")   (1)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password) (2)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=15)    (3)
    access_token = create_access_token(data={"sub": user.username}, expires_delta=access_token_expires) (4)
    return {"access_token": access_token, "token_type": "bearer"}

(1)使用post方法请求/token即可进行用户认证获取token。

(2)通过请求中的用户名进行用户认证,认证通过后将返回用户信息

(3)token有效时间

(4)生成token

四、OAuth2PasswordRequestForm

class OAuth2PasswordRequestForm:
    def __init__(
        self,
        grant_type: str = Form(default=None, regex="password"),
        username: str = Form(),
        password: str = Form(),
        scope: str = Form(default=""),
        client_id: Optional[str] = Form(default=None),
        client_secret: Optional[str] = Form(default=None),
    ):
        self.grant_type = grant_type
        self.username = username
        self.password = password
        self.scopes = scope.split()
        self.client_id = client_id
        self.client_secret = client_secret

OAuth2PasswordRequestForm是fastapi提供的获取请求中的用户名密码的依赖项,声明了如下的请求表单:

  • username
  • password
  • 一个可选的 scope 字段,是一个由空格分隔的字符串组成的大字符串。
  • 一个可选的 grant_type
  • 一个可选的 client_id
  • 一个可选的 client_secret

注:使用form_data需要安装python-multipart

$ pip install python-multipart

五、完整示例代码

import time
from datetime import timedelta, datetime
from typing import Union

from fastapi import Depends, FastAPI, HTTPException, Request

from fastapi.security.oauth2 import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from starlette import status
from passlib.context import CryptContext
from jose import JWTError, jwt

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$uaiu4T29ukSDdbKIwv51mee661OGiaycuE276syh79I5uAIIVkatC",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "$2b$12$uaiu4T29ukSDdbKIwv51mee661OGiaycuE276syh79I5uAIIVkatC",
        "disabled": True,
    },
}

class UserInDB(User):
    hashed_password: str


class TokenData(BaseModel):
    username: Union[str, None] = None


class Token(BaseModel):
    access_token: str
    token_type: str


pwd_context = CryptContext(schemes=["bcrypt"])


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username, password):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"


def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_activate_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")

    return current_user


@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=15)
    access_token = create_access_token(data={"sub": user.username}, expires_delta=access_token_expires)
    return {"access_token": access_token, "token_type": "bearer"}

  
@app.get("/user/me", response_model=User)
async def read_items(user: User = Depends(get_current_activate_user)):
    return user

六、中间件

fastapi提供了中间件可以在请求执行前和请求执行后做一下其他的拦截。

创建中间件只需要使用@app.middleware("http")装饰器,所有的请求执行前和执行后都将经过中间件。

import time

from fastapi import FastAPI, Request

app = FastAPI()


@app.middleware("http") (1)
async def add_process_time_header(request: Request, call_next):
    start_time = time.time()  (2)
    response = await call_next(request)
    process_time = time.time() - start_time  (3)
    response.headers["X-Process-Time"] = str(process_time)
    return response

(1)声明该函数为中间件

(2)请求处理前前执行

(3)请求处理完成后执行

标签:username,fastapi,认证,access,token,user,str,password,身份
From: https://www.cnblogs.com/yourblog/p/16972587.html

相关文章

  • 获取Openstack认证令牌
    在运行身份管理服务的典型OpenStack部署中,可以指定用于认证的项目名、用户名和密码凭证。下面以使用cURL命令为例进行示范。(1)首先导出环境变量OS_PROJECT_NAME(项目名)、OS_......
  • 10:Java人脸识别认证-Java API 实战
    (目录)1.提出问题,引入SDK的概念什么是SDK?我们并不具备开发人脸识别的能力,但我们可以用大公司已经开发好的工具或者功能,来实现人脸识别,而大公司提供的就叫SDK(Software......
  • python中openpyxl给excel表去重和身份证号信息提取
    前言:python操作excel用openpyxl库非常方便,今天学习一下给excel表去重,还有身份证号信息提取,自动计算年龄。#coding:utf-8fromopenpyxlimportload_workbookfromopenpyxl.......
  • asp.net core 基于Cookies的认证,自定义认证方案
    前言:看完《ASP.NETCore6框架揭秘》很久了,这几天我发现自己对基于Cookie的认证还是有点疑惑,特别是自定义“认证方案”。所以写代码加强理解。别误会,我不会......
  • 上海ISO三体系认证办理常见流程
    ISO三体系认证是国际上标准化体系认证的结合,又称三标体系认证或三标一体,包括ISO9001质量管理体系、ISO14001环境管理体系、ISO45001职业安全健康管理体系。认证三体系标准已......
  • 基于 Serverless一键体验FastAPI
    基于Serverless一键体验FastAPI​​活动地址​​​​服务搭建​​​​服务使用​​​​ServerlessDevs介绍​​​​什么是ServerlessDevs​​​​ServerlessDevs优势​......
  • 什么是UKCA认证?玩具办理UKCA测试标准周期。
    什么是UKCA认证?UKCA是英国合格认定(UKConformityAssessed)的简称.2019年2月2日,英国公布了在无协议脱欧的情况下将会采用UKCA标志方案。2021年1月1日之后,开始正式实施新标......
  • MSDS认证什么产品需要做呢?
    MSDS认证需要提供什么资料什么是MSDS认证?MSDS认证报告即MaterialSafetyDataSheet,(材料安全数据表),中文名为化学品安全说明书。美国、加拿大、澳洲及亚洲多国一般称MSDS,而......
  • 面对庞大复杂的身份和权限管理,企业该怎么办?
    摘要:随着各领域加快向数字化、移动化、互联网化的发展,企业信息环境变得庞大复杂,身份和权限管理面临巨大的挑战。本文分享自华为云社区《面对庞大复杂的身份和权限管理,企业......
  • 美国站亚马逊需要做什么认证呢
    近期,亚马逊在严查登山扣及其相关产品,被抽查到没有相关认证的登山扣产品将直接被下架!而登山扣上亚马逊,需要做什么认证呢?一、什么是登山扣登山扣是扣子的一种,顾名思义其就......