首页 > 编程语言 >cpp-libp2p源码阅读笔记:(一)cpp-libp2p/src/basic

cpp-libp2p源码阅读笔记:(一)cpp-libp2p/src/basic

时间:2023-02-13 15:24:20浏览次数:40  
标签:std return buffer cb 源码 auto cpp libp2p size

 1 #include <libp2p/basic/message_read_writer_bigendian.hpp>
 2 
 3 #include <vector>
 4 
 5 #include <arpa/inet.h>
 6 #include <boost/assert.hpp>
 7 #include <libp2p/basic/message_read_writer_error.hpp>
 8 #include <libp2p/common/byteutil.hpp>
 9 
10 namespace libp2p::basic {
11   MessageReadWriterBigEndian::MessageReadWriterBigEndian(
12       std::shared_ptr<ReadWriter> conn)
13       : conn_{std::move(conn)} {
14     BOOST_ASSERT(conn_ != nullptr);
15   }
16 
17   void MessageReadWriterBigEndian::read(ReadCallbackFunc cb) {
18     auto buffer = std::make_shared<std::vector<uint8_t>>();
19     buffer->resize(kLenMarkerSize);
20     conn_->read(
21         *buffer, kLenMarkerSize,
22         [self{shared_from_this()}, buffer, cb{std::move(cb)}](auto &&result) {
23           if (not result) {
24             return cb(result.error());
25           }
26           uint32_t msg_len = ntohl(  // NOLINT
27               common::convert<uint32_t>(buffer->data()));
28           buffer->resize(msg_len);
29           std::fill(buffer->begin(), buffer->end(), 0u);
30           self->conn_->read(*buffer, msg_len,
31                             [self, buffer, cb](auto &&result) {
32                               if (not result) {
33                                 return cb(result.error());
34                               }
35                               cb(buffer);
36                             });
37         });
38   }
39 
40   void MessageReadWriterBigEndian::write(gsl::span<const uint8_t> buffer,
41                                          Writer::WriteCallbackFunc cb) {
42     if (buffer.empty()) {
43       // TODO(107): Reentrancy
44       return cb(MessageReadWriterError::BUFFER_IS_EMPTY);
45     }
46 
47     std::vector<uint8_t> raw_buf;
48     raw_buf.reserve(kLenMarkerSize + buffer.size());
49     common::putUint32BE(raw_buf, buffer.size());
50     raw_buf.insert(raw_buf.end(), buffer.begin(), buffer.end());
51     conn_->write(raw_buf, raw_buf.size(),
52                  [self{shared_from_this()}, cb{std::move(cb)}](auto &&result) {
53                    if (not result) {
54                      return cb(result.error());
55                    }
56                    cb(result.value() - self->kLenMarkerSize);
57                  });
58   }
59 }  // namespace libp2p::basic

这段代码实现了一个名为 MessageReadWriterBigEndian 的 C++ 类,它的作用是实现基于大端字节序的消息读写。

类的构造函数接收一个 std::shared_ptr<ReadWriter> 类型的参数,用于将它存储在内部变量 conn_ 中。

类实现了两个公共方法:

  • read:该方法实现了读取消息的功能。它首先从 conn_ 对象读取 kLenMarkerSize 大小的数据,这是消息长度标识。之后,使用读取到的长度值,在从 conn_ 对象读取消息数据,最后调用回调函数 cb。
  • write:该方法实现了写入消息的功能。它首先判断待写入的消息是否为空,如果为空则直接返回。否则,创建一个向量 raw_buf,该向量用于存储消息长度标识和消息数据。在使用 conn_ 对象写入 raw_buf 向量中的数据,并调用回调函数 cb。

其中,kLenMarkerSize 常量定义了消息长度标识的大小。在代码中使用了 ntohl 和 common::putUint32BE 函数,用于将整数值转换为大端字节序的二进制表示。

 1 #include "libp2p/basic/message_read_writer_error.hpp"
 2 
 3 OUTCOME_CPP_DEFINE_CATEGORY(libp2p::basic, MessageReadWriterError, e) {
 4   using E = libp2p::basic::MessageReadWriterError;
 5   switch (e) {
 6     case E::SUCCESS:
 7       return "success";
 8     case E::BUFFER_IS_EMPTY:
 9       return "empty buffer provided";
10     case E::VARINT_EXPECTED:
11       return "varint expected at the beginning of Protobuf message";
12     case E::INTERNAL_ERROR:
13       return "internal error happened";
14   }
15   return "unknown error";
16 }

