首页 > 其他分享 >ceph如何进行数据的读写(2)

ceph如何进行数据的读写(2)

时间:2024-08-13 11:50:06浏览次数:13  
标签:get cct 读写 osd ceph tid 数据 event op

本章摘要

上文说到,librados/IoctxImpl.cc中调用objecter_op和objecter的op_submit函数,进行op请求的封装、加参和提交。
本文详细介绍相关函数的调用。

osdc中的操作

初始化Op对象,提交请求

设置Op对象的时间,oid,操作类型等信息。

//osdc/Objector.h
  // mid-level helpers
  Op *prepare_mutate_op(
    const object_t& oid, const object_locator_t& oloc,
    ObjectOperation& op, const SnapContext& snapc,
    ceph::real_time mtime, int flags,
    Context *oncommit, version_t *objver = NULL,
    osd_reqid_t reqid = osd_reqid_t(),
    ZTracer::Trace *parent_trace = nullptr) {
    Op *o = new Op(oid, oloc, std::move(op.ops), flags | global_op_flags |
           CEPH_OSD_FLAG_WRITE, oncommit, objver,
           nullptr, parent_trace);
    o->priority = op.priority;
    o->mtime = mtime;
    o->snapc = snapc;
    o->out_rval.swap(op.out_rval);
    o->out_bl.swap(op.out_bl);
    o->out_handler.swap(op.out_handler);
    o->out_ec.swap(op.out_ec);
    o->reqid = reqid;
    op.clear();
    return o;
  }
////osdc/Objector.cc
// read | write ---------------------------
void Objecter::op_submit(Op *op, ceph_tid_t *ptid, int *ctx_budget)
{
  shunique_lock rl(rwlock, ceph::acquire_shared);
  ceph_tid_t tid = 0;
  if (!ptid)
    ptid = &tid;
  op->trace.event("op submit");
  _op_submit_with_budget(op, rl, ptid, ctx_budget);
}
//
void Objecter::_op_submit_with_budget(Op *op,
                      shunique_lock<ceph::shared_mutex>& sul,
                      ceph_tid_t *ptid,
                      int *ctx_budget)
{

  // throttle.  before we look at any state, because
  // _take_op_budget() may drop our lock while it blocks.
  if (!op->ctx_budgeted || (ctx_budget && (*ctx_budget == -1))) {
    int op_budget = _take_op_budget(op, sul);
    // take and pass out the budget for the first OP
    // in the context session
    if (ctx_budget && (*ctx_budget == -1)) {
      *ctx_budget = op_budget;
    }
  }

  if (osd_timeout > timespan(0)) {
    if (op->tid == 0)
      op->tid = ++last_tid;
    auto tid = op->tid;
    op->ontimeout = timer.add_event(osd_timeout,
                    [this, tid]() {
                      op_cancel(tid, -ETIMEDOUT); });
  }

  _op_submit(op, sul, ptid);
}

计算需要提交到哪个osd上,并建立连接。便于后续发送请求至对应的osd上

void Objecter::_op_submit(Op *op, shunique_lock<ceph::shared_mutex>& sul, ceph_tid_t *ptid)
{
  // rwlock is locked

  ldout(cct, 10) << __func__ << " op " << op << dendl;

  // pick target
  ceph_assert(op->session == NULL);
  OSDSession *s = NULL;

  bool check_for_latest_map = false;
  //计算需要提交到哪个osd上。
  int r = _calc_target(&op->target, nullptr);
  switch(r) {
  case RECALC_OP_TARGET_POOL_DNE:
    check_for_latest_map = true;
    break;
  case RECALC_OP_TARGET_POOL_EIO:
    if (op->has_completion()) {
      op->complete(osdc_errc::pool_eio, -EIO);
    }
    return;
  }

  // Try to get a session, including a retry if we need to take write lock
  //根据osd号,建立session连接
  r = _get_session(op->target.osd, &s, sul);

  _send_op_account(op);

  // send?

  ceph_assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));

  bool need_send = false;
  if (op->target.paused) {
    ldout(cct, 10) << " tid " << op->tid << " op " << op << " is paused"
           << dendl;
    _maybe_request_map();
  } else if (!s->is_homeless()) {
    need_send = true;
  } else {
    _maybe_request_map();
  }

  unique_lock sl(s->lock);
  if (op->tid == 0)
    op->tid = ++last_tid;

  _session_op_assign(s, op);
  
  //发送请求
  if (need_send) {
    _send_op(op);
  }

  // Last chance to touch Op here, after giving up session lock it can
  // be freed at any time by response handler.
  ceph_tid_t tid = op->tid;
  if (check_for_latest_map) {
    _send_op_map_check(op);
  }
  if (ptid)
    *ptid = tid;
  op = NULL;

  sl.unlock();
  put_session(s);

  ldout(cct, 5) << num_in_flight << " in flight" << dendl;
}

