CS144 Lab 3
上一个实验:Lab 2
这篇文章会讲解 CS144 Lab 3 的思路和代码。
Lab 3
这个 Lab 的目标是写一个 TCP sender,它主要支持三个功能:
- 从
_stream
里读取数据并组成TCPSegment
,并把它发送给上一个 Lab 实现的TCPReceiver
; - 支持重发超时的
TCPSegment
; - 支持发送空的
TCPSegment
,这个功能会在下个 Lab 里用到。
在 git merge origin/lab3-startercode
合并 Lab 3 的代码后,我们就可以开始啦。
这次的 Lab 难度较大,因为 sender 的行为并不完全是课程中学到的 sender 行为,比如没有拥塞控制系统,也有一些机制是自定义的(比如超时重传机制,比如三次握手和四次挥手),所以需要仔细读文档。
既然是 Sender,咱们可以从核心的发送功能出发,思考我们需要什么变量,粗略思考后,至少这些东西是必须的:
bool _syn_sent
:用来记录是否已经发送过 SYN;bool _syn_ack_received
: 用来记录是否收到了来自接收方的 SYN/ACK;bool _fin_sent
:用来记录是否已经发送 FIN;size_t _receiver_window_size
: 我们需要通过接受 ackno 来确认 receiver 的窗口大小,从而控制发送的报文段数量;- 这里的报文段数量一词是不准确的,更准确的说法是 payload + syn/fin,但是这么写很麻烦,所以将就着看吧;
- SYN, FIN 是会占据序列号的,所以在计算窗口大小的时候,也要考虑他们。
size_t _receiver_free_size, size_t _bytes_in_flight
:仅仅有上面的变量是不够的,有一些报文段还在传输过程中,这些也会影响我们接下来可以发送的报文段数量;
有了这些我们可以写出 fill_window
和 send_segment
,后者只被前者调用,具体逻辑我会在注释里解释。
void TCPSender::fill_window() {
if (!_syn_sent) {
// Have not sent SYN yet, just send SYN
_syn_sent = true;
TCPSegment seg;
seg.header().syn = true;
send_segment(seg);
return;
}
if ((!_syn_ack_received) || (!_stream.buffer_size() && !_stream.eof()) || _fin_sent) {
/*
There are 3 cases we should send nothing.
1. Have not received SYN/AK yet(SYN is in the flight);
2. No more data from the input stream;
3. Have already sent FIN.
*/
return;
}
if (_receiver_window_size) {
// There are space to receive data
while (_receiver_free_space) {
/*
Maxium possible space that we estimate. The difference between
_receiver_window_size and _receiver_free_space is that the latter
one contained the bytes in flight.
*/
TCPSegment seg;
size_t payload_size = min({_stream.buffer_size(),
static_cast<size_t>(_receiver_free_space),
static_cast<size_t>(TCPConfig::MAX_PAYLOAD_SIZE)});
seg.payload() = _stream.read(payload_size);
if (_stream.eof() && static_cast<size_t>(_receiver_free_space) > payload_size) {
/*
FIN will take 1 bit of _receiver_free_space,
so we need to make sure there are enough space (extra 1 bit).
*/
seg.header().fin = true;
_fin_sent = true;
}
send_segment(seg);
if (_stream.buffer_empty())
break;
}
} else if (_receiver_free_space == 0) {
/*
The zero-window-detect-segment should only be sent once (retransmition excute by tick function). Before it is sent, _receiver_free_space is zero.
Then it will be -1. This part is asked by the documentation
*/
TCPSegment seg;
if (_stream.eof()) {
seg.header().fin = true;
_fin_sent = true;
send_segment(seg);
} else if (!_stream.buffer_empty()) {
// send 1 byte package
seg.payload() = _stream.read(1);
send_segment(seg);
}
}
}
void TCPSender::send_segment(TCPSegment &seg) {
/*
Just send the target TCPSegment, we use _segments_outstanding to track
outstanding segments.
*/
seg.header().seqno = wrap(_next_seqno, _isn);
_next_seqno += seg.length_in_sequence_space();
_bytes_in_flight += seg.length_in_sequence_space();
if (_syn_sent)
_receiver_free_space -= seg.length_in_sequence_space();
_segments_out.push(seg);
_segments_outstanding.push(seg);
if (!_timer_running) {
// Start the timer when there are outstanding bytes
_timer_running = true;
_time_elapsed = 0;
}
}
发送的核心部分写完后,我们需要思考接受 ACK 和超时重传机制。接受 ACK 的很清晰,就是第一次接受 ACK 应该是三次握手过程中的,接受 SYN/ACK,所以我们需要将 _syn_ack_received 置为 true,之后每次收到 ACK 我们都应该将 _segments_outstanding
里追踪的已经得到确认的报文段释放掉。最后再调用 fill_window
。
接下来是超时重传机制,虽然原文档里建议自己实现一个 timer 类,但是我想不出来,于是思路和代码参考了别人的实现,主要是这篇文章。只有当被调用 tick
方法的时候,我们才能更新计时器的数值(_time_elapsed
)。注意这里不包含关闭停止计时器,计时器关闭取决于 _segments_outstanding
,它为空,咱们就关闭,它不为空,咱们就打开。
具体到tick
函数里,我们希望通过记录之前的时间,以及新传入的时间,我们找到超时的那 一个 报文段,并且把它重传(放进 _segments_out
里),并且根据文档,当接收方还有窗口大小时,我们需要翻倍 RTO,翻倍 RTO 是目的是一种 Exponential Backoff 机制,我个人的理解是估计 RTT,参考这里。
void TCPSender::tick(const size_t ms_since_last_tick) {
if (!_timer_running)
return;
_time_elapsed += ms_since_last_tick;
if (_time_elapsed >= _rto) {
_segments_out.push(_segments_outstanding.front());
if (_receiver_window_size || (!_syn_ack_received)) {
/*
Notice that here is a corner case, if the SYN hasn't been acked
(in this case the _receiver_window_size may be 0),
we need to double the _rto as well.
*/
++_consecutive_retransmissions;
_rto <<= 1;
}
_time_elapsed = 0;
}
}
void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
uint64_t abs_ackno = unwrap(ackno, _isn, _next_seqno);
if (!ack_valid(abs_ackno))
return;
_syn_ack_received = true;
_receiver_window_size = window_size;
_receiver_free_space = window_size;
while (!_segments_outstanding.empty()) {
TCPSegment seg = _segments_outstanding.front();
size_t curr_seg_abs_seqno = unwrap(seg.header().seqno, _isn, _next_seqno);
uint64_t curr_seg_payload = seg.length_in_sequence_space();
if (curr_seg_abs_seqno + curr_seg_payload <= abs_ackno) {
_bytes_in_flight -= curr_seg_payload;
_segments_outstanding.pop();
_time_elapsed = 0;
_rto = _initial_retransmission_timeout;
_consecutive_retransmissions = 0;
} else {
break;
}
}
if (!_segments_outstanding.empty()) {
// We need to consider the segment is received partly.
_receiver_free_space =
static_cast<uint16_t>(static_cast<uint64_t>(window_size) - _bytes_in_flight + abs_ackno -
unwrap(_segments_outstanding.front().header().seqno, _isn, _next_seqno));
}
if (!_bytes_in_flight) {
_timer_running = false;
}
fill_window();
}
这些就是主体部分了。还有一个send_empty_segment
参考如下:
void TCPSender::send_empty_segment() {
TCPSegment seg;
seg.header().seqno = wrap(_next_seqno, _isn);
_segments_out.push(seg);
}
其余的代码部分:
unsigned int TCPSender::consecutive_retransmissions() const {
return _consecutive_retransmissions;
}
bool TCPSender::ack_valid(uint64_t abs_ackno) {
return abs_ackno <= _next_seqno; // fixed from 12-17
}
TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn)
: _isn(fixed_isn.value_or(WrappingInt32{random_device()()}))
, _initial_retransmission_timeout{retx_timeout}
, _stream(capacity)
, _rto(_initial_retransmission_timeout) {}
uint64_t TCPSender::bytes_in_flight() const { return _bytes_in_flight; }
bool _syn_sent = false;
bool _syn_ack_received = false;
bool _fin_sent = false;
uint64_t _bytes_in_flight = 0;
uint16_t _receiver_window_size = 0;
uint16_t _receiver_free_space = 0;
uint16_t _consecutive_retransmissions = 0;
unsigned int _rto = 0;
unsigned int _time_elapsed = 0;
bool _timer_running = false;
std::queue<TCPSegment> _segments_outstanding{};
完成后我们在build
目录下编译并测试,和之前类似:
$ CXX=g++-8 cmake ..
$ make format
$ make -j4
$ make check_lab3
通过的话会得到类似字样:
100% tests passed, 0 tests failed out of 33
Total Test time (real) = 1.65 sec
下一个实验:Lab 4
Let me know what you think of this article on twitter @CreamEggMeat or leave a comment below!