SQP擁塞控制算法實現(xiàn)和對比實驗

SQP擁塞控制算法介紹

一種為低延遲強交互視頻流設計的擁塞控制算法, google paper:https://arxiv.org/pdf/2207.11857.pdf

在交互視頻應用場景,需要傳輸高碼率的視頻率,并且保持極低的端到端時延,比如AR流和云游戲,SQP就是為這種場景而設計的擁塞控制算法。SQP采用基于幀的整形數(shù)據包來采樣網絡帶寬,并用自適應的單向采樣時延測量方法從網絡隊列中恢復,從而維持極低的隊列時延。SQP快速適應網絡帶寬變化,確保高帶寬利用率和低幀時延,并且當存在競爭流時也能在保證可接受的時延內擁有合理的帶寬份額。SQP有不錯的公平性,網絡上有影子緩存時也工作得挺好。

低延時交互流媒體應用生成固定幀率的裸幀。視頻碼率由ABR(Adaptive bitrate algorithm 碼率自適應算法)算法決定,而ABR算法又是由CCA(Congestion control algorithm,擁塞控制算法)產生的信號來決定, 從而管理幀時延、網絡擁塞、帶寬利用率。壓縮的幀通過網絡傳輸,然后在客戶端設備上解碼和顯示。

低時延視頻流架構:


低時延視頻流

SQP內部架構:


SQP內部框架

我是為低時延交互流媒體應用設計的擁塞控制算法, 比如云游戲,云VR等應用。 這一套擁塞控制算法的目標是: 1. 提供實時的帶寬估計,以確保盡可能大的帶寬利用率和盡可能低的端到端幀時延;2. 與其它基于網絡列隊的擁塞控制算法共存時,也可以提供有競爭力的吞吐量。

SQP算法實現(xiàn)

Feeback: 包級別的網絡反饋 + 幀級別的網絡反饋

#include "sqp_feedback_adaptor.h"

#include <algorithm>
#include <cstdlib>

