1 反序列化更新,instance 就传要修改的对象,保证修改完成
def update(self, instance, validated_data):
    """Apply validated data to an existing book and persist it.

    :param instance: the model object being modified.
    :param validated_data: dict produced by the serializer; must contain
        ``publish`` (publisher id) and ``author`` (iterable of author ids).
    :return: the updated ``instance``.
    :raises KeyError: if ``publish`` or ``author`` is missing.
    """
    publish_id = validated_data.pop('publish')
    author_ids = validated_data.pop('author')
    for field, value in validated_data.items():
        setattr(instance, field, value)
    instance.publish_id = publish_id
    # Without save() the scalar/foreign-key changes above are never written.
    instance.save()
    instance.author.set(author_ids)
    return instance
2 反序列化保存和更新,字段如下,实现更新和保存
publish_id=serializers.IntegerField(write_only=True)
authors_l=serializers.ListField(write_only=True)
class BookSerializer(serializers.Serializer):
    """Book serializer that writes the publisher and authors by id.

    ``publish_id`` is the publisher's primary key and ``authors_l`` a list of
    author primary keys; both are write-only.
    """

    publish_id = serializers.IntegerField(write_only=True)
    authors_l = serializers.ListField(write_only=True)

    def create(self, validated_data):
        """Create a book, then attach its authors."""
        publish_id = validated_data.pop('publish_id')
        author_ids = validated_data.pop('authors_l')
        book = Book.objects.create(**validated_data, publish_id=publish_id)
        book.author.add(*author_ids)
        return book

    def update(self, instance, validated_data):
        """Overwrite the fields of ``instance`` and replace its author set."""
        publish_id = validated_data.pop('publish_id')
        author_ids = validated_data.pop('authors_l')
        for field, value in validated_data.items():
            setattr(instance, field, value)
        instance.publish_id = publish_id
        # Persist the scalar/foreign-key changes; set() only writes the M2M rows.
        instance.save()
        instance.author.set(author_ids)
        return instance
def update(self, instance, validated_data):
    """Update a book from a combined ``publish_author`` payload.

    ``validated_data`` looks like
    ``{'book_name': 'Mona1', 'book_price': 441, 'publish_author': [1, [2, 3]]}``
    where ``publish_author`` is ``[publisher_id, [author_id, ...]]``.

    :return: the updated ``instance``.
    :raises KeyError: if ``publish_author`` is missing.
    """
    publish_id, author_ids = validated_data.pop('publish_author')[:2]
    for field, value in validated_data.items():
        setattr(instance, field, value)
    instance.publish_id = publish_id
    # Without save() the attribute changes above are never written.
    instance.save()
    instance.author.set(author_ids)
    return instance
3 book,publish,author,author_detail
-把所有表的5个接口都写好
-不写author_detail,把author_detail统一放在author中
settings.py
# MySQL connection used by the project.
# NOTE(review): credentials are hard-coded; load them from the environment
# before deploying anywhere but a local sandbox.
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': "drftaskbook",
        "USER": "root",
        "PASSWORD": "123456",
        "HOST": "127.0.0.1",
        "PORT": 3307,
        # The connection charset belongs in OPTIONS; a top-level "CHARSET"
        # key is not read when the MySQL connection is opened.
        "OPTIONS": {"charset": "utf8mb4"},
    }
}

# Localisation: Simplified Chinese, Shanghai local time, naive datetimes.
LANGUAGE_CODE = 'zh-hans'
TIME_ZONE = 'Asia/Shanghai'
USE_I18N = True
USE_L10N = True
USE_TZ = False

INSTALLED_APPS = [
    'book',
    'rest_framework',
]
views.py
from django.shortcuts import render
from rest_framework.response import Response
from rest_framework.views import APIView
from .models import Book, Publish, Authors
from .serializer import BookSerializer, PublishSerializer, AuthorsSerializer
# 所有
class BookView(APIView):
    """Collection endpoint for books: list all (GET) and create one (POST)."""

    def get(self, request):
        """Return every book, or a 404-coded payload when there are none."""
        books = Book.objects.all()
        if books:
            listing = BookSerializer(instance=books, many=True)
            return Response({'code': 200, "msg": "查询所有图书成功", "results": listing.data})
        return Response({'code': 404, "msg": f"没有图书存在"})

    def post(self, request):
        """Create a book from the request body."""
        serializer = BookSerializer(data=request.data)
        if not serializer.is_valid():
            return Response({'code': 404, "msg": serializer.errors})
        serializer.save()
        return Response({'code': 200, "msg": "新增图书成功"})