这段代码定义了一个错误类别:libp2p::basic::MessageReadWriterError。它使用了 Outcome 库来处理错误,它是一个 C++ 库,用于方便地返回、处理和报告操作的结果。

OUTCOME_CPP_DEFINE_CATEGORY的定义参见:cpp-libp2p/include/libp2p/outcome/outcome-register.hpp。

 1 #include <libp2p/basic/message_read_writer_uvarint.hpp>
 2 
 3 #include <vector>
 4 
 5 #include <boost/assert.hpp>
 6 #include <boost/optional.hpp>
 7 #include <libp2p/basic/message_read_writer_error.hpp>
 8 #include <libp2p/basic/varint_reader.hpp>
 9 #include <libp2p/multi/uvarint.hpp>
10 
11 namespace libp2p::basic {
12   MessageReadWriterUvarint::MessageReadWriterUvarint(
13       std::shared_ptr<ReadWriter> conn)
14       : conn_{std::move(conn)} {
15     BOOST_ASSERT(conn_ != nullptr);
16   }
17 
18   void MessageReadWriterUvarint::read(ReadCallbackFunc cb) {
19     VarintReader::readVarint(
20         conn_,
21         [self{shared_from_this()}, cb = std::move(cb)](
22             outcome::result<multi::UVarint> varint_res) mutable {
23           if (varint_res.has_error()) {
24             return cb(varint_res.error());
25           }
26 
27           auto msg_len = varint_res.value().toUInt64();
28           if (0 != msg_len) {
29             auto buffer = std::make_shared<std::vector<uint8_t>>(msg_len, 0);
30             self->conn_->read(
31                 *buffer, msg_len,
32                 [self, buffer, cb = std::move(cb)](auto &&res) mutable {
33                   if (!res) {
34                     return cb(res.error());
35                   }
36                   cb(std::move(buffer));
37                 });
38           } else {
39             cb(ResultType{});
40           }
41         });
42   }
43 
44   void MessageReadWriterUvarint::write(gsl::span<const uint8_t> buffer,
45                                        Writer::WriteCallbackFunc cb) {
46     auto varint_len = multi::UVarint{static_cast<uint64_t>(buffer.size())};
47 
48     auto msg_bytes = std::make_shared<std::vector<uint8_t>>();
49     msg_bytes->reserve(varint_len.size() + buffer.size());
50     msg_bytes->insert(msg_bytes->end(),
51                       std::make_move_iterator(varint_len.toVector().begin()),
52                       std::make_move_iterator(varint_len.toVector().end()));
53     msg_bytes->insert(msg_bytes->end(), buffer.begin(), buffer.end());
54 
55     conn_->write(*msg_bytes, msg_bytes->size(),
56                  [cb = std::move(cb), varint_size = varint_len.size(),
57                   msg_bytes](auto &&res) {
58                    if (!res) {
59                      return cb(res.error());
60                    }
61                    // hide a written varint from the user of the method
62                    cb(res.value() - varint_size);
63                  });
64   }
65 }  // namespace libp2p::basic

这段代码实现了一个名为MessageReadWriterUvarint的类。

这个类用于在读写消息时在消息前面添加一个无符号变量编码(Uvarint)的消息长度信息。通过在消息前面添加一个无符号变量编码的长度信息,可以避免在消息读写过程中出现消息边界问题。

主要功能有以下几点:

  • 实现了读取消息的read函数。在读取消息时,先读取Uvarint编码的消息长度,然后读取消息内容,最后返回读取的消息。

  • 实现了写入消息的write函数。在写入消息时,先写入Uvarint编码的消息长度,然后写入消息内容,最后返回写入的消息长度。

