首页 > 编程语言 >Ray 源码分析系列(8)—RuntimeEnv

Ray 源码分析系列(8)—RuntimeEnv

时间:2025-01-03 09:29:30浏览次数:3  
标签:Ray RuntimeEnv 源码 conda env pip runtime ray

前言

运行时的环境管理是最容易被大家忽略的部分,如果只是一个人使用,确实不会是什么大问题。但如果是几百人使用,同时单任务涉及到数十个分布式节点呢?答案显而易见,很容易形成木桶效应,还有就是本机磁盘容易OOM。

使用示例

假如没有使用过ray,这里来个简单的示例,大家理解起来可能会更直观一点

方法总结

# ----------- 方法一:ray 初始化的时候统一环境
from ray.runtime_env import RuntimeEnv
# 使用 Ray 客户端连接到远程集群
ray.init("ray://123.456.7.89:10001", runtime_env=RuntimeEnv(...))

# ----------- 方法二:每个actor或者task 实例化以后选择不同的环境
from ray.runtime_env import RuntimeEnv
# 调用在指定运行时环境中运行的远程任务。
f.options(runtime_env=RuntimeEnv(...)).remote()

# 实例化在指定运行时环境中运行的actor。
actor = SomeClass.options(runtime_env=RuntimeEnv(...)).remote()

# ----------- 方法三:每个actor或者task初始化的时候就选择不同的环境
# 在任务定义中指定运行时环境。通过 `g.remote()` 的未来调用使用此运行时环境,除非通过使用
# `.options()` 进行覆盖。
@ray.remote(runtime_env=RuntimeEnv(...))
def g():
    pass

# 在定义中指定运行时环境。通过 `MyClass.remote()` 的未来实例化使用此运行时环境,除非通过
# 使用 `.options()` 进行覆盖。
@ray.remote(runtime_env=RuntimeEnv(...))
class MyClass:
    pass
   

详细示例

# 使用 conda 的示例
RuntimeEnv(conda={
    "channels": ["defaults"], "dependencies": ["codecov"]})
RuntimeEnv(conda="pytorch_p36")   # 在 DLAMIs 上找到

# 使用容器的示例
RuntimeEnv(
    container={"image": "anyscale/ray-ml:nightly-py38-cpu",
    "run_options": ["--cap-drop SYS_ADMIN","--log-level=debug"]})

# 设置环境变量的示例
RuntimeEnv(env_vars={"OMP_NUM_THREADS": "32", "TF_WARNINGS": "none"})

# 设置 pip 的示例
RuntimeEnv(
    pip={"packages":["tensorflow", "requests"], "pip_check": False,
    "pip_version": "==22.0.2;python_version=='3.8.11'"})

# 使用 image_uri 的示例
RuntimeEnv(
    image_uri="rayproject/ray:2.39.0-py312-cu123")

参数说明

  • py_modules: URIs 列表(可以在 GCS 或外部存储中),每个都是一个 zip 文件,Ray 解压并插入到工作节点的 PYTHONPATH 中。
  • working_dir: URI(可以在 GCS 或外部存储中)指向一个 zip 文件,Ray 在每个任务/演员的目录中解压。
  • pip: 可以是 pip 包的列表、包含 pip requirements.txt 文件路径的字符串,或具有三个字段的 Python 字典:
    • 1)packages(必需,List[str]):pip 包的列表,
    • 2)pip_check(可选,bool):是否在 pip 安装结束时启用 pip 检查,默认为 False。
    • 3)pip_version(可选,str):pip 的版本,Ray 在 pip_version 前加上包名 “pip” 以形成最终的需求字符串,需求规范的语法在 PEP 508 中定义。
  • uv: 可以是 pip 包的列表,或具有一个字段的 Python 字典:1)packages(必需,List[str])。
  • conda: 可以是 conda YAML 配置、本地 conda 环境的名称(例如 “pytorch_p36”),或指向 conda environment.yaml 文件的路径。Ray 会自动将依赖项注入到 conda 环境中,以确保与 Ray 集群的兼容性。Ray 可能会自动修改 conda 名称以避免运行时环境之间的冲突。此字段不能与 ‘pip’ 字段同时指定。要与 conda 一起使用 pip,请在 conda YAML 配置中指定您的 pip 依赖项.
  • container: 要求给定的(Docker)容器镜像,Ray 工作进程在此镜像中运行。此参数只能单独使用,或与 configenv_vars 参数一起使用。run_options 列表规范在这里:
    https://docs.docker.com/engine/reference/run/
  • env_vars: 要设置的环境变量。
  • worker_process_setup_hook: (实验性)在工作进程启动后、任务和演员调度之前调用的设置钩子。可以传递模块名称(字符串类型)或可调用对象(函数)。当传递模块名称时,Ray 工作进程应该能够访问该模块名称。当传递可调用对象时,可调用对象应该是可序列化的。当通过作业提交 API 指定运行时环境时,仅允许模块名称(字符串)。
  • nsight: 将 nsight 配置文件选项名称映射到其值的字典。
  • config: 运行时环境的配置。可以是字典或 RuntimeEnvConfig。字段:(1)setup_timeout_seconds,运行时环境创建的超时,超时以秒为单位。
  • image_uri: 指向容器镜像的 URI。Ray 工作进程在此镜像中运行。此参数只能单独使用,或与 configenv_vars 参数一起使用。

