CS144 Lab 3

by on under CS144
4 minute read

上一个实验:Lab 2

这篇文章会讲解 CS144 Lab 3 的思路和代码。

Lab 3

这个 Lab 的目标是写一个 TCP sender,它主要支持三个功能:

  • _stream 里读取数据并组成 TCPSegment,并把它发送给上一个 Lab 实现的 TCPReceiver
  • 支持重发超时的 TCPSegment
  • 支持发送空的 TCPSegment,这个功能会在下个 Lab 里用到。

git merge origin/lab3-startercode 合并 Lab 3 的代码后,我们就可以开始啦。

这次的 Lab 难度较大,因为 sender 的行为并不完全是课程中学到的 sender 行为,比如没有拥塞控制系统,也有一些机制是自定义的(比如超时重传机制,比如三次握手和四次挥手),所以需要仔细读文档。

既然是 Sender,咱们可以从核心的发送功能出发,思考我们需要什么变量,粗略思考后,至少这些东西是必须的:

  • bool _syn_sent:用来记录是否已经发送过 SYN;
  • bool _syn_ack_received: 用来记录是否收到了来自接收方的 SYN/ACK;
  • bool _fin_sent:用来记录是否已经发送 FIN;
  • size_t _receiver_window_size: 我们需要通过接受 ackno 来确认 receiver 的窗口大小,从而控制发送的报文段数量;
    • 这里的报文段数量一词是不准确的,更准确的说法是 payload + syn/fin,但是这么写很麻烦,所以将就着看吧;
    • SYN, FIN 是会占据序列号的,所以在计算窗口大小的时候,也要考虑他们。
  • size_t _receiver_free_size, size_t _bytes_in_flight:仅仅有上面的变量是不够的,有一些报文段还在传输过程中,这些也会影响我们接下来可以发送的报文段数量;

有了这些我们可以写出 fill_windowsend_segment,后者只被前者调用,具体逻辑我会在注释里解释。

void TCPSender::fill_window() {
    if (!_syn_sent) {
        // Have not sent SYN yet, just send SYN
        _syn_sent = true;
        TCPSegment seg;
        seg.header().syn = true;
        send_segment(seg);
        return;
    }
    if ((!_syn_ack_received) || (!_stream.buffer_size() && !_stream.eof()) || _fin_sent) {
        /*
        There are 3 cases we should send nothing.
        1. Have not received SYN/AK yet(SYN is in the flight);
        2. No more data from the input stream;
        3. Have already sent FIN.
        */
        return;
    }

    if (_receiver_window_size) {
        // There are space to receive data
        while (_receiver_free_space) {
            /*
            Maxium possible space that we estimate. The difference between 
            _receiver_window_size and _receiver_free_space is that the latter 
            one contained the bytes in flight.
            */
            TCPSegment seg;
            size_t payload_size = min({_stream.buffer_size(),
                                       static_cast<size_t>(_receiver_free_space),
                                       static_cast<size_t>(TCPConfig::MAX_PAYLOAD_SIZE)});
            seg.payload() = _stream.read(payload_size);
            if (_stream.eof() && static_cast<size_t>(_receiver_free_space) > payload_size) {
                /* 
                FIN will take 1 bit of _receiver_free_space, 
                so we need to make sure there are enough space (extra 1 bit).
                */
                seg.header().fin = true;
                _fin_sent = true;
            }
            send_segment(seg);
            if (_stream.buffer_empty())
                break;
        }
    } else if (_receiver_free_space == 0) {
        /*
        The zero-window-detect-segment should only be sent once (retransmition excute by tick function). Before it is sent, _receiver_free_space is zero. 
        Then it will be -1. This part is asked by the documentation
        */
        TCPSegment seg;
        if (_stream.eof()) {
            seg.header().fin = true;
            _fin_sent = true;
            send_segment(seg);
        } else if (!_stream.buffer_empty()) {
            // send 1 byte package
            seg.payload() = _stream.read(1);
            send_segment(seg);
        }
    }
}

void TCPSender::send_segment(TCPSegment &seg) {
    /*
    Just send the target TCPSegment, we use _segments_outstanding to track
    outstanding segments.
    */
    seg.header().seqno = wrap(_next_seqno, _isn);
    _next_seqno += seg.length_in_sequence_space();
    _bytes_in_flight += seg.length_in_sequence_space();
    if (_syn_sent)
        _receiver_free_space -= seg.length_in_sequence_space();
    _segments_out.push(seg);
    _segments_outstanding.push(seg);
    if (!_timer_running) {
        // Start the timer when there are outstanding bytes
        _timer_running = true;
        _time_elapsed = 0;
    }
}