整个类使用了一个std::shared_ptr<ReadWriter>类型的conn_变量,表示一个读写连接。使用了boost库的assert.hpp头文件,确保conn_不为空,也使用了varint_reader.hpp头文件,实现了读取Uvarint编码的功能。

 1 #include <libp2p/basic/protobuf_message_read_writer.hpp>
 2 
 3 #include <boost/assert.hpp>
 4 #include <libp2p/basic/message_read_writer_uvarint.hpp>
 5 
 6 namespace libp2p::basic {
 7   ProtobufMessageReadWriter::ProtobufMessageReadWriter(
 8       std::shared_ptr<MessageReadWriter> read_writer)
 9       : read_writer_{std::move(read_writer)} {
10     BOOST_ASSERT(read_writer_);
11   }
12 
13   ProtobufMessageReadWriter::ProtobufMessageReadWriter(
14       std::shared_ptr<ReadWriter>
15           conn)  // NOLINT(performance-unnecessary-value-param)
16       : read_writer_{
17             std::make_shared<MessageReadWriterUvarint>(std::move(conn))} {
18     BOOST_ASSERT(read_writer_);
19   }
20 }  // namespace libp2p::basic

这段代码实现了一个名为ProtobufMessageReadWriter的类,该类是libp2p网络中用于读写protobuf消息的类。该类的目的是为了通过MessageReadWriterUvarint来读写protobuf消息。

该类的构造函数有两个重载:

  • 一个接受std::shared_ptr<MessageReadWriter>作为参数的构造函数,它直接赋值给了read_writer_。
  • 一个接受std::shared_ptr<ReadWriter>作为参数的构造函数,它将conn参数传递给了MessageReadWriterUvarint的构造函数,然后再将生成的MessageReadWriterUvarint的指针赋给了read_writer_。

