首页 > 数据库 >redisson分布式限流[RRateLimiter]源码分析

redisson分布式限流[RRateLimiter]源码分析

时间:2022-10-30 11:33:11浏览次数:86  
标签:令牌 redisson -- redis ARGV 限流 call local 源码

接下来在讲一讲平时用的比较多的限流模块--RRateLimiter

1.简单使用

public static void main(String[] args) throws InterruptedException {
RRateLimiter rateLimiter = createLimiter();

int allThreadNum = 20;

CountDownLatch latch = new CountDownLatch(allThreadNum);

long startTime = System.currentTimeMillis();
for (int i = 0; i < allThreadNum; i++) {
// new Thread(() -> {
if(i % 3 == 0) Thread.sleep(1000);
boolean pass = rateLimiter.tryAcquire();
if(pass) {
log.info("get ");
} else {
log.info("no");
}
// latch.countDown();
// }).start();
}
// latch.await();
System.out.println("Elapsed " + (System.currentTimeMillis() - startTime));
}

public static RRateLimiter createLimiter() {
Config config = new Config();
config.useSingleServer()
.setTimeout(1000000)
.setPassword("123456")
.setAddress("redis://xxxx:6379");

RedissonClient redisson = Redisson.create(config);
RRateLimiter rateLimiter = redisson.getRateLimiter("myRateLimiter3");
// 初始化:PER_CLIENT 单实例执行,OVERALL 全实例执行
// 最大流速 = 每10秒钟产生3个令牌
rateLimiter.trySetRate(RateType.OVERALL, 3, 10, RateIntervalUnit.SECONDS);
return rateLimiter;
}

实际结果:

[2022-10-29 14:32:46.261][INFO ][main][][] RedisTest - get 
[2022-10-29 14:32:46.312][INFO ][main][][] RedisTest - get
[2022-10-29 14:32:46.358][INFO ][main][][] RedisTest - get
[2022-10-29 14:32:47.416][INFO ][main][][] RedisTest - no
[2022-10-29 14:32:47.469][INFO ][main][][] RedisTest - no
[2022-10-29 14:32:47.517][INFO ][main][][] RedisTest - no
[2022-10-29 14:32:48.577][INFO ][main][][] RedisTest - no
[2022-10-29 14:32:48.623][INFO ][main][][] RedisTest - no

2\. 实现限流redisson使用了哪些redis数据结构

  1. Hash结构 -- 限流器结构:
    参数rate代表速率
    参数interval代表多少时间内产生的令牌
    参数type代表单机还是集群
  2. redisson分布式限流[RRateLimiter]源码分析_初始化

  3. ZSET结构 -- 记录获取令牌的时间戳,用于时间对比。
    1667025166312 --> 2022-10-29 14:32:46
    1667025166262 --> 2022-10-29 14:32:46
    1667025166215 --> 2022-10-29 14:32:46

redisson分布式限流[RRateLimiter]源码分析_sed_02

3\. String结构 --记录的是当前令牌桶中的令牌数【很明显被我用完了现在是0】

redisson分布式限流[RRateLimiter]源码分析_redis_03

3\. 超过10s,我再次获取一个令牌,数据结构发生的变化

  1. ZSET结构。-- 新生成一个ZSET结构,存放获取令牌的时间戳

redisson分布式限流[RRateLimiter]源码分析_sed_04

  1. String 结构 --当前令牌桶还有2个令牌

redisson分布式限流[RRateLimiter]源码分析_初始化_05

4\. 源码浅析

RRateLimiter rateLimiter = redisson.getRateLimiter("myRateLimiter3");
// 初始化
// 最大流速 = 每10秒钟产生3个令牌
rateLimiter.trySetRate(RateType.PER_CLIENT, 3, 10, RateIntervalUnit.SECONDS);

初始化定义没有什么好讲的,就是创建HASH结构

主要还是讲讲: rateLimiter.tryAcquire()

