搜狗workflow异步调度框架
参考 https://www.zhihu.com/column/c_1456603443661643776
来源 https://zhuanlan.zhihu.com/p/172485495
虽然我更新本博客比较慢,但是github上的workflow项目本身在持续更新中~来看看和上一篇相比我们都改了什么吧:
- 加上了windows分支,以srpc性能看,网络还能快20%以上!
- 加上了英文Readme,大家可以愉快地分享给歪果小伙伴了
- tutorial里更改了展示使用内部组件waitgroup来等待的用法,让你做C++开发也能感受到GO一般的丝滑~
这里附上我们开源了一周的github地址:sogou/workflow
和这个系列的上一篇:1412:搜狗workflow异步调度框架 - 基本介绍篇
今天我还是要抱着跟大家学习的心态,迫不及待要从整体的角度来写一下workflow的架构。并且郑重声明一下,本篇只是本人作为开发者之一、想尽快和大家学习交流而写的个人梳理,并非我们项目的官方推广。后续如果要出官方介绍,可能会跟本篇的组织表达方式与立足点有所出入~
架构设计必然是从底向上开始,所以我们直接从kernel目录的设计思路开始聊。
1. 封装调度器
上次说到,我们作为异步调度框架,目前支持的异步调度资源分为6种:
这里可以举大家平时接触最多的网络通信框架和计算调度框架作为重点讲解一下。
我们需要封装调度器去操作这些系统资源,简单来说就是操作一批网络连接或者说线程。注意这里说的“操作”,也就是说调度器远不止连接池和线程池那么简单,我们要做的事情是:
- 包含与管理资源池
- 实现如何对一批连接尽可能高性能地响应其读写、如何尽可能快且尽可能通用地给出一个足够灵活的机制去让各线程执行各种计算
- 提供请求接口给上层使用
我们以线程执行器Executor为例来看看具体怎么做。以上第二点尽可能快又足够灵活的机制,就是我们设计的ExecQueue,在以下代码得以体现:
class Executor
{
public:
// 一次要执行的接口,对于线程执行器来说,就是把一个执行任务扔进某个队列中
int request(ExecSession *session, ExecQueue *queue);
private:
// 执行器和系统资源,是一个包含关系
thrdpool_t *thrdpool;
};
2. 封装调度的基本单位
构思完了调度器,我们需要构思一下被调度的基本单位。
对应每种可以调度对象的系统接口,我们必须封装自己的结构,作为每次与系统资源交互的基本单位,通过调度器提供的请求接口,扔到调度器里被调度。
具体来说,这显然是一次网络交互、或者一次线程需要执行的计算任务。然后每个基本单位上,可以有上下文、供子类做具体实现的接口/函数指针等等。
我们以网络交互为例:
class CommSession
{
// 往连接上要发的数据
virtual CommMessageOut *message_out() = 0;
// 连接上收到数据流,如何切下一个数据包
virtual CommMessageIn *message_in() = 0;
// 本次网络事件被响应的时机
virtual void handle(int state, int error) = 0;
…
// 一般我们的上下文是存在派生类上
};
阶段性总结一下,写到这里,我们就可以愉快地做网络收发或者线程调度了~这些模块都已经是可以单独拆出来用的。
作为框架,我们基于上述的多种调度器和调度单位,可以给用户封装各种具体网络协议和计算算法。但是这还不是我们的串并行任务系统的核心价值。
3. 任务流
我们想要实现任务流(无论是DAG还是串并连),意味着我们需要一套机制去按顺序触发具体的子任务执行、并接管其执行完之后要做的事情。实现的方式有很多,我们做了一套子任务系统来满足抽象的任务调度,而这个任务本身是网络通信还是计算,都不重要。
由于我们的子任务是要给异步框架用的,所以每个任务你不能只有一个接口:execute()之类,我们必须有开始执行的dispatch()和执行完毕的done()两个需要实现,而任务流系统本身只是做按顺序调起你的开始和结束这两个接口的事情。
class SubTask
{
// 子任务被调起的时机
virtual void dispatch() = 0;
// 子任务执行完成的时机
virtual SubTask *done() = 0;
// 内部实现,决定了任务流走向
void subtask_done();
…
};
关于任务流,之后会详细介绍其概念,有做类似事情的小伙伴欢迎多多交流互相学习,我也会多翻阅一些资料再写,这是非常非常有意思的一个主题。
4.可以被任务流执行的基本调度单位
让每个基本单位可以被任务流执行下去,并且被某些调度器调度,做法很简单,从执行单位和子任务共同派生出来就可以了:
class CommRequest : public SubTask, public CommSession
{
// 我们来实现以下SubTask的dispatch接口
// 这个网络任务被调起,我们要做的事情,就是发送网络请求
// 这个通过调用具体通信器的request去发消息
void dispatch()
{
if (this->scheduler->request(this, this->object, this->wait_timeout,
&this->target) < 0)
{
…;
}
}
// 然后是CommSession的handle接口
// 这个接口的意思是网络事件被响应的时机
// 假设我们作为一个client,发送完请求后,我们关注的事件是这个fd上的写事件
// 所以这里被调起意味着有回复了(当然也可能超时
void handle(int state, int error)
{
// 处理各种错误
…
// 我们在这里调用一下Subtask的subtask_done,让后续任务本身得以执行下去
this->subtask_done();
}
};
学习委员划重点:每一个可以被调度的基本单位,想同时具有子任务的属性,则必须子类里执行这个subtask_done(),以此打通任务流。
5.基本任务
我们目前为止,介绍的都是kernel的内容,现在我们来接触一下更为具体的概念:任务。
我们需要一层infrastructure的基本任务层,对接每一种具体的系统资源,比如:
ExecRequest封装出来的任务是个WFThreadTask,而CommRequest封装出来应该是个WFNetworkTask。这里可以看到,资源和任务都是一一对应的,这是目前个人认为框架内部做得比较好的抽象之一。
继续以网络请求看看,派生出来的任务应该长怎么样。看过我们的tutorial的小伙伴应该知道(前面文章也介绍过),我们有任务流Series的概念。所以这一层的基本任务,都需要做的事情是:
- 管理好所在的series(没有的话,默默创建一个,这样别人才能串到你后边~
- 异步所需要的上下文
- 异步所需要的回调函数
template<class REQ, class RESP>
class WFNetworkTask : public CommRequest
{
void start()
{
assert(!series_of(this));
Workflow::start_series_work(this, nullptr);
}
// 这个user_data是给开发者用的
void *user_data;
// 这是网络任务本身的上下文:要发送的请求和要接收的回复
REQ req;
RESP resp;
// 回调函数
std::function<void (WFNetworkTask<REQ, RESP> *)> callback;
};
6.用户接口
刚才看到的已经是具体资源所对应的任务了~那么,我们在这些资源上,可以做什么?
- 对于网络任务,我们需要做协议;
- 对于计算任务,我们需要写算法;
网络任务的协议刚才看到,是两个模版类型,即我们通过某种特化就可以指定一种具体协议的网络任务了(显然没有那么简单!但是先这样介绍哈哈哈^_^
using WFHttpTask = WFNetworkTask<protocol::HttpRequest,
protocol::HttpResponse>;
using http_callback_t = std::function<void (WFHttpTask *)>;
using WFRedisTask = WFNetworkTask<protocol::RedisRequest,
protocol::RedisResponse>;
using redis_callback_t = std::function<void (WFRedisTask *)>;
using WFMySQLTask = WFNetworkTask<protocol::MySQLRequest,
protocol::MySQLResponse>;
using mysql_callback_t = std::function<void (WFMySQLTask *)>;
using __WFKafkaTask = WFNetworkTask<protocol::KafkaRequest,
protocol::KafkaResponse>;
using __kafka_callback_t = std::function<void (__WFKafkaTask *)>;
然后,因为我们是不依赖任何第三方协议库的,所以这些协议都是亲手解析的~写好了具体的HttpMessage,我们就可以特化出一个Http任务了。
所有用户通过工厂创建出来的任务,拿到的类型都在图二的User Interface层。
7.具体实现
每种资源所对应的做法都是非常对称的,让我们可以看到计算机世界的美,和巴赫的平均律一样精妙~
- 网络对应的是协议、请求、回复
- 计算对应的则是算法、输入、输出
(P.S. 这里其实我认为“协议”应该改成“通信方式”哈,但是workflow是个成熟的框架了,它自己认为这里应该是“协议”与算法对称)
这里以算法任务来讲一下吧。我们一个排序算法,用户拿到的是个WFSortTask:
// 排序任务是线程的排序算法的特化,输入输出
template<typename T>
using WFSortTask = WFThreadTask<algorithm::SortInput<T>,
algorithm::SortOutput<T>>;
template<typename T>
using sort_callback_t = std::function<void (WFSortTask<T> *)>;
// 算法工厂
class WFAlgoTaskFactory
{
public:
// workflow的所有任务都是要由工厂来create的~
template<typename T, class CB = sort_callback_t<T>>
static WFSortTask<T> *create_sort_task(const std::string& queue_name,
T *first, T *last,
CB callback);
…
// 这个接口可以创建一个具体用来做并行排序算法的任务
template<typename T, class CB = sort_callback_t<T>>
static WFSortTask<T> *create_psort_task(const std::string& queue_name,
T *first, T *last,
CB callback);
…
};
但是,具体到底是创建一个单一的排序任务,还是我可以并行排序,是由调用create_sort_task()
还是create_psort_task()
接口来决定的。这是我们设计框架时谢爷说得最多的一句话:
“一切都是行为派生!”
(P.S. 第二多的话有可能是"颖欣你这里写得不对啊"。。。anyway…
我们就可以看到图二,最上边的这层Implementation,是内部针对不同api所生成的具体实现,但是返回给用户的都是同一类task,这样用户在使用callback的时候,都是同一种参数,比如排序任务,大家都是:
std::function<void (WFSortTask<T> *)>;
8. 进程级资源管理
回到图一最上层: Instance Manager。
刚才说到的执行器,请求接口是把一个要执行的任务扔到一个队列里。这个队列是在哪里创建的呢?
我们全局会有进程级的一些资源,一般是使用单例模式,用户使用到的时候才会创建对应的资源管理器。上周有热心小伙伴提到过各种资源的纵向拆分问题,方便用户只用某种资源的异步调度,但是由于本身如果只用到网络,那么计算调度器是不会被创建的,所以一般来说编译到一起也没问题。如果小伙伴想编译时就拆开,目前来说还得自己改cmake~
标签:搜狗,异步,workflow,调度,接口,任务,using,我们 From: https://www.cnblogs.com/lsgxeva/p/16717683.html