首页 > 其他分享 >【Go-多线程】Golang的channel实现消息的批量处理

【Go-多线程】Golang的channel实现消息的批量处理

时间:2024-06-05 17:44:15浏览次数:29  
标签:option AggregatorOption aggregator items Golang func time Go 多线程

【Go-多线程】Golang的channel实现消息的批量处理。当消息量特别大时,Kafka之类的消息队列(message queue)是首选;本文介绍的则是一种基于channel、无需额外中间件的更轻量的方案。

channelx.go

//这个方案需要实现以下几点:
//1.消息聚合后处理(最大条数为BatchSize),核心:
//(1)带buffer的channel相当于一个FIFO的队列
//(2)多个常驻的goroutine来提高并发
//(3)goroutine之间是并行的,但每个goroutine只会串行地读写它自己的batch,batch不会在多个goroutine之间共享,所以对batch操作是不用加锁的。
//2.延迟处理(延迟时间为LingerTime)
//  注意:这里使用time.Timer而不是time.After,是因为在for select中每次循环调用time.After都会新建一个定时器;在Go 1.23之前,该定时器在到期前不会被垃圾回收,循环频繁时会造成内存堆积(即所谓的内存泄露)。
//3.自定义错误处理
//4.并发处理

package channelx

import (
	"runtime"
	"sync"
	"time"
)

// Aggregator buffers enqueued items in a channel and hands them to a
// BatchProcessFunc in batches from a set of worker goroutines.
type Aggregator struct {
	// option is the configuration resolved by NewAggregator.
	option         AggregatorOption
	// wg is waited on by Stop and SafeStop; workers and batch calls register on it.
	wg             *sync.WaitGroup
	// quit is closed by Stop/SafeStop to tell the workers to flush and exit.
	quit           chan struct{}
	// eventQueue is the buffered channel that carries items from producers to workers.
	eventQueue     chan interface{}
	// batchProcessor is the callback invoked with every completed batch.
	batchProcessor BatchProcessFunc
}

// AggregatorOption holds the tunable settings of an Aggregator.
type AggregatorOption struct {
	// BatchSize is the number of buffered items at which a worker flushes its
	// batch. NewAggregator defaults it to 8.
	BatchSize         int
	// Workers is the number of worker goroutines launched by Start.
	// NewAggregator defaults it to runtime.NumCPU().
	Workers           int
	// ChannelBufferSize is the capacity of the event queue. NewAggregator
	// raises it to Workers when it is not larger than Workers.
	ChannelBufferSize int
	// LingerTime is the duration of the timer a worker arms when the first
	// item enters an empty batch; when it fires the partial batch is flushed.
	// NewAggregator defaults it to one minute.
	LingerTime        time.Duration
	// ErrorHandler, when non-nil, is called in a new goroutine each time the
	// batch processor returns an error.
	ErrorHandler      ErrorHandlerFunc
	// Logger is optional; every use is guarded by a nil check.
	Logger            Logger
}

// BatchProcessFunc processes one batch of items. A non-nil error is logged
// and forwarded to the configured ErrorHandler, if any.
type BatchProcessFunc func([]interface{}) error

// SetAggregatorOptionFunc customizes the aggregator configuration: it
// receives the current option value and returns the value to use from then on.
type SetAggregatorOptionFunc func(option AggregatorOption) AggregatorOption

// ErrorHandlerFunc handles a failed batch. It receives the error returned by
// the batch processor, the items of the failed batch, the batch processor
// itself and the aggregator that ran it.
type ErrorHandlerFunc func(err error, items []interface{}, batchProcessFunc BatchProcessFunc, aggregator *Aggregator)

