首页 > 其他分享 >golang批量处理M个Task设置消费者只有N个然后逐个消费

golang批量处理M个Task设置消费者只有N个然后逐个消费

时间:2022-11-08 11:25:19浏览次数:42  
标签:Task func ctx args list golang interface context 逐个

1、消费逻辑封装:

package utils

import (
    "context"
    "errors"
    "fmt"
)

// ConsumerFunc param second is list.item
type ConsumerFunc func(context.Context, interface{}) error

func Run(ctx context.Context, consumerNumber int, list []interface{}, f ConsumerFunc) (chan string, error) {
    if consumerNumber == 0 {
        return nil, errors.New("consumer number must gt 0")
    }
    result := make(chan string, len(list))
    defer close(result)
    taskList := make(chan interface{}, len(list))
    defer close(taskList)
    for _, v := range list {
        taskList <- v
    }
    finishList := make(chan interface{}, len(list))
    go func() {
        for i := 0; i < consumerNumber; i++ {
            go func() {
                for v := range taskList {
                    func() {
                        defer func() {
                            finishList <- v
                        }()
                        err := f(ctx, v)
                        if err != nil {
                            result <- fmt.Sprintf("%s catch exception %s", fmt.Sprint(v), err.Error())
                        }
                    }()
                }
            }()
        }
    }()
    finishCount := 0
L:
    for {
        select {
        case <-ctx.Done():
            break L
        case <-finishList:
            finishCount++
            if finishCount == len(list) {
                break L
            }
        }
    }
    return result, nil
}

 

2、调用示例:

package utils

import (
    "context"
    "fmt"
    "strconv"
    "testing"
    "time"
)

func C(ctx context.Context, i interface{}) error {
    z := i.(string)
    s, _ := strconv.Atoi(z)
    time.Sleep(time.Second * time.Duration(s))
    fmt.Println(i)
    return nil
}

func TestRun(t *testing.T) {
    type args struct {
        ctx            context.Context
        consumerNumber int
        list           []interface{}
        f              func(context.Context, interface{}) error
    }
    ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
    defer cancel()
    tests := []struct {
        name string
        args args
    }{
        {
            name: "test one consumer",
            args: args{
                ctx:            ctx,
                consumerNumber: 1,
                list:           []interface{}{"1", "2", "3", "1", "2", "1", "2"},
                f:              C,
            },
        },
        {
            name: "test seven consumer",
            args: args{
                ctx:            ctx,
                consumerNumber: 7,
                list:           []interface{}{"1", "2", "3", "1", "2", "1", "2"},
                f:              C,
            },
        },
    }
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            result, err := Run(tt.args.ctx, tt.args.consumerNumber, tt.args.list, tt.args.f)
            if err != nil {
                t.Log(err)
            }
            for v := range result {
                t.Logf("%v", v)
            }
        })
    }
}

 

3、推荐的官方包:

 

https://pkg.go.dev/golang.org/x/sync/errgroup

 

标签:Task,func,ctx,args,list,golang,interface,context,逐个
From: https://www.cnblogs.com/xuweiqiang/p/16869048.html

相关文章

  • C#里的多线程,一网打尽thread,task,parallel
    C#里的多线程,一网打尽1.Syncawait2.Thread3.Threadpool4.Task5.Parallel Tasktask=newTask(()=>{});task.Start();Tasktask=Task.Run(()=>{};TaskFactoryta......
  • golang-ssh-01:执行远程命令
    golang-ssh-01:执行远程命令Go&Rust......
  • Golang认识和环境搭建
    1.Go语言学习参考网站Go语言:https://golang.org/Go语言中文网:https://studygolang.com/Go语言包管理:https://gopm.io/2.Golang认识为什么Golang在近几年越来越火?go是......
  • Golang学习之路3-基础认识(下)
    @目录前言一、数组1.定长数组2.不定长数组二、map1.使用关键字map来声明2.使用make来声明3.添加元素4.检索key的value是否存在5.删除元素6.遍历map7.map的注意点三、指......
  • 异步编程-Task类
    异步编程-Task类1.Task<Task>WhenAny(IEnumerable<Task>tasks)等,任何一个Task完成,Task就完成2.Task<TResult[]>WhenAll<TResult>(paramsTask<TResult>[]tasks)等,......
  • golang的变量介绍与使用
    变量变量的使用步骤:声明、赋值、使用packagemainimport"fmt"funcmain(){ //1.变量的声明 varageint //2.变量的赋值 age=18 //3.变量的使用 fmt.Pr......
  • task3
    1.#include<stdio.h>#include<time.h>#include<stdlib.h>#include<windows.h>#defineN80voidprint_text(intline,intcol,chartext[]);voidprint_spaces(intn);......
  • Golang创建项目并启动
    Golang创建项目并启动Golang创建项目并启动使用bee创建文件goods为项目名beenewgoods运行项目beerun项目结构tizi365├──conf-配置文件存......
  • Golang安装包并配置
    Golang安装包并配置Golang安装包并配置安装beego#安装beego核心包goget-ugithub.com/beego/beego/v2#安装orm包用于操作数据库,beego的orm包是独立的模块需要单独......
  • Golang基础入门
    util用于定义常用函数让其他文件来调用db定义对数据库操作的函数打包基本语法:package包名引入包基本语法,import"包路径"使用包函数包名.函数名()接口的基本语......