# 单个
class BookDetailView(APIView):
    """Single-book endpoint: retrieve (GET), delete (DELETE) and update (PUT)."""

    def get(self, request, pk):
        """Return the book with primary key ``pk``."""
        book = Book.objects.filter(pk=pk).first()
        if not book:
            return Response({'code': 404, "msg": "图书不存在"})
        serializer = BookSerializer(instance=book)
        return Response({'code': 200, "msg": f"查询id为{pk}的图书成功", "results": serializer.data})

    def delete(self, request, pk):
        """Delete the book with primary key ``pk``."""
        book = Book.objects.filter(pk=pk).first()
        if not book:
            return Response({'code': 404, "msg": "图书不存在"})
        book.delete()
        return Response({'code': 200, "msg": f"删除id为{pk}的图书成功"})

    # The delete handler used to be bound to POST; the alias keeps existing
    # clients working while DELETE becomes the proper verb.
    post = delete

    def put(self, request, pk):
        """Replace the fields of the book with primary key ``pk``."""
        book = Book.objects.filter(pk=pk).first()
        # Check existence before building the serializer so a missing row can
        # never be handed to it as ``instance=None``.
        if not book:
            return Response({'code': 404, "msg": "图书不存在"})
        serializer = BookSerializer(instance=book, data=request.data)
        if not serializer.is_valid():
            # Validation failures use the same error code as the other views
            # instead of the success code 200.
            return Response({'code': 404, "msg": serializer.errors})
        serializer.save()
        return Response({'code': 200, "msg": f"修改id为{pk}的图书成功", "result": serializer.data})
class PublishView(APIView):
    """Collection endpoint for publishers: list all (GET) and create (POST)."""

    def get(self, request):
        """Return every publisher, or a 404-coded payload when there are none."""
        publishers = Publish.objects.all()
        if not publishers:
            return Response({'code': 404, "msg": "没有出版社存在"})
        serializer = PublishSerializer(instance=publishers, many=True)
        return Response({'code': 200, "msg": "查询所有出版社成功", "results": serializer.data})

    def post(self, request):
        """Create a publisher from the request body."""
        serializer = PublishSerializer(data=request.data)
        if not serializer.is_valid():
            # Report validation failures with the error code used by the other
            # views rather than the success code 200.
            return Response({'code': 404, "msg": serializer.errors})
        serializer.save()
        return Response({'code': 200, "msg": "新增出版社成功", "result": serializer.data})
class PublishDetailView(APIView):
    """Single-publisher endpoint: retrieve (GET), delete (DELETE), update (PUT)."""

    def get(self, request, pk):
        """Return the publisher with primary key ``pk``."""
        publisher = Publish.objects.filter(pk=pk).first()
        if not publisher:
            return Response({'code': 404, 'msg': f"id为{pk}的出版社不存在"})
        serializer = PublishSerializer(instance=publisher)
        return Response({'code': 200, "msg": f"查询id为{pk}的出版社成功", "results": serializer.data})

    def delete(self, request, pk):
        """Delete the publisher with primary key ``pk``."""
        publisher = Publish.objects.filter(pk=pk).first()
        if not publisher:
            return Response({'code': 404, 'msg': f"id为{pk}的出版社不存在"})
        publisher.delete()
        return Response({'code': 200, 'msg': f"删除id为{pk}的出版社成功"})

    # The delete handler used to be bound to POST; keep the alias so existing
    # clients keep working.
    post = delete

    def put(self, request, pk):
        """Replace the fields of the publisher with primary key ``pk``."""
        publisher = Publish.objects.filter(pk=pk).first()
        # Existence is checked before the serializer is constructed.
        if not publisher:
            return Response({'code': 404, 'msg': f"id为{pk}的出版社不存在"})
        serializer = PublishSerializer(instance=publisher, data=request.data)
        if not serializer.is_valid():
            return Response({'code': 404, 'msg': serializer.errors})
        serializer.save()
        return Response({'code': 200, 'msg': f"修改id为{pk}的出版社成功", 'result': serializer.data})
class AuthorsView(APIView):
    """Collection endpoint for authors: list all (GET) and create (POST)."""

    def get(self, request):
        """Return every author, or a 404-coded payload when there are none."""
        authors = Authors.objects.all()
        if not authors:
            return Response({'code': 404, "msg": "没有作者存在"})
        serializer = AuthorsSerializer(instance=authors, many=True)
        return Response({'code': 200, "msg": "查询所有作者成功", "results": serializer.data})

    def post(self, request):
        """Create an author (with nested detail) from the request body."""
        serializer = AuthorsSerializer(data=request.data)
        if not serializer.is_valid():
            # Report validation failures with the error code used by the other
            # views rather than the success code 200.
            return Response({'code': 404, "msg": serializer.errors})
        serializer.save()
        return Response({'code': 200, "msg": "新增作者成功", "result": serializer.data})
class AuthorsDetailView(APIView):
    """Single-author endpoint: retrieve (GET), delete (DELETE), update (PUT)."""

    def get(self, request, pk):
        """Return the author with primary key ``pk``."""
        author = Authors.objects.filter(pk=pk).first()
        if not author:
            return Response({'code': 404, 'msg': f"id为{pk}的作者不存在"})
        serializer = AuthorsSerializer(instance=author)
        return Response({'code': 200, "msg": f"查询id为{pk}的作者成功", "results": serializer.data})

    def delete(self, request, pk):
        """Delete the author with primary key ``pk``."""
        author = Authors.objects.filter(pk=pk).first()
        if not author:
            return Response({'code': 404, 'msg': f"id为{pk}的作者不存在"})
        author.delete()
        return Response({'code': 200, 'msg': f"删除id为{pk}的作者成功"})

    # The delete handler used to be bound to POST; keep the alias so existing
    # clients keep working.
    post = delete

    def put(self, request, pk):
        """Replace the fields of the author with primary key ``pk``."""
        author = Authors.objects.filter(pk=pk).first()
        # Existence is checked before the serializer is constructed.
        if not author:
            return Response({'code': 404, 'msg': f"id为{pk}的作者不存在"})
        serializer = AuthorsSerializer(instance=author, data=request.data)
        if not serializer.is_valid():
            return Response({'code': 404, 'msg': serializer.errors})
        serializer.save()
        return Response({'code': 200, 'msg': f"修改id为{pk}的作者成功", 'result': serializer.data})