根据osd的序号,即osd id。建立session信息

/**
 * Look up OSDSession by OSD id.
 *
 * @returns 0 on success, or -EAGAIN if the lock context requires
 * promotion to write.
 */
int Objecter::_get_session(int osd, OSDSession **session,
               shunique_lock<ceph::shared_mutex>& sul)
{
  auto s = new OSDSession(cct, osd);
  osd_sessions[osd] = s;
  s->con = messenger->connect_to_osd(osdmap->get_addrs(osd));
  s->con->set_priv(RefCountedPtr{s});
  logger->inc(l_osdc_osd_session_open);
  logger->set(l_osdc_osd_sessions, osd_sessions.size());
  s->get();
  *session = s;
  ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
         << s->get_nref() << dendl;
  return 0;
}

构建osdop的message,发送请求

void Objecter::_send_op(Op *op)
{
  // rwlock is locked
  // op->session->lock is locked

  // backoff?
  auto p = op->session->backoffs.find(op->target.actual_pgid);
  if (p != op->session->backoffs.end()) {
    hobject_t hoid = op->target.get_hobj();
    auto q = p->second.lower_bound(hoid);
    if (q != p->second.begin()) {
      --q;
      if (hoid >= q->second.end) {
        ++q;
      }
    }
    if (q != p->second.end()) {
      ldout(cct, 20) << __func__ << " ? " << q->first << " [" << q->second.begin
             << "," << q->second.end << ")" << dendl;
      int r = cmp(hoid, q->second.begin);
      if (r == 0 || (r > 0 && hoid < q->second.end)) {
        ldout(cct, 10) << __func__ << " backoff " << op->target.actual_pgid
               << " id " << q->second.id << " on " << hoid
               << ", queuing " << op << " tid " << op->tid << dendl;
        return;
      }
    }
  }

  ceph_assert(op->tid > 0);
  //构建MOSDOp对象,Message OSD OP。
  MOSDOp *m = _prepare_osd_op(op);

  if (op->target.actual_pgid != m->get_spg()) {
    ldout(cct, 10) << __func__ << " " << op->tid << " pgid change from "
           << m->get_spg() << " to " << op->target.actual_pgid
           << ", updating and reencoding" << dendl;
    m->set_spg(op->target.actual_pgid);
    m->clear_payload();  // reencode
  }

  ldout(cct, 15) << "_send_op " << op->tid << " to "
         << op->target.actual_pgid << " on osd." << op->session->osd
         << dendl;

  ConnectionRef con = op->session->con;
  ceph_assert(con);

  op->incarnation = op->session->incarnation;

  if (op->trace.valid()) {
    m->trace.init("op msg", nullptr, &op->trace);
  }
  op->session->con->send_message(m);
}

构建MOSDOp,加参数。设置必要参数

