首页 > 其他分享 >深入探索令牌桶限流的原理与实践

深入探索令牌桶限流的原理与实践

时间:2024-05-30 10:22:36浏览次数:23  
标签:令牌 log 探索 -- redis tokens 限流

在当今的互联网时代,随着用户数量和请求量的不断增加,系统的性能和稳定性面临着巨大的挑战。限流算法作为保障系统稳定性的重要手段之一,被广泛应用于各种服务和应用中。限流的核心目的是对某一时间窗口内的请求数进行限制,保持系统的可用性和稳定性,防止因流量暴增而导致的系统运行缓慢或宕机。

常见限流算法对比

常见的限流算法有四种:

● 令牌桶算法(Token Bucket)

· 原理:令牌桶算法是一种基于令牌的限流算法,它维护一个固定容量的令牌桶,按照固定速率往桶中添加令牌,每当有请求到来时,消耗一个令牌,如果桶中没有足够的令牌,则拒绝该请求。

· 优点:平滑限流,可以应对突发流量;灵活控制流量速率。

· 缺点:对于突发流量,需要足够的令牌桶容量用来应对,否则可能会出现丢弃部分请求的情况。

● 漏桶算法(Leaky Bucket)

· 原理:漏桶算法是一种基于漏桶的限流算法,它维护一个固定容量的漏桶,按照固定速率漏水,当有请求到来时,漏桶中的水量增加,如果漏桶已满,则拒绝该请求。

· 优点:平滑限流,可以限制请求的处理速率。

· 缺点:对于突发流量,可能会出现短时的请求丢失。

● 计数器算法(Fixed Window)

· 原理:计数器算法是一种简单的限流算法,通过统计单位时间内的请求数量,当请求达到设定的阈值时,拒绝后续请求。

· 优点:简单易实现,对于固定流量场景适用。

· 缺点:无法处理突发流量,可能导致性能下降或服务不可用。

● 漏斗算法(Funnel Algorithm)

· 原理:漏斗算法是一种基于漏斗的限流算法,它模拟了一个漏斗的过程,请求进入漏斗后,根据漏斗的容量和速率来控制请求的处理速度。

· 优点:可以动态调整漏斗的容量和速率,适应不同的流量情况。

· 缺点:相对复杂,实现和调优难度较大。

令牌桶算法

在微服务架构中,Spring Cloud Gateway 作为服务网关,承担着流量管理和路由分发的关键角色。为了增强其限流能力,Spring Cloud Gateway 官方提供了 RequestRateLimiterGatewayFilterFactory 类,该类通过集成 Redis 和使用 Lua 脚本来实现高效的令牌桶限流策略

在众多限流算法中,令牌桶算法以其独特的优点和灵活性,成为了业界广泛采用的一种方案。数栈「数据服务平台DataAPI」基于 Spring Cloud 的已有限流能力,进一步进行了定制化的开发和优化。

上文简单介绍过,令牌桶算法是一种流量控制算法,用于限制单位时间内通过的请求数量,它能够平滑地限制请求速率,防止突发大量请求对系统造成过载。以下是令牌桶算法的基本原理和工作流程:

● 令桶牌

算法中的“令牌桶”类似于一个存放令牌的容器。在每个固定的时间间隔(例如每秒),系统会往令牌桶中添加一定数量的令牌。

● 请求处理

当一个请求到达时,系统会尝试从令牌桶中获取一个令牌。如果令牌桶中有足够的令牌,则系统允许处理该请求,并从令牌桶中取走一个令牌。如果令牌桶中没有足够的令牌,则系统拒绝该请求或者将其放入队列等待。

● 令牌生成

令牌桶中的令牌数量会受到限制,它不能无限制地增长。当令牌桶已经满了时,不会再继续产生新的令牌,多余的令牌会被丢弃。

file

通过上述工作流程,令牌桶算法可以实现对请求的流量控制,使得系统能够以稳定的速率处理请求。这种算法适合于需要限制请求速率的场景,如接口调用频率控制、网络流量控制等。

自定义限流

通过继承 AbstractRateLimiter 重写 isAllowed 方法实现自定义限流。

file

通过继承 AbstractGatewayFilterFactory 重写 apply 方法调用具体的限流方法。

file
file

isAllowed 方法分析:

@Override
    @SuppressWarnings("unchecked")
    public Mono<Response> isAllowed(String routeId, String id) {
        // 初始化判断
        if (!this.initialized.get()) {
            throw new IllegalStateException("RedisRateLimiter is not initialized");
        }

        // How many requests per second do you want a user to be allowed to do?
        int replenishRate = 0;

        // How much bursting do you want to allow?
        int burstCapacity = 0;

        // How many tokens are requested per request?
        int requestedTokens = 1;

        final RedisLimitDefinition limitDefinition;

        if(LimitListRouteDefinitionLocator.ID_PREFIX.equals(routeId.split(ValidateRequestGlobalFilter.SPLIT)[0])){
            limitDefinition = limitConfig.getLimitInfoMap().get(id);
            if (ObjectUtils.isEmpty(limitDefinition) ){
                throw new IllegalArgumentException("No Configuration found for uri " + id);
            }
            replenishRate = limitDefinition.getReplenishRate();

            // How much bursting do you want to allow?
            burstCapacity = limitDefinition.getBurstCapacity();
        } else{
            // 自定义逻辑,分为分钟级限流和秒级限流
            String[] routeIdArr = routeId.split(ValidateRequestGlobalFilter.SEPARATOR);
            if(routeIdArr.length > 1){
                replenishRate = Integer.parseInt(routeIdArr[0]);
                burstCapacity = replenishRate * 60;
                limitDefinition = new RedisLimitDefinition(replenishRate, replenishRate, null, null);
                requestedTokens = 60;
            } else{
                replenishRate = Integer.parseInt(routeIdArr[0]);
                burstCapacity = replenishRate;
                limitDefinition = new RedisLimitDefinition(replenishRate, replenishRate, null, null);
                requestedTokens = 1;
            }
        }


        try {
            List<String> keys = getKeys(id);

            // The arguments to the LUA script. time() returns unixtime in seconds.
            List<String> scriptArgs = Arrays.asList(replenishRate + "",
                    burstCapacity + "", Instant.now().getEpochSecond() + "",
                    requestedTokens + "");
            // allowed, tokens_left = redis.eval(SCRIPT, keys, args)
            // 调用lua脚本并返回结果
            Flux<List<Long>> flux = this.reactiveStringRedisTemplate.execute(this.script, keys,
                    scriptArgs);
            return flux.onErrorResume(throwable -> {
                if (log.isDebugEnabled()) {
                    log.debug("Error calling rate limiter lua", throwable);
                }
                return Flux.just(Arrays.asList(1L, -1L));
            }).reduce(new ArrayList<Long>(), (longs, l) -> {
                longs.addAll(l);
                return longs;
            }).map(results -> {
                boolean allowed = results.get(0) == 1L;
                Long tokensLeft = results.get(1);

                Response response = new Response(allowed,
                        getHeaders(limitDefinition, tokensLeft));

                if (log.isDebugEnabled()) {
                    log.debug("response: " + response);
                }
                return response;
            });
        }
        catch (Exception e) {
            /*
             * We don't want a hard dependency on Redis to allow traffic. Make sure to set
             * an alert so you know if this is happening too much. Stripe's observed
             * failure rate is 0.01%.
             */
            log.error("Error determining if user allowed from redis", e);
        }
        return Mono.just(new Response(true, getHeaders(limitDefinition, -1L)));
    }

LUA 脚本

LUA 脚本是内置在数栈后端服务中的,位置如图所示:

file

接下来进行脚本代码分析:

-- 基于唯一id生成的token key和timestamp key
local tokens_key = KEYS[1]
local timestamp_key = KEYS[2]
--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)

-- 对应的脚本入参,分别为
-- 往令牌桶放置的速率,每秒n个
local rate = tonumber(ARGV[1])
-- 令牌桶最大容量
local capacity = tonumber(ARGV[2])
-- 当前时间戳,秒级
local now = tonumber(ARGV[3])
-- 每次请求消耗的令牌数量
local requested = tonumber(ARGV[4])

-- 放满令牌桶是的时长
local fill_time = capacity/rate
-- redis key过期时间
local ttl = math.floor(fill_time*2)

--redis.log(redis.LOG_WARNING, "rate " .. ARGV[1])
--redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2])
--redis.log(redis.LOG_WARNING, "now " .. ARGV[3])
--redis.log(redis.LOG_WARNING, "requested " .. ARGV[4])
--redis.log(redis.LOG_WARNING, "filltime " .. fill_time)
--redis.log(redis.LOG_WARNING, "ttl " .. ttl)

-- 这里对于last_tokens和last_refreshed的操作既是赋值也有初始化的作用
-- 获取令牌桶的数量,如果为空,将令牌桶容量赋值
local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
  last_tokens = capacity
end
--redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens)
-- 获取最后的更新时间戳,如果为空,设置为0
local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
  last_refreshed = 0
end
--redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed)

-- 计算出上次到这次统计的时间间隔
local delta = math.max(0, now-last_refreshed)
-- 上次剩余的令牌数量+新生成的令牌,这个值不会大于令牌桶容量
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
-- 总剩余令牌是否能满足一次请求消耗的令牌
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
-- 请求是否允许通过的标志位
local allowed_num = 0
-- 满足的话将总数减去单次消耗的令牌记为当前桶容量,并重新放置到redis key中
if allowed then
  new_tokens = filled_tokens - requested
  allowed_num = 1
