Checkpoint 3 Writeup
该lab主要实现TCP发送方,细节比较多,具有一定难度,编写时需要从整体上理清设计思路,然后再实现具体的函数。
Timer
由于要实现TCP中的超时重传功能,所以需要在发送方维护一个定时器,但不需要自己使用计时函数,因为文档里说明了所有对时间的了解都是通过tick函数得到的,每次能从函数参数中得到已经流逝的时间。
通过阅读文档中发送方的行为,判断计时器应该完成的任务有:
开始计时、停止计时、判断是否正在计时、判断是否超时、RTO翻倍、RTO重置
因此本地应该维护四个成员初始RTO、当前RTO、当前时间、是否正在计时
下面是计时器的实现
class Timer
{
uint64_t initial_RTO_ms_;
uint64_t RTO_ms;
uint64_t time_ms { 0 };
bool is_timing { false };
public:
Timer( uint64_t ms )
: initial_RTO_ms_( ms )
, RTO_ms( ms )
{}
void start()
{
time_ms = 0;
is_timing = true;
}
void stop() { is_timing = false; }
bool timing() { return is_timing; }
bool expired()
{
return time_ms >= RTO_ms;
}
void tick( uint64_t ms_since_last_tick )
{
if ( is_timing ) {
time_ms += ms_since_last_tick;
}
}
void set_RTO() { RTO_ms = initial_RTO_ms_; }
void double_RTO() { RTO_ms = 2 * RTO_ms; }
};
TCPSender
包含成员变量
Wrap32 isn_;
bool SYN_sent { false };
bool FIN_sent { false };
uint64_t initial_RTO_ms_;
/*
? why 0 wrong , 1 right
first SYN may be back off retranmission, so not 0
win_size = 0 we must send a 1 sized message
*/
uint64_t win_size { 1 };
uint64_t ackno { 0 };
uint64_t sentno { 0 };
uint64_t retransmissions_count { 0 };
uint64_t outbytes_count { 0 };
std::queue<TCPSenderMessage> out_standing {};
std::queue<TCPSenderMessage> sending_queue {};
Timer timer;
其中Wrap32 isn_是一个32位的无符号整数,用于记录初始序列号,SYN_sent和FIN_sent用于记录是否发送过SYN和FIN,initial_RTO_ms_用于记录初始RTO,win_size用于记录窗口大小,ackno用于记录期望收到的ack(即已经确认的数据下一个序列号),sentno用于记录要发送的下一个序列号,retransmissions_count用于记录重传次数,outbytes_count用于记录已发送但未收到ack的字节数,out_standing用于记录已发送但未被确认的数据(队列的特性保证了out_standing队首报文一定是未确认的最早报文),sending_queue用于存放待发送的的数据,timer为定时器。
有一个细节是窗口大小初始值不应该为0,因为当窗口大小为0时,发送方无法判断这是到底是由于有容量但是SYN丢失还是接收方确实没容量,导致超时不会增加重传计数并加倍。
-
optional
TCPSender::maybe_send()
从发送队列sending_queue中取出一个TCPSenderMessage发送,如果计时器未运行,重启定时器 -
void TCPSender::push( Reader& outbound_stream )
从outbound_stream中循环读取数据,将数据封装成TCPSenderMessage放入sending_queue中- 决定是否发送SYN,这里无论是SYN还是FIN报文都可以携带数据
- 读取尽可能多但不超过窗口大小的payload,注意SYN和FIN报文也算入窗口大小,算入序列
- 决定是否发送FIN
- 直接丢弃空报文(在Reader无数据可读且未关闭的情况下,上述步骤可能会产生空报文)
- 增加sentno,并把报文放入发送队列sending_queue和未确认队列out_standing
-
void TCPSender::receive( const TCPReceiverMessage& msg )
接收接收方的ACK报文- 更新win_size
- 丢弃ackno为空的报文和包含不可能存在的ackno的报文
- 更新本地ackno,由接收方的性质可知TCPReceiverMessage的ackno之前的数据已全部接收,故只要大于本地ackno即可更新,并从未完成队列中清除已确认的报文,同时重置计时器RTO,若未完成队列非空,则还要启动计时器。
-
TCPSenderMessage TCPSender::send_empty_message() const
发送空报文,可以用于检测程序是否正确 -
void TCPSender::tick( const size_t ms_since_last_tick )
更新计时器计数值,若计时器超时,则重传未确认的数据(将out_standing队首报文放入sending_queue),重启计时器。如果窗口大小非零还要重传次数加一,RTO加倍。
/* TCPSender constructor (uses a random ISN if none given) */
TCPSender::TCPSender( uint64_t initial_RTO_ms, optional<Wrap32> fixed_isn )
: isn_( fixed_isn.value_or( Wrap32 { random_device()() } ) )
, initial_RTO_ms_( initial_RTO_ms )
, timer( initial_RTO_ms )
{}
uint64_t TCPSender::sequence_numbers_in_flight() const
{
// Your code here.
return outbytes_count;
}
uint64_t TCPSender::consecutive_retransmissions() const
{
// Your code here.
return retransmissions_count;
}
optional<TCPSenderMessage> TCPSender::maybe_send()
{
// Your code here.
if ( sending_queue.empty() ) {
return std::nullopt;
}
if ( !timer.timing() ) {
timer.start();
}
TCPSenderMessage tosend = sending_queue.front();
sending_queue.pop();
return tosend;
}
void TCPSender::push( Reader& outbound_stream )
{
// Your code here.
// using temp var to ensure no back off rto in tick
uint64_t cur_win_size = ( win_size > 0 ? win_size : 1 );
while ( outbytes_count < cur_win_size ) {
/*
why inside the loop? wrong outside(cannt fill window, in fact like a random size)
message contains Buffer, Buffer contans a shared_ptr<string> ,we must push each ptr(different) to
string(different), rather than put the same ptr to only one thing(may not same as changed)
*/
TCPSenderMessage tosend;
if ( !SYN_sent ) {
SYN_sent = true;
tosend.SYN = true;
outbytes_count++;
}
tosend.seqno = Wrap32::wrap( sentno, isn_ );
read( outbound_stream, min( cur_win_size - outbytes_count, TCPConfig::MAX_PAYLOAD_SIZE ), tosend.payload );
outbytes_count += tosend.payload.size();
if ( !FIN_sent && outbound_stream.is_finished() && outbytes_count < cur_win_size ) {
FIN_sent = true;
tosend.FIN = true;
outbytes_count++;
}
if ( !tosend.sequence_length() ) {
break;
}
sentno += tosend.sequence_length();
sending_queue.push( tosend );
out_standing.push( tosend );
// !
if ( FIN_sent || outbound_stream.bytes_buffered() == 0 ) {
break;
}
}
}
TCPSenderMessage TCPSender::send_empty_message() const
{
// Your code here.
return TCPSenderMessage { Wrap32::wrap( sentno, isn_ ), false, {}, false };
}
void TCPSender::receive( const TCPReceiverMessage& msg )
{
win_size = msg.window_size;
if ( msg.ackno.has_value() ) {
auto ackno_recv = msg.ackno.value().unwrap( isn_, sentno );
if ( ackno_recv > sentno ) {
return;
}
if ( ackno_recv > ackno ) {
ackno = ackno_recv;
while ( !out_standing.empty() ) {
TCPSenderMessage& first_msg = out_standing.front();
// only having been acked, the message can be removed from out_standing
if ( first_msg.seqno.unwrap( isn_, sentno ) + first_msg.sequence_length() <= ackno ) {
outbytes_count -= first_msg.sequence_length();
out_standing.pop();
} else {
break;
}
}
timer.set_RTO();
retransmissions_count = 0;
if ( out_standing.empty() )
timer.stop();
else
timer.start();
}
}
}
void TCPSender::tick( const size_t ms_since_last_tick )
{
// Your code here.
timer.tick( ms_since_last_tick );
if ( timer.expired() ) {
sending_queue.push( out_standing.front() );
if ( win_size != 0 ) {
retransmissions_count++;
timer.double_RTO();
}
timer.start();
}
}
标签:uint64,ackno,CS144,lab3,ms,TCPSender,RTO,size
From: https://www.cnblogs.com/wangerblog/p/17757913.html