lab 地址 :lab2-doc
代码实现:lab2-code
完整目录:
0. ByteStream
1. StreamReassembler
2. TCPReceiver
3. TCPSender
4. TCPConnection
5. ARP
6. IP-Router
1. 目标
lab0 实现了一个读写字节流 ByteStream
,lab1 实现了可靠有序不重复的字节流组装器 StreamReassembler
,接下来要实现更上层的封装,TCPSender
和 TCPReceiver
,分别负责 TCPConnection
的收发包功能。
首先要实现的是 TCPReceiver
。
除了写入传入流之外,TCPReceiver
负责告诉 TCPSender
两件事:
1.“第一个未组装”字节的索引,称为“确认号”(ackno),这是接收方从发送方需要的第一个字节。
2.“第一个未组装”索引和“第一个不可接受”索引之间的距离。这称为“窗口大小”。
ackno 和窗口大小一起描述了接收方的窗口:允许 TCPSender
发送的一系列索引。使用该窗口,接收方可以控制传入数据的流量,使发送方限制其发送量,直到接收方准备好接收更多数据。我们有时将 ackno
称为窗口的“左边缘”(TCPReceiver
感兴趣的最小索引),将 ackno
+ 窗口大小称为“右边缘”(刚好超出 TCPReceiver
所感兴趣的最大索引)。
2. 实现
正常情况下,TCPReceiver 会收到 3 种报文:
- SYN 报文,带着初始 ISN ,用来标记字节流的起始位置,通常 ISN 是随机值,防止被攻击
- FIN 报文,表明通信结束
- 普通的数据报文,只需要写入 payload 到 ByteStream 即可。
TCP 报文头部的seqno
标识了 payload 字节流在完整字节流中的起始位置,然而这个字段只有 32 位,也就是说最多只能支持 4gb 的字节流,这显然是远远不够的,为了解决上面的问题,引入了absolute sequence number
(简称 abs_seqno)的概念,abs_seqno
定义为uin64_t
类型,这样可以支持最高2^64 - 1
长度的字节流。两者的区别如下:
stream index
其实际上就是 ByteStream
的字节流索引,只是少了 FIN 和 SYN 各自在字节流中的 1 个字节占用,它同样也是 uint64_t
类型。
abs_seqno
的起始位置永远是 0,这意味着它对于 seqno
会有 isn 长度的偏移,每次写入时都不断对其递增,由于其长度更长,即便 seqno
溢出了,abs_seqno
也能正常记录正确的长度。
2.1 seqno 和 abs_seqno 转换
由于 seqno 不是真正的 字节流 起始位置,因此接受报文时,需要对其转换成 abs_seqno
,才可以方便 TCPReceiver
中计算窗口大小。
转换的接口如下:
//! Transform a 64-bit absolute sequence number (zero-indexed) into a 32-bit relative sequence number
//! \param n the absolute sequence number
//! \param isn the initial sequence number
//! \returns the relative sequence number
WrappingInt32 wrap(uint64_t n, WrappingInt32 isn);
//! Transform a 32-bit relative sequence number into a 64-bit absolute sequence number (zero-indexed)
//! \param n The relative sequence number
//! \param isn The initial sequence number
//! \param checkpoint A recent absolute sequence number
//! \returns the absolute sequence number that wraps to `n` and is closest to `checkpoint`
//! \note Each of the two streams of the TCP connection has its own ISN. One stream
//! runs from the local TCPSender to the remote TCPReceiver and has one ISN,
//! and the other stream runs from the remote TCPSender to the local TCPReceiver and
//! has a different ISN.
uint64_t unwrap(WrappingInt32 n, WrappingInt32 isn, uint64_t checkpoint);
abs_seqno
转换成 seqno
比较简单,只需要把 abs_seqno
加上 isn
初始偏移量,然后取 abs_seqno
的低 32 位值即可。
seqno
转换成 abs_seqno 则稍微麻烦些,因为 seqno=17 可以表示多种 abs_seqno,如 2^32 + 17, or 2^33 + 17, 2^34 + 17 等等,这里引入一个 checkpoint 的概念,在 TCPReceiver 中 checkpoint 是当前写入的总字节数。
假定要将 n 从 seqno
转换成 abs_seqno
,这里先将 chekpoint
从 abs_seqno
转换成 seqno
,然后计算 n(seqno
版本) 和 checkpoint
(seqno
版本) 的偏移量,最后加到 checkpoint
(abs_seqno
版本)上面即可得出 n(abs_seqno
版本),参考下图:
实现如下:
WrappingInt32 wrap(uint64_t n, WrappingInt32 isn) {
uint64_t mask = (1ul << 32) - 1;
return WrappingInt32{static_cast<uint32_t>((n + isn.raw_value()) & mask)};
}
uint64_t unwrap(WrappingInt32 n, WrappingInt32 isn, uint64_t checkpoint) {
int32_t offset = n - wrap(checkpoint, isn);
int64_t result = checkpoint + offset;
result = result < 0 ? result + (1ul << 32) : result;
return result;
}
2.2 接收报文
处理报文比较简单,主要关注前面说的 SYN 和 FIN 报文即可,及时更新最新的 abs_seqno
。
void TCPReceiver::segment_received(const TCPSegment &seg) {
// 1. process syn flag
if (seg.header().syn) {
if (!_received_syn) {
_received_syn = true;
_isn = seg.header().seqno;
}
}
// 2. process fin flag
bool eof = false;
if (seg.header().fin) {
eof = true;
}
// 3. push payload
if (ackno().has_value() && !_reassembler.stream_out().input_ended()) {
// relative seqno to stream index
uint64_t stream_index = unwrap(seg.header().seqno, _isn, _checkpoint);
// stream index should in the window
uint64_t abs_ackno = unwrap(ackno().value(), _isn, _checkpoint);
if (stream_index + seg.length_in_sequence_space() <= abs_ackno || stream_index >= abs_ackno + window_size()) {
return;
}
if (!seg.header().syn) {
stream_index -= 1; // ignore syn flag;
}
_reassembler.push_substring(seg.payload().copy(), stream_index, eof);
_checkpoint = _reassembler.stream_out().bytes_written();
}
}
2.3 窗口大小和 ackno
窗口大小用于通知对端当前可以接收的字节流大小,ackno 用于通知对端当前接收的字节流进度。两者实现都比较简单,如下:
optional<WrappingInt32> TCPReceiver::ackno() const {
// next_write + 1 ,because syn flag will not push in stream
size_t next_write = _reassembler.stream_out().bytes_written() + 1;
next_write = _reassembler.stream_out().input_ended() ? next_write + 1 : next_write;
return !_received_syn ? optional<WrappingInt32>() : wrap(next_write, _isn);
}
size_t TCPReceiver::window_size() const {
return _reassembler.stream_out().remaining_capacity();
}