发送的核心部分写完后,我们需要思考接受 ACK 和超时重传机制。接受 ACK 的很清晰,就是第一次接受 ACK 应该是三次握手过程中的,接受 SYN/ACK,所以我们需要将 _syn_ack_received 置为 true,之后每次收到 ACK 我们都应该将 _segments_outstanding 里追踪的已经得到确认的报文段释放掉。最后再调用 fill_window

接下来是超时重传机制,虽然原文档里建议自己实现一个 timer 类,但是我想不出来,于是思路和代码参考了别人的实现,主要是这篇文章。只有当被调用 tick 方法的时候,我们才能更新计时器的数值(_time_elapsed)。注意这里不包含关闭停止计时器,计时器关闭取决于 _segments_outstanding,它为空,咱们就关闭,它不为空,咱们就打开。

具体到tick函数里,我们希望通过记录之前的时间,以及新传入的时间,我们找到超时的那 一个 报文段,并且把它重传(放进 _segments_out 里),并且根据文档,当接收方还有窗口大小时,我们需要翻倍 RTO,翻倍 RTO 是目的是一种 Exponential Backoff 机制,我个人的理解是估计 RTT,参考这里

void TCPSender::tick(const size_t ms_since_last_tick) {
    if (!_timer_running)
        return;
    _time_elapsed += ms_since_last_tick;
    if (_time_elapsed >= _rto) {
        _segments_out.push(_segments_outstanding.front());
        if (_receiver_window_size || (!_syn_ack_received)) {
            /*
            Notice that here is a corner case, if the SYN hasn't been acked
            (in this case the _receiver_window_size may be 0),
            we need to double the _rto as well.
            */
            ++_consecutive_retransmissions;
            _rto <<= 1;
        }
        _time_elapsed = 0;
    }
}

void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
    uint64_t abs_ackno = unwrap(ackno, _isn, _next_seqno);
    if (!ack_valid(abs_ackno))
        return;
    _syn_ack_received = true;
    _receiver_window_size = window_size;
    _receiver_free_space = window_size;
    while (!_segments_outstanding.empty()) {
        TCPSegment seg = _segments_outstanding.front();
        size_t curr_seg_abs_seqno = unwrap(seg.header().seqno, _isn, _next_seqno);
        uint64_t curr_seg_payload = seg.length_in_sequence_space();
        if (curr_seg_abs_seqno + curr_seg_payload <= abs_ackno) {
            _bytes_in_flight -= curr_seg_payload;
            _segments_outstanding.pop();
            _time_elapsed = 0;
            _rto = _initial_retransmission_timeout;
            _consecutive_retransmissions = 0;
        } else {
            break;
        }
    }
    if (!_segments_outstanding.empty()) {
        // We need to consider the segment is received partly.
        _receiver_free_space =
            static_cast<uint16_t>(static_cast<uint64_t>(window_size) - _bytes_in_flight + abs_ackno -
                                  unwrap(_segments_outstanding.front().header().seqno, _isn, _next_seqno));
    }

    if (!_bytes_in_flight) {
        _timer_running = false;
    }

    fill_window();
}

这些就是主体部分了。还有一个send_empty_segment参考如下:

void TCPSender::send_empty_segment() {
    TCPSegment seg;
    seg.header().seqno = wrap(_next_seqno, _isn);
    _segments_out.push(seg);
}

其余的代码部分:

unsigned int TCPSender::consecutive_retransmissions() const { 
    return _consecutive_retransmissions; 
}

bool TCPSender::ack_valid(uint64_t abs_ackno) {
    return abs_ackno <= _next_seqno; // fixed from 12-17
}

TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn)
    : _isn(fixed_isn.value_or(WrappingInt32{random_device()()}))
    , _initial_retransmission_timeout{retx_timeout}
    , _stream(capacity)
    , _rto(_initial_retransmission_timeout) {}

uint64_t TCPSender::bytes_in_flight() const { return _bytes_in_flight; }

bool _syn_sent = false;
bool _syn_ack_received = false;
bool _fin_sent = false;
uint64_t _bytes_in_flight = 0;
uint16_t _receiver_window_size = 0;
uint16_t _receiver_free_space = 0;
uint16_t _consecutive_retransmissions = 0;
unsigned int _rto = 0;
unsigned int _time_elapsed = 0;
bool _timer_running = false;
std::queue<TCPSegment> _segments_outstanding{};

完成后我们在build目录下编译并测试,和之前类似:

$ CXX=g++-8 cmake ..
$ make format
$ make -j4
$ make check_lab3

通过的话会得到类似字样:

100% tests passed, 0 tests failed out of 33

Total Test time (real) =   1.65 sec

下一个实验:Lab 4

CS144, computer networking
comments powered by Disqus