WebRTC-nack機(jī)制詳解

# 1.NACK的含義

丟包重傳(NACK)是抵抗網(wǎng)絡(luò)錯(cuò)誤的重要手段。NACK在接收端檢測(cè)到數(shù)據(jù)丟包后,發(fā)送NACK報(bào)文到發(fā)送端;發(fā)送端根據(jù)NACK報(bào)文中的序列號(hào),在發(fā)送緩沖區(qū)找到對(duì)應(yīng)的數(shù)據(jù)包,重新發(fā)送到接收端。NACK需要發(fā)送端,發(fā)送緩沖區(qū)的支持。

WebRTC中支持音頻和視頻的NACK重傳。我們這里只分析nack機(jī)制,不分析jitterbuffer或者neteq的更多實(shí)現(xiàn)。

# 2.WebRTC中NACK請(qǐng)求發(fā)送的條件

這里以視頻為例。

下面是webrtc中接收端觸發(fā)nack的條件,我們看下nack_module.cc文件中OnReceivedPacket的實(shí)現(xiàn)。

```

void NackModule::OnReceivedPacket(const VCMPacket& packet) {

? rtc::CritScope lock(&crit_);

? if (!running_)

? ? return;

? //獲取包的seqnum

? uint16_t seq_num = packet.seqNum;

? // TODO(philipel): When the packet includes information whether it is

? //? ? ? ? ? ? ? ? retransmitted or not, use that value instead. For

? //? ? ? ? ? ? ? ? now set it to true, which will cause the reordering

? //? ? ? ? ? ? ? ? statistics to never be updated.

? bool is_retransmitted = true;

? //判斷第一幀是不是關(guān)鍵幀

? bool is_keyframe = packet.isFirstPacket && packet.frameType == kVideoFrameKey;

//拿到第一個(gè)包的時(shí)候判斷,把第一個(gè)包的seqnum賦值給最新的last_seq_num,如果是關(guān)鍵幀的話,插入到關(guān)鍵幀列表中,同時(shí)把initialized_設(shè)置為true

? if (!initialized_) {

? ? last_seq_num_ = seq_num;

? ? if (is_keyframe)

? ? ? keyframe_list_.insert(seq_num);

? ? initialized_ = true;

? ? return;

? }

? if (seq_num == last_seq_num_)

? ? return;

//判斷有無(wú)亂序,亂序了,如來(lái)1,2,3,6包,然后來(lái)4包,就亂序了,就把4從nack_list中去掉,不再通知發(fā)送端重新發(fā)送4了

? if (AheadOf(last_seq_num_, seq_num)) {

? ? // An out of order packet has been received.

? ? //把重新收到的包從nack_list中移除掉

? ? nack_list_.erase(seq_num);

? ? if (!is_retransmitted)

? ? ? UpdateReorderingStatistics(seq_num);

? ? return;

? } else {

? //沒(méi)有亂序,如1,2,3,6包,就把(3+1,6)之間的包加入到nack_list中

? ? AddPacketsToNack(last_seq_num_ + 1, seq_num);

? ? last_seq_num_ = seq_num;

? ? // Keep track of new keyframes.

? ? if (is_keyframe)

? ? ? keyframe_list_.insert(seq_num);

? ? // And remove old ones so we don't accumulate keyframes.

? ? auto it = keyframe_list_.lower_bound(seq_num - kMaxPacketAge);

? ? if (it != keyframe_list_.begin())

? ? ? keyframe_list_.erase(keyframe_list_.begin(), it);

? ? // Are there any nacks that are waiting for this seq_num.

? ? //從nack_list 中取出需要發(fā)送 NACK 的序號(hào)列表, 如果某個(gè) seq 請(qǐng)求次數(shù)超過(guò) kMaxNackRetries = 10次則會(huì)從nack_list 中刪除.

? ? std::vector<uint16_t> nack_batch = GetNackBatch(kSeqNumOnly);

? ? //LOG(LS_INFO) << "nack_batch size[" << nack_batch.size() << "].";

? ? //在 NackModule 中觸發(fā)使用 NackSender::SednNack 發(fā)送 NACK 請(qǐng)求

? ? if (!nack_batch.empty())

? ? ? nack_sender_->SendNack(nack_batch);

? }

}

```

我們繼續(xù)跟蹤流程看下AddPacketsToNack函數(shù)的實(shí)現(xiàn)