end

--redis.log(redis.LOG_WARNING, "delta " .. delta)
--redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens)
--redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num)
--redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens)

if ttl > 0 then
  redis.call("setex", tokens_key, ttl, new_tokens)
  redis.call("setex", timestamp_key, ttl, now)
end

-- return { allowed_num, new_tokens, capacity, filled_tokens, requested, new_tokens }
return { allowed_num, new_tokens }

模拟计算流程:

file

针对上述流程,由于设定的是每分钟允许三次请求,可以看到当桶内令牌满是只能通过三次请求。当桶内令牌空时,每隔20秒会生成刚好够一次请求的令牌数,实现对流量的平滑控制。

《行业指标体系白皮书》下载地址:https://www.dtstack.com/resources/1057?src=szsm

《数栈产品白皮书》下载地址:https://www.dtstack.com/resources/1004?src=szsm

《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001?src=szsm

想了解或咨询更多有关大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=szbky

标签:令牌,log,探索,--,redis,tokens,限流
From: https://www.cnblogs.com/DTinsight/p/18221858

相关文章

  • 深入探索Java HashMap底层源码:结构、原理与优化
    引言简述HashMap在Java集合框架中的地位及其应用场景。阐明学习HashMap底层原理的重要性,特别是在面试、性能调优和解决并发问题方面的价值。1.HashMap基础概念数据结构:介绍HashMap的核心——哈希表,包括数组加链表/红黑树的结构。线程安全性:强调HashMap是非线程安全的,以及在......
  • 【数据结构】探索树中的奇妙世界
    专栏介绍:哈喽大家好,我是野生的编程萌新,首先感谢大家的观看。数据结构的学习者大多有这样的想法:数据结构很重要,一定要学好,但数据结构比较抽象,有些算法理解起来很困难,学的很累。我想让大家知道的是:数据结构非常有趣,很多算法是智慧的结晶,我希望大家在学习数据结构的过程是一种愉......
  • Vue 组件生命周期:探索钩子
    title:Vue组件生命周期:探索钩子date:2024/5/2718:42:38updated:2024/5/2718:42:38categories:前端开发tags:生命周期异步加载通信原理父子通信兄弟通信跨层通信性能优化第1章:介绍与背景1.1什么是Vue组件生命周期?Vue组件生命周期是指Vue组件从创建......
  • AI大模型探索之路-实战篇10:数据预处理的艺术:构建Agent智能数据分析平台的基础
    系列篇章......
  • HarmonyOS SDK助力中国建设银行探索金融领域创新场景
    今年年初,中国建设银行(以下简称建行)手机银行首批适配HarmonyOSNEXT,并高效实现其应用的核心功能迁移。同时,建行手机银行在HarmonyOSSDK的加持下,充分发挥鸿蒙原生应用在原生智能方面的优势,让用户尽享更高效便捷的线上金融服务。HarmonyOSSDK场景化控件助力建行高效开发自建行加......
  • 探索 ChatboxAI:智能对话的新时代
    在人工智能迅速发展的今天,智能对话已经成为了我们日常生活中不可或缺的一部分。从智能助理到聊天机器人,AI技术正在改变我们与世界互动的方式。今天,我们要介绍的是一个全新且功能强大的平台——ChatboxAI。什么是ChatboxAI?ChatboxAI是一个基于先进自然语言处理技术的对......
  • 探索数据结构:单链表的实践和应用
    ......
  • 深入探索C语言动态内存分配
    在编程的广阔天地里,C语言以其直接操控底层的能力和高效性能,至今仍占据着不可替代的地位。而在C语言的众多特性中,动态内存分配无疑是一项核心而又充满挑战的技术。本文将引领您深入探索这一技术的奥秘,从理论到实践,揭示动态内存分配的魅力所在。一、动态内存分配的必要性在程序......
  • 深入探索 MongoDB:高级索引解析与优化策略
    MongoDB是一种非常流行的NoSQL数据库,它支持丰富的索引类型和功能,以提高数据查询的效率和性能。本文将详细介绍MongoDB的高级索引,包括基本语法、常用命令、示例、应用场景、注意事项和总结。基本语法在MongoDB中,可以使用createIndex()方法创建索引,语法如下:db.col......
  • 423世界读书日,探索「读书艺术」与「工作技能」提升
    本文从个人阅读的书籍当中,挑选了以下主题相关书籍以供探索:书籍选择的艺术、高效阅读策略、时间与精力管理的智慧、思维模式、提升工作技能的诀窍,以及引人入胜的小说人物传记。希望能够激发你的思考,助力你的工作成长旅程。本文最初撰写于2021年1月份,本周进行了大纲整理和调整。......