-
message_read_writer_bigendian.cpp
1 #include <libp2p/basic/message_read_writer_bigendian.hpp> 2 3 #include <vector> 4 5 #include <arpa/inet.h> 6 #include <boost/assert.hpp> 7 #include <libp2p/basic/message_read_writer_error.hpp> 8 #include <libp2p/common/byteutil.hpp> 9 10 namespace libp2p::basic { 11 MessageReadWriterBigEndian::MessageReadWriterBigEndian( 12 std::shared_ptr<ReadWriter> conn) 13 : conn_{std::move(conn)} { 14 BOOST_ASSERT(conn_ != nullptr); 15 } 16 17 void MessageReadWriterBigEndian::read(ReadCallbackFunc cb) { 18 auto buffer = std::make_shared<std::vector<uint8_t>>(); 19 buffer->resize(kLenMarkerSize); 20 conn_->read( 21 *buffer, kLenMarkerSize, 22 [self{shared_from_this()}, buffer, cb{std::move(cb)}](auto &&result) { 23 if (not result) { 24 return cb(result.error()); 25 } 26 uint32_t msg_len = ntohl( // NOLINT 27 common::convert<uint32_t>(buffer->data())); 28 buffer->resize(msg_len); 29 std::fill(buffer->begin(), buffer->end(), 0u); 30 self->conn_->read(*buffer, msg_len, 31 [self, buffer, cb](auto &&result) { 32 if (not result) { 33 return cb(result.error()); 34 } 35 cb(buffer); 36 }); 37 }); 38 } 39 40 void MessageReadWriterBigEndian::write(gsl::span<const uint8_t> buffer, 41 Writer::WriteCallbackFunc cb) { 42 if (buffer.empty()) { 43 // TODO(107): Reentrancy 44 return cb(MessageReadWriterError::BUFFER_IS_EMPTY); 45 } 46 47 std::vector<uint8_t> raw_buf; 48 raw_buf.reserve(kLenMarkerSize + buffer.size()); 49 common::putUint32BE(raw_buf, buffer.size()); 50 raw_buf.insert(raw_buf.end(), buffer.begin(), buffer.end()); 51 conn_->write(raw_buf, raw_buf.size(), 52 [self{shared_from_this()}, cb{std::move(cb)}](auto &&result) { 53 if (not result) { 54 return cb(result.error()); 55 } 56 cb(result.value() - self->kLenMarkerSize); 57 }); 58 } 59 } // namespace libp2p::basic
这段代码实现了一个名为 MessageReadWriterBigEndian 的 C++ 类,它的作用是实现基于大端字节序的消息读写。
类的构造函数接收一个 std::shared_ptr<ReadWriter> 类型的参数,用于将它存储在内部变量 conn_ 中。
类实现了两个公共方法:
- read:该方法实现了读取消息的功能。它首先从 conn_ 对象读取 kLenMarkerSize 大小的数据,这是消息长度标识。之后,使用读取到的长度值,在从 conn_ 对象读取消息数据,最后调用回调函数 cb。
- write:该方法实现了写入消息的功能。它首先判断待写入的消息是否为空,如果为空则直接返回。否则,创建一个向量 raw_buf,该向量用于存储消息长度标识和消息数据。在使用 conn_ 对象写入 raw_buf 向量中的数据,并调用回调函数 cb。
其中,kLenMarkerSize 常量定义了消息长度标识的大小。在代码中使用了 ntohl 和 common::putUint32BE 函数,用于将整数值转换为大端字节序的二进制表示。
-
message_read_writer_error.cpp
1 #include "libp2p/basic/message_read_writer_error.hpp" 2 3 OUTCOME_CPP_DEFINE_CATEGORY(libp2p::basic, MessageReadWriterError, e) { 4 using E = libp2p::basic::MessageReadWriterError; 5 switch (e) { 6 case E::SUCCESS: 7 return "success"; 8 case E::BUFFER_IS_EMPTY: 9 return "empty buffer provided"; 10 case E::VARINT_EXPECTED: 11 return "varint expected at the beginning of Protobuf message"; 12 case E::INTERNAL_ERROR: 13 return "internal error happened"; 14 } 15 return "unknown error"; 16 }
这段代码定义了一个错误类别:libp2p::basic::MessageReadWriterError。它使用了 Outcome 库来处理错误,它是一个 C++ 库,用于方便地返回、处理和报告操作的结果。
OUTCOME_CPP_DEFINE_CATEGORY的定义参见:cpp-libp2p/include/libp2p/outcome/outcome-register.hpp。
-
message_read_writer_uvarint.cpp
1 #include <libp2p/basic/message_read_writer_uvarint.hpp> 2 3 #include <vector> 4 5 #include <boost/assert.hpp> 6 #include <boost/optional.hpp> 7 #include <libp2p/basic/message_read_writer_error.hpp> 8 #include <libp2p/basic/varint_reader.hpp> 9 #include <libp2p/multi/uvarint.hpp> 10 11 namespace libp2p::basic { 12 MessageReadWriterUvarint::MessageReadWriterUvarint( 13 std::shared_ptr<ReadWriter> conn) 14 : conn_{std::move(conn)} { 15 BOOST_ASSERT(conn_ != nullptr); 16 } 17 18 void MessageReadWriterUvarint::read(ReadCallbackFunc cb) { 19 VarintReader::readVarint( 20 conn_, 21 [self{shared_from_this()}, cb = std::move(cb)]( 22 outcome::result<multi::UVarint> varint_res) mutable { 23 if (varint_res.has_error()) { 24 return cb(varint_res.error()); 25 } 26 27 auto msg_len = varint_res.value().toUInt64(); 28 if (0 != msg_len) { 29 auto buffer = std::make_shared<std::vector<uint8_t>>(msg_len, 0); 30 self->conn_->read( 31 *buffer, msg_len, 32 [self, buffer, cb = std::move(cb)](auto &&res) mutable { 33 if (!res) { 34 return cb(res.error()); 35 } 36 cb(std::move(buffer)); 37 }); 38 } else { 39 cb(ResultType{}); 40 } 41 }); 42 } 43 44 void MessageReadWriterUvarint::write(gsl::span<const uint8_t> buffer, 45 Writer::WriteCallbackFunc cb) { 46 auto varint_len = multi::UVarint{static_cast<uint64_t>(buffer.size())}; 47 48 auto msg_bytes = std::make_shared<std::vector<uint8_t>>(); 49 msg_bytes->reserve(varint_len.size() + buffer.size()); 50 msg_bytes->insert(msg_bytes->end(), 51 std::make_move_iterator(varint_len.toVector().begin()), 52 std::make_move_iterator(varint_len.toVector().end())); 53 msg_bytes->insert(msg_bytes->end(), buffer.begin(), buffer.end()); 54 55 conn_->write(*msg_bytes, msg_bytes->size(), 56 [cb = std::move(cb), varint_size = varint_len.size(), 57 msg_bytes](auto &&res) { 58 if (!res) { 59 return cb(res.error()); 60 } 61 // hide a written varint from the user of the method 62 cb(res.value() - varint_size); 63 }); 64 } 65 } // namespace libp2p::basic
这段代码实现了一个名为MessageReadWriterUvarint的类。
这个类用于在读写消息时在消息前面添加一个无符号变量编码(Uvarint)的消息长度信息。通过在消息前面添加一个无符号变量编码的长度信息,可以避免在消息读写过程中出现消息边界问题。
主要功能有以下几点:
-
实现了读取消息的read函数。在读取消息时,先读取Uvarint编码的消息长度,然后读取消息内容,最后返回读取的消息。
-
实现了写入消息的write函数。在写入消息时,先写入Uvarint编码的消息长度,然后写入消息内容,最后返回写入的消息长度。
整个类使用了一个std::shared_ptr<ReadWriter>类型的conn_变量,表示一个读写连接。使用了boost库的assert.hpp头文件,确保conn_不为空,也使用了varint_reader.hpp头文件,实现了读取Uvarint编码的功能。
-
protobuf_message_read_writer.cpp
1 #include <libp2p/basic/protobuf_message_read_writer.hpp> 2 3 #include <boost/assert.hpp> 4 #include <libp2p/basic/message_read_writer_uvarint.hpp> 5 6 namespace libp2p::basic { 7 ProtobufMessageReadWriter::ProtobufMessageReadWriter( 8 std::shared_ptr<MessageReadWriter> read_writer) 9 : read_writer_{std::move(read_writer)} { 10 BOOST_ASSERT(read_writer_); 11 } 12 13 ProtobufMessageReadWriter::ProtobufMessageReadWriter( 14 std::shared_ptr<ReadWriter> 15 conn) // NOLINT(performance-unnecessary-value-param) 16 : read_writer_{ 17 std::make_shared<MessageReadWriterUvarint>(std::move(conn))} { 18 BOOST_ASSERT(read_writer_); 19 } 20 } // namespace libp2p::basic
这段代码实现了一个名为ProtobufMessageReadWriter的类,该类是libp2p网络中用于读写protobuf消息的类。该类的目的是为了通过MessageReadWriterUvarint来读写protobuf消息。
该类的构造函数有两个重载:
- 一个接受std::shared_ptr<MessageReadWriter>作为参数的构造函数,它直接赋值给了read_writer_。
- 一个接受std::shared_ptr<ReadWriter>作为参数的构造函数,它将conn参数传递给了MessageReadWriterUvarint的构造函数,然后再将生成的MessageReadWriterUvarint的指针赋给了read_writer_。
两个构造函数都进行了read_writer_的非空断言。
-
read_buffer.cpp
1 #include <cassert> 2 #include <cstring> 3 4 #include <libp2p/basic/read_buffer.hpp> 5 6 namespace libp2p::basic { 7 8 ReadBuffer::ReadBuffer(size_t alloc_granularity) 9 : alloc_granularity_(alloc_granularity), 10 total_size_(0), 11 first_byte_offset_(0), 12 capacity_remains_(0) { 13 assert(alloc_granularity > 0); 14 } 15 16 void ReadBuffer::add(BytesRef bytes) { 17 size_t sz = bytes.size(); 18 if (sz == 0) { 19 return; 20 } 21 22 if (capacity_remains_ >= sz) { 23 assert(!fragments_.empty()); 24 25 auto &vec = fragments_.back(); 26 vec.insert(vec.end(), bytes.begin(), bytes.end()); 27 28 capacity_remains_ -= sz; 29 } else if (capacity_remains_ > 0) { 30 auto &vec = fragments_.back(); 31 32 size_t new_capacity = vec.size() + sz + alloc_granularity_; 33 vec.reserve(new_capacity); 34 vec.insert(vec.end(), bytes.begin(), bytes.end()); 35 36 capacity_remains_ = alloc_granularity_; 37 } else { 38 fragments_.emplace_back(); 39 auto &vec = fragments_.back(); 40 41 size_t new_capacity = sz + alloc_granularity_; 42 43 vec.reserve(new_capacity); 44 vec.insert(vec.end(), bytes.begin(), bytes.end()); 45 46 capacity_remains_ = alloc_granularity_; 47 } 48 49 total_size_ += sz; 50 } 51 52 size_t ReadBuffer::consume(BytesRef &out) { 53 if (empty()) { 54 return 0; 55 } 56 57 size_t n_bytes = out.size(); 58 if (n_bytes >= total_size_) { 59 return consumeAll(out); 60 } 61 62 auto remains = n_bytes; 63 auto *p = out.data(); 64 65 while (remains > 0) { 66 auto consumed = consumePart(p, remains); 67 68 assert(consumed <= remains); 69 70 remains -= consumed; 71 p += consumed; // NOLINT 72 } 73 74 total_size_ -= n_bytes; 75 return n_bytes; 76 } 77 78 size_t ReadBuffer::addAndConsume(BytesRef in, BytesRef &out) { 79 if (in.empty()) { 80 return consume(out); 81 } 82 83 if (out.empty()) { 84 add(in); 85 return 0; 86 } 87 88 if (empty()) { 89 if (in.size() <= out.size()) { 90 memcpy(out.data(), in.data(), in.size()); 91 return in.size(); 92 } 93 memcpy(out.data(), in.data(), out.size()); 94 in = in.subspan(out.size()); 95 add(in); 96 return out.size(); 97 } 98 99 auto out_size = static_cast<size_t>(out.size()); 100 size_t consumed = 0; 101 102 if (out_size <= total_size_) { 103 consumed = consume(out); 104 add(in); 105 return consumed; 106 } 107 108 consumed = consumeAll(out); 109 auto out_remains = out.subspan(consumed); 110 return consumed + addAndConsume(in, out_remains); 111 } 112 113 void ReadBuffer::clear() { 114 total_size_ = 0; 115 first_byte_offset_ = 0; 116 capacity_remains_ = 0; 117 std::deque<Fragment>{}.swap(fragments_); 118 } 119 120 size_t ReadBuffer::consumeAll(BytesRef &out) { 121 assert(!fragments_.empty()); 122 auto *p = out.data(); 123 auto n = fragments_.front().size() - first_byte_offset_; 124 assert(n <= fragments_.front().size()); 125 126 memcpy(p, fragments_.front().data() + first_byte_offset_, n); // NOLINT 127 128 auto it = ++fragments_.begin(); 129 while (it != fragments_.end()) { 130 p += n; // NOLINT 131 n = it->size(); 132 memcpy(p, it->data(), n); 133 ++it; 134 } 135 136 auto ret = total_size_; 137 138 total_size_ = 0; 139 first_byte_offset_ = 0; 140 capacity_remains_ = 0; 141 142 // Find one fragment if not too large to avoid further allocations 143 bool keep_one_fragment = false; 144 bool is_first = true; 145 for (auto &f : fragments_) { 146 if (f.capacity() <= alloc_granularity_ * 2) { 147 f.clear(); 148 capacity_remains_ = f.capacity(); 149 if (!is_first) { 150 fragments_.front() = std::move(f); 151 } 152 keep_one_fragment = true; 153 break; 154 } 155 if (is_first) { 156 is_first = false; 157 } 158 } 159 fragments_.resize(keep_one_fragment ? 1 : 0); 160 161 return ret; 162 } 163 164 size_t ReadBuffer::consumePart(uint8_t *out, size_t n) { 165 if (fragments_.empty()) { 166 return 0; 167 } 168 169 auto &f = fragments_.front(); 170 171 assert(f.size() > first_byte_offset_); 172 173 auto fragment_size = f.size() - first_byte_offset_; 174 if (n > fragment_size) { 175 n = fragment_size; 176 } 177 178 memcpy(out, f.data() + first_byte_offset_, n); // NOLINT 179 180 if (n < fragment_size) { 181 first_byte_offset_ += n; 182 } else { 183 first_byte_offset_ = 0; 184 fragments_.pop_front(); 185 } 186 187 return n; 188 } 189 190 FixedBufferCollector::FixedBufferCollector(size_t expected_size, 191 size_t memory_threshold) 192 : memory_threshold_(memory_threshold), expected_size_(expected_size) { 193 } 194 195 void FixedBufferCollector::expect(size_t size) { 196 expected_size_ = size; 197 buffer_.clear(); 198 auto reserved = buffer_.capacity(); 199 if ((reserved > memory_threshold_) && (expected_size_ < reserved * 3 / 4)) { 200 Buffer new_buffer; 201 buffer_.swap(new_buffer); 202 } 203 } 204 205 boost::optional<FixedBufferCollector::CBytesRef> 206 FixedBufferCollector::add(CBytesRef &data) { 207 assert(expected_size_ >= buffer_.size()); 208 209 auto appending = static_cast<size_t>(data.size()); 210 auto buffered = buffer_.size(); 211 212 if (buffered == 0) { 213 if (appending >= expected_size_) { 214 // dont buffer, just split 215 CBytesRef ret = data.subspan(0, expected_size_); 216 data = data.subspan(expected_size_); 217 expected_size_ = 0; 218 return ret; 219 } 220 buffer_.reserve(expected_size_); 221 } 222 223 auto unread = expected_size_ - buffer_.size(); 224 if (unread == 0) { 225 // didnt expect anything 226 return boost::none; 227 } 228 229 bool filled = false; 230 if (appending >= unread) { 231 appending = unread; 232 filled = true; 233 } 234 235 buffer_.insert(buffer_.end(), data.begin(), data.begin() + appending); 236 data = data.subspan(appending); 237 238 if (filled) { 239 return CBytesRef(buffer_); 240 } 241 242 return boost::none; 243 } 244 245 boost::optional<FixedBufferCollector::BytesRef> 246 FixedBufferCollector::add(BytesRef &data) { 247 auto &span = (CBytesRef&)(data); //NOLINT 248 auto ret = add(span); 249 if (ret.has_value()) { 250 auto& v = ret.value(); 251 return BytesRef((uint8_t*)v.data(), v.size()); // NOLINT 252 } 253 return boost::none; 254 } 255 256 void FixedBufferCollector::reset() { 257 expected_size_ = 0; 258 Buffer new_buffer; 259 buffer_.swap(new_buffer); 260 } 261 262 } // namespace libp2p::basic
这段代码实现了一个名为 ReadBuffer 的类,它用于存储字节数组,并提供了一些用于读写存储的字节数组的操作。
主要功能如下:
- 向 ReadBuffer 中加入一个字节数组。
- 从 ReadBuffer 中读出一个字节数组。
- 清除 ReadBuffer 中的所有内容。
代码通过内置的 deque 容器存储字节数组,可以动态增加其存储空间。
FixedBufferCollector是一个缓存工具类,用于缓存输出数据的内容,以避免内存不足导致的程序崩溃。它继承了 Collector
类,并重写了该类的一些方法。
-
scheduler.cpp
1 #include <libp2p/basic/scheduler.hpp> 2 3 namespace libp2p::basic { 4 5 Scheduler::Handle::Handle(Scheduler::Handle::Ticket ticket, 6 std::weak_ptr<Scheduler> scheduler) 7 : ticket_(std::move(ticket)), scheduler_(std::move(scheduler)) {} 8 9 Scheduler::Handle &Scheduler::Handle::operator=( 10 Scheduler::Handle &&r) noexcept { 11 cancel(); 12 ticket_ = std::move(r.ticket_); 13 scheduler_ = std::move(r.scheduler_); 14 return *this; 15 } 16 17 Scheduler::Handle::~Handle() { 18 cancel(); 19 } 20 21 void Scheduler::Handle::cancel() noexcept { 22 auto sch = scheduler_.lock(); 23 if (sch) { 24 sch->cancel(ticket_); 25 scheduler_.reset(); 26 } 27 } 28 29 outcome::result<void> Scheduler::Handle::reschedule( 30 std::chrono::milliseconds delay_from_now) noexcept { 31 if (delay_from_now.count() <= 0) { 32 return Scheduler::Error::kInvalidArgument; 33 } 34 auto sch = scheduler_.lock(); 35 if (sch) { 36 auto res = sch->reschedule(ticket_, delay_from_now); 37 if (!res) { 38 scheduler_.reset(); 39 return res.error(); 40 } 41 ticket_ = std::move(res.value()); 42 return outcome::success(); 43 } 44 return Scheduler::Error::kHandleDetached; 45 } 46 47 } // namespace libp2p::basic 48 49 OUTCOME_CPP_DEFINE_CATEGORY(libp2p::basic, Scheduler::Error, e) { 50 using E = libp2p::basic::Scheduler::Error; 51 switch (e) { 52 case E::kInvalidArgument: 53 return "Scheduler: invalid argument"; 54 case E::kHandleDetached: 55 return "Scheduler: handle detached, cannot reschedule"; 56 case E::kItemNotFound: 57 return "Scheduler: item not found, cannot reschedule"; 58 } 59 return "Scheduler: unknown error"; 60 }
这个Scheduler类是一个定时调度器类,它可以被用来调度某些任务在未来某个时间点运行。类中包含了一个Handle类,它可以用来取消、重新调度已调度的任务。调度的任务需要指定延迟多长时间运行,并且Handle类也可以用来重新调度已调度的任务。
Scheduler 类的一个重要成员是 Handle 类,该类是一个辅助类,用于管理 Scheduler 中定时任务的句柄。在 Scheduler 类中,每一个定时任务都会被封装为一个 Handle 对象,该对象可以被用于取消定时任务、重新安排定时任务等。
Handle 类有一个 ticket_ 变量,用于存储当前定时任务的编号;scheduler_ 变量则用于存储一个对 Scheduler 类的 weak_ptr,该 weak_ptr 用于保持与 Scheduler 类的联系。
Handle 类实现了析构函数和 cancel() 函数,析构函数用于在 Handle 对象销毁时取消定时任务,cancel() 函数则可以在任意时刻手动取消定时任务。
Handle 类还实现了 reschedule() 函数,该函数用于重新安排定时任务,可以更改定时任务的执行时间。
最后,OUTCOME_CPP_DEFINE_CATEGORY 宏定义了一个错误码枚举,该枚举定义了 Scheduler 类可能出现的错误,并为每一种错误定义了一个错误消息。
-
varint_prefix_reader.cpp
1 #include <libp2p/basic/varint_prefix_reader.hpp> 2 3 namespace libp2p::basic { 4 5 namespace { 6 constexpr uint8_t kHighBitMask = 0x80; 7 8 // just because 64 == 9*7 + 1 9 constexpr uint8_t kMaxBytes = 10; 10 11 } // namespace 12 13 void VarintPrefixReader::reset() { 14 value_ = 0; 15 state_ = kUnderflow; 16 got_bytes_ = 0; 17 } 18 19 VarintPrefixReader::State VarintPrefixReader::consume(uint8_t byte) { 20 if (state_ == kUnderflow) { 21 bool next_byte_needed = (byte & kHighBitMask) != 0; 22 uint64_t tmp = byte & ~kHighBitMask; 23 24 switch (++got_bytes_) { 25 case 1: 26 break; 27 case kMaxBytes: 28 if (tmp > 1 || next_byte_needed) { 29 state_ = kOverflow; 30 return state_; 31 } 32 [[fallthrough]]; 33 default: 34 tmp <<= 7 * (got_bytes_ - 1); 35 break; 36 } 37 38 value_ += tmp; 39 if (!next_byte_needed) { 40 state_ = kReady; 41 } 42 43 } else if (state_ == kReady) { 44 return kError; 45 } 46 47 return state_; 48 } 49 50 VarintPrefixReader::State VarintPrefixReader::consume( 51 gsl::span<const uint8_t> &buffer) { 52 size_t consumed = 0; 53 State s(state_); 54 for (auto byte : buffer) { 55 ++consumed; 56 s = consume(byte); 57 if (s != kUnderflow) { 58 break; 59 } 60 } 61 if (consumed > 0 && (s == kReady || s == kUnderflow)) { 62 buffer = buffer.subspan(consumed); 63 } 64 return s; 65 } 66 67 } // namespace libp2p::basic
这段代码是用于读取varint整数前缀的类的实现。varint是一种编码方式,用于将整数压缩到最小的字节数以存储在网络传输或文件中。
在这种方式中,数字的每个存储字节使用7个低有效位来存储数字的一部分,剩下的1位高有效位表示是否还有更多的字节用于存储数字。如果高有效位被设置为1,则表示还有更多的字节用于存储数字;否则,如果高有效位被设置为0,则表示数字已经完整地存储在字节中。
换句话说,这种编码方式实现了变长前缀编码的核心思想,即通过在高有效位上存储一个标志位来指示数字是否结束,这样可以保证使用足够的字节数存储数字,但不会使用太多的字节。
VarintPrefixReader类实现了从输入数据中读取varint前缀整数的功能。该类的主要成员函数是consume
,它从输入数据流中读取一个字节并使用它来更新读取的varint数字的当前值。可以多次调用consume
函数,以读取多个字节。当整个varint前缀整数已被读取并且不再需要更多的字节时,consume
函数将返回状态kReady。当发生任何错误(例如数字的字节数超过了最大字节数)时,consume
函数将返回状态kError。
-
varint_reader.cpp
1 #include "libp2p/basic/varint_reader.hpp" 2 3 #include <vector> 4 5 namespace { 6 constexpr uint8_t kMaximumVarintLength = 9; // taken from Go 7 } 8 9 OUTCOME_CPP_DEFINE_CATEGORY(libp2p::basic, VarintReader::Error, e) { 10 using E = libp2p::basic::VarintReader::Error; 11 switch (e) { 12 case E::NO_VARINT: 13 return "Input stream does not contain a varint value"; 14 } 15 return "Unknown error"; 16 } 17 18 namespace libp2p::basic { 19 void VarintReader::readVarint( 20 std::shared_ptr<ReadWriter> conn, 21 std::function<void(outcome::result<multi::UVarint>)> cb) { 22 readVarint(std::move(conn), std::move(cb), 0, 23 std::make_shared<std::vector<uint8_t>>(kMaximumVarintLength, 0)); 24 } 25 26 void VarintReader::readVarint( 27 std::shared_ptr<ReadWriter> conn, 28 std::function<void(outcome::result<multi::UVarint>)> cb, 29 uint8_t current_length, 30 std::shared_ptr<std::vector<uint8_t>> varint_buf) { 31 if (current_length > kMaximumVarintLength) { 32 // TODO(107): Reentrancy here, defer callback 33 // to the moment we read more bytes than varint may contain and still no 34 // valid varint was parsed 35 return cb(Error::NO_VARINT); 36 } 37 38 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) 39 conn->read(gsl::make_span(varint_buf->data() + current_length, 1), 1, 40 [c = std::move(conn), cb = std::move(cb), current_length, 41 varint_buf](auto &&res) mutable { 42 if (not res.has_value()) { 43 return cb(res.error()); 44 } 45 46 auto varint_opt = multi::UVarint::create( 47 gsl::make_span(varint_buf->data(), current_length + 1)); 48 if (varint_opt) { 49 return cb(*varint_opt); 50 } 51 52 readVarint(std::move(c), std::move(cb), ++current_length, 53 std::move(varint_buf)); 54 }); 55 } 56 } // namespace libp2p::basic
这段代码实现了读取变长整数(varint)的功能。
首先定义了一个 constexpr uint8_t kMaximumVarintLength = 9;
,表示变长整数的最大长度为9。
接着定义了一个宏定义 OUTCOME_CPP_DEFINE_CATEGORY
,它根据不同的错误码设置错误信息。
然后是命名空间 libp2p::basic
,它定义了一个类 VarintReader
。该类有两个成员函数:readVarint
和 readVarint
(重载版本)。
首先是 readVarint
,它接收两个参数:一个指向 ReadWriter
的智能指针和一个回调函数。它内部调用了另一个重载版本的 readVarint
。
接着是重载版本的 readVarint
,它接收四个参数:一个指向 ReadWriter
的智能指针、一个回调函数、一个当前长度和一个指向存储变长整数字节的智能指针。该函数通过循环读取变长整数的字节,并使用给定的回调函数返回读取到的变长整数。
该代码实现了读取变长整数的功能,通过持续读取数据流直到读到一个有效的变长整数。
-
write_queue.cpp
1 #include <cassert> 2 3 #include <libp2p/basic/write_queue.hpp> 4 5 namespace libp2p::basic { 6 7 bool WriteQueue::canEnqueue(size_t size) const { 8 return (size + total_unsent_size_ <= size_limit_); 9 } 10 11 size_t WriteQueue::unsentBytes() const { 12 return total_unsent_size_; 13 } 14 15 void WriteQueue::enqueue(DataRef data, bool some, 16 Writer::WriteCallbackFunc cb) { 17 auto data_sz = static_cast<size_t>(data.size()); 18 19 assert(data_sz > 0); 20 assert(canEnqueue(data_sz)); 21 22 total_unsent_size_ += data_sz; 23 queue_.push_back({data, 0, 0, data_sz, some, std::move(cb)}); 24 } 25 26 size_t WriteQueue::dequeue(size_t window_size, DataRef &out, bool &some) { 27 if (total_unsent_size_ == 0 || window_size == 0 28 || active_index_ >= queue_.size()) { 29 out = DataRef{}; 30 return window_size; 31 } 32 33 assert(!queue_.empty()); 34 35 auto &item = queue_[active_index_]; 36 37 assert(item.unacknowledged + item.acknowledged + item.unsent 38 == static_cast<size_t>(item.data.size())); 39 assert(item.unsent > 0); 40 41 out = item.data.subspan(item.acknowledged + item.unacknowledged); 42 auto sz = static_cast<size_t>(out.size()); 43 44 assert(sz == item.unsent); 45 46 if (sz > window_size) { 47 sz = window_size; 48 out = out.subspan(0, window_size); 49 } 50 51 item.unsent -= sz; 52 item.unacknowledged += sz; 53 54 if (item.some) { 55 assert(item.acknowledged == 0); 56 some = true; 57 ++active_index_; 58 } else { 59 some = false; 60 if (item.unsent == 0) { 61 ++active_index_; 62 } 63 } 64 65 assert(item.unacknowledged + item.acknowledged + item.unsent 66 == static_cast<size_t>(item.data.size())); 67 68 assert(total_unsent_size_ >= sz); 69 total_unsent_size_ -= sz; 70 71 return window_size - sz; 72 } 73 74 WriteQueue::AckResult WriteQueue::ackDataSent(size_t size) { 75 AckResult result; 76 77 if (queue_.empty() || size == 0) { 78 // inconsistency, must not be called if nothing to ack 79 result.data_consistent = false; 80 return result; 81 } 82 83 auto &item = queue_.front(); 84 85 auto total_size = item.acknowledged + item.unacknowledged + item.unsent; 86 87 assert(total_size == static_cast<size_t>(item.data.size())); 88 89 if (size > item.unacknowledged) { 90 // inconsistency, more data is acked than callback was put for 91 result.data_consistent = false; 92 return result; 93 } 94 95 bool completed = false; 96 97 if (item.some) { 98 completed = true; 99 total_size = size; 100 101 } else { 102 item.unacknowledged -= size; 103 item.acknowledged += size; 104 105 completed = (item.acknowledged == total_size); 106 } 107 108 if (!completed) { 109 assert(total_size > item.acknowledged); 110 // data partially acknowledged, early to call the callback 111 result.data_consistent = true; 112 return result; 113 } 114 115 // acknowledging a portion of data was written 116 result.cb.swap(item.cb); 117 result.size_to_ack = total_size; 118 result.data_consistent = true; 119 120 queue_.pop_front(); 121 if (queue_.empty()) { 122 assert(total_unsent_size_ == 0); 123 active_index_ = 0; 124 } else if (active_index_ > 0) { 125 --active_index_; 126 } 127 128 return result; 129 } 130 131 std::vector<Writer::WriteCallbackFunc> WriteQueue::getAllCallbacks() { 132 std::vector<Writer::WriteCallbackFunc> v; 133 v.reserve(queue_.size()); 134 for (auto &item : queue_) { 135 if (!item.cb) { 136 continue; 137 } 138 v.emplace_back(); 139 item.cb.swap(v.back()); 140 } 141 return v; 142 } 143 144 void WriteQueue::clear() { 145 active_index_ = 0; 146 total_unsent_size_ = 0; 147 std::deque<Data> tmp_queue; 148 queue_.swap(tmp_queue); 149 } 150 151 } // namespace libp2p::basic
这段 C++ 代码定义了一个名为 WriteQueue
的类。该类实现了一个写队列,用于管理数据写入操作。
具体来说,该类实现了以下功能:
-
canEnqueue
函数:该函数用于判断是否可以向写队列中加入指定大小的数据。 -
unsentBytes
函数:该函数返回当前写队列中未发送的字节数。 -
enqueue
函数:该函数用于向写队列中加入数据以及与其相关的回调函数。 -
dequeue
函数:该函数用于从写队列中出队数据。 -
ackDataSent
函数:该函数用于通知写队列一部分数据已经被写入。 -
getAllCallbacks
函数:该函数用于获取写队列中所有的回调函数。
类的实现中使用了一些 STL 容器,如 vector
和 deque
,以及一些 C++ 的语言特性,如智能指针和 std::move
等。
这段代码还使用了一个 assert
宏,该宏用于检测程序运行时是否满足特定条件,并在不满足条件时终止程序运行。