淺談 Gossipsub

情況概述

目前l(fā)ibp2p-rs團隊在開發(fā)分支上初步完成了Gossip協(xié)議的相關工作,將會盡快發(fā)布。以下是Gossip協(xié)議的相關簡單介紹:

從Pubsub說起

Pubsub的意思是publish/subscribe,即發(fā)布/訂閱,是消息傳播一種機制。任意一個節(jié)點都可以關注感興趣的主題,并發(fā)布主題相關的消息,此消息將會被發(fā)送到任何訂閱了該主題的節(jié)點上。Pubsub共有三種實現:Floodsub,Gossipsub,Randomsub。本文將主要分析floodsub和gossipsub。

Floodsub

Floodsub顧名思義,是一種效果類似于“泛洪”的發(fā)布/訂閱機制。在libp2p-rs中,每一個節(jié)點通過floodsub收到相關的消息后,首先在本地進行相關的處理,然后通過再發(fā)布的形式擴散到周圍的節(jié)點。

Floodsub的優(yōu)點在于可維護性較強,且在消息的傳播路徑上能最小化網絡延遲所帶來的問題。但是它仍然有不足的地方,當某個節(jié)點擁有大量的連接節(jié)點時,轉發(fā)消息可能將會帶來極大的帶寬問題。

Gossipsub

Gossipsub是在floodsub的基礎上設計開發(fā)的一個協(xié)議。通過施加一些限制,以及設計一些相關的數據結構,解決了floodsub帶來的問題。核心是維護mesh和fanout。

mesh&fanout

mesh是一個針對主題形成小范圍的節(jié)點網格,結構如下:

    /// Overlay network of connected peers - Maps topics to connected gossipsub peers.
    mesh: HashMap<TopicHash, BTreeSet<PeerId>>,

這是一個以主題哈希為key,B樹集合為value的哈希表。針對每一個主題,節(jié)點都會生成一棵B樹集合,需要注意的是這棵B樹有節(jié)點個數的最大最小值限制。每收到一個相關主題的消息,節(jié)點都會消息轉發(fā)到對應的B樹集合節(jié)點上,由于節(jié)點是有限的,此時將大幅度減少帶寬的使用。

fanout則是一種比較特殊的網格,結構與mesh相同:

    /// Map of topics to list of peers that we publish to, but don't subscribe to.
    fanout: HashMap<TopicHash, BTreeSet<PeerId>>,

fanout記錄的是發(fā)布了消息但是沒有進行訂閱操作的主題與節(jié)點關系。即代表一個節(jié)點可以不訂閱某個主題而直接發(fā)送與該主題相關的消息。fanout的構建只與publish有關。

fanout和topic_peers是構建mesh的關鍵。topic_peers的結構如下:

    /// A map of all connected peers - A map of topic hash to a list of gossipsub peer Ids.
    topic_peers: HashMap<TopicHash, BTreeSet<PeerId>>,

topic_peers用來記錄主題與已連接的節(jié)點ID對應關系。
構建mesh時,首先從fanout中查找數據,通過某些特定的條件篩選出節(jié)點添加到mesh;如果mesh當前節(jié)點數小于最小值,再通過topic_peers找出其他已知節(jié)點,完成mesh的構建。同時還將構建類型為GRAFT的消息并向外傳播。

Control Message

Control Message是用來維護gossip的消息。共有四種類型:GRAFT,PRUNE,IHAVE,IWANT。

GRAFT,意為嫁接、移植。在gossip中,如果一個節(jié)點A訂閱了某個主題,那么會向周圍節(jié)點發(fā)布GRAFT消息,通知它們把A加入到mesh中。接收到消息后,周圍節(jié)點會通過一系列的條件判斷來決定是否A節(jié)點添加到mesh。

PRUNE與GRAFT正好相反,是從mesh中移除節(jié)點。PRUNE的觸發(fā)場景一般是在取消訂閱時,需要告知其他節(jié)點執(zhí)行mesh移除的操作。

IHAVE是當前節(jié)點向外廣播,告訴其他節(jié)點“我”的mesh有哪些主題以及這個mesh下所有已知的消息。如果有節(jié)點對某個主題感興趣,就需要回復一個IWANT消息,雙方才能進行信息的交換。

IWANT代表需要一個或者多個主題對應的消息。節(jié)點接收到IWANT消息后,會從本地消息緩存中查找對應的消息ID并返回。需要注意的是,為了防止惡意的多次請求,消息查找有一定的次數限制,這與GossipConfig的設置有關。

Message Cache

針對接收到的消息,Gossip提供全局的Message Cache進行緩存操作:

/// CacheEntry stored in the history.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CacheEntry {
    mid: MessageId,
    topic: TopicHash,
}

