首页 > 其他分享 >Golang实现简易的顺序执行协程池

Golang实现简易的顺序执行协程池

时间:2023-12-12 14:47:02浏览次数:28  
标签:task name 简易 int Golang 协程池 func executor iCountableTask

countable_executor.go

// 一个可计数的单线程顺序任务执行器
type CountableExecutor struct {
    name       string              // 名称
    taskQueue  chan iCountableTask // 任务队列
    bufferSize int                 // 缓冲区大小
}

// 一个可计数的单线程任务执行器
type ICountableExecutor interface {
    GetName() string
    Execute(func())
    GetQueueLength() int
    DelayedExecute(delayedTime time.Duration, runnable func())
    beforeOfferTask(iCountableTask)
    beforeTaskStart(iCountableTask)
    beforeTaskEnd(iCountableTask)
}

func NewCountableExecutor(name string, bufferSize int) ICountableExecutor {
    executor := &CountableExecutor{
        name:       name,
        taskQueue:  make(chan iCountableTask, bufferSize),
        bufferSize: bufferSize,
    }
    go executor.startTaskWheel()
    return executor
}

func (executor *CountableExecutor) startTaskWheel() {
    for task := range executor.taskQueue {
        executor.beforeTaskStart(task)
        task.run()
    }
}

func (executor *CountableExecutor) Execute(runnable func()) {
    task := newCountableTask(executor, runnable)
    executor.beforeOfferTask(task)
    executor.taskQueue <- task
}

/*
@Param delayedTime     任务延时执行的时间
*/
func (executor *CountableExecutor) DelayedExecute(delayedTime time.Duration, runnable func()) {
    time.AfterFunc(delayedTime, func() {
        executor.Execute(runnable)
    })
}

func (executor *CountableExecutor) beforeOfferTask(task iCountableTask) {
    queueLength := len(executor.taskQueue)
    if queueLength > 20 {
        logger.Warnf("入队检查【%s】:队列长度过长,请检查服务器状态!length:%d\n", executor.name, queueLength)
    }
}

func (executor *CountableExecutor) beforeTaskStart(task iCountableTask) {
    task.setStartTime(NowTimestampMs())
    if task.getWaitMs() > 300 {
        logger.Warnf("执行任务前检查【%s】:任务等待过久,请检查服务器状态!waitMs:%d\n", executor.name, task.getWaitMs())
    }
}

func (executor *CountableExecutor) beforeTaskEnd(task iCountableTask) {
    if task.getExeMs() > 200 {
        logger.Warnf("执行任务后检查【%s】:任务执行时间过久,请检查服务器状态!exeMs:%d\n", executor.name, task.getExeMs())
    }
}

func (executor *CountableExecutor) GetQueueLength() int {
    return len(executor.taskQueue)
}

func (executor *CountableExecutor) GetName() string {
    return executor.name
}

// =============== 任务 ===============
type countableTask struct {
    executor     ICountableExecutor
    offerTime    int64 // 加入队列的时间
    exeStartTime int64 // 开始执行任务的时间
    exeEndTime   int64 // 执行任务结束时间
    runnable     func()
}
type iCountableTask interface {
    setStartTime(int64)
    run()             // 执行任务
    getWaitMs() int64 // 等待执行耗时
    getExeMs() int64  // 实际执行耗时
}

func newCountableTask(executor ICountableExecutor, runnable func()) iCountableTask {
    return &countableTask{
        executor:  executor,
        runnable:  runnable,
        offerTime: NowTimestampMs(),
    }
}

// 执行任务
func (task *countableTask) run() {
    defer task.recoverErr()
    defer task.executor.beforeTaskEnd(task)
    task.runnable()
    task.exeEndTime = NowTimestampMs()
}

func (task *countableTask) recoverErr() {
    if e := recover(); e != nil {
        var stackBuf [1024]byte
        stackBufLen := runtime.Stack(stackBuf[:], false)
        logger.Error("协程池【", task.executor.GetName(), "】执行任务发生异常:", e)
        logger.Errorf("==> %s\n", string(stackBuf[:stackBufLen]))

    }
}

// 设置任务开始执行的时间
func (task *countableTask) setStartTime(exeStartTime int64) {
    task.exeStartTime = exeStartTime
}

// 获取执行任务的耗时
func (task *countableTask) getWaitMs() int64 {
    return task.exeStartTime - task.offerTime
}

// 获取执行任务的耗时
func (task *countableTask) getExeMs() int64 {
    return task.exeEndTime - task.exeStartTime
}

 

hash.go

import (
    "fmt"
    "hash/crc32"
)

// 使用crc32作为一致性hash算法
func CalcHash(key interface{}) uint32 {
    crc32q := crc32.MakeTable(crc32.Castagnoli)
    return crc32.Checksum([]byte(fmt.Sprint(key)), crc32q)
}

// 使用crc32作为一致性hash算法,返回值不包含bucketNum,范围 0~bucketNum-1
func CalcHashRange(key interface{}, bucketNum int) int {
    hash := CalcHash(key)
    return int(hash % uint32(bucketNum))
}

 

countable_executors.go 



import (
    "fmt"
)