// NewAggregator builds an Aggregator that delivers batches to batchProcessor.
//
// The configuration starts from the defaults (BatchSize 8, Workers
// runtime.NumCPU(), LingerTime one minute) and is then passed through each
// option func in order. Nil option funcs are skipped. After the option funcs
// have run, a non-positive BatchSize or Workers falls back to its default,
// because such values would leave the aggregator without workers or make the
// channel/slice allocations panic. ChannelBufferSize is raised to Workers
// when it is not larger.
//
// batchProcessor must be non-nil. The returned aggregator is idle until Start
// is called.
func NewAggregator(batchProcessor BatchProcessFunc, optionFuncs ...SetAggregatorOptionFunc) *Aggregator {
	defaults := AggregatorOption{
		BatchSize:  8,
		Workers:    runtime.NumCPU(),
		LingerTime: 1 * time.Minute,
	}

	option := defaults
	for _, optionFunc := range optionFuncs {
		if optionFunc == nil {
			continue
		}
		option = optionFunc(option)
	}

	// Reject values that cannot work instead of failing later at runtime.
	if option.BatchSize <= 0 {
		option.BatchSize = defaults.BatchSize
	}
	if option.Workers <= 0 {
		option.Workers = defaults.Workers
	}

	// Keep at least one queue slot per worker.
	if option.ChannelBufferSize <= option.Workers {
		option.ChannelBufferSize = option.Workers
	}

	return &Aggregator{
		eventQueue:     make(chan interface{}, option.ChannelBufferSize),
		option:         option,
		quit:           make(chan struct{}),
		wg:             new(sync.WaitGroup),
		batchProcessor: batchProcessor,
	}
}

// TryEnqueue offers item to the event queue without blocking and reports
// whether it was accepted. When the queue is full it yields the processor
// once and retries; if the queue is still full the item is dropped and false
// is returned.
func (agt *Aggregator) TryEnqueue(item interface{}) bool {
	// First attempt.
	select {
	case agt.eventQueue <- item:
		return true
	default:
	}

	if agt.option.Logger != nil {
		agt.option.Logger.Warnc("Aggregator", nil, "Event queue is full and try reschedule")
	}

	// Give other goroutines a chance to run before the second attempt.
	runtime.Gosched()

	// Second and last attempt.
	select {
	case agt.eventQueue <- item:
		return true
	default:
	}

	if agt.option.Logger != nil {
		agt.option.Logger.Warnc("Aggregator", nil, "Event queue is still full and %+v is skipped.", item)
	}
	return false
}

// Enqueue adds item to the event queue, blocking while the queue is full.
// NOTE(review): the send does not observe the quit channel, so a call made
// after the workers have exited appears to block indefinitely once the buffer
// is full — confirm that producers stop before Stop/SafeStop is called.
func (agt *Aggregator) Enqueue(item interface{}) {
	agt.eventQueue <- item
}

// Start launches option.Workers worker goroutines, each running work with its
// own index.
// NOTE(review): a worker registers itself on the WaitGroup from inside its own
// goroutine, so a Stop issued immediately after Start may not wait for workers
// that have not been scheduled yet.
func (agt *Aggregator) Start() {
	workers := agt.option.Workers
	for idx := 0; idx < workers; idx++ {
		// The argument is evaluated when the go statement executes, so each
		// worker receives its own index value.
		go agt.work(idx)
	}
}

// Stop signals the workers to exit and waits for them. Each worker flushes
// the batch it is currently holding, but items still sitting in the event
// queue are not processed.
//
// Calling Stop again (or after SafeStop) is a no-op apart from the wait; it
// no longer panics on closing an already closed channel. The check is not
// atomic, so Stop must not be called concurrently from several goroutines.
func (agt *Aggregator) Stop() {
	select {
	case <-agt.quit:
		// Already closed by an earlier Stop/SafeStop.
	default:
		close(agt.quit)
	}
	agt.wg.Wait()
}

// SafeStop waits until the event queue is empty, polling every 50ms, then
// signals the workers to exit and waits for them. Compared with Stop, items
// that were already queued are handed to a worker before the quit signal is
// sent. Producers must stop enqueuing before calling SafeStop, otherwise the
// queue may never be observed empty or late items may be left behind.
//
// If the aggregator has already been stopped, SafeStop only waits for the
// workers: nobody would drain the queue any more, so polling could spin
// forever and closing quit a second time would panic. The check is not
// atomic, so SafeStop must not be called concurrently with Stop or itself.
func (agt *Aggregator) SafeStop() {
	select {
	case <-agt.quit:
		agt.wg.Wait()
		return
	default:
	}

	if len(agt.eventQueue) != 0 {
		ticker := time.NewTicker(50 * time.Millisecond)
		for range ticker.C {
			if len(agt.eventQueue) == 0 {
				break
			}
		}
		ticker.Stop()
	}

	close(agt.quit)
	agt.wg.Wait()
}

