首页 > 其他分享 >Go简单实现几种常用的限流

Go简单实现几种常用的限流

时间:2024-09-06 09:35:53浏览次数:17  
标签:令牌 几种 tokens 限流 int64 tbl func time Go

固定窗口

package main
 
import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)
 
// 定义限流结构体
type RateLimiter struct {
    interval time.Duration // 时间窗口
    tokens   int32         // 令牌总数
    lastTime int64         // 上次更新时间
    mu       sync.Mutex    // 互斥锁
}
 
// 创建新的限流器
func NewRateLimiter(interval time.Duration, tokens int32) *RateLimiter {
    return &RateLimiter{
        interval: interval,
        tokens:   tokens,
        lastTime: time.Now().UnixNano(),
    }
}
 
// 尝试获取令牌
func (l *RateLimiter) TryAcquire() bool {
    l.mu.Lock()
    defer l.mu.Unlock()
 
    now := time.Now().UnixNano()
    // 计算当前窗口内的剩余时间
    elapsed := now - l.lastTime
    // 更新时间窗口,并添加新令牌
    l.lastTime = now
    l.tokens += elapsed / int64(l.interval)
 
    // 判断是否有足够的令牌
    if l.tokens < 1 {
        return false
    }
    l.tokens-- // 获取令牌,减1
    return true
}
 
func main() {
    // 创建一个时间窗口为1秒,总共10个令牌的限流器
    limiter := NewRateLimiter(time.Second, 10)
 
    // 模拟50个并发请求
    var wg sync.WaitGroup
    wg.Add(50)
    for i := 0; i < 50; i++ {
        go func() {
            defer wg.Done()
            if limiter.TryAcquire() {
                fmt.Println("请求被处理")
            } else {
                fmt.Println("请求被限流")
            }
        }()
    }
    wg.Wait()
}

计数器

package main

import (
	"fmt"
	"sync"
	"time"
)

type CountLimiter struct {
	rate  int64         // 计数周期内运行的最大请求数
	begin time.Time     // 当前轮计数开始时间
	count int64         // 当前计数周期累计的请求数
	cycle time.Duration // 计数周期,如统计1秒内的总请求数,那么计数周期就是1秒
	lock  sync.Mutex    // 判断能否放行时需要加锁操作
}

func NewCountLimiter(rate int64, cycle time.Duration) *CountLimiter {
	return &CountLimiter{
		rate:  rate,
		begin: time.Now(),
		count: 0,
		cycle: cycle,
		lock:  sync.Mutex{},
	}
}

func (c *CountLimiter) Allow() bool {
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.count == c.rate { // 达到最大限流时,判断时间是否也超过统计周期了,超了可以重置限流器了,没有超说明还在当前统计周期内,应该拦截
		if time.Now().Sub(c.begin) > c.cycle {
			c.Reset()
			return true
		} else {
			return false
		}
	} else { // 还没有达到最大限流数,那么不管时间周期是否超出统计计数周期,都可以放行
		c.count++
		return true
	}
}

func (c *CountLimiter) Reset() {
	c.begin = time.Now()
	c.count = 0
}

func main() {
	countLimiter := NewCountLimiter(3, time.Second) // 1s内不可超过3个请求
	var wg sync.WaitGroup
	wg.Add(10)
	for i := 0; i < 10; i++ {
		go func(i int) {
			defer wg.Done()
			if countLimiter.Allow() {
				fmt.Println(fmt.Sprintf("当前时间%s,请求%d通过了", time.Now().String(), i))
			} else {
				fmt.Println(fmt.Sprintf("当前时间%s,请求%d被限流了", time.Now().String(), i))
			}
		}(i)
		time.Sleep(200 * time.Millisecond)
	}
	wg.Wait()
}

令牌桶

package main

import (
	"fmt"
	"sync"
	"time"
)

type TokenBucketLimiter struct {
	lock     sync.Mutex
	rate     time.Duration // 多长时间放入一个令牌,即放入令牌的速率
	capacity int64         // 令牌桶的容量,控制最多放入多少令牌,也即突发最大并发量
	tokens   int64         // 当前桶中已有的令牌数量
	lastTime time.Time     // 上次放入令牌的时间,避免开启协程定时去放入令牌,而是请求到来时懒加载的方式(now - lastTime) / rate放入令牌
}

