背景
组内有很多项目都涉及复杂的任务流场景:
- 集群创建、删除等生命周期管理
- k8s 资源申请销毁
- ....
这些场景都有几个共同的特点:
- 流程耗时且步骤复杂,需要几十步操作,其中包含云资源申请、脚本执行、接口调用等,且相互存在依赖关系。
- 任务量随着业务增长而快速迭代,比如每个集群每天都会自动备份等任务需要调度执行。
- 运维难度大,需要标准的框架约束业务实现,并基于此框架提供建设标准的运维体系,尽最大可能支持 SLA
方案调研
在 go 体系内的各种方案
- 硬编码结合定时 Timer Worker 实现
虽然工作量较小,但是只能满足某个场景下的特定工作流,没有可复用性,暂不具备扩展性,无法建立标准。 - argo
基于 k8s,api-server 实现,当下多云体系、混合云体系的部署环境复杂,切换至 k8s 落地难度高,另外会导致服务依赖更多的组件,增加复杂性。 - Temporal(https://temporal.io/):
这套开源框架能力很多,大而全,但是由于在业务团队人力有限,没有精力钻研透彻,缺乏把控力,在出现问题时,难以 cover。 - Fastflow(https://github.com/Wenne/fastflow/tree/develop):
一个 基于golang协程、支持水平扩容的分布式高性能工作流框架,它仅仅是一个基础框架,而不是一个完整的产品,这意味着你可以将其很低成本融入到遗留项目而无需部署、依赖另一个项目,这既是它的优点也是缺点。
但是代码极少,在进行一定程度的魔改之后,完全足够支持中小体量的业务,特别适合缺乏基础架构团队的初创公司使用。
所以评估采用 Fastflow 作为 Workflow Engine,对于业务团队来说性价比极高
Fastflow 介绍
Fastflow 是什么?用一句话来定义它:一个 基于golang协程、支持水平扩容的分布式高性能工作流框架。 它具有以下特点:
- 易用性:工作流模型基于 DAG 来定义,同时还提供开箱即用的 API,你可以随时通过 API 创建、运行、暂停工作流等,在开发新的原子能力时还提供了开箱即用的分布式锁功能
- 高性能:得益于 golang 的协程 与 channel 技术,fastflow 可以在单实例上并行执行数百、数千乃至数万个任务
- 可观测性:fastflow 基于 Prometheus 的 metrics 暴露了当前实例上的任务执行信息,比如并发任务数、任务分发时间等。
- 可伸缩性:支持水平伸缩,以克服海量任务带来的单点瓶颈,同时通过选举 Leader 节点来保障各个节点的负载均衡
- 可扩展性:fastflow 准备了部分开箱即用的任务操作,比如 http请求、执行脚本等,同时你也可以自行定义新的节点动作,同时你可以根据上下文来决定是否跳过节点(skip)
- 轻量:它仅仅是一个基础框架,而不是一个完整的产品,这意味着你可以将其很低成本融入到遗留项目而无需部署、依赖另一个项目,这既是它的优点也是缺点——当你真的需要一个开箱即用的产品时(比如 airflow),你仍然需要少量的代码开发才能使用
架构
首先 fastflow 是一个分布式的框架,意味着你可以部署多个实例来分担负载,而实例被分为两类角色:
- Leader:此类实例在运行过程中只会存在一个,从 Worker 中进行选举而得出,它负责给 Worker 实例分发任务,也会监听长时间得不到执行的任务将其调度到其他节点等
- Worker:此类实例会存在复数个,它们负责解析 DAG 工作流并以 协程 执行其中的任务
而不同节点能够承担不同的功能,其背后是不同的 模块 在各司其职,不同节点所运行的模块如下图所示:
其中各个模块的职责如下:
- Keeper: 每个节点都会运行 负责注册节点到存储中,保持心跳,同时也会周期性尝试竞选 Leader,防止上任 Leader 故障后阻塞系统,这个模块同时也提供了 分布式锁 功能,我们也可以实现不同存储的 Keeper 来满足特定的需求,比如 Etcd or Zookeepper,目前支持的 Keeper 实现只有 Mongo
- Store: 每个节点都会运行 负责解耦 Worker 对底层存储的依赖,通过这个组件,我们可以实现利用 Mongo, Mysql 等来作为 fastflow 的后端存储,目前已经都魔改实现支持
- Parser:Worker 节点运行 负责监听分发到自己节点的任务,然后将其 DAG 结构重组为一颗 Task 树,并渲染好各个任务节点的输入,接下来通知 Executor 模块开始执行 Task
- Commander:每个节点都会运行 负责封装一些常见的指令,如停止、重试、继续等,下发到节点去运行
- Executor: Worker 节点运行 按照 Parser 解析好的 Task 树以 goroutine 运行单个的 Task
- Dispatcher:Leader节点才会运行 负责监听等待执行的 DAG,并根据 Worker 的健康状况均匀地分发任务
- WatchDog:Leader节点才会运行 负责监听执行超时的 Task 将其更新为失败,同时也会重新调度那些一直得不到执行的 DagInstance 到其他 Worker
魔改点
- 支持了 MySQL 引擎
- 支持了 tag 标签功能
- 支持了 watcher 角色
如何接入业务
基于 mysql 运行的 fastflow 插件接入业务实操如下
Interface define
首先在业务侧定义 interface 包装 Fastflow 框架提供的能力,供同步业务流程中调度使用
type Workflow interface {
Close()
// RegisterDag register a dag to workflow
RegisterDag(dag *entity.Dag, actions []run.Action) error
// RunDag start a dag instance
RunDag(dagId string, specVar map[string]string) (*entity.DagInstance, error)
// RunDagWithTags start a dag instance with tags
RunDagWithTags(dagId string, specVar map[string]string, tags map[string]string) (*entity.DagInstance, error)
// RetryDagIns retry a dag instance
RetryDagIns(dagInsId string, ops ...mod.CommandOptSetter) error
// CancelDagIns cancel a dag instance
CancelDagIns(dagInsId string, ops ...mod.CommandOptSetter) error
// ListDagIns list dag instances
ListDagIns(input *mod.ListDagInstanceInput) ([]*entity.DagInstance, error)
}
实现如下:
const watcherName = "watcher-255"
type workflowImpl struct {
// store the metadata interface of workflow
store mod.Store
// keeper the data interface of workflow
keeper mod.Keeper
// commander the command interface of workflow
commander mod.Commander
}
func NewWorkflow(isWorker bool) (workflow.Workflow, error) {
store, err := initStore(isWorker)
if err != nil {
return nil, err
}
keeper, err := initKeeper(isWorker)
if err != nil {
return nil, err
}
commander := initCommander()
return &workflowImpl{
store: store,
keeper: keeper,
commander: commander,
}, nil
}
func initKeeper(isWorker bool) (mod.Keeper, error) {
podName, err := queryPodName(isWorker)
if err != nil {
return nil, err
}
keeper := mysqlKeeper.NewKeeper(&mysqlKeeper.KeeperOption{
Key: *podName,
MySQLConfig: &mysql.Config{
Addr: fmt.Sprintf("%s:%d", service_config.Global().MysqlConfig.Host, service_config.Global().MysqlConfig.Port),
User: service_config.Global().MysqlConfig.User,
Passwd: service_config.Global().MysqlConfig.Password,
DBName: service_config.Global().MysqlConfig.Database,
},
MigrationSwitch: isWorker,
WatcherFlag: !isWorker,
})
if err := keeper.Init(); err != nil {
return nil, err
}
mod.SetKeeper(keeper)
return keeper, nil
}
func queryPodName(isWorker bool) (*string, error) {
if !isWorker {
return pointer.To(watcherName), nil
}
podName := os.Getenv("POD_NAME")
if podName == "" {
return nil, fmt.Errorf("POD_NAME environment variable is empty")
}
return &podName, nil
}
func initCommander() *mod.DefCommander {
commander := &mod.DefCommander{}
mod.SetCommander(commander)
return commander
}
func initStore(isWorker bool) (mod.Store, error) {
store := mysqlStore.NewStore(&mysqlStore.StoreOption{
MySQLConfig: &mysql.Config{
Addr: fmt.Sprintf("%s:%d", service_config.Global().MysqlConfig.Host, service_config.Global().MysqlConfig.Port),
User: service_config.Global().MysqlConfig.User,
Passwd: service_config.Global().MysqlConfig.Password,
DBName: service_config.Global().MysqlConfig.Database,
},
MigrationSwitch: isWorker,
})
if err := store.Init(); err != nil {
return nil, err
}
mod.SetStore(store)
return store, nil
}
func (impl *workflowImpl) Close() {
mod.SetKeeper(nil)
mod.SetStore(nil)
mod.SetCommander(nil)
impl.store.Close()
impl.keeper.Close()
}
func (impl *workflowImpl) RegisterDag(dag *entity.Dag, actions []run.Action) error {
fastflow.RegisterAction(actions)
oldDag, err := mod.GetStore().GetDag(dag.ID)
if errors.Is(err, data.ErrDataNotFound) {
if err := mod.GetStore().CreateDag(dag); err != nil {
return err
}
}
if oldDag != nil {
if err := mod.GetStore().UpdateDag(dag); err != nil {
return err
}
}
return nil
}
func (impl *workflowImpl) RunDag(dagId string, specVar map[string]string) (*entity.DagInstance, error) {
return impl.commander.RunDag(dagId, specVar)
}
func (impl *workflowImpl) RunDagWithTags(
dagId string, specVar map[string]string, tags map[string]string,
) (*entity.DagInstance, error) {
return impl.commander.RunDagWithTags(dagId, specVar, tags)
}
func (impl *workflowImpl) RetryDagIns(dagInsId string, ops ...mod.CommandOptSetter) error {
return impl.commander.RetryDagIns(dagInsId, ops...)
}
func (impl *workflowImpl) ListDagIns(input *mod.ListDagInstanceInput) ([]*entity.DagInstance, error) {
return impl.store.ListDagInstance(input)
}
func (impl *workflowImpl) CancelDagIns(dagInsId string, ops ...mod.CommandOptSetter) error {
return impl.commander.CancelDagIns(dagInsId, ops...)
}
如何初始化 Worker
这边魔改了插件实现,对于有需要将 fastflow 服务与同步业务拆分开的可以通过构造方法的 isWorker 变量进行隔离
- fastflow 服务使用 true
- 其他只需要触发 fastflow dag instance 的服务使用 false
Ps: 交互通过 mysql 作为介质交互。
func NewWorkflow(isWorker bool) (workflow.Workflow, error) {
store, err := initStore(isWorker)
if err != nil {
return nil, err
}
keeper, err := initKeeper(isWorker)
if err != nil {
return nil, err
}
commander := initCommander()
return &workflowImpl{
store: store,
keeper: keeper,
commander: commander,
}, nil
}
Workflow 推荐使用原则
├── workflow # use workflow engine to manage async task
│ ├── impl
│ │ └── workflow.go
│ ├── action
│ │ ├── cluster
│ │ │ ├── create
│ │ │ └── delete
│ ├── dag
│ │ ├── cluster
│ │ └── register.go
│ └── interface.go
所有 workflow 流程组织、业务实现的代码归档至 workflow 路径下
- workflow 根目录:
遵循依赖抽象,不依赖具体实现的原则, workflow 包下第一层只能放置 interface 以及常量定义文件,用户只需要阅读 interface 即可使用 wfs,另外方便无缝接入到 IOC 容器。 - workflow/impl 目录:
对于 fastflow 工作流插件的适配器层,在这里融合 fastflow 的一些能力已更好的适配业务逻辑。业务逻辑不应放置在该目录下,且对适配层加入逻辑可以评估看是否加入至 fastflow 插件 repo 更为合适。 - workflow/dag 目录
按照业务领域模型进行分类,里面维护改业务下所有的流程 template,用于描述 DAG - workflow/action 目录:
按照业务模型进行分类,里面维护业务下所有 dag template 需要使用的 template,内部的分类建议一个 dag template 放置在同一目录,可以复用的 action 可以向上抽取一层。