```

void NackModule::AddPacketsToNack(uint16_t seq_num_start,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? uint16_t seq_num_end) {

? //LOG(LS_INFO) << "AddPacketsToNack. "

? //? ? ? ? ? ? << "start seq[" << seq_num_start

? //? ? ? ? ? ? << "],end seq[" << nack_list_.lower_bound(seq_num_end - kMaxPacketAge);

? nack_list_.erase(nack_list_.begin(), it);

? // If the nack list is too large, remove packets from the nack list until

? // the latest first packet of a keyframe. If the list is still too large,

? // clear it and request a keyframe.

? uint16_t num_new_nacks = ForwardDiff(seq_num_start, seq_num_end);

? if (nack_list_.size() + num_new_nacks > kMaxNackPackets) {

? ? while (RemovePacketsUntilKeyFrame() &&

? ? ? ? ? nack_list_.size() + num_new_nacks > kMaxNackPackets) {

? ? }

? ? if (nack_list_.size() + num_new_nacks > kMaxNackPackets) {

? ? ? nack_list_.clear();

? ? ? LOG(LS_WARNING) << "NACK list full, clearing NACK"

? ? ? ? ? ? ? ? ? ? ? ? " list and requesting keyframe.";

? ? //觸發(fā)關(guān)鍵幀請(qǐng)求

? ? ? keyframe_request_sender_->RequestKeyFrameNack();

? ? ? return;

? ? }

? }

? for (uint16_t seq_num = seq_num_start; seq_num != seq_num_end; ++seq_num) {

? ? NackInfo nack_info(seq_num, seq_num + WaitNumberOfPackets(0.5),

? ? ? ? ? ? ? ? ? ? ? clock_->TimeInMilliseconds());

? ? RTC_DCHECK(nack_list_.find(seq_num) == nack_list_.end());

? ? nack_list_[seq_num] = nack_info;

? }

? //LOG(LS_INFO) << "nack_list size[" << nack_list_.size() << "]";

}

```

我們可以看到AddPacketsToNack()函數(shù)主要實(shí)現(xiàn)了:

nack_list 的最大容量為 kMaxNackPackets = 1000, 如果滿了會(huì)刪除最后一個(gè) KeyFrame 之前的所有nacked 序號(hào), 如果刪除之后還是滿的那么清空 nack_list 并請(qǐng)求KeyFrame。

我們繼續(xù)跟蹤流程,我們看下GetNackBatch函數(shù)實(shí)現(xiàn)

```

std::vector<uint16_t> NackModule::GetNackBatch(NackFilterOptions options) {

? bool consider_seq_num = options != kTimeOnly;

? bool consider_timestamp = options != kSeqNumOnly;

? int64_t now_ms = clock_->TimeInMilliseconds();

? std::vector<uint16_t> nack_batch;

? auto it = nack_list_.begin();

? //LOG(LS_INFO) << "nack_list size[" << nack_list_.size() << "]";

? while (it != nack_list_.end()) {

? ? bool delay_timed_out =

? ? ? ? now_ms - it->second.created_at_time >= kDefaultSendNackDelayMs;

? ? ? ? //只考慮時(shí)間模式

? ? ? ? //當(dāng)前序號(hào)是第一次發(fā)送(本地記錄的send_at_time == -1)

? ? ? ? //當(dāng)前最新收到的包序號(hào)在這個(gè)需要發(fā)送NAKC的序號(hào)的后面(避免當(dāng)前還在收之前沒(méi)收到的包)

//比如當(dāng)前最新收到100, 當(dāng)前檢測(cè)是否需要發(fā)送NACK的序號(hào)為小于等于100的才滿足條件, 比如 99

? ? if (delay_timed_out && consider_seq_num && it->second.sent_at_time == -1 &&

? ? ? ? AheadOrAt(last_seq_num_, it->second.send_at_seq_num)) {

? ? ? nack_batch.emplace_back(it->second.seq_num);

? ? ? ++it->second.retries;

? ? ? it->second.sent_at_time = now_ms;

? ? ? //從nack_list 中取出需要發(fā)送 NACK 的序號(hào)列表, 如果某個(gè) seq 請(qǐng)求次數(shù)超過(guò) kMaxNackRetries = 10次則會(huì)從nack_list 中刪除

? ? ? if (it->second.retries >= kMaxNackRetries) {

? ? ? ? LOG(LS_WARNING) << "Sequence number " << it->second.seq_num

? ? ? ? ? ? ? ? ? ? ? ? << " removed from NACK list due to max retries.";

? ? ? ? //從nack_list_列表中移除

? ? ? ? it = nack_list_.erase(it);

? ? ? } else {

? ? ? ? ++it;

? ? ? }

? ? ? continue;

? ? }

? ? //只考慮時(shí)間模式

? ? //發(fā)送nack的條件變成,該序號(hào)上次發(fā)送NACK的時(shí)間到當(dāng)前時(shí)間要超過(guò)1個(gè)RTT(該序號(hào)一次也沒(méi)發(fā)送過(guò)NACK(send_at_time == -1)也滿足

? ? if (delay_timed_out && consider_timestamp && it->second.sent_at_time + rtt_ms_ <= now_ms) {

? ? ? nack_batch.emplace_back(it->second.seq_num);

? ? ? ++it->second.retries;

? ? ? it->second.sent_at_time = now_ms;

? ? ? if (it->second.retries >= kMaxNackRetries) {

? ? ? ? LOG(LS_WARNING) << "Sequence number " << it->second.seq_num

? ? ? ? ? ? ? ? ? ? ? ? << " removed from NACK list due to max retries.";

? ? ? ? it = nack_list_.erase(it);

? ? ? } else {

? ? ? ? ++it;

? ? ? }

? ? ? continue;

? ? }

? ? ++it;

? }

? return nack_batch;

}

```

