# 1.NACK的含義
丟包重傳(NACK)是抵抗網(wǎng)絡(luò)錯(cuò)誤的重要手段。NACK在接收端檢測(cè)到數(shù)據(jù)丟包后,發(fā)送NACK報(bào)文到發(fā)送端;發(fā)送端根據(jù)NACK報(bào)文中的序列號(hào),在發(fā)送緩沖區(qū)找到對(duì)應(yīng)的數(shù)據(jù)包,重新發(fā)送到接收端。NACK需要發(fā)送端,發(fā)送緩沖區(qū)的支持。
WebRTC中支持音頻和視頻的NACK重傳。我們這里只分析nack機(jī)制,不分析jitterbuffer或者neteq的更多實(shí)現(xiàn)。
# 2.WebRTC中NACK請(qǐng)求發(fā)送的條件
這里以視頻為例。
下面是webrtc中接收端觸發(fā)nack的條件,我們看下nack_module.cc文件中OnReceivedPacket的實(shí)現(xiàn)。
```
void NackModule::OnReceivedPacket(const VCMPacket& packet) {
? rtc::CritScope lock(&crit_);
? if (!running_)
? ? return;
? //獲取包的seqnum
? uint16_t seq_num = packet.seqNum;
? // TODO(philipel): When the packet includes information whether it is
? //? ? ? ? ? ? ? ? retransmitted or not, use that value instead. For
? //? ? ? ? ? ? ? ? now set it to true, which will cause the reordering
? //? ? ? ? ? ? ? ? statistics to never be updated.
? bool is_retransmitted = true;
? //判斷第一幀是不是關(guān)鍵幀
? bool is_keyframe = packet.isFirstPacket && packet.frameType == kVideoFrameKey;
//拿到第一個(gè)包的時(shí)候判斷,把第一個(gè)包的seqnum賦值給最新的last_seq_num,如果是關(guān)鍵幀的話,插入到關(guān)鍵幀列表中,同時(shí)把initialized_設(shè)置為true
? if (!initialized_) {
? ? last_seq_num_ = seq_num;
? ? if (is_keyframe)
? ? ? keyframe_list_.insert(seq_num);
? ? initialized_ = true;
? ? return;
? }
? if (seq_num == last_seq_num_)
? ? return;
//判斷有無(wú)亂序,亂序了,如來(lái)1,2,3,6包,然后來(lái)4包,就亂序了,就把4從nack_list中去掉,不再通知發(fā)送端重新發(fā)送4了
? if (AheadOf(last_seq_num_, seq_num)) {
? ? // An out of order packet has been received.
? ? //把重新收到的包從nack_list中移除掉
? ? nack_list_.erase(seq_num);
? ? if (!is_retransmitted)
? ? ? UpdateReorderingStatistics(seq_num);
? ? return;
? } else {
? //沒(méi)有亂序,如1,2,3,6包,就把(3+1,6)之間的包加入到nack_list中
? ? AddPacketsToNack(last_seq_num_ + 1, seq_num);
? ? last_seq_num_ = seq_num;
? ? // Keep track of new keyframes.
? ? if (is_keyframe)
? ? ? keyframe_list_.insert(seq_num);
? ? // And remove old ones so we don't accumulate keyframes.
? ? auto it = keyframe_list_.lower_bound(seq_num - kMaxPacketAge);
? ? if (it != keyframe_list_.begin())
? ? ? keyframe_list_.erase(keyframe_list_.begin(), it);
? ? // Are there any nacks that are waiting for this seq_num.
? ? //從nack_list 中取出需要發(fā)送 NACK 的序號(hào)列表, 如果某個(gè) seq 請(qǐng)求次數(shù)超過(guò) kMaxNackRetries = 10次則會(huì)從nack_list 中刪除.
? ? std::vector<uint16_t> nack_batch = GetNackBatch(kSeqNumOnly);
? ? //LOG(LS_INFO) << "nack_batch size[" << nack_batch.size() << "].";
? ? //在 NackModule 中觸發(fā)使用 NackSender::SednNack 發(fā)送 NACK 請(qǐng)求
? ? if (!nack_batch.empty())
? ? ? nack_sender_->SendNack(nack_batch);
? }
}
```
我們繼續(xù)跟蹤流程看下AddPacketsToNack函數(shù)的實(shí)現(xiàn)
```
void NackModule::AddPacketsToNack(uint16_t seq_num_start,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? uint16_t seq_num_end) {
? //LOG(LS_INFO) << "AddPacketsToNack. "
? //? ? ? ? ? ? << "start seq[" << seq_num_start
? //? ? ? ? ? ? << "],end seq[" << nack_list_.lower_bound(seq_num_end - kMaxPacketAge);
? nack_list_.erase(nack_list_.begin(), it);
? // If the nack list is too large, remove packets from the nack list until
? // the latest first packet of a keyframe. If the list is still too large,
? // clear it and request a keyframe.
? uint16_t num_new_nacks = ForwardDiff(seq_num_start, seq_num_end);
? if (nack_list_.size() + num_new_nacks > kMaxNackPackets) {
? ? while (RemovePacketsUntilKeyFrame() &&
? ? ? ? ? nack_list_.size() + num_new_nacks > kMaxNackPackets) {
? ? }
? ? if (nack_list_.size() + num_new_nacks > kMaxNackPackets) {
? ? ? nack_list_.clear();
? ? ? LOG(LS_WARNING) << "NACK list full, clearing NACK"
? ? ? ? ? ? ? ? ? ? ? ? " list and requesting keyframe.";
? ? //觸發(fā)關(guān)鍵幀請(qǐng)求
? ? ? keyframe_request_sender_->RequestKeyFrameNack();
? ? ? return;
? ? }
? }
? for (uint16_t seq_num = seq_num_start; seq_num != seq_num_end; ++seq_num) {
? ? NackInfo nack_info(seq_num, seq_num + WaitNumberOfPackets(0.5),
? ? ? ? ? ? ? ? ? ? ? clock_->TimeInMilliseconds());
? ? RTC_DCHECK(nack_list_.find(seq_num) == nack_list_.end());
? ? nack_list_[seq_num] = nack_info;
? }
? //LOG(LS_INFO) << "nack_list size[" << nack_list_.size() << "]";
}
```
我們可以看到AddPacketsToNack()函數(shù)主要實(shí)現(xiàn)了:
nack_list 的最大容量為 kMaxNackPackets = 1000, 如果滿了會(huì)刪除最后一個(gè) KeyFrame 之前的所有nacked 序號(hào), 如果刪除之后還是滿的那么清空 nack_list 并請(qǐng)求KeyFrame。
我們繼續(xù)跟蹤流程,我們看下GetNackBatch函數(shù)實(shí)現(xiàn)
```
std::vector<uint16_t> NackModule::GetNackBatch(NackFilterOptions options) {
? bool consider_seq_num = options != kTimeOnly;
? bool consider_timestamp = options != kSeqNumOnly;
? int64_t now_ms = clock_->TimeInMilliseconds();
? std::vector<uint16_t> nack_batch;
? auto it = nack_list_.begin();
? //LOG(LS_INFO) << "nack_list size[" << nack_list_.size() << "]";
? while (it != nack_list_.end()) {
? ? bool delay_timed_out =
? ? ? ? now_ms - it->second.created_at_time >= kDefaultSendNackDelayMs;
? ? ? ? //只考慮時(shí)間模式
? ? ? ? //當(dāng)前序號(hào)是第一次發(fā)送(本地記錄的send_at_time == -1)
? ? ? ? //當(dāng)前最新收到的包序號(hào)在這個(gè)需要發(fā)送NAKC的序號(hào)的后面(避免當(dāng)前還在收之前沒(méi)收到的包)
//比如當(dāng)前最新收到100, 當(dāng)前檢測(cè)是否需要發(fā)送NACK的序號(hào)為小于等于100的才滿足條件, 比如 99
? ? if (delay_timed_out && consider_seq_num && it->second.sent_at_time == -1 &&
? ? ? ? AheadOrAt(last_seq_num_, it->second.send_at_seq_num)) {
? ? ? nack_batch.emplace_back(it->second.seq_num);
? ? ? ++it->second.retries;
? ? ? it->second.sent_at_time = now_ms;
? ? ? //從nack_list 中取出需要發(fā)送 NACK 的序號(hào)列表, 如果某個(gè) seq 請(qǐng)求次數(shù)超過(guò) kMaxNackRetries = 10次則會(huì)從nack_list 中刪除
? ? ? if (it->second.retries >= kMaxNackRetries) {
? ? ? ? LOG(LS_WARNING) << "Sequence number " << it->second.seq_num
? ? ? ? ? ? ? ? ? ? ? ? << " removed from NACK list due to max retries.";
? ? ? ? //從nack_list_列表中移除
? ? ? ? it = nack_list_.erase(it);
? ? ? } else {
? ? ? ? ++it;
? ? ? }
? ? ? continue;
? ? }
? ? //只考慮時(shí)間模式
? ? //發(fā)送nack的條件變成,該序號(hào)上次發(fā)送NACK的時(shí)間到當(dāng)前時(shí)間要超過(guò)1個(gè)RTT(該序號(hào)一次也沒(méi)發(fā)送過(guò)NACK(send_at_time == -1)也滿足
? ? if (delay_timed_out && consider_timestamp && it->second.sent_at_time + rtt_ms_ <= now_ms) {
? ? ? nack_batch.emplace_back(it->second.seq_num);
? ? ? ++it->second.retries;
? ? ? it->second.sent_at_time = now_ms;
? ? ? if (it->second.retries >= kMaxNackRetries) {
? ? ? ? LOG(LS_WARNING) << "Sequence number " << it->second.seq_num
? ? ? ? ? ? ? ? ? ? ? ? << " removed from NACK list due to max retries.";
? ? ? ? it = nack_list_.erase(it);
? ? ? } else {
? ? ? ? ++it;
? ? ? }
? ? ? continue;
? ? }
? ? ++it;
? }
? return nack_batch;
}
```
從上面GetNackBatch函數(shù)我們可以知道,獲取nack_list存在2種控制邏輯。
# 3.WebRTC中處理NACK請(qǐng)求的實(shí)現(xiàn)
1. 首先是正常的 RTCP 處理流程: RTCPReceiver 中解析處理RTCP, 在 rtcp_receiver.cc中的TriggerCallbacksFromRtcpPacket函數(shù)中處理不同的RTCP消息.
2. 如果nackSequenceNumbers.size大于0,則觸發(fā) RtpRtcp 對(duì)象的ModuleRtpRtcpImpl::OnReceivedNack 處理流程。
我們看下OnReceivedNACK/rtp_rtcp_imp.cc函數(shù)
```
void ModuleRtpRtcpImpl::OnReceivedNACK(
? ? int64_t id, const std::list<uint16_t>& nack_sequence_numbers) {
? ? //將丟包的序號(hào) 記錄到PacketLossStats, 獲取RTT后進(jìn)入 RTPSedner.OnReceivedNack.
? for (uint16_t nack_sequence_number : nack_sequence_numbers) {
? ? send_loss_stats_.AddLostPacket(nack_sequence_number);
? }
? if (!rtp_sender_.StorePackets() ||
? ? ? nack_sequence_numbers.size() == 0) {
? ? return;
? }
? // Use RTT from RtcpRttStats class if provided.
? int64_t rtt = rtt_ms();
? if (rtt == 0) {
? ? rtcp_receiver_.RTT(rtcp_receiver_.RemoteSSRC(), NULL, &rtt, NULL, NULL);
? }
? rtp_sender_.OnReceivedNACK(id, nack_sequence_numbers, rtt);
}
```
我們繼續(xù)跟蹤流程,看下rtp_sender.cc下OnReceivedNACK()函數(shù)
```
void RTPSender::OnReceivedNACK(int64_t id,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? const std::list<uint16_t>& nack_sequence_numbers,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? int64_t avg_rtt) {
? TRACE_EVENT2(TRACE_DISABLED_BY_DEFAULT("webrtc_rtp"),
? ? ? ? ? ? ? "RTPSender::OnReceivedNACK", "num_seqnum",
? ? ? ? ? ? ? nack_sequence_numbers.size(), "avg_rtt", avg_rtt);
? const int64_t now = clock_->TimeInMilliseconds();
? uint32_t bytes_re_sent = 0;
? uint32_t target_bitrate = GetTargetBitrate();
? ? //比特率限制檢查
? // Enough bandwidth to send NACK?
? if (!ProcessNACKBitRate(now)) {
? ? LOG(LS_INFO) << "NACK bitrate reached. Skip sending NACK response. Target "
? ? ? ? ? ? ? ? << target_bitrate;
? ? return;
? }
? for (std::list<uint16_t>::const_iterator it = nack_sequence_numbers.begin();
? ? ? it != nack_sequence_numbers.end(); ++it) {
? ? const int32_t bytes_sent = ReSendPacket(id, *it, 5 + avg_rtt);
? ? if (bytes_sent > 0) {
? ? ? bytes_re_sent += bytes_sent;
? ? } else if (bytes_sent == 0) {
? ? ? // The packet has previously been resent.
? ? ? // Try resending next packet in the list.
? ? ? continue;
? ? } else {
? ? ? // Failed to send one Sequence number. Give up the rest in this nack.
? ? ? LOG(LS_WARNING) << "Failed resending RTP packet " << *it
? ? ? ? ? ? ? ? ? ? ? << ", Discard rest of packets";
? ? ? break;
? ? }
? ? // Delay bandwidth estimate (RTT * BW).
? ? if (target_bitrate != 0 && avg_rtt) {
? ? ? // kbits/s * ms = bits => bits/8 = bytes
? ? ? size_t target_bytes =
? ? ? ? ? (static_cast<size_t>(target_bitrate / 1000) * avg_rtt) >> 3;
? ? ? if (bytes_re_sent > target_bytes) {
? ? ? ? break;? // Ignore the rest of the packets in the list.
? ? ? }
? ? }
? }
? if (bytes_re_sent > 0) {
? ? UpdateNACKBitRate(bytes_re_sent, now);
? }
}
```
我們繼續(xù)看下ReSendPacket()函數(shù)重新發(fā)送數(shù)據(jù)的實(shí)現(xiàn)
```
int32_t RTPSender::ReSendPacket(int64_t id, uint16_t packet_id, int64_t min_resend_time) {
? size_t length = IP_PACKET_SIZE;
? uint8_t data_buffer[IP_PACKET_SIZE];
? int64_t capture_time_ms;
//從緩存包中去獲取數(shù)據(jù)包
? if (!packet_history_.GetPacketAndSetSendTime(packet_id, min_resend_time, true,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? data_buffer, &length,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? &capture_time_ms)) {
? ? // Packet not found.
? ? LOG(LS_INFO) << "ReSendPacket not found.seq[" << packet_id << "].";
? ? return 0;
? }
//如果開(kāi)啟平滑發(fā)送的話
? if (paced_sender_) {
? ? RtpUtility::RtpHeaderParser rtp_parser(data_buffer, length);
? ? RTPHeader header;
? ? if (!rtp_parser.Parse(&header)) {
? ? ? assert(false);
? ? ? return -1;
? ? }
? ? // Convert from TickTime to Clock since capture_time_ms is based on
? ? // TickTime.
? ? int64_t corrected_capture_tims_ms = capture_time_ms + clock_delta_ms_;
? ? paced_sender_->InsertPacket(
? ? ? ? id, RtpPacketSender::kNormalPriority, header.ssrc, header.sequenceNumber,
? ? ? ? corrected_capture_tims_ms, length - header.headerLength, true);
? ? return length;
? }
? int rtx = kRtxOff;
? {
? ? rtc::CritScope lock(&send_critsect_);
? ? rtx = rtx_;
? }
? //重新發(fā)送數(shù)據(jù)
? if (!PrepareAndSendPacket(id, data_buffer, length, capture_time_ms,
? ? ? ? ? ? ? ? ? ? ? ? ? ? (rtx & kRtxRetransmitted) > 0, true)) {
? ? return -1;
? }
? return static_cast<int32_t>(length);
}
```
3.通過(guò)上面我們可以知道,RTPSender中完成在PacketHistory中查找需要發(fā)送的RTP seq, 并決定重發(fā)時(shí)間. 重發(fā)也需要經(jīng)過(guò)重發(fā)的比特率限制的檢查. RTPSedner 初始化話時(shí)可以配置是否使用(PacedSend, 均勻發(fā)送), 最后檢查重發(fā)格式(RtxStatus() 可以獲取是否使用 RTX 封裝)后使用 RTPSedner::PrepareAndSendPacket進(jìn)行立即重發(fā). 如果是使用 PacedSend, 則使用 PacedSender::InsertPacket 先加入發(fā)送列表中, 它的process會(huì)定時(shí)處理發(fā)送任務(wù).