三大认证源码分析
drf的APIView在执行视图类的方法之前在dispatch中执行了三大认证
self.initial(request, *args, **kwargs)
initial的源码如下:
def initial(self, request, *args, **kwargs):
# 能够解析的编码,版本控制
self.format_kwarg = self.get_format_suffix(**kwargs)
neg = self.perform_content_negotiation(request)
request.accepted_renderer, request.accepted_media_type = neg
version, scheme = self.determine_version(request, *args, **kwargs)
request.version, request.versioning_scheme = version, scheme
# 认证组件的执行位置
self.perform_authentication(request)
# 权限组件
self.check_permissions(request)
# 频率组件
self.check_throttles(request)
认证类执行源码分析
认证组件源码:
def perform_authentication(self, request):
request.user # 乍一看是个属性,点进去发现是个方法,只是被包装成了属性
Request类的user方法:
@property
def user(self):
if not hasattr(self, '_user'):
with wrap_attributeerrors():
# 此时的self是Request的对象,那么我们就需要找Request类中的_authenticate()
self._authenticate()
return self._user
def _authenticate(self):
# self.authenticators就是我们放在视图类上的一个个对象放到列表中
for authenticator in self.authenticators:
try:
# 返回了两个值,第一个是当前用户,第二个是token,只走这一个认证类后面的就不再走了
# 也可以只返回一个None,会继续执行下一个认证类
user_auth_tuple = authenticator.authenticate(self)
except exceptions.APIException:
self._not_authenticated()
raise
if user_auth_tuple is not None:
self._authenticator = authenticator
# 解压赋值,self.user是当前登录用户,self.auth是token
# self是当次请求的新的Request对象
self.user, self.auth = user_auth_tuple
return
self._not_authenticated()
总结:
1.配置在视图类上的认证类会在执行视图类方法之前执行,且在权限认证之前执行
2.自己写的认证类可以返回两个值或者返回一个None
3.后续可以从request.user中取出当前登录用户(前提是要在认证类中返回)
权限类执行源码分析
权限组件源码:
def check_permissions(self, request):
# permission是配置在视图类上的权限类的对象
for permission in self.get_permissions():
# 权限类对象执行has_permission,这就是我们为什么要重写has_permission方法的原因
# self是视图类的对象,就是我们自己写的has_permission的view参数
if not permission.has_permission(request, self):
# 如果返回的是False,走这里,就说明没有权限
# 配置了多个权限的情况下,第一个没过,后面的权限类就不会再继续执行了
self.permission_denied(
request,
message=getattr(permission, 'message', None),
code=getattr(permission, 'code', None)
)
get_permissions源码:
def get_permissions(self):
# self.permission_classes 就是我们配置在视图类上的权限类(不加括号)组成的列表
# permission就是里面的有权限类,permission()就是将权限类的对象放到列表中
return [permission() for permission in self.permission_classes]
总结:
1.权限组件的执行就是取出配置在视图类上的权限类,实例化得到对象,然后一个一个执行对象的has_perminnion方法,如果返回False,就直接结束,不再往下继续执行,权限认证不通过
2.如果视图类上没配置权限类,那么会使用配置文件中的配置
3.优先使用项目配置文件,其次使用drf内置配置文件
4.配置在视图类上的权限类会在认证类之后,频率类之前执行
频率组件源码分析
频率组件源码:
def check_throttles(self, request):
throttle_durations = []
# self.get_throttles()配置在视图类上的频率类对象,一个个的放到列表中
for throttle in self.get_throttles():
# 取出的频率类对象执行allow_request方法,如果返回False,则超频了,不能继续
# 如果返回的是False,则没有超频,可以继续
if not throttle.allow_request(request, self):
throttle_durations.append(throttle.wait())
if throttle_durations:
durations = [
duration for duration in throttle_durations
if duration is not None
]
duration = max(durations, default=None)
self.throttled(request, duration)
总结:
我们自己写的频率类继承了BaseThrottle,重写了allow_request,在内部判断如果超频就返回False,如果没超频就返回True
自定义频率类
from rest_framework.throttling import BaseThrottle
import time
class MyThrottle(BaseThrottle):
USER_THROTTLE = {}
# (1)取出访问者ip
# (2)判断当前ip不在访问字典里,添加进去,并且直接返回True,表示第一次访问,在字典里,继续往下走 {ip地址:[时间1,时间2,时间3,时间4]}
# (3)循环判断当前ip的列表,有值,并且当前时间减去列表的最后一个时间大于60s,把这种数据pop掉,这样列表中只有60s以内的访问时间,
# (4)判断,当列表小于3,说明一分钟以内访问不足三次,把当前时间插入到列表第一个位置,返回True,顺利通过
# (5)当大于等于3,说明一分钟内访问超过三次,返回False验证失败
def __init__(self):
self.history = None
def allow_request(self, request, view):
# 1.取出访问者ip
ip = request.META.get('REMOTE_ADDR')
ctime = time.time()
# 2.判断ip在不在访问字典中
if ip not in self.USER_THROTTLE:
self.USER_THROTTLE[ip] = [ctime]
return True
# 3.判断当前时间减去ip列表中的第一次访问时间是否超过了60秒,如果超过60秒则将第一次访问时间弹出
self.history = self.USER_THROTTLE.get(ip)
while self.history and ctime - self.history[-1] > 60:
self.history.pop()
# 4.通过判断列表长度来判断当前访问次数
if len(self.history) < 3:
self.history.insert(0, ctime)
return True
# 5.列表长度大于3,返回False
else:
return False
# 超频后提示剩余多长时间可以继续访问
def wait(self):
import time
ctime = time.time()
return 60 - (ctime - self.history[-1])
SinpleRateThrottle
SimpleRateThrottle的allow_request方法
def allow_request(self, request, view):
# 这里就是通过配置文件和scop取出设置的频率限制
if self.rate is None:
return True
# 返回什么就以什么做限制,返回ip就以ip做限制
self.key = self.get_cache_key(request, view)
if self.key is None:
return True
# 访问时间存储在self.history列表中
self.history = self.cache.get(self.key, [])
# 获取当前访问时间
self.now = self.timer()
# 判断self.history列表是否有值,且当前时间减去设置的时间是否大于第一次访问时间,是则将第一次访问时间弹出
while self.history and self.history[-1] <= self.now - self.duration:
self.history.pop()
# 判断访问次数是否超过设置的频率
if len(self.history) >= self.num_requests:
# 返回False
return self.throttle_failure()
# 返回True
return self.throttle_success()
SimpleRateThrottle的init方法
def __init__(self):
if not getattr(self, 'rate', None):
# self.rate= '5、h'
self.rate = self.get_rate()
# 5 3600
self.num_requests, self.duration = self.parse_rate(self.rate)
SimpleRateThrottle的get_rate方法
def get_rate(self):
if not getattr(self, 'scope', None):
msg = ("You must set either `.scope` or `.rate` for '%s' throttle" %
self.__class__.__name__)
raise ImproperlyConfigured(msg)
try:
# self.scope 是 lqz 字符串
# return '5/h'
return self.THROTTLE_RATES[self.scope]
except KeyError:
msg = "No default throttle rate set for '%s' scope" % self.scope
raise ImproperlyConfigured(msg)
SimpleRateThrottle的parse_rate方法
def parse_rate(self, rate):
# '5/h'
if rate is None:
return (None, None)
# num =5
# period= 'hour'
num, period = rate.split('/')
# num_requests=5
num_requests = int(num)
duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]
# (5,3600)
return (num_requests, duration)
基于APIView写分页
class BookView(ViewSetMixin, APIView):
def list(self, request):
books = Book.objects.all()
# 使用步骤
# 1 实例化得到一个分页类的对象
paginator = CommonLimitOffsetPagination()
# 2 调用分页类对象的paginate_queryset方法来完成分页,返回的page是 要序列化的数据,分页好的
page = paginator.paginate_queryset(books, request, self)
if page is not None:
serializer = BookSerializer(instance=page, many=True)
# 3 返回数据,调用paginator的get_paginated_response方法
# return paginator.get_paginated_response(serializer.data)
return Response({
'total': paginator.count,
'next': paginator.get_next_link(),
'previous': paginator.get_previous_link(),
'results': serializer.data
})
异常处理
APIView的dispatch执行三大认证和视图类的方法时,如果出了异常会被异常捕获,之后做统一处理
drf中就内置了一个函数做上面的操作,但是这个函数只处理drf的异常,程序出错或者主动抛的异常都不会被处理,如果需要处理的话就需要重新写一个函数
def common_exception_handler(exc, context):
# exc 错误对象
# context:上下文,有view:当前出错的视图类的对象,args和kwargs视图类方法分组出来的参数,request:当次请求的request对象
# 只要走到这里,就要记录日志 ,只有错了,才会执行这个函数
# 记录日志尽量详细
print(f'{datetime.now()},{id},{ip},{request.method},{request.path},{context.get("view").__class__.__name__},{str(exc)}')
res = exception_handler(exc, context)
if res: # 有值,说明返回了Response 对象,没有值说明返回None
# 如果是Response 对象说明是drf的异常,已经被处理了,如果是None表明没有处理,就是非drf的异常
res = Response(data={'code': 888, 'msg': res.data.get('detail', '请联系系统管理员')})
else:
# res = Response(data={'code': 999, 'msg': str(exc)})
# 记录日志
res = Response(data={'code': 999, 'msg': '系统错误,请联系系统管理员'})
return res
标签:None,return,get,APIView,self,request,rate,源码,三大
From: https://www.cnblogs.com/zyg111/p/17103363.html