Ceph messenger module code walkthrough (1)

Date: 2024-08-19 11:41:11
Messenger code walkthrough

  1. Using the messenger

Taking the mgr code as an example, let's look at how a messenger is initialized and started.

// Constructor: creates the client_messenger object.
MgrStandby::MgrStandby(int argc, const char **argv) :
  Dispatcher(g_ceph_context),
  monc{g_ceph_context, poolctx},
  client_messenger(Messenger::create(
             g_ceph_context,
             cct->_conf.get_val<std::string>("ms_public_type").empty() ?
            cct->_conf.get_val<std::string>("ms_type") : cct->_conf.get_val<std::string>("ms_public_type"),
             entity_name_t::MGR(),
             "mgr",
             Messenger::get_pid_nonce()))
             ....
{}
// Initialization: the messenger is started here, and shut down again on every error path.
int MgrStandby::init()
{
  // Initialize Messenger
  client_messenger->add_dispatcher_tail(this);
  client_messenger->add_dispatcher_head(&objecter);
  client_messenger->add_dispatcher_tail(&client);
  client_messenger->start();

  poolctx.start(2);

  // Initialize MonClient
  if (monc.build_initial_monmap() < 0) {
    client_messenger->shutdown();
    client_messenger->wait();
    return -1;
  }

  monc.sub_want("mgrmap", 0, 0);

  monc.set_want_keys(CEPH_ENTITY_TYPE_MON|CEPH_ENTITY_TYPE_OSD
      |CEPH_ENTITY_TYPE_MDS|CEPH_ENTITY_TYPE_MGR);
  monc.set_messenger(client_messenger.get());

  int r = monc.init();
  if (r < 0) {
    monc.shutdown();
    client_messenger->shutdown();
    client_messenger->wait();
    return r;
  }
  mgrc.init();
  client_messenger->add_dispatcher_tail(&mgrc);

  r = monc.authenticate();
  if (r < 0) {
    derr << "Authentication failed, did you specify a mgr ID with a valid keyring?" << dendl;
    monc.shutdown();
    client_messenger->shutdown();
    client_messenger->wait();
    return r;
  }

  monc.set_passthrough_monmap();

  client_t whoami = monc.get_global_id();
  client_messenger->set_myname(entity_name_t::MGR(whoami.v));
  monc.set_log_client(&log_client);
  _update_log_config();
  objecter.set_client_incarnation(0);
  objecter.init();
  objecter.start();
  client.init();
  timer.init();

  py_module_registry.init();
  mgr_perf_start(g_ceph_context);

  tick();

  dout(4) << "Complete." << dendl;
  return 0;
}

  2. The initialization functions

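As shown above, the messenger object is created by calling Messenger::create(). In this version create() returns an AsyncMessenger for any recognized type (and nullptr otherwise), and ms_type defaults to async+posix, so every messenger object is an AsyncMessenger.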

Messenger *Messenger::create(CephContext *cct, const std::string &type,
                 entity_name_t name, std::string lname, uint64_t nonce)
{
  if (type == "random" || type.find("async") != std::string::npos)
    return new AsyncMessenger(cct, name, type, std::move(lname), nonce);
  lderr(cct) << "unrecognized ms_type '" << type << "'" << dendl;
  return nullptr;
}
//global.yaml.in
- name: ms_type
  type: str
  level: advanced
  desc: Messenger implementation to use for network communication
  fmt_desc: Transport type used by Async Messenger. Can be ``async+posix``,
    ``async+dpdk`` or ``async+rdma``. Posix uses standard TCP/IP networking and is
    default. Other transports may be experimental and support may be limited.
  default: async+posix
  flags:
  - startup
  with_legacy: true
- name: ms_public_type
  type: str
  level: advanced
  desc: Messenger implementation to use for the public network
  long_desc: If not specified, use ms_type
  see_also:
  - ms_type
  flags:
  - startup
  with_legacy: true
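
The type selection can be restated as two small helpers. This is only a sketch for illustration: `effective_ms_type`, `selects_async_messenger` and `type_selection_example` are hypothetical names that do not exist in Ceph. They simply repeat the ternary from the MgrStandby constructor and the check from Messenger::create() so that the outcomes for a few type strings can be checked.

```cpp
#include <cassert>
#include <string>

// Same rule as the ternary in the MgrStandby constructor:
// use ms_public_type when it is set, otherwise fall back to ms_type.
std::string effective_ms_type(const std::string& ms_type,
                              const std::string& ms_public_type) {
  return ms_public_type.empty() ? ms_type : ms_public_type;
}

// Same condition as in Messenger::create(): "random" or any string
// containing "async" yields an AsyncMessenger; anything else is rejected.
bool selects_async_messenger(const std::string& type) {
  return type == "random" || type.find("async") != std::string::npos;
}

// Usage example with the default configuration and one override.
void type_selection_example() {
  // ms_public_type is empty by default, so ms_type wins.
  assert(effective_ms_type("async+posix", "") == "async+posix");
  // An explicit ms_public_type takes precedence.
  assert(effective_ms_type("async+posix", "async+rdma") == "async+rdma");
  assert(selects_async_messenger("async+posix"));
  assert(!selects_async_messenger("simple"));
}
```

With the defaults from global.yaml.in, the effective type is async+posix and the async branch is taken.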

Next, look at the AsyncMessenger constructor. On the default path, transport_type is posix.

AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
                               const std::string &type, std::string mname, uint64_t _nonce)
  : SimplePolicyMessenger(cct, name),
    dispatch_queue(cct, this, mname),
    nonce(_nonce)
{
  std::string transport_type = "posix";
  if (type.find("rdma") != std::string::npos)
    transport_type = "rdma";
  else if (type.find("dpdk") != std::string::npos)
    transport_type = "dpdk";

  auto single = &cct->lookup_or_create_singleton_object<StackSingleton>(
    "AsyncMessenger::NetworkStack::" + transport_type, true, cct);
  single->ready(transport_type);
  stack = single->stack.get();
  stack->start();
  local_worker = stack->get_worker();
  local_connection = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue,
                     local_worker, true, true);
  init_local_connection();
  reap_handler = new C_handle_reap(this);
  unsigned processor_num = 1;
  if (stack->support_local_listen_table())
    processor_num = stack->get_num_worker();
  for (unsigned i = 0; i < processor_num; ++i)
    processors.push_back(new Processor(this, stack->get_worker(i), cct));
}
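
The transport selection at the top of the constructor is a plain substring match. The sketch below isolates it in a free function so the mapping is easy to see; `transport_type_for` and `transport_example` are hypothetical names used only for this illustration, not Ceph APIs.

```cpp
#include <cassert>
#include <string>

// Mirrors the first lines of the AsyncMessenger constructor:
// default to "posix", switch to "rdma" or "dpdk" when the type string
// contains that substring ("rdma" is tested first).
std::string transport_type_for(const std::string& ms_type) {
  std::string transport_type = "posix";
  if (ms_type.find("rdma") != std::string::npos)
    transport_type = "rdma";
  else if (ms_type.find("dpdk") != std::string::npos)
    transport_type = "dpdk";
  return transport_type;
}

// Usage example covering the three documented ms_type values.
void transport_example() {
  assert(transport_type_for("async+posix") == "posix");
  assert(transport_type_for("async+rdma") == "rdma");
  assert(transport_type_for("async+dpdk") == "dpdk");
}
```

Note that a type such as "random" also ends up on the posix transport, because it contains neither substring.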

In the mgr's init(), client_messenger registers this, objecter and client (and later mgrc) as dispatchers through add_dispatcher_head/add_dispatcher_tail.

  /**
   * Add a new Dispatcher to the front of the list. If you add
   * a Dispatcher which is already included, it will get a duplicate
   * entry. This will reduce efficiency but not break anything.
   */
  void add_dispatcher_head(Dispatcher *d) {
    bool first = dispatchers.empty();
    dispatchers.push_front(d);
    if (d->ms_can_fast_dispatch_any())
      fast_dispatchers.push_front(d);
    if (first)
      ready();
  }
  /**
   * Add a new Dispatcher to the end of the list. If you add
   * a Dispatcher which is already included, it will get a duplicate
   * entry. This will reduce efficiency but not break anything.
   */
  void add_dispatcher_tail(Dispatcher *d) {
    bool first = dispatchers.empty();
    dispatchers.push_back(d);
    if (d->ms_can_fast_dispatch_any())
      fast_dispatchers.push_back(d);
    if (first)
      ready();
  }
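
To see what order the dispatchers end up in, the head/tail logic can be replayed on a small stand-in. This is a sketch under stated assumptions: `NamedDispatcher`, `DispatcherList` and `mgr_dispatch_order` are hypothetical types and functions written for this example, and the fast-dispatch capability of the real mgr dispatchers is not modelled (all flags are false here).

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <vector>

// Hypothetical stand-in for Ceph's Dispatcher.
struct NamedDispatcher {
  std::string name;
  bool can_fast_dispatch;  // plays the role of ms_can_fast_dispatch_any()
};

class DispatcherList {
public:
  void add_head(const NamedDispatcher* d) {
    const bool first = dispatchers_.empty();
    dispatchers_.push_front(d);
    if (d->can_fast_dispatch)
      fast_dispatchers_.push_front(d);
    if (first)
      ++ready_calls_;  // the real code calls the virtual ready() here
  }

  void add_tail(const NamedDispatcher* d) {
    const bool first = dispatchers_.empty();
    dispatchers_.push_back(d);
    if (d->can_fast_dispatch)
      fast_dispatchers_.push_back(d);
    if (first)
      ++ready_calls_;
  }

  std::vector<std::string> order() const { return names(dispatchers_); }
  std::vector<std::string> fast_order() const { return names(fast_dispatchers_); }
  int ready_calls() const { return ready_calls_; }

private:
  static std::vector<std::string> names(
      const std::deque<const NamedDispatcher*>& q) {
    std::vector<std::string> out;
    for (const auto* d : q)
      out.push_back(d->name);
    return out;
  }

  std::deque<const NamedDispatcher*> dispatchers_;
  std::deque<const NamedDispatcher*> fast_dispatchers_;
  int ready_calls_ = 0;
};

// Replays the registration sequence from MgrStandby::init().
std::vector<std::string> mgr_dispatch_order() {
  static const NamedDispatcher mgr{"mgr", false};
  static const NamedDispatcher objecter{"objecter", false};
  static const NamedDispatcher client{"client", false};
  static const NamedDispatcher mgrc{"mgrc", false};

  DispatcherList list;
  list.add_tail(&mgr);       // add_dispatcher_tail(this)
  list.add_head(&objecter);  // add_dispatcher_head(&objecter)
  list.add_tail(&client);    // add_dispatcher_tail(&client)
  list.add_tail(&mgrc);      // add_dispatcher_tail(&mgrc)
  assert(list.ready_calls() == 1);  // ready() fires only for the first one
  return list.order();
}
```

Replaying the mgr sequence gives the order objecter, mgr, client, mgrc: objecter is registered second, but add_dispatcher_head puts it in front, so it sees every message first.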

Then the client_messenger object is started with start():

int AsyncMessenger::start()
{
  std::scoped_lock l{lock};
  ldout(cct,1) << __func__ << " start" << dendl;

  // register at least one entity, first!
  ceph_assert(my_name.type() >= 0);

  ceph_assert(!started);
  started = true;
  stopped = false;

  if (!did_bind) {
    entity_addrvec_t newaddrs = *my_addrs;
    for (auto& a : newaddrs.v) {
      a.nonce = nonce;
    }
    set_myaddrs(newaddrs);
    _init_local_connection();
  }

  return 0;
}

  3. Structure of Messenger

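Messenger is the base class of the msg module. It is mainly responsible for binding IP addresses, establishing connections, and dispatching incoming messages to the registered Dispatchers.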

class Messenger {
private:
  // Dispatchers that handle incoming messages
  std::deque<Dispatcher*> dispatchers;
  std::deque<Dispatcher*> fast_dispatchers;
  ZTracer::Endpoint trace_endpoint;

public:
  // Constructor and factory functions
  Messenger(CephContext *cct_, entity_name_t w);
  virtual ~Messenger() {}
  static Messenger *create(CephContext *cct,
                           const std::string &type,
                           entity_name_t name, std::string lname,
                           uint64_t nonce);
  static Messenger *create_client_messenger(CephContext *cct, std::string lname);

  // Add a Dispatcher object to the messenger's dispatcher queue.
  void add_dispatcher_head(Dispatcher *d) {
    bool first = dispatchers.empty();
    dispatchers.push_front(d);
    if (d->ms_can_fast_dispatch_any())
      fast_dispatchers.push_front(d);
    if (first)
      ready();
  }
  void add_dispatcher_tail(Dispatcher *d) {
    bool first = dispatchers.empty();
    dispatchers.push_back(d);
    if (d->ms_can_fast_dispatch_any())
      fast_dispatchers.push_back(d);
    if (first)
      ready();
  }
protected:

  /**
   * A courtesy function for Messenger implementations which
   * will be called when we receive our first Dispatcher.
   */
  virtual void ready() { }
  /**
   * Determine whether a message can be fast-dispatched. We will
   * query each Dispatcher in sequence to determine if they are
   * capable of handling a particular message via "fast dispatch".
   */
  bool ms_can_fast_dispatch(const ceph::cref_t<Message>& m) {
    for (const auto &dispatcher : fast_dispatchers) {
      if (dispatcher->ms_can_fast_dispatch2(m))
        return true;
    }
    return false;
  }
public:
  // Bind to an IP address
  virtual int bind(const entity_addr_t& bind_addr) = 0;
  virtual int bindv(const entity_addrvec_t& addrs);
  virtual int rebind(const std::set<int>& avoid_ports) { return -EOPNOTSUPP; }
  virtual int client_bind(const entity_addr_t& bind_addr) = 0;

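  // Start the threads so that messages can be received and sent.
  virtual int start() { started = true; return 0; }
  // Stop the threads: shutdown() initiates the stop, wait() blocks until it completes.
  virtual void wait() = 0;
  virtual int shutdown() { started = false; return 0; }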
  // Send a message
  virtual int send_to(
    Message *m,
    int type,
    const entity_addrvec_t& addr) = 0;
  int send_to_mon(
    Message *m, const entity_addrvec_t& addrs) {
    return send_to(m, CEPH_ENTITY_TYPE_MON, addrs);
  }
  int send_to_mds(
    Message *m, const entity_addrvec_t& addrs) {
    return send_to(m, CEPH_ENTITY_TYPE_MDS, addrs);
  }
  // Establish a connection
  virtual ConnectionRef connect_to(
    int type, const entity_addrvec_t& dest,
    bool anon=false, bool not_local_dest=false) = 0;
  ConnectionRef connect_to_mon(const entity_addrvec_t& dest,
      bool anon=false, bool not_local_dest=false) {
    return connect_to(CEPH_ENTITY_TYPE_MON, dest, anon, not_local_dest);
  }
  ConnectionRef connect_to_mds(const entity_addrvec_t& dest,
      bool anon=false, bool not_local_dest=false) {
    return connect_to(CEPH_ENTITY_TYPE_MDS, dest, anon, not_local_dest);
  }
  ConnectionRef connect_to_osd(const entity_addrvec_t& dest,
      bool anon=false, bool not_local_dest=false) {
    return connect_to(CEPH_ENTITY_TYPE_OSD, dest, anon, not_local_dest);
  }
  ConnectionRef connect_to_mgr(const entity_addrvec_t& dest,
      bool anon=false, bool not_local_dest=false) {
    return connect_to(CEPH_ENTITY_TYPE_MGR, dest, anon, not_local_dest);
  }

protected:
  // Fast dispatch
  // Deliver a single Message via "fast dispatch".
  void ms_fast_dispatch(const ceph::ref_t<Message> &m) {
    m->set_dispatch_stamp(ceph_clock_now());
    for (const auto &dispatcher : fast_dispatchers) {
      if (dispatcher->ms_can_fast_dispatch2(m)) {
        dispatcher->ms_fast_dispatch2(m);
        return;
      }
    }
    ceph_abort();
  }
  void ms_fast_dispatch(Message *m) {
    return ms_fast_dispatch(ceph::ref_t<Message>(m, false)); /* consume ref */
  }
  void ms_fast_preprocess(const ceph::ref_t<Message> &m) {
    for (const auto &dispatcher : fast_dispatchers) {
      dispatcher->ms_fast_preprocess2(m);
    }
  }
  /**
   *  Deliver a single Message. Send it to each Dispatcher
   *  in sequence until one of them handles it.
   *  If none of our Dispatchers can handle it, ceph_abort().
   */
  void ms_deliver_dispatch(const ceph::ref_t<Message> &m) {
    m->set_dispatch_stamp(ceph_clock_now());
    for (const auto &dispatcher : dispatchers) {
      if (dispatcher->ms_dispatch2(m))
        return;
    }
    ceph_assert(!cct->_conf->ms_die_on_unhandled_msg);
  }
  void ms_deliver_dispatch(Message *m) {
    return ms_deliver_dispatch(ceph::ref_t<Message>(m, false)); /* consume ref */
  }
  // Not fully understood yet
  /**
   * Notify each Dispatcher of a new Connection. Call
   * this function whenever a new Connection is initiated or
   * reconnects.
   */
  void ms_deliver_handle_connect(Connection *con) {
    for (const auto& dispatcher : dispatchers) {
      dispatcher->ms_handle_connect(con);
    }
  }

  void ms_deliver_handle_fast_connect(Connection *con) {
    for (const auto& dispatcher : fast_dispatchers) {
      dispatcher->ms_handle_fast_connect(con);
    }
  }
  // Not fully understood yet
  void ms_deliver_handle_accept(Connection *con) {
    for (const auto& dispatcher : dispatchers) {
      dispatcher->ms_handle_accept(con);
    }
  }

  void ms_deliver_handle_fast_accept(Connection *con) {
    for (const auto& dispatcher : fast_dispatchers) {
      dispatcher->ms_handle_fast_accept(con);
    }
  }

  void ms_deliver_handle_reset(Connection *con) {
    for (const auto& dispatcher : dispatchers) {
      if (dispatcher->ms_handle_reset(con))
        return;
    }
  }

  void ms_deliver_handle_remote_reset(Connection *con) {
    for (const auto& dispatcher : dispatchers) {
      dispatcher->ms_handle_remote_reset(con);
    }
  }

  void ms_deliver_handle_refused(Connection *con) {
    for (const auto& dispatcher : dispatchers) {
      if (dispatcher->ms_handle_refused(con))
        return;
    }
  }
};
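
The two delivery paths in the class above follow the same pattern: walk the dispatcher list in order and stop at the first dispatcher that takes the message. The following is a minimal sketch of that pattern, not Ceph code. `Msg`, `Handler`, `deliver_dispatch`, `can_fast_dispatch` and `fast_dispatch` are hypothetical names, and the functions return the name of the handler that took the message purely so the result can be inspected.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>

// Hypothetical message: only a numeric type is modelled.
struct Msg { int type; };

// Hypothetical dispatcher built from two callbacks.
struct Handler {
  std::string name;
  std::function<bool(const Msg&)> can_fast_dispatch;  // like ms_can_fast_dispatch2
  std::function<bool(const Msg&)> dispatch;           // like ms_dispatch2: true if handled
};

// Ordinary path, modelled on ms_deliver_dispatch(): offer the message to
// each dispatcher in turn and stop at the first one that handles it.
// Returns an empty string when nobody handled the message.
std::string deliver_dispatch(const std::deque<Handler>& dispatchers,
                             const Msg& m) {
  for (const auto& d : dispatchers) {
    if (d.dispatch(m))
      return d.name;
  }
  return "";
}

// Modelled on ms_can_fast_dispatch(): does any fast dispatcher accept m?
bool can_fast_dispatch(const std::deque<Handler>& fast_dispatchers,
                       const Msg& m) {
  for (const auto& d : fast_dispatchers) {
    if (d.can_fast_dispatch(m))
      return true;
  }
  return false;
}

// Fast path, modelled on ms_fast_dispatch(): the caller is expected to
// have checked can_fast_dispatch() first.
std::string fast_dispatch(const std::deque<Handler>& fast_dispatchers,
                          const Msg& m) {
  for (const auto& d : fast_dispatchers) {
    if (d.can_fast_dispatch(m)) {
      d.dispatch(m);
      return d.name;
    }
  }
  // The real code calls ceph_abort() at this point.
  assert(!"no fast dispatcher accepted the message");
  return "";
}
```

Because the first matching dispatcher wins, the position chosen with add_dispatcher_head or add_dispatcher_tail decides which component handles a message that several dispatchers could accept.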

  4. Structure of AsyncMessenger

SimplePolicyMessenger inherits from Messenger and implements simple, mutex-protected getters and setters for connection policies.

AsyncMessenger inherits from SimplePolicyMessenger and implements most of the remaining functions.


class SimplePolicyMessenger : public Messenger
{
private:
  ceph::mutex policy_lock =
    ceph::make_mutex("SimplePolicyMessenger::policy_lock");
  ceph::net::PolicySet<Throttle> policy_set;

public:

  SimplePolicyMessenger(CephContext *cct, entity_name_t name): Messenger(cct, name){}

  Policy get_policy(int t) override {
    std::lock_guard l{policy_lock};
    return policy_set.get(t);
  }

  Policy get_default_policy() override {
    std::lock_guard l{policy_lock};
    return policy_set.get_default();
  }

  void set_default_policy(Policy p) override {
    std::lock_guard l{policy_lock};
    policy_set.set_default(p);
  }

  void set_policy(int type, Policy p) override {
    std::lock_guard l{policy_lock};
    policy_set.set(type, p);
  }

  void set_policy_throttlers(int type,
                 Throttle* byte_throttle,
                 Throttle* msg_throttle) override {
    std::lock_guard l{policy_lock};
    policy_set.set_throttlers(type, byte_throttle, msg_throttle);
  }

}; /* SimplePolicyMessenger */
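
The idea behind SimplePolicyMessenger is a per-peer-type policy table guarded by one mutex. A rough sketch of that idea follows. Everything in it is an assumption made for illustration: `Policy` here is a toy struct with two made-up fields, `PolicyTable` is a hypothetical class, and the fallback to the default policy for unknown types is how this sketch behaves, not a statement about ceph::net::PolicySet.

```cpp
#include <cassert>
#include <map>
#include <mutex>

// Toy policy with two illustrative flags (hypothetical fields).
struct Policy {
  bool lossy = false;
  bool server = false;
};

// Hypothetical policy table: one mutex protects both the default policy
// and the per-type overrides, as policy_lock does in the class above.
class PolicyTable {
public:
  Policy get(int type) const {
    std::lock_guard<std::mutex> l(lock_);
    auto it = by_type_.find(type);
    // Assumption of this sketch: unknown types use the default policy.
    return it != by_type_.end() ? it->second : default_;
  }

  Policy get_default() const {
    std::lock_guard<std::mutex> l(lock_);
    return default_;
  }

  void set_default(Policy p) {
    std::lock_guard<std::mutex> l(lock_);
    default_ = p;
  }

  void set(int type, Policy p) {
    std::lock_guard<std::mutex> l(lock_);
    by_type_[type] = p;
  }

private:
  mutable std::mutex lock_;
  Policy default_;
  std::map<int, Policy> by_type_;
};

// Usage example; the numeric peer type ids are arbitrary placeholders.
void policy_example() {
  PolicyTable table;
  Policy lossy_client;
  lossy_client.lossy = true;
  table.set_default(lossy_client);

  Policy server;
  server.server = true;
  table.set(4, server);

  assert(table.get(4).server);  // explicit entry
  assert(table.get(1).lossy);   // falls back to the default
}
```

Returning Policy by value, as the real getters also do, means the caller never holds a reference into the table after the lock is released.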

class AsyncMessenger : public SimplePolicyMessenger {
public:

  AsyncMessenger(CephContext *cct, entity_name_t name, const std::string &type,
                 std::string mname, uint64_t _nonce);
  ~AsyncMessenger() override;

  bool set_addr_unknowns(const entity_addrvec_t &addr) override;
  void set_addrs(const entity_addrvec_t &addrs) override;

  // Get the dispatch queue length
  int get_dispatch_queue_len() override {
    return dispatch_queue.get_queue_len();
  }

  double get_dispatch_queue_max_age(utime_t now) override {
    return dispatch_queue.get_max_age(now);
  }

  void set_cluster_protocol(int p) override {
    ceph_assert(!started && !did_bind);
    cluster_protocol = p;
  }

  // Overrides of base-class functions
  int bind(const entity_addr_t& bind_addr) override;
  int rebind(const std::set<int>& avoid_ports) override;
  int bindv(const entity_addrvec_t& bind_addrs) override;
  int client_bind(const entity_addr_t& bind_addr) override;
  int client_reset() override;
  bool should_use_msgr2() override;
  int start() override;
  void wait() override;
  int shutdown() override;
  int send_to(Message *m, int type, const entity_addrvec_t& addrs) override;

  ConnectionRef connect_to(int type,
               const entity_addrvec_t& addrs,
               bool anon, bool not_local_dest=false) override;
  ConnectionRef get_loopback_connection() override;
  void mark_down(const entity_addr_t& addr) override {
    mark_down_addrs(entity_addrvec_t(addr));
  }
  void mark_down_addrs(const entity_addrvec_t& addrs) override;
  void mark_down_all() override {
    shutdown_connections(true);
  }

protected:
  // Override of the base-class hook
  void ready() override;

private:
  // Create a connection
  AsyncConnectionRef create_connect(const entity_addrvec_t& addrs, int type, bool anon);

  void _finish_bind(const entity_addrvec_t& bind_addrs, const entity_addrvec_t& listen_addrs);

  entity_addrvec_t _filter_addrs(const entity_addrvec_t& addrs);

 private:
  // The members below are not fully understood yet
  NetworkStack *stack;
  std::vector<Processor*> processors;
  friend class Processor;
  DispatchQueue dispatch_queue;

  /**
   * hash map of addresses to Asyncconnection
   *
   * NOTE: a Asyncconnection* with state CLOSED may still be in the map but is considered
   * invalid and can be replaced by anyone holding the msgr lock
   */
  ceph::unordered_map<entity_addrvec_t, AsyncConnectionRef> conns;

  void _init_local_connection() {
    ceph_assert(ceph_mutex_is_locked(lock));
    local_connection->peer_addrs = *my_addrs;
    local_connection->peer_type = my_name.type();
    local_connection->set_features(CEPH_FEATURES_ALL);
    ms_deliver_handle_fast_connect(local_connection.get());
  }

  void shutdown_connections(bool queue_reset);

public:

  /// con used for sending messages to ourselves
  AsyncConnectionRef local_connection;

  /**
   * This wraps _lookup_conn.
   */
  AsyncConnectionRef lookup_conn(const entity_addrvec_t& k) {
    std::lock_guard l{lock};
    return _lookup_conn(k); /* make new ref! */
  }

  int accept_conn(const AsyncConnectionRef& conn);
  bool learned_addr(const entity_addr_t &peer_addr_for_me);
  void add_accept(Worker *w, ConnectedSocket cli_socket,
          const entity_addr_t &listen_addr,
          const entity_addr_t &peer_addr);
  NetworkStack *get_stack() {
    return stack;
  }

  /**
   * Fill in the address and peer type for the local connection, which
   * is used for delivering messages back to ourself.
   */
  void init_local_connection() {
    std::lock_guard l{lock};
    local_connection->is_loopback = true;
    _init_local_connection();
  }

  /**
   * Unregister connection from `conns`
   */
  void unregister_conn(const AsyncConnectionRef& conn) {
    std::lock_guard l{deleted_lock};
    deleted_conns.emplace(std::move(conn));
    conn->unregister();

    if (deleted_conns.size() >= cct->_conf->ms_async_reap_threshold) {
      local_worker->center.dispatch_event_external(reap_handler);
    }
  }

  /**
   * Reap dead connection from `deleted_conns`
   */
  void reap_dead();

} ;
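
unregister_conn() and reap_dead() together form a deferred-cleanup scheme: closed connections are parked in a set, and the actual cleanup is only requested once the set reaches a threshold. The sketch below shows that scheme in isolation. It is an illustration under assumptions, not the Ceph implementation: `Conn`, `ConnRef` and `DeferredReaper` are hypothetical, and where the real code dispatches reap_handler to a worker's event center, this sketch just tells the caller that a reap is due.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <set>

struct Conn { int id; };  // hypothetical connection object
using ConnRef = std::shared_ptr<Conn>;

class DeferredReaper {
public:
  explicit DeferredReaper(std::size_t threshold) : threshold_(threshold) {
    assert(threshold_ > 0);
  }

  // Park a dead connection. Returns true when the number of parked
  // connections has reached the threshold and a reap should be scheduled.
  bool unregister_conn(const ConnRef& conn) {
    std::lock_guard<std::mutex> l(deleted_lock_);
    deleted_conns_.insert(conn);
    return deleted_conns_.size() >= threshold_;
  }

  // Drop all parked connections and return how many were reaped.
  std::size_t reap_dead() {
    std::set<ConnRef> doomed;
    {
      std::lock_guard<std::mutex> l(deleted_lock_);
      doomed.swap(deleted_conns_);
    }
    // The references are released when `doomed` goes out of scope,
    // outside the lock.
    return doomed.size();
  }

  std::size_t pending() const {
    std::lock_guard<std::mutex> l(deleted_lock_);
    return deleted_conns_.size();
  }

private:
  mutable std::mutex deleted_lock_;
  std::set<ConnRef> deleted_conns_;
  std::size_t threshold_;
};
```

Batching the cleanup this way keeps unregister_conn() cheap on the hot path. In the listing above the threshold comes from ms_async_reap_threshold.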

From: https://www.cnblogs.com/whutao/p/18366995
