SQP擁塞控制算法介紹
一種為低延遲強交互視頻流設計的擁塞控制算法, google paper:https://arxiv.org/pdf/2207.11857.pdf
在交互視頻應用場景,需要傳輸高碼率的視頻率,并且保持極低的端到端時延,比如AR流和云游戲,SQP就是為這種場景而設計的擁塞控制算法。SQP采用基于幀的整形數(shù)據包來采樣網絡帶寬,并用自適應的單向采樣時延測量方法從網絡隊列中恢復,從而維持極低的隊列時延。SQP快速適應網絡帶寬變化,確保高帶寬利用率和低幀時延,并且當存在競爭流時也能在保證可接受的時延內擁有合理的帶寬份額。SQP有不錯的公平性,網絡上有影子緩存時也工作得挺好。
低延時交互流媒體應用生成固定幀率的裸幀。視頻碼率由ABR(Adaptive bitrate algorithm 碼率自適應算法)算法決定,而ABR算法又是由CCA(Congestion control algorithm,擁塞控制算法)產生的信號來決定, 從而管理幀時延、網絡擁塞、帶寬利用率。壓縮的幀通過網絡傳輸,然后在客戶端設備上解碼和顯示。
低時延視頻流架構:

SQP內部架構:

我是為低時延交互流媒體應用設計的擁塞控制算法, 比如云游戲,云VR等應用。 這一套擁塞控制算法的目標是: 1. 提供實時的帶寬估計,以確保盡可能大的帶寬利用率和盡可能低的端到端幀時延;2. 與其它基于網絡列隊的擁塞控制算法共存時,也可以提供有競爭力的吞吐量。
SQP算法實現(xiàn)
Feeback: 包級別的網絡反饋 + 幀級別的網絡反饋
#include "sqp_feedback_adaptor.h"
#include <algorithm>
#include <cstdlib>
#include "common/logger.h"
#include "common_time.h"
namespace BCC {
#define HISTORY_CACHE_MS 60000
#define SRTT_FACTOR 0.6
SQPFeedbackAdaptor::SQPFeedbackAdaptor() {
int i;
for (i = 0; i < FEEDBACK_RTT_WIN_SIZE; ++i) {
rtts_[i] = -1;
}
min_feedback_rtt_ = 10;
num_ = 0;
index_ = 0;
hist_ = new SQPSenderHistory(HISTORY_CACHE_MS);
}
SQPFeedbackAdaptor::~SQPFeedbackAdaptor() {
if (hist_) {
delete hist_;
hist_ = nullptr;
}
for (auto const& pair : frame_map_) {
delete pair.second;
}
frame_map_.clear();
}
void SQPFeedbackAdaptor::AddPacket(uint16_t seq, size_t size) {
PacketFeedBackItem packet;
packet.arrival_ts = -1;
packet.create_ts = packet.send_ts = GET_SYS_MS();
packet.payload_size = size;
packet.sequence_number = seq;
hist_->Add(&packet);
}
void SQPFeedbackAdaptor::AddFrame(uint32_t frame_idx, size_t packet_size, bool first_packet, bool last_packet) {
FrameFeedBackItem* frame_ptr;
int64_t now_ts = GET_SYS_MS();
if (frame_map_.find(frame_idx) != frame_map_.end()) {
frame_ptr = frame_map_[frame_idx];
}
else {
frame_ptr = new FrameFeedBackItem();
frame_ptr->frame_index = frame_idx;
frame_map_[frame_idx] = frame_ptr;
}
frame_ptr->frame_size += packet_size;
if (first_packet) {
frame_ptr->send_start_ts = now_ts;
}
if (last_packet) {
frame_ptr->send_end_ts = now_ts;
hist_->AddFrame(frame_ptr);
delete frame_ptr;
frame_map_.erase(frame_idx);
}
}
int SQPFeedbackAdaptor::OnFeedback(BCC_SQP::FeedBackMsgItem* msg) {
int32_t i = 0, feedback_rtt = 0;
int64_t now_ts = GET_SYS_MS();
int64_t delta_ts = 0;
feedback_rtt = -1;
num_ = 0;
for (i = 0; i < msg->samples_num; i++) {
//根據反饋的SEQ獲取對應的報文發(fā)送信息,計算反饋RTT,更新報文到達時刻
if (hist_->Get(msg->samples[i].seq, &packets_[num_], 1) == 0) {
//計算反饋RTT
if (packets_[num_].send_ts > 0) {
feedback_rtt = (std::max)(now_ts - packets_[num_].send_ts, (int64_t)feedback_rtt);
rtts_[index_++ % FEEDBACK_RTT_WIN_SIZE] = feedback_rtt;
srtt_ = SRTT_FACTOR * srtt_ + (1 - SRTT_FACTOR) * feedback_rtt;
}
//更新到達的值
packets_[num_].arrival_ts = msg->samples[i].ts;
delta_ts = packets_[num_].arrival_ts - packets_[num_].send_ts;
if (new_min_one_way_delay_ == 0 || new_min_one_way_delay_ > delta_ts) {
new_min_one_way_delay_ = delta_ts;
}
num_++;
//更新時間窗內的最小one way delay的值
if (now_ts - last_one_way_delay_update_ts_ > srtt_ * 2) {
last_one_way_delay_update_ts_ = now_ts;
min_one_way_delay_ = new_min_one_way_delay_;
new_min_one_way_delay_ = 0;
}
}
}
frame_num_ = 0;
for (i = 0; i < msg->frame_samples_num; i++) {
//根據反饋的SEQ獲取對應的報文發(fā)送信息,計算反饋RTT,更新報文到達時刻
if (hist_->GetFrame(msg->frame_samples[i].frame_index, &frames_[frame_num_], 1) == 0) {
//更新到達的值
frames_[frame_num_].arrival_start_ts = msg->frame_samples[i].arrival_start_ts;
frames_[frame_num_].arrival_end_ts = msg->frame_samples[i].arrival_end_ts;
if (max_frame_size_ < frames_[frame_num_].frame_size) {
max_frame_size_ = frames_[frame_num_].frame_size;
}
frame_num_++;
}
}
//更新報文與反饋的rtt最小值
if (feedback_rtt > 0) {
min_feedback_rtt_ = rtts_[0];
for (i = 1; i < FEEDBACK_RTT_WIN_SIZE; i++) {
if (min_feedback_rtt_ > rtts_[i] && rtts_[i] > 0) {
min_feedback_rtt_ = rtts_[i];
LOGD("[bcc][feedback] min feed back rtt update {}", min_feedback_rtt_);
}
}
}
//進行按到達時間的先后順序進行排序
FeedbackQsort();
FrameFeedbackQsort();
return num_;
}
} // namespace BCC
帶寬估計和發(fā)送速率控制:
#include "sqp_congestion_control.h"
#include "common/logger.h"
namespace BCC {
SQPCongestionControl::SQPCongestionControl(uint32_t min_bitrate, uint32_t max_bitrate) {
min_bitrate_ = min_bitrate;
max_bitrate_ = max_bitrate;
bandwidth_ = min_bitrate_;
}
SQPCongestionControl::~SQPCongestionControl() {
}
void SQPCongestionControl::UpdateBandwidthEstimator(uint32_t frame_size, uint32_t max_frame_size, int64_t frame_send_start_ts,
int64_t frame_send_end_ts, int64_t frame_recv_start_ts, int64_t frame_recv_end_ts, int32_t one_way_delay) {
double bandwidth_sample = 0.0f;
double gama = ((double)max_frame_size) / frame_size;
if (frame_recv_end_ts == frame_send_start_ts && frame_recv_end_ts == frame_recv_start_ts) {
return;
}
bandwidth_sample = frame_size * 8 * gama * 1000 / (frame_recv_end_ts - frame_send_start_ts - one_way_delay + (frame_recv_end_ts - frame_recv_start_ts) * (gama - 1));
bandwidth_sample *= T_;
if (bandwidth_ <= 0) {
bandwidth_ = bandwidth_sample;
}
else {
bandwidth_ = bandwidth_ + delta_ * (r_ * (bandwidth_sample / bandwidth_ - 1) - (bandwidth_ / bandwidth_sample - 1));
}
}
uint32_t SQPCongestionControl::ComputePacingRate() {
return (uint32_t)(bandwidth_ * m_);
return 0;
}
};
對比實驗:
-
采用panthoen實驗平臺
-
安裝panthoen: 以下三個主要組件
Local 和 remote兩種網絡模式
-
編譯llama-sqp生成sender和receiver, 拷貝到panthoen/third_party/llama_sqp下
修改配置:
- 生成src/wrappers/llama_sqp.py:
#!/usr/bin/env python
'''REMOVE ME: Example file to add a new congestion control scheme.
Use Python 2.7 and conform to PEP8.
Use snake_case as file name and make this file executable.
'''
from os import path
from subprocess import check_call
import arg_parser
import context
def main():
# use 'arg_parser' to ensure a common test interface
args = arg_parser.receiver_first() # or 'arg_parser.sender_first()'
# paths to the sender and receiver executables, etc.
cc_repo = path.join(context.third_party_dir, 'llama_sqp')
send_src = path.join(cc_repo, 'transport_sender')
recv_src = path.join(cc_repo, 'transport_receiver')
# [optional] dependencies of Debian packages
if args.option == 'deps':
print 'example_dep_1 example_dep_2'
return
# [optional] persistent setup that only needs to be run once
if args.option == 'setup':
# avoid running as root here
return
# [optional] non-persistent setup that should be performed on every reboot
if args.option == 'setup_after_reboot':
# avoid running as root here
return
# [required] run the first side on port 'args.port'
if args.option == 'receiver':
cmd = [recv_src, args.port]
check_call(cmd)
return
# [required] run the other side to connect to the first side on 'args.ip'
if args.option == 'sender':
cmd = [send_src, args.ip, args.port]
check_call(cmd)
return
if __name__ == '__main__':
main()
配置src/config.yml添加llama_sqp
運行實驗:
src/experiments/setup.py --install-deps --schemes "bbr copa cubic vivace llama_bcc llama_sqp"
src/experiments/setup.py --schemes "bbr copa cubic vivace llama_bcc llama_sqp"
src/experiments/test.py local --schemes "bbr copa cubic vivace llama_bcc llama_sqp"
src/analysis/analyze.py --data-dir src/experiments/data/
實驗報告:
-
本地網絡實驗:
算法對比
算法對比
Llama-SQP時延最低,但吞吐量只有72.3%,遠沒有利用好帶寬, 待改進。
以下是詳細的實驗數(shù)據:

- 遠程網絡實驗

