固定窗口
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
// 定义限流结构体
type RateLimiter struct {
interval time.Duration // 时间窗口
tokens int32 // 令牌总数
lastTime int64 // 上次更新时间
mu sync.Mutex // 互斥锁
}
// 创建新的限流器
func NewRateLimiter(interval time.Duration, tokens int32) *RateLimiter {
return &RateLimiter{
interval: interval,
tokens: tokens,
lastTime: time.Now().UnixNano(),
}
}
// 尝试获取令牌
func (l *RateLimiter) TryAcquire() bool {
l.mu.Lock()
defer l.mu.Unlock()
now := time.Now().UnixNano()
// 计算当前窗口内的剩余时间
elapsed := now - l.lastTime
// 更新时间窗口,并添加新令牌
l.lastTime = now
l.tokens += elapsed / int64(l.interval)
// 判断是否有足够的令牌
if l.tokens < 1 {
return false
}
l.tokens-- // 获取令牌,减1
return true
}
func main() {
// 创建一个时间窗口为1秒,总共10个令牌的限流器
limiter := NewRateLimiter(time.Second, 10)
// 模拟50个并发请求
var wg sync.WaitGroup
wg.Add(50)
for i := 0; i < 50; i++ {
go func() {
defer wg.Done()
if limiter.TryAcquire() {
fmt.Println("请求被处理")
} else {
fmt.Println("请求被限流")
}
}()
}
wg.Wait()
}
计数器
package main
import (
"fmt"
"sync"
"time"
)
type CountLimiter struct {
rate int64 // 计数周期内运行的最大请求数
begin time.Time // 当前轮计数开始时间
count int64 // 当前计数周期累计的请求数
cycle time.Duration // 计数周期,如统计1秒内的总请求数,那么计数周期就是1秒
lock sync.Mutex // 判断能否放行时需要加锁操作
}
func NewCountLimiter(rate int64, cycle time.Duration) *CountLimiter {
return &CountLimiter{
rate: rate,
begin: time.Now(),
count: 0,
cycle: cycle,
lock: sync.Mutex{},
}
}
func (c *CountLimiter) Allow() bool {
c.lock.Lock()
defer c.lock.Unlock()
if c.count == c.rate { // 达到最大限流时,判断时间是否也超过统计周期了,超了可以重置限流器了,没有超说明还在当前统计周期内,应该拦截
if time.Now().Sub(c.begin) > c.cycle {
c.Reset()
return true
} else {
return false
}
} else { // 还没有达到最大限流数,那么不管时间周期是否超出统计计数周期,都可以放行
c.count++
return true
}
}
func (c *CountLimiter) Reset() {
c.begin = time.Now()
c.count = 0
}
func main() {
countLimiter := NewCountLimiter(3, time.Second) // 1s内不可超过3个请求
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func(i int) {
defer wg.Done()
if countLimiter.Allow() {
fmt.Println(fmt.Sprintf("当前时间%s,请求%d通过了", time.Now().String(), i))
} else {
fmt.Println(fmt.Sprintf("当前时间%s,请求%d被限流了", time.Now().String(), i))
}
}(i)
time.Sleep(200 * time.Millisecond)
}
wg.Wait()
}
令牌桶
package main
import (
"fmt"
"sync"
"time"
)
type TokenBucketLimiter struct {
lock sync.Mutex
rate time.Duration // 多长时间放入一个令牌,即放入令牌的速率
capacity int64 // 令牌桶的容量,控制最多放入多少令牌,也即突发最大并发量
tokens int64 // 当前桶中已有的令牌数量
lastTime time.Time // 上次放入令牌的时间,避免开启协程定时去放入令牌,而是请求到来时懒加载的方式(now - lastTime) / rate放入令牌
}
func NewTokenBucketLimiter(rate time.Duration, capacity int64) *TokenBucketLimiter {
if capacity < 1 {
panic("token bucket capacity must be large 1")
}
return &TokenBucketLimiter{
lock: sync.Mutex{},
rate: rate,
capacity: capacity,
tokens: 0,
lastTime: time.Time{},
}
}
func (tbl *TokenBucketLimiter) Allow() bool {
tbl.lock.Lock() // 加锁避免并发错误
defer tbl.lock.Unlock()
// 如果 now 与上次请求的间隔超过了 token rate
// 则增加令牌,更新lastTime
now := time.Now()
if now.Sub(tbl.lastTime) > tbl.rate {
tbl.tokens += int64((now.Sub(tbl.lastTime)) / tbl.rate) // 放入令牌
if tbl.tokens > tbl.capacity {
tbl.tokens = tbl.capacity // 总令牌数不能大于桶的容量
}
tbl.lastTime = now // 更新上次往桶中放入令牌的时间
}
if tbl.tokens > 0 { // 令牌数是否充足
tbl.tokens -= 1
return true
}
return false // 令牌不足,拒绝请求
}
func main() {
tbl := NewTokenBucketLimiter(10, 5) // 每10ms放一个令牌,1s放100个,桶容量(最大突发流量)为5
for i := 0; i < 10; i++ {
fmt.Println(tbl.Allow()) // 模拟突发流量10个请求,超过桶容量5
}
time.Sleep(100 * time.Millisecond)
fmt.Println(tbl.Allow())
}
漏斗
package main
import (
"context"
"fmt"
"sync"
"time"
)
type Result struct {
Msg string // 根据实际情况定义返回结果需要哪些字段
}
type Handler func() Result // 处理函数的形式也应该根据具体需要而定
type Task struct {
id int64 // 任务id
result chan Result // 任务的执行结果,即请求的响应结果
handler Handler // 请求的执行函数
}
func NewTask(id int, handler Handler) Task {
return Task{
handler: handler,
result: make(chan Result),
id: int64(id),
}
}
type LeakyBucketLimiter struct {
bucketSize int64 // 桶的大小
workerNum int64 // 工作者数量,即最大并发数
taskChan chan Task // 用于存放请求
}
func NewLeakyBucketLimiter(bucketSize, workerNum int64) *LeakyBucketLimiter {
if capacity < 1 {
panic("capacity must be large 1")
}
if workerNum < 1 {
panic("workerNum must be large 1")
}
return &LeakyBucketLimiter{
bucketSize: bucketSize,
workerNum: workerNum,
taskChan: make(chan Task, bucketSize),
}
}
func (lbl *LeakyBucketLimiter) AddTask(task Task) bool { // 类似其他限流算法的Allow方法
// 如果木桶已经满了,或者任务执行失败或超时了,返回false
select {
case lbl.taskChan <- task: // 利用了select的特性判断是否能往通道中添加任务
default:
fmt.Printf("请求%d被拒绝了\n", task.id)
return false
}
// 如果成功入桶,调用者会等待Task的Handler执行结果
// 由于Task的result是无缓冲的通道,不应该让其无限等待阻塞,否则出现问题时,不往该chan写,就会一直阻塞在这里了,泄漏
// 因此设置一个超时时间
//resp := <-task.result
//fmt.Printf("请求%d运行成功,结果为:%v\n", task.id, resp)
select {
case resp := <-task.result:
fmt.Printf("请求%d运行成功,结果为:%v\n", task.id, resp)
case <-time.After(5 * time.Second): // 超时时间可以稍微设置长一点点,因为任务放入桶中后,可能需要排队一点时间才被拉取出来执行
return false // 这里超时当被限流处理
}
return true
}
func (lbl *LeakyBucketLimiter) Start(ctx context.Context) {
// 开启workerNum个协程从木桶拉取任务执行
for i := 0; int64(i) < lbl.workerNum; i++ {
go func(ctx context.Context) {
defer func() { // 铁则:开启的子协程一定要捕获异常,否则一旦出现异常不捕获,会导致程序退出
if err := recover(); err != any(nil) {
fmt.Println("捕获到异常")
}
}()
for { // 持续监听,拉取任务执行
select {
case <-ctx.Done():
fmt.Println("退出工作")
return
default:
task := <-lbl.taskChan
result := task.handler()
task.result <- result // 处理结果写入对应Task的结果通道
}
}
}(ctx)
}
}
func main() {
bucket := NewLeakyBucketLimiter(10, 4)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
bucket.Start(ctx) // 开启消费者
// 模拟20个并发请求
var wg sync.WaitGroup
wg.Add(20)
for i := 0; i < 20; i++ {
go func(id int) {
defer wg.Done()
task := NewTask(id, func() Result { // 这里的func应该根据实际需要定义为handler,写具体的业务逻辑和返回的result
time.Sleep(300 * time.Millisecond) // 模拟业务逻辑消耗的时间
return Result{}
})
bucket.AddTask(task) // 请求入桶
}(i)
}
wg.Wait()
time.Sleep(10 * time.Second)
}
标签:令牌,几种,tokens,限流,int64,tbl,func,time,Go
From: https://www.cnblogs.com/qcy-blog/p/18399638