package main import ( "fmt" "time" ) /* 有关Task任务相关定义及操作 */ //定义任务Task类型,每一个任务Task都可以抽象成一个函数 type Task struct { f func() error //一个无参的函数类型 } //通过NewTask来创建一个Task func NewTask(f func() error) *Task { t := Task{ f: f, } return &t } //执行Task任务的方法 func (t *Task) Execute() { t.f() //调用任务所绑定的函数 } /* 有关协程池的定义及操作 */ //定义池类型 type Pool struct { EntryChannel chan *Task //对外接收Task的入口 worker_num int //协程池最大worker数量,限定Goroutine的个数 JobsChannel chan *Task //协程池内部的任务就绪队列 } //创建一个协程池 func NewPool(cap int) *Pool { p := Pool{ EntryChannel: make(chan *Task), worker_num: cap, JobsChannel: make(chan *Task), } return &p } //协程池创建一个worker并且开始工作 func (p *Pool) worker(work_ID int) { //worker不断的从JobsChannel内部任务队列中拿任务 for task := range p.JobsChannel { //如果拿到任务,则执行task任务 task.Execute() fmt.Println("worker ID ", work_ID, " 执行完毕任务") } } //让协程池Pool开始工作 func (p *Pool) Run() { //1,首先根据协程池的worker数量限定,开启固定数量的Worker, // 每一个Worker用一个Goroutine承载 for i := 0; i < p.worker_num; i++ { fmt.Println("开启固定数量的Worker:", i) go p.worker(i) } //2, 从EntryChannel协程池入口取外界传递过来的任务 // 并且将任务送进JobsChannel中 for task := range p.EntryChannel { p.JobsChannel <- task } //3, 执行完毕需要关闭JobsChannel close(p.JobsChannel) fmt.Println("执行完毕需要关闭JobsChannel") //4, 执行完毕需要关闭EntryChannel close(p.EntryChannel) fmt.Println("执行完毕需要关闭EntryChannel") } //主函数 func main() { //创建一个Task t := NewTask(func() error { fmt.Println("创建一个Task:", time.Now().Format("2006-01-02 15:04:05")) return nil }) //创建一个协程池,最大开启3个协程worker p := NewPool(3) //开一个协程 不断的向 Pool 输送打印一条时间的task任务 go func() { for { p.EntryChannel <- t } }() //启动协程池p p.Run() }
原文链接:https://blog.csdn.net/finghting321/article/details/106492915/
标签:Task,worker,协程池,任务,Pool,func,简单,连接池 From: https://www.cnblogs.com/itsuibi/p/17030508.html