前言
运行时的环境管理是最容易被大家忽略的部分,如果只是一个人使用,确实不会是什么大问题。但如果是几百人使用,同时单任务涉及到数十个分布式节点呢?答案显而易见,很容易形成木桶效应,还有就是本机磁盘容易OOM。
使用示例
假如没有使用过ray,这里来个简单的示例,大家理解起来可能会更直观一点
方法总结
# ----------- 方法一:ray 初始化的时候统一环境
from ray.runtime_env import RuntimeEnv
# 使用 Ray 客户端连接到远程集群
ray.init("ray://123.456.7.89:10001", runtime_env=RuntimeEnv(...))
# ----------- 方法二:每个actor或者task 实例化以后选择不同的环境
from ray.runtime_env import RuntimeEnv
# 调用在指定运行时环境中运行的远程任务。
f.options(runtime_env=RuntimeEnv(...)).remote()
# 实例化在指定运行时环境中运行的actor。
actor = SomeClass.options(runtime_env=RuntimeEnv(...)).remote()
# ----------- 方法三:每个actor或者task初始化的时候就选择不同的环境
# 在任务定义中指定运行时环境。通过 `g.remote()` 的未来调用使用此运行时环境,除非通过使用
# `.options()` 进行覆盖。
@ray.remote(runtime_env=RuntimeEnv(...))
def g():
pass
# 在定义中指定运行时环境。通过 `MyClass.remote()` 的未来实例化使用此运行时环境,除非通过
# 使用 `.options()` 进行覆盖。
@ray.remote(runtime_env=RuntimeEnv(...))
class MyClass:
pass
详细示例
# 使用 conda 的示例
RuntimeEnv(conda={
"channels": ["defaults"], "dependencies": ["codecov"]})
RuntimeEnv(conda="pytorch_p36") # 在 DLAMIs 上找到
# 使用容器的示例
RuntimeEnv(
container={"image": "anyscale/ray-ml:nightly-py38-cpu",
"run_options": ["--cap-drop SYS_ADMIN","--log-level=debug"]})
# 设置环境变量的示例
RuntimeEnv(env_vars={"OMP_NUM_THREADS": "32", "TF_WARNINGS": "none"})
# 设置 pip 的示例
RuntimeEnv(
pip={"packages":["tensorflow", "requests"], "pip_check": False,
"pip_version": "==22.0.2;python_version=='3.8.11'"})
# 使用 image_uri 的示例
RuntimeEnv(
image_uri="rayproject/ray:2.39.0-py312-cu123")
参数说明
- py_modules: URIs 列表(可以在 GCS 或外部存储中),每个都是一个 zip 文件,Ray 解压并插入到工作节点的 PYTHONPATH 中。
- working_dir: URI(可以在 GCS 或外部存储中)指向一个 zip 文件,Ray 在每个任务/演员的目录中解压。
- pip: 可以是 pip 包的列表、包含 pip requirements.txt 文件路径的字符串,或具有三个字段的 Python 字典:
- 1)
packages
(必需,List[str]):pip 包的列表, - 2)
pip_check
(可选,bool):是否在 pip 安装结束时启用 pip 检查,默认为 False。 - 3)
pip_version
(可选,str):pip 的版本,Ray 在pip_version
前加上包名 “pip” 以形成最终的需求字符串,需求规范的语法在 PEP 508 中定义。
- 1)
- uv: 可以是 pip 包的列表,或具有一个字段的 Python 字典:1)
packages
(必需,List[str])。 - conda: 可以是 conda YAML 配置、本地 conda 环境的名称(例如 “pytorch_p36”),或指向 conda environment.yaml 文件的路径。Ray 会自动将依赖项注入到 conda 环境中,以确保与 Ray 集群的兼容性。Ray 可能会自动修改 conda 名称以避免运行时环境之间的冲突。此字段不能与 ‘pip’ 字段同时指定。要与 conda 一起使用 pip,请在 conda YAML 配置中指定您的 pip 依赖项.
- container: 要求给定的(Docker)容器镜像,Ray 工作进程在此镜像中运行。此参数只能单独使用,或与
config
或env_vars
参数一起使用。run_options
列表规范在这里:
https://docs.docker.com/engine/reference/run/ - env_vars: 要设置的环境变量。
- worker_process_setup_hook: (实验性)在工作进程启动后、任务和演员调度之前调用的设置钩子。可以传递模块名称(字符串类型)或可调用对象(函数)。当传递模块名称时,Ray 工作进程应该能够访问该模块名称。当传递可调用对象时,可调用对象应该是可序列化的。当通过作业提交 API 指定运行时环境时,仅允许模块名称(字符串)。
- nsight: 将 nsight 配置文件选项名称映射到其值的字典。
- config: 运行时环境的配置。可以是字典或 RuntimeEnvConfig。字段:(1)setup_timeout_seconds,运行时环境创建的超时,超时以秒为单位。
- image_uri: 指向容器镜像的 URI。Ray 工作进程在此镜像中运行。此参数只能单独使用,或与
config
或env_vars
参数一起使用。
架构
关键问题
核心代码在哪里?
ray/python/ray/_private/runtime_env/agent/runtime_env_agent.py
如何做到环境隔离?
- 通过docker
- Ray 还支持使用 Python 虚拟环境(如 venv 或 virtualenv)来隔离包
从任务提交到完成的过程中运行时package经历了哪些阶段?
1. 任务提交
当用户提交一个任务或演员时,可以通过 @ray.remote 装饰器或 .options() 方法指定 runtime_env。这个 runtime_env 可以包含 Python 包、环境变量、conda 环境、Docker 容器等信息。
2. 任务调度
一旦任务被提交,raylet 会将任务的元数据(包括 runtime_env)放入任务队列中。调度器会根据可用资源和任务的需求来选择合适的工作节点。
3. 运行时环境的解析
在选择了合适的工作节点后,raylet 会解析任务的 runtime_env。这包括:
- 解析 Python 包:如果 runtime_env 中指定了 pip 或 conda,raylet 会解析这些依赖项,并确保它们在工作节点上可用。
- 处理环境变量:raylet 会设置指定的环境变量,以确保任务在正确的环境中运行。
- Docker 容器:如果指定了 Docker 容器,raylet 会确保在工作节点上拉取并运行该容器。
4. 资源分配
在解析完 runtime_env 后,raylet 会分配所需的资源(如 CPU、内存等)给任务。此时,raylet 还会确保所需的运行时环境已经准备好。
5. 任务执行
一旦资源分配完成,raylet 会将任务发送到工作节点。工作节点会在指定的运行时环境中执行任务。这可能涉及到:
- 安装缺失的 Python 包(如果使用 pip 或 conda)。
- 设置环境变量。
- 启动 Docker 容器并在其中运行任务。
6. 结果返回
任务执行完成后,结果会被返回到 raylet,然后再返回给用户。
关键实现细节
- 依赖管理:raylet 使用 pip 和 conda 的 URI 来下载和安装依赖项。它会在工作节点上执行相应的命令来确保环境的正确性。
- 容器化支持:通过 Docker,raylet 可以在隔离的环境中运行任务,确保不同任务之间的环境不会相互干扰。
- 环境变量的设置:raylet 会在任务执行前设置所有指定的环境变量,以确保任务在预期的环境中运行。
自定义镜像是如何调度docker拉起的?
代码里面比较简单粗暴…直接用podman cmd指令。
async def _create_impl(image_uri: str, logger: logging.Logger):
# Pull image if it doesn't exist
# Also get path to `default_worker.py` inside the image.
pull_image_cmd = [
"podman",
"run",
"--rm",
image_uri,
"python",
"-c",
(
"import ray._private.workers.default_worker as default_worker; "
"print(default_worker.__file__)"
),
]
logger.info("Pulling image %s", image_uri)
worker_path = await check_output_cmd(pull_image_cmd, logger=logger)
return worker_path.strip()
def _modify_context_impl(
image_uri: str,
worker_path: str,
run_options: Optional[List[str]],
context: RuntimeEnvContext,
logger: logging.Logger,
ray_tmp_dir: str,
):
context.override_worker_entrypoint = worker_path
container_driver = "podman"
container_command = [
container_driver,
"run",
"-v",
ray_tmp_dir + ":" + ray_tmp_dir,
"--cgroup-manager=cgroupfs",
"--network=host",
"--pid=host",
"--ipc=host",
"--userns=keep-id",
]
# Environment variables to set in container
env_vars = dict()
# ...
# Set environment variables
for env_var_name, env_var_value in env_vars.items():
container_command.append("--env")
container_command.append(f"{env_var_name}='{env_var_value}'")
如何调用k8s 修改镜像这些的?
官方文档里面显示,所有的head/worker节点都是通过ray start来启动节点的,官方还为ray定制了一套CRD,方便ray灵活修改环境变量等。也就是说,kuberay提供了一套apiserver给到ray 来调用。
apiVersion: ray.io/v1
kind: RayCluster
metadata:
annotations:
# If this annotation is set to "true", KubeRay will respect the container `command` and `args`.
ray.io/overwrite-container-cmd: "true"
核心代码
RuntimeEnvPlugin
所有的环境相关代码都规整为一个个的Plugin,这是所有人的基类
```python/ray/_private/runtime_env/plugin.py
class RuntimeEnvPlugin(ABC):
"""Abstract base class for runtime environment plugins."""
@staticmethod
def validate(runtime_env_dict: dict) -> None:
# 核心功能:验证用户提供的运行时环境字典
# 常规场景:检查字典的结构和内容是否符合预期
# 边缘情况:如果字典缺少必要的字段,抛出ValueError
pass
def get_uris(self, runtime_env: "RuntimeEnv") -> List[str]:
# 核心功能:返回与当前插件相关的URI
# 常规场景:返回空列表表示没有可用的URI
return []
async def create(
self,
uri: Optional[str],
runtime_env,
context: RuntimeEnvContext,
logger: logging.Logger,
) -> float:
# 核心功能:创建并安装运行时环境
# 常规场景:返回0表示没有占用磁盘空间
# 边缘情况:如果URI无效或创建失败,可能需要处理异常
return 0
def modify_context(
self,
uris: List[str],
runtime_env: "RuntimeEnv", # noqa: F821
context: RuntimeEnvContext,
logger: logging.Logger,
) -> None:
# 核心功能:修改上下文以影响工作者的启动行为
# 常规场景:添加环境变量或改变工作目录
return
def delete_uri(self, uri: str, logger: logging.Logger) -> float:
# 核心功能:删除指定URI的资源
# 常规场景:返回0表示没有空间被回收
# 边缘情况:如果URI无效,可能需要处理异常
return 0
class RuntimeEnvPluginManager:
"""This manager is used to load plugins in runtime env agent."""
def validate_plugin_class(self, plugin_class: Type[RuntimeEnvPlugin]) -> None:
# 核心功能:验证插件类是否有效
# 常规场景:检查插件类是否继承自RuntimeEnvPlugin
# 边缘情况:如果插件类没有名称或名称冲突,抛出RuntimeError
def validate_priority(self, priority: Any) -> None:
# 核心功能:验证插件优先级是否在有效范围内
# 常规场景:检查优先级是否为整数并在允许范围内
# 边缘情况:如果优先级无效,抛出RuntimeError
def load_plugins(self, plugin_configs: List[Dict]) -> None:
# 核心功能:加载插件配置并创建URI缓存
# 常规场景:遍历插件配置并验证每个插件
# 边缘情况:如果配置无效,抛出RuntimeError
def add_plugin(self, plugin: RuntimeEnvPlugin) -> None:
# 核心功能:添加插件并创建URI缓存
# 常规场景:验证插件类和优先级
# 边缘情况:如果插件类无效,抛出RuntimeError
def create_uri_cache_for_plugin(self, plugin: RuntimeEnvPlugin) -> URICache:
# 核心功能:创建URI缓存
# 常规场景:根据环境变量设置缓存大小
# 边缘情况:如果环境变量无效,使用默认值
def sorted_plugin_setup_contexts(self) -> List[PluginSetupContext]:
# 核心功能:返回按优先级排序的插件上下文
# 常规场景:使用sorted函数进行排序
return sorted(self.plugins.values(), key=lambda x: x.priority)
核心plugin
from ray._private.runtime_env.conda import CondaPlugin
from ray._private.runtime_env.context import RuntimeEnvContext
from ray._private.runtime_env.java_jars import JavaJarsPlugin
from ray._private.runtime_env.image_uri import ContainerPlugin
from ray._private.runtime_env.pip import PipPlugin
from ray._private.runtime_env.uv import UvPlugin
from ray._private.runtime_env.plugin import RuntimeEnvPluginManager
from ray._private.runtime_env.py_modules import PyModulesPlugin
from ray._private.runtime_env.working_dir import WorkingDirPlugin
from ray._private.runtime_env.nsight import NsightPlugin
from ray._private.runtime_env.mpi import MPIPlugin
最后
看完以后最大的收获是,知道了如何插入公司内部的环境变量以及内部profiler工具。这一块大部分公司都会有自己的工具链,使用plugin的方式确实很方便。不知不觉已经写了8篇文章了,还是那句话,只要开始了,就永远不晚,共勉!祝看到这里的各位新年快乐!
标签:Ray,RuntimeEnv,源码,conda,env,pip,runtime,ray From: https://blog.csdn.net/weixin_43956669/article/details/144900507