两个构造函数都进行了read_writer_的非空断言。

  1 #include <cassert>
  2 #include <cstring>
  3 
  4 #include <libp2p/basic/read_buffer.hpp>
  5 
  6 namespace libp2p::basic {
  7 
  8   ReadBuffer::ReadBuffer(size_t alloc_granularity)
  9       : alloc_granularity_(alloc_granularity),
 10         total_size_(0),
 11         first_byte_offset_(0),
 12         capacity_remains_(0) {
 13     assert(alloc_granularity > 0);
 14   }
 15 
 16   void ReadBuffer::add(BytesRef bytes) {
 17     size_t sz = bytes.size();
 18     if (sz == 0) {
 19       return;
 20     }
 21 
 22     if (capacity_remains_ >= sz) {
 23       assert(!fragments_.empty());
 24 
 25       auto &vec = fragments_.back();
 26       vec.insert(vec.end(), bytes.begin(), bytes.end());
 27 
 28       capacity_remains_ -= sz;
 29     } else if (capacity_remains_ > 0) {
 30       auto &vec = fragments_.back();
 31 
 32       size_t new_capacity = vec.size() + sz + alloc_granularity_;
 33       vec.reserve(new_capacity);
 34       vec.insert(vec.end(), bytes.begin(), bytes.end());
 35 
 36       capacity_remains_ = alloc_granularity_;
 37     } else {
 38       fragments_.emplace_back();
 39       auto &vec = fragments_.back();
 40 
 41       size_t new_capacity = sz + alloc_granularity_;
 42 
 43       vec.reserve(new_capacity);
 44       vec.insert(vec.end(), bytes.begin(), bytes.end());
 45 
 46       capacity_remains_ = alloc_granularity_;
 47     }
 48 
 49     total_size_ += sz;
 50   }
 51 
 52   size_t ReadBuffer::consume(BytesRef &out) {
 53     if (empty()) {
 54       return 0;
 55     }
 56 
 57     size_t n_bytes = out.size();
 58     if (n_bytes >= total_size_) {
 59       return consumeAll(out);
 60     }
 61 
 62     auto remains = n_bytes;
 63     auto *p = out.data();
 64 
 65     while (remains > 0) {
 66       auto consumed = consumePart(p, remains);
 67 
 68       assert(consumed <= remains);
 69 
 70       remains -= consumed;
 71       p += consumed;  // NOLINT
 72     }
 73 
 74     total_size_ -= n_bytes;
 75     return n_bytes;
 76   }
 77 
 78   size_t ReadBuffer::addAndConsume(BytesRef in, BytesRef &out) {
 79     if (in.empty()) {
 80       return consume(out);
 81     }
 82 
 83     if (out.empty()) {
 84       add(in);
 85       return 0;
 86     }
 87 
 88     if (empty()) {
 89       if (in.size() <= out.size()) {
 90         memcpy(out.data(), in.data(), in.size());
 91         return in.size();
 92       }
 93       memcpy(out.data(), in.data(), out.size());
 94       in = in.subspan(out.size());
 95       add(in);
 96       return out.size();
 97     }
 98 
 99     auto out_size = static_cast<size_t>(out.size());
100     size_t consumed = 0;
101 
102     if (out_size <= total_size_) {
103       consumed = consume(out);
104       add(in);
105       return consumed;
106     }
107 
108     consumed = consumeAll(out);
109     auto out_remains = out.subspan(consumed);
110     return consumed + addAndConsume(in, out_remains);
111   }
112 
113   void ReadBuffer::clear() {
114     total_size_ = 0;
115     first_byte_offset_ = 0;
116     capacity_remains_ = 0;
117     std::deque<Fragment>{}.swap(fragments_);
118   }
119 
120   size_t ReadBuffer::consumeAll(BytesRef &out) {
121     assert(!fragments_.empty());
122     auto *p = out.data();
123     auto n = fragments_.front().size() - first_byte_offset_;
124     assert(n <= fragments_.front().size());
125 
126     memcpy(p, fragments_.front().data() + first_byte_offset_, n);  // NOLINT
127 
128     auto it = ++fragments_.begin();
129     while (it != fragments_.end()) {
130       p += n;  // NOLINT
131       n = it->size();
132       memcpy(p, it->data(), n);
133       ++it;
134     }
135 
136     auto ret = total_size_;
137 
138     total_size_ = 0;
139     first_byte_offset_ = 0;
140     capacity_remains_ = 0;
141 
142     // Find one fragment if not too large to avoid further allocations
143     bool keep_one_fragment = false;
144     bool is_first = true;
145     for (auto &f : fragments_) {
146       if (f.capacity() <= alloc_granularity_ * 2) {
147         f.clear();
148         capacity_remains_ = f.capacity();
149         if (!is_first) {
150           fragments_.front() = std::move(f);
151         }
152         keep_one_fragment = true;
153         break;
154       }
155       if (is_first) {
156         is_first = false;
157       }
158     }
159     fragments_.resize(keep_one_fragment ? 1 : 0);
160 
161     return ret;
162   }
163 
164   size_t ReadBuffer::consumePart(uint8_t *out, size_t n) {
165     if (fragments_.empty()) {
166       return 0;
167     }
168 
169     auto &f = fragments_.front();
170 
171     assert(f.size() > first_byte_offset_);
172 
173     auto fragment_size = f.size() - first_byte_offset_;
174     if (n > fragment_size) {
175       n = fragment_size;
176     }
177 
178     memcpy(out, f.data() + first_byte_offset_, n);  // NOLINT
179 
180     if (n < fragment_size) {
181       first_byte_offset_ += n;
182     } else {
183       first_byte_offset_ = 0;
184       fragments_.pop_front();
185     }
186 
187     return n;
188   }
189 
190   FixedBufferCollector::FixedBufferCollector(size_t expected_size,
191                                              size_t memory_threshold)
192       : memory_threshold_(memory_threshold), expected_size_(expected_size) {
193   }
194 
195   void FixedBufferCollector::expect(size_t size) {
196     expected_size_ = size;
197     buffer_.clear();
198     auto reserved = buffer_.capacity();
199     if ((reserved > memory_threshold_) && (expected_size_ < reserved * 3 / 4)) {
200       Buffer new_buffer;
201       buffer_.swap(new_buffer);
202     }
203   }
204 
205   boost::optional<FixedBufferCollector::CBytesRef>
206   FixedBufferCollector::add(CBytesRef &data) {
207     assert(expected_size_ >= buffer_.size());
208 
209     auto appending = static_cast<size_t>(data.size());
210     auto buffered = buffer_.size();
211 
212     if (buffered == 0) {
213       if (appending >= expected_size_) {
214         // dont buffer, just split
215         CBytesRef ret = data.subspan(0, expected_size_);
216         data = data.subspan(expected_size_);
217         expected_size_ = 0;
218         return ret;
219       }
220       buffer_.reserve(expected_size_);
221     }
222 
223     auto unread = expected_size_ - buffer_.size();
224     if (unread == 0) {
225       // didnt expect anything
226       return boost::none;
227     }
228 
229     bool filled = false;
230     if (appending >= unread) {
231       appending = unread;
232       filled = true;
233     }
234 
235     buffer_.insert(buffer_.end(), data.begin(), data.begin() + appending);
236     data = data.subspan(appending);
237 
238     if (filled) {
239       return CBytesRef(buffer_);
240     }
241 
242     return boost::none;
243   }
244 
245   boost::optional<FixedBufferCollector::BytesRef>
246   FixedBufferCollector::add(BytesRef &data) {
247     auto &span = (CBytesRef&)(data); //NOLINT
248     auto ret = add(span);
249     if (ret.has_value()) {
250       auto& v = ret.value();
251       return BytesRef((uint8_t*)v.data(), v.size()); // NOLINT
252     }
253     return boost::none;
254   }
255 
256   void FixedBufferCollector::reset() {
257     expected_size_ = 0;
258     Buffer new_buffer;
259     buffer_.swap(new_buffer);
260   }
261 
262 }  // namespace libp2p::basic