從上面GetNackBatch函數(shù)我們可以知道,獲取nack_list存在2種控制邏輯。

# 3.WebRTC中處理NACK請(qǐng)求的實(shí)現(xiàn)

1. 首先是正常的 RTCP 處理流程: RTCPReceiver 中解析處理RTCP, 在 rtcp_receiver.cc中的TriggerCallbacksFromRtcpPacket函數(shù)中處理不同的RTCP消息.

2. 如果nackSequenceNumbers.size大于0,則觸發(fā) RtpRtcp 對(duì)象的ModuleRtpRtcpImpl::OnReceivedNack 處理流程。

我們看下OnReceivedNACK/rtp_rtcp_imp.cc函數(shù)

```

void ModuleRtpRtcpImpl::OnReceivedNACK(

? ? int64_t id, const std::list<uint16_t>& nack_sequence_numbers) {

? ? //將丟包的序號(hào) 記錄到PacketLossStats, 獲取RTT后進(jìn)入 RTPSedner.OnReceivedNack.

? for (uint16_t nack_sequence_number : nack_sequence_numbers) {

? ? send_loss_stats_.AddLostPacket(nack_sequence_number);

? }

? if (!rtp_sender_.StorePackets() ||

? ? ? nack_sequence_numbers.size() == 0) {

? ? return;

? }

? // Use RTT from RtcpRttStats class if provided.

? int64_t rtt = rtt_ms();

? if (rtt == 0) {

? ? rtcp_receiver_.RTT(rtcp_receiver_.RemoteSSRC(), NULL, &rtt, NULL, NULL);

? }

? rtp_sender_.OnReceivedNACK(id, nack_sequence_numbers, rtt);

}

```

我們繼續(xù)跟蹤流程,看下rtp_sender.cc下OnReceivedNACK()函數(shù)

