Global Control Store
GCS (Global Control Store) 是 Ray 的全局控制存储系统,它是 Ray 的核心组件之一,负责存储和管理集群的元数据信息,它确保了整个集群的正常运行和高效调度。从图中可以看出来GCS 的重要性,其核心的功能包括:
- 中心化管理: 提供全局视图、统一控制平面和元数据管理
- 调度支持: 负责资源分配、任务调度和负载均衡
- 容错机制: 实现状态恢复、故障检测和高可用性保证
- 性能优化: 包含缓存机制、异步操作和批量更新
- 监控和调试: 支持状态追踪、性能监控和故障诊断
表结构
GCS 存储的主要数据结构:
- Actor Information:记录 Actor 的状态和位置。
- Task Information:存储任务的状态和调度信息。
- Object Information:管理对象的位置信息和状态。
- Node Information:包含集群节点的详细信息。
- System Information:系统的全局配置和状态。
# 概念性的表结构示例
TABLES = {
# Actor 相关信息
"ACTOR": {
"actor_id": "metadata", # 存储 actor 元数据
"actor_id:node": "location", # 存储 actor 位置
},
# 任务相关信息
"TASK": {
"task_id": "specification", # 任务规范
"task_id:status": "status", # 任务状态
},
# 对象相关信息
"OBJECT": {
"object_id": "location", # 对象位置
}
}
与其他组件的交互
Client/Driver
│
▼
Raylet (Local Scheduler)
│
▼
GCS (Global Control Store)
│
▼
Redis/Other Storage Backend
代码概览
核心组件
GCS由以下几个核心组件组成:
- 节点管理 (GcsNodeManager)
void GcsServer::InitGcsNodeManager() {
gcs_node_manager_ = std::make_unique<GcsNodeManager>(
gcs_publisher_.get(),
gcs_table_storage_.get(),
raylet_client_pool_.get());
}
负责管理集群中的节点
- 负责管理集群中的节点
- 处理节点注册、心跳和故障检测
- Actor 管理 (GcsActorManager)
void GcsServer::InitGcsActorManager() {
gcs_actor_manager_ = std::make_unique<GcsActorManager>(
std::move(scheduler),
gcs_table_storage_.get(),
gcs_publisher_.get());
}
管理 Actor 的创建、销毁和重建
- 管理 Actor 的创建、销毁和重建
- 处理 Actor 调度和故障恢复
- 资源管理 (GcsResourceManager)
void GcsServer::InitGcsResourceManager() {
gcs_resource_manager_ = std::make_unique<GcsResourceManager>(
cluster_resource_scheduler_->GetClusterResourceManager(),
*gcs_node_manager_);
}
- 管理集群资源
- 追踪节点资源使用情况
- 任务管理 (GcsTaskManager)
void GcsServer::InitGcsTaskManager() {
gcs_task_manager_ = std::make_unique<GcsTaskManager>();
}
- 管理任务的调度和执行
存储实现
GCS 支持两种存储模式:
enum class StorageType {
IN_MEMORY, // 内存存储
REDIS_PERSIST, // Redis 持久化存储
UNKNOWN
};
核心功能
- 服务启动流程:
void GcsServer::Start() {
// 1. 初始化 KV 管理器
InitKVManager();
// 2. 异步加载 GCS 表数据
auto gcs_init_data = std::make_shared<GcsInitData>();
gcs_init_data->AsyncLoad();
// 3. 初始化各个管理器
InitGcsNodeManager();
InitGcsActorManager();
InitGcsResourceManager();
// ...
// 4. 启动 RPC 服务
rpc_server_.Run();
}
- 事件监听机制:
void GcsServer::InstallEventListeners() {
// 节点事件
gcs_node_manager_->AddNodeAddedListener();
gcs_node_manager_->AddNodeRemovedListener();
// Worker 事件
gcs_worker_manager_->AddWorkerDeadListener();
// Job 事件
gcs_job_manager_->AddJobFinishedListener();
}
- 调度机制
// 资源变化时触发调度
gcs_resource_manager_->AddResourcesChangedListener([this] {
// 调度待处理的 placement groups
gcs_placement_group_manager_->SchedulePendingPlacementGroups();
// 调度待处理的任务
cluster_task_manager_->ScheduleAndDispatchTasks();
});
以上就是 Ray GCS 的核心实现架构,它作为 Ray 的中央控制组件,协调管理整个集群的运行。如果只是想大概了解下功能,下面的内容可以跳过了。如果你想深入地了解代码细节,可以接着往下翻。
核心代码详解
GCS核心的代码在 [https://github.com/ray-project/ray/tree/master/src/ray/gcs](https://github.com/ray-project/ray/tree/master/src/ray/gcs)
里面,现在正式开始代码级分析:
gcs_client
accessor
- accessor访问器主要用于访问 GCS 中存储的不同类型的数据
- 主要的访问器类包括:
// 访问 Actor 相关信息
class ActorInfoAccessor
// 访问 Job 相关信息
class JobInfoAccessor
// 访问节点相关信息
class NodeInfoAccessor
// 访问节点资源信息
class NodeResourceInfoAccessor
// 访问错误信息
class ErrorInfoAccessor
// 访问任务信息
class TaskInfoAccessor
// 访问 Worker 信息
class WorkerInfoAccessor
// 访问 Placement Group 信息
class PlacementGroupInfoAccessor
// 访问内部 KV 存储
class InternalKVAccessor
-
每个访问器类都提供了一系列异步和同步方法来:
- 获取(Get)数据
- 添加(Add)数据
- 更新(Update)数据
- 删除(Remove)数据
- 订阅(Subscribe)数据变更
-
主要的设计特点:
- 大量使用异步操作,通过回调函数处理结果
virtual Status AsyncGet(const ActorID &actor_id, const OptionalItemCallback<rpc::ActorTableData> &callback);
- 支持超时机制
virtual Status AsyncGetByName( const std::string &name, const std::string &ray_namespace, const OptionalItemCallback<rpc::ActorTableData> &callback, int64_t timeout_ms = -1);
- 提供本地缓存功能
virtual const rpc::GcsNodeInfo *Get(const NodeID &node_id, bool filter_dead_nodes = true) const;
- 支持重新订阅机制(用于 GCS 服务器故障恢复)
virtual void AsyncResubscribe();
这些访问器共同构成了 Ray 系统中与 GCS 交互的完整接口,方便系统的其他组件访问和管理分布式系统中的各种元数据。该设计具有以下几个特点:
- 关注点分离:不同类型的数据由专门的访问器管理
- 异步操作:提升系统响应速度
- 容错性:支持故障恢复机制
- 可扩展性:便于添加新的访问器和功能
global_state_accessor
GlobalStateAccessor
是用来为语言前端(如 Python 的 state.py)提供同步接口来访问 GCS 中的数据。这个类在 Ray 系统中扮演着重要角色,它是连接底层 GCS 服务和高层语言前端的桥梁,使得用户可以方便地获取和管理分布式系统的状态。重要的设计特点:
- 同步接口: 与 accessor.h 中的异步接口不同,这个类提供同步接口,便于语言前端使用
- 序列化处理: 为了支持多语言,数据以序列化字符串形式返回,使用时需要用 protobuf 反序列化
- 线程安全: 使用互斥锁保护关键操作
- 多重锁保护:
// 使用多个互斥锁分别保护不同类型的操作
mutable absl::Mutex mutex_;
mutable absl::Mutex debugger_port_mutex_;
mutable absl::Mutex debugger_threads_mutex_;
此外,其主要的功能包括:
- 连接管理
// 连接到 GCS 服务器
bool Connect() ABSL_LOCKS_EXCLUDED(mutex_);
// 断开与 GCS 服务器的连接
void Disconnect() ABSL_LOCKS_EXCLUDED(mutex_);
- Job相关操作
// 获取所有 Job 信息
std::vector<std::string> GetAllJobInfo(...);
// 获取下一个 Job ID
JobID GetNextJobID();
- Node 相关操作
// 获取所有节点信息
std::vector<std::string> GetAllNodeInfo();
// 获取所有可用资源
std::vector<std::string> GetAllAvailableResources();
// 获取所有总资源
std::vector<std::string> GetAllTotalResources();
// 获取正在下线的节点
std::unordered_map<NodeID, int64_t> GetDrainingNodes();
- Actor 相关操作
// 获取所有 Actor 信息
std::vector<std::string> GetAllActorInfo(...);
// 获取指定 Actor 信息
std::unique_ptr<std::string> GetActorInfo(const ActorID &actor_id);
- Worker 相关操作
// 获取指定 Worker 信息
std::unique_ptr<std::string> GetWorkerInfo(const WorkerID &worker_id);
// 获取所有 Worker 信息
std::vector<std::string> GetAllWorkerInfo();
// 更新 Worker 的调试器端口
bool UpdateWorkerDebuggerPort(const WorkerID &worker_id, const uint32_t debugger_port);
// 更新 Worker 暂停的线程数
bool UpdateWorkerNumPausedThreads(...);
- Placement Group 相关操作
Placement Group 是一组具有特定资源需求和位置约束的 “bundle”(资源包)的集合。每个 bundle 可以指定需要的 CPU、GPU、内存等资源,它提供了细粒度的资源控制和调度能力,使得开发者可以根据应用需求优化资源使用和组件部署。
// 获取所有 Placement Group 信息
std::vector<std::string> GetAllPlacementGroupInfo();
// 获取指定 Placement Group 信息
std::unique_ptr<std::string> GetPlacementGroupInfo(...);
这里用一个例子稍微展开讲一下它是如何跟python代码交互的:
# Python 示例
import ray
# 创建一个 Placement Group
placement_group = ray.util.placement_group(
bundles=[
{"CPU": 2, "GPU": 1}, # 第一个 bundle 需要 2CPU 和 1GPU
{"CPU": 1} # 第二个 bundle 只需要 1CPU
],
strategy="STRICT_PACK" # 调度策略
)
调度策略类型:
# STRICT_PACK: 所有 bundles 必须在同一个节点上
strategy="STRICT_PACK"
# PACK: 尽量将 bundles 放在同一个节点,但如果不行可以分散
strategy="PACK"
# SPREAD: 将 bundles 分散到不同节点
strategy="SPREAD"
# STRICT_SPREAD: 强制将 bundles 分散到不同节点
strategy="STRICT_SPREAD"
根据实际应用场景可以灵活适配,比如:
- 分布式训练
# 机器学习训练场景
@ray.remote(num_gpus=1)
class Trainer:
def train(self):
pass
# 创建 placement group 确保训练任务的资源分配
pg = ray.util.placement_group([
{"GPU": 1, "CPU": 4},
{"GPU": 1, "CPU": 4}
])
# 在 placement group 中创建 actors
trainers = [Trainer.options(placement_group=pg).remote()
for _ in range(2)]
- 通信密集型应用
# 需要频繁通信的组件放在同一节点
pg = ray.util.placement_group(
bundles=[{"CPU": 1}] * 3,
strategy="STRICT_PACK"
)
- 容错设计
# 将关键组件分散到不同节点以提高可用性
pg = ray.util.placement_group(
bundles=[{"CPU": 1}] * 3,
strategy="STRICT_SPREAD"
)
这里总结一些使用建议:
- 对于计算密集型任务,使用 PACK 策略减少网络开销
- 对于需要容错的服务,使用 SPREAD 策略分散风险
- 对于有严格要求的应用,使用 STRICT_ 前缀的策略
- 根据实际资源需求合理设计 bundle 大小和数量
gcs_server
gcs_server文件夹下面的实现非常多,这里梳理了所有关键类的核心功能与特性。
文件名 | 主要功能 | 核心组件/类 | 关键特性 |
---|---|---|---|
gcs_server.h/cc | GCS 服务器的主要实现 | GcsServer | - 管理所有 GCS 服务 - 处理 RPC 请求 - 协调各个组件 |
gcs_server_options.h | GCS 服务器配置选项 | GcsServerOptions | - 服务器配置参数 - Redis 配置 - 网络设置 |
gcs_resource_manager.h | 资源管理器 | GcsResourceManager | - 集群资源管理 - 资源分配 - 资源追踪 |
gcs_resource_scheduler.h | 资源调度器 | GcsResourceScheduler | - 资源调度策略 - 负载均衡 - 调度优化 |
gcs_actor_manager.h | Actor 管理器 | GcsActorManager | - Actor 生命周期管理 - Actor 调度 - 故障恢复 |
gcs_placement_group_manager.h | 放置组管理器 | GcsPlacementGroupManager | - 放置组创建/删除 - 资源捆绑管理 - 调度策略 |
gcs_node_manager.h | 节点管理器 | GcsNodeManager | - 节点注册/注销 - 心跳监控 - 节点状态管理 |
gcs_worker_manager.h | Worker 管理器 | GcsWorkerManager | - Worker 生命周期 - Worker 分配 - 状态追踪 |
gcs_job_manager.h | Job 管理器 | GcsJobManager | - Job 提交/完成 - Job 状态管理 - 资源分配 |
gcs_table_storage.h | 表存储接口 | GcsTableStorage | - 元数据存储 - 状态持久化 - 数据访问接口 |
gcs_redis_failure_detector.h | Redis 故障检测器 | GcsRedisFailureDetector | - Redis 健康检查 - 故障检测 - 自动恢复 |
gcs_rpc_server.h | RPC 服务器 | GcsRpcServer | - RPC 服务处理 - 请求路由 - 协议实现 |
gcs_init_data.h | 初始化数据管理 | GcsInitData | - 系统初始化数据 - 启动配置 - 状态恢复 |
gcs_heartbeat_manager.h | 心跳管理器 | GcsHeartbeatManager | - 节点心跳检测 - 存活性监控 - 故障检测 |
gcs_function_manager.h | 函数管理器 | GcsFunctionManager | - 函数注册 - 版本管理 - 函数元数据 |
gcs_error_manager.h | 错误管理器 | GcsErrorManager | - 错误收集 - 错误报告 - 故障处理 |
gcs_kv_manager.h | KV 存储管理器 | GcsKVManager | - 键值存储 - 元数据管理 - 数据同步 |
gcs_resource_report_poller.h | 资源报告轮询器 | GcsResourceReportPoller | - 资源使用监控 - 状态报告 - 数据收集 |
gcs_resource_deletion_manager.h | 资源删除管理器 | GcsResourceDeletionManager | - 资源清理 - 内存回收 - 垃圾收集 |
gcs_resource_manager_accessor.h | 资源管理器访问器 | GcsResourceManagerAccessor | - 资源访问接口 - 资源查询 - 权限控制 |
pubsub
这个发布-订阅机制是 Ray 系统中重要的通信基础设施,它使得系统各组件能够以松耦合的方式进行通信和状态同步。设计上采用了异步操作、线程安全和错误处理等现代软件工程实践。
主要类
- GcsPublisher 类
class GcsPublisher {
public:
// 构造函数,初始化一个基于 GCS 的发布者
GcsPublisher(std::unique_ptr<pubsub::Publisher> publisher);
// 获取底层的发布者实例
pubsub::Publisher &GetPublisher() const;
// 发布不同类型的消息
Status PublishActor(...);
Status PublishJob(...);
Status PublishNodeInfo(...);
Status PublishWorkerFailure(...);
Status PublishError(...);
Status PublishResourceBatch(...);
};
- GcsSubscriber 类
class GcsSubscriber {
public:
// 构造函数,初始化一个基于 GCS 的订阅者
GcsSubscriber(const rpc::Address &gcs_address,
std::unique_ptr<pubsub::Subscriber> subscriber);
// 订阅不同类型的消息
Status SubscribeActor(...);
Status SubscribeAllJobs(...);
Status SubscribeAllNodeInfo(...);
Status SubscribeAllWorkerFailures(...);
// 取消订阅
Status UnsubscribeActor(const ActorID &id);
// 检查是否已取消订阅
bool IsActorUnsubscribed(const ActorID &id);
};
- Python 专用类
class RAY_EXPORT PythonGcsPublisher {
public:
// 构造函数
explicit PythonGcsPublisher(const std::string &gcs_address);
// 连接到 GCS 发布服务
Status Connect();
// 发布错误和日志
Status PublishError(...);
Status PublishLogs(...);
private:
// 带重试的发布实现
Status DoPublishWithRetries(...);
};
- PythonGcsSubscriber 类
class RAY_EXPORT PythonGcsSubscriber {
public:
// 构造函数
explicit PythonGcsSubscriber(const std::string &gcs_address,
int gcs_port,
rpc::ChannelType channel_type,
const std::string &subscriber_id,
const std::string &worker_id);
// 订阅操作
Status Subscribe();
// 轮询不同类型的消息
Status PollError(...);
Status PollLogs(...);
Status PollActor(...);
// 关闭订阅者
Status Close();
// 获取最后一批消息的大小
int64_t last_batch_size();
};
关键特性
- 支持多种消息类型:Actor\Job\NodeInfo\Error
- 订阅管理: 支持订阅和取消订阅
- 重试机制:带有retry和timeout
主要用途
- 状态同步:在集群中同步各种组件的状态变化
- 事件通知:通知系统中的重要事件(如节点失败、Actor创建等)
- 错误传播:分发系统中的错误信息
- 资源更新:发布资源使用情况的更新
使用实例
// 创建发布者
auto publisher = std::make_unique<GcsPublisher>(...);
// 发布 Actor 信息
publisher->PublishActor(actor_id, actor_data, [](Status status) {
if (status.ok()) {
// 发布成功
}
});
// 创建订阅者
auto subscriber = std::make_unique<GcsSubscriber>(...);
// 订阅 Actor 更新
subscriber->SubscribeActor(actor_id,
[](const ActorID &id, const rpc::ActorTableData &data) {
// 处理 Actor 更新
},
[](Status status) {
// 处理订阅结果
});
store_client
in_memory_store_client
这个内存存储客户端是 Ray 系统中的关键组件,提供高性能的内存数据存储功能,同时确保线程安全并支持异步操作。其设计遵循现代 C++ 的最佳实践,包括 RAII、智能指针和互斥锁等特性。
尽管 InMemoryStoreClient 是一个内存存储组件,在 Ray 系统中,它主要通过 GCS 服务器提供全局数据访问服务。它不仅支持本地通信,还通过 GCS 转发机制,实现分布式环境下的通信功能。
- 设计特点:
- 分层设计
- 继承自 StoreClient 接口
- 实现了所有必要的存储操作
- 内存管理
- 使用智能指针管理表对象
- 自动清理不再使用的资源
- 并发控制
- 细粒度锁控制
- 表级别的互斥访问
- 回调机制
- 使用 IO service 确保回调顺序
- 支持异步操作完成通知
- 性能优化
- 使用哈希表实现快速查找
- 细粒度锁减少竞争
- 内存存储提供高性能访问
- 异步操作避免阻塞
- 分层设计
- 使用示例
// 创建客户端
InMemoryStoreClient client(io_service);
// 异步存储数据
client.AsyncPut("table1", "key1", "value1", true,
[](bool success) {
if (success) {
// 写入成功
}
});
// 异步获取数据
client.AsyncGet("table1", "key1",
[](const Status &status, const std::optional<std::string> &value) {
if (status.ok() && value) {
// 处理获取到的值
}
});
- 典型使用流程
// 1. 组件向 GCS 写入数据
component->GcsClient()->AsyncPut("table", "key", "value");
// 2. GCS 将数据存储在内存中
gcs_server->memory_store_->AsyncPut("table", "key", "value");
// 3. 其他组件可以通过 GCS 访问数据
other_component->GcsClient()->AsyncGet("table", "key");
observable_store_client
这个类的实现主要是跟踪系统的一些状态metrics,包括
- Actor 状态监控
- 资源状态追踪
- 任务状态更新
ObservableStoreClient 在 Ray 系统中扮演着重要角色:
- 提供了一种机制来监控和响应系统状态变化
- 支持组件间的松耦合通信
- 实现了高效的状态同步机制
- 便于实现复杂的依赖关系和状态管理
这种设计模式使得系统各组件能够及时响应状态变化,同时保持了良好的解耦性和可扩展性。由于这部分不是核心重点,就不展开分析了。
redis_store_client
RedisStoreClient 提供了一个可靠的分布式存储实现,适合作为 Ray 系统的持久化存储后端
- 核心数据结构
class RedisStoreClient : public StoreClient {
private:
// Redis 键的封装
struct RedisKey {
const std::string external_storage_namespace;
const std::string table_name;
std::string ToString() const;
};
// Redis 命令的封装
struct RedisCommand {
std::string command; // 命令名称
RedisKey redis_key; // Redis 键
std::vector<std::string> args;// 命令参数
};
// 并发控制键
struct RedisConcurrencyKey {
std::string table_name;
std::string key;
};
};
- 并发控制
class RedisStoreClient {
private:
// 互斥锁保护
absl::Mutex mu_;
// 每个键的待处理请求队列
absl::flat_hash_map<RedisConcurrencyKey,
std::queue<std::function<void()>>>
pending_redis_request_by_key_ ABSL_GUARDED_BY(mu_);
// 将请求推入发送队列
size_t PushToSendingQueue(
const std::vector<RedisConcurrencyKey> &keys,
std::function<void()> send_request);
// 从发送队列取出请求
std::vector<std::function<void()>> TakeRequestsFromSendingQueue(
const std::vector<RedisConcurrencyKey> &keys);
}
- Redis 操作封装
class RedisStoreClient {
private:
// 发送 Redis 命令
void SendRedisCmdWithKeys(
std::vector<std::string> keys,
RedisCommand command,
RedisCallback redis_callback);
// 批量获取值
void MGetValues(
const std::string &table_name,
const std::vector<std::string> &keys,
const MapCallback<std::string, std::string> &callback);
};
- 扫描功能
class RedisStoreClient {
private:
class RedisScanner {
// 扫描 Redis 中的键和值
static void ScanKeysAndValues(
std::shared_ptr<RedisClient> redis_client,
RedisKey redis_key,
RedisMatchPattern match_pattern,
MapCallback<std::string, std::string> callback);
private:
// 扫描状态
std::optional<size_t> cursor_;
absl::flat_hash_map<std::string, std::string> results_;
};
};
-
主要接口实现
- 异步写入
Status RedisStoreClient::AsyncPut( const std::string &table_name, const std::string &key, const std::string &data, bool overwrite, std::function<void(bool)> callback) { // 构造 Redis 命令 RedisCommand cmd{ overwrite ? "HSET" : "HSETNX", {external_storage_namespace_, table_name}, {key, data} }; // 发送命令 SendRedisCmdWithKeys({key}, std::move(cmd), [callback](const std::shared_ptr<CallbackReply> &reply) { callback(reply->IsNil()); }); }
- 异步读取
Status RedisStoreClient::AsyncGet( const std::string &table_name, const std::string &key, const OptionalItemCallback<std::string> &callback) { // 构造 Redis 命令 RedisCommand cmd{ "HGET", {external_storage_namespace_, table_name}, {key} }; // 发送命令 SendRedisCmdWithKeys({key}, std::move(cmd), [callback](const std::shared_ptr<CallbackReply> &reply) { if (reply->IsNil()) { callback(Status::OK(), std::nullopt); } else { callback(Status::OK(), reply->AsString()); } }); }
-
并发控制机制
// 1. 每个键维护一个请求队列
// 2. 请求按顺序处理
// 3. 多键操作需要等待所有键都可用
void RedisStoreClient::SendRedisCmdWithKeys(
std::vector<std::string> keys,
RedisCommand command,
RedisCallback redis_callback) {
auto send_request = [this, command, redis_callback]() {
redis_client_->RunCommand(
command.ToRedisArgs(),
redis_callback);
};
std::vector<RedisConcurrencyKey> concurrency_keys;
for (const auto &key : keys) {
concurrency_keys.push_back({command.redis_key.table_name, key});
}
PushToSendingQueue(concurrency_keys, std::move(send_request));
}
- 性能优化
// 批量操作
Status RedisStoreClient::AsyncMultiGet(...) {
// 将请求分批处理
const int batch_size = RayConfig::instance().maximum_gcs_storage_operation_batch_size();
// 批量发送请求
for (size_t i = 0; i < keys.size(); i += batch_size) {
// 处理一批请求
}
}
最后
认真看过ray的论文就会知道,并不是所有的task或者actor都会先发送到gcs中。当worker节点有足够的资源能够完成提交graph的执行的时候,local scheduler是不会也不需要借助gcs来完成跨节点计算的,这样的设计不得不说真的是太赞了。因为,大部分调度框架是不感知真实负载的,纯纯的就是一个资源分发器。这篇文章详细地分析了gcs的核心代码实现,如果你能看到这里,别忘了一键三联,Enjoy!
标签:std,Status,manager,const,gcs,GCS,源码,Ray From: https://blog.csdn.net/weixin_43956669/article/details/144798769