首页 > 其他分享 >个人随笔 —— 基于 go 语言实现的轻量化 workflow 分布式引擎插件

个人随笔 —— 基于 go 语言实现的轻量化 workflow 分布式引擎插件

时间:2023-07-21 12:22:58浏览次数:44  
标签:插件 return err nil workflow go string mod

背景

组内有很多项目都涉及复杂的任务流场景:

  • 集群创建、删除等生命周期管理
  • k8s 资源申请销毁
  • ....

这些场景都有几个共同的特点:

  1. 流程耗时且步骤复杂,需要几十步操作,其中包含云资源申请、脚本执行、接口调用等,且相互存在依赖关系。
  2. 任务量随着业务增长而快速迭代,比如每个集群每天都会自动备份等任务需要调度执行。
  3. 运维难度大,需要标准的框架约束业务实现,并基于此框架提供建设标准的运维体系,尽最大可能支持 SLA

方案调研

在 go 体系内的各种方案

  • 硬编码结合定时 Timer Worker 实现
    虽然工作量较小,但是只能满足某个场景下的特定工作流,没有可复用性,暂不具备扩展性,无法建立标准。
  • argo
    基于 k8s,api-server 实现,当下多云体系、混合云体系的部署环境复杂,切换至 k8s 落地难度高,另外会导致服务依赖更多的组件,增加复杂性。
  • Temporal(https://temporal.io/):
    这套开源框架能力很多,大而全,但是由于在业务团队人力有限,没有精力钻研透彻,缺乏把控力,在出现问题时,难以 cover。
  • Fastflow(https://github.com/Wenne/fastflow/tree/develop):
    一个 基于golang协程、支持水平扩容的分布式高性能工作流框架,它仅仅是一个基础框架,而不是一个完整的产品,这意味着你可以将其很低成本融入到遗留项目而无需部署、依赖另一个项目,这既是它的优点也是缺点。
    但是代码极少,在进行一定程度的魔改之后,完全足够支持中小体量的业务,特别适合缺乏基础架构团队的初创公司使用。

所以评估采用 Fastflow 作为 Workflow Engine,对于业务团队来说性价比极高

Fastflow 介绍

Fastflow 是什么?用一句话来定义它:一个 基于golang协程、支持水平扩容的分布式高性能工作流框架。 它具有以下特点:

  • 易用性:工作流模型基于 DAG 来定义,同时还提供开箱即用的 API,你可以随时通过 API 创建、运行、暂停工作流等,在开发新的原子能力时还提供了开箱即用的分布式锁功能
  • 高性能:得益于 golang 的协程 与 channel 技术,fastflow 可以在单实例上并行执行数百、数千乃至数万个任务
  • 可观测性:fastflow 基于 Prometheus 的 metrics 暴露了当前实例上的任务执行信息,比如并发任务数、任务分发时间等。
  • 可伸缩性:支持水平伸缩,以克服海量任务带来的单点瓶颈,同时通过选举 Leader 节点来保障各个节点的负载均衡
  • 可扩展性:fastflow 准备了部分开箱即用的任务操作,比如 http请求、执行脚本等,同时你也可以自行定义新的节点动作,同时你可以根据上下文来决定是否跳过节点(skip)
  • 轻量:它仅仅是一个基础框架,而不是一个完整的产品,这意味着你可以将其很低成本融入到遗留项目而无需部署、依赖另一个项目,这既是它的优点也是缺点——当你真的需要一个开箱即用的产品时(比如 airflow),你仍然需要少量的代码开发才能使用

架构

img
首先 fastflow 是一个分布式的框架,意味着你可以部署多个实例来分担负载,而实例被分为两类角色:

  • Leader:此类实例在运行过程中只会存在一个,从 Worker 中进行选举而得出,它负责给 Worker 实例分发任务,也会监听长时间得不到执行的任务将其调度到其他节点等
  • Worker:此类实例会存在复数个,它们负责解析 DAG 工作流并以 协程 执行其中的任务

而不同节点能够承担不同的功能,其背后是不同的 模块 在各司其职,不同节点所运行的模块如下图所示:
img
其中各个模块的职责如下:

  • Keeper: 每个节点都会运行 负责注册节点到存储中,保持心跳,同时也会周期性尝试竞选 Leader,防止上任 Leader 故障后阻塞系统,这个模块同时也提供了 分布式锁 功能,我们也可以实现不同存储的 Keeper 来满足特定的需求,比如 Etcd or Zookeepper,目前支持的 Keeper 实现只有 Mongo
  • Store: 每个节点都会运行 负责解耦 Worker 对底层存储的依赖,通过这个组件,我们可以实现利用 Mongo, Mysql 等来作为 fastflow 的后端存储,目前已经都魔改实现支持
  • Parser:Worker 节点运行 负责监听分发到自己节点的任务,然后将其 DAG 结构重组为一颗 Task 树,并渲染好各个任务节点的输入,接下来通知 Executor 模块开始执行 Task
  • Commander:每个节点都会运行 负责封装一些常见的指令,如停止、重试、继续等,下发到节点去运行
  • Executor: Worker 节点运行 按照 Parser 解析好的 Task 树以 goroutine 运行单个的 Task
  • Dispatcher:Leader节点才会运行 负责监听等待执行的 DAG,并根据 Worker 的健康状况均匀地分发任务
  • WatchDog:Leader节点才会运行 负责监听执行超时的 Task 将其更新为失败,同时也会重新调度那些一直得不到执行的 DagInstance 到其他 Worker

魔改点

  • 支持了 MySQL 引擎
  • 支持了 tag 标签功能
  • 支持了 watcher 角色

如何接入业务

基于 mysql 运行的 fastflow 插件接入业务实操如下

Interface define

首先在业务侧定义 interface 包装 Fastflow 框架提供的能力,供同步业务流程中调度使用

type Workflow interface {
        Close()
        // RegisterDag register a dag to workflow
        RegisterDag(dag *entity.Dag, actions []run.Action) error
        // RunDag start a dag instance
        RunDag(dagId string, specVar map[string]string) (*entity.DagInstance, error)
        // RunDagWithTags start a dag instance with tags
        RunDagWithTags(dagId string, specVar map[string]string, tags map[string]string) (*entity.DagInstance, error)
        // RetryDagIns retry a dag instance
        RetryDagIns(dagInsId string, ops ...mod.CommandOptSetter) error
        // CancelDagIns cancel a dag instance
        CancelDagIns(dagInsId string, ops ...mod.CommandOptSetter) error
        // ListDagIns list dag instances
        ListDagIns(input *mod.ListDagInstanceInput) ([]*entity.DagInstance, error)
}

实现如下:

const watcherName = "watcher-255"

type workflowImpl struct {
	// store the metadata interface of workflow
	store mod.Store
	// keeper the data interface of workflow
	keeper mod.Keeper
	// commander the command interface of workflow
	commander mod.Commander
}

func NewWorkflow(isWorker bool) (workflow.Workflow, error) {
	store, err := initStore(isWorker)
	if err != nil {
		return nil, err
	}

	keeper, err := initKeeper(isWorker)
	if err != nil {
		return nil, err
	}

	commander := initCommander()

	return &workflowImpl{
		store:     store,
		keeper:    keeper,
		commander: commander,
	}, nil
}

func initKeeper(isWorker bool) (mod.Keeper, error) {
	podName, err := queryPodName(isWorker)
	if err != nil {
		return nil, err
	}
	keeper := mysqlKeeper.NewKeeper(&mysqlKeeper.KeeperOption{
		Key: *podName,
		MySQLConfig: &mysql.Config{
			Addr:   fmt.Sprintf("%s:%d", service_config.Global().MysqlConfig.Host, service_config.Global().MysqlConfig.Port),
			User:   service_config.Global().MysqlConfig.User,
			Passwd: service_config.Global().MysqlConfig.Password,
			DBName: service_config.Global().MysqlConfig.Database,
		},
		MigrationSwitch: isWorker,
		WatcherFlag:     !isWorker,
	})
	if err := keeper.Init(); err != nil {
		return nil, err
	}
	mod.SetKeeper(keeper)
	return keeper, nil
}

func queryPodName(isWorker bool) (*string, error) {
	if !isWorker {
		return pointer.To(watcherName), nil
	}

	podName := os.Getenv("POD_NAME")
	if podName == "" {
		return nil, fmt.Errorf("POD_NAME environment variable is empty")
	}
	return &podName, nil
}

func initCommander() *mod.DefCommander {
	commander := &mod.DefCommander{}
	mod.SetCommander(commander)
	return commander
}

func initStore(isWorker bool) (mod.Store, error) {
	store := mysqlStore.NewStore(&mysqlStore.StoreOption{
		MySQLConfig: &mysql.Config{
			Addr:   fmt.Sprintf("%s:%d", service_config.Global().MysqlConfig.Host, service_config.Global().MysqlConfig.Port),
			User:   service_config.Global().MysqlConfig.User,
			Passwd: service_config.Global().MysqlConfig.Password,
			DBName: service_config.Global().MysqlConfig.Database,
		},
		MigrationSwitch: isWorker,
	})
	if err := store.Init(); err != nil {
		return nil, err
	}
	mod.SetStore(store)
	return store, nil
}

func (impl *workflowImpl) Close() {
	mod.SetKeeper(nil)
	mod.SetStore(nil)
	mod.SetCommander(nil)
	impl.store.Close()
	impl.keeper.Close()
}

func (impl *workflowImpl) RegisterDag(dag *entity.Dag, actions []run.Action) error {
	fastflow.RegisterAction(actions)
	oldDag, err := mod.GetStore().GetDag(dag.ID)
	if errors.Is(err, data.ErrDataNotFound) {
		if err := mod.GetStore().CreateDag(dag); err != nil {
			return err
		}
	}
	if oldDag != nil {
		if err := mod.GetStore().UpdateDag(dag); err != nil {
			return err
		}
	}
	return nil
}

func (impl *workflowImpl) RunDag(dagId string, specVar map[string]string) (*entity.DagInstance, error) {
	return impl.commander.RunDag(dagId, specVar)
}

func (impl *workflowImpl) RunDagWithTags(
	dagId string, specVar map[string]string, tags map[string]string,
) (*entity.DagInstance, error) {
	return impl.commander.RunDagWithTags(dagId, specVar, tags)
}

func (impl *workflowImpl) RetryDagIns(dagInsId string, ops ...mod.CommandOptSetter) error {
	return impl.commander.RetryDagIns(dagInsId, ops...)
}

func (impl *workflowImpl) ListDagIns(input *mod.ListDagInstanceInput) ([]*entity.DagInstance, error) {
	return impl.store.ListDagInstance(input)
}

func (impl *workflowImpl) CancelDagIns(dagInsId string, ops ...mod.CommandOptSetter) error {
	return impl.commander.CancelDagIns(dagInsId, ops...)
}

如何初始化 Worker

这边魔改了插件实现,对于有需要将 fastflow 服务与同步业务拆分开的可以通过构造方法的 isWorker 变量进行隔离

  • fastflow 服务使用 true
  • 其他只需要触发 fastflow dag instance 的服务使用 false

Ps: 交互通过 mysql 作为介质交互。

func NewWorkflow(isWorker bool) (workflow.Workflow, error) {
	store, err := initStore(isWorker)
	if err != nil {
		return nil, err
	}

	keeper, err := initKeeper(isWorker)
	if err != nil {
		return nil, err
	}

	commander := initCommander()

	return &workflowImpl{
		store:     store,
		keeper:    keeper,
		commander: commander,
	}, nil
}

Workflow 推荐使用原则

├── workflow # use workflow engine to manage async task
│   ├── impl
│   │   └── workflow.go
│   ├── action
│   │   ├── cluster
│   │   │   ├── create
│   │   │   └── delete
│   ├── dag
│   │   ├── cluster
│   │   └── register.go
│   └── interface.go

所有 workflow 流程组织、业务实现的代码归档至 workflow 路径下

  • workflow 根目录:
    遵循依赖抽象,不依赖具体实现的原则, workflow 包下第一层只能放置 interface 以及常量定义文件,用户只需要阅读 interface 即可使用 wfs,另外方便无缝接入到 IOC 容器。
  • workflow/impl 目录:
    对于 fastflow 工作流插件的适配器层,在这里融合 fastflow 的一些能力已更好的适配业务逻辑。业务逻辑不应放置在该目录下,且对适配层加入逻辑可以评估看是否加入至 fastflow 插件 repo 更为合适。
  • workflow/dag 目录
    按照业务领域模型进行分类,里面维护改业务下所有的流程 template,用于描述 DAG
  • workflow/action 目录:
    按照业务模型进行分类,里面维护业务下所有 dag template 需要使用的 template,内部的分类建议一个 dag template 放置在同一目录,可以复用的 action 可以向上抽取一层。

标签:插件,return,err,nil,workflow,go,string,mod
From: https://www.cnblogs.com/BlueMountain-HaggenDazs/p/17570971.html

相关文章

  • GO语言配置管理神器-Viper中文教程
    Viper是适用于Go应用程序的完整配置解决方案。它被设计用于在应用程序中工作,并且可以处理所有类型的配置需求和格式。ViperViper是适用于Go应用程序的完整配置解决方案。它被设计用于在应用程序中工作,并且可以处理所有类型的配置需求和格式。鉴于viper库本身的README已经写的十......
  • Eclipse如何安装JavaEE插件
     Eclipse是Java编程领域最常用的开发工具之一,它提供了丰富的插件来支持各种编程语言和框架。对于JavaEE开发者来说,安装JavaEE插件是非常必要的,因为它可以为我们提供更多的工具和功能,使我们的开发工作更加高效和便捷。本文将一步步教你如何在Eclipse中安装JavaEE插件。首先,我们......
  • MySQL8.0安装Mcafee审计插件,开启审计功能
    最近根据安全审计要求,数据库需要开启审计日志功能社区版本的MySQL8没有官方的审计日志插件,我们可以选择Mcafee提供的插件进行安装1、下载插件首先需要根据数据库版本选择对应的插件;https://github.com/mcafee-enterprise/mysql-audit注意:这里需要与数据库版本严格对应,笔者尝......
  • go 结构体嵌套interface
    packagemainimport"fmt"//结构体嵌套接口,可以在结构体绑定的方法直接实现接口中的方法,直接调用接口中的方法typeaainterface{ a() b()}typeworldstruct{ aa Ageint}func(hworld)a(){ fmt.Println("helloa方法")}func(hworld)b(){ fmt.Println("hello......
  • 插件模式架构图
    实现插件模式架构图前言在软件开发中,插件模式是一种常用的架构设计模式,它可以使代码具备可扩展性和灵活性,允许在不修改原有代码的情况下添加新功能或改变现有功能。本文将介绍如何实现插件模式架构图,帮助刚入行的开发者快速上手。整体流程下表展示了实现插件模式架构图的整体流......
  • 如何理解小程序插件?微信及支付宝官方详解
    一、小程序插件功能介绍1、如何理解插件插件,英文名可称作“Plug-in、Plugin、add-in、addin、add-on、addon或extension”,是一个依附于主程序的辅助程序,透过和主程序的互动,用来代替主程序需要增加一些所需的特定功能。更通俗的来讲,就类似机器的零件,可以“插入”的形式添加到程......
  • 前后端分离实现注册+登录(Vue3.0 + Django3.2)
    博客地址:https://www.cnblogs.com/zylyehuo/一、使用vite+webstorm搭建Vue环境,构建前端1、结构树2、main.jsimport{createApp}from'vue'//import'./style.css'importAppfrom'./App.vue'importrouterfrom"./utils/router";......
  • Django基本数据库操作
    Django基本数据库操作@目录Django基本数据库操作......
  • springboot插件式开发 springboot-plugin-framework-v2.4.5使用文档
    功能介绍简介介绍此框架可在SpringBoot项目上开发出用于扩展项目的插件,可在插件模块中单独定义接口、静态文件、mybatis-xml等扩展功能。核心功能插件配置式插拔于springboot项目。在springboot上可以进行插件式开发,扩展性极强,可以针对不同项目开发不同插件,进行不同插件jar包......
  • 用rsync来同步mongodb的数据,可行吗?
    用rsync来同步mongodb的数据,可行吗?当涉及到数据的备份和同步时,rsync是一个非常常用的工具。那么,我们可以使用rsync来同步mongodb的数据吗?答案是肯定的。在本文中,我将介绍如何使用rsync来备份和同步mongodb的数据,并提供相应的代码示例。首先,让我们来了解一下rsync。rsync是一个......