/// MessageCache struct holding history of messages.
#[derive(Clone)]
pub struct MessageCache {
    msgs: HashMap<MessageId, RawGossipsubMessage>,
    /// For every message and peer the number of times this peer asked for the message
    iwant_counts: HashMap<MessageId, HashMap<PeerId, u32>>,
    history: Vec<Vec<CacheEntry>>,
    /// The number of indices in the cache history used for gossipping. That means that a message
    /// won't get gossipped anymore when shift got called `gossip` many times after inserting the
    /// message in the cache.
    gossip: usize,
}

msgs記錄的是消息ID與具體的消息內容;iwant_counts是上面提到的對消息查找的訪問次數限制;history記錄了歷史的緩存信息;gossip代表最大可訪問到的歷史消息條數。

需要注意的是,Message Cache的緩存會在heartbeat執(zhí)行的時候進行清理。

    /// Shift the history array down one and delete messages associated with the
    /// last entry.
    pub fn shift(&mut self) {
        for entry in self.history.pop().expect("history is always > 1") {
            if let Some(msg) = self.msgs.remove(&entry.mid) {
                if !msg.validated {
                    // If GossipsubConfig::validate_messages is true, the implementing
                    // application has to ensure that Gossipsub::validate_message gets called for
                    // each received message within the cache timeout time."
                    debug!(
                        "The message with id {} got removed from the cache without being validated.",
                        &entry.mid
                    );
                }
            }
            debug!("Remove message from the cache: {}", &entry.mid);

            self.iwant_counts.remove(&entry.mid);
        }

        // Insert an empty vec in position 0
        self.history.insert(0, Vec::new());
    }

首先將最早的歷史消息數組從history中取出,其中可能包含了一條或多條消息,再從msgs中消除對應的Gossip Message,并重置消息的訪問次數。最后再往history中新增一條空記錄,確保下一次執(zhí)行put操作時不會出現錯誤情況。

Heartbeat

Heartbeat維護了一個定期執(zhí)行的心跳連接過程,在固定的頻率下向外發(fā)送消息。功能上實現了幾個效果:維持mesh和fanout的穩(wěn)定,以及向外的廣播。

維護mesh

mesh維護比較好理解,因為mesh本身有上下限的限制。mesh少了,從topic_peers中補充;多了就根據評分系統(tǒng)執(zhí)行相關的裁剪功能。此外,對于mesh中的節(jié)點還需要更新評分,以確保系統(tǒng)更好的運行。

維護fanout

前文有提到,fanout的作用是記錄沒有訂閱但是發(fā)布了消息的主題。對于每一個發(fā)布了消息的主題,有一個數據結構fanout_last_pub記錄了發(fā)布的時間:

    /// The last publish time for fanout topics.
    fanout_last_pub: HashMap<TopicHash, Instant>,

如果超過了Gossip預設的過期時間,這條主題就會從fanout中刪除。

fanout的維護還與peer_topics有關。peer_topics是一個關聯節(jié)點與主題的數據結構:

    /// A map of all connected peers to their subscribed topics.
    peer_topics: HashMap<PeerId, BTreeSet<TopicHash>>,

如果某個節(jié)點存在于fanout的某個主題中,但是在peer_topics中發(fā)現該節(jié)點已經不再關注此主題,就會被標記為需要移除。

fanout本身有節(jié)點的下限數限制,與mesh一樣,都是從topic_peers中找出節(jié)點進行補充。

向外廣播

這個機制是為了讓不在mesh和fanout中,卻又訂閱了相關主題的節(jié)點,也有機會接收到主題相關的消息。并不是每個節(jié)點都一定會收到消息,隨機選取節(jié)點的算法受到Config中的配置和關注主題的節(jié)點數兩者共同影響。

首先將mesh和fanout合并成一個迭代器,從Message Cache中取出主題對應的消息ID,向topic_peers中隨機選出的節(jié)點發(fā)送IHAVE消息。

總結

Gossip協(xié)議的東西比較多,上述內容只是其中的一小部分介紹,更詳細的代碼可參閱libp2p-rs后續(xù)的版本發(fā)布,感謝各位閱讀。


Netwarps 由國內資深的云計算和分布式技術開發(fā)團隊組成,該團隊在金融、電力、通信及互聯網行業(yè)有非常豐富的落地經驗。Netwarps 目前在深圳、北京均設立了研發(fā)中心,團隊規(guī)模30+,其中大部分為具備十年以上開發(fā)經驗的技術人員,分別來自互聯網、金融、云計算、區(qū)塊鏈以及科研機構等專業(yè)領域。

Netwarps 專注于安全存儲技術產品的研發(fā)與應用,主要產品有去中心化文件系統(tǒng)(DFS)、去中心化計算平臺(DCP),致力于提供基于去中心化網絡技術實現的分布式存儲和分布式計算平臺,具有高可用、低功耗和低網絡的技術特點,適用于物聯網、工業(yè)互聯網等場景。

?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容