serializer.py
# -*- coding: utf-8 -*-
# author : heart
# blog_url : https://www.cnblogs.com/ssrheart/
# time : 2024/4/12
from rest_framework import serializers
from .models import Book, Publish, Authors, Authors_Detail
class PublishSerializers(serializers.Serializer):
    """Nested representation of a publisher: its name and city."""

    name = serializers.CharField()
    city = serializers.CharField()
class AuthorsSerializers(serializers.Serializer):
    """Nested representation of an author: name and age."""

    name = serializers.CharField()
    age = serializers.IntegerField()
class BookSerializer(serializers.ModelSerializer):
    """Book serializer: ids are accepted on write, nested objects shown on read.

    ``publish`` and ``authors`` are write-only; the read side exposes them as
    ``publish_detail`` and ``authors_detail``.
    """

    # Read-only nested views of the related rows.
    publish_detail = PublishSerializers(source='publish', read_only=True)
    authors_detail = AuthorsSerializers(source='authors', many=True, read_only=True)

    class Meta:
        model = Book
        fields = '__all__'
        extra_kwargs = {
            'publish': {'write_only': True},
            'authors': {'write_only': True},
        }
class PublishSerializer(serializers.ModelSerializer):
    """Model serializer exposing every field of ``Publish``."""

    class Meta:
        model = Publish
        fields = '__all__'
class AuthorDetailSerializer(serializers.Serializer):
    """Nested author detail: phone number and address."""

    phone = serializers.CharField()
    addr = serializers.CharField()
class AuthorsSerializer(serializers.ModelSerializer):
    """Author serializer that reads and writes the one-to-one detail inline."""

    author_detail = AuthorDetailSerializer()

    class Meta:
        model = Authors
        fields = ['name', 'age', 'author_detail']

    def create(self, validated_data):
        """Create the detail row first, then the author pointing at it."""
        detail_data = validated_data.pop('author_detail')
        detail = Authors_Detail.objects.create(**detail_data)
        return Authors.objects.create(**validated_data, author_detail=detail)

    def update(self, instance, validated_data):
        """Update the author and, when supplied, its detail row.

        ``author_detail`` may be absent on a partial update; in that case only
        the author's own fields are touched.
        """
        detail_data = validated_data.pop('author_detail', None)
        for field, value in validated_data.items():
            setattr(instance, field, value)
        instance.save()
        if detail_data:
            detail = instance.author_detail
            for field, value in detail_data.items():
                setattr(detail, field, value)
            detail.save()
        return instance
models.py
from django.db import models
class Book(models.Model):
    """A book with one publisher and any number of authors."""

    name = models.CharField(max_length=32, db_index=True)
    price = models.DecimalField(max_digits=5, decimal_places=2)
    # Filled in automatically when the row is first created.
    time = models.DateTimeField(auto_now_add=True, null=True, blank=True)
    # Deleting the publisher deletes its books.
    publish = models.ForeignKey('Publish', on_delete=models.CASCADE)
    authors = models.ManyToManyField('Authors')
class Publish(models.Model):
    """A publisher, identified by name and city."""

    name = models.CharField(max_length=32)
    city = models.CharField(max_length=32)
class Authors(models.Model):
    """An author; contact details live in the linked ``Authors_Detail`` row."""

    name = models.CharField(max_length=32)
    age = models.IntegerField()
    # Deleting the detail row deletes the author (CASCADE).
    author_detail = models.OneToOneField(to='Authors_Detail', on_delete=models.CASCADE)
class Authors_Detail(models.Model):
    """Optional contact details for an author."""

    phone = models.CharField(max_length=11, null=True, blank=True)
    addr = models.CharField(max_length=32, null=True, blank=True)
