【實戰(zhàn):超實用,10分鐘教你,服務(wù)間數(shù)據(jù)同步】

背景

最近接手了個項目,項目代碼不多,但是問題不少,尤其是項目中涉及了服務(wù)之間的數(shù)據(jù)同步。數(shù)據(jù)不是丟,就是亂 。每天提心吊膽 ,生怕又有數(shù)據(jù)不一致了,需要手動介入處理 ,偶爾周末還要個”意外驚喜”。
數(shù)據(jù)同步不一致 ,真是讓人頭禿的病。下面一起來分析下 ,對癥下藥,

數(shù)據(jù)同步

場景重現(xiàn)

項目內(nèi)有2個服務(wù)需要進(jìn)行數(shù)據(jù)同步。
服務(wù)A是個訂單系統(tǒng), 不斷產(chǎn)生訂單數(shù)據(jù),
服務(wù)B是個計費系統(tǒng),需要拉取到服務(wù)A里的訂單信息,留存后 ,做些計費等后置處理。

這里涉及到 服務(wù)間的數(shù)據(jù)同步。服務(wù)間傳遞的數(shù)據(jù)格式如下:

{
    id: "ID000001",        // message id 
    timestamp: 1733281956, // message timestamp
    data: {
        orderId: "ORD000001", // id
        orderStatus: "PROCESSING", // status
        ...
        updatetime: 1733281950, // order update time
        createtime: 1733281000  // order create time 
    }
}

服務(wù)間數(shù)據(jù)同步

服務(wù)之間的數(shù)據(jù)同步,服務(wù)A 向 服務(wù)B同步數(shù)據(jù)。服務(wù)A中包含兩類數(shù)據(jù) ,

  • 一些正在進(jìn)行的數(shù)據(jù)(綠色部分),這些數(shù)據(jù)后續(xù)可能還會進(jìn)行更新。比如正在進(jìn)行的訂單,用戶后續(xù)可能還會修改。
  • 一些已經(jīng)完結(jié)的數(shù)據(jù)(灰色部分),這些數(shù)據(jù)已經(jīng)完結(jié),不會在更新。

以下是個圖示,

問題匯總

幾類典型問題,

  • 數(shù)據(jù)重復(fù)插入:相同數(shù)據(jù)因為接收到了多次,被重復(fù)插入。
  • 老數(shù)據(jù)覆蓋新數(shù)據(jù):老數(shù)據(jù)由于網(wǎng)絡(luò)延遲或者超時重傳,接收時是亂序的,后到達(dá)接收端,覆蓋了先到達(dá)的新數(shù)據(jù)。
  • 數(shù)據(jù)丟失:有些數(shù)據(jù)可能只發(fā)了一次,中途由于網(wǎng)絡(luò)等原因丟失了。
  • 數(shù)據(jù)更新太慢:有些數(shù)據(jù)的對時效性比較高。
  • 數(shù)據(jù)不完整:接收到了數(shù)據(jù),但是數(shù)據(jù)本身不完整。

考慮因素

針對于上述問題 ,則抽取出以下要考慮的因素,(這里被同步方的存儲方式,使用的是數(shù)據(jù)庫存儲。當(dāng)然如果是其它存儲方式,也可按此因素考慮設(shè)計)

1)數(shù)據(jù)防重
唯一id,更新時應(yīng)該保障id在數(shù)據(jù)庫中的唯一性。

2)數(shù)據(jù)防亂序
在數(shù)據(jù)更新的時候,必須要求拿到的數(shù)據(jù)比數(shù)據(jù)庫里的數(shù)據(jù)新,才會去數(shù)據(jù)庫里更新。

3)數(shù)據(jù)防丟
同步的數(shù)據(jù)不應(yīng)該丟失,或者 丟失后,也有補償機(jī)制。

4)數(shù)據(jù)同步時效性
要考慮業(yè)務(wù)范圍內(nèi)能承受的延時 。

5)數(shù)據(jù)完整性
被同步方的數(shù)據(jù)個數(shù),和數(shù)據(jù)內(nèi)容應(yīng)該是一致的。

實踐參考

由上述可知,做個數(shù)據(jù)同步,要考慮如下因素,
1)數(shù)據(jù)防重
2)數(shù)據(jù)防亂序
3)數(shù)據(jù)防丟
4)數(shù)據(jù)同步時效性
5)數(shù)據(jù)完整性
廢話不多說,上方案,逐點擊破。

解決方案

1)數(shù)據(jù)防重 2)數(shù)據(jù)防亂序
基于一些常用數(shù)據(jù)庫 mongodb / mysql等,1個sql語句, 直接解決這2個問題

# 示例數(shù)據(jù)
id = "ID00001"
data = {
    "timestamp": 1727891234,
    "some_field": "some_value"
}

# 定義過濾器和更新文檔
# data timestamp 代表數(shù)據(jù)的原始時間
filter = {
    "id": id,
    "updatetime": {"$lte": data["timestamp"]}
}
update = {"$set": data}

# 執(zhí)行 FindOneAndUpdate 操作
doc = collection.find_one_and_update(
    filter,
    update,
    upsert=True,
    return_document=ReturnDocument.AFTER
)

