首页 > 编程问答 >FastAPI oauth2 + jwt 延长每个请求的exp时间

FastAPI oauth2 + jwt 延长每个请求的exp时间

时间:2024-08-03 15:35:36浏览次数:14  
标签:python jwt fastapi

根据fastapi的示例,我们可以使用ouath2和json网络令牌为用户创建登录:

from datetime import datetime, timedelta, timezone
from typing import Annotated

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "[email protected]",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)]
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    return [{"item_id": "Foo", "owner": current_user.username}]

但是我们可以验证令牌将在30分钟后过期。 ACCESS_TOKEN_EXPIRE_MINUTES = 30 我想要的是一个令牌,每次用户向该应用程序中的任何端点发出请求时,该令牌都会将其过期时间延长当前时间 + 10 分钟。因此,我可以始终保持一段用户不活动状态,即使用户正在积极使用该应用程序,也不会突然注销。有没有一种方法可以做到这一点,而无需将用户存储在数据库中,而仅使用存储在客户端 Web 浏览器中的 jwts 来实现?解决此类问题的最佳实践是什么?每次更新授权标头 要求 ?设置cookies?请告知。

编辑:这是我尝试使用中间件来修改标头,但它仍然不起作用。

async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception

    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception

    # Check if the token needs to be refreshed
    expiration_time = datetime.fromtimestamp(payload["exp"], tz=timezone.utc)
    if expiration_time - datetime.now(tz=timezone.utc) < timedelta(minutes=REFRESH_INTERVAL_MINUTES):
        # Refresh the token with a new expiration time
        access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        new_access_token = create_access_token(
            data={"sub": user.username}, expires_delta=access_token_expires
        )
        return user, new_access_token

    return user , token


async def get_current_active_user(
    current_user_and_token: Annotated[User, Depends(get_current_user)]
):
    current_user,_ = current_user_and_token
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

你提出的延长 JWT 过期时间的想法(每次请求都延长10分钟)在概念上与 JWT 的无状态性质相冲突。JWT 的一大优势在于它们是自包含的,这意味着你不需要在每次请求时都去数据库或其他地方验证它们。如果你开始修改每个请求的过期时间,那么你实际上就引入了需要某种状态管理的机制,这与 JWT 的初衷背道而驰。

更好的解决方案:

  1. 使用刷新令牌:

  2. 颁发一个短期的访问令牌(例如 30 分钟)和一个长期刷新令牌(例如 1 周或更长)。

  3. 每次请求时,检查访问令牌是否过期。
  4. 如果过期,则使用刷新令牌请求新的访问令牌。
  5. 刷新令牌的验证需要访问数据库或其他安全存储,以确保其有效性。
  6. 这种方法平衡了安全性和用户体验。

  7. 使用滑动窗口过期时间:

  8. 在服务器端设置一个配置项,例如 10 分钟。

  9. 每次请求时,检查访问令牌是否过期。
  10. 如果过期,但距离上次颁发时间不超过配置的滑动窗口时间,则自动刷新访问令牌并将其返回给客户端。
  11. 如果超过滑动窗口时间,则要求用户重新登录。
  12. 这种方法不需要刷新令牌,但仍然需要服务器端的状态管理。

代码示例(使用刷新令牌):

from datetime import datetime, timedelta

# ... other imports ...

REFRESH_TOKEN_EXPIRE_MINUTES = 1440  # 24 hours

# ... other code ...

@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
) -> dict:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    refresh_token_expires = timedelta(minutes=REFRESH_TOKEN_EXPIRE_MINUTES)
    refresh_token = create_access_token(
        data={"sub": user.username}, expires_delta=refresh_token_expires
    )
    return {"access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer"}


@app.post("/refresh_token")
async def refresh_access_token(refresh_token: str):
    try:
        payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid refresh token",
            headers={"WWW-Authenticate": "Bearer"},
        )
    # 在这里,你应该验证刷新令牌是否在数据库或其他安全存储中
    user = get_user(fake_users_db, username=username)
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid refresh token",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