#include "common/logger.h"
#include "common_time.h"
namespace BCC {
#define HISTORY_CACHE_MS 60000
#define SRTT_FACTOR 0.6

SQPFeedbackAdaptor::SQPFeedbackAdaptor() {
    int i;

    for (i = 0; i < FEEDBACK_RTT_WIN_SIZE; ++i) {
        rtts_[i] = -1;
    }

    min_feedback_rtt_ = 10;
    num_ = 0;
    index_ = 0;
    hist_ = new SQPSenderHistory(HISTORY_CACHE_MS);
}

SQPFeedbackAdaptor::~SQPFeedbackAdaptor() {
    if (hist_) {
        delete hist_;
        hist_ = nullptr;
    }

    for (auto const& pair : frame_map_) {
        delete pair.second;
    }
    frame_map_.clear();
}

void SQPFeedbackAdaptor::AddPacket(uint16_t seq, size_t size) {
    PacketFeedBackItem packet;
    packet.arrival_ts = -1;
    packet.create_ts = packet.send_ts = GET_SYS_MS();
    packet.payload_size = size;
    packet.sequence_number = seq;
    hist_->Add(&packet);
}

void SQPFeedbackAdaptor::AddFrame(uint32_t frame_idx, size_t packet_size, bool first_packet, bool last_packet) {
    FrameFeedBackItem* frame_ptr;
    int64_t now_ts = GET_SYS_MS();
    if (frame_map_.find(frame_idx) != frame_map_.end()) {
        frame_ptr = frame_map_[frame_idx];
    }
    else {
        frame_ptr = new FrameFeedBackItem();
        frame_ptr->frame_index = frame_idx;
        frame_map_[frame_idx] = frame_ptr;
    }

    frame_ptr->frame_size += packet_size;
    if (first_packet) {
        frame_ptr->send_start_ts = now_ts;
    } 

    if (last_packet) {
        frame_ptr->send_end_ts = now_ts;
        hist_->AddFrame(frame_ptr);
        delete frame_ptr;
        frame_map_.erase(frame_idx);
    }
}

int SQPFeedbackAdaptor::OnFeedback(BCC_SQP::FeedBackMsgItem* msg) {
    int32_t i = 0, feedback_rtt = 0;
    int64_t now_ts = GET_SYS_MS();
    int64_t delta_ts = 0;
    feedback_rtt = -1;
    num_ = 0;

    for (i = 0; i < msg->samples_num; i++) {
        //根據反饋的SEQ獲取對應的報文發(fā)送信息,計算反饋RTT,更新報文到達時刻
        if (hist_->Get(msg->samples[i].seq, &packets_[num_], 1) == 0) {
            //計算反饋RTT
            if (packets_[num_].send_ts > 0) {
                feedback_rtt = (std::max)(now_ts - packets_[num_].send_ts, (int64_t)feedback_rtt);
                rtts_[index_++ % FEEDBACK_RTT_WIN_SIZE] = feedback_rtt;
                srtt_ = SRTT_FACTOR * srtt_ + (1 - SRTT_FACTOR) * feedback_rtt;
            }

            //更新到達的值
            packets_[num_].arrival_ts = msg->samples[i].ts;
            delta_ts = packets_[num_].arrival_ts - packets_[num_].send_ts;
            if (new_min_one_way_delay_ == 0 || new_min_one_way_delay_ > delta_ts) {
                new_min_one_way_delay_ = delta_ts;
            }
            num_++;

            //更新時間窗內的最小one way delay的值
            if (now_ts - last_one_way_delay_update_ts_ > srtt_ * 2) {
                last_one_way_delay_update_ts_ = now_ts;
                min_one_way_delay_ = new_min_one_way_delay_;
                new_min_one_way_delay_ = 0;
            }

        }
    }

    frame_num_ = 0;
    for (i = 0; i < msg->frame_samples_num; i++) {
        //根據反饋的SEQ獲取對應的報文發(fā)送信息,計算反饋RTT,更新報文到達時刻
        if (hist_->GetFrame(msg->frame_samples[i].frame_index, &frames_[frame_num_], 1) == 0) {
            //更新到達的值
            frames_[frame_num_].arrival_start_ts = msg->frame_samples[i].arrival_start_ts;
            frames_[frame_num_].arrival_end_ts = msg->frame_samples[i].arrival_end_ts;

            if (max_frame_size_ < frames_[frame_num_].frame_size) {
                max_frame_size_ = frames_[frame_num_].frame_size;
            }
            frame_num_++;
        }
    }

    //更新報文與反饋的rtt最小值
    if (feedback_rtt > 0) {
        min_feedback_rtt_ = rtts_[0];

        for (i = 1; i < FEEDBACK_RTT_WIN_SIZE; i++) {
            if (min_feedback_rtt_ > rtts_[i] && rtts_[i] > 0) {
                min_feedback_rtt_ = rtts_[i];
                LOGD("[bcc][feedback] min feed back rtt update {}", min_feedback_rtt_);
            }
        }
    }

    //進行按到達時間的先后順序進行排序
    FeedbackQsort();
    FrameFeedbackQsort();
    return num_;
}
}  // namespace BCC

帶寬估計和發(fā)送速率控制:

#include "sqp_congestion_control.h"
#include "common/logger.h"

namespace BCC {
        SQPCongestionControl::SQPCongestionControl(uint32_t min_bitrate, uint32_t max_bitrate) {
                min_bitrate_ = min_bitrate;
                max_bitrate_ = max_bitrate;
                bandwidth_ = min_bitrate_;
        }
    SQPCongestionControl::~SQPCongestionControl() {

        }

        void SQPCongestionControl::UpdateBandwidthEstimator(uint32_t frame_size, uint32_t max_frame_size, int64_t frame_send_start_ts,
                int64_t frame_send_end_ts, int64_t frame_recv_start_ts, int64_t frame_recv_end_ts, int32_t one_way_delay) {

                double bandwidth_sample = 0.0f;
                double gama = ((double)max_frame_size) / frame_size;
                if (frame_recv_end_ts == frame_send_start_ts && frame_recv_end_ts == frame_recv_start_ts) {
                        return;
                }
                bandwidth_sample = frame_size * 8 * gama * 1000 / (frame_recv_end_ts - frame_send_start_ts - one_way_delay + (frame_recv_end_ts - frame_recv_start_ts) * (gama - 1));
                bandwidth_sample *= T_;
                if (bandwidth_ <= 0) {
                        bandwidth_ = bandwidth_sample;
                }
                else {
                        bandwidth_ = bandwidth_ + delta_ * (r_ * (bandwidth_sample / bandwidth_ - 1) - (bandwidth_ / bandwidth_sample - 1));
                }

        
        }