private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "local rate = redis.call('hget', KEYS[1], 'rate');local interval = redis.call('hget', KEYS[1], 'interval');local type = redis.call('hget', KEYS[1], 'type');assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')local valueName = KEYS[2];local permitsName = KEYS[4];if type == '1' then valueName = KEYS[3];permitsName = KEYS[5];end;local currentValue = redis.call('get', valueName); if currentValue ~= false then local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); local released = 0; for i, v in ipairs(expiredValues) do local random, permits = struct.unpack('fI', v);released = released + permits;end; if released > 0 then redis.call('zrem', permitsName, unpack(expiredValues)); currentValue = tonumber(currentValue) + released; redis.call('set', valueName, currentValue);end;if tonumber(currentValue) < tonumber(ARGV[1]) then local nearest = redis.call('zrangebyscore', permitsName, '(' .. (tonumber(ARGV[2]) - interval), tonumber(ARGV[2]), 'withscores', 'limit', 0, 1); local random, permits = struct.unpack('fI', nearest[1]);return tonumber(nearest[2]) - (tonumber(ARGV[2]) - interval);else redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1])); redis.call('decrby', valueName, ARGV[1]); return nil; end; else assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); redis.call('set', valueName, rate); redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1])); redis.call('decrby', valueName, ARGV[1]); return nil; end;", Arrays.asList(this.getName(), this.getValueName(), this.getClientValueName(), this.getPermitsName(), this.getClientPermitsName()), new Object[]{value, System.currentTimeMillis(), ThreadLocalRandom.current().nextLong()});
}

主要就是这段lua代码,下面我详细过一下

作者目前用的3.16.3版本,刚好遇见redisson的bug,见​​3197​​,请大家用最新版本,以下为修复后解析。

-- 获取hash结构的速率
local rate = redis.call("hget", KEYS[1], "rate")
-- 获取hash结构的时间区间(ms)
local interval = redis.call("hget", KEYS[1], "interval")
-- 获取hash结构的时间类型
local type = redis.call("hget", KEYS[1], "type")
-- 判断是否初始化限流结构
assert(rate ~= false and interval ~= false and type ~= false, "RateLimiter is not initialized")

-- {name}:value string结构,这个key记录的是当前令牌桶中的令牌数
local valueName = KEYS[2]

-- {name}:permits zset结构,记录了请求的令牌数,score则为请求的时间戳
local permitsName = KEYS[4]

-- 单机限流才会用到,集群模式不用关注
if type == "1" then
valueName = KEYS[3]
permitsName = KEYS[5]
end

-- 生产速率rate必须比请求的令牌数大
assert(tonumber(rate) >= tonumber(ARGV[1]), "Requested permits amount could not exceed defined rate")

-- 初始化RateLimiter并不会初始化stirng结构,因此第一次获取这里currentValue是null
local currentValue = redis.call("get", valueName)
if currentValue ~= false then
-- 第二次获取令牌执行
-------------------------- 获取zset结构:统计之前的请求令牌数
-- 范围是0 ~ (第二次请求时间戳 - 令牌生产的时间)
local expiredValues = redis.call("zrangebyscore", permitsName, 0, tonumber(ARGV[2]) - interval)
local released = 0
-- lua迭代器,遍历expiredValues,如果有值,那么released等于之前所有请求的令牌数之和,表示应该释放多少令牌
for i, v in ipairs(expiredValues) do
-- 获取请求数permits
local random, permits = struct.unpack("fI", v)
released = released + permits
end

-- 之前的请求令牌数 > 0, 例如10s产生3个令牌,现在超过10s了,重置周期并计算剩余令牌数
if released > 0 then
-- 移除zset中所有元素【要求是同一个限流器permitsName,不然就移除不了,尴尬】
redis.call("zrem", permitsName, unpack(expiredValues))
currentValue = tonumber(currentValue) + released
------------------------- 更新string结构:=剩下令牌数+释放令牌数
redis.call("set", valueName, currentValue)
end

-- 如果当前令牌数 < 请求的令牌数
if tonumber(currentValue) < tonumber(ARGV[1]) then
-- 从zset中找到距离当前时间最近的那个请求,也就是上一次放进去的请求信息
local nearest = redis.call('zrangebyscore', permitsName, '(' .. (tonumber(ARGV[2]) - interval), tonumber(ARGV[2]), 'withscores', 'limit', 0, 1);
local random, permits = struct.unpack("fI", nearest[1])
-- 返回 上一次请求的时间戳 - (当前时间戳 - 令牌生成的时间间隔) 这个值表示还需要多久才能生产出足够的令牌
return tonumber(nearest[2]) - (tonumber(ARGV[2]) - interval)
else
-- 如果当前令牌数 ≥ 请求的令牌数,表示令牌够多,更新zset
------------------------- 更新zset结构
redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1]))
------------------------- 更新Stringt结构,减少一个剩下的令牌数
redis.call("decrby", valueName, ARGV[1])
return nil
end
else
--------汀雨笔记----------------- 初始化Stringt结构,当前限流器的令牌数
redis.call("set", valueName, rate)

