分布式计算框架Ray介绍
当我们要构建一个涉及大规模数据处理或者复杂计算的应用,传统的方式是使用现成的大数据框架,例如 Apache Flink 和 Apache Spark。这些系统提供的API通常基于某种特定的计算范式(例如DataStream、DataSet),要求用户基于这些特定的计算范式实现应用逻辑。对于传统的数据清洗、数据分析等应用,这种用法能够很好地适用。但是,随着分布式应用的逻辑越来越复杂(例如分布式机器学习应用),许多应用的逻辑并不能直接套用现有的计算范式。在这种情况下,开发者如果想要细粒度地控制系统中的任务流,就需要自己从头编写一个分布式应用。但是现实中,开发一个分布式应用并不简单。除了应用本身的代码逻辑,我们还需要处理许多分布式系统中常见的难题,例如:分布式组件通信、服务部署、服务发现、监控、异常恢复等。处理这些问题,通常需要开发者在分布式系统领域有较深的经验,否则很难保证系统的性能和健壮性。为了简化分布式编程,Ray提供了一套简单、通用的分布式编程API,屏蔽了分布式系统中的这些常见的难题,让开发者能够使用像开发单机程序一样简单的方式,开发分布式系统。
云原生场景下如何利用Ray快速构建分布式系统Ray 的通用性体现在哪里呢?Ray的设计思想是不绑定任何计算模式,把单机编程中的基本概念分布式化。从API 的设计可以看出,Ray并不是一个大数据系统,尤其是Ray Core这一层没有任何大数据相关的算子,而是从单机编程的基本概念进行分布式化的。具体如何分布式化?我们在单机编程中经常用到两个非常核心的概念,一个叫Function,一个叫Class,在面性对象的编程语言里面,基本上大家会围绕这两个概念进行代码开发,在Ray中会将这两个基本概念进行分布式化,对应到分布式系统就叫Task和Actor。
设计
Ray 的框架中最为重要的两个部分是 Ray Core 和 Ray AIR:
- Ray Core 是底层的分布式的计算框架,使用基于 actor 模型来实现的一套计算框架,它可以将 Python 的一个 Class 或者一个 Function 转成分布式的 actor 和 task,在所有的机器上分布式地进行运行,并且 tasks/actor 之间可以通过分布式的对象存储能力来达到共享的能力。
-
通用分布式编程API:无状态计算单元Task
- Task是对单机编程中的Function进行分布式化,是一个无状态的计算单元。Ray可以把一个任意的Python函数或Java静态方法转换成一个Task。在这个过程中,Ray会在一个远程进程中执行这个函数。并且这个过程是异步的,这意味着我们可以并行地执行多个Task。PS:和python线程、进程的异步执行很像。
-
- `square`是一个普通的Python函数,`@ray.remote`装饰器表示我们可以把这个函数转化成Ray task,可以远程执行
- @ray.remote
def square(x):
return x * x
obj_refs = []
- `square.remote` 会异步地远程执行`square`函数(该function就会被调度到远程节点的某个进程中执行)。通过下面两行代码,我们并发地执行了5个Ray task。`square.remote`的返回值是一个`ObjectRef`对象,表示Task执行结果的引用。
- for i in range(5):
obj_refs.append(square.remote(i))
- 实际的task执行结果存放在Ray的分布式object store里,我们可以通过`ray.get`接口,同步地获取这些数据。
- assert ray.get(obj_refs) == [0, 1, 4, 9, 16]
通用分布式编程API:分布式object
- Obect Store是Ray架构中的一个关键组件,Task计算的中间结果会存放在分布式Object Store中。除此之外,我们也可以使用put接口,显式地把一个Python或Java对象存放到Object Store中。
-
- 我们在Node 1运行heavy_compute function,这个 function 会使用remote通过Ray底层的调度系统调度到Node 2, Node 2会执行这个function,执行完成后,把结果put到本地的object store中,object store 是Ray中的一个核心组件,最终结果返回到Caller端是通过Ray底层的 object store之间的object传输,把结果返回来给Caller端。
- 从整个的流程看, heavy_compute.remote 返回的是一个ObjectRef,并不是最终的结果。ObjectRef类似于单机编程中的future,只不过它是分布式的future,可以通过ray.get获取最终结果。
- Ray的分布式 object store是非常核心的组件,完美支撑了Ray整套分布式API 的设计,其特点如下:
- 可以实现多节点之间object 传输;
- 同节点内是基于shared memory的设计,在此基础上,分布式系统的online传输,如果发生在单机两个进程之间的话,理论上可以达到 Zero Copy 的效果;
- Ray object store 有一套比较完整的自动垃圾回收机制,可以保证分布式系统运算过程中一旦有ObjectRef在系统中没有引用的时候,会触发对该object 进行GC;
- Object store有object spilling 的功能,可以自动将内存中的object spill到磁盘上,从而达到扩容整个分布式系统存储的目的。 PS: 分布式计算的核心就是状态中心、任务的分发与收集。
通用分布式编程API:有状态计算单元Actor
- Actor将单机编程的Class概念进行分布式化。Ray使用Actor来表示一个有状态的计算单元。在Ray中,我们可以基于任意一个Python class或Java class创建Actor对象。这个Actor对象运行在一个远程的Python或者Java进程中。同时,我们可以通过ActorHandle远程调用这个Actor的任意一个方法(每次调用称为一个Actor Task),多个Actor Task在Actor进程中会顺序执行,并共享Actor的状态。
- `Counter`是一个普通的Python类,`@ray.remote`装饰器表示我们可以把这个类转化成Ray actor.
- @ray.remote
class Counter(object):
def __init__(self):
self.value = 0
def increment(self):
self.value += 1
def get_value(self):
return self.value
- `Counter.remote`会基于`Counter`类创建一个actor对象,这个actor对象会运行在一个远程的Python进程中。在另外一台机器的另外一个节点上面去实例化这个class。
- counter = Counter.remote()
- `Counter.remote`的返回值是一个`ActorHandle`对象。通过`ActorHandle`,我们可以远程调用Actor的任意一个方法(actor task)。通过.remote实现远程调用。
- [counter.increment.remote() for _ in range(5)]
- Actor task的返回值也是一个`ObjectRef`对象。同样地,我们通过`ray.get`获取实际的数据。
- assert ray.get(counter.get_value.remote()) == 5
其它
- 在Ray中,我们可以把一个Task输出的ObjectRef传递给另一个Task(包括Actor task)。在这种情况下,Ray会等待第一个Task执行结束之后,再开始执行第二个Task。同时,我们也可以把一个ActorHandle传递给一个Task,从而实现在多个远程Worker中同时远程调用一个Actor。通过这些方式,我们可以动态地、灵活地构建各种各样复杂的分布式任务流。
- 通过把一个task输出的`ObjectRef`传递给另一个task,我们定义了两个task的依赖关系。Ray会等待第一个task执行结束之后,再开始执行第二个task。
- obj1 = square.remote(2)
obj2 = square.remote(obj1)
assert ray.get(obj2) == 16
- 除了Task和Actor两个基本概念,Ray还提供了一系列高级功能,来帮助开发者更容易地开发分布式应用。这些高级功能包括但不限于:设置Task和Actor所需的资源、Actor生命周期管理、Actor自动故障恢复、自定义调度策略、Python/Java跨语言编程。
- 如果没有Ray,在纯云原生的实现思路中,资源定制是写到 yaml 里边的。比如说训练需要多少GPU 或者计算节点需要多少CPU,都是在 yaml 中定制 container 的规格。Ray提供了另外一个选择,完全无感知的代码化的配置,用户可以在 runtime 的时候,或者在Ray的Task 或 Actor 的decorator 中加一个参数,就可以通过Ray系统的调度能力分配相应的资源,达到整个分布式系统资源定制的目的。Ray的资源定制除了支持GPU、CPU、Memory 之外,还可以插入自定义资源。然后Ray的调度还有一些高级功能,比如资源组,或者亲和性和反亲和性的调度,目前都是支持的。
- @ray.remote(num_gpus=1)
def train_and_evaluate(model,train_indices,test_indices):
- 在分布式系统中,往往不同分布式系统的组件对环境的要求是不一样的。如果使用常规思路,就需要把环境固化到image里面,通过 Dockerfile 去定制环境。Ray实现了更灵活的选择,也是代码化的,可以在runtime创建Task或Actor之前的任意时刻定制指定计算单元的运行时环境。上图中给worker 的 Task 设定一个runtime_env,定制一个专属的Python版本,并在该版本里面装入一些pip包,完成面向Python的隔离环境的定制。这时Ray集群内部会在创建这个Task之前去准备该环境,然后将该Task调度到该环境执行。
- @ray.remote(runtime_env={"python_version":"3.9","pip"=["scikit-learn"]})
def train_and_evaluate(model,train_indices,test_indices):
- Ray的运行时环境是插件化的设计,用户可以根据自己的需求实现不同的插件,在Ray中原生支持了一些插件如Pip、Conda、Container等,只要是跟环境相关,不只是代码依赖,也可以是数据依赖,都可以通过插件去实现。
-
- Ray中用户可以根据自己的环境定制的需求选择需要定制的环境的粒度。以Python为例,隔离性的支持力度有如下几个维度,一个是 Process 级别的隔离,第二是 Virtual env 级别的隔离,第三是 Conda 级别的隔离,最后是 Container级别隔离。从隔离性来说,从右到左是由弱到强的,Process 的隔离性是非常弱的,Container 隔离性是更强的。从用户体验来说,环境定制上 Container 是更重的而Process 是更轻的。
架构
基于 Ray 的大规模离线推理 Ray 项目是 UC Berkeley 的 RISElab 实验室在 2017 年前后发起的,定位是通用的分布式编程框架——Python-first。理论上通过 Ray 引擎用户可以轻松地把任何 Python 应用做成分布式,尤其是机器学习的相关应用,目前 Ray 主攻的一个方向就是机器学习。Ray 的架构分为三层
- 最下面一层是各种云基础设施,也就是说 Ray 帮用户屏蔽了底层的基础设施,用户拉起一个 Ray Cluster之后就可以立即开始分布式的编程,不用考虑底层的云原生或各种各样的环境;
- 中间层是 Ray Core 层。这一层是 Ray 提供的核心基础能力,主要是提供了 Low-level 的非常简洁的分布式编程 API。基于这套 API,用户可以非常容易地把现有的 Python 的程序分布式化。值得注意的是,这一层的 API 是 Low-level,没有绑定任何的计算范式,非常通用;
- 最上层是 Ray 社区基于 Ray Core 层做的丰富的机器学习库,这一层的定位是做机器学习 Pipeline。比如,数据加工读取、模型训练、超参优化、推理,强化学习等,都可以直接使用这些库来完成整个的 Pipeline,这也是 Ray 社区目前主攻的一个方向。
- 上图展示的是 Ray Cluster 的基本架构,每一个大框就是一个节点。(这里的节点是一个虚拟的概念,可以是一个物理机,一个 VM 或一个 Linux 的 Docker。比如在 K8s 上,一个节点就是一个 Pod。)
- Head 节点:是 Ray Cluster 的调度中心,比较核心的组件是 GCS(Global Control Service),GCS主要负责整个集群的资源调度和节点管理,类似于Hadoop架构中Yarn里边的 Resource Manager。Head节点也有可观测性 Dashboard。
- Worker 节点:除了 Head 节点之外,其他都是 Worker 节点,承载具体的工作负载。
- Raylet:每个节点上面都有一个守护进程 Raylet,它是一个 Local Scheduler,负责 Task 的调度以及 Worker 的管理。
- Object Store 组件:每个节点上都有 Object Store 组件,负责节点之间 Object 传输。在整个 Cluster 中每个节点的 Object Store 组件组成一个全局的分布式内存。同时,在单个节点上,Object Store 在多进程之间通过共享内存的方式减少 copy。
- Driver:当用户向 Ray Cluster 上提交一个 Job,或者用 Notebook 连接的时候,Ray挑选节点来运行 Driver 进行,执行用户代码。作业结束后 Driver 销毁。
- Worker:是 Ray 中 Task 和 Actor 的载体。
Ray 的Low-level和 High-level API
在部署 Ray 时,开源社区有完整的解决方案 Kuberay 项目。每个 Ray Cluster 由 Head 节点和 Worker 节点组成,每个节点是一份计算资源,可以是物理机、Docker 等等,在 K8s 上即为一个 Pod。启动 Ray Cluster 时,使用 Kuberay 的 Operator 来管理整个生命周期,包括创建和销毁 Cluster 等等。Kuberay 同时也支持自动扩展和水平扩展。Ray Cluster 在内部用于收集负载的 Metrics,并根据 Metrics 决定是否扩充更多的资源,如果需要则触发 Kuberay 拉起新的 Pod 或删除闲置的 Pod。
-
用户可以通过内部的平台使用 Ray,通过提交 Job 或使用 Notebook 进行交互式编程。平台通过 Kuberay 提供的 YAML 和 Restful API 这两种方式进行操作。
ray 如何控制并发度?Ray 的调度器会动态地根据当前系统资源和任务负载情况来决定启动几个 raylet 进程。Ray 的设计目标之一是能够自适应地处理不同规模和负载的工作负载。 工作负载可以通过@ray.remote 指定 运行它所需要的cpu/gpu等资源。 PS: 不一定对。