```

void RTPSender::OnReceivedNACK(int64_t id,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? const std::list<uint16_t>& nack_sequence_numbers,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? int64_t avg_rtt) {

? TRACE_EVENT2(TRACE_DISABLED_BY_DEFAULT("webrtc_rtp"),

? ? ? ? ? ? ? "RTPSender::OnReceivedNACK", "num_seqnum",

? ? ? ? ? ? ? nack_sequence_numbers.size(), "avg_rtt", avg_rtt);

? const int64_t now = clock_->TimeInMilliseconds();

? uint32_t bytes_re_sent = 0;

? uint32_t target_bitrate = GetTargetBitrate();

? ? //比特率限制檢查

? // Enough bandwidth to send NACK?

? if (!ProcessNACKBitRate(now)) {

? ? LOG(LS_INFO) << "NACK bitrate reached. Skip sending NACK response. Target "

? ? ? ? ? ? ? ? << target_bitrate;

? ? return;

? }

? for (std::list<uint16_t>::const_iterator it = nack_sequence_numbers.begin();

? ? ? it != nack_sequence_numbers.end(); ++it) {

? ? const int32_t bytes_sent = ReSendPacket(id, *it, 5 + avg_rtt);

? ? if (bytes_sent > 0) {

? ? ? bytes_re_sent += bytes_sent;

? ? } else if (bytes_sent == 0) {

? ? ? // The packet has previously been resent.

? ? ? // Try resending next packet in the list.

? ? ? continue;

? ? } else {

? ? ? // Failed to send one Sequence number. Give up the rest in this nack.

? ? ? LOG(LS_WARNING) << "Failed resending RTP packet " << *it

? ? ? ? ? ? ? ? ? ? ? << ", Discard rest of packets";

? ? ? break;

? ? }

? ? // Delay bandwidth estimate (RTT * BW).

? ? if (target_bitrate != 0 && avg_rtt) {

? ? ? // kbits/s * ms = bits => bits/8 = bytes

? ? ? size_t target_bytes =

? ? ? ? ? (static_cast<size_t>(target_bitrate / 1000) * avg_rtt) >> 3;

? ? ? if (bytes_re_sent > target_bytes) {

? ? ? ? break;? // Ignore the rest of the packets in the list.

? ? ? }

? ? }

? }

? if (bytes_re_sent > 0) {

? ? UpdateNACKBitRate(bytes_re_sent, now);

? }

}

```

我們繼續(xù)看下ReSendPacket()函數(shù)重新發(fā)送數(shù)據(jù)的實(shí)現(xiàn)

```

int32_t RTPSender::ReSendPacket(int64_t id, uint16_t packet_id, int64_t min_resend_time) {

? size_t length = IP_PACKET_SIZE;

? uint8_t data_buffer[IP_PACKET_SIZE];

? int64_t capture_time_ms;

//從緩存包中去獲取數(shù)據(jù)包

? if (!packet_history_.GetPacketAndSetSendTime(packet_id, min_resend_time, true,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? data_buffer, &length,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? &capture_time_ms)) {

? ? // Packet not found.

? ? LOG(LS_INFO) << "ReSendPacket not found.seq[" << packet_id << "].";

? ? return 0;

? }

//如果開(kāi)啟平滑發(fā)送的話

? if (paced_sender_) {

? ? RtpUtility::RtpHeaderParser rtp_parser(data_buffer, length);

? ? RTPHeader header;

? ? if (!rtp_parser.Parse(&header)) {

? ? ? assert(false);

? ? ? return -1;

? ? }

? ? // Convert from TickTime to Clock since capture_time_ms is based on

? ? // TickTime.

? ? int64_t corrected_capture_tims_ms = capture_time_ms + clock_delta_ms_;

? ? paced_sender_->InsertPacket(

? ? ? ? id, RtpPacketSender::kNormalPriority, header.ssrc, header.sequenceNumber,

? ? ? ? corrected_capture_tims_ms, length - header.headerLength, true);

? ? return length;

? }

? int rtx = kRtxOff;

? {

? ? rtc::CritScope lock(&send_critsect_);

? ? rtx = rtx_;

? }

? //重新發(fā)送數(shù)據(jù)

? if (!PrepareAndSendPacket(id, data_buffer, length, capture_time_ms,

? ? ? ? ? ? ? ? ? ? ? ? ? ? (rtx & kRtxRetransmitted) > 0, true)) {

? ? return -1;

? }

? return static_cast<int32_t>(length);

}

```

3.通過(guò)上面我們可以知道,RTPSender中完成在PacketHistory中查找需要發(fā)送的RTP seq, 并決定重發(fā)時(shí)間. 重發(fā)也需要經(jīng)過(guò)重發(fā)的比特率限制的檢查. RTPSedner 初始化話時(shí)可以配置是否使用(PacedSend, 均勻發(fā)送), 最后檢查重發(fā)格式(RtxStatus() 可以獲取是否使用 RTX 封裝)后使用 RTPSedner::PrepareAndSendPacket進(jìn)行立即重發(fā). 如果是使用 PacedSend, 則使用 PacedSender::InsertPacket 先加入發(fā)送列表中, 它的process會(huì)定時(shí)處理發(fā)送任務(wù).

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容