func NewTokenBucketLimiter(rate time.Duration, capacity int64) *TokenBucketLimiter {
	if capacity < 1 {
        panic("token bucket capacity must be large 1")
    }
	return &TokenBucketLimiter{
		lock:     sync.Mutex{},
		rate:     rate,
		capacity: capacity,
		tokens:   0,
		lastTime: time.Time{},
	}
}

func (tbl *TokenBucketLimiter) Allow() bool {
	tbl.lock.Lock() // 加锁避免并发错误
	defer tbl.lock.Unlock()

	// 如果 now 与上次请求的间隔超过了 token rate
	// 则增加令牌,更新lastTime
	now := time.Now()
	if now.Sub(tbl.lastTime) > tbl.rate {
		tbl.tokens += int64((now.Sub(tbl.lastTime)) / tbl.rate) // 放入令牌
		if tbl.tokens > tbl.capacity {
			tbl.tokens = tbl.capacity // 总令牌数不能大于桶的容量
		}
		tbl.lastTime = now // 更新上次往桶中放入令牌的时间
	}

	if tbl.tokens > 0 { // 令牌数是否充足
		tbl.tokens -= 1
		return true
	}

	return false // 令牌不足,拒绝请求
}

func main() {
	tbl := NewTokenBucketLimiter(10, 5) // 每10ms放一个令牌,1s放100个,桶容量(最大突发流量)为5
	for i := 0; i < 10; i++ {
		fmt.Println(tbl.Allow())  // 模拟突发流量10个请求,超过桶容量5
	}
	time.Sleep(100 * time.Millisecond)
	fmt.Println(tbl.Allow())
}

漏斗

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type Result struct {
	Msg string // 根据实际情况定义返回结果需要哪些字段
}

type Handler func() Result // 处理函数的形式也应该根据具体需要而定

type Task struct {
	id      int64       // 任务id
	result  chan Result // 任务的执行结果,即请求的响应结果
	handler Handler     // 请求的执行函数
}

func NewTask(id int, handler Handler) Task {
	return Task{
		handler: handler,
		result:  make(chan Result),
		id:      int64(id),
	}
}

type LeakyBucketLimiter struct {
	bucketSize int64     // 桶的大小
	workerNum  int64     // 工作者数量,即最大并发数
	taskChan   chan Task // 用于存放请求
}

func NewLeakyBucketLimiter(bucketSize, workerNum int64) *LeakyBucketLimiter {
	if capacity < 1 {
        panic("capacity must be large 1")
    }
    if workerNum < 1 {
        panic("workerNum must be large 1")
    }
	return &LeakyBucketLimiter{
		bucketSize: bucketSize,
		workerNum:  workerNum,
		taskChan:   make(chan Task, bucketSize),
	}
}

func (lbl *LeakyBucketLimiter) AddTask(task Task) bool { // 类似其他限流算法的Allow方法
	// 如果木桶已经满了,或者任务执行失败或超时了,返回false
	select {
	case lbl.taskChan <- task:  // 利用了select的特性判断是否能往通道中添加任务
	default:
		fmt.Printf("请求%d被拒绝了\n", task.id)
		return false
	}

	// 如果成功入桶,调用者会等待Task的Handler执行结果
	// 由于Task的result是无缓冲的通道,不应该让其无限等待阻塞,否则出现问题时,不往该chan写,就会一直阻塞在这里了,泄漏
	// 因此设置一个超时时间
	//resp := <-task.result
	//fmt.Printf("请求%d运行成功,结果为:%v\n", task.id, resp)
	select {
	case resp := <-task.result:
		fmt.Printf("请求%d运行成功,结果为:%v\n", task.id, resp)
	case <-time.After(5 * time.Second): // 超时时间可以稍微设置长一点点,因为任务放入桶中后,可能需要排队一点时间才被拉取出来执行
		return false // 这里超时当被限流处理
	}

	return true
}