urls.py
from django.contrib import admin
from django.urls import path, include
from book.views import BookView, BookDetailView, PublishView, PublishDetailView, AuthorsView, AuthorsDetailView
# One collection route and one ``<int:pk>`` detail route per resource.
urlpatterns = [
    path('admin/', admin.site.urls),
    # books
    path('bookview/', BookView.as_view()),
    path('bookdetailview/<int:pk>', BookDetailView.as_view()),
    # publishers
    path('publishview/', PublishView.as_view()),
    path('publishdetailview/<int:pk>', PublishDetailView.as_view()),
    # authors
    path('authorsview/', AuthorsView.as_view()),
    path('authorsdetailview/<int:pk>', AuthorsDetailView.as_view()),
]
4 拓展作业
-找一个混合的后台模板,集成到你项目中
-1 显示用户访问记录
-2 echarts---》显示访问客户端类型 饼形图
-https://echarts.apache.org/examples/zh/editor.html?c=pie-simple
-3 图书增删查改
5 登录注册修改密码接口
#1 创建用户表---》auth 的user表,扩写字段 加个 mobile
-注册接口:手机号
-手机号
-用户名:要么用手机号,要么随机生成
-密码:设置默认密码(加密)
-登陆接口:
-手机号+密码--》登陆成功
-修改密码接口:
-原密码--》校验
-更新
----------------------------
# 2 根据id获取用户详情
# 3 修改用户信息
# 4 查询所有用户
-----写到两个视图中--路由action--》序列化/反序列化类可能不一样----
views.py
from django.shortcuts import render
from rest_framework.viewsets import GenericViewSet
from .serializer import LoginSerializer, PassWordSerializer, RegisterSerializer
from rest_framework.decorators import action
from rest_framework.response import Response
from .models import Users
class User(GenericViewSet):
    """Account actions: login, register and change password (all POST)."""

    queryset = Users.objects.all()
    serializer_class = LoginSerializer
    # The Users model stores the phone number in ``mobile``; there is no
    # ``phone`` field to look up by.
    lookup_field = 'mobile'

    def get_serializer_class(self):
        """Pick the serializer that matches the current action."""
        if self.action == 'edit_password':
            return PassWordSerializer
        if self.action == 'register':
            return RegisterSerializer
        return self.serializer_class

    @action(methods=['POST'], detail=False)
    def login(self, request):
        """Validate the submitted mobile/password pair."""
        serializer = self.get_serializer(data=request.data)
        if not serializer.is_valid():
            return Response({'code': 404, 'msg': serializer.errors})
        return Response({'code': 200, 'msg': '登录成功!'})

    @action(methods=['POST'], detail=False)
    def register(self, request):
        """Create an account for the submitted mobile number."""
        serializer = self.get_serializer(data=request.data)
        if not serializer.is_valid():
            return Response({'code': 404, 'msg': serializer.errors})
        serializer.save()
        return Response({'code': 200, 'msg': '注册成功!'})

    @action(methods=['POST'], detail=False)
    def edit_password(self, request):
        """Set a new password for the account with the submitted mobile."""
        mobile = request.data.get('mobile')
        account = Users.objects.filter(mobile=mobile).first()
        if not account:
            return Response({'code': 404, 'msg': '手机号不存在!'})
        serializer = self.get_serializer(instance=account, data=request.data)
        if not serializer.is_valid():
            return Response({'code': 404, 'msg': serializer.errors})
        serializer.save()
        return Response({'code': 200, 'msg': '修改成功!'})
urls.py
from django.contrib import admin
from django.urls import path,include
from app01.views import User
from rest_framework.routers import SimpleRouter
# Register the account viewset under /users/ and expose the router's routes.
router = SimpleRouter()
router.register('users', User, 'users')

urlpatterns = [
    path('', include(router.urls)),
]
models.py
from django.db import models
from django.contrib.auth.models import User, AbstractUser
class Users(AbstractUser):
    """Django auth user extended with a mobile phone number."""

    mobile = models.CharField(max_length=11)
serializer.py
# -*- coding: utf-8 -*-
# author : heart
# blog_url : https://www.cnblogs.com/ssrheart/
# time : 2024/4/16
from rest_framework import serializers
from .models import Users
import re
import hashlib
class LoginSerializer(serializers.ModelSerializer):
    """Validate a mobile/password login attempt.

    The password is checked against the account that owns the submitted
    mobile number, not against every account in the table.
    """

    class Meta:
        model = Users
        fields = ['mobile', 'password']

    def validate_mobile(self, value):
        """Require a registered, well-formed mainland mobile number."""
        if not Users.objects.filter(mobile=value).exists():
            raise serializers.ValidationError("手机号码不存在!")
        if not re.match(r'^1[3-9]\d{9}$', value):
            raise serializers.ValidationError("手机号码格式不正确!")
        return value

    def validate(self, attrs):
        """Check that the password belongs to the account with this mobile.

        On success ``attrs['password']`` is replaced by its MD5 hex digest,
        matching what the registration serializer stores.
        """
        # NOTE(review): unsalted MD5 is kept only because stored passwords use
        # it; migrate to Django's password hashers when the data can be rehashed.
        digest = hashlib.md5(attrs['password'].encode()).hexdigest()
        if not Users.objects.filter(mobile=attrs['mobile'], password=digest).exists():
            raise serializers.ValidationError({'password': '密码不正确!'})
        attrs['password'] = digest
        return attrs
class PassWordSerializer(serializers.ModelSerializer):
    """Set a new password on the account identified by ``mobile``."""

    class Meta:
        model = Users
        fields = ['mobile', 'password']

    def validate_mobile(self, value):
        """Require a registered, well-formed mobile number."""
        registered = Users.objects.filter(mobile=value)
        if not registered:
            raise serializers.ValidationError("手机号码不存在!")
        mobile_format = re.compile(r'^1[3-9]\d{9}$')
        if not mobile_format.match(value):
            raise serializers.ValidationError("手机号码格式不正确!")
        return value

    def update(self, instance, validated_data):
        """Store the MD5 hex digest of the new password on ``instance``."""
        raw_password = validated_data.get('password')
        instance.password = hashlib.md5(raw_password.encode()).hexdigest()
        instance.save()
        return instance