这段代码实现了一个名为 ReadBuffer 的类,它用于存储字节数组,并提供了一些用于读写存储的字节数组的操作。

主要功能如下:

  • 向 ReadBuffer 中加入一个字节数组。
  • 从 ReadBuffer 中读出一个字节数组。
  • 清除 ReadBuffer 中的所有内容。

代码通过内置的 deque 容器存储字节数组,可以动态增加其存储空间。

FixedBufferCollector是一个缓存工具类,用于缓存输出数据的内容,以避免内存不足导致的程序崩溃。它继承了 Collector 类,并重写了该类的一些方法。

 1 #include <libp2p/basic/scheduler.hpp>
 2 
 3 namespace libp2p::basic {
 4 
 5   Scheduler::Handle::Handle(Scheduler::Handle::Ticket ticket,
 6                             std::weak_ptr<Scheduler> scheduler)
 7       : ticket_(std::move(ticket)), scheduler_(std::move(scheduler)) {}
 8 
 9   Scheduler::Handle &Scheduler::Handle::operator=(
10       Scheduler::Handle &&r) noexcept {
11     cancel();
12     ticket_ = std::move(r.ticket_);
13     scheduler_ = std::move(r.scheduler_);
14     return *this;
15   }
16 
17   Scheduler::Handle::~Handle() {
18     cancel();
19   }
20 
21   void Scheduler::Handle::cancel() noexcept {
22     auto sch = scheduler_.lock();
23     if (sch) {
24       sch->cancel(ticket_);
25       scheduler_.reset();
26     }
27   }
28 
29   outcome::result<void> Scheduler::Handle::reschedule(
30       std::chrono::milliseconds delay_from_now) noexcept {
31     if (delay_from_now.count() <= 0) {
32       return Scheduler::Error::kInvalidArgument;
33     }
34     auto sch = scheduler_.lock();
35     if (sch) {
36       auto res = sch->reschedule(ticket_, delay_from_now);
37       if (!res) {
38         scheduler_.reset();
39         return res.error();
40       }
41       ticket_ = std::move(res.value());
42       return outcome::success();
43     }
44     return Scheduler::Error::kHandleDetached;
45   }
46 
47 }  // namespace libp2p::basic
48 
49 OUTCOME_CPP_DEFINE_CATEGORY(libp2p::basic, Scheduler::Error, e) {
50   using E = libp2p::basic::Scheduler::Error;
51   switch (e) {
52     case E::kInvalidArgument:
53       return "Scheduler: invalid argument";
54     case E::kHandleDetached:
55       return "Scheduler: handle detached, cannot reschedule";
56     case E::kItemNotFound:
57       return "Scheduler: item not found, cannot reschedule";
58   }
59   return "Scheduler: unknown error";
60 }

这个Scheduler类是一个定时调度器类,它可以被用来调度某些任务在未来某个时间点运行。类中包含了一个Handle类,它可以用来取消、重新调度已调度的任务。调度的任务需要指定延迟多长时间运行,并且Handle类也可以用来重新调度已调度的任务。

Scheduler 类的一个重要成员是 Handle 类,该类是一个辅助类,用于管理 Scheduler 中定时任务的句柄。在 Scheduler 类中,每一个定时任务都会被封装为一个 Handle 对象,该对象可以被用于取消定时任务、重新安排定时任务等。