架构

一图胜千言

关键问题

核心代码在哪里?

ray/python/ray/_private/runtime_env/agent/runtime_env_agent.py

如何做到环境隔离?

  1. 通过docker
  2. Ray 还支持使用 Python 虚拟环境(如 venv 或 virtualenv)来隔离包

从任务提交到完成的过程中运行时package经历了哪些阶段?

1. 任务提交

当用户提交一个任务或演员时,可以通过 @ray.remote 装饰器或 .options() 方法指定 runtime_env。这个 runtime_env 可以包含 Python 包、环境变量、conda 环境、Docker 容器等信息。

2. 任务调度

一旦任务被提交,raylet 会将任务的元数据(包括 runtime_env)放入任务队列中。调度器会根据可用资源和任务的需求来选择合适的工作节点。

3. 运行时环境的解析

在选择了合适的工作节点后,raylet 会解析任务的 runtime_env。这包括:

  • 解析 Python 包:如果 runtime_env 中指定了 pip 或 conda,raylet 会解析这些依赖项,并确保它们在工作节点上可用。
  • 处理环境变量:raylet 会设置指定的环境变量,以确保任务在正确的环境中运行。
  • Docker 容器:如果指定了 Docker 容器,raylet 会确保在工作节点上拉取并运行该容器。

4. 资源分配

在解析完 runtime_env 后,raylet 会分配所需的资源(如 CPU、内存等)给任务。此时,raylet 还会确保所需的运行时环境已经准备好。

5. 任务执行

一旦资源分配完成,raylet 会将任务发送到工作节点。工作节点会在指定的运行时环境中执行任务。这可能涉及到:

  • 安装缺失的 Python 包(如果使用 pip 或 conda)。
  • 设置环境变量。
  • 启动 Docker 容器并在其中运行任务。

6. 结果返回

任务执行完成后,结果会被返回到 raylet,然后再返回给用户。

关键实现细节

  • 依赖管理:raylet 使用 pip 和 conda 的 URI 来下载和安装依赖项。它会在工作节点上执行相应的命令来确保环境的正确性。
  • 容器化支持:通过 Docker,raylet 可以在隔离的环境中运行任务,确保不同任务之间的环境不会相互干扰。
  • 环境变量的设置:raylet 会在任务执行前设置所有指定的环境变量,以确保任务在预期的环境中运行。

自定义镜像是如何调度docker拉起的?

代码里面比较简单粗暴…直接用podman cmd指令。


async def _create_impl(image_uri: str, logger: logging.Logger):
    # Pull image if it doesn't exist
    # Also get path to `default_worker.py` inside the image.
    pull_image_cmd = [
        "podman",
        "run",
        "--rm",
        image_uri,
        "python",
        "-c",
        (
            "import ray._private.workers.default_worker as default_worker; "
            "print(default_worker.__file__)"
        ),
    ]
    logger.info("Pulling image %s", image_uri)
    worker_path = await check_output_cmd(pull_image_cmd, logger=logger)
    return worker_path.strip()

def _modify_context_impl(
    image_uri: str,
    worker_path: str,
    run_options: Optional[List[str]],
    context: RuntimeEnvContext,
    logger: logging.Logger,
    ray_tmp_dir: str,
):
    context.override_worker_entrypoint = worker_path

    container_driver = "podman"
    container_command = [
        container_driver,
        "run",
        "-v",
        ray_tmp_dir + ":" + ray_tmp_dir,
        "--cgroup-manager=cgroupfs",
        "--network=host",
        "--pid=host",
        "--ipc=host",
        "--userns=keep-id",
    ]

    # Environment variables to set in container
    env_vars = dict()
		
		# ...

    # Set environment variables
    for env_var_name, env_var_value in env_vars.items():
        container_command.append("--env")
        container_command.append(f"{env_var_name}='{env_var_value}'")
		

如何调用k8s 修改镜像这些的?

官方文档里面显示,所有的head/worker节点都是通过ray start来启动节点的,官方还为ray定制了一套CRD,方便ray灵活修改环境变量等。也就是说,kuberay提供了一套apiserver给到ray 来调用。

