首页 > 其他分享 >13、从0到1实现SECS协议之优先级队列(SECS-I)

13、从0到1实现SECS协议之优先级队列(SECS-I)

时间:2023-08-27 14:35:12浏览次数:54  
标签:Priority 13 pq 优先级 items PriorityQueue item SECS func

13、从0到1实现SECS协议之优先级队列(SECS-I)

逻辑和HSMS协议中的优先级队列一样,只不过存储的数据变了而已。

1、并发安全的优先级队列

package queue

import (
	"secs-gem/common"
	"secs-gem/secs/packets"
	"secs-gem/secsgem"

	"container/heap"
	"context"
	"sync"
	"time"
)

/*---------------------------------------
		优先队列(提供并发安全接口)
---------------------------------------*/

func NewPriorityQueue() *PriorityQueue {
	q := &PriorityQueue{
		SQueueView: &secsgem.SQueueView{},
		fch:        make(chan interface{}, 1),
	}
	return q
}

type PriorityQueue struct {
	*secsgem.SQueueView
	//队列(最小堆)
	items []*QueueItem     //堆
	fch   chan interface{} //推送状态
	rmu   sync.RWMutex     //更新锁
}

// 数据项
type QueueItem struct {
	Value    *packets.SecsPacket   // value
	Priority int                   // 优先级(越小越靠前)
	JoinTime time.Time             // 加入时间
	Blocks   []*packets.SecsPacket //sendblock
	Cursor   int                   //发送索引
	index    int                   // 索引
}

/*---------------------------------------
		heap接口
---------------------------------------*/

func (pq PriorityQueue) Len() int { return len(pq.items) }

func (pq PriorityQueue) Less(i, j int) bool {
	// priority越小越靠前
	if pq.items[i].Priority != pq.items[j].Priority {
		return pq.items[i].Priority < pq.items[j].Priority
	} else {
		return pq.items[i].JoinTime.Sub(pq.items[j].JoinTime) < 0
	}
}

func (pq PriorityQueue) Swap(i, j int) {
	pq.items[i], pq.items[j] = pq.items[j], pq.items[i]
	pq.items[i].index = i
	pq.items[j].index = j
}

func (pq *PriorityQueue) Push(x interface{}) {
	n := len(pq.items)
	item := x.(*QueueItem)
	item.index = n
	pq.items = append(pq.items, item)
}

func (pq *PriorityQueue) Pop() interface{} {
	old := *pq
	n := len(old.items)
	if n == 0 {
		return nil
	}
	item := old.items[n-1]
	old.items[n-1] = nil // avoid memory leak
	item.index = -1      // for safety
	pq.items = old.items[0 : n-1]
	return item
}

// update modifies the priority and value of an Item in the queue.
func (pq *PriorityQueue) update(item *QueueItem, value *packets.SecsPacket, priority int) {
	item.Value = value
	item.Priority = priority
	heap.Fix(pq, item.index)
}

/*---------------------------------------
		同步接口
---------------------------------------*/

func (pq *PriorityQueue) Get(ctx context.Context, timeout time.Duration) *QueueItem {
	timer := common.GlobalTimerPool.Get(timeout)
	defer common.GlobalTimerPool.Put(timer)

	for {
		//是否已存在
		if item := pq.PopItem(); item != nil {
			return item
		}

		select {
		case <-pq.fch:
			if item := pq.PopItem(); item != nil {
				return item
			}
		case <-ctx.Done():
			return nil
		case <-timer.C:
			return nil
		}
	}
}

func (pq *PriorityQueue) PopItem() *QueueItem {
	pq.rmu.Lock()
	defer pq.rmu.Unlock()

	if pq.Len() <= 0 {
		return nil
	}
	item := heap.Pop(pq)
	if resultItem, ok := item.(*QueueItem); ok {
		return resultItem
	}
	return nil
}

func (pq *PriorityQueue) PutNoWait(item *QueueItem) {
	pq.rmu.Lock()
	heap.Push(pq, item)
	pq.rmu.Unlock()

	//通知
	pq.kickWaiter()
}

func (pq *PriorityQueue) kickWaiter() {
	//通知
	select {
	case pq.fch <- 1:
	default:
	}
}

func (pq *PriorityQueue) QSize() int {
	pq.rmu.RLock()
	size := pq.Len()
	pq.rmu.RUnlock()
	return size
}

2、测试用例

package queue

import (
	"context"
	"fmt"
	"sync"
	"testing"
	"time"
)

func TestDataSpace(t *testing.T) {
	pq := NewPriorityQueue()
	pq.items = make([]*QueueItem, 10000000)
	for i := 0; i < 10000000; i++ {
		pq.items[i] = &QueueItem{
			Priority: i,
		}
	}

	time.Sleep(time.Second * 30)
}

func TestSendQueuePutAndGet(t *testing.T) {
	/*
		测试put get
	*/
	pq := NewPriorityQueue()

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < 10000000; i++ {
			pq.PutNoWait(&QueueItem{
				Priority: i,
			})
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		j := 0
		for j < 10000000 {
			if item := pq.Get(context.Background(), time.Second*5); item != nil {
				if item.Priority%1000 == 0 {
					fmt.Println(item.Priority, len(pq.items))
				}
				j += 1
			}
		}
	}()
	wg.Wait()
	fmt.Println("over")
	time.Sleep(time.Second * 10)
}