class RegisterSerializer(serializers.ModelSerializer):
    """Register an account from a mobile number with a default password."""

    class Meta:
        model = Users
        fields = ['mobile']

    def validate_mobile(self, value):
        """Require a well-formed mobile number that is not registered yet."""
        if not re.match(r'^1[3-9]\d{9}$', value):
            raise serializers.ValidationError("手机号码格式不正确!")
        if Users.objects.filter(mobile=value).exists():
            raise serializers.ValidationError("手机号码已注册!")
        return value

    def create(self, validated_data):
        """Create the account and return the new ``Users`` instance.

        The default password is ``123456``, stored as an MD5 hex digest so the
        login serializer can match it. The username gets a random suffix
        because a fixed suffix collides for mobiles sharing the last 4 digits.
        """
        import uuid

        mobile = validated_data.get('mobile')
        # NOTE(review): a shared default password hashed with unsalted MD5 is
        # weak; kept for compatibility with the login check.
        password = hashlib.md5('123456'.encode()).hexdigest()
        username = '用户' + mobile[-4:] + '_' + uuid.uuid4().hex[:8]
        return Users.objects.create(mobile=mobile, username=username, password=password)
第二版
views.py
import hashlib
import uuid
from django.shortcuts import render
from rest_framework.viewsets import GenericViewSet
from .serializer import LoginSerializer, PassWordSerializer, RegisterSerializer, DetailSerializer
from rest_framework.decorators import action
from rest_framework.response import Response
from .models import Users
from .authentication import LoginAuth
class User(GenericViewSet):
    """Account actions: login (POST), register (POST), change password (PUT)."""

    queryset = Users.objects.all()
    serializer_class = LoginSerializer

    def get_serializer_class(self):
        """Pick the serializer that matches the current action."""
        if self.action == 'edit_password':
            return PassWordSerializer
        if self.action == 'register':
            return RegisterSerializer
        return self.serializer_class

    @staticmethod
    def _issued_token(credentials):
        """Return the token stored for the account matching ``credentials``.

        The account is located the same way the login serializer does it:
        by mobile plus the MD5 hex digest of the submitted password.
        """
        digest = hashlib.md5(str(credentials.get('password')).encode()).hexdigest()
        return (
            Users.objects
            .filter(mobile=credentials.get('mobile'), password=digest)
            .values_list('usertoken__token', flat=True)
            .first()
        )

    @action(methods=['POST'], detail=False)
    def login(self, request):
        """Log in and return the token the client must send on later requests."""
        serializer = self.get_serializer(data=request.data)
        if not serializer.is_valid():
            return Response({'code': 404, 'msg': serializer.errors})
        serializer.save()
        # Without the token in the response the client cannot authenticate
        # against the token-protected endpoints.
        token = self._issued_token(serializer.validated_data)
        return Response({'code': 200, 'msg': '登录成功!', 'token': token})

    @action(methods=['POST'], detail=False)
    def register(self, request):
        """Create an account for the submitted mobile number."""
        serializer = self.get_serializer(data=request.data)
        if not serializer.is_valid():
            return Response({'code': 404, 'msg': serializer.errors})
        serializer.save()
        return Response({'code': 200, 'msg': '注册成功!'})

    @action(methods=['PUT'], detail=False)
    def edit_password(self, request):
        """Set a new password for the account with the submitted mobile."""
        mobile = request.data.get('mobile')
        account = Users.objects.filter(mobile=mobile).first()
        if not account:
            return Response({'code': 404, 'msg': '手机号不存在!'})
        serializer = self.get_serializer(instance=account, data=request.data)
        if not serializer.is_valid():
            return Response({'code': 404, 'msg': serializer.errors})
        serializer.save()
        return Response({'code': 200, 'msg': '修改成功!'})
# 2 根据id获取用户详情
# 3 修改用户信息
# 4 查询所有用户
from rest_framework.mixins import RetrieveModelMixin, UpdateModelMixin, ListModelMixin
class UserDetail(GenericViewSet, RetrieveModelMixin, UpdateModelMixin, ListModelMixin):
    """Retrieve, update and list users; requests are authenticated by LoginAuth."""

    queryset = Users.objects.all()
    serializer_class = DetailSerializer
    authentication_classes = [LoginAuth]
