模塊地址: https://github.com/netwarps/libp2p-rs/tree/master/swarm/src/metrics
libp2p-rs 作為一個 p2p 網絡項目,有時候我們可能需要觀察網絡數據的收發(fā)情況,并對其進行收集和匯總?;谶@個前提,設計了一個 metric 模塊去實現相關內容。
metric實現構想
由于 libp2p 支持連接多個 peer,而每個 peer 支持的 protocol 類型也不盡相同。我們不但需要匯總收發(fā)包的數據,同時也需要根據 peer_id 和 protocol,去分類記錄相應的網絡流量情況。很明顯,這是一個 key-value 結構,自然會想到使用 HashMap 去存儲相關數據,但是 HashMap 不是一個線程安全的數據結構,那我們就需要考慮實現一個支持多線程安全并發(fā)的 HashMap。
安全并發(fā)
在設計的初始,首先考慮到的就是使用 Arc 包裹 Mutex 的方式去保證線程安全,但由于目前的使用場景是統(tǒng)計網絡收發(fā)包情況,如果頻繁進行 lock 的操作,會導致性能極其低下。于是我參考了go-libp2p 的相關 metric 實現,Go 的底層是使用了一個 sync.Map 的結構,通過 Atomic+Mutex 保證了多線程并發(fā)安全。因此設計的邏輯就變成了,能否使用 CAS 之類的原子操作,實現一個 lock-free 的 HashMap。
垃圾回收
除了線程安全,還有一種情況也需要考慮。在Java和Go中,變量使用完后,GC會自動幫我們執(zhí)行釋放內存的操作。在 Rust 中,裸指針是指向內存地址的指針,只能通過手動釋放的方式去回收內存;同時,在手動回收的時候,還需要考慮是否有其他線程正在通過裸指針使用某塊內存地址。而 AtomicPtr 的 compare_and_swap() 方法返回的恰好是一個可變的裸指針(即*mut T),這無疑是一個棘手的問題。
crossbeam-epoch
針對上述兩種情況,我們可以使用 Crossbeam-Epoch 來解決遇到的問題。它提供了 Atomic 的相關原子操作和一個延遲刪除的功能。正如其名,epoch 使用世代和延遲隊列的方式,當 local epoch 與 global epoch 相差兩代時,代表可以安全回收隊列中兩代前的內存地址,彌補了前文提到的裸指針釋放操作帶來的漏洞。crossbeam 通過 epoch 這個機制,保證了所有的對象只有在未被引用的情況下才會被刪除,避免了出現野指針的情況。
MetricMap
MetricMap 作為 Metric 的核心,內部實現是一個包裹了crossbeam_epoch::Atomic 的 HashMap。通過 crossbeam_epoch 提供的 pin(), load(),defer_destroy() 等一系列方法,實現了 lock-free 的 HashMap。
MetricMap 的實現與 go-libp2p 中的 DeepCopyMap 相似,都是通過深拷貝的方式實現 map 結構的替換。Clone() 操作在 map 的數據量較大時,對性能的影響較為明顯,后續(xù)考慮優(yōu)化相關結構。
以 store_or_modify() 方法舉例:
- 首先使用 pin() 方法"pin"住當前 thread,防止全局 epoch 升級導致當前線程的 drop() 方法被調用;
- 然后起一個 loop,循環(huán)加載 Atomic 中的 HashMap;
- 對 HashMap 解引用,由于在 rust 中解裸指針的引用是不安全的,因此需要用 unsafe 方法包裹;
- as_ref() 方法返回的是不可變引用,需要通過 clone() 得到一份新的 HashMap。如果 key 值存在,通過向閉包傳值獲取新的返回值,更新 value;否則插入新的 key-value;
- 調用 Owned::new 為新的 HashMap 分配一個在堆上的內存地址,執(zhí)行 CAS 操作;
- 如果 CAS 成功,將舊的 HashMap 地址添加到待清除的列表中,這個列表就是前文提到的延遲刪除的隊列。
/// If map contains key, replaces original value with the result that return by F.
/// Otherwise, create a new key-value and insert.
pub fn store_or_modify<F: Fn(&K, &V) -> V>(&self, key: &K, value: V, on_modify: F) {
let guard = crossbeam_epoch::pin();
loop {
let shared = self.data.load(SeqCst, &guard);
let mut new_hash = HashMap::new();
match unsafe { shared.as_ref() } {
Some(old_hash) => {
new_hash = old_hash.clone();
if let Some(old_value) = new_hash.get(key) {
let new_value = on_modify(key, old_value);
new_hash.insert(key.clone(), new_value.clone());
} else {
new_hash.insert(key.clone(), value.clone());
}
}
None => {
new_hash.insert(key.clone(), value.clone());
}
}
let owned = Owned::new(new_hash);
match self.data.compare_and_set(shared, owned, SeqCst, &guard) {
Ok(_) => {
unsafe {
guard.defer_destroy(shared);
break;
}
// break;
}
Err(_e) => {}
}
}
}
Metric
Metric 的主體實現如下,可以看到與 peer 和 protocol 相關的數據結構都是基于 MetricMap 的??倲祿膫€數和字節(jié)數大小不需要區(qū)分,所以直接使用 std 的 AtomicUize 即可:
pub struct Metric {
/// The accumulative counter of packets sent.
pkt_sent: AtomicUsize,
/// The accumulative counter of packets received.
pkt_recv: AtomicUsize,
/// The accumulative counter of bytes sent.
byte_sent: AtomicUsize,
/// The accumulative counter of bytes received.
byte_recv: AtomicUsize,
/// A hashmap that key is protocol name and value is a counter of bytes received.
protocol_in: MetricMap<ProtocolId, usize>,
/// A hashmap that key is protocol name and value is a counter of bytes sent.
protocol_out: MetricMap<ProtocolId, usize>,
/// A hashmap that key is peer_id and value is a counter of bytes received.
peer_in: MetricMap<PeerId, usize>,
/// A hashmap that key is peer_id and value is a counter of bytes sent.
peer_out: MetricMap<PeerId, usize>,
}
總結
以上是 Metric 相關結構從實現到完工,中間若有理解上的錯誤,還請各位不吝賜教。目前而言,MetricMap 的設計適合于一次新增多次修改的情況。后續(xù)考慮通過起一個 Web Server 的方式,通過 Restful API 的方式暴露相關監(jiān)控數據,方便在外部查看。
Netwarps 由國內資深的云計算和分布式技術開發(fā)團隊組成,該團隊在金融、電力、通信及互聯網行業(yè)有非常豐富的落地經驗。Netwarps 目前在深圳、北京均設立了研發(fā)中心,團隊規(guī)模30+,其中大部分為具備十年以上開發(fā)經驗的技術人員,分別來自互聯網、金融、云計算、區(qū)塊鏈以及科研機構等專業(yè)領域。
Netwarps 專注于安全存儲技術產品的研發(fā)與應用,主要產品有去中心化文件系統(tǒng)(DFS)、去中心化計算平臺(DCP),致力于提供基于去中心化網絡技術實現的分布式存儲和分布式計算平臺,具有高可用、低功耗和低網絡的技術特點,適用于物聯網、工業(yè)互聯網等場景。