--------汀雨笔记----------------- 初始化zset结构
redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1]))
-- struct.pack第一个参数表示格式字符串,f是浮点数、I是长整数。所以这个格式字符串表示的是把一个浮点数和长整数拼起来的结构体,
-- ARGV[2]就是请求时间戳,ARGV[1]是请求的令牌数,统计会用到,ARGV[3]是当前时间戳为种子的随机数,具体用处还不知道,知道的网友可以留言

------------------------- 更新Stringt结构,因为这是获取令牌操作,减掉一个令牌
-------------------------【本文作者认为,这里可以直接初始化string结构,值为rate - 1】
redis.call("decrby", valueName, ARGV[1])
return nil
end

复制代码

这段lua代码也并不复杂,令牌桶的数量主要是通过时间窗口来控制,判断上一个请求是否超过了令牌生产周期。

留下一个疑问?

-- 移除zset中所有元素【要求是同一个限流器permitsName,不然就移除不了,尴尬】 
redis.call("zrem", permitsName, unpack(expiredValues))
复制代码

我自己在本地测试,只要超过10s,permitsName就不一样,这就导致了这部分数据是不能移除的,就产生了冗余数据,从前面的截图也可以看出,是新生成了一个zset数据结构。

相当于直接走到了这一步:

------------------------- 更新zset结构 
redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1]))
复制代码

至于为什么会产生这样的结果,会的小伙伴可以留言,或者过段时间我提个issue。

及时当勉励 岁月不待人

能看到这里的人呀,都是菁英。❤❤️❤️❤️❤

非常感谢

菁英们能看到这里,如果这个文章写得还不错,觉得有点东西的话 求点赞

标签:令牌,redisson,--,redis,ARGV,限流,call,local,源码
From: https://blog.51cto.com/u_15773567/5807487

相关文章

  • 多语言在线客服系统源码-自动识别中英环境-私有化部署完美支持跨境电商网站
    如果您的客户遍布全球,客户沟通就必须跨越语言障碍。用客户当地的语言跟他们交谈,可以帮助您在客户生命周期的所有阶段建立信任,当然也包括服务支持。 具体做法,看看这四点......
  • Nginx源码编译并运行
    获取源码包并解压登录http://nginx.org/en/download.htmlwgethttp://nginx.org/download/nginx-1.20.2.tar.gztar-zxvfnginx-1.20.2.tar.gz安装Nginxcdnginx-1.2......
  • spdlog日志库源码:logger类
    目录特性类图关系logger数据成员logger函数成员构造与析构构造函数拷贝构造、移动构造交换操作log()记录日志消息格式串普通字符串日志级别宽字符支持sink_it_:将log消息交......
  • 最新php多商户多仓库带扫描云进销存系统ERP管理系统Saas营销版无限商户源码
    1、电脑端+手机端,手机实时共享,手机端一目了然。2、多商户Saas营销版无限开商户,用户前端自行注册,后台管理员审核开通3、管理员开通商户,可以设置商户到期时间、权限等等,无......
  • Spring源码-SpringMVC-搭建springmvc环境
    一、新建模块myself-web新建gradle的web项目,右键项目名,选择NEW-Moudle.左边选择Gradle,右下选择web即可。build.gradleplugins{id'java'id'war'id"com.bmuschko......
  • xxl-job 源码初探
    xxl-job客户端把JobHandle的value和method映射关系存储到map中1.1启动入口1.2在该文件getBean后被调用1.3xxl-job实际映射位置1.4放置到map中......
  • String源码分析(四)
    ......
  • Thread类【JDK源码分析】
    Thread类【JDK源码分析】​​前言​​​​推荐​​​​说明​​​​Thread类​​​​基本信息​​​​嵌套类摘要​​​​字段摘要​​​​方法摘要​​​​currentThread​......
  • StringBuffer类【JDK源码分析】
    StringBuffer类【JDK源码分析】​​前言​​​​推荐​​​​说明​​​​StringBuffer类​​​​基本信息​​​​属性​​​​构造方法​​​​部分方法​​​​length​......
  • String类【JDK源码分析】
    String类【JDK源码分析】​​前言​​​​推荐​​​​说明​​​​String类​​​​基本信息​​​​属性​​​​部分方法​​​​charAt​​​​equals​​​​getBytes......