// 任务执行器池,获取时通过hash散列 type CountableExecutors struct { name string // 名称 executors []ICountableExecutor // 执行器 executorNum int // 执行器数量 } // 一个可计数的单线程任务执行器 type ICountableExecutors interface { GetExecutor(interface{}) ICountableExecutor Execute(interface{}, func()) } func NewCountableExecutors(name string, executorNum int, bufferSize int) ICountableExecutors { countableExecutors := &CountableExecutors{ name: name, executors: make([]ICountableExecutor, executorNum), executorNum: executorNum, } for i := 1; i <= executorNum; i++ { countableExecutors.executors[i-1] = NewCountableExecutor(fmt.Sprint(name, i), bufferSize) } return countableExecutors } func (countableExecutors *CountableExecutors) GetExecutor(res interface{}) ICountableExecutor { executorIndex := CalcHashRange(res, countableExecutors.executorNum) return countableExecutors.executors[executorIndex] } func (countableExecutors *CountableExecutors) Execute(res interface{}, runnable func()) { executorIndex := CalcHashRange(res, countableExecutors.executorNum) countableExecutors.executors[executorIndex].Execute(runnable) }

 

使用示例:

func demo(){
    corePool := myutil.NewCountableExecutors("core-routine-", config.GlobalConfig.Executor.CorePoolSize, 1024)
    userId := 123
    corePool.Execute(userId, func(){
        println("user 123 do something...")
    })
}

 

标签:task,name,简易,int,Golang,协程池,func,executor,iCountableTask
From: https://www.cnblogs.com/feng-gamer/p/17896756.html

相关文章

  • 上机编程-简易DHCP服务器
    题目描述DHCP服务器的功能是为每一个MAC地址分配唯一的IP地址。现假设:分配的IP地址范围从192.168.0.0到192.168.0.255总共256个可用地址(以点分十进制表示)。请实现一个简易的DHCP服务器,功能如下:分配Request:根据输入的MAC地址分配IP地址池中的IP地址:如果对应的IP已分配并......
  • golang之泛型
    Go1.18版本增加了对泛型的支持,泛型也是自Go语言开源以来所做的最大改变。 泛型允许程序员在强类型程序设计语言中编写代码时使用一些以后才指定的类型,在实例化时作为参数指明这些类型。ーー换句话说,在编写某些代码或数据结构时先不提供值的类型,而是之后再提供。泛型是一种......
  • golang按换行符一行一行读取GBK文件
    packageawesomeProject1import( "bufio" "fmt" "github.com/axgle/mahonia" "io" "log" "os")funcmain(){ filename:="/tmp/test.txt" readTbkByLine(filename)}funcreadTbkBy......
  • 基于Redis的简易延时队列
    基于Redis的简易延时队列一、背景在实际的业务场景中,经常会遇到需要延时处理的业务,比如订单超时未支付,需要取消订单,或者是用户注册后,需要在一段时间内激活账号,否则账号失效等等。这些业务场景都可以通过延时队列来实现。最近在实际业务当中就遇到了这样的一个场景,需要实现一个......
  • scrapy框架之自定义简易scrapy框架
    自定义low版Scrapy框架:1fromtwisted.internetimportreactor#事件循环(终止条件,所有的socket都已经移除)2fromtwisted.web.clientimportgetPage#socket对象(如果下载完成..自动从事件循环中移除)3fromtwisted.internetimportdefer#defer.Deferred特殊的soc......
  • beego框架 golang web项目-个人博客系统
    beego框架golangweb项目-个人博客系统beego个人博客系统功能介绍首页分页展示博客博客详情评论文章专栏分类导航资源分享时光轴点点滴滴关于本站后台管理登录系统设置分类添加修改删除管理博文添加修改删除管理基于Go语言和beego框架前端使用layui布局开发的......
  • 简易计算机的搭建
    简易计算机的搭建1、一些无关紧要的前置知识​ 现代计算机类设备的主流架构一般有两种:一为冯诺依曼体系架构;一为哈弗架构。​ 主流计算机采用的架构一般为冯诺依曼体系,是将程序和数据放在一起存储的架构;​ 单片机设备一般采用哈弗架构,是将程序与数据分开存储的一种架构。​ ......
  • Golang标准库:syslog包代码示例
    以下是一个示例代码,展示了如何使用syslog包进行系统日志记录:packagemainimport( "log" "log/syslog")funcmain(){ //创建一个连接到本地系统日志的写入器 writer,err:=syslog.New(syslog.LOG_INFO,"Example") iferr!=nil{ log.Fatal("Failedtoconnect......
  • Golang标准库:expvar 包代码示例
    expvar包提供了一种在运行时公开程序内部变量的方法,以便进行监控和调试。以下是一个示例代码,展示了如何使用expvar包:packagemainimport( "expvar" "fmt" "net/http")funcmain(){ //定义一个expvar.Int变量 counter:=expvar.NewInt("counter") //设置一个......
  • Golang os包代码示例:获取命令行参数、获取环境变量、创建和删除文件、检查文件或目录
    以下是一些示例代码,展示了如何使用os包进行操作系统相关的操作:获取命令行参数:packagemainimport( "fmt" "os")funcmain(){ args:=os.Args fori,arg:=rangeargs{ fmt.Printf("Argument%d:%s\n",i,arg) }}获取环境变量:packagemainimport( "f......