find_one_and_update ,這里重點看下參數(shù) upsert=True 以及 "$lte": data["timestamp"] 。

  • upsert :配合主鍵id,沒有則插入,有則更新。 可以保障數(shù)據(jù)的在庫中的唯一性。
  • $lte :進(jìn)行數(shù)據(jù)篩選,更新的數(shù)據(jù)比數(shù)據(jù)庫中已有數(shù)據(jù)時間大,才能更新。 新數(shù)據(jù)才能存儲進(jìn)來,老數(shù)據(jù)不可以存進(jìn)來。

如果id存在但是時間靠后,請求進(jìn)來,也就是老數(shù)據(jù)進(jìn)到這里,是不應(yīng)該覆蓋數(shù)據(jù)庫中已有的數(shù)據(jù)的。同時這里明確下,補充下所有可能出現(xiàn)的情況,以及對應(yīng)現(xiàn)象。

  • 請求的id 在數(shù)據(jù)庫中不存在
    • 同步的數(shù)據(jù) 第一次進(jìn)來,這種數(shù)據(jù)庫中之前也沒有存儲,找不到,直接插入。
  • 請求的id 在數(shù)據(jù)庫中已存在
    • 如果請求插入的數(shù)據(jù)時間是老的,也就是老數(shù)據(jù)來了。$lte去找數(shù)據(jù)庫中小于老數(shù)據(jù)的時間,而數(shù)據(jù)庫中已存的時間比老數(shù)據(jù)的時間一定大, 所以filter中的語句查不出,此時同時因為有upsert=True,則會嘗試插入一條數(shù)據(jù),又由于id在數(shù)據(jù)庫中唯一,則會插入失敗,報錯 duplicated key insert error。

    • 如果請求插入的數(shù)據(jù)時間是新的,也就是新數(shù)據(jù)來了,根據(jù)filter語句 ,可以找到,找到并更新。

聰明的小伙伴看到這里,其實可以發(fā)現(xiàn)上述整體的處理思路,其實就是參考了 樂觀鎖

3)數(shù)據(jù)防丟

數(shù)據(jù)防丟,最終 同步方 和 被同步方 的數(shù)據(jù)要一致。
這里需要考慮,數(shù)據(jù)是否被同步到,如果沒有同步到,還有哪些沒同步過來。
這里可以按如下進(jìn)行考慮,

  • 如果 能確保數(shù)據(jù)一定被同步到。

一般是使用包含有ack機(jī)制的消息隊列服務(wù),比如 Apache Kafka,RabbitMQ。這些消息隊列服務(wù)可以確保,發(fā)送端(同步方)發(fā)送數(shù)據(jù)后, 消費端(被同步方)一定能接收到數(shù)據(jù),從而保障數(shù)據(jù)一定可以被同步到。

  • 如果 不能確保數(shù)據(jù)一定被同步到,則需要使用 補償機(jī)制

有時在某些場景下 ,同步的內(nèi)容不多,引入消息隊列服務(wù),會比較重。所以在不引入這種服務(wù)情況下,無法確保數(shù)據(jù)一定會被同步到,這時可以引入 補償機(jī)制。

補償機(jī)制包括

  • 全量同步:天級別把數(shù)據(jù)全量同步過去,數(shù)據(jù)量少的時候適用。

  • 增量同步: 增量同步 , 舉個例子,下次同步數(shù)據(jù)時,

    • 以上一次數(shù)據(jù)最后更新時間做為起始時間,當(dāng)前時間做為最后時間,拉取這段時間內(nèi)的增量數(shù)據(jù)。
    • 也有些業(yè)務(wù)數(shù)據(jù)包含類似active或者status字段,那么增量的部分則該是active為true 或者 status為in progress的數(shù)據(jù)。

    這里只是舉例,具體怎么做增量補償要以業(yè)務(wù)數(shù)據(jù)本身為準(zhǔn),

4)數(shù)據(jù)同步時效性

最實時:時效性最高,毫秒ms級別,這種一般要求 同步方數(shù)據(jù)變化時,主動推送數(shù)據(jù) 。
次實時:時效性一般,秒s級別,這種 被同步方 可以定時輪詢拉取同步方的數(shù)據(jù)。
不實時:時效性最低,要求也較寬松, 這個定期同步 ,甚至天級別的同步即可。

最實時的需要 同步方主動推送數(shù)據(jù),可以借助消息隊列 發(fā)布-訂閱模型,有新的數(shù)據(jù)產(chǎn)生也往隊列里放一份,需要實時更新的接收端訂閱這個隊列即可。當(dāng)然也可以自己寫推送和接收代碼實現(xiàn)。

次實時和不實時的, 可以考慮常規(guī)做法,定頻拉取同步。拉取頻率根據(jù)業(yè)務(wù)場景的承受范圍定即可。

5)數(shù)據(jù)完整性

完整性可以從數(shù)據(jù)總數(shù)和數(shù)據(jù)內(nèi)容兩方面考慮。

  • 數(shù)據(jù)總數(shù):可能一條帶條件的count SQL語句就搞定了
  • 數(shù)據(jù)內(nèi)容:這個比對起來比較麻煩, 可以對數(shù)據(jù)的關(guān)鍵字段的key和value值提取,排序后 ,進(jìn)一步進(jìn)行hash,然后比對兩邊的hash值。

數(shù)據(jù)完整性,更像是數(shù)據(jù)同步后的驗證,可以按需做即可。

結(jié)語

至此,針對數(shù)據(jù)同步的典型問題,給出對應(yīng)方案,繪制到我們最開始的圖上。


數(shù)據(jù)同步,穩(wěn)了!


?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容