Objecter::MOSDOp *Objecter::_prepare_osd_op(Op *op)
{
  // rwlock is locked

  int flags = op->target.flags;
  flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
  flags |= CEPH_OSD_FLAG_SUPPORTSPOOLEIO;

  // Nothing checks this any longer, but needed for compatibility with
  // pre-luminous osds
  flags |= CEPH_OSD_FLAG_ONDISK;

  if (!honor_pool_full)
    flags |= CEPH_OSD_FLAG_FULL_FORCE;

  op->target.paused = false;
  op->stamp = ceph::coarse_mono_clock::now();

  hobject_t hobj = op->target.get_hobj();
  auto m = new MOSDOp(client_inc, op->tid,
              hobj, op->target.actual_pgid,
              osdmap->get_epoch(),
              flags, op->features);

  m->set_snapid(op->snapid);
  m->set_snap_seq(op->snapc.seq);
  m->set_snaps(op->snapc.snaps);

  m->ops = op->ops;
  m->set_mtime(op->mtime);
  m->set_retry_attempt(op->attempts++);

  if (!op->trace.valid() && cct->_conf->osdc_blkin_trace_all) {
    op->trace.init("op", &trace_endpoint);
  }

  if (op->priority)
    m->set_priority(op->priority);
  else
    m->set_priority(cct->_conf->osd_client_op_priority);

  if (op->reqid != osd_reqid_t()) {
    m->set_reqid(op->reqid);
  }

  logger->inc(l_osdc_op_send);
  ssize_t sum = 0;
  for (unsigned i = 0; i < m->ops.size(); i++) {
    sum += m->ops[i].indata.length();
  }
  logger->inc(l_osdc_op_send_bytes, sum);

  return m;
}

发送请求

int AsyncConnection::send_message(Message *m)
{
  FUNCTRACE(async_msgr->cct);
  lgeneric_subdout(async_msgr->cct, ms,
           1) << "-- " << async_msgr->get_myaddrs() << " --> "
              << get_peer_addrs() << " -- "
              << *m << " -- " << m << " con "
              << this
              << dendl;

  if (is_blackhole()) {
    lgeneric_subdout(async_msgr->cct, ms, 0) << __func__ << ceph_entity_type_name(peer_type)
      << " blackhole " << *m << dendl;
    m->put();
    return 0;
  }

  // optimistic think it's ok to encode(actually may broken now)
  if (!m->get_priority())
    m->set_priority(async_msgr->get_default_send_priority());

  m->get_header().src = async_msgr->get_myname();
  m->set_connection(this);

#if defined(WITH_EVENTTRACE)
  if (m->get_type() == CEPH_MSG_OSD_OP)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_BEGIN", true);
  else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
    OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_BEGIN", true);
#endif

  if (is_loopback) { //loopback connection
    ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
    std::lock_guard<std::mutex> l(write_lock);
    if (protocol->is_connected()) {
      dispatch_queue->local_delivery(m, m->get_priority());
    } else {
      ldout(async_msgr->cct, 10) << __func__ << " loopback connection closed."
                                 << " Drop message " << m << dendl;
      m->put();
    }
    return 0;
  }

  // we don't want to consider local message here, it's too lightweight which
  // may disturb users
  logger->inc(l_msgr_send_messages);

  protocol->send_message(m);
  return 0;
}

在初始化中,实际选择的是ProtocolV2,该类继承自Protocol。有V1和V2两种。

void ProtocolV2::send_message(Message *m) {
  uint64_t f = connection->get_features();

  // TODO: Currently not all messages supports reencode like MOSDMap, so here
  // only let fast dispatch support messages prepare message
  const bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
  if (can_fast_prepare) {
    prepare_send_message(f, m);
  }

  std::lock_guard<std::mutex> l(connection->write_lock);
  bool is_prepared = can_fast_prepare;
  // "features" changes will change the payload encoding
  if (can_fast_prepare && (!can_write || connection->get_features() != f)) {
    // ensure the correctness of message encoding
    m->clear_payload();
    is_prepared = false;
    ldout(cct, 10) << __func__ << " clear encoded buffer previous " << f
                   << " != " << connection->get_features() << dendl;
  }
  if (state == CLOSED) {
    ldout(cct, 10) << __func__ << " connection closed."
                   << " Drop message " << m << dendl;
    m->put();
  } else {
    ldout(cct, 5) << __func__ << " enqueueing message m=" << m
                  << " type=" << m->get_type() << " " << *m << dendl;
    m->queue_start = ceph::mono_clock::now();
    m->trace.event("async enqueueing message");
    out_queue[m->get_priority()].emplace_back(
      out_queue_entry_t{is_prepared, m});
    ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
                   << dendl;
    if (((!replacing && can_write) || state == STANDBY) && !write_in_progress) {
      write_in_progress = true;
      connection->center->dispatch_event_external(connection->write_handler);
    }
  }
}

请求发送线程处理

