首页 > 其他分享 >用go实现master/worker模型

用go实现master/worker模型

时间:2023-03-16 16:15:27浏览次数:37  
标签:tasks func worker Worker Task master context go

package schedule

import (
	"container/list"
	"context"
	"fmt"
	"sync"
)

/**********************************\
	Task任务接口
\**********************************/

type Task interface {
	// Process 任务处理逻辑. 通过ctx查知worker是否关闭.
	// 若有业务侧的"cancel/ack"关闭,请在process封装!
	Process(ctx context.Context) error
}

/**********************************\
	Worker结构
\**********************************/

type Worker struct {
	wctx context.Context
	wcnf context.CancelFunc
}

func NewWorker(parent context.Context, tasks chan Task) *Worker {
	worker := new(Worker)
	worker.wctx, worker.wcnf = context.WithCancel(parent)
	go worker.consume(tasks)
	return worker
}

func (w *Worker) Cancel() {
	w.wcnf()
}

func (w *Worker) consume(tasks chan Task) {
	for {
		select {
		case <-w.wctx.Done():
			return
		default:
			task, ok := <-tasks
			if !ok {
				return
			}
			if !w.process(task) {
				// 未处理成功需返还tasks
				tasks <- task
			}
		}
	}
}

func (w *Worker) process(task Task) bool {
	defer func() {
		if prr := recover(); prr != nil {
			// 集中处理panic
			fmt.Printf("worker process panic: %v", prr)
			// 最好堆栈一块打印方便排查
		}
	}()
	// 二次校验worker是否结束
	select {
	case <-w.wctx.Done():
		return false
	default:
		// process task
		err := task.Process(w.wctx)
		if err != nil {
			// 集中处理error
			fmt.Printf("worker process error: %v", err)
		}
		return true
	}
}

/**********************************\
	Master结构
\**********************************/

// Master 管理者数据结构
type Master struct {
	mutex sync.Mutex // 同步控制
	works *list.List // 工作者列表
	tasks chan Task  // 任务管道
	mctx  context.Context
	mcnf  context.CancelFunc
}

func NewMaster(parent context.Context, tasks chan Task, works int) *Master {
	master := new(Master)
	master.works = list.New()
	master.tasks = tasks
	master.mctx, master.mcnf = context.WithCancel(parent)
	master.Stretch(works)
	return master
}

func (m *Master) Stretch(tasks int) {

	m.mutex.Lock()
	defer m.mutex.Unlock()

	for m.works.Len() < tasks {
		// 新增
		work := NewWorker(m.mctx, m.tasks)
		m.works.PushBack(work)
	}

	for m.works.Len() > tasks {
		// 缩减
		elem := m.works.Back()
		if work, ok := elem.Value.(*Worker); ok {
			work.Cancel()
		}
		m.works.Remove(elem)
	}
}

func (m *Master) Cancel() {
	m.mcnf()
}

标签:tasks,func,worker,Worker,Task,master,context,go
From: https://www.cnblogs.com/zolo/p/17222976.html

相关文章

  • 数据库同步,MongoDB、ES,寻求免费的可写入型的ODBC驱动
    大家好,我们开发的数据库同步软件DBSync,能同步多种数据库,支持增量同步、异构同步。但对于NoSQL的MongoDB,Elasticsearch,一直有个问题:官方的ODBC驱动只能读数据库,不能写数据......
  • 0007 ALGO1000-kAc给糖果你吃
    试题算法训练kAc给糖果你吃比较简单,排序后拿数量多的importjava.util.Arrays;importjava.util.Scanner;/***@authorHuaWang135608*@date2023.03.1613:1......
  • go-使用jwt生成令牌
    /*jwt-生成token测试1.首先需要定义一个结构体,这个结构体选哟继承jwt.StandardClain,s结构体2.定义key和过期时间3.实例化存储token的结构体4.使用指定的......
  • Expectation-Maximization algorithm
    1.IntroductionTheExpectation-Maximization(EM)algorithmisawidelyusedstatisticalalgorithmformaximumlikelihoodestimationincaseswherethereismi......
  • go微服务开发:go-zero入门教程
    以下内容,参考了go-zero官方文档,是对官方文档的进阶指南章节的梳理汇总。go-zero的进阶指南,请参考https://go-zero.dev/cn/docs/advance/business-dev第一步:下载并解压go......
  • 容器化docker部署nginx代理的go-web应用
    通常我们的web应用程序部署的时候不会直接暴露,通过nginx反向代理,一是隐藏真实后端,二是通过nginx的epoll网络IO多路复用,获取高性能的网络访问。今天我们分享个通过nginx代......
  • 10、Master高可用nginx+keepalived布署
    1、前言#这里因为演示,用master2、master3主机做为高可用布署2、keepalived2.1、安装yuminstallgccgcc-c++makeautomakeautoconflibtoolpcrepcre-develzl......
  • Golang-微服务-紧急升级-缓存
    1.背景API服务器timewait一直下不去尝试阿里云优化方案,修改内核配置,也不管用前端表现为:页面访问卡顿页面丢失(接口504)Postmanstart_trasfer参数忽高忽......
  • android的google三方登录一直返回10
    标题:com.Google.android.gms.common.api.apiException:10:为何一直报错,这个问题的来源是可参考这里的状态码:googleStatusCodes配置其实都很简单,可参考:googleAndroid登录......
  • 4、单个Master配置
    1、初始化Master1.1、初始化命令kubeadminit--kubernetes-version=1.25.7\--apiserver-advertise-address=192.168.10.26\--service-cidr=10.96.0.0/12\--pod-n......