首页 > 编程语言 >Ray 源码分析系列(3)—GCS

Ray 源码分析系列(3)—GCS

时间:2024-12-29 13:57:31浏览次数:7  
标签:std Status manager const gcs GCS 源码 Ray

Global Control Store

在这里插入图片描述

GCS (Global Control Store) 是 Ray 的全局控制存储系统,它是 Ray 的核心组件之一,负责存储和管理集群的元数据信息,它确保了整个集群的正常运行和高效调度。从图中可以看出来GCS 的重要性,其核心的功能包括:

  • 中心化管理: 提供全局视图、统一控制平面和元数据管理
  • 调度支持: 负责资源分配、任务调度和负载均衡
  • 容错机制: 实现状态恢复、故障检测和高可用性保证
  • 性能优化: 包含缓存机制、异步操作和批量更新
  • 监控和调试: 支持状态追踪、性能监控和故障诊断

表结构

GCS 存储的主要数据结构:

  • Actor Information:记录 Actor 的状态和位置。
  • Task Information:存储任务的状态和调度信息。
  • Object Information:管理对象的位置信息和状态。
  • Node Information:包含集群节点的详细信息。
  • System Information:系统的全局配置和状态。
# 概念性的表结构示例
TABLES = {
    # Actor 相关信息
    "ACTOR": {
        "actor_id": "metadata",  # 存储 actor 元数据
        "actor_id:node": "location",  # 存储 actor 位置
    },
    # 任务相关信息
    "TASK": {
        "task_id": "specification",  # 任务规范
        "task_id:status": "status",  # 任务状态
    },
    # 对象相关信息
    "OBJECT": {
        "object_id": "location",  # 对象位置
    }
}

与其他组件的交互

Client/Driver
    │
    ▼
Raylet (Local Scheduler)
    │
    ▼
GCS (Global Control Store)
    │
    ▼
Redis/Other Storage Backend

代码概览

核心组件

GCS由以下几个核心组件组成:

  1. 节点管理 (GcsNodeManager)
void GcsServer::InitGcsNodeManager() {
  gcs_node_manager_ = std::make_unique<GcsNodeManager>(
    gcs_publisher_.get(),
    gcs_table_storage_.get(),
    raylet_client_pool_.get());
}

负责管理集群中的节点

  • 负责管理集群中的节点
  • 处理节点注册、心跳和故障检测
  1. Actor 管理 (GcsActorManager)
void GcsServer::InitGcsActorManager() {
  gcs_actor_manager_ = std::make_unique<GcsActorManager>(
    std::move(scheduler),
    gcs_table_storage_.get(),
    gcs_publisher_.get());
}

管理 Actor 的创建、销毁和重建

  • 管理 Actor 的创建、销毁和重建
  • 处理 Actor 调度和故障恢复
  1. 资源管理 (GcsResourceManager)
void GcsServer::InitGcsResourceManager() {
  gcs_resource_manager_ = std::make_unique<GcsResourceManager>(
    cluster_resource_scheduler_->GetClusterResourceManager(),
    *gcs_node_manager_);
}
  • 管理集群资源
  • 追踪节点资源使用情况
  1. 任务管理 (GcsTaskManager)
void GcsServer::InitGcsTaskManager() {
  gcs_task_manager_ = std::make_unique<GcsTaskManager>();
}
  • 管理任务的调度和执行

存储实现

GCS 支持两种存储模式:

enum class StorageType {
  IN_MEMORY,      // 内存存储
  REDIS_PERSIST,  // Redis 持久化存储
  UNKNOWN
};

核心功能

  1. 服务启动流程:
void GcsServer::Start() {
  // 1. 初始化 KV 管理器
  InitKVManager();
  
  // 2. 异步加载 GCS 表数据
  auto gcs_init_data = std::make_shared<GcsInitData>();
  gcs_init_data->AsyncLoad();
  
  // 3. 初始化各个管理器
  InitGcsNodeManager();
  InitGcsActorManager(); 
  InitGcsResourceManager();
  // ...
  
  // 4. 启动 RPC 服务
  rpc_server_.Run();
}
  1. 事件监听机制:
void GcsServer::InstallEventListeners() {
  // 节点事件
  gcs_node_manager_->AddNodeAddedListener();
  gcs_node_manager_->AddNodeRemovedListener();
  
  // Worker 事件  
  gcs_worker_manager_->AddWorkerDeadListener();
  
  // Job 事件
  gcs_job_manager_->AddJobFinishedListener();
}
  1. 调度机制
