rustdesk 源碼解析

rustdesk 深度剖析

背景

  • 由于公司需求遠程桌面功能,和作者做了一些溝通,但作者比較忙也沒怎么回復我的問題,所以以自己個人能力能去了解一下最近比較火的遠程桌面技術 rustdesk 項目

  • 但是整體粗略看下來(個人評價,不代表大眾),實現(xiàn)的代碼比較像 C++,整體給我的感覺比較亂,而且嵌套很嚴重,整體面向?qū)ο蟮睦砟钣行┢睿旧蠜]有任何備注,整體看下來困難重重,本身 rust 特性就非常多,作者也補充了很多,也讓我學習了很多,但是真心希望代碼能多多優(yōu)化一下,確實能看出來,時間很緊,也比較體諒作者,后續(xù)也希望自己能一起維護下社區(qū)代碼

  • 中繼器目前沒有開放出來,開源的 demo 版本很簡單,就是告訴大家實現(xiàn)的主流程,具體性能優(yōu)化需要具體問題具體分析,邏輯的實現(xiàn)其實并不復雜,可以自己來補充,比如針對遠程桌面需要實現(xiàn)的遠程調(diào)用指令以及功能抽象

整體實現(xiàn)邏輯(不對的請?zhí)岢?,大家一起進步)

  • 其實整體看下來,大概后端服務會啟動兩個,一個負責 config 配置的管理,通過 unix 套接字 進行交互獲取配置以及發(fā)送指令

  • 另一個服務的話通過 unix 套接字 socket 進行交互管理配置,服務會初始化,并且會啟動 音頻、視頻、剪切板、輸入等功能服務的啟動操作,并初始化對外的 server 連接并接收來自客戶端的連接請求

  • client 端就是連接上面講到的 server 的代碼邏輯實現(xiàn), ui.rs 是頁面的具體邏輯實現(xiàn)抽象,并調(diào)用 client.rs 的相關功能

rustdesk 啟動源碼分析

main 函數(shù)分析

[features]
inline = []
cli = []
  1. 定義了 cfg features 功能,測功能需要執(zhí)行以下參數(shù)選擇編譯分支:
cargo build --features="inline"
cargo build --features="cli"
  1. 生成的二進制文件
ls -als target/debug/odontoceti
  1. 由于用了 cfg 的 features 所以在編譯時會選擇匹配的編譯分支
  • 如果匹配 系統(tǒng)為 android 和 ios 會編譯,<u>但是很可惜,這個功能并沒有支持</u>
fn main() {
    common::test_rendezvous_server();
    common::test_nat_type();
    #[cfg(target_os = "android")]
        crate::common::check_software_update();
    mobile::Session::start("");
}
  • 如果不匹配 android 和 ios 以及 feature = "cli",這個也是默認以及完全實現(xiàn)了的啟動方式
先獲取參數(shù):

let mut args = Vec::new();
let mut i = 0;
for arg in std::env::args() {
    if i > 0 {
        args.push(arg);
    }
    i += 1;
}
如果第一個參數(shù)等于 --version 輸出 版本信息:

println!("{}", crate::VERSION);
return;

... ... 忽略旁支

如果參數(shù)為空,也是默認的啟動項:

// 啟動線程
if args.is_empty() {
    std::thread::spawn(move || start_server(false, false));
... ...

// 啟動 web 服務
ui::start(&mut args[..]);

server 端啟動流程分析

  1. 啟動服務 start_server,先確認是否有已經(jīng)啟動的服務端
  • 默認無參數(shù)并且沒有服務端變量
// 默認沒有參數(shù)會進入這個分支邏輯
else {
    // 連接 ipc unix 已經(jīng)存在的套接字,并且已經(jīng)有了服務端
    // 默認在 /tmp/rustdesk
    match crate::ipc::connect(1000, "").await {
        返回 result 類型
        Ok(mut conn) => {
            // 先通過 unix 套接字 發(fā)送給服務端一個空的請求嘗試下是否可以正常通信
            allow_err!(conn.send(&Data::SystemInfo(None)).await);
            // 請求配置信息,利用 inner 迭代器獲取數(shù)據(jù),超時1秒
            if let Ok(Some(data)) = conn.next_timeout(1000).await {
                log::info!("server info: {:?}", data);
            }
            // 同步 秘鑰 配置,異步監(jiān)聽 
            // 確認秘鑰是否為最新的
            if Config::get_key_confirmed() {
                ... ...
            } else {
                // 更新秘鑰 pair 數(shù)據(jù)到共享配置
                if let Ok(Some(Data::ConfirmedKey(Some(pair)))) =
                    conn.next_timeout(1000).await
                {
                    Config::set_key_pair(pair);
                    Config::set_key_confirmed(true);
                    log::info!("key pair synced");
                    break;
                }
            }
        }
        // 捕獲 match 錯誤,證明沒有服務端已經(jīng)啟用,所以執(zhí)行線程啟動服務指令
        Err(err) => {
            log::info!("{}", err);
            std::thread::spawn(|| start_server(true, false));
        }
        
// 實例化 ServerPtr 結構 服務端
pub fn new() -> ServerPtr {
    let mut server = Server {
        connections: HashMap::new(),
        services: HashMap::new(),
        id_count: 0,
    };
    server.add_service(Box::new(audio_service::new()));
    server.add_service(Box::new(video_service::new()));
    server.add_service(Box::new(clipboard_service::new()));
    server.add_service(Box::new(input_service::new_cursor()));
    server.add_service(Box::new(input_service::new_pos()));
    Arc::new(RwLock::new(server))
}
  1. 執(zhí)行真正的初次服務端啟動
  • 此時 is_server 參數(shù)已經(jīng)修改為 true
// 定義 cfg 宏,target_os 為 linux 執(zhí)行以下代碼,獲取變量值
#[cfg(target_os = "linux")]
    {
        // 在 Linux/Unix 類操作系統(tǒng)上,
        // DISPLAY 用來設置將圖形顯示到何處,
        // 直接登陸圖形界面或者登陸命令行界面后使用 startx 啟動圖形
        log::info!("DISPLAY={:?}", std::env::var("DISPLAY"));
        // 該文件用于將憑據(jù)存儲在用于 xauthX 會話身份驗證的 cookie 中,
        // XAUTHORITY 路徑 用于驗證與該特定顯示器的連接
        log::info!("XAUTHORITY={:?}", std::env::var("XAUTHORITY"));
    }
    // 如果 is_server 為 true 執(zhí)行這段邏輯
    if is_server {
        // 啟動線程,移動 引用所有權 到 閉包內(nèi)
        std::thread::spawn(move || {
            // 如果啟動時報錯退出,調(diào)用 unix panic 返回 unix 狀態(tài)碼
            if let Err(err) = crate::ipc::start("") {
                log::error!("Failed to start ipc: {}", err);
                std::process::exit(-1);
            }
        });
        // TODO
        crate::RendezvousMediator::start_all().await;
    }

ipc 服務代碼流程分析

  1. ipc.rs start 流程分析
// tokio::main 宏 使用 Tokio 默認的單線程運行時
// current_thread runtime flavor 是一個輕量的、單線程 Runtime
// 在只需要創(chuàng)建少量任務并且處理少量套接字的情況下,他是一個不錯的選擇
// 比如為客戶端的異步函數(shù)提供一個同步接口的橋梁時,他就能工作的很好
#[tokio::main(flavor = "current_thread")]
pub async fn start(postfix: &str) -> ResultType<()> {
    // 新建 套接字 listener 
    let mut incoming = new_listener(postfix).await?;
    loop {
        // accept 函數(shù)主要用于服務器端
        // 初始化 listen 之后,默認會阻塞進程,
        // 直到有一個客戶請求連接,建立好連接后
        // 它返回的一個新的套接字 socketfd_new 
        // 此后,服務器端即可使用這個新的套接字 socketfd_new 與該客戶端進行通信
        // 而 sockfd 則繼續(xù)用于監(jiān)聽其他客戶端的連接請求
        // accept 返回句柄信息(源IP、源端口號、目的IP、目的端口號)
        // incoming.next() 調(diào)用 self.socket.accept().await 返回 Result stream 信息
        if let Some(result) = incoming.next().await {
            match result {
                Ok(stream) => {
                    // 初始化 tokio framed 連接
                    // Tokio 幫助函數(shù)將字節(jié)流轉(zhuǎn)換為幀流
                    // 字節(jié)流的例子包括 TCP 連接,管道,文件對象以及標準輸入和輸出
                    let mut stream = Connection::new(stream);
                    let postfix = postfix.to_owned();
                    // 創(chuàng)建 tokio 線程,遷移所有權到閉包
                    tokio::spawn(async move {
                        loop {
                            // stream 迭代器 Result 類型
                            match stream.next().await {
                                Err(err) => {
                                    log::trace!("ipc{} connection closed: {}", postfix, err);
                                    break;
                                }
                                Ok(Some(data)) => {
                                    // 服務端處理 stream 數(shù)據(jù)邏輯
                                    handle(data, &mut stream).await;
                                }
                                _ => {}
                            }
                        }
                    });
                }
                Err(err) => {
                    log::error!("Couldn't get client: {:?}", err);
                }
            }
        }
    }
}
  1. ipc.rs new_listener 流程分析
// 創(chuàng)建 套接字 
// postfix 直接寫死在上一級,直接就是空的
pub async fn new_listener(postfix: &str) -> ResultType<Incoming> {
    // #[cfg(not(windows))] 如果不是 windows 給予默認的創(chuàng)建 套接字 路徑地址 
    // format!("/tmp/{}", APP_NAME).into();
    // 直接(創(chuàng)建目錄、設置權限、屬主屬組、格式化) 并檢查是否創(chuàng)建成功,否則報錯
    let path = Config::ipc_path(postfix);
    #[cfg(not(windows))]
    // pid 
    // get_pid_file 創(chuàng)建 pid 文件,并且返回 pid 文件路徑 string
    // pid 檢查其正確性
    // 拿到 pid 進程號并創(chuàng)建,以及檢查是否已經(jīng)存在以及創(chuàng)建是否成功
    // 檢查 pid 進程的屬主是否和當前啟動 rustdesk 進程屬主一致
    // pid 進程的屬主如果不一致再次嘗試檢查連接
    // 服務關閉后,或者有問題,刪除 pid 
    check_pid(postfix).await;
    // 復制 unix 套接字的 path 棧引用
    // 初始化 endpoint 結構
    let mut endpoint = Endpoint::new(path.clone());
    // 屬性安全設置
    match SecurityAttributes::allow_everyone_create() {
        Ok(attr) => endpoint.set_security_attributes(attr),
        Err(err) => log::error!("Failed to set ipc{} security: {}", postfix, err),
    };
    // 設置 unix 套接字目錄的權限
    // Result 類型 match 值
    match endpoint.incoming() {
        Ok(incoming) => {
            log::info!("Started ipc{} server at path: {}", postfix, &path);
            #[cfg(not(windows))]
            {
                // 設置目錄權限,并把 pid 進程號寫入 pid 文件
                // 并且把 postfix 名字追加到 pid 文件名上
                use std::os::unix::fs::PermissionsExt;
                std::fs::set_permissions(&path, std::fs::Permissions::from_mode(0o0777)).ok();
                write_pid(postfix);
            }
            Ok(incoming)
        }
        // 錯誤類型的 直接輸出 沒啥東西
        Err(err) => {
            log::error!(
                "Faild to start ipc{} server at path {}: {}",
                postfix,
                path,
                err
            );
            Err(err.into())
        }
    }
}
  1. ipc.rs handle 流程分析
// handle 主要功能是通過 unix 套接字連接過來的客戶端
// 需要請求的 配置,并由 ipc 服務 send 給客戶端所需要的配置
// Data 是所有配置的數(shù)據(jù) rust 枚舉 類型,這個是服務端返回給客戶端的配置數(shù)據(jù)
// stream 是所需要發(fā)送的 connection 的實例化
async fn handle(data: Data, stream: &mut Connection) {
    match data {
        // 如果是需要 Data::SystemInfo(_) 類型的數(shù)據(jù)
        // 那就 send 共享的數(shù)據(jù)給客戶端
        Data::SystemInfo(_) => {
            let info = format!(
                "log_path: {}, config: {}, username: {}",
                Config::log_path().to_str().unwrap_or(""),
                Config::file().to_str().unwrap_or(""),
                crate::username(),
            );
            allow_err!(stream.send(&Data::SystemInfo(Some(info))).await);
        }
        // 如果是 close 指令數(shù)據(jù),直接關閉服務,退出 返回 unix exit 狀態(tài)碼
        Data::Close => {
            log::info!("Receive close message");
            std::process::exit(0);
        }
        // 這里會獲取 中繼器的 地址以及延遲
        // config::ONLINE 這個配置會在 啟動 
        // crate::RendezvousMediator::start_all().await 時
        // 會初始化所有配置數(shù)據(jù),包括 hashmap{"中繼器地址", "延遲時間"}
        // 之后會在這里看到配置信息,更新全局配置并通過 unix 套接字 發(fā)送給 其他客戶端
        Data::OnlineStatus(_) => {
            let x = config::ONLINE
                .lock()
                .unwrap()
                .values()
                .max()
                .unwrap_or(&0)
                .clone();
            let confirmed = Config::get_key_confirmed();
            // 配置秘鑰信息以及中繼器 服務信息 全部發(fā)送給 其他客戶端
            allow_err!(stream.send(&Data::OnlineStatus(Some((x, confirmed)))).await);
        }
        Data::ConfirmedKey(None) => {
            let out = if Config::get_key_confirmed() {
                Some(Config::get_key_pair())
            } else {
                None
            };
            // 自己的秘鑰信息 發(fā)送給 其他客戶端
            allow_err!(stream.send(&Data::ConfirmedKey(out)).await);
        }
        // 捕獲 name、value 數(shù)據(jù), 按照 value Option 類型進行判定
        Data::Config((name, value)) => match value {
            // 如果 value 為 None 
            None => {
                let value;
                if name == "id" {
                    // 獲取 id
                    value = Some(Config::get_id());
                    // 獲取密碼
                } else if name == "password" {
                    value = Some(Config::get_password());
                    // 獲取 salt 約定秘鑰
                } else if name == "salt" {
                    value = Some(Config::get_salt());
                    // 獲取 中繼器服務 地址
                } else if name == "rendezvous_server" {
                    value = Some(Config::get_rendezvous_server().to_string());
                } else {
                    value = None;
                }
                allow_err!(stream.send(&Data::Config((name, value))).await);
            }
            // 如果存在 value 數(shù)據(jù)
            Some(value) => {
                // 判定 key 為 id 插入數(shù)據(jù)到 value
                if name == "id" {
                    Config::set_id(&value);
                // 判定 key 為 password 插入數(shù)據(jù)到 value    
                } else if name == "password" {
                    Config::set_password(&value);
                // 判定 key 為 salt 插入數(shù)據(jù)到 value    
                } else if name == "salt" {
                    Config::set_salt(&value);
                } else {
                    return;
                }
                log::info!("{} updated", name);
            }
        },
        // Option 主要是通過發(fā)送 參數(shù)指令 告訴 服務端
        // 停止服務,或其他指令(目前未看到實現(xiàn))
        Data::Options(value) => match value {
            None => {
                let v = Config::get_options();
                allow_err!(stream.send(&Data::Options(Some(v))).await);
            }
            Some(value) => {
                Config::set_options(value);
            }
        },
        // 網(wǎng)絡連接 類型
        // 目前有三種 
        // 默認采用中繼器網(wǎng)絡類型 SYMMETRIC
        // 一共有兩種類型 ASYMMETRIC、SYMMETRIC
        Data::NatType(_) => {
            let t = Config::get_nat_type();
            allow_err!(stream.send(&Data::NatType(Some(t))).await);
        }
        _ => {}
    }
}

對外啟動聚合引擎服務代碼分析

  1. crate::RendezvousMediator::start_all() 聚合服務流程分析
impl RendezvousMediator {
    pub async fn start_all() {
        let mut nat_tested = false;
        // 檢查當前的進程是否可以正常創(chuàng)建線程
        // 如果正常創(chuàng)建 正常 調(diào)用 drop 休眠 100 毫秒
        check_zombie();
        // 初始化服務端
        // 并增加所需要啟動的子服務
        // 初始化并啟動 audio_service 音頻服務
        // 初始化并啟動 video_service 視頻服務
        // 初始化并啟動 clipboard_service 剪切板服務
        // 初始化并啟動 input_service -> MouseCursorService 鼠標服務
        // 初始化并啟動 input_service -> GenericService 通用服務
        let server = new_server();
        // 當前的網(wǎng)絡模式是否是未知的
        // 如果是未知網(wǎng)絡就進行 test_nat_type 
        // 判斷當前網(wǎng)絡是什么類型的(公共中繼器,還是直接做穿透)這塊的理解不一定準確
        if Config::get_nat_type() == NatType::UNKNOWN_NAT as i32 {
            crate::common::test_nat_type();
            nat_tested = true;
        }
        loop {
            // 重置 ONLINE HashMap 共享的連接信息配置信息
            Config::reset_online();
            // 如果配置選項里的發(fā)送指令 stop-service 為空
            if Config::get_option("stop-service").is_empty() {
                if !nat_tested {
                    crate::common::test_nat_type();
                    nat_tested = true;
                }
                let mut futs = Vec::new();
                // 獲取默認中繼器的配置信息,默認是 rustdesk 的香港一組服務器
                // ["rs-ny.rustdesk.com", "rs-sg.rustdesk.com", "rs-cn.rustdesk.com"]
                let servers = Config::get_rendezvous_servers();
                for host in servers.clone() {
                    let server = server.clone();
                    let servers = servers.clone();
                    // 創(chuàng)建線程
                    futs.push(tokio::spawn(async move {
                        allow_err!(Self::start(server, host, servers).await);
                    }));
                }
                // 等待所有線程停止
                join_all(futs).await;
            }
            //
            sleep(1.).await;
        }
    }

    pub async fn start(
        server: ServerPtr,
        host: String,
        rendezvous_servers: Vec<String>,
    ) -> ResultType<()> {
        log::info!("start rendezvous mediator of {}", host);
        let host_prefix: String = host
            .split(".")
            .next()
            .map(|x| {
                if x.parse::<i32>().is_ok() {
                    host.clone()
                } else {
                    x.to_string()
                }
            })
            .unwrap_or(host.to_owned());
        let mut rz = Self {
            addr: Config::get_any_listen_addr(),
            host: host.clone(),
            host_prefix,
            rendezvous_servers,
            last_id_pk_registery: "".to_owned(),
        };
        allow_err!(rz.dns_check());
        // 通過 socket 連接服務端
        let mut socket = FramedSocket::new(Config::get_any_listen_addr()).await?;
        const TIMER_OUT: Duration = Duration::from_secs(1);
        let mut timer = interval(TIMER_OUT);
        let mut last_timer = SystemTime::UNIX_EPOCH;
        const REG_INTERVAL: i64 = 12_000;
        const REG_TIMEOUT: i64 = 3_000;
        const MAX_FAILS1: i64 = 3;
        const MAX_FAILS2: i64 = 6;
        const DNS_INTERVAL: i64 = 60_000;
        let mut fails = 0;
        // 注冊 unix 套接字 請求
        let mut last_register_resp = SystemTime::UNIX_EPOCH;
        // 注冊 unix 套接字 發(fā)送端
        let mut last_register_sent = SystemTime::UNIX_EPOCH;
        let mut last_dns_check = SystemTime::UNIX_EPOCH;
        let mut old_latency = 0;
        let mut ema_latency = 0;
        loop {
            select! {
                Some(Ok((bytes, _))) = socket.next() => {
                    if let Ok(msg_in) = Message::parse_from_bytes(&bytes) {
                        match msg_in.union {
                            Some(rendezvous_message::Union::register_peer_response(rpr)) => {
                                update_latency();
                                if rpr.request_pk {
                                    log::info!("request_pk received from {}", host);
                                    allow_err!(rz.register_pk(&mut socket).await);
                                    continue;
                                }
                            }
                            Some(rendezvous_message::Union::register_pk_response(rpr)) => {
                                update_latency();
                                match rpr.result.enum_value_or_default() {
                                    register_pk_response::Result::OK => {
                                        Config::set_key_confirmed(true);
                                        Config::set_host_key_confirmed(&rz.host_prefix, true);
                                        *SOLVING_PK_MISMATCH.lock().unwrap() = "".to_owned();
                                    }
                                    register_pk_response::Result::UUID_MISMATCH => {
                                        allow_err!(rz.handle_uuid_mismatch(&mut socket).await);
                                    }
                                }
                            }
                            Some(rendezvous_message::Union::punch_hole(ph)) => {
                                let rz = rz.clone();
                                let server = server.clone();
                                tokio::spawn(async move {
                                    allow_err!(rz.handle_punch_hole(ph, server).await);
                                });
                            }
                            Some(rendezvous_message::Union::request_relay(rr)) => {
                                let rz = rz.clone();
                                let server = server.clone();
                                tokio::spawn(async move {
                                    allow_err!(rz.handle_request_relay(rr, server).await);
                                });
                            }
                            Some(rendezvous_message::Union::fetch_local_addr(fla)) => {
                                let rz = rz.clone();
                                let server = server.clone();
                                tokio::spawn(async move {
                                    allow_err!(rz.handle_intranet(fla, server).await);
                                });
                            }
                            Some(rendezvous_message::Union::configure_update(cu)) => {
                                Config::set_option("rendezvous-servers".to_owned(), cu.rendezvous_servers.join(","));
                                Config::set_serial(cu.serial);
                            }
                            _ => {}
                        }
                    } else {
                        log::debug!("Non-protobuf message bytes received: {:?}", bytes);
                    }
                },
                _ = timer.tick() => {
                    if Config::get_rendezvous_servers() != rz.rendezvous_servers {
                        break;
                    }
                    if !Config::get_option("stop-service").is_empty() {
                        break;
                    }
                    if rz.addr.port() == 0 {
                        allow_err!(rz.dns_check());
                        if rz.addr.port() == 0 {
                            continue;
                        } else {
                            // have to do this for osx, to avoid "Can't assign requested address"
                            // when socket created before OS network ready
                            socket = FramedSocket::new(Config::get_any_listen_addr()).await?;
                        }
                    }
                    let now = SystemTime::now();
                    if now.duration_since(last_timer).map(|d| d < TIMER_OUT).unwrap_or(false) {
                        // a workaround of tokio timer bug
                        continue;
                    }
                    last_timer = now;
                    let elapsed_resp = now.duration_since(last_register_resp).map(|d| d.as_millis() as i64).unwrap_or(REG_INTERVAL);
                    let timeout = last_register_sent.duration_since(last_register_resp).map(|d| d.as_millis() as i64).unwrap_or(0) >= REG_TIMEOUT;
                    if timeout || elapsed_resp >= REG_INTERVAL {
                        allow_err!(rz.register_peer(&mut socket).await);
                        last_register_sent = now;
                        if timeout {
                            fails += 1;
                            if fails > MAX_FAILS2 {
                                Config::update_latency(&host, -1);
                                old_latency = 0;
                                if now.duration_since(last_dns_check).map(|d| d.as_millis() as i64).unwrap_or(0) > DNS_INTERVAL {
                                    if let Ok(_) = rz.dns_check() {
                                        // in some case of network reconnect (dial IP network),
                                        // old UDP socket not work any more after network recover
                                        socket = FramedSocket::new(Config::get_any_listen_addr()).await?;
                                    }
                                    last_dns_check = now;
                                }
                            } else if fails > MAX_FAILS1 {
                                Config::update_latency(&host, 0);
                                old_latency = 0;
                            }
                        }
                    }
                }
            }
        }
        Ok(())
    }
}

待續(xù).......

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容