package main
import (
"fmt"
"github.com/rfyiamcool/go-timewheel"
"sync"
"sync/atomic"
"time"
)
// Package-level state shared by every goroutine in this stress run.
var (
	// counter tallies timer expiries and callback executions. It is written
	// by incrCounter and read by printCounter, both through sync/atomic.
	counter int64

	// loopNum is how many times each processTimer worker waits on its timer.
	loopNum = 50

	// tw is the single time wheel used for all timers and callbacks; it is
	// built by newTimeWheel while package variables are initialised.
	tw = newTimeWheel()

	// wg1 tracks the goroutines that main waits for before exiting.
	wg1 sync.WaitGroup

	// log is the stdout logger used throughout the program.
	log = &logger{}
)
// incrCounter atomically adds one to the package-level counter, so it can be
// called from many goroutines at once.
func incrCounter() {
atomic.AddInt64(&counter, 1)
}
// main starts the periodic counter report in the background, schedules the
// whole workload via batchRun5s, and then blocks until every goroutine
// registered on wg1 has finished.
func main() {
// The reporter loops forever; it ends only when the process exits.
go printCounter()
batchRun5s()
log.info("add finish")
wg1.Wait()
}
// batchRun5s sets up the 5-second scenario: it starts the timer workers
// through processTimer and then launches processCallbackLoop in its own
// goroutine, both with the same delay and tolerance settings.
func batchRun5s() {
	const (
		workers       = 10000 // goroutines started by processTimer
		delaySec      = 5     // nominal delay; callees multiply it by time.Second
		earlySec      = 1     // tolerated earliness, in seconds
		lateSec       = 2     // tolerated lateness, in seconds
		rounds        = 5     // iterations of the callback loop
		tasksPerRound = 50    // callbacks registered per iteration
	)

	processTimer(workers, delaySec, earlySec, lateSec)
	go processCallbackLoop(rounds, tasksPerRound, delaySec, earlySec, lateSec)
}
// newTimeWheel constructs the time wheel used by this program, starts it,
// and returns it. It panics if the library reports a construction error.
func newTimeWheel() *timewheel.TimeWheel {
	const (
		tick = 1 * time.Second
		// Second argument to NewTimeWheel — presumably the number of slots
		// on the wheel; confirm against the library documentation.
		slots = 120
	)

	wheel, err := timewheel.NewTimeWheel(tick, slots)
	if err != nil {
		panic(err)
	}

	wheel.Start()
	return wheel
}
// processTimer starts `worker` goroutines. Each one owns a single time-wheel
// timer armed for `delay` seconds and waits on it loopNum times, re-arming it
// after every expiry.
//
// For every expiry the goroutine compares the wake-up time with the moment
// the wait began plus the delay: an error is logged if it woke more than
// beforeDiff seconds early or more than afterDiff seconds late. Each expiry
// also bumps the shared counter. Every goroutine is registered on wg1 before
// it is started and calls Done when its loop ends.
func processTimer(worker, delay, beforeDiff, afterDiff int) {
	var (
		wait  = time.Duration(delay) * time.Second
		early = time.Duration(beforeDiff) * time.Second
		late  = time.Duration(afterDiff) * time.Second
	)

	for index := 0; index < worker; index++ {
		wg1.Add(1)
		// The timer is created here, in the spawning loop, before the
		// goroutine that consumes it is started.
		htimer := tw.NewTimer(wait)

		go func(idx int) {
			defer wg1.Done()

			for round := 0; round < loopNum; round++ {
				began := time.Now()
				deadline := began.Add(wait)

				<-htimer.C
				// Re-arm before taking the timestamp, as the measurement
				// has always been taken after Reset.
				htimer.Reset(wait)
				fired := time.Now()

				if fired.Before(deadline.Add(-early)) {
					log.error("timer befer: %s, delay: %d", time.Since(began).String(), delay)
				}
				if fired.After(deadline.Add(late)) {
					log.error("timer after: %s, delay: %d", time.Since(began).String(), delay)
				}
				incrCounter()
				// fmt.Println("id: ", idx, "cost: ", fired.Sub(began))
			}
		}(index)
	}
}
// processCallbackLoop registers a batch of taskCount callbacks `loop` times,
// sleeping `delay` seconds after each batch. It registers itself on wg1 on
// entry and signals Done once all batches have been scheduled.
//
// NOTE(review): wg1.Add is called from inside this function, which
// batchRun5s launches with `go`. That is only safe while the workers added by
// processTimer are still registered on wg1; confirm before reusing this
// function elsewhere.
func processCallbackLoop(loop, taskCount, delay, beforeDiff, afterDiff int) {
	wg1.Add(1)

	pause := time.Duration(delay) * time.Second
	for remaining := loop; remaining > 0; remaining-- {
		processCallback(taskCount, delay, beforeDiff, afterDiff)
		time.Sleep(pause)
	}

	wg1.Done()
}
func processCallback(taskCount, delay, beforeDiff, afterDiff int) {
for index := 0; index < taskCount; index++ {
now := time.Now()
cb := func() {
target := now.Add(time.Duration(delay) * time.Second)
end := time.Now()
if end.Before(target.Add(time.Duration(-beforeDiff) * time.Second)) {
log.error("cb befer: %s, delay: %d", time.Since(now).String(), delay)
}
if end.After(target.Add(time.Duration(afterDiff) * time.Second)) {
log.error("cb after: %s, delay: %d", time.Since(now).String(), delay)
}
incrCounter()
// log.info("cost: %s, delay: %d", end.Sub(now), delay)
}
tw.Add(time.Duration(delay)*time.Second, cb)
}
}
// printCounter logs the start time, the elapsed time and the current counter
// value immediately, and then again every 10 seconds. It never returns.
func printCounter() {
	started := time.Now()

	// The sleep sits in the post statement, so the first report is printed
	// without any delay.
	for ; ; time.Sleep(10 * time.Second) {
		total := atomic.LoadInt64(&counter)
		log.info("start_time: %s, since_time: %s, counter %d", started.String(), time.Since(started).String(), total)
	}
}
// logger is a stateless printf-style logger that writes to standard output.
type logger struct{}

// info formats the message and prints it on its own line.
func (l *logger) info(format string, args ...interface{}) {
	fmt.Println(fmt.Sprintf(format, args...))
}

// error formats the message and prints it on its own line, wrapped in an
// ANSI escape sequence (attributes 5;40;31) and padded with one space on
// each side, followed by the reset sequence.
func (l *logger) error(format string, args ...interface{}) {
	const (
		esc     = 0x1B // ESC control character that opens each sequence
		blink   = 5    // SGR: blink
		bgBlack = 40   // SGR: black background
		fgRed   = 31   // SGR: red foreground
	)

	msg := fmt.Sprintf(format, args...)
	fmt.Println(fmt.Sprintf("%c[%d;%d;%dm %s %c[0m", esc, blink, bgBlack, fgRed, msg, esc))
}
// Tags: afterDiff, tw, delay, beforeDiff, time, func, time, Go
// From: https://www.cnblogs.com/qcy-blog/p/18451858