首页 > 其他分享 >go-zero 是如何实现令牌桶限流的?

go-zero 是如何实现令牌桶限流的?

时间:2023-08-10 21:36:24浏览次数:39  
标签:return lim redis rate zero 限流 go now

原文链接: go-zero 是如何实现令牌桶限流的?

上一篇文章介绍了 如何实现计数器限流?主要有两种实现方式,分别是固定窗口和滑动窗口,并且分析了 go-zero 采用固定窗口方式实现的源码。

但是采用固定窗口实现的限流器会有两个问题:

  1. 会出现请求量超出限制值两倍的情况
  2. 无法很好处理流量突增问题

这篇文章来介绍一下令牌桶算法,可以很好解决以上两个问题。

工作原理

算法概念如下:

  • 令牌以固定速率生成;
  • 生成的令牌放入令牌桶中存放,如果令牌桶满了则多余的令牌会直接丢弃,当请求到达时,会尝试从令牌桶中取令牌,取到了令牌的请求可以执行;
  • 如果桶空了,那么尝试取令牌的请求会被直接丢弃。

令牌桶算法既能够将所有的请求平均分布到时间区间内,又能接受服务器能够承受范围内的突发请求,因此是目前使用较为广泛的一种限流算法。

源码实现

源码分析我们还是以 go-zero 项目为例,首先来看生成令牌的部分,依然是使用 Redis 来实现。

// core/limit/tokenlimit.go

// 生成 token 速率
script = `local rate = tonumber(ARGV[1])
// 通容量
local capacity = tonumber(ARGV[2])
// 当前时间戳
local now = tonumber(ARGV[3])
// 请求数量
local requested = tonumber(ARGV[4])
// 需要多少秒才能把桶填满
local fill_time = capacity/rate
// 向下取整,ttl 为填满时间 2 倍
local ttl = math.floor(fill_time*2)
// 当前桶剩余容量,如果为 nil,说明第一次使用,赋值为桶最大容量
local last_tokens = tonumber(redis.call("get", KEYS[1]))
if last_tokens == nil then
    last_tokens = capacity
end

// 上次请求时间戳,如果为 nil 则赋值 0
local last_refreshed = tonumber(redis.call("get", KEYS[2]))
if last_refreshed == nil then
    last_refreshed = 0
end

// 距离上一次请求的时间跨度
local delta = math.max(0, now-last_refreshed)
// 距离上一次请求的时间跨度能生成的 token 数量和桶内剩余 token 数量的和
// 与桶容量比较,取二者的小值
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
// 判断请求数量和桶内 token 数量的大小
local allowed = filled_tokens >= requested
// 被请求消耗掉之后,更新剩余 token 数量
local new_tokens = filled_tokens
if allowed then
    new_tokens = filled_tokens - requested
end

// 更新 redis token
redis.call("setex", KEYS[1], ttl, new_tokens)
// 更新 redis 刷新时间
redis.call("setex", KEYS[2], ttl, now)

return allowed`

Redis 中主要保存两个 key,分别是 token 数量和刷新时间。

核心思想就是比较两次请求时间间隔内生成的 token 数量 + 桶内剩余 token 数量,和请求量之间的大小,如果满足则允许,否则则不允许。

限流器初始化:

// A TokenLimiter controls how frequently events are allowed to happen with in one second.
type TokenLimiter struct {
    // 生成 token 速率
    rate           int
    // 桶容量
    burst          int
    store          *redis.Redis
    // 桶 key
    tokenKey       string
    // 桶刷新时间 key
    timestampKey   string
    rescueLock     sync.Mutex
    // redis 健康标识
    redisAlive     uint32
    // redis 健康监控启动状态
    monitorStarted bool
    // 内置单机限流器
    rescueLimiter  *xrate.Limiter
}

// NewTokenLimiter returns a new TokenLimiter that allows events up to rate and permits
// bursts of at most burst tokens.
func NewTokenLimiter(rate, burst int, store *redis.Redis, key string) *TokenLimiter {
    tokenKey := fmt.Sprintf(tokenFormat, key)
    timestampKey := fmt.Sprintf(timestampFormat, key)

    return &TokenLimiter{
        rate:          rate,
        burst:         burst,
        store:         store,
        tokenKey:      tokenKey,
        timestampKey:  timestampKey,
        redisAlive:    1,
        rescueLimiter: xrate.NewLimiter(xrate.Every(time.Second/time.Duration(rate)), burst),
    }
}

