首页 > 编程语言 >Java开发者的Golang进修指南:从0->1带你实现协程池

Java开发者的Golang进修指南:从0->1带你实现协程池

时间:2024-01-22 13:44:28浏览次数:67  
标签:Java Worker worker Golang 协程池 job 协程 id

在Java编程中,为了降低开销和优化程序的效率,我们常常使用线程池来管理线程的创建和销毁,并尽量复用已创建的对象。这样做不仅可以提高程序的运行效率,还能减少垃圾回收器对对象的回收次数。

在Golang中,我们知道协程(goroutine)由于其体积小且效率高,在高并发场景中扮演着重要的角色。然而,资源始终是有限的,即使你的内存很大,也会有一个限度。因此,协程池在某些情况下肯定也会有其独特的适用场景。毕竟,世上并不存在所谓的银弹,只有在特定情况下才能找到最优的解决方案。因此,在Golang中,我们仍然需要考虑使用协程池的情况,并根据具体场景来选择最佳的解决方案。

今天,我们将从Java线程池的角度出发,手把手地带你实现一个Golang协程池。如果你觉得有些困难,记住我们的宗旨是使用固定数量的协程来处理所有的任务。这样做的好处是可以避免协程数量过多导致资源浪费,也能确保任务的有序执行。让我们一起开始,一步步地构建这个Golang协程池吧!

协程池

首先,让我们编写一个简单的多协程代码,并逐步进行优化,最终实现一个简化版的协程池。假如我们有10个任务需要处理。我们最简单做法就是直接使用go关键字直接去生成相应的协程即可,比如这样:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg  sync.WaitGroup
    wg.Add(5)
    for i := 1; i <= 5; i++ {
        go func(index int) {
            fmt.Printf("Goroutine %d started\n", index)
            time.Sleep(1 * time.Second)
            fmt.Printf("Goroutine %d finished\n", index)
            wg.Done()
        }(i)
    }
    wg.Wait()
    fmt.Println("All goroutines finished")
}

我们当前的用法非常简单,无需专门维护goroutine容量大小。每当有一个任务出现,我们就创建一个goroutine来处理。现在,让我们进一步优化这种用法。

优化版协程池

现在我们需要首先创建一个pool对象和一个专门用于运行协程的worker对象。如果你熟悉Java中线程池的源码,对于worker这个名称应该不会陌生。worker是一个专门用于线程复用的对象,而我们的worker同样负责运行任务。

实体Job没啥好说的,就是我们具体的任务。我们先来定义一个简单的任务实体Job:

type Job struct {
    id  int
    num int
}

我们来看下主角Worker定义:

type Worker struct {
    id         int
    jobChannel chan Job
    wg         *sync.WaitGroup
}

func NewWorker(id int, wga *sync.WaitGroup) *Worker {
    return &Worker{
        id:         id,
        jobChannel: make(chan Job, 1),
        wg:         wga,
    }
}

func (w *Worker) Start() {
    go func() {
        
        for job := range w.jobChannel {
            fmt.Printf("Worker %d is processing job %d\n", w.id, job.id)
            // 模拟任务处理
            fmt.Println("jobs 模拟任务处理")
            time.Sleep(2 * time.Second) // 休眠2秒钟
            result := job.num * 2
            fmt.Printf("Worker %d finished job %d, result: %d\n", w.id, job.id, result)
            w.wg.Done()
        }
    }()
}

你可以看到,我的实体拥有一个channel。这个channel类似于我们想要获取的任务队列。与Java中使用链表形式并通过独占锁获取共享链表这种临界资源的方式不同,我选择使用了Golang中的channel来进行通信。因为Golang更倾向于使用channel进行通信,而不是共享资源。所以,我选择了使用channel。为了避免在添加任务时直接阻塞,我特意创建了一个带有任务缓冲的channel。

在这里,Start()方法是一个真正的协程,我会从channel中持续获取任务,以保持这个协程不被销毁,直到没有任务可获取为止。这个逻辑类似于Java中的线程池。

让我们进一步深入地探讨一下Pool池的定义:

type Pool struct {
    workers []*Worker
    jobChan chan Job
    wg      sync.WaitGroup
}

func NewPool(numWorkers, jobQueueSize int) *Pool {
    pl := &Pool{
        workers: make([]*Worker, numWorkers),
        jobChan: make(chan Job, jobQueueSize),
    }
    for i := 0; i < numWorkers; i++ {
        pl.workers[i] = NewWorker(i+1, &pl.wg)
    }
    return pl
}

func (p *Pool) Start() {
    for _, worker := range p.workers {
        worker.Start()
    }
    go p.dispatchJobs()
}

func (p *Pool) dispatchJobs() {
    for job := range p.jobChan {
        worker := p.getLeastBusyWorker()
        worker.jobChannel <- job
    }
}

