Redis使用案例之限流器
一、什么是限流器
限流器是一种流量限制的工具,可用于防止接口在同一时间有过高的并发
Redis有两种实现限流器的算法
1.基于桶令牌的算法
以时间段为单位,恒定的给桶中放入令牌,每次请求从桶中获取令牌
2.基于漏桶算法
控制接口流速,先创建一个桶,所有请求进来都放到桶中,再以固定的速度放出请求
本文介绍的是Redission实现的基于桶令牌的限流器
二、基于桶令牌算法限流器使用的数据结构
Redission实现的限流器中使用了Redis的hash、zset、字符串数据结构
为什么使用zset作为限流数据保存的底层结构
-
zset的底层数据结构
Redis中的zset数据由字典hash和跳表skiplist实现
每一个zset对象都包含两个值,一个成员member,一个分数score
字典部分是为了方便查询成员信息时使用hash算法
跳表部分是为了方便根据分数进行范围查询(因为跳表中的每一层都是根据分数排序的单链表结构)
-
使用zset的原因主要是因为其中的跳表结构
可以将调用的时间当做分数,方便统计一段时间内的访问量
三、具体实现
-
首先需要记录限流器的速率以及窗口大小,可以采用hash结构进行记录,方便获取值
public RFuture<Boolean> trySetRateAsync(RateType type, long rate, long rateInterval, RateIntervalUnit unit) { return commandExecutor.evalWriteNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]);"//设置限流器的单位时间的速率 + "redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]);"//设置限流器限流的时间间隔 + "return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]);",//设置限流器类型,这里redission默认0 Collections.singletonList(getRawName()), rate, unit.toMillis(rateInterval), type.ordinal()); }
限流器使用之前需要先设置初始值,Redission的内部使用了lua脚本的方式来保证原子性
-
如果限流器的初始值之前已经设置过就可以尝试获取令牌资源
private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) { byte[] random = getServiceManager().generateIdArray(); return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "local rate = redis.call('hget', KEYS[1], 'rate');"//获取之前存储的速率信息 + "local interval = redis.call('hget', KEYS[1], 'interval');"//获取限流的时间间隔信息 + "local type = redis.call('hget', KEYS[1], 'type');"//获取限流器的类型信息 + "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')"//判断限流器是否进行了初始化 + "local valueName = KEYS[2];"//定义value的key值,保存的是当前令牌的剩余数量 + "local permitsName = KEYS[4];"//定义限流保存的相关的入参,使用zset保存 + "if type == '1' then "//redission中默认是0 + "valueName = KEYS[3];" + "permitsName = KEYS[5];" + "end;" + "assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); "//判断速率是否大于当前令牌请求的资源量,请求资源量超出则异常 + "local currentValue = redis.call('get', valueName); "//获取当前剩余令牌数量 + "local res;"//定义返回值 + "if currentValue ~= false then " + "local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "//根据当前时间统计已经超出限流时间的请求 + "local released = 0; " + "for i, v in ipairs(expiredValues) do " + "local random, permits = struct.unpack('Bc0I', v);" + "released = released + permits;"//循环统计超时请求量 + "end; " + "if released > 0 then "//如果有超时请求 + "redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "//移除不需要进行到期统计的请求 + "if tonumber(currentValue) + released > tonumber(rate) then "//令牌数量+释放的资源大于速率,说明之前资源并没有充分请求,重新设置令牌值为速率-释放调的资源量 + "currentValue = tonumber(rate) - redis.call('zcard', permitsName); " + "else "//否则令牌数量为剩余令牌资源+释放掉的资源 + "currentValue = tonumber(currentValue) + released; " + "end; "//设置令牌数量 + "redis.call('set', valueName, currentValue);" + "end;" + "if tonumber(currentValue) < tonumber(ARGV[1]) then "//令牌数量小于请求资源量 + "local firstValue = redis.call('zrange', permitsName, 0, 0, 'withscores'); " + "res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]));" + "else "//剩余令牌数量充足,添加元素并减去令牌资源 + "redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); " + "redis.call('decrby', valueName, ARGV[1]); " + "res = nil; " + "end; " + "else "//如果令牌没有值没有则说明是第一次进入进行初始化设置并添加请求并根据请求资源减去令牌数 + "redis.call('set', valueName, rate); " + "redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); " + "redis.call('decrby', valueName, ARGV[1]); " + "res = nil; " + "end;" //超时相关部分(一般不使用) + "local ttl = redis.call('pttl', KEYS[1]); " + "if ttl > 0 then " + "redis.call('pexpire', valueName, ttl); " + "redis.call('pexpire', permitsName, ttl); " + "end; " + "return res;", Arrays.asList(getRawName(), getValueName(), getClientValueName(), getPermitsName(), getClientPermitsName()), value, System.currentTimeMillis(), random); }
上面这段代码就是获取令牌资源的核心部分,也是使用lua脚本来保障原子性操作的