// work is the body of one worker goroutine. It registers the worker on the
// WaitGroup once and keeps the batching loop running until quit is closed,
// restarting the loop after a recovered panic.
//
// The restart is a plain loop rather than a recursive call from a deferred
// function: the stack does not grow with every panic, and the WaitGroup
// counter never drops to zero while the worker is restarting. index
// identifies the worker and is currently unused.
func (agt *Aggregator) work(index int) {
	agt.wg.Add(1)
	defer agt.wg.Done()

	for agt.runWorker() {
	}
}

// runWorker collects items into a batch and flushes it when it reaches
// BatchSize, when the linger timer fires, or when quit is closed. It returns
// false after a quit signal and true when it ended because of a recovered
// panic; in that case the batch being assembled is dropped.
func (agt *Aggregator) runWorker() (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			if agt.option.Logger != nil {
				agt.option.Logger.Errorc("Aggregator", nil, "recover worker as bad thing happens %+v", r)
			}
			panicked = true
		}
	}()

	// A negative capacity would make every make() call panic and restart the
	// worker forever.
	batchCap := agt.option.BatchSize
	if batchCap < 0 {
		batchCap = 0
	}
	// A fresh slice is allocated after every flush because the flushed slice
	// may still be in use by an asynchronous ErrorHandler.
	newBatch := func() []interface{} {
		return make([]interface{}, 0, batchCap)
	}

	batch := newBatch()

	// Create the timer in a stopped, drained state; it is armed on demand.
	lingerTimer := time.NewTimer(0)
	if !lingerTimer.Stop() {
		<-lingerTimer.C
	}
	defer lingerTimer.Stop()

	// armed is true while the timer has been Reset and its tick has not been
	// received. Stop/drain is only valid in that state: draining an unarmed
	// timer blocks forever, which is what happened with BatchSize <= 1.
	armed := false

	for {
		select {
		case req := <-agt.eventQueue:
			batch = append(batch, req)

			if len(batch) < agt.option.BatchSize {
				if len(batch) == 1 {
					// First item of a new batch starts the linger window.
					lingerTimer.Reset(agt.option.LingerTime)
					armed = true
				}
				continue
			}

			agt.batchProcess(batch)

			if armed {
				if !lingerTimer.Stop() {
					<-lingerTimer.C
				}
				armed = false
			}
			batch = newBatch()
		case <-lingerTimer.C:
			armed = false
			if len(batch) == 0 {
				continue
			}

			agt.batchProcess(batch)
			batch = newBatch()
		case <-agt.quit:
			// Flush whatever this worker is still holding before exiting.
			if len(batch) != 0 {
				agt.batchProcess(batch)
			}
			return false
		}
	}
}

// batchProcess runs the batch processor on items while holding a WaitGroup
// slot. On success it logs the number of items sent. On failure it logs the
// error and then either starts the configured ErrorHandler in a new goroutine
// or, when no handler is set, logs that the batch is skipped. All logging is
// omitted when no Logger is configured.
func (agt *Aggregator) batchProcess(items []interface{}) {
	agt.wg.Add(1)
	defer agt.wg.Done()

	err := agt.batchProcessor(items)
	hasLogger := agt.option.Logger != nil

	if err == nil {
		if hasLogger {
			agt.option.Logger.Infoc("Aggregator", "%d items have been sent.", len(items))
		}
		return
	}

	if hasLogger {
		agt.option.Logger.Errorc("Aggregator", err, "error happens")
	}

	switch {
	case agt.option.ErrorHandler != nil:
		// The handler runs asynchronously and is not tracked by the WaitGroup.
		go agt.option.ErrorHandler(err, items, agt.batchProcessor, agt)
	case hasLogger:
		agt.option.Logger.Errorc("Aggregator", err, "error happens in batchProcess and is skipped")
	}
}

