首页 > 其他分享 >[转]Golang线程池实现百万级高并发

[转]Golang线程池实现百万级高并发

时间:2023-11-19 23:23:12浏览次数:32  
标签:WorkerPool Worker JobQueue Golang 并发 Job 线程

 

转,原文: https://lk668.github.io/2021/03/22/2021-03-22-Golang%E7%BA%BF%E7%A8%8B%E6%B1%A0%E5%AE%9E%E7%8E%B0%E7%99%BE%E4%B8%87%E7%BA%A7%E9%AB%98%E5%B9%B6%E5%8F%91/

--------------

Golang线程池实现百万级高并发

本文基于Golang实现线程池,从而可以达到百万级别的高并发请求。本文实现的代码在https://github.com/lk668/threadpool可见。

1. Golang并发简介

Golang原生的goroutine可以很轻松实现并发编程。Go语言的并发是基于用户态的并发,这种并发方式就变得非常轻量,能够轻松运行几万并发逻辑。Go 的并发属于 CSP 并发模型的一种实现,CSP 并发模型的核心概念是:不要通过共享内存来通信,而应该通过通信来共享内存。

2. 并发方案演进

2.1 直接使用goroutine

直接使用goroutine启动一个新的线程来进行任务的执行

1
go handler(request)

2.2 缓冲队列

利用channel实现一个缓冲队列,每次请求先放入缓冲队列,然后从缓冲队列读取数据,一次执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type Job interface{
Run()
}

// 长度为1000的缓冲队列
jobQueue := make(chan Job, 1000)

// 启动一个协程来读取缓冲队列的数据
go func(){
for {
select {
case job := <- jobQueue:
job.Run()
}
}
}()

// 请求发送
job := Job{}
jobQueue <- job

该方案在请求量低于缓冲队列长度时,可以应对并发请求。但是当并发请求量大于缓冲队列长度时,channel会出现阻塞情况。

2.3 线程池实现百万级高并发

更好的实现方案是利用job队列+线程池来实现,具体如下所示:

有个全局JobQueue,用来存储需要执行的Job,有个WorkerPool的线程池,用来存储空闲的Worker。当JobQueue中有Job时,从JobQueue获取Job对象,从WorkerPool获取空闲Worker,将Job对象发送给Worker,进行执行。每个Worker都是一个独立的Goroutine。从而真正意义上实现高并发。

代码实现主要分为三部分

2.3.1 Job定义

Job是一个interface,其下有个函数RunTask,用户定制化的任务,需要实现RunTask函数。JobChan是一个Job channel结构。Job是高并发需要执行的任务

1
2
3
4
5
type Job interface {
RunTask(request interface{})
}

type JobChan chan Job

2.3.2 Worker定义

Worker就是高并发里面的一个线程,启动的时候是一个Goroutine。Worker结构一需要一个JobChan,用来接收从全局JobQueue里面获取的Job对象。有个Quit的channel,用来接收退出信号。需要一个Start函数,将自己注册到WorkerPool,然后监听Job,有Job传入时,处理Job的RunTask,处理完成之后,重新将自己添加回WorkerPool。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Worker结构体
// WorkerPool随机选取一个Worker,将Job发送给Worker去执行
type Worker struct {
// 不需要带缓冲的任务队列
JobQueue JobChan
//退出标志
Quit chan bool
}

// 创建一个新的Worker对象
func NewWorker() Worker {
return Worker{
make(JobChan),
make(chan bool),
}
}

// 启动一个Worker,来监听Job事件
// 执行完任务,需要将自己重新发送到WorkerPool
func (w Worker) Start(workerPool *WorkerPool) {
// 需要启动一个新的协程,从而不会阻塞
go func() {
for {
// 将worker注册到线程池
workerPool.WorkerQueue <- &w
select {
case job := <-w.JobQueue:
job.RunTask(nil)
// 终止当前worker
case <-w.Quit:
return
}
}
}()
}

2.3.3 WorkerPool定义

WorkerPool是一个线程池,用来存储Worker。所以需要一个Size变量,用来表示这个Pool存储的Worker的个数。需要一个JobQueue,用来充当全局JobQueue的作用,所有的job先存储到该全局JobQueue中。有个WorkerQueue是个channel,用来存储空闲的Worker,有Job来的时候,从WorkerQueue里面取出一个Worker,执行相应的任务,执行完成以后,重新将Worker放回WorkerQueue。

WorkerPool需要一个启动函数,一个是来启动Size数量的Worker线程,一个是需要启动一个新的线程,来接收Job。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 线程池
type WorkerPool struct {
// 线程池大小
Size int
// 不带缓冲的任务队列,任务到达后,从workerQueue随机选取一个Worker来执行Job
JobQueue JobChan
WorkerQueue chan *Worker
}

func NewWorkerPool(poolSize, jobQueueLen int) *WorkerPool {
return &WorkerPool{
poolSize,
make(JobChan, jobQueueLen),
make(chan *Worker, poolSize),
}
}

func (wp *WorkerPool) Start() {

// 将所有worker启动
for i := 0; i < wp.Size; i++ {
worker := NewWorker()
worker.Start(wp)
}

// 监听JobQueue,如果接收到请求,随机取一个Worker,然后将Job发送给该Worker的JobQueue
// 需要启动一个新的协程,来保证不阻塞
go func() {
for {
select {
case job := <-wp.JobQueue:
worker := <-wp.WorkerQueue
worker.JobQueue <- job
}
}
}()

}

