参考
https://gist.github.com/vitan/aedb628a40478cf8b6a33dc87a5ff52f
https://gist.github.com/mochow13/74ee57078d58536929575ab481dd9693
1
package main
import (
"errors"
"fmt"
"math"
"reflect"
"sync"
)
const (
ITEM_COUNT = 5
EMPTY_VAL = math.MaxInt64
ERROR_QUEUE_CLOSED = "error-closed-queue"
)
// Priority Queue Implement
type PriorityQueue struct {
queues []chan int
capacity int
opening_q_counts int
mutex *sync.Mutex
}
func (pQ *PriorityQueue) NewPriorityQueue(prioritys int, capacity int) *PriorityQueue {
pQ.queues = []chan int{}
pQ.capacity = capacity
pQ.opening_q_counts = prioritys
pQ.mutex = &sync.Mutex{}
for i := 0; i < prioritys; i++ {
pQ.queues = append(pQ.queues, make(chan int, capacity))
}
return pQ
}
func (pQ *PriorityQueue) Enqueue(priority int, val int) error {
if priority >= len(pQ.queues) || priority < 0 {
return errors.New("out of index")
}
idx := len(pQ.queues) - priority - 1
pQ.queues[idx] <- val
return nil
}
func (pQ *PriorityQueue) Dequeue() (int, error) {
cases := make([]reflect.SelectCase, len(pQ.queues))
for i, q := range pQ.queues {
cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(q)}
}
for pQ.opening_q_counts > 0 {
chosen, value, ok := reflect.Select(cases)
if !ok {
cases[chosen].Chan = reflect.ValueOf(nil)
pQ.mutex.Lock()
pQ.opening_q_counts -= 1
pQ.mutex.Unlock()
} else {
return int(value.Int()), nil
}
}
return EMPTY_VAL, errors.New(ERROR_QUEUE_CLOSED)
}
// Producer&Consumer avatar
func producer(wg *sync.WaitGroup, priority int, pQ *PriorityQueue) {
defer wg.Done()
for i := 0; i < ITEM_COUNT; i++ {
//(wtzhou) Q: why priority*10+i?
// A: make consumer output readable. change me if needed
value := priority*10 + i
if err := pQ.Enqueue(priority, value); err != nil {
fmt.Printf("ERROR: %s\n", err.Error())
}
fmt.Printf("Produced item: %d on priority %d\n", i, priority)
}
}
func consumer(wg *sync.WaitGroup, pQ *PriorityQueue) {
defer wg.Done()
for {
val, err := pQ.Dequeue()
if err != nil {
if err.Error() == ERROR_QUEUE_CLOSED {
break
}
} else {
fmt.Printf("Dequeue value: %d\n", val)
}
}
}
// Sample: produce some value to different priority
func SpawnProducer(wg *sync.WaitGroup, pQ *PriorityQueue) {
for i := 0; i < 8; i++ {
wg.Add(1)
go producer(wg, i, pQ)
}
}
// Sample: consume some value
func SpawnConsumer(wg *sync.WaitGroup, pQ *PriorityQueue) {
wg.Add(1)
go consumer(wg, pQ)
wg.Add(1)
go consumer(wg, pQ)
}
func main() {
fmt.Println("Starting Producer, Consumer")
pQ := &PriorityQueue{}
pQ = pQ.NewPriorityQueue(10, 10)
producer_wg := &sync.WaitGroup{}
SpawnProducer(producer_wg, pQ)
producer_wg.Wait()
// close all the queue
for _, q := range pQ.queues {
close(q)
}
consumer_wg := &sync.WaitGroup{}
SpawnConsumer(consumer_wg, pQ)
consumer_wg.Wait()
fmt.Println("Exited successfully")
}
2
package main
import (
"fmt"
"sync"
)
var messages = [][]string{
{
"The world itself's",
"just one big hoax.",
"Spamming each other with our",
"running commentary of bullshit,",
},
{
"but with our things, our property, our money.",
"I'm not saying anything new.",
"We all know why we do this,",
"not because Hunger Games",
"books make us happy,",
},
{
"masquerading as insight, our social media",
"faking as intimacy.",
"Or is it that we voted for this?",
"Not with our rigged elections,",
},
{
"but because we wanna be sedated.",
"Because it's painful not to pretend,",
"because we're cowards.",
"- Elliot Alderson",
"Mr. Robot",
},
}
const producerCount int = 4
const consumerCount int = 3
func produce(link chan<- string, id int, wg *sync.WaitGroup) {
defer wg.Done()
for _, msg := range messages[id] {
link <- msg
}
}
func consume(link <-chan string, id int, wg *sync.WaitGroup) {
defer wg.Done()
for msg := range link {
fmt.Printf("Message \"%v\" is consumed by consumer %v\n", msg, id)
}
}
func main() {
link := make(chan string)
wp := &sync.WaitGroup{}
wc := &sync.WaitGroup{}
wp.Add(producerCount)
wc.Add(consumerCount)
for i := 0; i < producerCount; i++ {
go produce(link, i, wp)
}
for i := 0; i < consumerCount; i++ {
go consume(link, i, wc)
}
wp.Wait()
close(link)
wc.Wait()
}
标签:wg,priority,pQ,消费者,生产者,sync,golang,int,PriorityQueue
From: https://www.cnblogs.com/liujitao79/p/17729334.html