void EventCenter::dispatch_event_external(EventCallbackRef e)
{
  uint64_t num = 0;
  {
    std::lock_guard lock{external_lock};
    if (external_num_events > 0 && *external_events.rbegin() == e) {
      return;
    }
    external_events.push_back(e);
    num = ++external_num_events;
  }
  if (num == 1 && !in_thread())
    wakeup();

  ldout(cct, 30) << __func__ << " " << e << " pending " << num << dendl;
}
std::function<void ()> NetworkStack::add_thread(Worker* w)
{
  return [this, w]() {
      rename_thread(w->id);
      const unsigned EventMaxWaitUs = 30000000;
      w->center.set_owner();
      ldout(cct, 10) << __func__ << " starting" << dendl;
      w->initialize();
      w->init_done();
      while (!w->done) {
        ldout(cct, 30) << __func__ << " calling event process" << dendl;

        ceph::timespan dur;
        int r = w->center.process_events(EventMaxWaitUs, &dur);
        if (r < 0) {
          ldout(cct, 20) << __func__ << " process events failed: "
                         << cpp_strerror(errno) << dendl;
          // TODO do something?
        }
        w->perf_logger->tinc(l_msgr_running_total_time, dur);
      }
      w->reset();
      w->destroy();
  };
}
int EventCenter::process_events(unsigned timeout_microseconds,  ceph::timespan *working_dur)
{
  struct timeval tv;
  int numevents;
  bool trigger_time = false;
  auto now = clock_type::now();
  clock_type::time_point end_time = now + std::chrono::microseconds(timeout_microseconds);

  auto it = time_events.begin();
  if (it != time_events.end() && end_time >= it->first) {
    trigger_time = true;
    end_time = it->first;

    if (end_time > now) {
      timeout_microseconds = std::chrono::duration_cast<std::chrono::microseconds>(end_time - now).count();
    } else {
      timeout_microseconds = 0;
    }
  }

  bool blocking = pollers.empty() && !external_num_events.load();
  if (!blocking)
    timeout_microseconds = 0;
  tv.tv_sec = timeout_microseconds / 1000000;
  tv.tv_usec = timeout_microseconds % 1000000;

  ldout(cct, 30) << __func__ << " wait second " << tv.tv_sec << " usec " << tv.tv_usec << dendl;
  std::vector<FiredFileEvent> fired_events;
  numevents = driver->event_wait(fired_events, &tv);
  auto working_start = ceph::mono_clock::now();
  for (int event_id = 0; event_id < numevents; event_id++) {
    int rfired = 0;
    FileEvent *event;
    EventCallbackRef cb;
    event = _get_file_event(fired_events[event_id].fd);

    /* note the event->mask & mask & ... code: maybe an already processed
    * event removed an element that fired and we still didn't
    * processed, so we check if the event is still valid. */
    if (event->mask & fired_events[event_id].mask & EVENT_READABLE) {
      rfired = 1;
      cb = event->read_cb;
      cb->do_request(fired_events[event_id].fd);
    }

    if (event->mask & fired_events[event_id].mask & EVENT_WRITABLE) {
      if (!rfired || event->read_cb != event->write_cb) {
        cb = event->write_cb;
        cb->do_request(fired_events[event_id].fd);
      }
    }

    ldout(cct, 30) << __func__ << " event_wq process is " << fired_events[event_id].fd
                   << " mask is " << fired_events[event_id].mask << dendl;
  }

  if (trigger_time)
    numevents += process_time_events();

  if (external_num_events.load()) {
    external_lock.lock();
    std::deque<EventCallbackRef> cur_process;
    cur_process.swap(external_events);
    external_num_events.store(0);
    external_lock.unlock();
    numevents += cur_process.size();
    while (!cur_process.empty()) {
      EventCallbackRef e = cur_process.front();
      ldout(cct, 30) << __func__ << " do " << e << dendl;
      e->do_request(0);
      cur_process.pop_front();
    }
  }

  if (!numevents && !blocking) {
    for (uint32_t i = 0; i < pollers.size(); i++)
      numevents += pollers[i]->poll();
  }

  if (working_dur)
    *working_dur = ceph::mono_clock::now() - working_start;
  return numevents;
}

