首页 > 其他分享 >golang 多生产者+多消费者模式

golang 多生产者+多消费者模式

时间:2023-09-26 09:11:07浏览次数:54  
标签:wg priority pQ 消费者 生产者 sync golang int PriorityQueue

参考

https://gist.github.com/vitan/aedb628a40478cf8b6a33dc87a5ff52f
https://gist.github.com/mochow13/74ee57078d58536929575ab481dd9693

1

package main

import (
	"errors"
	"fmt"
	"math"
	"reflect"
	"sync"
)

const (
	ITEM_COUNT         = 5
	EMPTY_VAL          = math.MaxInt64
	ERROR_QUEUE_CLOSED = "error-closed-queue"
)

// Priority Queue Implement
type PriorityQueue struct {
	queues           []chan int
	capacity         int
	opening_q_counts int
	mutex            *sync.Mutex
}

func (pQ *PriorityQueue) NewPriorityQueue(prioritys int, capacity int) *PriorityQueue {
	pQ.queues = []chan int{}
	pQ.capacity = capacity
	pQ.opening_q_counts = prioritys
	pQ.mutex = &sync.Mutex{}
	for i := 0; i < prioritys; i++ {
		pQ.queues = append(pQ.queues, make(chan int, capacity))
	}
	return pQ
}

func (pQ *PriorityQueue) Enqueue(priority int, val int) error {
	if priority >= len(pQ.queues) || priority < 0 {
		return errors.New("out of index")
	}
	idx := len(pQ.queues) - priority - 1
	pQ.queues[idx] <- val
	return nil
}

func (pQ *PriorityQueue) Dequeue() (int, error) {
	cases := make([]reflect.SelectCase, len(pQ.queues))
	for i, q := range pQ.queues {
		cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(q)}
	}
	for pQ.opening_q_counts > 0 {
		chosen, value, ok := reflect.Select(cases)
		if !ok {
			cases[chosen].Chan = reflect.ValueOf(nil)
			pQ.mutex.Lock()
			pQ.opening_q_counts -= 1
			pQ.mutex.Unlock()
		} else {
			return int(value.Int()), nil
		}
	}
	return EMPTY_VAL, errors.New(ERROR_QUEUE_CLOSED)
}

// Producer&Consumer avatar
func producer(wg *sync.WaitGroup, priority int, pQ *PriorityQueue) {
	defer wg.Done()
	for i := 0; i < ITEM_COUNT; i++ {
		//(wtzhou) Q: why priority*10+i?
		// A: make consumer output readable. change me if needed
		value := priority*10 + i

		if err := pQ.Enqueue(priority, value); err != nil {
			fmt.Printf("ERROR: %s\n", err.Error())
		}
		fmt.Printf("Produced item: %d on priority %d\n", i, priority)
	}
}

func consumer(wg *sync.WaitGroup, pQ *PriorityQueue) {
	defer wg.Done()
	for {
		val, err := pQ.Dequeue()
		if err != nil {
			if err.Error() == ERROR_QUEUE_CLOSED {
				break
			}
		} else {
			fmt.Printf("Dequeue value: %d\n", val)
		}
	}
}

// Sample: produce some value to different priority
func SpawnProducer(wg *sync.WaitGroup, pQ *PriorityQueue) {
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go producer(wg, i, pQ)
	}
}

// Sample: consume some value
func SpawnConsumer(wg *sync.WaitGroup, pQ *PriorityQueue) {
	wg.Add(1)
	go consumer(wg, pQ)
	wg.Add(1)
	go consumer(wg, pQ)
}

func main() {
	fmt.Println("Starting Producer, Consumer")

	pQ := &PriorityQueue{}
	pQ = pQ.NewPriorityQueue(10, 10)

	producer_wg := &sync.WaitGroup{}
	SpawnProducer(producer_wg, pQ)

	producer_wg.Wait()
	// close all the queue
	for _, q := range pQ.queues {
		close(q)
	}

	consumer_wg := &sync.WaitGroup{}
	SpawnConsumer(consumer_wg, pQ)
	consumer_wg.Wait()

	fmt.Println("Exited successfully")
}

2

package main

import (
	"fmt"
	"sync"
)

var messages = [][]string{
	{
		"The world itself's",
		"just one big hoax.",
		"Spamming each other with our",
		"running commentary of bullshit,",
	},
	{
		"but with our things, our property, our money.",
		"I'm not saying anything new.",
		"We all know why we do this,",
		"not because Hunger Games",
		"books make us happy,",
	},
	{
		"masquerading as insight, our social media",
		"faking as intimacy.",
		"Or is it that we voted for this?",
		"Not with our rigged elections,",
	},
	{
		"but because we wanna be sedated.",
		"Because it's painful not to pretend,",
		"because we're cowards.",
		"- Elliot Alderson",
		"Mr. Robot",
	},
}

const producerCount int = 4
const consumerCount int = 3

