rustdesk 深度剖析
背景
由于公司需求遠程桌面功能,和作者做了一些溝通,但作者比較忙也沒怎么回復我的問題,所以以自己個人能力能去了解一下最近比較火的遠程桌面技術 rustdesk 項目
但是整體粗略看下來(個人評價,不代表大眾),實現(xiàn)的代碼比較像 C++,整體給我的感覺比較亂,而且嵌套很嚴重,整體面向?qū)ο蟮睦砟钣行┢睿旧蠜]有任何備注,整體看下來困難重重,本身 rust 特性就非常多,作者也補充了很多,也讓我學習了很多,但是真心希望代碼能多多優(yōu)化一下,確實能看出來,時間很緊,也比較體諒作者,后續(xù)也希望自己能一起維護下社區(qū)代碼
中繼器目前沒有開放出來,開源的 demo 版本很簡單,就是告訴大家實現(xiàn)的主流程,具體性能優(yōu)化需要具體問題具體分析,邏輯的實現(xiàn)其實并不復雜,可以自己來補充,比如針對遠程桌面需要實現(xiàn)的遠程調(diào)用指令以及功能抽象
整體實現(xiàn)邏輯(不對的請?zhí)岢?,大家一起進步)
其實整體看下來,大概后端服務會啟動兩個,一個負責 config 配置的管理,通過 unix 套接字 進行交互獲取配置以及發(fā)送指令
另一個服務的話通過 unix 套接字 socket 進行交互管理配置,服務會初始化,并且會啟動 音頻、視頻、剪切板、輸入等功能服務的啟動操作,并初始化對外的 server 連接并接收來自客戶端的連接請求
client 端就是連接上面講到的 server 的代碼邏輯實現(xiàn), ui.rs 是頁面的具體邏輯實現(xiàn)抽象,并調(diào)用 client.rs 的相關功能
rustdesk 啟動源碼分析
main 函數(shù)分析
[features]
inline = []
cli = []
- 定義了 cfg features 功能,測功能需要執(zhí)行以下參數(shù)選擇編譯分支:
cargo build --features="inline"
cargo build --features="cli"
- 生成的二進制文件
ls -als target/debug/odontoceti
- 由于用了 cfg 的 features 所以在編譯時會選擇匹配的編譯分支
- 如果匹配 系統(tǒng)為 android 和 ios 會編譯,<u>但是很可惜,這個功能并沒有支持</u>
fn main() {
common::test_rendezvous_server();
common::test_nat_type();
#[cfg(target_os = "android")]
crate::common::check_software_update();
mobile::Session::start("");
}
- 如果不匹配 android 和 ios 以及 feature = "cli",這個也是默認以及完全實現(xiàn)了的啟動方式
先獲取參數(shù):
let mut args = Vec::new();
let mut i = 0;
for arg in std::env::args() {
if i > 0 {
args.push(arg);
}
i += 1;
}
如果第一個參數(shù)等于 --version 輸出 版本信息:
println!("{}", crate::VERSION);
return;
... ... 忽略旁支
如果參數(shù)為空,也是默認的啟動項:
// 啟動線程
if args.is_empty() {
std::thread::spawn(move || start_server(false, false));
... ...
// 啟動 web 服務
ui::start(&mut args[..]);
server 端啟動流程分析
- 啟動服務 start_server,先確認是否有已經(jīng)啟動的服務端
- 默認無參數(shù)并且沒有服務端變量
// 默認沒有參數(shù)會進入這個分支邏輯
else {
// 連接 ipc unix 已經(jīng)存在的套接字,并且已經(jīng)有了服務端
// 默認在 /tmp/rustdesk
match crate::ipc::connect(1000, "").await {
返回 result 類型
Ok(mut conn) => {
// 先通過 unix 套接字 發(fā)送給服務端一個空的請求嘗試下是否可以正常通信
allow_err!(conn.send(&Data::SystemInfo(None)).await);
// 請求配置信息,利用 inner 迭代器獲取數(shù)據(jù),超時1秒
if let Ok(Some(data)) = conn.next_timeout(1000).await {
log::info!("server info: {:?}", data);
}
// 同步 秘鑰 配置,異步監(jiān)聽
// 確認秘鑰是否為最新的
if Config::get_key_confirmed() {
... ...
} else {
// 更新秘鑰 pair 數(shù)據(jù)到共享配置
if let Ok(Some(Data::ConfirmedKey(Some(pair)))) =
conn.next_timeout(1000).await
{
Config::set_key_pair(pair);
Config::set_key_confirmed(true);
log::info!("key pair synced");
break;
}
}
}
// 捕獲 match 錯誤,證明沒有服務端已經(jīng)啟用,所以執(zhí)行線程啟動服務指令
Err(err) => {
log::info!("{}", err);
std::thread::spawn(|| start_server(true, false));
}
// 實例化 ServerPtr 結構 服務端
pub fn new() -> ServerPtr {
let mut server = Server {
connections: HashMap::new(),
services: HashMap::new(),
id_count: 0,
};
server.add_service(Box::new(audio_service::new()));
server.add_service(Box::new(video_service::new()));
server.add_service(Box::new(clipboard_service::new()));
server.add_service(Box::new(input_service::new_cursor()));
server.add_service(Box::new(input_service::new_pos()));
Arc::new(RwLock::new(server))
}
- 執(zhí)行真正的初次服務端啟動
- 此時 is_server 參數(shù)已經(jīng)修改為 true
// 定義 cfg 宏,target_os 為 linux 執(zhí)行以下代碼,獲取變量值
#[cfg(target_os = "linux")]
{
// 在 Linux/Unix 類操作系統(tǒng)上,
// DISPLAY 用來設置將圖形顯示到何處,
// 直接登陸圖形界面或者登陸命令行界面后使用 startx 啟動圖形
log::info!("DISPLAY={:?}", std::env::var("DISPLAY"));
// 該文件用于將憑據(jù)存儲在用于 xauthX 會話身份驗證的 cookie 中,
// XAUTHORITY 路徑 用于驗證與該特定顯示器的連接
log::info!("XAUTHORITY={:?}", std::env::var("XAUTHORITY"));
}
// 如果 is_server 為 true 執(zhí)行這段邏輯
if is_server {
// 啟動線程,移動 引用所有權 到 閉包內(nèi)
std::thread::spawn(move || {
// 如果啟動時報錯退出,調(diào)用 unix panic 返回 unix 狀態(tài)碼
if let Err(err) = crate::ipc::start("") {
log::error!("Failed to start ipc: {}", err);
std::process::exit(-1);
}
});
// TODO
crate::RendezvousMediator::start_all().await;
}
ipc 服務代碼流程分析
- ipc.rs start 流程分析
// tokio::main 宏 使用 Tokio 默認的單線程運行時
// current_thread runtime flavor 是一個輕量的、單線程 Runtime
// 在只需要創(chuàng)建少量任務并且處理少量套接字的情況下,他是一個不錯的選擇
// 比如為客戶端的異步函數(shù)提供一個同步接口的橋梁時,他就能工作的很好
#[tokio::main(flavor = "current_thread")]
pub async fn start(postfix: &str) -> ResultType<()> {
// 新建 套接字 listener
let mut incoming = new_listener(postfix).await?;
loop {
// accept 函數(shù)主要用于服務器端
// 初始化 listen 之后,默認會阻塞進程,
// 直到有一個客戶請求連接,建立好連接后
// 它返回的一個新的套接字 socketfd_new
// 此后,服務器端即可使用這個新的套接字 socketfd_new 與該客戶端進行通信
// 而 sockfd 則繼續(xù)用于監(jiān)聽其他客戶端的連接請求
// accept 返回句柄信息(源IP、源端口號、目的IP、目的端口號)
// incoming.next() 調(diào)用 self.socket.accept().await 返回 Result stream 信息
if let Some(result) = incoming.next().await {
match result {
Ok(stream) => {
// 初始化 tokio framed 連接
// Tokio 幫助函數(shù)將字節(jié)流轉(zhuǎn)換為幀流
// 字節(jié)流的例子包括 TCP 連接,管道,文件對象以及標準輸入和輸出
let mut stream = Connection::new(stream);
let postfix = postfix.to_owned();
// 創(chuàng)建 tokio 線程,遷移所有權到閉包
tokio::spawn(async move {
loop {
// stream 迭代器 Result 類型
match stream.next().await {
Err(err) => {
log::trace!("ipc{} connection closed: {}", postfix, err);
break;
}
Ok(Some(data)) => {
// 服務端處理 stream 數(shù)據(jù)邏輯
handle(data, &mut stream).await;
}
_ => {}
}
}
});
}
Err(err) => {
log::error!("Couldn't get client: {:?}", err);
}
}
}
}
}
- ipc.rs new_listener 流程分析
// 創(chuàng)建 套接字
// postfix 直接寫死在上一級,直接就是空的
pub async fn new_listener(postfix: &str) -> ResultType<Incoming> {
// #[cfg(not(windows))] 如果不是 windows 給予默認的創(chuàng)建 套接字 路徑地址
// format!("/tmp/{}", APP_NAME).into();
// 直接(創(chuàng)建目錄、設置權限、屬主屬組、格式化) 并檢查是否創(chuàng)建成功,否則報錯
let path = Config::ipc_path(postfix);
#[cfg(not(windows))]
// pid
// get_pid_file 創(chuàng)建 pid 文件,并且返回 pid 文件路徑 string
// pid 檢查其正確性
// 拿到 pid 進程號并創(chuàng)建,以及檢查是否已經(jīng)存在以及創(chuàng)建是否成功
// 檢查 pid 進程的屬主是否和當前啟動 rustdesk 進程屬主一致
// pid 進程的屬主如果不一致再次嘗試檢查連接
// 服務關閉后,或者有問題,刪除 pid
check_pid(postfix).await;
// 復制 unix 套接字的 path 棧引用
// 初始化 endpoint 結構
let mut endpoint = Endpoint::new(path.clone());
// 屬性安全設置
match SecurityAttributes::allow_everyone_create() {
Ok(attr) => endpoint.set_security_attributes(attr),
Err(err) => log::error!("Failed to set ipc{} security: {}", postfix, err),
};
// 設置 unix 套接字目錄的權限
// Result 類型 match 值
match endpoint.incoming() {
Ok(incoming) => {
log::info!("Started ipc{} server at path: {}", postfix, &path);
#[cfg(not(windows))]
{
// 設置目錄權限,并把 pid 進程號寫入 pid 文件
// 并且把 postfix 名字追加到 pid 文件名上
use std::os::unix::fs::PermissionsExt;
std::fs::set_permissions(&path, std::fs::Permissions::from_mode(0o0777)).ok();
write_pid(postfix);
}
Ok(incoming)
}
// 錯誤類型的 直接輸出 沒啥東西
Err(err) => {
log::error!(
"Faild to start ipc{} server at path {}: {}",
postfix,
path,
err
);
Err(err.into())
}
}
}
- ipc.rs handle 流程分析
// handle 主要功能是通過 unix 套接字連接過來的客戶端
// 需要請求的 配置,并由 ipc 服務 send 給客戶端所需要的配置
// Data 是所有配置的數(shù)據(jù) rust 枚舉 類型,這個是服務端返回給客戶端的配置數(shù)據(jù)
// stream 是所需要發(fā)送的 connection 的實例化
async fn handle(data: Data, stream: &mut Connection) {
match data {
// 如果是需要 Data::SystemInfo(_) 類型的數(shù)據(jù)
// 那就 send 共享的數(shù)據(jù)給客戶端
Data::SystemInfo(_) => {
let info = format!(
"log_path: {}, config: {}, username: {}",
Config::log_path().to_str().unwrap_or(""),
Config::file().to_str().unwrap_or(""),
crate::username(),
);
allow_err!(stream.send(&Data::SystemInfo(Some(info))).await);
}
// 如果是 close 指令數(shù)據(jù),直接關閉服務,退出 返回 unix exit 狀態(tài)碼
Data::Close => {
log::info!("Receive close message");
std::process::exit(0);
}
// 這里會獲取 中繼器的 地址以及延遲
// config::ONLINE 這個配置會在 啟動
// crate::RendezvousMediator::start_all().await 時
// 會初始化所有配置數(shù)據(jù),包括 hashmap{"中繼器地址", "延遲時間"}
// 之后會在這里看到配置信息,更新全局配置并通過 unix 套接字 發(fā)送給 其他客戶端
Data::OnlineStatus(_) => {
let x = config::ONLINE
.lock()
.unwrap()
.values()
.max()
.unwrap_or(&0)
.clone();
let confirmed = Config::get_key_confirmed();
// 配置秘鑰信息以及中繼器 服務信息 全部發(fā)送給 其他客戶端
allow_err!(stream.send(&Data::OnlineStatus(Some((x, confirmed)))).await);
}
Data::ConfirmedKey(None) => {
let out = if Config::get_key_confirmed() {
Some(Config::get_key_pair())
} else {
None
};
// 自己的秘鑰信息 發(fā)送給 其他客戶端
allow_err!(stream.send(&Data::ConfirmedKey(out)).await);
}
// 捕獲 name、value 數(shù)據(jù), 按照 value Option 類型進行判定
Data::Config((name, value)) => match value {
// 如果 value 為 None
None => {
let value;
if name == "id" {
// 獲取 id
value = Some(Config::get_id());
// 獲取密碼
} else if name == "password" {
value = Some(Config::get_password());
// 獲取 salt 約定秘鑰
} else if name == "salt" {
value = Some(Config::get_salt());
// 獲取 中繼器服務 地址
} else if name == "rendezvous_server" {
value = Some(Config::get_rendezvous_server().to_string());
} else {
value = None;
}
allow_err!(stream.send(&Data::Config((name, value))).await);
}
// 如果存在 value 數(shù)據(jù)
Some(value) => {
// 判定 key 為 id 插入數(shù)據(jù)到 value
if name == "id" {
Config::set_id(&value);
// 判定 key 為 password 插入數(shù)據(jù)到 value
} else if name == "password" {
Config::set_password(&value);
// 判定 key 為 salt 插入數(shù)據(jù)到 value
} else if name == "salt" {
Config::set_salt(&value);
} else {
return;
}
log::info!("{} updated", name);
}
},
// Option 主要是通過發(fā)送 參數(shù)指令 告訴 服務端
// 停止服務,或其他指令(目前未看到實現(xiàn))
Data::Options(value) => match value {
None => {
let v = Config::get_options();
allow_err!(stream.send(&Data::Options(Some(v))).await);
}
Some(value) => {
Config::set_options(value);
}
},
// 網(wǎng)絡連接 類型
// 目前有三種
// 默認采用中繼器網(wǎng)絡類型 SYMMETRIC
// 一共有兩種類型 ASYMMETRIC、SYMMETRIC
Data::NatType(_) => {
let t = Config::get_nat_type();
allow_err!(stream.send(&Data::NatType(Some(t))).await);
}
_ => {}
}
}
對外啟動聚合引擎服務代碼分析
- crate::RendezvousMediator::start_all() 聚合服務流程分析
impl RendezvousMediator {
pub async fn start_all() {
let mut nat_tested = false;
// 檢查當前的進程是否可以正常創(chuàng)建線程
// 如果正常創(chuàng)建 正常 調(diào)用 drop 休眠 100 毫秒
check_zombie();
// 初始化服務端
// 并增加所需要啟動的子服務
// 初始化并啟動 audio_service 音頻服務
// 初始化并啟動 video_service 視頻服務
// 初始化并啟動 clipboard_service 剪切板服務
// 初始化并啟動 input_service -> MouseCursorService 鼠標服務
// 初始化并啟動 input_service -> GenericService 通用服務
let server = new_server();
// 當前的網(wǎng)絡模式是否是未知的
// 如果是未知網(wǎng)絡就進行 test_nat_type
// 判斷當前網(wǎng)絡是什么類型的(公共中繼器,還是直接做穿透)這塊的理解不一定準確
if Config::get_nat_type() == NatType::UNKNOWN_NAT as i32 {
crate::common::test_nat_type();
nat_tested = true;
}
loop {
// 重置 ONLINE HashMap 共享的連接信息配置信息
Config::reset_online();
// 如果配置選項里的發(fā)送指令 stop-service 為空
if Config::get_option("stop-service").is_empty() {
if !nat_tested {
crate::common::test_nat_type();
nat_tested = true;
}
let mut futs = Vec::new();
// 獲取默認中繼器的配置信息,默認是 rustdesk 的香港一組服務器
// ["rs-ny.rustdesk.com", "rs-sg.rustdesk.com", "rs-cn.rustdesk.com"]
let servers = Config::get_rendezvous_servers();
for host in servers.clone() {
let server = server.clone();
let servers = servers.clone();
// 創(chuàng)建線程
futs.push(tokio::spawn(async move {
allow_err!(Self::start(server, host, servers).await);
}));
}
// 等待所有線程停止
join_all(futs).await;
}
//
sleep(1.).await;
}
}
pub async fn start(
server: ServerPtr,
host: String,
rendezvous_servers: Vec<String>,
) -> ResultType<()> {
log::info!("start rendezvous mediator of {}", host);
let host_prefix: String = host
.split(".")
.next()
.map(|x| {
if x.parse::<i32>().is_ok() {
host.clone()
} else {
x.to_string()
}
})
.unwrap_or(host.to_owned());
let mut rz = Self {
addr: Config::get_any_listen_addr(),
host: host.clone(),
host_prefix,
rendezvous_servers,
last_id_pk_registery: "".to_owned(),
};
allow_err!(rz.dns_check());
// 通過 socket 連接服務端
let mut socket = FramedSocket::new(Config::get_any_listen_addr()).await?;
const TIMER_OUT: Duration = Duration::from_secs(1);
let mut timer = interval(TIMER_OUT);
let mut last_timer = SystemTime::UNIX_EPOCH;
const REG_INTERVAL: i64 = 12_000;
const REG_TIMEOUT: i64 = 3_000;
const MAX_FAILS1: i64 = 3;
const MAX_FAILS2: i64 = 6;
const DNS_INTERVAL: i64 = 60_000;
let mut fails = 0;
// 注冊 unix 套接字 請求
let mut last_register_resp = SystemTime::UNIX_EPOCH;
// 注冊 unix 套接字 發(fā)送端
let mut last_register_sent = SystemTime::UNIX_EPOCH;
let mut last_dns_check = SystemTime::UNIX_EPOCH;
let mut old_latency = 0;
let mut ema_latency = 0;
loop {
select! {
Some(Ok((bytes, _))) = socket.next() => {
if let Ok(msg_in) = Message::parse_from_bytes(&bytes) {
match msg_in.union {
Some(rendezvous_message::Union::register_peer_response(rpr)) => {
update_latency();
if rpr.request_pk {
log::info!("request_pk received from {}", host);
allow_err!(rz.register_pk(&mut socket).await);
continue;
}
}
Some(rendezvous_message::Union::register_pk_response(rpr)) => {
update_latency();
match rpr.result.enum_value_or_default() {
register_pk_response::Result::OK => {
Config::set_key_confirmed(true);
Config::set_host_key_confirmed(&rz.host_prefix, true);
*SOLVING_PK_MISMATCH.lock().unwrap() = "".to_owned();
}
register_pk_response::Result::UUID_MISMATCH => {
allow_err!(rz.handle_uuid_mismatch(&mut socket).await);
}
}
}
Some(rendezvous_message::Union::punch_hole(ph)) => {
let rz = rz.clone();
let server = server.clone();
tokio::spawn(async move {
allow_err!(rz.handle_punch_hole(ph, server).await);
});
}
Some(rendezvous_message::Union::request_relay(rr)) => {
let rz = rz.clone();
let server = server.clone();
tokio::spawn(async move {
allow_err!(rz.handle_request_relay(rr, server).await);
});
}
Some(rendezvous_message::Union::fetch_local_addr(fla)) => {
let rz = rz.clone();
let server = server.clone();
tokio::spawn(async move {
allow_err!(rz.handle_intranet(fla, server).await);
});
}
Some(rendezvous_message::Union::configure_update(cu)) => {
Config::set_option("rendezvous-servers".to_owned(), cu.rendezvous_servers.join(","));
Config::set_serial(cu.serial);
}
_ => {}
}
} else {
log::debug!("Non-protobuf message bytes received: {:?}", bytes);
}
},
_ = timer.tick() => {
if Config::get_rendezvous_servers() != rz.rendezvous_servers {
break;
}
if !Config::get_option("stop-service").is_empty() {
break;
}
if rz.addr.port() == 0 {
allow_err!(rz.dns_check());
if rz.addr.port() == 0 {
continue;
} else {
// have to do this for osx, to avoid "Can't assign requested address"
// when socket created before OS network ready
socket = FramedSocket::new(Config::get_any_listen_addr()).await?;
}
}
let now = SystemTime::now();
if now.duration_since(last_timer).map(|d| d < TIMER_OUT).unwrap_or(false) {
// a workaround of tokio timer bug
continue;
}
last_timer = now;
let elapsed_resp = now.duration_since(last_register_resp).map(|d| d.as_millis() as i64).unwrap_or(REG_INTERVAL);
let timeout = last_register_sent.duration_since(last_register_resp).map(|d| d.as_millis() as i64).unwrap_or(0) >= REG_TIMEOUT;
if timeout || elapsed_resp >= REG_INTERVAL {
allow_err!(rz.register_peer(&mut socket).await);
last_register_sent = now;
if timeout {
fails += 1;
if fails > MAX_FAILS2 {
Config::update_latency(&host, -1);
old_latency = 0;
if now.duration_since(last_dns_check).map(|d| d.as_millis() as i64).unwrap_or(0) > DNS_INTERVAL {
if let Ok(_) = rz.dns_check() {
// in some case of network reconnect (dial IP network),
// old UDP socket not work any more after network recover
socket = FramedSocket::new(Config::get_any_listen_addr()).await?;
}
last_dns_check = now;
}
} else if fails > MAX_FAILS1 {
Config::update_latency(&host, 0);
old_latency = 0;
}
}
}
}
}
}
Ok(())
}
}
待續(xù).......