标签:get,cct,读写,osd,ceph,tid,数据,event,op
From: https://www.cnblogs.com/whutao/p/18356577

相关文章

  • H5 html单页面实现对接接口,获取接口数据
    一、AJAX的一种实现方式,XMLHttpRequestvarxhr=newXMLHttpRequest();xhr.open("POST","你的接口URL",true);xhr.setRequestHeader("Content-Type","application/json;charset=UTF-8");//准备发送的数据vardata=JSON.stringif......
  • ceph如何进行数据的读写(3)
    本章摘要上文说到,osdc中封装请求,使用message中的相关机制将请求发送出去。本文详细介绍osd服务端如何进行请求的接收。osd初始化osd启动时,定义了message变量ms_public,该变量绑定public网络,负责接收客户端的请求。ms_public会启动对应的线程进行接收,并指定接收函数。//ceph_......
  • ceph如何进行数据的读写(1)
    版本ceph版本为17.ceph如何进行读写接口的实现Ceph的客户端通过librados的接口进行集群的访问,这里的访问包括:1)对集群的整体访问2)对象的访问两类接口,这套接口(API)包括C、C++和Python的实现,接口通过网络实现对Ceph集群的访问。在客户端层面,可以在自己的程序中调用该接口,从而集......
  • 图数据库在社交网络分析中的应用
    图数据库在社交网络分析中的应用随着互联网的飞速发展,社交网络已成为人们日常生活中不可或缺的一部分。这些平台不仅连接了数以亿计的用户,还生成了海量的、高度互连的数据。如何有效地处理和分析这些数据,以理解用户行为、优化用户体验、提升平台价值,成为了一个重要的研究课题......
  • KingbaseES RAC运维案例之---集群及数据库管理
    案例说明:KingbaseESRAC在部署完成后,进行日常的集群及数据库管理。适用版本:KingbaseESV008R006C008M030B0010操作系统版本:[root@node201KingbaseHA]#cat/etc/centos-releaseCentOSLinuxrelease7.9.2009(Core)集群架构:如下所示,node1和node2为集群节点:节点信息:......
  • 信号处理卡 数据收发卡设计方案:428-基于XC7Z100+ADRV9009的双收双发无线电射频板卡 5G
    数据收发卡设计方案:428-基于XC7Z100+ADRV9009的双收双发无线电射频板卡5G小基站无线图传基于XC7Z100+ADRV9009的双收双发无线电射频板卡一、板卡概述        基于XC7Z100+ADRV9009的双收双发无线电射频板卡是基于Xilinx ZYNQ FPGA和ADI的无线收发芯片ADRV90......
  • 基于Dango+微信小程序的广西东盟旅游资源信息管理系统+80003(免费领源码)可做计算机毕业
    django广西-东盟旅游资源信息管理系统小程序摘 要在社会快速发展和人们生活水平提高的影响下,旅游产业蓬勃发展,旅游形式也变得多样化,使旅游资源信息的管理变得比过去更加困难。依照这一现实为基础,设计一个快捷而又方便的基于小程序的旅游资源信息管理系统是一项十分重要并且......
  • 【YashanDB数据库】Yashandb表闪回业务表实践
    数据误删除DELETE操作闪回示例(HEAP表)基于闪回查询(建议):select*fromsales.branches1;BRANCH_NOBRANCH_NAMEAREA_NOADDRESSEMPLOYEE_COUNT------------------------------------------------------------------------------------------------------------------......
  • Keras图形数据增强
    在《跟着迪哥学Python数据分析与机器学习实战》这书中提到Keras图像数据增强函数ImageDataGenerator:查看官方API却发现不存在,因为现在Keras都推荐使用版本3了。但是预处理层还是保留了,此时可以使用如下一些层的函数:Resizing层,调整层。版本2对应的是https://keras.io/2.16......
  • 【YashanDB数据库】yasboot查询数据库状态时显示数据库状态为off
    【问题现象】yasbootclusterstatus-cyashandb显示数据库状态为off与数据库实际的状态不符,如下图【问题分类】yasboot、yasdb使用问题【关键字】yasboot,数据库运行状态为off【问题描述】yasboot工具显示的状态【问题原因分析】发现是备库启动数据库方式不标准,即使用noh......