func produce(link chan<- string, id int, wg *sync.WaitGroup) {
	defer wg.Done()
	for _, msg := range messages[id] {
		link <- msg
	}
}

func consume(link <-chan string, id int, wg *sync.WaitGroup) {
	defer wg.Done()
	for msg := range link {
		fmt.Printf("Message \"%v\" is consumed by consumer %v\n", msg, id)
	}
}

func main() {
	link := make(chan string)
	wp := &sync.WaitGroup{}
	wc := &sync.WaitGroup{}

	wp.Add(producerCount)
	wc.Add(consumerCount)

	for i := 0; i < producerCount; i++ {
		go produce(link, i, wp)
	}

	for i := 0; i < consumerCount; i++ {
		go consume(link, i, wc)
	}

	wp.Wait()
	close(link)
	wc.Wait()
}

标签:wg,priority,pQ,消费者,生产者,sync,golang,int,PriorityQueue
From: https://www.cnblogs.com/liujitao79/p/17729334.html

相关文章

  • Golang 的骚操作:go:linkname
    背景在看源码时,一些源码方法没有方法体,难道说明这些方法为空?例如:time.Now调用的now(),time.Sleep,reflect.makechan//Providedbypackageruntime.funcnow()(secint64,nsecint32,monoint64)funcSleep(dDuration)funcmakechan(typ*rtype,sizeint)(ch......
  • golang 1.18 workspace mode
    why?为什么需要workspace历史发展和版本依赖的管理GOPATH最开始的模式开发者需要设置一个环境变量GOPATH,用于指定项目的工作空间。GOPATH是一个目录路径,其中包括了三个重要的子目录:src、bin和pkg通过goget命令,GOPATH/src下的相应目录中缺点:必须指定GOPATH......
  • golang 的循环导入
    内容来自对chatgpt的咨询循环导入概念在Go语言中,循环导入是一个需要避免的问题。它发生在两个或更多的包彼此导入对方,形成一个导入循环,导致编译器无法处理。例如,假设你有两个包,包A和包B。包A导入了包B,然后包B又导入了包A,这就形成了一个循环导入。在这种情况下,编译器将无法......
  • golang 怎么使用接口中声明的方法
    假设你有一个golang的interface,里面声明了1个函数,怎么调用这个函数typeManinterface{ Eat(ctxcontext.Context,foodstring})error}使用方式创建结构体实现这个接口声明的方法,然后创建该结构体对象,调用方法案例首先,我们需要创建一个实现了这个接口的结构体:type......
  • golang 使用redis设置分布式锁 demo
    内容来自对chatgpt的咨询分布式锁是在多个节点上运行的应用程序中协调工作的一种常用方法,而Redis是实现分布式锁的流行选择。以下是使用Go语言和github.com/go-redis/redis库来设置Redis分布式锁的一个简单示例:首先,确保你已经安装了该库:goget-ugithub.com/go-redis/redi......
  • golang 对字符串进行base64编解码、md5 编码
    内容来自对chagpt的咨询一、对字符串进行base64编解码base64编码要在Go语言中对字符串进行base64编码,你可以使用标准库中的encoding/base64包。以下是一个简单的示例:packagemainimport( "encoding/base64" "fmt")funcmain(){ data:="Hello,World!" enc......
  • golang 把内容写到 csv 文件或者 xlsx 文件里
    内容来自对chatgpt的咨询csv格式csv格式的文件使用wps或者office打开后是一个excel表格的形式,很容易看到表格里的数据。csv格式跟markdown格式有点像,只需要按照固定的语法放置文本,保存后,用对应的渲染软件打开,就能得到想要的效果。比如下面这段json,用文本编辑器......
  • 从一个golang 员工emp数组中,找到其中name相同的元素,把结果放到一个新数组里,代码实现
    内容来自对chatgpt的咨询为了找到具有相同名称的员工,并将结果放入一个新的数组中,我们可以首先使用一个映射(map)来存储每个名称及其出现的次数。然后,我们可以遍历原始数组并使用映射来判断是否有重复的名称。以下是一个示例代码,演示如何实现这一目标:packagemainimport( ......
  • Kafka消息生产者拦截器配置最佳实践
    介绍Kafka是一个分布式的消息队列系统,它具有高吞吐量、可扩展性、容错性等优点。在Kafka中,消息生产者可以通过拦截器(interceptor)来对消息进行预处理,例如添加额外的信息、修改消息内容等。本文将深入探讨Kafka消息生产者拦截器配置的最佳实践。拦截器配置在Kafka中,消息生产者可以......
  • Docker - ERROR: failed to solve: golang:latest: error getting credentials - err:
    Dockerfile:FROMgolang:latestWORKDIR/appADD..RUNgoenv-wGOPROXY=https://goproxy.io,directRUNgogetRUNgobuild-oapp.CMD["/app/app"] zzh@ZZHPC:/zdata/MyPrograms/Go/aaa$dockerbuild-ttest:v1.[+]Building1.3s(3/3)FINI......