serializer.py
# -*- coding: utf-8 -*-
# author : heart
# blog_url : https://www.cnblogs.com/ssrheart/
# time : 2024/4/16
import uuid
from rest_framework import serializers
from .models import Users, UserToken
import re
import hashlib
from rest_framework.exceptions import ValidationError
class LoginSerializer(serializers.ModelSerializer):
    """Validate a mobile/password login and issue a token on ``save()``."""

    class Meta:
        model = Users
        fields = ['mobile', 'password']

    def md5hex(self, data):
        """Return the MD5 hex digest of ``str(data)``."""
        return hashlib.md5(str(data).encode()).hexdigest()

    def validate_mobile(self, value):
        """Require a registered, well-formed mobile number."""
        if not Users.objects.filter(mobile=value).exists():
            raise serializers.ValidationError("手机号码不存在!")
        if not re.match(r'^1[3-9]\d{9}$', value):
            raise serializers.ValidationError("手机号码格式不正确!")
        return value

    def validate(self, attrs):
        """Reject a wrong password during validation.

        Checking here makes the failure appear in ``serializer.errors``
        instead of being raised from ``save()`` after ``is_valid()`` passed.
        """
        digest = self.md5hex(attrs['password'])
        if not Users.objects.filter(mobile=attrs['mobile'], password=digest).exists():
            raise ValidationError({'password': '密码不正确!'})
        return attrs

    def create(self, validated_data):
        """Create or refresh the account's token; return ``validated_data``.

        :raises ValidationError: if the credentials no longer match an account.
        """
        mobile = validated_data.get('mobile')
        digest = self.md5hex(validated_data.get('password'))
        account = Users.objects.filter(mobile=mobile, password=digest).first()
        if account is None:
            raise ValidationError('密码不正确!')
        token = str(uuid.uuid4())
        UserToken.objects.update_or_create(defaults={"token": token}, user=account)
        return validated_data
class PassWordSerializer(serializers.ModelSerializer):
    """Set a new password on the account identified by ``mobile``."""

    class Meta:
        model = Users
        fields = ['mobile', 'password']

    def validate_mobile(self, value):
        """Require a registered, well-formed mobile number."""
        registered = Users.objects.filter(mobile=value)
        if not registered:
            raise serializers.ValidationError("手机号码不存在!")
        mobile_format = re.compile(r'^1[3-9]\d{9}$')
        if not mobile_format.match(value):
            raise serializers.ValidationError("手机号码格式不正确!")
        return value

    def update(self, instance, validated_data):
        """Store the MD5 hex digest of the new password on ``instance``."""
        raw_password = validated_data.get('password')
        instance.password = hashlib.md5(raw_password.encode()).hexdigest()
        instance.save()
        return instance
class RegisterSerializer(serializers.ModelSerializer):
    """Register an account from a mobile number with a default password."""

    class Meta:
        model = Users
        fields = ['mobile']

    def validate_mobile(self, value):
        """Require a well-formed mobile number that is not registered yet."""
        if not re.match(r'^1[3-9]\d{9}$', value):
            raise serializers.ValidationError("手机号码格式不正确!")
        if Users.objects.filter(mobile=value).exists():
            raise serializers.ValidationError("手机号码已注册!")
        return value

    def create(self, validated_data):
        """Create the account and return the new ``Users`` instance.

        The default password is ``123456``, stored as an MD5 hex digest so the
        login serializer can match it. The username gets a random suffix
        because a fixed suffix collides for mobiles sharing the last 4 digits.
        """
        mobile = validated_data.get('mobile')
        # NOTE(review): a shared default password hashed with unsalted MD5 is
        # weak; kept for compatibility with the login check.
        password = hashlib.md5('123456'.encode()).hexdigest()
        username = '用户' + mobile[-4:] + '_' + uuid.uuid4().hex[:8]
        return Users.objects.create(mobile=mobile, username=username, password=password)
class DetailSerializer(serializers.ModelSerializer):
    """Full user serializer used by the retrieve/update/list endpoints."""

    class Meta:
        model = Users
        fields = '__all__'
        # Accept a password on write but never echo the stored hash back.
        extra_kwargs = {'password': {'write_only': True}}

    def md5hex(self, data):
        """Return the MD5 hex digest of ``str(data)``."""
        return hashlib.md5(str(data).encode()).hexdigest()

    def update(self, instance, validated_data):
        """Update ``instance``, hashing the password only when one is supplied.

        On a partial update without ``password`` the stored hash is left
        alone (previously the string "None" was hashed and saved). The
        remaining work is delegated to ``ModelSerializer.update`` so that
        many-to-many fields are assigned through their managers.
        """
        if validated_data.get('password') is not None:
            validated_data['password'] = self.md5hex(validated_data['password'])
        return super().update(instance, validated_data)
urls.py
from django.contrib import admin
from django.urls import path,include
from app01.views import User,UserDetail
from rest_framework.routers import SimpleRouter
# /users/ carries the account actions, /userdetail/ the retrieve/update/list API.
router = SimpleRouter()
router.register('users', User, 'users')
router.register('userdetail', UserDetail, 'userdetail')

urlpatterns = [
    path('', include(router.urls)),
]
models.py
from django.db import models
from django.contrib.auth.models import User, AbstractUser
class Users(AbstractUser):
    """Django auth user extended with a mobile phone number."""

    mobile = models.CharField(max_length=11)
class UserToken(models.Model):
    """Login token; each user has at most one row."""

    token = models.CharField(max_length=64)
    # Deleting the user removes the token as well.
    user = models.OneToOneField(to='Users', on_delete=models.CASCADE)
authentication.py
# -*- coding: utf-8 -*-
# author : heart
# blog_url : https://www.cnblogs.com/ssrheart/
# time : 2024/4/16
import uuid
from rest_framework.authentication import BaseAuthentication
from .models import Users, UserToken
from rest_framework.response import Response
from rest_framework.exceptions import AuthenticationFailed
class LoginAuth(BaseAuthentication):
    """Authenticate a request by the token sent in the ``token`` HTTP header."""

    def authenticate(self, request):
        """Return ``(user, token)`` for a known token.

        :raises AuthenticationFailed: when no ``UserToken`` row matches.
        """
        token = request.META.get('HTTP_TOKEN')
        record = UserToken.objects.filter(token=token).first()
        if not record:
            raise AuthenticationFailed('请先登录!')
        return record.user, token