Handle 类有一个 ticket_ 变量,用于存储当前定时任务的编号;scheduler_ 变量则用于存储一个对 Scheduler 类的 weak_ptr,该 weak_ptr 用于保持与 Scheduler 类的联系。

Handle 类实现了析构函数和 cancel() 函数,析构函数用于在 Handle 对象销毁时取消定时任务,cancel() 函数则可以在任意时刻手动取消定时任务。

Handle 类还实现了 reschedule() 函数,该函数用于重新安排定时任务,可以更改定时任务的执行时间。

最后,OUTCOME_CPP_DEFINE_CATEGORY 宏定义了一个错误码枚举,该枚举定义了 Scheduler 类可能出现的错误,并为每一种错误定义了一个错误消息。

 1 #include <libp2p/basic/varint_prefix_reader.hpp>
 2 
 3 namespace libp2p::basic {
 4 
 5   namespace {
 6     constexpr uint8_t kHighBitMask = 0x80;
 7 
 8     // just because 64 == 9*7 + 1
 9     constexpr uint8_t kMaxBytes = 10;
10 
11   }  // namespace
12 
13   void VarintPrefixReader::reset() {
14     value_ = 0;
15     state_ = kUnderflow;
16     got_bytes_ = 0;
17   }
18 
19   VarintPrefixReader::State VarintPrefixReader::consume(uint8_t byte) {
20     if (state_ == kUnderflow) {
21       bool next_byte_needed = (byte & kHighBitMask) != 0;
22       uint64_t tmp = byte & ~kHighBitMask;
23 
24       switch (++got_bytes_) {
25         case 1:
26           break;
27         case kMaxBytes:
28           if (tmp > 1 || next_byte_needed) {
29             state_ = kOverflow;
30             return state_;
31           }
32           [[fallthrough]];
33         default:
34           tmp <<= 7 * (got_bytes_ - 1);
35           break;
36       }
37 
38       value_ += tmp;
39       if (!next_byte_needed) {
40         state_ = kReady;
41       }
42 
43     } else if (state_ == kReady) {
44       return kError;
45     }
46 
47     return state_;
48   }
49 
50   VarintPrefixReader::State VarintPrefixReader::consume(
51       gsl::span<const uint8_t> &buffer) {
52     size_t consumed = 0;
53     State s(state_);
54     for (auto byte : buffer) {
55       ++consumed;
56       s = consume(byte);
57       if (s != kUnderflow) {
58         break;
59       }
60     }
61     if (consumed > 0 && (s == kReady || s == kUnderflow)) {
62       buffer = buffer.subspan(consumed);
63     }
64     return s;
65   }
66 
67 }  // namespace libp2p::basic

这段代码是用于读取varint整数前缀的类的实现。varint是一种编码方式,用于将整数压缩到最小的字节数以存储在网络传输或文件中。

在这种方式中,数字的每个存储字节使用7个低有效位来存储数字的一部分,剩下的1位高有效位表示是否还有更多的字节用于存储数字。如果高有效位被设置为1,则表示还有更多的字节用于存储数字;否则,如果高有效位被设置为0,则表示数字已经完整地存储在字节中。

换句话说,这种编码方式实现了变长前缀编码的核心思想,即通过在高有效位上存储一个标志位来指示数字是否结束,这样可以保证使用足够的字节数存储数字,但不会使用太多的字节。