apiVersion: ray.io/v1
kind: RayCluster
metadata:
  annotations:
    # If this annotation is set to "true", KubeRay will respect the container `command` and `args`.
    ray.io/overwrite-container-cmd: "true"

核心代码

RuntimeEnvPlugin

所有的环境相关代码都规整为一个个的Plugin,这是所有人的基类

```python/ray/_private/runtime_env/plugin.py
class RuntimeEnvPlugin(ABC):
    """Abstract base class for runtime environment plugins."""
    @staticmethod
    def validate(runtime_env_dict: dict) -> None:
        # 核心功能:验证用户提供的运行时环境字典
        # 常规场景:检查字典的结构和内容是否符合预期
        # 边缘情况:如果字典缺少必要的字段,抛出ValueError
        pass

    def get_uris(self, runtime_env: "RuntimeEnv") -> List[str]: 
        # 核心功能:返回与当前插件相关的URI
        # 常规场景:返回空列表表示没有可用的URI
        return []

    async def create(
        self,
        uri: Optional[str],
        runtime_env,
        context: RuntimeEnvContext,
        logger: logging.Logger,
    ) -> float:
        # 核心功能:创建并安装运行时环境
        # 常规场景:返回0表示没有占用磁盘空间
        # 边缘情况:如果URI无效或创建失败,可能需要处理异常
        return 0

    def modify_context(
        self,
        uris: List[str],
        runtime_env: "RuntimeEnv",  # noqa: F821
        context: RuntimeEnvContext,
        logger: logging.Logger,
    ) -> None:
        # 核心功能:修改上下文以影响工作者的启动行为
        # 常规场景:添加环境变量或改变工作目录
        return

    def delete_uri(self, uri: str, logger: logging.Logger) -> float:
        # 核心功能:删除指定URI的资源
        # 常规场景:返回0表示没有空间被回收
        # 边缘情况:如果URI无效,可能需要处理异常
        return 0

class RuntimeEnvPluginManager:
    """This manager is used to load plugins in runtime env agent."""

    def validate_plugin_class(self, plugin_class: Type[RuntimeEnvPlugin]) -> None:
        # 核心功能:验证插件类是否有效
        # 常规场景:检查插件类是否继承自RuntimeEnvPlugin
        # 边缘情况:如果插件类没有名称或名称冲突,抛出RuntimeError
      

    def validate_priority(self, priority: Any) -> None:
        # 核心功能:验证插件优先级是否在有效范围内
        # 常规场景:检查优先级是否为整数并在允许范围内
        # 边缘情况:如果优先级无效,抛出RuntimeError
       
    def load_plugins(self, plugin_configs: List[Dict]) -> None:
        # 核心功能:加载插件配置并创建URI缓存
        # 常规场景:遍历插件配置并验证每个插件
        # 边缘情况:如果配置无效,抛出RuntimeError
        

    def add_plugin(self, plugin: RuntimeEnvPlugin) -> None:
        # 核心功能:添加插件并创建URI缓存
        # 常规场景:验证插件类和优先级
        # 边缘情况:如果插件类无效,抛出RuntimeError

    def create_uri_cache_for_plugin(self, plugin: RuntimeEnvPlugin) -> URICache:
        # 核心功能:创建URI缓存
        # 常规场景:根据环境变量设置缓存大小
        # 边缘情况:如果环境变量无效,使用默认值
       

    def sorted_plugin_setup_contexts(self) -> List[PluginSetupContext]:
        # 核心功能:返回按优先级排序的插件上下文
        # 常规场景:使用sorted函数进行排序
        return sorted(self.plugins.values(), key=lambda x: x.priority)

核心plugin


from ray._private.runtime_env.conda import CondaPlugin
from ray._private.runtime_env.context import RuntimeEnvContext
from ray._private.runtime_env.java_jars import JavaJarsPlugin
from ray._private.runtime_env.image_uri import ContainerPlugin
from ray._private.runtime_env.pip import PipPlugin
from ray._private.runtime_env.uv import UvPlugin
from ray._private.runtime_env.plugin import RuntimeEnvPluginManager
from ray._private.runtime_env.py_modules import PyModulesPlugin
from ray._private.runtime_env.working_dir import WorkingDirPlugin
from ray._private.runtime_env.nsight import NsightPlugin
from ray._private.runtime_env.mpi import MPIPlugin

最后

看完以后最大的收获是,知道了如何插入公司内部的环境变量以及内部profiler工具。这一块大部分公司都会有自己的工具链,使用plugin的方式确实很方便。不知不觉已经写了8篇文章了,还是那句话,只要开始了,就永远不晚,共勉!祝看到这里的各位新年快乐!

标签:Ray,RuntimeEnv,源码,conda,env,pip,runtime,ray
From: https://blog.csdn.net/weixin_43956669/article/details/144900507

相关文章