标签:WorkerPool Worker JobQueue Golang 并发 Job 线程
转,原文: https://lk668.github.io/2021/03/22/2021-03-22-Golang%E7%BA%BF%E7%A8%8B%E6%B1%A0%E5%AE%9E%E7%8E%B0%E7%99%BE%E4%B8%87%E7%BA%A7%E9%AB%98%E5%B9%B6%E5%8F%91/
--------------
Golang线程池实现百万级高并发
2021-03-22
本文基于Golang实现线程池,从而可以达到百万级别的高并发请求。本文实现的代码在https://github.com/lk668/threadpool可见。
1. Golang并发简介
Golang原生的goroutine可以很轻松实现并发编程。Go语言的并发是基于用户态的并发,这种并发方式就变得非常轻量,能够轻松运行几万并发逻辑。Go 的并发属于 CSP 并发模型的一种实现,CSP 并发模型的核心概念是:不要通过共享内存来通信,而应该通过通信来共享内存。
2. 并发方案演进
2.1 直接使用goroutine
直接使用goroutine启动一个新的线程来进行任务的执行
2.2 缓冲队列
利用channel实现一个缓冲队列,每次请求先放入缓冲队列,然后从缓冲队列读取数据,一次执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
type Job interface{ Run() }
// 长度为1000的缓冲队列 jobQueue := make(chan Job, 1000)
// 启动一个协程来读取缓冲队列的数据 go func(){ for { select { case job := <- jobQueue: job.Run() } } }()
// 请求发送 job := Job{} jobQueue <- job
|
该方案在请求量低于缓冲队列长度时,可以应对并发请求。但是当并发请求量大于缓冲队列长度时,channel会出现阻塞情况。
2.3 线程池实现百万级高并发
更好的实现方案是利用job队列+线程池来实现,具体如下所示:
有个全局JobQueue,用来存储需要执行的Job,有个WorkerPool的线程池,用来存储空闲的Worker。当JobQueue中有Job时,从JobQueue获取Job对象,从WorkerPool获取空闲Worker,将Job对象发送给Worker,进行执行。每个Worker都是一个独立的Goroutine。从而真正意义上实现高并发。
代码实现主要分为三部分
2.3.1 Job定义
Job是一个interface,其下有个函数RunTask,用户定制化的任务,需要实现RunTask函数。JobChan是一个Job channel结构。Job是高并发需要执行的任务
1 2 3 4 5
|
type Job interface { RunTask(request interface{}) }
type JobChan chan Job
|
2.3.2 Worker定义
Worker就是高并发里面的一个线程,启动的时候是一个Goroutine。Worker结构一需要一个JobChan,用来接收从全局JobQueue里面获取的Job对象。有个Quit的channel,用来接收退出信号。需要一个Start函数,将自己注册到WorkerPool,然后监听Job,有Job传入时,处理Job的RunTask,处理完成之后,重新将自己添加回WorkerPool。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
|
// Worker结构体 // WorkerPool随机选取一个Worker,将Job发送给Worker去执行 type Worker struct { // 不需要带缓冲的任务队列 JobQueue JobChan //退出标志 Quit chan bool }
// 创建一个新的Worker对象 func NewWorker() Worker { return Worker{ make(JobChan), make(chan bool), } }
// 启动一个Worker,来监听Job事件 // 执行完任务,需要将自己重新发送到WorkerPool func (w Worker) Start(workerPool *WorkerPool) { // 需要启动一个新的协程,从而不会阻塞 go func() { for { // 将worker注册到线程池 workerPool.WorkerQueue <- &w select { case job := <-w.JobQueue: job.RunTask(nil) // 终止当前worker case <-w.Quit: return } } }() }
|
2.3.3 WorkerPool定义
WorkerPool是一个线程池,用来存储Worker。所以需要一个Size变量,用来表示这个Pool存储的Worker的个数。需要一个JobQueue,用来充当全局JobQueue的作用,所有的job先存储到该全局JobQueue中。有个WorkerQueue是个channel,用来存储空闲的Worker,有Job来的时候,从WorkerQueue里面取出一个Worker,执行相应的任务,执行完成以后,重新将Worker放回WorkerQueue。
WorkerPool需要一个启动函数,一个是来启动Size数量的Worker线程,一个是需要启动一个新的线程,来接收Job。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
|
// 线程池 type WorkerPool struct { // 线程池大小 Size int // 不带缓冲的任务队列,任务到达后,从workerQueue随机选取一个Worker来执行Job JobQueue JobChan WorkerQueue chan *Worker }
func NewWorkerPool(poolSize, jobQueueLen int) *WorkerPool { return &WorkerPool{ poolSize, make(JobChan, jobQueueLen), make(chan *Worker, poolSize), } }
func (wp *WorkerPool) Start() {
// 将所有worker启动 for i := 0; i < wp.Size; i++ { worker := NewWorker() worker.Start(wp) }
// 监听JobQueue,如果接收到请求,随机取一个Worker,然后将Job发送给该Worker的JobQueue // 需要启动一个新的协程,来保证不阻塞 go func() { for { select { case job := <-wp.JobQueue: worker := <-wp.WorkerQueue worker.JobQueue <- job } } }()
}
|
2.3.4 代码调用举例
接下来,举例分析如何使用该线程池。首先需要定义你要执行的任务,实现RunTask函数。然后初始化一个WorkerPool,将模拟百万请求的数据发送给全局JobQueue。交给线程池进行任务处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
|
//需要执行任务的结构体 type Task struct { Number int }
// 实现Job这个interface的RunTask函数 func (t Task) RunTask(request interface{}) { fmt.Println("This is task: ", t.Number) //设置个等待时间 time.Sleep(1 * time.Second) }
func main() {
// 设置线程池的大小 poolNum := 100 * 100 * 20 jobQueueNum := 100 workerPool := threadpool.NewWorkerPool(poolNum, jobQueueNum) workerPool.Start()
// 模拟百万请求 dataNum := 100 * 100 * 100
go func() { for i := 0; i < dataNum; i++ { task := Task{Number: i} workerPool.JobQueue <- task } }()
// 阻塞主线程 for { fmt.Println("runtime.NumGoroutine() :", runtime.NumGoroutine()) time.Sleep(2 * time.Second) } }
|
参考
标签:WorkerPool,
Worker,
JobQueue,
Golang,
并发,
Job,
线程
From: https://www.cnblogs.com/oxspirt/p/17842957.html