1.需求
mysql执行时间超过100ms以上打warn日志,但是一分钟以内这种warn日志超过10条就需要告警。
所以需求就是获得一分钟以内mysql的warn的个数。
2.分析
为什么使用环形队列而不使用slice?
因为队列长度固定,所以可以一开始就分配好空间,不用自动扩容,环形的目的就是不用改变数组的值,只用移动一下当前队列下标即可。
3.代码实现
package _queue
import (
"support/util"
"sync"
"time"
)
type TimestampedValue[T any] struct {
Value T
Timestamp int64
}
type CircularQueue[T any] struct {
items []TimestampedValue[T]
start int
end int // end points to the next empty slot
len int
capacity int
expire time.Duration
mu sync.Mutex
}
func NewCircularQueue[T any](capacity int, expire time.Duration) *CircularQueue[T] {
return &CircularQueue[T]{
items: make([]TimestampedValue[T], capacity),
capacity: capacity,
expire: expire,
}
}
func (q *CircularQueue[T]) Push(item T) []T {
q.mu.Lock()
defer q.mu.Unlock()
// Remove values that are older than expireTime
currentTime := util.NowMs()
ts := currentTime - q.expire.Milliseconds()
for q.len > 0 && ts > q.items[q.start].Timestamp {
q.start = (q.start + 1) % q.capacity
q.len--
}
newItem := TimestampedValue[T]{
Value: item,
Timestamp: currentTime,
}
// If the queue is not full, insert the value
if q.len < q.capacity {
q.items[q.end] = newItem
q.end = (q.end + 1) % q.capacity
q.len++
} else {
// When the queue is full, we override the oldest value
q.items[q.start] = newItem
q.start = (q.start + 1) % q.capacity
q.end = (q.end + 1) % q.capacity
}
// queue is full
if q.len == q.capacity {
allValues := make([]T, q.len)
for i := 0; i < q.len; i++ {
index := (q.start + i) % q.capacity
allValues[i] = q.items[index].Value
}
q.clear()
return allValues
}
return nil
}
func (q *CircularQueue[T]) clear() {
// Reset the CircularQueue
q.start = 0
q.end = 0
q.len = 0
}
用泛型的目的是,这个带失效的队列存什么都可以,而且用户不用关注时间这个字段,只用这道这是个带失效的环形队列即可。
加锁的目的是push的时候多线程并发的问题。
每次push的时候先清除expire的数据,此处并不是真正清除,只是改变的队列的长度,以及指针下标。
插入新元素的时候,如果队列不满,直接插入,如果满了,需要把最早的数据给替换掉
该push函数,如果队列不满,返回nil,如果满了返回这n个队列的value值,如果有自己的其他需求可以对应修改
clear也并不需要把元素给清除掉只需要让len=0,以及start = end即可。这里都写0,方便代码维护
4.单元测试
package _queue
import (
"fmt"
"testing"
"time"
)
func TestCircularQueue(t *testing.T) {
queue := NewCircularQueue[string](5, time.Second*10)
// Example usage of the queue
queue.Push("1")
queue.Push("2")
//time.Sleep(3 * time.Second)
Print(queue)
queue.Push("3")
queue.Push("4")
//time.Sleep(3 * time.Second)
Print(queue)
v := queue.Push("5")
fmt.Println("Value returned:", v)
queue.Push("6")
//time.Sleep(3 * time.Second)
Print(queue)
queue.Push("7")
v = queue.Push("8")
fmt.Println("Value returned:", v)
time.Sleep(3 * time.Second)
Print(queue)
queue.Push("9")
Print(queue)
}
func Print(q *CircularQueue[string]) {
q.mu.Lock()
defer q.mu.Unlock()
fmt.Println("Queue contents:")
index := q.start
for i := 0; i < q.len; i++ {
item := q.items[index]
fmt.Printf("Index %d: %v (stored at %v)\n", index, item.Value, item.Timestamp)
index = (index + 1) % q.capacity
}
}
5.实际应用
初始化环形队列的长度,以及时效
package mysql
import (
"context"
"fmt"
"strings"
"support/collection/_queue"
"time"
)
const maxWarnLogLen = 10
const expireTime = time.Minute
var warnLogList = _queue.NewCircularQueue[string](maxWarnLogLen, expireTime)
func logPrint(l *PlasoMysqlLogger, v []string) {
var builder strings.Builder
title := fmt.Sprintf("the recent %d slow sql is ", maxWarnLogLen)
builder.WriteString(title)
for i := 0; i < len(v); i++ {
builder.WriteString(" ############## ")
builder.WriteString(v[i])
}
l.Error(context.Background(), builder.String())
}
push以及打印
该功能主要给慢sql使用的,后面会有文章来讲解gorm,sqlx等如何实现慢sql自定义功能,
如果该篇文章给你带来了帮助,请给辛苦的小编点个赞吧,码字不易,且行且珍惜
标签:queue,capacity,时效,len,golang,start,详解,time,Push From: https://blog.51cto.com/u_12040959/9638912