VarintPrefixReader类实现了从输入数据中读取varint前缀整数的功能。该类的主要成员函数是consume,它从输入数据流中读取一个字节并使用它来更新读取的varint数字的当前值。可以多次调用consume函数,以读取多个字节。当整个varint前缀整数已被读取并且不再需要更多的字节时,consume函数将返回状态kReady。当发生任何错误(例如数字的字节数超过了最大字节数)时,consume函数将返回状态kError。

 1 #include "libp2p/basic/varint_reader.hpp"
 2 
 3 #include <vector>
 4 
 5 namespace {
 6   constexpr uint8_t kMaximumVarintLength = 9;  // taken from Go
 7 }
 8 
 9 OUTCOME_CPP_DEFINE_CATEGORY(libp2p::basic, VarintReader::Error, e) {
10   using E = libp2p::basic::VarintReader::Error;
11   switch (e) {
12     case E::NO_VARINT:
13       return "Input stream does not contain a varint value";
14   }
15   return "Unknown error";
16 }
17 
18 namespace libp2p::basic {
19   void VarintReader::readVarint(
20       std::shared_ptr<ReadWriter> conn,
21       std::function<void(outcome::result<multi::UVarint>)> cb) {
22     readVarint(std::move(conn), std::move(cb), 0,
23                std::make_shared<std::vector<uint8_t>>(kMaximumVarintLength, 0));
24   }
25 
26   void VarintReader::readVarint(
27       std::shared_ptr<ReadWriter> conn,
28       std::function<void(outcome::result<multi::UVarint>)> cb,
29       uint8_t current_length,
30       std::shared_ptr<std::vector<uint8_t>> varint_buf) {
31     if (current_length > kMaximumVarintLength) {
32       // TODO(107): Reentrancy here, defer callback
33       // to the moment we read more bytes than varint may contain and still no
34       // valid varint was parsed
35       return cb(Error::NO_VARINT);
36     }
37 
38     // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
39     conn->read(gsl::make_span(varint_buf->data() + current_length, 1), 1,
40                [c = std::move(conn), cb = std::move(cb), current_length,
41                 varint_buf](auto &&res) mutable {
42                  if (not res.has_value()) {
43                    return cb(res.error());
44                  }
45 
46                  auto varint_opt = multi::UVarint::create(
47                      gsl::make_span(varint_buf->data(), current_length + 1));
48                  if (varint_opt) {
49                    return cb(*varint_opt);
50                  }
51 
52                  readVarint(std::move(c), std::move(cb), ++current_length,
53                             std::move(varint_buf));
54                });
55   }
56 }  // namespace libp2p::basic

这段代码实现了读取变长整数(varint)的功能。

首先定义了一个 constexpr uint8_t kMaximumVarintLength = 9;,表示变长整数的最大长度为9。

接着定义了一个宏定义 OUTCOME_CPP_DEFINE_CATEGORY,它根据不同的错误码设置错误信息。

然后是命名空间 libp2p::basic,它定义了一个类 VarintReader。该类有两个成员函数:readVarintreadVarint(重载版本)。

首先是 readVarint,它接收两个参数:一个指向 ReadWriter 的智能指针和一个回调函数。它内部调用了另一个重载版本的 readVarint

接着是重载版本的 readVarint,它接收四个参数:一个指向 ReadWriter 的智能指针、一个回调函数、一个当前长度和一个指向存储变长整数字节的智能指针。该函数通过循环读取变长整数的字节,并使用给定的回调函数返回读取到的变长整数。