func (p *Pool) getLeastBusyWorker() *Worker {
    leastBusy := p.workers[0]
    for _, worker := range p.workers {
        if len(worker.jobChannel) < len(leastBusy.jobChannel) {
            leastBusy = worker
        }
    }
    return leastBusy
}

func (p *Pool) AddJob(job Job) {
    fmt.Println("jobs add")
    p.wg.Add(1)
    p.jobChan <- job
}

他的定义可能有点复杂,但是我可以用简单的语言向你解释,就是那些东西,只是用Golang的写法来实现,与Java的实现方式类似。

首先,我定义了一个名为worker的数组,用于存储当前存在的worker数量。这里并没有实现核心和非核心worker的区分。另外,我还创建了一个独立的channel,用于保存可缓冲任务的大小。这些参数在初始化时是必须提供的。

Start方法的主要目的是启动worker开始执行任务。首先,它会启动一个核心协程,等待任务被派发。然后,dispatchJobs方法会开始监听channel是否有任务,如果有任务,则会将任务分派给worker进行处理。在整个过程中,通过channel进行通信,没有使用链表或其他共享资源实体。需要注意的是,dispatchJobs方法在另一个协程中被调用,这是为了避免阻塞主线程。

getLeastBusyWorker方法是用来获取阻塞任务最少的worker的。这个方法的主要目标是保持任务平均分配。而AddJob方法则是用来直接向channel中添加job任务的,这个方法比较简单,不需要过多解释。

协程池最终实现

image

经过一系列的反复修改和优化,我们终于成功实现了一个功能完善且高效的Golang协程池。下面是最终版本的代码:

package main

import (
    "fmt"
    "sync"
    "time"
)

type Job struct {
    id  int
    num int
}

type Worker struct {
    id         int
    jobChannel chan Job
    wg         *sync.WaitGroup
}

func NewWorker(id int, wga *sync.WaitGroup) *Worker {
    return &Worker{
        id:         id,
        jobChannel: make(chan Job, 1),
        wg:         wga,
    }
}

func (w *Worker) Start() {
    go func() {
        
        for job := range w.jobChannel {
            fmt.Printf("Worker %d is processing job %d\n", w.id, job.id)
            // 模拟任务处理
            fmt.Println("jobs 模拟任务处理")
            time.Sleep(2 * time.Second) // 休眠2秒钟
            result := job.num * 2
            fmt.Printf("Worker %d finished job %d, result: %d\n", w.id, job.id, result)
            w.wg.Done()
        }
    }()
}

type Pool struct {
    workers []*Worker
    jobChan chan Job
    wg      sync.WaitGroup
}

func NewPool(numWorkers, jobQueueSize int) *Pool {
    pl := &Pool{
        workers: make([]*Worker, numWorkers),
        jobChan: make(chan Job, jobQueueSize),
    }
    for i := 0; i < numWorkers; i++ {
        pl.workers[i] = NewWorker(i+1, &pl.wg)
    }
    return pl
}

func (p *Pool) Start() {
    for _, worker := range p.workers {
        worker.Start()
    }
    go p.dispatchJobs()
}

func (p *Pool) dispatchJobs() {
    for job := range p.jobChan {
        worker := p.getLeastBusyWorker()
        worker.jobChannel <- job
    }
}

func (p *Pool) getLeastBusyWorker() *Worker {
    leastBusy := p.workers[0]
    for _, worker := range p.workers {
        if len(worker.jobChannel) < len(leastBusy.jobChannel) {
            leastBusy = worker
        }
    }
    return leastBusy
}

func (p *Pool) AddJob(job Job) {
    fmt.Println("jobs add")
    p.wg.Add(1)
    p.jobChan <- job
}

func main() {
    pool := NewPool(3, 10)
    pool.Start()
    // 添加任务到协程池
    for i := 1; i <= 15; i++ {
        pool.AddJob(Job{
            id:  i,
            num: i,
        })
    }

    // 等待所有任务完成
    pool.wg.Wait()
    close(pool.jobChan)
    fmt.Println("All jobs finished")
}

三方协程池

在这种场景下,既然已经有一个存在的场景,那么显然轮子是肯定有的。不论使用哪种编程语言,我们都可以探索一下,以下是Golang语言中关于三方协程池的实现工具。

ants: ants是一个高性能的协程池实现,支持动态调整协程池的大小,可以通过简单的API调用来将任务提交给协程池进行执行。官方地址:https://github.com/panjf2000/ants

gopool:gopool 是一个高性能的 goroutine 池,旨在重用 goroutine 并限制 goroutine 的数量。官方地址:https://github.com/bytedance/gopkg/tree/develop/util/gopool