请注意:

  • 以上代码示例仅供参考,你需要根据自己的实际情况进行修改和完善。
  • 为了确保安全性,你需要将刷新令牌存储在安全的地方,例如数据库中。
  • 在刷新访问令牌时,你需要验证刷新令牌的有效性,例如检查它是否已被撤销。

希望这些信息能帮到你!

标签:python,jwt,fastapi
From: 78205950

相关文章

  • 如何在venv python中安装requirements.txt
    我是Python虚拟环境的初学者,在安装requirements.txt文件时遇到问题。问题是,当我运行命令来安装requirements.txt文件时,没有安装任何内容。平台:WindowsVS代码镜像如何解决这个问题?没有正确激活虚拟环境。请按照以下步骤操作:1.激活虚拟环境:在VSC......
  • 【代码随想录】图论复习(Python版)
    深度优先搜索1.搜索过程一个方向搜,不到黄河不回头,直到遇到绝境了,搜不下去了,再换方向(换方向的过程就涉及到了回溯)2.代码框架回溯法的代码框架:defbacktracking(参数):if终止条件:存放结果returnfor选择本层集合中的元素(树中节点孩子的数量......
  • 【Python】数据类型之字符串
    本篇文章将继续讲解字符串其他功能:1、求字符串长度功能:len(str)  ,该功能是求字符串str的长度。代码演示:2、通过索引获取字符串的字符。功能:str[a]  str为字符串,a为整型。该功能是获取字符串str索引为a处的字符。注意:字符串的索引是从0开始的。代码演示:注意......
  • 【Python】python基础
    本篇文章将讲解以下知识点:(1)循环语句(2)字符串格式化(3)运算符一:循环语句循环语句有两种:while   for本篇文章只讲解while循环格式:while 条件:  代码(只有条件为真的时候,此代码才会被执行,此处的代码可以是多行代码)(1)循环语句基本使用示例1:此处代码执行过程:1<3......
  • python 爬虫入门实战——爬取维基百科“百科全书”词条页面内链
    1.简述本次爬取维基百科“百科全书”词条页面内链,仅发送一次请求,获取一个html页面,同时不包含应对反爬虫的知识,仅包含最基础的网页爬取、数据清洗、存储为csv文件。爬取网址url为“https://zh.wikipedia.org/wiki/百科全书”,爬取内容为该页面所有内链及内链标识(下图蓝......
  • Python:match()和search()的区别
    在Python中,match()和search()函数通常与正则表达式(regularexpressions)一起使用,特别是在re模块中。尽管它们都用于搜索字符串中的模式,但它们在搜索行为上有关键的区别。re.match()re.match()函数尝试从字符串的起始位置匹配一个模式,如果不是起始位置匹配成功的话,match()......
  • Python:range()函数的用法
    range()函数是Python中一个内置函数,用于生成一个数字序列。这个函数通常用于在for循环中迭代一个指定的次数。range()函数可以接收一到三个参数,分别是起始值(start)、结束值(stop)和步长(step),但步长是可选的,默认值为1。基本用法两个参数:range(start,stop)生成一个从star......
  • 计算机毕业设计-基于python高校大学生评奖评优系统【源码+文档+PPT】
    精彩专栏推荐订阅:在下方主页......
  • Python知识点
    目录1、数据类型2、变量3、列表4、集合5、字典6、注释7、基本功能8、条件语句9、循环语句10、函数11、异常处理12、字符串操作13、正则表达式1、数据类型数据类型是可以存储在变量中的数据规范。解释器根据变量的类型为变量分配内存。下面是Python中的各种......
  • 家庭局域网中电脑唤醒 —— WOL远程唤醒(python实现)
    相关:https://blog.csdn.net/hih30250/article/details/136342258在WOL介绍里说过WOL数据包的最简格式是由6个字节的255和目标计算机的48位MAC地址,重复16次组成,并且这个数据包可以包含在任何协议中,最常见的是包含在UDP中。点击查看代码importsocketimportstructclass......