其中有一个变量 rescueLimiter,这是一个进程内的限流器。如果 Redis 发生故障了,那么就使用这个,算是一个保障,尽量避免系统被突发流量拖垮。

提供了四个可调用方法:

// Allow is shorthand for AllowN(time.Now(), 1).
func (lim *TokenLimiter) Allow() bool {
    return lim.AllowN(time.Now(), 1)
}

// AllowCtx is shorthand for AllowNCtx(ctx,time.Now(), 1) with incoming context.
func (lim *TokenLimiter) AllowCtx(ctx context.Context) bool {
    return lim.AllowNCtx(ctx, time.Now(), 1)
}

// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate.
// Otherwise, use Reserve or Wait.
func (lim *TokenLimiter) AllowN(now time.Time, n int) bool {
    return lim.reserveN(context.Background(), now, n)
}

// AllowNCtx reports whether n events may happen at time now with incoming context.
// Use this method if you intend to drop / skip events that exceed the rate.
// Otherwise, use Reserve or Wait.
func (lim *TokenLimiter) AllowNCtx(ctx context.Context, now time.Time, n int) bool {
    return lim.reserveN(ctx, now, n)
}

最终调用的都是 reverveN 方法:

func (lim *TokenLimiter) reserveN(ctx context.Context, now time.Time, n int) bool {
    // 判断 Redis 健康状态,如果 Redis 故障,则使用进程内限流器
    if atomic.LoadUint32(&lim.redisAlive) == 0 {
        return lim.rescueLimiter.AllowN(now, n)
    }

    // 执行限流脚本
    resp, err := lim.store.EvalCtx(ctx,
        script,
        []string{
            lim.tokenKey,
            lim.timestampKey,
        },
        []string{
            strconv.Itoa(lim.rate),
            strconv.Itoa(lim.burst),
            strconv.FormatInt(now.Unix(), 10),
            strconv.Itoa(n),
        })
    // redis allowed == false
    // Lua boolean false -> r Nil bulk reply
    if err == redis.Nil {
        return false
    }
    if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
        logx.Errorf("fail to use rate limiter: %s", err)
        return false
    }
    if err != nil {
        logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err)
        // 如果有异常的话,会启动进程内限流
        lim.startMonitor()
        return lim.rescueLimiter.AllowN(now, n)
    }

    code, ok := resp.(int64)
    if !ok {
        logx.Errorf("fail to eval redis script: %v, use in-process limiter for rescue", resp)
        lim.startMonitor()
        return lim.rescueLimiter.AllowN(now, n)
    }

    // redis allowed == true
    // Lua boolean true -> r integer reply with value of 1
    return code == 1
}

最后看一下进程内限流的启动与恢复:

func (lim *TokenLimiter) startMonitor() {
    lim.rescueLock.Lock()
    defer lim.rescueLock.Unlock()

    // 需要加锁保护,如果程序已经启动了,直接返回,不要重复启动
    if lim.monitorStarted {
        return
    }

    lim.monitorStarted = true
    atomic.StoreUint32(&lim.redisAlive, 0)

    go lim.waitForRedis()
}

func (lim *TokenLimiter) waitForRedis() {
    ticker := time.NewTicker(pingInterval)
    // 更新监控进程的状态
    defer func() {
        ticker.Stop()
        lim.rescueLock.Lock()
        lim.monitorStarted = false
        lim.rescueLock.Unlock()
    }()

    for range ticker.C {
        // 对 redis 进行健康监测,如果 redis 服务恢复了
        // 则更新 redisAlive 标识,并退出 goroutine
        if lim.store.Ping() {
            atomic.StoreUint32(&lim.redisAlive, 1)
            return
        }
    }
}

以上就是本文的全部内容,如果觉得还不错的话欢迎点赞转发关注,感谢支持。


参考文章:

  • https://juejin.cn/post/7052171117116522504
  • https://www.infoq.cn/article/Qg2tX8fyw5Vt-f3HH673

推荐阅读:

标签:return,lim,redis,rate,zero,限流,go,now
From: https://blog.51cto.com/alwaysbeta/7039956

相关文章

  • go kratos protobuf 接收动态JSON数据
    前言google.protobuf.Struct是GoogleProtocolBuffers中的一种特殊类型,用于表示动态的键值对数据。它可以存储任意类型的数据,并提供了方便的方法来访问和操作这些数据。Struct类型通常用于在不事先知道数据结构的情况下传递和处理配置、参数或其他动态数据。https://pkg.g......
  • Python基础day63Django操作session和中间件使用
    Django操作cookie#设置cookie#获取cookieset_cookie('key','value',max_age=5,expires=5)参数:●key,键●value=’’,值●max_age=None,超时时间cookie需要延续的时间(以秒为单位)如果参数是\None``,这个cookie会延续到浏览器关闭为止expires=None,超时时间(......
  • go-zero 是如何实现计数器限流的?
    原文链接:如何实现计数器限流?上一篇文章go-zero是如何做路由管理的?介绍了路由管理,这篇文章来说说限流,主要介绍计数器限流算法,具体的代码实现,我们还是来分析微服务框架go-zero的源码。在微服务架构中,一个服务可能需要频繁地与其他服务交互,而过多的请求可能导致性能下降或系......
  • 如何将 Google Ads 与 Google Analytics(分析)相关联
    将GoogleAds帐号与GoogleAnalytics(分析)媒体资源相关联,以便洞悉从首次互动到转化的完整客户周期将GoogleAds帐号与GoogleAnalytics(分析)媒体资源(包括子媒体资源和总览媒体资源)相关联,便可以洞悉完整的客户周期,从用户如何与您的营销内容互动(比如点击广告),到他们最后如何在您的......
  • 什么是迭代器,生成器,装饰器;django的信号用过吗?如何用,干过什么;什么是深拷贝,什么是浅拷贝
    什么是迭代器,生成器,装饰器;django的信号用过吗?如何用,干过什么;什么是深拷贝,什么是浅拷贝,如何使用什么是迭代器,生成器,装饰器#迭代器-迭代:一种不依赖于索引取值的方式,我们不需要关注它的位置,只要能够一个个取值,它就称之为迭代,python中就是for循环,内部调用对象.__next__()-可迭......
  • Golang - 原生go-sql-driver:出现invalid connection报错
    在使用go-sql-driver/msqyl驱动过程中,偶现invalidconnection错误,字面上看就是无效连接的意思。开始以为是数据库压力问题或是网络不好,后来发现服务器和数据库是走内网的,网络出现问题几率非常小;只是在测试服务器上跑,没多少连接,不存在压力问题。golang数据库驱动维护一个连接池,如......
  • Google C++ 风格指南记录
    最近在看谷歌的C++风格指南发现了一些有意思的知识点,遂记录下1.第六章第二小节介绍了右值引用只在定义移动构造函数与移动赋值操作时使用右值引用.不要使用 std::forward.定义:右值引用是一种只能绑定到临时对象的引用的一种,其语法与传统的引用语法相似.例如, void......
  • Mongodb Write Concern
    写关注点描述了MongoDB对独立mongod、副本集或分片集群进行写操作时请求的确认级别。在分片集群中,mongos实例将写关注点传递给分片。对于多文档事务,可以在事务级别设置写关注点,而不是在单个操作级别。不要显式地为事务中的各个写操作设置写关注点。从MongoDB4.4开始,副本集和分片集......
  • Argo CD
    ArgoCD服务什么是ArgoCD?ArgoCD是一个为Kubernetes而生的,遵循声明式GitOps理念的持续部署工具。ArgoCD可在Git存储库更改时自动同步和部署应用程序如何工作ArgoCD遵循GitOps模式,使用Git仓库作为定义所需应用程序状态的真实来源,ArgoCD支持多种Kubernet......
  • Go语言中的匿名接口
    匿名接口在Go语言中提供了一种定义接口但不给它命名的方式。使用它们有其优缺点:优点:简洁性:在你只需要在一个地方使用接口时,匿名接口可以避免创建一个新的命名接口。局部性:匿名接口定义在使用它的地方,这使得读代码的人可以立即看到所需的方法,而不必在代码的其他地方查找命名......