lab 地址 :lab3-doc
代码实现:lab3-code
完整目录:
0. ByteStream
1. StreamReassembler
2. TCPReceiver
3. TCPSender
4. TCPConnection
5. ARP
6. IP-Router
1. 目标
lab2 实现完 TCPReceiver
后,需要实现 TCPSender
,相对于前者,Sender 要实现的功能相对更复杂一些,需要支持:
- 根据 Sender 当前的状态对可发送窗口进行填充,发包
- Sender 需要根据对方通知的窗口大小和 ackno 来确认对方当前收到的字节流进度
- 需要支持超时重传机制,根据时间变化(RTO),定时重传那些还没有 ack 的报文
2. 实现
2.1 窗口填充
这里主要根据 lab 中提供的状态说明图进行实现:
理论上只有 3 种情况需要处理:
- 初始化,需要发送 SYN 包,即 CLOSED 状态,此刻需要填充 Syn 包
- ByteStream 中还有数据可写入发送,且对方窗口大小足够,此时正常按照 payload 大小限制填充数据包
- ByteStream 已经 eof,但是 Fin 包还未发送,此刻需要填充 Fin 包
为了配合重传机制的实现,当填充成功一个数据包的同时,也需要对其进行缓存备份,代码如下:
void TCPSender::fill_window() {
// CLOSED
if (next_seqno_absolute() == 0) {
// need to send the syn
TCPSegment seg;
seg.header().syn = true;
seg.header().seqno = _isn;
_segments_out.push(seg);
_next_seqno += seg.length_in_sequence_space();
_bytes_in_flight += seg.length_in_sequence_space();
// cache the segments
_cache_segment(next_seqno_absolute() , seg);
}
// SYN_ACKED
if (not stream_in().eof() && next_seqno_absolute() > bytes_in_flight()) {
// if windows size equal zero , the fill window method should act like the window size is one
size_t max_seg_size = TCPConfig::MAX_PAYLOAD_SIZE;
size_t remaining_wsz = _peer_windows_size ? _peer_windows_size : 1;
size_t flight_size = bytes_in_flight();
if (remaining_wsz < flight_size) {
return;
}
remaining_wsz -= flight_size;
string read_stream = _stream.read(min(remaining_wsz, max_seg_size));
while (!read_stream.empty()) {
TCPSegment seg;
seg.header().seqno = next_seqno();
seg.payload() = Buffer(std::move(read_stream));
remaining_wsz -= seg.length_in_sequence_space();
if (stream_in().eof() && next_seqno_absolute() < stream_in().bytes_written() + 2 && remaining_wsz >= 1) {
seg.header().fin = true;
remaining_wsz -= 1;
}
_next_seqno += seg.length_in_sequence_space();
_bytes_in_flight += seg.length_in_sequence_space();
_segments_out.push(seg);
// cache the seg
_cache_segment(next_seqno_absolute(), seg);
read_stream = _stream.read(min(remaining_wsz, max_seg_size));
}
}
// SYN_ACKED
if (stream_in().eof() && next_seqno_absolute() < stream_in().bytes_written() + 2) {
// stream has reached EOF, but FIN flag hasn't been sent yet. so send the fin
size_t remaining_wsz = _peer_windows_size ? _peer_windows_size : 1;
size_t flight_size = bytes_in_flight();
if (remaining_wsz <= flight_size) {
return;
}
TCPSegment seg;
seg.header().seqno = next_seqno();
seg.header().fin = true;
_next_seqno += seg.length_in_sequence_space();
_bytes_in_flight += seg.length_in_sequence_space();
_segments_out.push(seg);
// cache the segments
_cache_segment(next_seqno_absolute(), seg);
}
}
2.2 定时重传
每次发送数据包的会对其进行缓存,当超过 RTO 时间没有收到这个包的确认时,就需要执行重传,更新等待时间为 当前 RTO * 2,假如有多个数据包超时,每次只会重传 seqno 最小的数据包。
//! \param[in] ms_since_last_tick the number of milliseconds since the last call to this method
void TCPSender::tick(const size_t ms_since_last_tick) {
if (_segments_cache.empty()) {
return;
}
_time += ms_since_last_tick;
if (_time >= _initial_retransmission_timeout && _retx_times <= TCPConfig::MAX_RETX_ATTEMPTS) {
_segments_out.push(_segments_cache.begin()->second);
_time = 0;
if (_peer_windows_size != 0) {
_initial_retransmission_timeout *= 2;
_retx_times += 1;
}
}
}
需要注意的是,只有当对方通知的窗口大小不为 0 的时候,才更新等待时间为 RTO * 2。
2.3 Ack 确认
Ack 确认的逻辑主要有几点:
- 如果有有效确认(即缓存的未确认数据包有被成功确认的情况),则需要:
- 更新重传机制相关变量(重传时间,重传次数)
- 删除确认成功数据包的缓存
- 填充窗口,因为对端成功确认了数据,对端可用窗口变大了
- 更新对端窗口大小
//! \param ackno The remote receiver's ackno (acknowledgment number)
//! \param window_size The remote receiver's advertised window size
void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
int left_need_ack_bytes = next_seqno() - ackno;
if (left_need_ack_bytes < 0) {
return;
}
// pop the cache segments;
vector<uint64_t> remove_acknos;
for (auto & seg : _segments_cache) {
uint64_t abs_ackno = unwrap(ackno, _isn, seg.first);
if (seg.first <= abs_ackno) {
remove_acknos.push_back(seg.first);
}
}
for (auto & remove_ackno : remove_acknos) {
_segments_cache.erase(remove_ackno);
}
_peer_windows_size = window_size;
// valid ackno
if (!remove_acknos.empty()) {
_bytes_in_flight = left_need_ack_bytes;
_time = 0;
_retx_times = 0;
_initial_retransmission_timeout = _default_initial_retransmission_timeout;
fill_window();
}
}