背景
最近接手了個項目,項目代碼不多,但是問題不少,尤其是項目中涉及了服務(wù)之間的數(shù)據(jù)同步。數(shù)據(jù)不是丟,就是亂 。每天提心吊膽 ,生怕又有數(shù)據(jù)不一致了,需要手動介入處理 ,偶爾周末還要個”意外驚喜”。
數(shù)據(jù)同步不一致 ,真是讓人頭禿的病。下面一起來分析下 ,對癥下藥,

數(shù)據(jù)同步
場景重現(xiàn)
項目內(nèi)有2個服務(wù)需要進(jìn)行數(shù)據(jù)同步。
服務(wù)A是個訂單系統(tǒng), 不斷產(chǎn)生訂單數(shù)據(jù),
服務(wù)B是個計費系統(tǒng),需要拉取到服務(wù)A里的訂單信息,留存后 ,做些計費等后置處理。
這里涉及到 服務(wù)間的數(shù)據(jù)同步。服務(wù)間傳遞的數(shù)據(jù)格式如下:
{
id: "ID000001", // message id
timestamp: 1733281956, // message timestamp
data: {
orderId: "ORD000001", // id
orderStatus: "PROCESSING", // status
...
updatetime: 1733281950, // order update time
createtime: 1733281000 // order create time
}
}
服務(wù)間數(shù)據(jù)同步
服務(wù)之間的數(shù)據(jù)同步,服務(wù)A 向 服務(wù)B同步數(shù)據(jù)。服務(wù)A中包含兩類數(shù)據(jù) ,
- 一些正在進(jìn)行的數(shù)據(jù)(綠色部分),這些數(shù)據(jù)后續(xù)可能還會進(jìn)行更新。比如正在進(jìn)行的訂單,用戶后續(xù)可能還會修改。
- 一些已經(jīng)完結(jié)的數(shù)據(jù)(灰色部分),這些數(shù)據(jù)已經(jīng)完結(jié),不會在更新。
以下是個圖示,