6 0417作业图书
# 1 图书的 查询单条,查询所有
# 2 图书 新增,修改,删除
# 3 普通用户可以 查询单条,查询所有
# 4 超级用户可以使用所有
# 5 所有接口,一天访问5次,按ip限制
# 6 图书查询所有接口
-图书名 和 price范围过滤
price_gt=10&price_lt=100
# 7 可以按价格和名字排序
# 8 查询所有接口,按 偏移分页
# 9 扩展---》继承BaseThrottle 实现频率限制
# -*- coding: utf-8 -*-
# author : heart
# blog_url : https://www.cnblogs.com/ssrheart/
# time : 2024/4/17
from rest_framework.permissions import BasePermission
class BookPermission(BasePermission):
    """Authenticated users may read; only ``user_type == 3`` may write."""

    def has_permission(self, request, view):
        """Return True when the request is allowed.

        Authentication is checked before any user attribute is read, so an
        anonymous user (which has no ``user_type``) is denied, not crashed on.
        """
        user = request.user
        if not (user and user.is_authenticated):
            return False
        if request.method == 'GET':
            return True
        # presumably 3 marks the super-user type — confirm against the model's choices
        return getattr(user, 'user_type', None) == 3
class BookView(ModelViewSet):
    """Book CRUD API guarded by LoginAuth and BookPermission."""

    queryset = Book.objects.all()
    serializer_class = BookSerializer
    authentication_classes = [LoginAuth]
    permission_classes = [BookPermission]

    def list(self, request, *args, **kwargs):
        """List books in the project's ``code``/``msg``/``result`` envelope.

        Filtering is applied as before. If a paginator is configured on the
        view, ``result`` holds the paginator's response body for the requested
        page; otherwise it holds the full serialized list.
        """
        books = self.filter_queryset(self.get_queryset())
        page = self.paginate_queryset(books)
        if page is not None:
            serializer = self.get_serializer(page, many=True)
            result = self.get_paginated_response(serializer.data).data
        else:
            serializer = self.get_serializer(instance=books, many=True)
            result = serializer.data
        return Response({'code': 100, 'msg': '查询所有数据成功', 'result': result})
# Apply the project's CommonThrottle to every DRF view by default.
REST_FRAMEWORK = {
    'DEFAULT_THROTTLE_CLASSES': ['app01.throttling.CommonThrottle'],
}
7 0418 jwt作业
# 1 编写全局异常处理定制统一返回格式
-打印出错误详细信息(那个视图类报错了)
-扩展:自定义异常类,前端收到更多错误状态码
-扩展:研究全局异常处理源码执行流程
# 2 使用coreapi自动生成接口文档
-postman导出导入接口
-showdoc编写几个接口
-扩展:搭建yapi
-扩展:drf-yasg自动生成接口文档
# 3 快速体验jwt
# 4 拓展
-登陆接口--》签发token
-头 header = {'typ': 'JWT', 'alg': 'HS256'}
-荷载:{当前用户}
-签名:md5+settings.SECRET_KEY
-base64 adsfads.asfdasfd.asfda
-认证:写个认证类
. 分成三段
前两段 签名
比较新的和第三段,比较成功--》使用荷载--》用户id--》拿到用户
异常类
# -*- coding: utf-8 -*-
# author : heart
# blog_url : https://www.cnblogs.com/ssrheart/
# time : 2024/4/18
from rest_framework.views import exception_handler
from rest_framework.response import Response
# 自定义异常类
class PasswordException(Exception):
    """Raised for a password error; the text is kept in ``msg``."""

    def __init__(self, msg):
        super().__init__(msg)
        self.msg = msg
import time
# 时间,请求方式,请求的地址,客户端ip,用户id
def _log_failed_request(context):
    """Append one line describing the failing request to ``log.log``.

    The line holds: timestamp, HTTP method, full path, client IP, user, view.
    """
    request = context.get('request')
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    if request is None:
        method = path = client_ip = user = None
    else:
        method = request.method
        path = request.get_full_path()
        client_ip = request.META.get('REMOTE_ADDR')
        user = request.user
    record = [timestamp, method, path, client_ip, user, context.get('view')]
    with open('log.log', 'a', encoding='utf-8') as f:
        f.write('\n' + str(record))


