13、从0到1实现SECS协议之优先级队列(SECS-I)
逻辑和 HSMS 协议中的优先级队列一样,只是队列项中存储的数据不同:这里保存的是 SecsPacket,并增加了发送块(Blocks)和发送索引(Cursor)两个字段。
1、并发安全的优先级队列
package queue
import (
"secs-gem/common"
"secs-gem/secs/packets"
"secs-gem/secsgem"
"container/heap"
"context"
"sync"
"time"
)
/*---------------------------------------
优先队列(提供并发安全接口)
---------------------------------------*/
// NewPriorityQueue returns an empty PriorityQueue whose embedded SQueueView
// is freshly allocated and whose notification channel has a buffer of one.
func NewPriorityQueue() *PriorityQueue {
	return &PriorityQueue{
		SQueueView: new(secsgem.SQueueView),
		fch:        make(chan interface{}, 1),
	}
}
// PriorityQueue stores *QueueItem values in a slice that is used as a heap,
// together with a mutex and a channel used to signal that an item was pushed.
// It embeds *secsgem.SQueueView.
type PriorityQueue struct {
	*secsgem.SQueueView
	// Queue storage (min-heap).
	items []*QueueItem     // heap
	fch   chan interface{} // push notification
	rmu   sync.RWMutex     // update lock
}
// QueueItem is a single entry stored in a PriorityQueue.
type QueueItem struct {
	Value    *packets.SecsPacket   // value
	Priority int                   // priority (smaller values sort first)
	JoinTime time.Time             // time the item joined the queue
	Blocks   []*packets.SecsPacket // send blocks
	Cursor   int                   // send index
	index    int                   // position in the heap slice
}
/*---------------------------------------
heap接口
---------------------------------------*/
// Len reports the number of items in the heap (part of heap.Interface).
// It takes a pointer receiver so that a call does not copy the sync.RWMutex
// stored in PriorityQueue. It does not acquire the lock itself.
func (pq *PriorityQueue) Len() int { return len(pq.items) }
// Less orders items for the heap (part of heap.Interface): the item with the
// smaller Priority comes first, and items with equal Priority are ordered by
// earlier JoinTime. It takes a pointer receiver so that a call does not copy
// the sync.RWMutex stored in PriorityQueue.
func (pq *PriorityQueue) Less(i, j int) bool {
	a, b := pq.items[i], pq.items[j]
	if a.Priority != b.Priority {
		return a.Priority < b.Priority
	}
	return a.JoinTime.Before(b.JoinTime)
}
// Swap exchanges the items at positions i and j and updates the index each
// item records for itself (part of heap.Interface). It takes a pointer
// receiver so that a call does not copy the sync.RWMutex stored in
// PriorityQueue.
func (pq *PriorityQueue) Swap(i, j int) {
	pq.items[i], pq.items[j] = pq.items[j], pq.items[i]
	pq.items[i].index = i
	pq.items[j].index = j
}
// Push appends x, which must be a *QueueItem, to the end of the item slice and
// records its slot in the item's index field (part of heap.Interface).
// It panics if x is not a *QueueItem.
func (pq *PriorityQueue) Push(x interface{}) {
	entry := x.(*QueueItem)
	entry.index = len(pq.items)
	pq.items = append(pq.items, entry)
}
// Pop removes and returns the last element of the item slice (part of
// heap.Interface), or nil when the slice is empty. The returned item's index
// is set to -1.
//
// It works on the slice directly rather than on a copy of the whole struct,
// because copying PriorityQueue would also copy its sync.RWMutex.
func (pq *PriorityQueue) Pop() interface{} {
	last := len(pq.items) - 1
	if last < 0 {
		return nil
	}
	item := pq.items[last]
	pq.items[last] = nil // drop the slice's reference to avoid a memory leak
	item.index = -1      // mark the item as no longer stored in the heap
	pq.items = pq.items[:last]
	return item
}
// update assigns a new value and priority to item and then asks the heap to
// re-establish ordering at the item's recorded index. It does not acquire the
// queue's lock itself.
func (pq *PriorityQueue) update(item *QueueItem, value *packets.SecsPacket, priority int) {
	item.Priority, item.Value = priority, value
	heap.Fix(pq, item.index)
}
/*---------------------------------------
同步接口
---------------------------------------*/
// Get removes and returns the front item of the queue, waiting for one to be
// pushed if the queue is currently empty.
//
// It returns nil when ctx is done or when the timer obtained from
// common.GlobalTimerPool for the given timeout fires before an item becomes
// available (presumably after timeout has elapsed; confirm against the pool
// implementation).
//
// The notification channel holds at most one pending signal, so several quick
// pushes can collapse into a single wakeup. To keep other goroutines blocked
// in Get from sleeping while items are queued, a successful Get re-signals
// the channel whenever items remain.
func (pq *PriorityQueue) Get(ctx context.Context, timeout time.Duration) *QueueItem {
	timer := common.GlobalTimerPool.Get(timeout)
	defer common.GlobalTimerPool.Put(timer)
	for {
		// Fast path: an item is already queued.
		if item := pq.PopItem(); item != nil {
			if pq.QSize() > 0 {
				// Pass the wakeup on; a spurious signal is harmless because
				// every waiter re-checks the queue before blocking again.
				pq.kickWaiter()
			}
			return item
		}
		select {
		case <-pq.fch:
			// Something was pushed; loop around and try to pop it.
		case <-ctx.Done():
			return nil
		case <-timer.C:
			return nil
		}
	}
}
// PopItem removes and returns the front item of the heap while holding the
// write lock. It returns nil without waiting when the queue is empty.
func (pq *PriorityQueue) PopItem() *QueueItem {
	pq.rmu.Lock()
	defer pq.rmu.Unlock()

	if pq.Len() < 1 {
		return nil
	}
	// A failed assertion leaves popped as a nil *QueueItem, which is what gets
	// returned in that case.
	popped, _ := heap.Pop(pq).(*QueueItem)
	return popped
}
// PutNoWait pushes item onto the heap under the write lock and then signals
// the notification channel without blocking: if a signal is already pending,
// the new one is dropped.
func (pq *PriorityQueue) PutNoWait(item *QueueItem) {
	pq.rmu.Lock()
	heap.Push(pq, item)
	pq.rmu.Unlock()

	// Notify after releasing the lock; never block the producer.
	select {
	case pq.fch <- 1:
	default:
	}
}
// kickWaiter attempts a non-blocking send on the notification channel. When
// the send cannot proceed immediately, it does nothing.
func (pq *PriorityQueue) kickWaiter() {
	const signal = 1
	select {
	case pq.fch <- signal:
		// signal delivered or buffered
	default:
		// channel not ready; skip
	}
}
// QSize returns the current number of queued items, read under the read lock.
func (pq *PriorityQueue) QSize() int {
	pq.rmu.RLock()
	defer pq.rmu.RUnlock()
	return pq.Len()
}
2、测试用例
package queue
import (
"context"
"fmt"
"sync"
"testing"
"time"
)
// TestDataSpace fills the queue's item slice with ten million entries and then
// sleeps for thirty seconds. It makes no assertions; presumably it exists so
// that the process's memory use can be inspected by hand while it sleeps.
//
// Because it is slow and allocation-heavy, it is skipped under `go test -short`.
func TestDataSpace(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping memory-footprint probe in -short mode")
	}

	const total = 10000000
	pq := NewPriorityQueue()
	pq.items = make([]*QueueItem, total)
	for i := range pq.items {
		pq.items[i] = &QueueItem{
			Priority: i,
		}
	}
	time.Sleep(time.Second * 30)
}
// TestSendQueuePutAndGet runs one producer and one consumer against the same
// queue: the producer puts ten million items and the consumer gets them back,
// printing progress every thousandth priority.
//
// The queue size is read through QSize rather than len(pq.items), because the
// producer is modifying the slice concurrently. If Get times out, the consumer
// reports an error and stops instead of retrying forever. The test is skipped
// under `go test -short` because of its size and trailing sleep.
func TestSendQueuePutAndGet(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping ten-million-item put/get run in -short mode")
	}

	const total = 10000000
	pq := NewPriorityQueue()
	var wg sync.WaitGroup

	// Producer.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < total; i++ {
			pq.PutNoWait(&QueueItem{
				Priority: i,
			})
		}
	}()

	// Consumer.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for received := 0; received < total; {
			item := pq.Get(context.Background(), time.Second*5)
			if item == nil {
				// t.Errorf (not Fatalf) because this is not the test goroutine.
				t.Errorf("Get timed out after receiving %d of %d items", received, total)
				return
			}
			if item.Priority%1000 == 0 {
				fmt.Println(item.Priority, pq.QSize())
			}
			received++
		}
	}()

	wg.Wait()
	fmt.Println("over")
	time.Sleep(time.Second * 10)
}
// TestPriorityQueue puts 1000 items with distinct priorities from concurrent
// goroutines and checks that Get returns them in ascending priority order.
//
// All producers are awaited before the first Get. Otherwise an item could be
// popped before a smaller-priority item has been pushed, and the writes to
// want would race with the final comparison.
func TestPriorityQueue(t *testing.T) {
	const total = 1000

	var want [total]int
	pq := NewPriorityQueue()

	var wg sync.WaitGroup
	for i := 0; i < total; i++ {
		wg.Add(1)
		go func(x int) {
			defer wg.Done()
			pq.PutNoWait(&QueueItem{
				Priority: x,
			})
			want[x] = x
		}(i)
	}
	wg.Wait()

	var got [total]int
	for i := range got {
		qItem := pq.Get(context.Background(), time.Second*1)
		if qItem == nil {
			t.Fatalf("Get returned nil at item %d of %d", i, total)
		}
		got[i] = qItem.Priority
	}

	if got != want {
		t.Fatalf("PriorityQueue test failed, want != got")
	}
}
标签:Priority,13,pq,优先级,items,PriorityQueue,item,SECS,func
From: https://www.cnblogs.com/huageyiyangdewo/p/17660259.html