        uint32_t SQPCongestionControl::ComputePacingRate() {
                return (uint32_t)(bandwidth_ * m_);
                return 0;
        }

}; 

對比實驗:

  1. 采用panthoen實驗平臺

    1. 安裝panthoen: 以下三個主要組件

    2. Local 和 remote兩種網絡模式

  2. 編譯llama-sqp生成sender和receiver, 拷貝到panthoen/third_party/llama_sqp下

修改配置:

  • 生成src/wrappers/llama_sqp.py:
#!/usr/bin/env python

'''REMOVE ME: Example file to add a new congestion control scheme.

Use Python 2.7 and conform to PEP8.
Use snake_case as file name and make this file executable.
'''

from os import path
from subprocess import check_call

import arg_parser
import context

def main():
    # use 'arg_parser' to ensure a common test interface
    args = arg_parser.receiver_first()  # or 'arg_parser.sender_first()'

    # paths to the sender and receiver executables, etc.
    cc_repo = path.join(context.third_party_dir, 'llama_sqp')
    send_src = path.join(cc_repo, 'transport_sender')
    recv_src = path.join(cc_repo, 'transport_receiver')

    # [optional] dependencies of Debian packages
    if args.option == 'deps':
        print 'example_dep_1 example_dep_2'
        return

    # [optional] persistent setup that only needs to be run once
    if args.option == 'setup':
        # avoid running as root here
        return

    # [optional] non-persistent setup that should be performed on every reboot
    if args.option == 'setup_after_reboot':
        # avoid running as root here
        return

    # [required] run the first side on port 'args.port'
    if args.option == 'receiver':
        cmd = [recv_src, args.port]
        check_call(cmd)
        return

    # [required] run the other side to connect to the first side on 'args.ip'
    if args.option == 'sender':
        cmd = [send_src, args.ip, args.port]
        check_call(cmd)
        return

if __name__ == '__main__':
    main()
  • 配置src/config.yml添加llama_sqp

  • 運行實驗:

src/experiments/setup.py --install-deps --schemes "bbr copa cubic vivace llama_bcc llama_sqp"
src/experiments/setup.py --schemes "bbr copa cubic vivace llama_bcc llama_sqp"
src/experiments/test.py local --schemes "bbr copa cubic vivace llama_bcc llama_sqp"
src/analysis/analyze.py --data-dir src/experiments/data/

實驗報告:

  1. 本地網絡實驗:


    算法對比

    算法對比

Llama-SQP時延最低,但吞吐量只有72.3%,遠沒有利用好帶寬, 待改進。

以下是詳細的實驗數(shù)據:


SQP詳細實驗數(shù)據
  1. 遠程網絡實驗
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

  • TCP通過維護一個擁塞窗口來進行擁塞控制,擁塞控制的原則是,只要網絡中沒有出現(xiàn)擁塞,擁塞窗口的值就可以再增大一些,...
    風亡小窩閱讀 2,062評論 0 1
  • 擁塞控制算法分類 基于丟包(loss rate)的擁塞控制算法例如TCP中早期的擁塞控制算法Reno, 會帶來較高...
    myxu_bin閱讀 3,536評論 1 1
  • 一、TCP Reno擁塞控制算法回顧 二、基于TCP Reno擁塞控制算法的改進 改進原因分析TCP Reno 提...
    風亡小窩閱讀 2,764評論 0 0
  • 網絡傳輸問題本質上是對網絡資源的共享和復用問題,因此擁塞控制是網絡工程領域的核心問題之一,并且隨著互聯(lián)網和數(shù)據中心...
    DeepNoMind閱讀 892評論 0 0
  • TCP擁塞控制算法的目的可以簡單概括為:公平競爭、充分利用網絡帶寬、降低網絡延時、優(yōu)化用戶體驗,然而就目前而言要實...
    VictorHong閱讀 2,682評論 0 0

友情鏈接更多精彩內容