2.3.4 代码调用举例

接下来,举例分析如何使用该线程池。首先需要定义你要执行的任务,实现RunTask函数。然后初始化一个WorkerPool,将模拟百万请求的数据发送给全局JobQueue。交给线程池进行任务处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
//需要执行任务的结构体
type Task struct {
Number int
}

// 实现Job这个interface的RunTask函数
func (t Task) RunTask(request interface{}) {
fmt.Println("This is task: ", t.Number)
//设置个等待时间
time.Sleep(1 * time.Second)
}

func main() {

// 设置线程池的大小
poolNum := 100 * 100 * 20
jobQueueNum := 100
workerPool := threadpool.NewWorkerPool(poolNum, jobQueueNum)
workerPool.Start()

// 模拟百万请求
dataNum := 100 * 100 * 100

go func() {
for i := 0; i < dataNum; i++ {
task := Task{Number: i}
workerPool.JobQueue <- task
}
}()

// 阻塞主线程
for {
fmt.Println("runtime.NumGoroutine() :", runtime.NumGoroutine())
time.Sleep(2 * time.Second)
}
}

参考

标签:WorkerPool,Worker,JobQueue,Golang,并发,Job,线程
From: https://www.cnblogs.com/oxspirt/p/17842957.html

相关文章

  • [转]手把手教你如何用golang实现一个timewheel时间轮
     转,原文:https://lk668.github.io/2021/04/05/2021-04-05-%E6%89%8B%E6%8A%8A%E6%89%8B%E6%95%99%E4%BD%A0%E5%A6%82%E4%BD%95%E7%94%A8golang%E5%AE%9E%E7%8E%B0%E4%B8%80%E4%B8%AAtimewheel/-------------------------- 手把手教你如何用golang实现一个timewheel时间轮202......
  • 多核CPU条件下的并发和并行理解
    操作系统课本上的并发和并行并发​是指两个或多个事件在同一时间间隔内交替发生并行是指两个或多个事件在同一时刻发生并行编程中的并发和并行在接触并行编程之前,认为多线程是并发的一种,因为一个处理单元每次只能处理一个线程,因此多个线程也只是一个时间间隔内的交替执行而已......
  • 进程与线程
    进程程序由指令和数据组成,但这些指令要运行,数据要读写,就必须将指令加载至CPU,数据加载至内存。在指令运行过程中还需要用到磁盘、网络等设备。进程就是用来加载指令、管理内存、管理IO的当一个程序被运行,从磁盘加载这个程序的代码至内存,这时就开启了一个进程。进程就可以视为......
  • Java 多线程事务控制
    Java多线程怎么做事务控制公司业务中遇到一个需求,需要同时修改最多约5万条数据,而且还不支持批量或异步修改操作。于是只能写个for循环操作,但操作耗时太长,只能一步一步寻找其他解决方案。具体操作如下:一、循环操作的代码先写一个最简单的for循环代码,看看耗时情况怎么样。/......
  • go并发 - channel
    概述并发编程是利用多核心能力,提升程序性能,而多线程之间需要相互协作、共享资源、线程安全等。任何并发模型都要解决线程间通讯问题,毫不夸张的说线程通讯是并发编程的主要问题。go使用著名的CSP(CommunicatingSequentialProcess,通讯顺序进程)并发模型,从设计之初Go语言就注重如......
  • 性能测试---并发线程数&QPS&平均耗时&95分位耗时
    文章转发,原文来自:https://cloud.tencent.com/developer/article/1784548?ivk_sa=1024320u【概念解释】并发线程数:指的是施压机施加的同时请求的线程数量。比如,我启动并发线程数100,即我会在施压机器上面启动100个线程,不断地向服务器发请求。QPS:每秒请求数,即在不断向服务器发送请......
  • go并发 - goroutine
    概述Go并发模型独树一帜,简洁、高效。Go语言最小执行单位称为协程(goroutine),运行时可以创建成千万上个协程,这在Java、C等线程模型中是不可想象的,并发模型是Go的招牌能力之一。很多文章描述协程是轻量级的线程,并不准确,两者在底层有本质区别。线程是由操作系统维护,以Linux为例,系统......
  • 【Python自动化】定时自动采集,并发送微信告警通知,全流程案例讲解!
    目录一、概要二、效果演示三、代码讲解3.1爬虫采集行政处罚数据3.2存MySQL数据库3.3发送告警邮件&微信通知3.4定时机制四、总结一、概要您好!我是@马哥python说,一名10年程序猿。我原创开发了一套定时自动化爬取方案,完整开发流程如下:采集数据->筛选数据->存MySQL数据库......
  • c++线程专题
    逐步更新中~~~,参考书籍《C++并发编程实战(第2版)》,不照搬书,只写理解感悟。引入头文件#include<thread>线程启动std::threadt(my_func);若需等待线程执行完毕,才继续之后的代码,用joinif(t.joinable()){t.join();}若不等待,可以分离出去(分离出去的线程被称为守护......
  • 28_rust_无畏并发
    无畏并发Concurrent:程序不同部分之间独立执行;Parallel:程序不同部分同时运行。rust无畏并发:允许编写没有细微Bug的代码。并在不引入新Bug的情况下易于重构。这里所说的“并发”泛指concurrent和parallel。使用线程同时运行代码1:1模型:实现线程的方式:通过调用OS的API创建线程。......