def common_exception_handler(exc, context):
    """Log the failing request and return a uniform ``code``/``msg`` response.

    :param exc: the exception that was raised, e.g. ``division by zero``.
    :param context: details of the call that failed, for example::

        {
            'view': <app01.views.testExc object at 0x000002058BF2A740>,
            'args': (), 'kwargs': {},
            'request': <rest_framework.request.Request: GET '/app01/testexc/'>
        }

    :return: a ``Response`` whose code is 991/992 for errors DRF already
        handled, 993 for ``ZeroDivisionError``, 994 for ``PasswordException``
        and 995 for any other exception.
    """
    _log_failed_request(context)

    fallback = '系统错误,请联系heartt'
    response = exception_handler(exc, context)
    # Compare with None explicitly: DRF returns None for unhandled exceptions.
    if response is not None:
        data = response.data
        if isinstance(data, dict):
            error = data.get('detail', fallback)
            return Response({"code": 991, 'msg': f"{error}"})
        # Guard against an empty list so data[0] cannot raise here.
        error = data[0] if isinstance(data, list) and data else fallback
        return Response({"code": 992, 'msg': f"{error}"})

    if isinstance(exc, ZeroDivisionError):
        return Response({"code": 993, 'msg': f"{str(exc)}"})
    if isinstance(exc, PasswordException):
        return Response({"code": 994, 'msg': "密码错误"})
    return Response({"code": 995, 'msg': f"{str(exc)}"})
views.py
from django.shortcuts import render
from rest_framework.viewsets import GenericViewSet
from django.contrib.auth.models import User
from .serializer import UserSerializer
from rest_framework.decorators import action
from rest_framework.response import Response
from django.contrib import auth
import hashlib
import json
import base64
from django.conf import settings
from .Authenticated import UserAuth
from rest_framework.mixins import ListModelMixin
class UserLoginView(GenericViewSet):
    """Login endpoint that issues a hand-rolled, JWT-shaped access key."""

    queryset = User.objects.all()
    serializer_class = UserSerializer
    authentication_classes = [UserAuth]

    @staticmethod
    def _issue_access_key(user):
        """Build ``base64(header).base64(payload).base64(signature)`` for ``user``.

        The signature is the MD5 hex digest of header JSON + payload JSON +
        ``settings.SECRET_KEY``.
        """
        # NOTE(review): the header advertises HS256 but the signature is plain
        # MD5; changing it requires changing the verifier at the same time.
        header_str = json.dumps({'typ': 'JWT', 'alg': 'HS256'})
        payload_str = json.dumps(
            {key: getattr(user, key) for key in ['id', 'username', 'is_superuser']}
        )
        unsigned = header_str + payload_str + settings.SECRET_KEY
        digest = hashlib.md5(unsigned.encode('utf-8')).hexdigest()
        return '.'.join(
            base64.b64encode(part.encode('utf-8')).decode('ascii')
            for part in (header_str, payload_str, digest)
        )

    @action(methods=['POST'], detail=False)
    def login(self, request):
        """Authenticate ``username``/``password`` and return an access key."""
        username = request.data.get('username')
        password = request.data.get('password')
        user = auth.authenticate(request, username=username, password=password)
        if user is None:
            # A view must always return a Response; returning nothing on a
            # failed login makes DRF fail with a server error.
            return Response({'code': 101, 'msg': '用户名或密码错误!'})
        auth.login(request, user)
        return Response({'code': 100, 'msg': '登录成功!', 'access_key': self._issue_access_key(user)})
class UserDetailView(GenericViewSet, ListModelMixin):
    """Report which user the current request is authenticated as."""

    def list(self, request, *args, **kwargs):
        """Return a message naming ``request.user``."""
        # NOTE(review): no authentication_classes are set on this view, so
        # request.user comes from the project's default authenticators.
        message = f'查询成功,当前用户是{request.user}'
        return Response({'code': 200, 'msg': message})
认证类
# -*- coding: utf-8 -*-
# author : heart
# blog_url : https://www.cnblogs.com/ssrheart/
# time : 2024/4/18
import base64
import hashlib
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
from django.conf import settings
import json
from django.contrib.auth.models import User
class UserAuth(BaseAuthentication):
    """Authenticate requests carrying the three-segment access key in ``token``."""

    def authenticate(self, request):
        """Verify the access key and return ``(user, signature_segment)``.

        Returns ``None`` when no ``token`` header is sent, so the request
        continues unauthenticated.

        :raises AuthenticationFailed: if the token is malformed, its signature
            does not match, or the user it names no longer exists.
        """
        import hmac

        token = request.META.get('HTTP_TOKEN')
        if not token:
            return None

        try:
            header_b64, payload_b64, presented = token.split('.')
            header = json.loads(base64.b64decode(header_b64))
            payload = json.loads(base64.b64decode(payload_b64))
        except ValueError:
            # Wrong segment count, bad base64, bad UTF-8 and bad JSON all
            # raise ValueError subclasses.
            raise AuthenticationFailed('请先登录!') from None
        if not isinstance(payload, dict):
            raise AuthenticationFailed('请先登录!')

        # Recompute the signature exactly as the login view builds it.
        unsigned = json.dumps(header) + json.dumps(payload) + settings.SECRET_KEY
        digest = hashlib.md5(unsigned.encode('utf-8')).hexdigest()
        expected = base64.b64encode(digest.encode('utf-8'))
        # Constant-time comparison avoids leaking how many characters matched.
        if not hmac.compare_digest(expected, presented.encode('utf-8')):
            raise AuthenticationFailed('请先登录!')

        user = User.objects.filter(pk=payload.get('id')).first()
        if user is None:
            # A valid signature for a deleted user must not authenticate.
            raise AuthenticationFailed('请先登录!')
        return user, presented
标签:return,self,案例,import,data,Response,serializer,DRF
From: https://www.cnblogs.com/ssrheart/p/18150677