func (lbl *LeakyBucketLimiter) Start(ctx context.Context) {
	// 开启workerNum个协程从木桶拉取任务执行
	for i := 0; int64(i) < lbl.workerNum; i++ {
		go func(ctx context.Context) {
			defer func() { // 铁则:开启的子协程一定要捕获异常,否则一旦出现异常不捕获,会导致程序退出
				if err := recover(); err != any(nil) {
					fmt.Println("捕获到异常")
				}
			}()

			for { // 持续监听,拉取任务执行
				select {
				case <-ctx.Done():
					fmt.Println("退出工作")
					return
				default:
					task := <-lbl.taskChan
					result := task.handler()
					task.result <- result // 处理结果写入对应Task的结果通道
				}
			}
		}(ctx)
	}
}

func main() {
	bucket := NewLeakyBucketLimiter(10, 4)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	bucket.Start(ctx) // 开启消费者
	// 模拟20个并发请求
	var wg sync.WaitGroup
	wg.Add(20)
	for i := 0; i < 20; i++ {
		go func(id int) {
			defer wg.Done()
			task := NewTask(id, func() Result { // 这里的func应该根据实际需要定义为handler,写具体的业务逻辑和返回的result
				time.Sleep(300 * time.Millisecond)  // 模拟业务逻辑消耗的时间
				return Result{}
			})
			bucket.AddTask(task)  // 请求入桶
		}(i)
	}
	wg.Wait()
	time.Sleep(10 * time.Second)
}

标签:令牌,几种,tokens,限流,int64,tbl,func,time,Go
From: https://www.cnblogs.com/qcy-blog/p/18399638

相关文章

  • Study Plan For Algorithms - Part22
    1.字符串相乘题目链接:https://leetcode.cn/problems/multiply-strings/给定两个以字符串形式表示的非负整数num1和num2,返回num1和num2的乘积,它们的乘积也表示为字符串形式。classSolution:defmultiply(self,num1:str,num2:str)->str:ifnum1==......
  • vue3 tsx 测试几种使用方式
    总论tsxsetup里面定义了returndom元素,则optionsapi的render函数不生效options的render函数生效前提是setup里面不能returndomoptions的render里面可以直接使用this访问setup里面的数据或者ctxtsx一般最好用defineComponent包裹,这样响应式才能生效tsxdom语法使用{}......
  • Go - Web Application 8
    Userauthentication  Openupyourhandlers.gofileandaddplaceholdersforthefive newhandlerfunctionsasfollows:func(app*application)userSignup(whttp.ResponseWriter,r*http.Request){fmt.Fprintln(w,"Displayaformforsigningu......
  • 字符串拼接的几种形式
    字符串拼接的几种形式##一.算术运算符1.//+-*/%(取余)2.     intnum=10+10;//20      intnum1=10-10;//0      intnum2=10*10;//100      intnum3=10/10;//1      intnum4=10%......
  • Go - Web Application 7
    Thehttp.ServerstructAlthoughhttp.ListenAndServe()isveryusefulinshortexamplesandtutorials,inreal-worldapplicationsit’smorecommontomanuallycreateanduseahttp.Serverstruct instead.Doingthisopensuptheopportunitytocustomizethe......
  • golang实现ip地址扫描
    Golang实现IP地址扫描原创 GoOfficialBlog GoOfficialBlog  2024年09月05日18:13 中国香港 听全文你是否想过哪些设备连接到了家里的Wi-Fi网络?无论是出于安全目的还是单纯的好奇心,我们都可以去了解一下家庭网络中的设备情况。在本文中,我们将介绍如何使用......
  • 基于django+vue羽毛球俱乐部管理系统设计与实现【开题报告+程序+论文】-计算机毕设
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景随着全民健身热潮的兴起,羽毛球作为一项低门槛、高趣味性的运动,深受广大运动爱好者的喜爱。羽毛球俱乐部的数量迅速增长,为满足会员的多元化......
  • 基于django+vue与spring的药品销售管理系统设计与实现【开题报告+程序+论文】计算机毕
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景随着医药行业的快速发展与市场竞争的日益激烈,药品销售管理成为医药企业提升运营效率、保障药品质量、优化客户服务的关键环节。传统的手工......