// 资源变化时触发调度
gcs_resource_manager_->AddResourcesChangedListener([this] {
  // 调度待处理的 placement groups
  gcs_placement_group_manager_->SchedulePendingPlacementGroups();
  // 调度待处理的任务
  cluster_task_manager_->ScheduleAndDispatchTasks();
});

以上就是 Ray GCS 的核心实现架构,它作为 Ray 的中央控制组件,协调管理整个集群的运行。如果只是想大概了解下功能,下面的内容可以跳过了。如果你想深入地了解代码细节,可以接着往下翻。


核心代码详解

GCS核心的代码在 [https://github.com/ray-project/ray/tree/master/src/ray/gcs](https://github.com/ray-project/ray/tree/master/src/ray/gcs) 里面,现在正式开始代码级分析:
在这里插入图片描述

gcs_client

在这里插入图片描述

accessor

  • accessor访问器主要用于访问 GCS 中存储的不同类型的数据
  • 主要的访问器类包括:
// 访问 Actor 相关信息
class ActorInfoAccessor
// 访问 Job 相关信息  
class JobInfoAccessor
// 访问节点相关信息
class NodeInfoAccessor 
// 访问节点资源信息
class NodeResourceInfoAccessor
// 访问错误信息
class ErrorInfoAccessor
// 访问任务信息 
class TaskInfoAccessor
// 访问 Worker 信息
class WorkerInfoAccessor
// 访问 Placement Group 信息
class PlacementGroupInfoAccessor
// 访问内部 KV 存储
class InternalKVAccessor
  • 每个访问器类都提供了一系列异步和同步方法来:

    • 获取(Get)数据
    • 添加(Add)数据
    • 更新(Update)数据
    • 删除(Remove)数据
    • 订阅(Subscribe)数据变更
  • 主要的设计特点:

    • 大量使用异步操作,通过回调函数处理结果
    virtual Status AsyncGet(const ActorID &actor_id,
                          const OptionalItemCallback<rpc::ActorTableData> &callback);
    
    • 支持超时机制
    virtual Status AsyncGetByName(
        const std::string &name,
        const std::string &ray_namespace, 
        const OptionalItemCallback<rpc::ActorTableData> &callback,
        int64_t timeout_ms = -1);
    
    • 提供本地缓存功能
    virtual const rpc::GcsNodeInfo *Get(const NodeID &node_id, 
                                       bool filter_dead_nodes = true) const;
    
    • 支持重新订阅机制(用于 GCS 服务器故障恢复)
    virtual void AsyncResubscribe();
    

这些访问器共同构成了 Ray 系统中与 GCS 交互的完整接口,方便系统的其他组件访问和管理分布式系统中的各种元数据。该设计具有以下几个特点:

  • 关注点分离:不同类型的数据由专门的访问器管理
  • 异步操作:提升系统响应速度
  • 容错性:支持故障恢复机制
  • 可扩展性:便于添加新的访问器和功能

global_state_accessor

GlobalStateAccessor 是用来为语言前端(如 Python 的 state.py)提供同步接口来访问 GCS 中的数据。这个类在 Ray 系统中扮演着重要角色,它是连接底层 GCS 服务和高层语言前端的桥梁,使得用户可以方便地获取和管理分布式系统的状态。重要的设计特点:

  • 同步接口: 与 accessor.h 中的异步接口不同,这个类提供同步接口,便于语言前端使用
  • 序列化处理: 为了支持多语言,数据以序列化字符串形式返回,使用时需要用 protobuf 反序列化
  • 线程安全: 使用互斥锁保护关键操作
  • 多重锁保护:
// 使用多个互斥锁分别保护不同类型的操作
mutable absl::Mutex mutex_;
mutable absl::Mutex debugger_port_mutex_;
mutable absl::Mutex debugger_threads_mutex_;

此外,其主要的功能包括:

  • 连接管理
// 连接到 GCS 服务器
bool Connect() ABSL_LOCKS_EXCLUDED(mutex_);

// 断开与 GCS 服务器的连接
void Disconnect() ABSL_LOCKS_EXCLUDED(mutex_);
  • Job相关操作
// 获取所有 Job 信息
std::vector<std::string> GetAllJobInfo(...);

// 获取下一个 Job ID
JobID GetNextJobID();
  • Node 相关操作
// 获取所有节点信息
std::vector<std::string> GetAllNodeInfo();

// 获取所有可用资源
std::vector<std::string> GetAllAvailableResources();

// 获取所有总资源
std::vector<std::string> GetAllTotalResources();

// 获取正在下线的节点
std::unordered_map<NodeID, int64_t> GetDrainingNodes();
  • Actor 相关操作
// 获取所有 Actor 信息
std::vector<std::string> GetAllActorInfo(...);

// 获取指定 Actor 信息
std::unique_ptr<std::string> GetActorInfo(const ActorID &actor_id);
  • Worker 相关操作
// 获取指定 Worker 信息
std::unique_ptr<std::string> GetWorkerInfo(const WorkerID &worker_id);

// 获取所有 Worker 信息
std::vector<std::string> GetAllWorkerInfo();

// 更新 Worker 的调试器端口
bool UpdateWorkerDebuggerPort(const WorkerID &worker_id, const uint32_t debugger_port);

// 更新 Worker 暂停的线程数
bool UpdateWorkerNumPausedThreads(...);
  • Placement Group 相关操作

Placement Group 是一组具有特定资源需求和位置约束的 “bundle”(资源包)的集合。每个 bundle 可以指定需要的 CPU、GPU、内存等资源,它提供了细粒度的资源控制和调度能力,使得开发者可以根据应用需求优化资源使用和组件部署。

// 获取所有 Placement Group 信息
std::vector<std::string> GetAllPlacementGroupInfo();

// 获取指定 Placement Group 信息
std::unique_ptr<std::string> GetPlacementGroupInfo(...);

这里用一个例子稍微展开讲一下它是如何跟python代码交互的:

# Python 示例
import ray

# 创建一个 Placement Group
placement_group = ray.util.placement_group(
    bundles=[
        {"CPU": 2, "GPU": 1},  # 第一个 bundle 需要 2CPU 和 1GPU
        {"CPU": 1}             # 第二个 bundle 只需要 1CPU
    ],
    strategy="STRICT_PACK"     # 调度策略
)

调度策略类型:

# STRICT_PACK: 所有 bundles 必须在同一个节点上
strategy="STRICT_PACK"

# PACK: 尽量将 bundles 放在同一个节点,但如果不行可以分散
strategy="PACK"

# SPREAD: 将 bundles 分散到不同节点
strategy="SPREAD"

# STRICT_SPREAD: 强制将 bundles 分散到不同节点
strategy="STRICT_SPREAD"

根据实际应用场景可以灵活适配,比如:

  • 分布式训练
# 机器学习训练场景
@ray.remote(num_gpus=1)
class Trainer:
    def train(self):
        pass

# 创建 placement group 确保训练任务的资源分配
pg = ray.util.placement_group([
    {"GPU": 1, "CPU": 4},
    {"GPU": 1, "CPU": 4}
])

# 在 placement group 中创建 actors
trainers = [Trainer.options(placement_group=pg).remote() 
           for _ in range(2)]
  • 通信密集型应用
# 需要频繁通信的组件放在同一节点
pg = ray.util.placement_group(
    bundles=[{"CPU": 1}] * 3,
    strategy="STRICT_PACK"
)
  • 容错设计
# 将关键组件分散到不同节点以提高可用性
pg = ray.util.placement_group(
    bundles=[{"CPU": 1}] * 3,
    strategy="STRICT_SPREAD"
)

这里总结一些使用建议:

  • 对于计算密集型任务,使用 PACK 策略减少网络开销
  • 对于需要容错的服务,使用 SPREAD 策略分散风险
  • 对于有严格要求的应用,使用 STRICT_ 前缀的策略
  • 根据实际资源需求合理设计 bundle 大小和数量

gcs_server

gcs_server文件夹下面的实现非常多,这里梳理了所有关键类的核心功能与特性。

文件名主要功能核心组件/类关键特性
gcs_server.h/ccGCS 服务器的主要实现GcsServer- 管理所有 GCS 服务
- 处理 RPC 请求
- 协调各个组件
gcs_server_options.hGCS 服务器配置选项GcsServerOptions- 服务器配置参数
- Redis 配置
- 网络设置
gcs_resource_manager.h资源管理器GcsResourceManager- 集群资源管理
- 资源分配
- 资源追踪
gcs_resource_scheduler.h资源调度器GcsResourceScheduler- 资源调度策略
- 负载均衡
- 调度优化
gcs_actor_manager.hActor 管理器GcsActorManager- Actor 生命周期管理
- Actor 调度
- 故障恢复
gcs_placement_group_manager.h放置组管理器GcsPlacementGroupManager- 放置组创建/删除
- 资源捆绑管理
- 调度策略
gcs_node_manager.h节点管理器GcsNodeManager- 节点注册/注销
- 心跳监控
- 节点状态管理
gcs_worker_manager.hWorker 管理器GcsWorkerManager- Worker 生命周期
- Worker 分配
- 状态追踪
gcs_job_manager.hJob 管理器GcsJobManager- Job 提交/完成
- Job 状态管理
- 资源分配
gcs_table_storage.h表存储接口GcsTableStorage- 元数据存储
- 状态持久化
- 数据访问接口
gcs_redis_failure_detector.hRedis 故障检测器GcsRedisFailureDetector- Redis 健康检查
- 故障检测
- 自动恢复
gcs_rpc_server.hRPC 服务器GcsRpcServer- RPC 服务处理
- 请求路由
- 协议实现
gcs_init_data.h初始化数据管理GcsInitData- 系统初始化数据
- 启动配置
- 状态恢复
gcs_heartbeat_manager.h心跳管理器GcsHeartbeatManager- 节点心跳检测
- 存活性监控
- 故障检测
gcs_function_manager.h函数管理器GcsFunctionManager- 函数注册
- 版本管理
- 函数元数据
gcs_error_manager.h错误管理器GcsErrorManager- 错误收集
- 错误报告
- 故障处理
gcs_kv_manager.hKV 存储管理器GcsKVManager- 键值存储
- 元数据管理
- 数据同步
gcs_resource_report_poller.h资源报告轮询器GcsResourceReportPoller- 资源使用监控
- 状态报告
- 数据收集
gcs_resource_deletion_manager.h资源删除管理器GcsResourceDeletionManager- 资源清理
- 内存回收
- 垃圾收集
gcs_resource_manager_accessor.h资源管理器访问器GcsResourceManagerAccessor- 资源访问接口
- 资源查询
- 权限控制

pubsub

这个发布-订阅机制是 Ray 系统中重要的通信基础设施,它使得系统各组件能够以松耦合的方式进行通信和状态同步。设计上采用了异步操作、线程安全和错误处理等现代软件工程实践。

主要类

  • GcsPublisher 类
class GcsPublisher {
public:
    // 构造函数,初始化一个基于 GCS 的发布者
    GcsPublisher(std::unique_ptr<pubsub::Publisher> publisher);
    
    // 获取底层的发布者实例
    pubsub::Publisher &GetPublisher() const;
    
    // 发布不同类型的消息
    Status PublishActor(...);
    Status PublishJob(...);
    Status PublishNodeInfo(...);
    Status PublishWorkerFailure(...);
    Status PublishError(...);
    Status PublishResourceBatch(...);
};
  • GcsSubscriber 类
class GcsSubscriber {
public:
    // 构造函数,初始化一个基于 GCS 的订阅者
    GcsSubscriber(const rpc::Address &gcs_address,
                 std::unique_ptr<pubsub::Subscriber> subscriber);
    
    // 订阅不同类型的消息
    Status SubscribeActor(...);
    Status SubscribeAllJobs(...);
    Status SubscribeAllNodeInfo(...);
    Status SubscribeAllWorkerFailures(...);
    
    // 取消订阅
    Status UnsubscribeActor(const ActorID &id);
    
    // 检查是否已取消订阅
    bool IsActorUnsubscribed(const ActorID &id);
};
  • Python 专用类
class RAY_EXPORT PythonGcsPublisher {
public:
    // 构造函数
    explicit PythonGcsPublisher(const std::string &gcs_address);
    
    // 连接到 GCS 发布服务
    Status Connect();
    
    // 发布错误和日志
    Status PublishError(...);
    Status PublishLogs(...);
    
private:
    // 带重试的发布实现
    Status DoPublishWithRetries(...);
};
  • PythonGcsSubscriber 类
class RAY_EXPORT PythonGcsSubscriber {
public:
    // 构造函数
    explicit PythonGcsSubscriber(const std::string &gcs_address,
                               int gcs_port,
                               rpc::ChannelType channel_type,
                               const std::string &subscriber_id,
                               const std::string &worker_id);
                               
    // 订阅操作
    Status Subscribe();
    
    // 轮询不同类型的消息
    Status PollError(...);
    Status PollLogs(...);
    Status PollActor(...);
    
    // 关闭订阅者
    Status Close();
    
    // 获取最后一批消息的大小
    int64_t last_batch_size();
};

关键特性

  • 支持多种消息类型:Actor\Job\NodeInfo\Error
  • 订阅管理: 支持订阅和取消订阅
  • 重试机制:带有retry和timeout

主要用途

  • 状态同步:在集群中同步各种组件的状态变化
  • 事件通知:通知系统中的重要事件(如节点失败、Actor创建等)
  • 错误传播:分发系统中的错误信息
  • 资源更新:发布资源使用情况的更新

使用实例

// 创建发布者
auto publisher = std::make_unique<GcsPublisher>(...);

// 发布 Actor 信息
publisher->PublishActor(actor_id, actor_data, [](Status status) {
    if (status.ok()) {
        // 发布成功
    }
});

// 创建订阅者
auto subscriber = std::make_unique<GcsSubscriber>(...);

// 订阅 Actor 更新
subscriber->SubscribeActor(actor_id, 
    [](const ActorID &id, const rpc::ActorTableData &data) {
        // 处理 Actor 更新
    },
    [](Status status) {
        // 处理订阅结果
    });

store_client

在这里插入图片描述

in_memory_store_client

这个内存存储客户端是 Ray 系统中的关键组件,提供高性能的内存数据存储功能,同时确保线程安全并支持异步操作。其设计遵循现代 C++ 的最佳实践,包括 RAII、智能指针和互斥锁等特性。

尽管 InMemoryStoreClient 是一个内存存储组件,在 Ray 系统中,它主要通过 GCS 服务器提供全局数据访问服务。它不仅支持本地通信,还通过 GCS 转发机制,实现分布式环境下的通信功能。

  • 设计特点:
    • 分层设计
      • 继承自 StoreClient 接口
      • 实现了所有必要的存储操作
    • 内存管理
      • 使用智能指针管理表对象
      • 自动清理不再使用的资源
    • 并发控制
      • 细粒度锁控制
      • 表级别的互斥访问
    • 回调机制
      • 使用 IO service 确保回调顺序
      • 支持异步操作完成通知
    • 性能优化
      • 使用哈希表实现快速查找
      • 细粒度锁减少竞争
      • 内存存储提供高性能访问
      • 异步操作避免阻塞
  • 使用示例
// 创建客户端
InMemoryStoreClient client(io_service);

// 异步存储数据
client.AsyncPut("table1", "key1", "value1", true, 
    [](bool success) {
        if (success) {
            // 写入成功
        }
    });

// 异步获取数据
client.AsyncGet("table1", "key1",
    [](const Status &status, const std::optional<std::string> &value) {
        if (status.ok() && value) {
            // 处理获取到的值
        }
    });
  • 典型使用流程
// 1. 组件向 GCS 写入数据
component->GcsClient()->AsyncPut("table", "key", "value");

// 2. GCS 将数据存储在内存中
gcs_server->memory_store_->AsyncPut("table", "key", "value");

// 3. 其他组件可以通过 GCS 访问数据
other_component->GcsClient()->AsyncGet("table", "key");

observable_store_client

这个类的实现主要是跟踪系统的一些状态metrics,包括

  • Actor 状态监控
  • 资源状态追踪
  • 任务状态更新

ObservableStoreClient 在 Ray 系统中扮演着重要角色:

  • 提供了一种机制来监控和响应系统状态变化
  • 支持组件间的松耦合通信
  • 实现了高效的状态同步机制
  • 便于实现复杂的依赖关系和状态管理

这种设计模式使得系统各组件能够及时响应状态变化,同时保持了良好的解耦性和可扩展性。由于这部分不是核心重点,就不展开分析了。

redis_store_client

RedisStoreClient 提供了一个可靠的分布式存储实现,适合作为 Ray 系统的持久化存储后端

  • 核心数据结构
class RedisStoreClient : public StoreClient {
private:
    // Redis 键的封装
    struct RedisKey {
        const std::string external_storage_namespace;
        const std::string table_name;
        std::string ToString() const;
    };

    // Redis 命令的封装
    struct RedisCommand {
        std::string command;          // 命令名称
        RedisKey redis_key;          // Redis 键
        std::vector<std::string> args;// 命令参数
    };

    // 并发控制键
    struct RedisConcurrencyKey {
        std::string table_name;
        std::string key;
    };
};
  • 并发控制
class RedisStoreClient {
private:
    // 互斥锁保护
    absl::Mutex mu_;

    // 每个键的待处理请求队列
    absl::flat_hash_map<RedisConcurrencyKey, 
                       std::queue<std::function<void()>>> 
        pending_redis_request_by_key_ ABSL_GUARDED_BY(mu_);

    // 将请求推入发送队列
    size_t PushToSendingQueue(
        const std::vector<RedisConcurrencyKey> &keys,
        std::function<void()> send_request);

    // 从发送队列取出请求
    std::vector<std::function<void()>> TakeRequestsFromSendingQueue(
        const std::vector<RedisConcurrencyKey> &keys);
}
  • Redis 操作封装
class RedisStoreClient {
private:
    // 发送 Redis 命令
    void SendRedisCmdWithKeys(
        std::vector<std::string> keys,
        RedisCommand command,
        RedisCallback redis_callback);

    // 批量获取值
    void MGetValues(
        const std::string &table_name,
        const std::vector<std::string> &keys,
        const MapCallback<std::string, std::string> &callback);
};
  • 扫描功能
class RedisStoreClient {
private:
    class RedisScanner {
        // 扫描 Redis 中的键和值
        static void ScanKeysAndValues(
            std::shared_ptr<RedisClient> redis_client,
            RedisKey redis_key,
            RedisMatchPattern match_pattern,
            MapCallback<std::string, std::string> callback);

    private:
        // 扫描状态
        std::optional<size_t> cursor_;
        absl::flat_hash_map<std::string, std::string> results_;
    };
};
  • 主要接口实现

    • 异步写入
    Status RedisStoreClient::AsyncPut(
        const std::string &table_name,
        const std::string &key,
        const std::string &data,
        bool overwrite,
        std::function<void(bool)> callback) {
        
        // 构造 Redis 命令
        RedisCommand cmd{
            overwrite ? "HSET" : "HSETNX",
            {external_storage_namespace_, table_name},
            {key, data}
        };
    
        // 发送命令
        SendRedisCmdWithKeys({key}, std::move(cmd), 
            [callback](const std::shared_ptr<CallbackReply> &reply) {
                callback(reply->IsNil());
            });
    }
    
    • 异步读取
    Status RedisStoreClient::AsyncGet(
        const std::string &table_name,
        const std::string &key,
        const OptionalItemCallback<std::string> &callback) {
        
        // 构造 Redis 命令
        RedisCommand cmd{
            "HGET",
            {external_storage_namespace_, table_name},
            {key}
        };
    
        // 发送命令
        SendRedisCmdWithKeys({key}, std::move(cmd),
            [callback](const std::shared_ptr<CallbackReply> &reply) {
                if (reply->IsNil()) {
                    callback(Status::OK(), std::nullopt);
                } else {
                    callback(Status::OK(), reply->AsString());
                }
            });
    }
    
  • 并发控制机制

// 1. 每个键维护一个请求队列
// 2. 请求按顺序处理
// 3. 多键操作需要等待所有键都可用
void RedisStoreClient::SendRedisCmdWithKeys(
    std::vector<std::string> keys,
    RedisCommand command,
    RedisCallback redis_callback) {
    
    auto send_request = [this, command, redis_callback]() {
        redis_client_->RunCommand(
            command.ToRedisArgs(),
            redis_callback);
    };

    std::vector<RedisConcurrencyKey> concurrency_keys;
    for (const auto &key : keys) {
        concurrency_keys.push_back({command.redis_key.table_name, key});
    }

    PushToSendingQueue(concurrency_keys, std::move(send_request));
}
  • 性能优化
// 批量操作
Status RedisStoreClient::AsyncMultiGet(...) {
    // 将请求分批处理
    const int batch_size = RayConfig::instance().maximum_gcs_storage_operation_batch_size();
    
    // 批量发送请求
    for (size_t i = 0; i < keys.size(); i += batch_size) {
        // 处理一批请求
    }
}

最后

认真看过ray的论文就会知道,并不是所有的task或者actor都会先发送到gcs中。当worker节点有足够的资源能够完成提交graph的执行的时候,local scheduler是不会也不需要借助gcs来完成跨节点计算的,这样的设计不得不说真的是太赞了。因为,大部分调度框架是不感知真实负载的,纯纯的就是一个资源分发器。这篇文章详细地分析了gcs的核心代码实现,如果你能看到这里,别忘了一键三联,Enjoy!

标签:std,Status,manager,const,gcs,GCS,源码,Ray
From: https://blog.csdn.net/weixin_43956669/article/details/144798769

相关文章

  • sysstat 源码编译安装与配置
    下载cd/usr/local/srcsudowgethttps://github.com/sysstat/sysstat/releases/download/v12.5.7/sysstat-12.5.7.tar.xz解压sudotar-xvfsysstat-12.5.7.tar.xzcdsysstat-12.5.7编译安装mkdir-p/usr/local/sysstatsudo./configure--prefix=/usr/local/sysstatss......
  • 爱心商城系统(源码+数据库+报告)
    一、项目介绍374.基于SpringBoot的爱心商城系统,系统包含两种角色:管理员、用户,系统分为前台和后台两大模块,主要功能如下。前台模块:-首页:展示平台的最新动态、热门商品、公益企业捐赠信息等内容。-论坛:用户可以在论坛上进行交流。-公告:展示平台发布的公告和通知信息。-......
  • 图书馆管理系统(源码+数据库+报告)
    一、项目介绍373基于SpringBoot的阿博图书馆管理系统,系统包含两种角色:用户、管理员,系统分为前台和后台两大模块,主要功能如下:1管理员功能模块-管理员登录  管理员通过填写用户名、密码、角色进行登录。-管理员功能界面  管理员登录后可以访问首页、个人中心、......
  • Wend看源码-Java-集合学习(Queue)
    概述   Wend看源码-Java-集合学习(List)-CSDN博客    Wend看源码-Java-集合学习(Set)-CSDN博客    在前两篇文章中,我们分别探讨了Java集合框架的父类以及List集合和Set集合的实现。接下来,本文将重点阐述Java中的Queue集合,包括其内部的数据结构以及核心方......
  • Wend看源码-Java-Map学习
    摘要        在当今的编程世界中,深入了解各类数据类型对于开发者而言至关重要。本篇聚焦于JDK21版本下,Java.util包所提供的Map类型。Map作为一种关键的数据结构,能够以键值对的形式高效存储和检索数据,广泛应用于众多领域。        本文将简要概述 Map......
  • 请手动实现Array.prototype.reduce的方法
    Array.prototype.reduce方法接收一个函数作为累加器(accumulator),数组中的每个值(从左到右)开始缩减,最终为一个值。以下是一个手动实现的reduce方法的示例:Array.prototype.myReduce=function(callback,initialValue){//如果没有提供初始值,则将数组的第一个元素作为初始......
  • 【java毕设 python毕设 大数据毕设】基于springboot的阳光幼儿管理系统的设计与实现
    ✍✍计算机编程指导师⭐⭐个人介绍:自己非常喜欢研究技术问题!专业做Java、Python、小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。⛽⛽实战项目:有源码或者技术上的问题欢迎在评论区一起讨论交流!⚡⚡Java实战|SpringBoot/SSMPython实战项目|Django微信小程......
  • 【java毕设 python毕设 大数据毕设】基于springboot的银行信用卡额度管理系统的设计与
    ✍✍计算机编程指导师⭐⭐个人介绍:自己非常喜欢研究技术问题!专业做Java、Python、小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。⛽⛽实战项目:有源码或者技术上的问题欢迎在评论区一起讨论交流!⚡⚡Java实战|SpringBoot/SSMPython实战项目|Django微信小程......
  • 毕业设计- springboot农产品电商平台 (案例分析)-附源码
    摘 要随着科学技术的飞速发展,各行各业都在努力与现代先进技术接轨,通过科技手段提高自身的优势;对于特色农产品电商平台当然也不能排除在外,随着网络技术的不断成熟,带动了特色农产品电商平台,它彻底改变了过去传统的管理方式,不仅使服务管理难度变低了,还提升了管理的灵活性。......
  • 基于Java+SSM+HTML5忘忧小区物业管理系统(源码+LW+调试文档+讲解等)/忘忧小区/物业管
    博主介绍......