明确函数作用
-
fill_window() 只关心 _segments_out 和 _outstanding_segments 这两个数据结构,它要做的就是构造报文并将其添加到这两个数据结构中,添加到 _segments_out 中就表示发送了,添加到 _outstanding_segments 中就表示这个报文段是未确认的
-
ack_received() 只关心 _outstanding_segments,它要做的是根据传入的 ackno 从 _outstanding_segments 中丢掉已经确认的报文段
-
tick() 只关心 _outstanding_segments,它要做的是累计时间,若超时的话,重传 _outstanding_segments 中最早的报文段
-
bytes_in_flight() 指明了 _outstanding_segments 占据的序列号长度
TCPSender 设计
class TCPSender {
private:
...
// 指明当前 TCPSender 所处状态,方便程序编写,_st 初始化为 CLOSED 状态
enum class SenderState { CLOSED = 0, SYN_SENT, SYN_ACKED, FIN_SENT, FIN_ACKED, ERROR };
SenderState _st{SenderState::CLOSED};
// 使用队列来存储未解决的报文段
std::queue<TCPSegment> _outstanding_segments{};
// _next_ackno 表示 TCPSender 希望被 ack 的第一个绝对序列号,也就是说 _next_seqno - _next_ackno 即为 bytes_in_flight()
uint64_t _next_ackno{0};
// 对端窗口大小,初始化为 1
size_t _window_size{1};
// 重传计时器
Timer _timer;
// 私有成员函数,发送一个新报文段,fill_window() 使用
void send_new_segment(const TCPSegment &seg);
public:
...
};
Timer 设计
具体逻辑
只有一个计时器,该计时器记录的时间是最早加入 _outstanding_segments 中的那个报文段的时间,连续重传的次数也是重传该报文段的次数,故 Timer 类设计如下
class Timer {
private:
bool _state; // 指示计时器当前状态,true 是启动,false 是关闭
unsigned int _initial_retransmission_timeout; // 初始 rto
unsigned int _cur_retransmission_timeout; // 当前 rto
unsigned int _time; // 累计时间
unsigned int _consecutive_retransmissions; // 连续重传次数
public:
Timer(unsigned int retx_timeout)
: _state(false)
, _initial_retransmission_timeout(retx_timeout)
, _cur_retransmission_timeout(retx_timeout)
, _time(0)
, _consecutive_retransmissions(0) {}
void run() {
_state = true;
_time = 0;
}
void stop() { _state = false; }
bool state() const { return _state; }
bool expired(const size_t ms_since_last_tick) {
_time += ms_since_last_tick;
return _time >= _cur_retransmission_timeout;
}
void double_rto() { _cur_retransmission_timeout *= 2; }
void reset_rto() { _cur_retransmission_timeout = _initial_retransmission_timeout; }
unsigned int consecutive_retx() const { return _consecutive_retransmissions; }
void inc_consecutive_retx() { _consecutive_retransmissions++; }
void reset_consecutive_retx() { _consecutive_retransmissions = 0; }
};
Timer 如何使用
在 fill_window() 中
在 ack_received() 中
在 tick() 中
fill_window() 设计
发送一个新报文段
void TCPSender::send_new_segment(const TCPSegment &seg) {
// 对于不占据序列长度的报文段不予发送
if (seg.length_in_sequence_space() > 0) {
segments_out().push(seg);
_outstanding_segments.push(seg);
// 发送了新报文段,就要更新 _next_seqno
_next_seqno += seg.length_in_sequence_space();
// 如果计时器未打开,就打开它
if (_timer.state() == false) {
_timer.run();
}
}
}
具体逻辑
对于 fill_window() 函数
- 在 CLOSED 状态下只发送不带有 payload 的 SYN 报文,然后到 SYN_SENT 状态
- 在 SYN_ACKED 状态下发送带有 payload 的报文,如果 stream_in().eof() 且对端窗口仍有空间就发送 FIN 报文,然后到 FIN_SENT 状态
- 其他状态不需要操作
void TCPSender::fill_window() {
switch (_st) {
case SenderState::CLOSED: {
TCPSegment seg;
seg.header().seqno = next_seqno();
seg.header().syn = true;
send_new_segment(seg);
_st = SenderState::SYN_SENT;
break;
}
case SenderState::SYN_ACKED: {
// 如果对端窗口大小为 0 的话看作是 1
size_t sequence_space = _window_size == 0 ? 1 : _window_size;
// 只要有空间就尝试从字节流中读取数据,打包成新报文发送
while (bytes_in_flight() < sequence_space) {
TCPSegment seg;
seg.header().seqno = next_seqno();
size_t len = min(TCPConfig::MAX_PAYLOAD_SIZE, sequence_space - bytes_in_flight());
seg.payload() = Buffer(stream_in().read(len));
if (stream_in().eof() && bytes_in_flight() + seg.length_in_sequence_space() < sequence_space) {
seg.header().fin = true;
_st = SenderState::FIN_SENT;
}
send_new_segment(seg);
// 如果已经发送了 FIN 报文或构造的报文所占据的序列长度为 0 就要跳出
if (_st == SenderState::FIN_SENT || seg.length_in_sequence_space() == 0) {
break;
}
}
break;
}
case SenderState::FIN_ACKED: {
break;
}
case SenderState::ERROR: {
break;
}
default: {
break;
}
}
}
ack_received() 设计
对于 ack_received() 函数
- 在 SYN_SENT 状态下,如果 SYN 报文段被确认,就到 SYN_ACKED 状态
- 在 FIN_SENT 状态下,如果 FIN 报文段被确认,就到 FIN_ACKED 状态
void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
uint64_t next_ackno = unwrap(ackno, _isn, _next_ackno); // receiver 已经确认的绝对序列号 + 1
_window_size = window_size;
// 如果 next_ackno > next_seqno_absolute(),显然出错,因为 Sender 还没发
// 如果 next_ackno <= _next_ackno,确认不了任何 _outstanding_segment
if (next_ackno > next_seqno_absolute() || next_ackno <= _next_ackno) return;
while (!_outstanding_segments.empty()) {
TCPSegment tmp = _outstanding_segments.front();
// 要确保最早的报文段整个能被确认,才 pop
if (_next_ackno + tmp.length_in_sequence_space() <= next_ackno) {
_outstanding_segments.pop();
_timer.reset_rto();
_timer.reset_consecutive_retx();
_next_ackno += tmp.length_in_sequence_space();
if (tmp.header().syn) {
_st = SenderState::SYN_ACKED;
} else if (tmp.header().fin) {
_st = SenderState::FIN_ACKED;
}
} else {
break;
}
}
if (_outstanding_segments.empty()) {
_timer.stop();
} else {
_timer.run();
}
}
tick() 设计
对于 tick() 函数
累计时间,如果超时就重传
void TCPSender::tick(const size_t ms_since_last_tick) {
if (_timer.state() && _timer.expired(ms_since_last_tick)) {
TCPSegment tmp = _outstanding_segments.front();
segments_out().push(tmp);
if (_window_size > 0) {
_timer.inc_consecutive_retx();
_timer.double_rto();
}
_timer.run();
}
}
其他函数
uint64_t TCPSender::bytes_in_flight() const { return _next_seqno - _next_ackno; }
unsigned int TCPSender::consecutive_retransmissions() const { return _timer.consecutive_retx(); }
void TCPSender::send_empty_segment() {
TCPSegment seg;
seg.header().seqno = next_seqno();
segments_out().push(seg);
}
标签:seg,seqno,报文,next,lab3,window,segments
From: https://www.cnblogs.com/cong0221/p/17240250.html