問題匯總
幾類典型問題,
- 數(shù)據(jù)重復(fù)插入:相同數(shù)據(jù)因為接收到了多次,被重復(fù)插入。
- 老數(shù)據(jù)覆蓋新數(shù)據(jù):老數(shù)據(jù)由于網(wǎng)絡(luò)延遲或者超時重傳,接收時是亂序的,后到達(dá)接收端,覆蓋了先到達(dá)的新數(shù)據(jù)。
- 數(shù)據(jù)丟失:有些數(shù)據(jù)可能只發(fā)了一次,中途由于網(wǎng)絡(luò)等原因丟失了。
- 數(shù)據(jù)更新太慢:有些數(shù)據(jù)的對時效性比較高。
- 數(shù)據(jù)不完整:接收到了數(shù)據(jù),但是數(shù)據(jù)本身不完整。
考慮因素
針對于上述問題 ,則抽取出以下要考慮的因素,(這里被同步方的存儲方式,使用的是數(shù)據(jù)庫存儲。當(dāng)然如果是其它存儲方式,也可按此因素考慮設(shè)計)
1)數(shù)據(jù)防重
唯一id,更新時應(yīng)該保障id在數(shù)據(jù)庫中的唯一性。
2)數(shù)據(jù)防亂序
在數(shù)據(jù)更新的時候,必須要求拿到的數(shù)據(jù)比數(shù)據(jù)庫里的數(shù)據(jù)新,才會去數(shù)據(jù)庫里更新。
3)數(shù)據(jù)防丟
同步的數(shù)據(jù)不應(yīng)該丟失,或者 丟失后,也有補償機(jī)制。
4)數(shù)據(jù)同步時效性
要考慮業(yè)務(wù)范圍內(nèi)能承受的延時 。
5)數(shù)據(jù)完整性
被同步方的數(shù)據(jù)個數(shù),和數(shù)據(jù)內(nèi)容應(yīng)該是一致的。
實踐參考
由上述可知,做個數(shù)據(jù)同步,要考慮如下因素,
1)數(shù)據(jù)防重
2)數(shù)據(jù)防亂序
3)數(shù)據(jù)防丟
4)數(shù)據(jù)同步時效性
5)數(shù)據(jù)完整性
廢話不多說,上方案,逐點擊破。
解決方案
1)數(shù)據(jù)防重 2)數(shù)據(jù)防亂序
基于一些常用數(shù)據(jù)庫 mongodb / mysql等,1個sql語句, 直接解決這2個問題
# 示例數(shù)據(jù)
id = "ID00001"
data = {
"timestamp": 1727891234,
"some_field": "some_value"
}
# 定義過濾器和更新文檔
# data timestamp 代表數(shù)據(jù)的原始時間
filter = {
"id": id,
"updatetime": {"$lte": data["timestamp"]}
}
update = {"$set": data}
# 執(zhí)行 FindOneAndUpdate 操作
doc = collection.find_one_and_update(
filter,
update,
upsert=True,
return_document=ReturnDocument.AFTER
)
find_one_and_update ,這里重點看下參數(shù) upsert=True 以及 "$lte": data["timestamp"] 。
- upsert :配合主鍵id,沒有則插入,有則更新。 可以保障數(shù)據(jù)的在庫中的唯一性。
- $lte :進(jìn)行數(shù)據(jù)篩選,更新的數(shù)據(jù)比數(shù)據(jù)庫中已有數(shù)據(jù)時間大,才能更新。 新數(shù)據(jù)才能存儲進(jìn)來,老數(shù)據(jù)不可以存進(jìn)來。
如果id存在但是時間靠后,請求進(jìn)來,也就是老數(shù)據(jù)進(jìn)到這里,是不應(yīng)該覆蓋數(shù)據(jù)庫中已有的數(shù)據(jù)的。同時這里明確下,補充下所有可能出現(xiàn)的情況,以及對應(yīng)現(xiàn)象。
- 請求的id 在數(shù)據(jù)庫中不存在
- 同步的數(shù)據(jù) 第一次進(jìn)來,這種數(shù)據(jù)庫中之前也沒有存儲,找不到,直接插入。
- 請求的id 在數(shù)據(jù)庫中已存在
如果請求插入的數(shù)據(jù)時間是老的,也就是老數(shù)據(jù)來了。$lte去找數(shù)據(jù)庫中小于老數(shù)據(jù)的時間,而數(shù)據(jù)庫中已存的時間比老數(shù)據(jù)的時間一定大, 所以filter中的語句查不出,此時同時因為有upsert=True,則會嘗試插入一條數(shù)據(jù),又由于id在數(shù)據(jù)庫中唯一,則會插入失敗,報錯 duplicated key insert error。
如果請求插入的數(shù)據(jù)時間是新的,也就是新數(shù)據(jù)來了,根據(jù)filter語句 ,可以找到,找到并更新。
聰明的小伙伴看到這里,其實可以發(fā)現(xiàn)上述整體的處理思路,其實就是參考了 樂觀鎖。
3)數(shù)據(jù)防丟
數(shù)據(jù)防丟,最終 同步方 和 被同步方 的數(shù)據(jù)要一致。
這里需要考慮,數(shù)據(jù)是否被同步到,如果沒有同步到,還有哪些沒同步過來。
這里可以按如下進(jìn)行考慮,
- 如果 能確保數(shù)據(jù)一定被同步到。
一般是使用包含有ack機(jī)制的消息隊列服務(wù),比如 Apache Kafka,RabbitMQ。這些消息隊列服務(wù)可以確保,發(fā)送端(同步方)發(fā)送數(shù)據(jù)后, 消費端(被同步方)一定能接收到數(shù)據(jù),從而保障數(shù)據(jù)一定可以被同步到。
- 如果 不能確保數(shù)據(jù)一定被同步到,則需要使用 補償機(jī)制 。
有時在某些場景下 ,同步的內(nèi)容不多,引入消息隊列服務(wù),會比較重。所以在不引入這種服務(wù)情況下,無法確保數(shù)據(jù)一定會被同步到,這時可以引入 補償機(jī)制。
補償機(jī)制包括
全量同步:天級別把數(shù)據(jù)全量同步過去,數(shù)據(jù)量少的時候適用。
-
增量同步: 增量同步 , 舉個例子,下次同步數(shù)據(jù)時,
- 以上一次數(shù)據(jù)最后更新時間做為起始時間,當(dāng)前時間做為最后時間,拉取這段時間內(nèi)的增量數(shù)據(jù)。
- 也有些業(yè)務(wù)數(shù)據(jù)包含類似active或者status字段,那么增量的部分則該是active為true 或者 status為in progress的數(shù)據(jù)。
這里只是舉例,具體怎么做增量補償要以業(yè)務(wù)數(shù)據(jù)本身為準(zhǔn),
4)數(shù)據(jù)同步時效性
最實時:時效性最高,毫秒ms級別,這種一般要求 同步方數(shù)據(jù)變化時,主動推送數(shù)據(jù) 。
次實時:時效性一般,秒s級別,這種 被同步方 可以定時輪詢拉取同步方的數(shù)據(jù)。
不實時:時效性最低,要求也較寬松, 這個定期同步 ,甚至天級別的同步即可。
最實時的需要 同步方主動推送數(shù)據(jù),可以借助消息隊列 發(fā)布-訂閱模型,有新的數(shù)據(jù)產(chǎn)生也往隊列里放一份,需要實時更新的接收端訂閱這個隊列即可。當(dāng)然也可以自己寫推送和接收代碼實現(xiàn)。
次實時和不實時的, 可以考慮常規(guī)做法,定頻拉取同步。拉取頻率根據(jù)業(yè)務(wù)場景的承受范圍定即可。
5)數(shù)據(jù)完整性
完整性可以從數(shù)據(jù)總數(shù)和數(shù)據(jù)內(nèi)容兩方面考慮。
- 數(shù)據(jù)總數(shù):可能一條帶條件的count SQL語句就搞定了
- 數(shù)據(jù)內(nèi)容:這個比對起來比較麻煩, 可以對數(shù)據(jù)的關(guān)鍵字段的key和value值提取,排序后 ,進(jìn)一步進(jìn)行hash,然后比對兩邊的hash值。
數(shù)據(jù)完整性,更像是數(shù)據(jù)同步后的驗證,可以按需做即可。
結(jié)語
至此,針對數(shù)據(jù)同步的典型問題,給出對應(yīng)方案,繪制到我們最開始的圖上。

數(shù)據(jù)同步,穩(wěn)了!