这些库都提供了简单易用的API,可以方便地创建和管理协程池,同时也支持动态调整协程池的大小,以满足不同场景下的需求。

总结

当然,我写的简易版协程池还有很多可以优化的地方,比如可以实现动态扩容等功能。今天我们要简单总结一下协程池的优势,主要是为了降低资源开销。协程池的好处在于可以重复利用协程,避免频繁创建和销毁协程,从而减少系统开销,提高系统性能。此外,协程池还可以提高响应速度,因为一旦接收到任务,可以立即执行,不需要等待协程创建的时间。另外,协程池还具有增强可管理性的优点,可以对协程进行集中调度和统一管理,方便进行性能调优。

标签:Java,Worker,worker,Golang,协程池,job,协程,id
From: https://www.cnblogs.com/guoxiaoyu/p/17953496

相关文章

  • 使用Javamail接收imaps协议的邮件
    网上的消息不能说大多,只能说基本都过时了,连imap和imaps都不分了本文基于apache-james项目搭建的邮件服务器,其他邮件服务器仅供参考首先是依赖,这里需要引入两个依赖,如下<dependency><groupId>javax.mail</groupId><artifactId>javax.mail-api</artifactId>......
  • 到底什么样的 Java 项目用 Solon 好???
    什么样的Java项目用Solon好就像华为讲的,不要因为爱国而特意买华为手机。Solon也是,有需要就用不需要就跳过(按正常的需求选择):信创需要国产化,应该用Solon或者SolonCloud(有案例)军工项目要国产化,应该用Solon或者SolonCloud(有案例)嵌入式设备,内存有限,算力差,可以用Solo......
  • JavaScript 中的展开运算符是什么?
    展开运算符(SpreadOperator)是JavaScript中的一种语法,用于将可迭代对象(如数组或字符串)展开为独立的元素。它使用三个连续的点号(...)作为操作符。展开运算符可以在多种情况下使用,包括数组、对象和函数调用等。下面是一些展开运算符的用法示例:1:展开数组:使用展开运算符可以将一......
  • Java 如何将Excel转换为TXT文本格式
    TXT文件是一种非常简单、通用且易于处理的文本格式。在处理大规模数据时,将Excel转为TXT纯文本文件可以提高处理效率。此外,许多编程语言和数据处理工具都有内置的函数和库来读取和处理TXT文件,因此将Excel文件转换为TXT还可以简化数据导入过程。本文将介绍如何使用Java将Excel转为TX......
  • [转]一篇搞懂javascript正则表达式
    原文地址:一篇搞懂javascript正则表达式-知乎最近在看vue源码的时候发现一个令人头疼的问题,就是正则表达式,在此之前我对正则只有一知半解,没有深入了解,所以看到正则高级写法都不知是什么含义,哎...,所以就去查看相关资料和博主写的,特意整理记录一下学习的过程并用通俗易懂的文章分......
  • html,css,javaSript
    html,css,javaSript1.认识结构:对应的是HTML语言表现:对应的是CSS语言行为:对应的是JavaScript语言2.标签标题:h1-h6横线效果:hr字体:font(face,color,size)换行br段落p加粗b斜体i下划线u文本居中center图片img(src,height,width)音频audio(src,controls)视频vide......
  • java实体类转化geojson的工具类
    1.用到的技术、工具:反射+geotools2.代码实现packageorg.jeecg.modules.web.util.geoutils;importcn.hutool.core.util.ReflectUtil;importcn.hutool.core.util.StrUtil;importcn.hutool.json.JSONArray;importcn.hutool.json.JSONObject;importcn.hutool.json.JSON......
  • Java并发基础:Executor接口和Executors类的区别
    Executor是Java中的一个接口,它定义了一种将任务提交与任务执行机制(包括线程管理、调度等)分离的方式,Executors是一个工具类,它提供了多个静态工厂方法,用于创建不同类型的Executor实例。代码案例下面展示了如何使用Executor接口来执行异步任务,如下代码案例:importjava.util.conc......
  • [转]Java Stream API入门篇
    原文地址:JavaStreamAPI入门篇-CarpenterLee-博客园本文github地址你可能没意识到Java对函数式编程的重视程度,看看Java8加入函数式编程扩充多少功能就清楚了。Java8之所以费这么大功夫引入函数式编程,原因有二:代码简洁,函数式编程写出的代码简洁且意图明确,使用stream接口......
  • [转]Java Stream API进阶篇
    原文地址:JavaStreamAPI进阶篇-CarpenterLee-博客园本文github地址上一节介绍了部分Stream常见接口方法,理解起来并不困难,但Stream的用法不止于此,本节我们将仍然以Stream为例,介绍流的规约操作。规约操作(reductionoperation)又被称作折叠操作(fold),是通过某个连接动作将所有......