常用状态码的使用场景
状态码 | 使用场景 |
---|---|
200 | 服务请求成功,并响应了数据 ,修改也是 |
201 | 新增数据成功,用于新建资源 |
204 | 用于删除内容 |
301 | 永久重定向 |
400 | 路径、?查询参数,请求头,请求体参数有错误 |
401 | 当前请求需要用户登录验证,登录失败,或未登录 |
403 | 没有权限访问,请联系管理员 |
404 | 请求失败,请求所希望得到的资源未被在服务器上发现。 |
一、基于传统的 token 认证
基于传统的 token 认证流程:用户登录,服务端给返回的token,并将token保存在服务端,以后再来访问时,需要携带token,服务器获取token后,再去数据库中获取token来进行校验。
- 注册DRF应用
# settings.py
# Application definition
INSTALLED_APPS = [
...
'rest_framework',
]
- 编写用户模型
# models.py
from django.db import models
# Create your models here.
class UserInfo(models.Model):
username = models.CharField(max_length=32)
password = models.CharField(max_length=32)
token = models.CharField(max_length=128,null=True,blank=True)
- 数据库迁移
# shell命令行
python manage.py makemigrations
# python manage.py check 查看数据库迁移是否有错
python manage.py migrate
- 编写视图
# views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from user.models import UserInfo
import uuid
class LoginView(APIView):
"""用户登录"""
def post(self,request,*args,**kwargs):
user = request.data.get('username')
pwd = request.data.get('password')
user_obj = UserInfo.objects.filter(username=user,password=pwd).first()
if not user_obj:
return Response({'code':401,'error':'用户名或密码错误'})
# 模拟生成的token
user_token = str(uuid.uuid4())
# 保存生成的token,用于后续的校验
user_obj.token = user_token
user_obj.save()
return Response({'code':201,'token':user_token})
class IndexView(APIView):
# 判断携带 token 是否生效
def get(self,request,*args,**kwargs):
token = request.query_params.get('token')
if not token:
return Response({"code":401,"error":"登录成功后才能访问"})
user_token = UserInfo.objects.filter(token=token).first()
if not user_token:
return Response({"code":401,'error':"token已失效"})
return Response(f"已登录成功,欢迎{user_token.username}!")
- 配置路由
# urls.py
from django.contrib import admin
from django.urls import path
from django.conf import settings
from django.conf.urls.static import static
from user.views import LoginView,IndexView
urlpatterns = [
path('admin/', admin.site.urls),
path('login/', LoginView.as_view()),
path('index/', IndexView.as_view()),
]
if settings.DEBUG:
urlpatterns += static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)
-
在表中预先存入下的数据,用于模拟验证
-
post请求用户名和密码
- 访问页面
二、DRF中使用pyjwt编写JWT 认证
JWT的全称是Json Web Token,常用于用户认证(用于前后端分离、微信小程序、app开发)
JWT认证流程:用于登录,服务端给用户返回一个token(服务器中不保存),之后用户再来访问,需要携带token,再做token的校验。从https://jwt.io/官网上看到了JWT字符串示例如下图:
- 用户提交用户名和密码给服务端,如果登录成功,使用JWT创建一个token值如下
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
- 1
- 第一部分标头通常(HEADER)由两部分组成:令牌的类型,即 JWT,以及正在使用的签名算法,例如 HMAC SHA256 或 RSA。
例:{ "alg": "HS256", "typ": "JWT" }
- 第二部分是负载(PYLOAD),其中包含声明。声明是关于实体(通常是用户)和附加数据的声明。共有三种类型:注册声明、公共声明和私人声明。注册声明:这些是一组预定义的声明,这些声明不是强制性的,而是推荐的,以提供一组有用的、可互操作的声明。其中一些是:iss(发行者)、exp(到期时间)、sub(主题)、aud(受众)等。(请注意,声明名称只有三个字符,因为 JWT 是紧凑的。)公共声明:这些可以由使用 JWT 的人随意定义。但是为了避免冲突,它们应该在 IANA JSON Web Token Registry 中定义,或者定义为包含抗冲突命名空间的 URI。私人声明:这些自定义声明是为了在同意使用它们的各方之间共享信息而创建的,这些声明既不是注册声明也不是公开声明。
例(不要加入敏感信息):{ "sub": "1234567890", "name": "John Doe", "admin": true }
- 第三部分是签名(SIGN),要创建签名部分,您必须获取编码的标头、编码的有效载荷、秘密、标头中指定的算法,并对其进行签名。
例:HMACSHA256( base64UrlEncode(header) + "." + base64UrlEncode(payload), secret)
- 以后用户访问时候,需要携带Token,后端需要对token进行校验
- 第一步:获取Token,对Token进行切割
- 第二步:对第二段进行base64url解密,并获取payload信息,检验token是否已经超时
- 第三步:把1,2,部分拼接,再次执行Heade中的加密算法加密,并与签名进行判断,相等即认证通过。
2.1 JWT 认证实现
- 安装 pyjwt,或者其它方式生成l链接
pip install pyjwt
- 1
- 由于不需要数据库保存,做出如下方式修改即可。
from rest_framework.views import APIView
from rest_framework.response import Response
from user.models import UserInfo
import uuid
from django.conf import settings
class LoginView(APIView):
"""用户登录"""
def post(self,request,*args,**kwargs):
user = request.data.get('username')
pwd = request.data.get('password')
user_obj = UserInfo.objects.filter(username=user,password=pwd).first()
if not user_obj:
return Response({'code':401,'error':'用户名或密码错误'})
import jwt
import datetime
salt = settings.SECRET_KEY
# 构造Header,默认如下
headers = {
'typ':'jwt',
'alg':'HS256'
}
# 构造Payload
payload = {
'user_id':user_obj.pk,#自定义用户ID
'username':user_obj.username,#自定义用户名
'exp':datetime.datetime.utcnow()+datetime.timedelta(minutes=1),# 设置超时时间,1min
}
jwt_token = jwt.encode(headers=headers,payload=payload,key=salt,algorithm='HS256')
return Response({'code':200,'token':jwt_token})
# {"username":"admin","password":"admin123"}
class IndexView(APIView):
def get(self,request,*args,**kwargs):
token = request.query_params.get('token')
if not token:
return Response({"code":401,"error":"登录成功后才能访问"})
# 切割
# 解密payload,判断是否过期
# 验证第三段的合法性
import jwt
salt = settings.SECRET_KEY
error = ""
try:
# 从token中获取payload【不校验合法性】
# unverified_payload = jwt.decode(token, None, False)
# print(unverified_payload)
# 从token中获取payload【校验合法性】
payload = jwt.decode(jwt=token, key=salt, algorithms=["HS256"])
print(payload)
return Response(f"已登录成功,欢迎!")
except jwt.ExpiredSignatureError:
error = "token已失效"
return Response({"code": 401, "error": error})
except jwt.DecodeError:
error = "token认证失败"
return Response({"code": 401, "error": error})
except jwt.InvalidTokenError:
error = "非法token"
return Response({"code": 401, "error": error})
2.2 JWT 结合 BaseAuthentication 使用
- 新建一个文件用于识别是否认证的功能模块
# user/extensions/jwt_authenticate.py
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
from django.conf import settings
class JWTQueryParamsAuthentication(BaseAuthentication):
def authenticate(self, request):
token = request.query_params.get('token')
print(token)
if not token:
raise AuthenticationFailed({"code": 401, "error": "登录成功后才能访问"})
# 切割
# 解密payload,判断是否过期
# 验证第三段的合法性
import jwt
salt = settings.SECRET_KEY
try:
# 从token中获取payload【不校验合法性】
# unverified_payload = jwt.decode(token, None, False)
# print(unverified_payload)
# 从token中获取payload【校验合法性】
payload = jwt.decode(jwt=token, key=salt, algorithms=["HS256"])
print(payload)
return (payload,token)
except jwt.exceptions.ExpiredSignatureError:
error = "token已失效"
raise AuthenticationFailed({"code": 401, "error": error})
except jwt.exceptions.DecodeError:
error = "token已认证失败"
raise AuthenticationFailed({"code": 401, "error": error})
except jwt.exceptions.InvalidTokenError:
error = "非法的token"
raise AuthenticationFailed({"code": 401, "error": error})
""" 三种操作
1. 抛出异常,后续不在执行
2. return 一个元组(1,2)认证通过,
在视图中调用request.user就是元组的第一个值;
另外一个就是request.auth
3.None
"""
- 新建一个文件用于存储Token的功能模块
# user/utils/jwt_create_token.py
import jwt
from django.conf import settings
def create_token(payload):
salt = settings.SECRET_KEY
# 构造Header,默认如下
headers = {
'typ': 'jwt',
'alg': 'HS256'
}
jwt_token = jwt.encode(headers=headers, payload=payload, key=salt, algorithm='HS256')
return jwt_token
- 添加全局认证
# settings.py
REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': ['user.extensions.jwt_authenticate.JWTQueryParamsAuthentication',],
}
- 修改views.py文件
from rest_framework.views import APIView
from rest_framework.response import Response
from user.models import UserInfo
import datetime
from user.utils.jwt_create_token import create_token
from user.extensions.jwt_authenticate import JWTQueryParamsAuthentication
class LoginView(APIView):
"""用户登录"""
authentication_classes = [] # 取消全局认证
def post(self,request,*args,**kwargs):
user = request.data.get('username')
pwd = request.data.get('password')
user_obj = UserInfo.objects.filter(username=user,password=pwd).first()
if not user_obj:
return Response({'code':401,'error':'用户名或密码错误'})
payload = {
'user_id':user_obj.pk,#自定义用户ID
'username':user_obj.username,#自定义用户名
'exp':datetime.datetime.utcnow()+datetime.timedelta(minutes=1),# 设置超时时间,1min
}
jwt_token = create_token(payload=payload)
return Response({'code':200,'token':jwt_token})
# {"username":"admin","password":"admin123"}
class IndexView(APIView):
# 局部认证
# authentication_classes = [JWTQueryParamsAuthentication,]
def get(self,request,*args,**kwargs):
return Response("可以了")
三、DRF中使用djangorestframework-jwt
3.1 djangorestframework-jwt 使用流程
- 安装djangorestframework-jwt
pip install djangorestframework-jwt
- 更改settings中的配置文件
REST_FRAMEWORK = {
"DEFAULT_AUTHENTICATION_CLASSES":(
'rest_framework_jwt.authentication.JSONWebTokenAuthentication',
# 全局设置的方法,也可在单个视图中设置,如同之前编写的
)
}
# 设置JWT参数(如过期时间):
import datetime
JWT_AUTH = {
'JWT_EXPIRATION_DELTA': datetime.timedelta(seconds=300), # 设置JWT Token的有效时间
'JWT_AUTH_HEADER_PREFIX': 'JWT', # 设置在请求头中的前缀,之前是通过请求体来做的,这里有所区别
}
- 配置路由
from rest_framework_jwt.views import obtain_jwt_token
# urlpatterns中
path('login/', obtain_jwt_token), # jwt的认证接口(路径可自定义任意命名)
- 配合权限进行使用
REST_FRAMEWORK = {
"DEFAULT_AUTHENTICATION_CLASSES": [
'rest_framework_jwt.authentication.JSONWebTokenAuthentication', # 全局设置的方法,也可在单个视图中设置
],
'DEFAULT_PERMISSION_CLASSES': [
# IsAuthenticated 仅通过认证的用户
'rest_framework.permissions.IsAuthenticated'
]
}
3.2 使用扩展 Django 中的用户模型
- 参考地址,使用方法完全相同
- 需要在settings.py中指明
AUTH_USER_MODDEL="应用名.类名"
3.3 自定义认证成功返回字段
# user/utils/my_response_payload.py
def jwt_response_payload_handler(token, user=None, request=None):
"""
自定义jwt认证成功返回数据
"""
return {
'token': token,
'user_id': user.id,
'username': user.username
}
# settings.py
JWT_AUTH = {
'JWT_EXPIRATION_DELTA': datetime.timedelta(seconds=60*5), # 设置JWT Token的有效时间
'JWT_AUTH_HEADER_PREFIX': 'JWT', # 设置 请求头中的前缀
'JWT_RESPONSE_PAYLOAD_HANDLER':'user.utils.my_payload_response.jwt_response_payload_handler',
}