测试

//aggregator库的使用示例
package channelx

import (
	"errors"
	"sync"
	"testing"
	"time"
)

// TestAggregator_Basic runs an aggregator with default options, offers 1000
// integers without blocking and shuts it down with SafeStop. Items rejected by
// a full queue are ignored; the test only logs the size of each batch.
func TestAggregator_Basic(t *testing.T) {
	agg := NewAggregator(func(items []interface{}) error {
		t.Logf("handler %d items", len(items))
		return nil
	})

	agg.Start()

	for n := 0; n < 1000; n++ {
		agg.TryEnqueue(n)
	}

	agg.SafeStop()
}

// TestAggregator_Complex pushes 100 items through two workers with a batch
// size of 4 and a short linger time, so that some batches are flushed early
// and reported as errors to the error handler.
//
// The error handler runs in a goroutine started by the aggregator. It reports
// failures with t.Error, because t.FailNow may only be called from the test
// goroutine. The test also waits for every handler to finish, so no handler
// logs through t after the test has returned.
func TestAggregator_Complex(t *testing.T) {
	wg := &sync.WaitGroup{}
	wg.Add(100)

	// handlers counts error handler invocations that are still pending. The
	// slot is reserved in batchProcess, before the error is returned, so the
	// Add always happens before the final Wait below.
	handlers := &sync.WaitGroup{}

	batchProcess := func(items []interface{}) error {
		defer wg.Add(-len(items))
		time.Sleep(20 * time.Millisecond)
		if len(items) != 4 {
			handlers.Add(1)
			return errors.New("len(items) != 4")
		}
		return nil
	}

	errorHandler := func(err error, items []interface{}, batchProcessFunc BatchProcessFunc, aggregator *Aggregator) {
		defer handlers.Done()
		if err == nil {
			t.Error("error handler called with a nil error")
			return
		}
		t.Logf("Receive error, item size is %d", len(items))
	}

	aggregator := NewAggregator(batchProcess, func(option AggregatorOption) AggregatorOption {
		option.BatchSize = 4
		option.Workers = 2
		option.LingerTime = 8 * time.Millisecond
		option.Logger = NewConsoleLogger()
		option.ErrorHandler = errorHandler
		return option
	})

	aggregator.Start()

	for i := 0; i < 100; i++ {
		// Retry until the queue accepts the item so that all 100 are processed.
		for !aggregator.TryEnqueue(i) {
			time.Sleep(10 * time.Millisecond)
		}
	}

	aggregator.SafeStop()
	wg.Wait()
	handlers.Wait()
}

// TestAggregator_LingerTimeOut feeds one item every 100ms to a single worker
// with a batch size of 4 and a linger time of 150ms, so batches are flushed by
// the linger timer before they fill up.
//
// An item rejected by TryEnqueue never reaches batchProcess. Its WaitGroup
// slot is released and the drop is reported, instead of leaving wg.Wait
// blocked forever.
func TestAggregator_LingerTimeOut(t *testing.T) {
	wg := &sync.WaitGroup{}
	wg.Add(100)

	batchProcess := func(items []interface{}) error {
		defer wg.Add(-len(items))
		if len(items) != 4 {
			t.Log("linger time out")
		}
		return nil
	}

	aggregator := NewAggregator(batchProcess, func(option AggregatorOption) AggregatorOption {
		option.BatchSize = 4
		option.Workers = 1
		option.LingerTime = 150 * time.Millisecond
		option.Logger = NewConsoleLogger()
		return option
	})

	aggregator.Start()

	for i := 0; i < 100; i++ {
		if !aggregator.TryEnqueue(i) {
			wg.Done()
			t.Errorf("item %d was dropped because the event queue was full", i)
		}
		time.Sleep(100 * time.Millisecond)
	}

	aggregator.SafeStop()
	wg.Wait()
}