func TestPriorityQueue(t *testing.T) {
	// 测试 PriorityQueue 功能

	var want [1000]int

	pq := NewPriorityQueue()
	for i := 0; i < 1000; i++ {
		i := i
		go func(x int) {
			pq.PutNoWait(&QueueItem{
				Priority: x,
			})
			want[x] = x
		}(i)
	}
	var got [1000]int
	for i := 0; i < 1000; i++ {
		qItem := pq.Get(context.Background(), time.Second*1)
		got[i] = qItem.Priority
	}

	if got != want {
		t.Fatalf("PriorityQueue test failed, want != got")
	}
}

标签:Priority,13,pq,优先级,items,PriorityQueue,item,SECS,func
From: https://www.cnblogs.com/huageyiyangdewo/p/17660259.html

相关文章

  • 学习笔记413—python实现BP神经网络进行预测和误差分析(附源代码)
    python实现BP神经网络进行预测和误差分析(附源代码)反向传播算法也称为BP神经网络,是一种带有反馈的神经网络反向学习方法,它可以对神经网络的各层上的各个神经元的各个神经元之间的连接权重进行不断迭代修改,使神经网络将输入数据转换成期望的输出数据 BP神经网络的学习过程由正向......
  • 13、声明式事务
    13、声明式事务13.1、回顾事务要么都成功,要么都失败事务在开发种十分的重要,设计到数据一致性的问题,非常的重要确保完整性和一致性事务的ACID原则原子性一致性隔离性:多个业务操作同一个资源,防止数据损坏持久性:事务一旦提交,无论系统出现什么问题,结果都不会......
  • 【web_逆向13】ob混淆及实战
    什么是ob混淆?OB混淆和变量名混淆是常用的代码混淆技术。OB混淆是指在代码中添加一些无用的代码和函数,以增加代码的复杂度和难以理解性,从而增加代码的保护性。变量名混淆是指将变量名替换为无意义的字符串,增加代码的难度和防止代码的逆向分析。ob混淆的特点它会把常规的......
  • POJ 1308 Is It A Tree?
    这是我做出来的第一道有含量的ACM题,应该好好总结一下!这道1308的题,其实很简单,只要抓住了树的特征,就可以解出来。我的解法的思想是这样的:树的分支数m和树的结点数n有一个关系:n==m+1,只要抓住了这个特征,问题便可迎刃而解! 源代码:#include<stdio.h>#include<cstring>inta,b,m=0,n=0,k=......
  • oracle学习笔记(13)——数据库的启动与关闭
    1、常用的服务(1)OracleServiceSID     数据库服务,这个服务会自动地启动和停止数据库。如果安装了一个数据库,它的缺省启动类型为自动。服务进程为ORACLE.EXE,参数文件initSID.ora,日志文件SIDALRT.log,控制台SVRMGRL.EXE、SQLPLUS.EXE。     注:SID-数据库标识 ......
  • CMU 15-213:DataLab(整数部分)
    本笔记仅仅只是用于记录,内容为提示性,题主做的不一定完全符合规范!!!!。本实验中,只有整型只能使用“+”和位运算符。后面浮点数可以用控制循环。1.异或运算直接用公式,或者像我这样利用真值表凑的/**bitXor-x^yusingonly~and&*Example:bitXor(4,5)=1*Lega......
  • 【主席树】CF813 E. Army Creation
    【主席树】CF813E.ArmyCreation题目链接:https://codeforces.com/contest/813/problem/E题意多次询问,求一个区间内,所有数个数的总和,但相同的数最多被计算k次,强制在线。题解这道题和牛客一道题很像,是那道题的加强版,链接在这:题解链接。那道题可以离线,所以离线处理+树状数组......
  • 【13.0】sqlalchemy 集成到Flask框架
    【在Flask中集成SQLAlchemy】在Flask中集成SQLAlchemy可以通过使用第三方扩展包flask-sqlalchemy来实现,以下是详细的步骤和说明:首先,需要导入SQLAlchemy类以及flask_sqlalchemy模块:fromflask_sqlalchemyimportSQLAlchemy实例化SQLAlchemy对象:db=SQLAlchemy()这个......
  • 1300亿参数,国内首个数学大模型MathGPT上线!多项基准赶超GPT-4
    前言 数学的命运齿轮从此开始转动。国内首个专为数学打造的千亿级大模型MathGPT正式上线,在多项基准测试中碾压GPT-4,刷新SOTA。本文转载自新智元仅用于学术分享,若侵权请联系删除欢迎关注公众号CV技术指南,专注于计算机视觉的技术总结、最新技术跟踪、经典论文解读、CV招聘信息。......
  • LeetCode 131. 分割回文串
    题目链接:LeetCode131.分割回文串题意:给你一个字符串s,请你将s分割成一些子串,使每个子串都是回文串。返回s所有可能的分割方案。回文串是正着读和反着读都一样的字符串。解题思路:dfs算法的过程其实就是一棵递归树,所有的dfs算法的步骤大概有以下几步:找到中止条件:即......