该代码实现了读取变长整数的功能,通过持续读取数据流直到读到一个有效的变长整数。

  1 #include <cassert>
  2 
  3 #include <libp2p/basic/write_queue.hpp>
  4 
  5 namespace libp2p::basic {
  6 
  7   bool WriteQueue::canEnqueue(size_t size) const {
  8     return (size + total_unsent_size_ <= size_limit_);
  9   }
 10 
 11   size_t WriteQueue::unsentBytes() const {
 12     return total_unsent_size_;
 13   }
 14 
 15   void WriteQueue::enqueue(DataRef data, bool some,
 16                            Writer::WriteCallbackFunc cb) {
 17     auto data_sz = static_cast<size_t>(data.size());
 18 
 19     assert(data_sz > 0);
 20     assert(canEnqueue(data_sz));
 21 
 22     total_unsent_size_ += data_sz;
 23     queue_.push_back({data, 0, 0, data_sz, some, std::move(cb)});
 24   }
 25 
 26   size_t WriteQueue::dequeue(size_t window_size, DataRef &out, bool &some) {
 27     if (total_unsent_size_ == 0 || window_size == 0
 28         || active_index_ >= queue_.size()) {
 29       out = DataRef{};
 30       return window_size;
 31     }
 32 
 33     assert(!queue_.empty());
 34 
 35     auto &item = queue_[active_index_];
 36 
 37     assert(item.unacknowledged + item.acknowledged + item.unsent
 38            == static_cast<size_t>(item.data.size()));
 39     assert(item.unsent > 0);
 40 
 41     out = item.data.subspan(item.acknowledged + item.unacknowledged);
 42     auto sz = static_cast<size_t>(out.size());
 43 
 44     assert(sz == item.unsent);
 45 
 46     if (sz > window_size) {
 47       sz = window_size;
 48       out = out.subspan(0, window_size);
 49     }
 50 
 51     item.unsent -= sz;
 52     item.unacknowledged += sz;
 53 
 54     if (item.some) {
 55       assert(item.acknowledged == 0);
 56       some = true;
 57       ++active_index_;
 58     } else {
 59       some = false;
 60       if (item.unsent == 0) {
 61         ++active_index_;
 62       }
 63     }
 64 
 65     assert(item.unacknowledged + item.acknowledged + item.unsent
 66            == static_cast<size_t>(item.data.size()));
 67 
 68     assert(total_unsent_size_ >= sz);
 69     total_unsent_size_ -= sz;
 70 
 71     return window_size - sz;
 72   }
 73 
 74   WriteQueue::AckResult WriteQueue::ackDataSent(size_t size) {
 75     AckResult result;
 76 
 77     if (queue_.empty() || size == 0) {
 78       // inconsistency, must not be called if nothing to ack
 79       result.data_consistent = false;
 80       return result;
 81     }
 82 
 83     auto &item = queue_.front();
 84 
 85     auto total_size = item.acknowledged + item.unacknowledged + item.unsent;
 86 
 87     assert(total_size == static_cast<size_t>(item.data.size()));
 88 
 89     if (size > item.unacknowledged) {
 90       // inconsistency, more data is acked than callback was put for
 91       result.data_consistent = false;
 92       return result;
 93     }
 94 
 95     bool completed = false;
 96 
 97     if (item.some) {
 98       completed = true;
 99       total_size = size;
100 
101     } else {
102       item.unacknowledged -= size;
103       item.acknowledged += size;
104 
105       completed = (item.acknowledged == total_size);
106     }
107 
108     if (!completed) {
109       assert(total_size > item.acknowledged);
110       // data partially acknowledged, early to call the callback
111       result.data_consistent = true;
112       return result;
113     }
114 
115     // acknowledging a portion of data was written
116     result.cb.swap(item.cb);
117     result.size_to_ack = total_size;
118     result.data_consistent = true;
119 
120     queue_.pop_front();
121     if (queue_.empty()) {
122       assert(total_unsent_size_ == 0);
123       active_index_ = 0;
124     } else if (active_index_ > 0) {
125       --active_index_;
126     }
127 
128     return result;
129   }
130 
131   std::vector<Writer::WriteCallbackFunc> WriteQueue::getAllCallbacks() {
132     std::vector<Writer::WriteCallbackFunc> v;
133     v.reserve(queue_.size());
134     for (auto &item : queue_) {
135       if (!item.cb) {
136         continue;
137       }
138       v.emplace_back();
139       item.cb.swap(v.back());
140     }
141     return v;
142   }
143 
144   void WriteQueue::clear() {
145     active_index_ = 0;
146     total_unsent_size_ = 0;
147     std::deque<Data> tmp_queue;
148     queue_.swap(tmp_queue);
149   }
150 
151 }  // namespace libp2p::basic

这段 C++ 代码定义了一个名为 WriteQueue 的类。该类实现了一个写队列,用于管理数据写入操作。

具体来说,该类实现了以下功能:

  • canEnqueue 函数:该函数用于判断是否可以向写队列中加入指定大小的数据。

  • unsentBytes 函数:该函数返回当前写队列中未发送的字节数。

  • enqueue 函数:该函数用于向写队列中加入数据以及与其相关的回调函数。

  • dequeue 函数:该函数用于从写队列中出队数据。

  • ackDataSent 函数:该函数用于通知写队列一部分数据已经被写入。

  • getAllCallbacks 函数:该函数用于获取写队列中所有的回调函数。

类的实现中使用了一些 STL 容器,如 vectordeque,以及一些 C++ 的语言特性,如智能指针和 std::move 等。

这段代码还使用了一个 assert 宏,该宏用于检测程序运行时是否满足特定条件,并在不满足条件时终止程序运行。

标签:std,return,buffer,cb,源码,auto,cpp,libp2p,size
From: https://www.cnblogs.com/leeeeein/p/17116485.html

相关文章