标签:option,AggregatorOption,aggregator,items,Golang,func,time,Go,多线程
From: https://www.cnblogs.com/qcy-blog/p/18233487

相关文章

  • go语言方法之封装
        一个对象的变量或者方法如果对调用方是不可见的话,一般就被定义为“封装”。封装有时也叫信息隐藏,同时也是面向对象编程的一个方面。    Go语言只有一种控制可见性的手段:大写首字母的标识符会从定义它们的包中被导出,小写的字母则不会。这种限制包内成员的......
  • Golang初学:一些第三方包
    goversiongo1.22.1-- Web开发gorillahttps://gowebexamples.com中的示例有用到。 Routing(usinggorilla/mux)goget-ugithub.com/gorilla/mux-Sessions"github.com/gorilla/sessions"-Websockets$gogetgithub.com/gorilla/websocket- gingoget......
  • c# MongoDB.Driver 连接mongo 数据库失败的解决方法
    在连接数据库的时候连接本的的时候连接字符串是mongodb://localhost:端口号(默认27017)/数据库名(选填)用这种格式的连接字符串去做本地的测试是没问题的,但是连接服务器上面的数据库的时候就要加上用户名和密码,这个时候就需要在字符串的末尾添加后缀:mongodb://用户名:密码(都不......
  • 2024-06-05:用go语言,给定三个正整数 n、x 和 y, 描述一个城市中由 n 个房屋和 n 条街道
    2024-06-05:用go语言,给定三个正整数n、x和y,描述一个城市中由n个房屋和n条街道连接的情况。城市中存在一条额外的街道连接房屋x和房屋y。需要计算对于每个街道数(从1到n),有多少房屋对满足从一个房屋到另一个房屋经过的街道数正好为该街道数。在结果数组中,索引k对......
  • Exp-Golomb指数哥伦布码
    Exp-Golomb指数哥伦布码指数哥伦布码(Exponential-Golomb)属于熵编码,属于无损编码H.264中使用的是0阶指数哥伦布编码,编码方式如下:以待编码码号code_num=3为例:第一步:将code_num+1,即3+1=4第二步:将4写为二进制的形式:100第三步:计算100的比特个数为3,在100前面写(3-1)个0,得到......
  • golang使用OpenCC繁简转换
    https://github.com/longbridgeapp/openccmain.gopackagemainimport( "fmt" "log" "github.com/longbridgeapp/opencc")funcmain(){ s2t,err:=opencc.New("s2t") iferr!=nil{ log.Fatal(err) } in:=`......
  • 多线程实现爬取图片
    importosimportthreadingimportrequestsfromget_img_urlimportget_img_url#下载单张图片方法,方法入参为图片url地址和图片名称defdownload_image(url,filename):response=requests.get(url)withopen(filename,'wb')asf:f.write(respon......
  • CentOS-7.9 安装MongoDB6.0.6-server步骤
    下载解压wgethttps://repo.mongodb.org/yum/redhat/7/mongodb-org/6.0/x86_64/RPMS/mongodb-org-server-6.0.6-1.el7.x86_64.rpmsudorpm-ivhmongodb-org-server-6.0.6-1.el7.x86_64.rpm安装MongoDBsudomkdir-p/usr/local/mongodb/data/usr/local/mongodb/log/usr/lo......
  • 在 django 中使用窗口函数
    问题通过djangoORM实现如下写法的SQL语句:select*,row_number()over(partitionbyc1orderbyc2desc)asrnfrommy_table实现fromdjango.db.modelsimportF,Windowfromdjango.db.models.functionsimportRank,RowNumberMyModel.objects.annotate(rn=W......
  • Stable Diffusion|创意AI LOGO制作
    Logo是一家公司的视觉形象,它代表了公司的品牌形象、理念和文化,是公司形象识别系统的重要组成部分。如果想用Logo去衍生出更多的视觉海报、元素等等。可以尝试以下方法:1LOGO首先需要准备一张具有黑色填充的Logo图片,并确保其背景为白色。再就是需要安装“ControlNet”插......