摘要:在上一篇文章中,我們?cè)O(shè)計(jì)了一個(gè)基于 Actor 模式的“寫(xiě)緩沖(Write-Behind)”防抖系統(tǒng),看似美好,但是還是有消息亂序與數(shù)據(jù)丟失的隱患。本文將詳細(xì)記錄 V2 版本的重構(gòu)思路:通過(guò)引入 阻塞背壓 (Blocking Backpressure)、延遲確認(rèn) (Deferred ACK) 和 事件循環(huán) (Event Loop),構(gòu)建一個(gè)更加健壯、嚴(yán)謹(jǐn)?shù)姆蓝断到y(tǒng)。
1. 背景與挑戰(zhàn):從“跑通”到“跑穩(wěn)”
在構(gòu)建即時(shí)通訊系統(tǒng)的會(huì)話(huà)列表時(shí),核心矛盾在于海量消息吞吐與數(shù)據(jù)庫(kù)有限寫(xiě)入能力之間的沖突。業(yè)務(wù)要求每條消息都能更新會(huì)話(huà)的 last_active_time 和 digest(摘要),但直接的數(shù)據(jù)庫(kù) UPDATE 操作會(huì)導(dǎo)致嚴(yán)重的寫(xiě)放大(Write Amplification)。
上期的 V1 采用了“流水線(xiàn) + Actor”模式:利用內(nèi)存隊(duì)列暫存消息,通過(guò) Map 進(jìn)行去重合并,最后批量落庫(kù)。這種方案有三個(gè)隱患:
-
消息亂序風(fēng)險(xiǎn):利用
NAK(拒絕消息)進(jìn)行限流,會(huì)導(dǎo)致 NATS 將消息重新投遞。在重試過(guò)程中,舊消息可能排在新消息之后,破壞 FIFO 順序,導(dǎo)致會(huì)話(huà)狀態(tài)“時(shí)光倒流”。 -
數(shù)據(jù)丟失風(fēng)險(xiǎn):消息一旦執(zhí)行
ACK,若此時(shí)服務(wù)發(fā)生 OOM 或斷電,內(nèi)存中未落庫(kù)的數(shù)據(jù)將永久丟失,違背了數(shù)據(jù)可靠性原則。 - 驅(qū)動(dòng)模型冗余:依賴(lài)定時(shí)器(ScheduledExecutor)輪詢(xún),在低負(fù)載時(shí)存在空轉(zhuǎn)資源浪費(fèi),在高負(fù)載時(shí)又難以做到極致的“貪婪消費(fèi)”。
針對(duì)上述問(wèn)題,V2 版本對(duì)核心數(shù)據(jù)流轉(zhuǎn)邏輯進(jìn)行了深度重構(gòu)。
2. 核心重構(gòu)一:流量控制 —— 棄用 NAK,擁抱阻塞 (Blocking Backpressure)
2.1 隱患分析:NAK 帶來(lái)的“亂序風(fēng)暴”
初始版本為了保持消費(fèi)者線(xiàn)程的非阻塞特性,在隊(duì)列滿(mǎn)時(shí)選擇直接 NAK 消息。NATS Server 收到 NAK 后會(huì)將消息重新加入待發(fā)送隊(duì)列。
這引發(fā)了一個(gè)嚴(yán)重的邏輯漏洞:重試打破了順序。
假設(shè)用戶(hù)先后發(fā)送了消息 A(內(nèi)容:Hello)和消息 B(內(nèi)容:Bye)。若 A 被 NAK 而 B 成功入隊(duì),A 稍后被重試投遞時(shí),將排在 B 之后。系統(tǒng)處理時(shí)會(huì)誤認(rèn)為 A 是最新?tīng)顟B(tài),導(dǎo)致會(huì)話(huà)摘要錯(cuò)誤地顯示為 "Hello" 而非 "Bye"。此外,頻繁的 NAK 還會(huì)增加中間件的負(fù)載,甚至觸發(fā)最大投遞次數(shù)限制導(dǎo)致消息被丟棄。
2.2 解決方案:基于 TCP 的自然背壓
V2 版本將接收隊(duì)列替換為阻塞隊(duì)列 (BlockingQueue),并使用 put() 操作替代 offer()。
-
機(jī)制:設(shè)置極小的隊(duì)列容量(如 Size=8)。當(dāng)消費(fèi)者處理速度低于生產(chǎn)者時(shí),隊(duì)列瞬間填滿(mǎn)。此時(shí),NATS 消費(fèi)線(xiàn)程在嘗試
put時(shí)被操作系統(tǒng)掛起(Block)。 - 連鎖反應(yīng):消費(fèi)線(xiàn)程停止從 Socket 讀取數(shù)據(jù) -> 系統(tǒng)的 TCP 接收窗口(Receive Window)被填滿(mǎn) -> NATS Server 感知擁塞 -> 自動(dòng)降低對(duì)該客戶(hù)端的推送速率。
-
收益:
- 嚴(yán)格順序:消息在服務(wù)端排隊(duì)等待,進(jìn)入系統(tǒng)的順序永遠(yuǎn)是絕對(duì)的 FIFO。
- 零丟包:不再有 NAK,也就消除了因“重試次數(shù)超限”而被丟棄的風(fēng)險(xiǎn)。
3. 核心重構(gòu)二:數(shù)據(jù)安全 —— 延遲確認(rèn) (Deferred ACK) 與原子性提交
3.1 隱患分析:過(guò)早 ACK 導(dǎo)致的數(shù)據(jù)“裸奔”
初始版本遵循 接收 -> 轉(zhuǎn)換 -> ACK -> 入隊(duì) 的流程。ACK 意味著“責(zé)任轉(zhuǎn)移完成”。但在 Write-Behind 模式下,數(shù)據(jù)此時(shí)僅僅存在于易失性的內(nèi)存中。一旦發(fā)生服務(wù)宕機(jī),這部分已確認(rèn)但未持久化的數(shù)據(jù)將徹底丟失。
3.2 解決方案:持有句柄的事務(wù)性處理
為了實(shí)現(xiàn) At-Least-Once(至少一次) 的投遞保證,ACK 的時(shí)機(jī)必須后移至數(shù)據(jù)庫(kù)事務(wù)提交之后。這要求內(nèi)存中的防抖容器(Map)不僅要存儲(chǔ)業(yè)務(wù)數(shù)據(jù),還必須持有原始消息的引用(Handle)。
-
數(shù)據(jù)結(jié)構(gòu)升級(jí):Map 的 Value 包裝為
MessageWrapper,包含SessionUpdateEvent(業(yè)務(wù)對(duì)象)和Message(NATS 原生句柄)。 -
智能合并策略:
- 覆蓋場(chǎng)景:當(dāng)新消息更新了同一個(gè) Session,舊消息的數(shù)據(jù)價(jià)值失效。此時(shí)應(yīng)立即 ACK 舊消息,僅保留新消息在 Map 中。
-
持久化場(chǎng)景:僅當(dāng)
flushAll()方法成功執(zhí)行完數(shù)據(jù)庫(kù)batchUpdate后,才遍歷當(dāng)前批次的所有 Message 執(zhí)行ack()。 -
異常兜底:若入庫(kù)失敗,則對(duì)該批次所有 Message 執(zhí)行
nak(),觸發(fā) NATS 重發(fā),等待下一次處理。
4. 核心重構(gòu)三:驅(qū)動(dòng)模型 —— 單線(xiàn)程事件循環(huán) (Event Loop)
4.1 隱患分析:定時(shí)器的局限性
使用 ScheduledExecutorService 存在天然的滯后性。即使隊(duì)列瞬間被打滿(mǎn),消費(fèi)者也必須等待定時(shí)器觸發(fā)。且此前嘗試的虛擬線(xiàn)程模型并不適合這種“單例、長(zhǎng)期駐留、CPU 密集型(Map 操作)”的任務(wù)。
4.2 解決方案:貪婪的單線(xiàn)程 Loop
V2 版本采用了類(lèi)似 Redis 或 Node.js 的 Single Thread Event Loop 模型。創(chuàng)建一個(gè)長(zhǎng)期駐留的平臺(tái)線(xiàn)程,運(yùn)行一個(gè)永不停止的 while 循環(huán)。
-
貪婪消費(fèi):使用
queue.poll(timeout)。一旦有數(shù)據(jù),立即喚醒處理;處理完一條后,繼續(xù)循環(huán)嘗試獲取,最大化吞吐。 -
自帶心跳:
poll的超時(shí)時(shí)間(如 500ms)即為“最大落庫(kù)延遲”。如果系統(tǒng)空閑,線(xiàn)程休眠;一旦超時(shí)醒來(lái),強(qiáng)制檢查是否需要刷盤(pán)。 - 無(wú)鎖設(shè)計(jì):由于數(shù)據(jù)讀取、Map 合并、數(shù)據(jù)庫(kù)寫(xiě)入均在同一個(gè)線(xiàn)程內(nèi)串行執(zhí)行,徹底消除了并發(fā)競(jìng)爭(zhēng),既安全又高效。
5. 核心代碼實(shí)現(xiàn) (V2)
Talk is cheap. 下面是經(jīng)過(guò) V2 重構(gòu)后的核心代碼實(shí)現(xiàn)。請(qǐng)注意代碼是如何通過(guò) BlockingQueue 和 LinkedHashMap 的組合來(lái)實(shí)現(xiàn)上述流控與安全邏輯的。
5.1 整體結(jié)構(gòu)與雙隊(duì)列定義
我們摒棄了復(fù)雜的第三方緩存組件,直接使用 JVM 內(nèi)部數(shù)據(jù)結(jié)構(gòu),減少網(wǎng)絡(luò)開(kāi)銷(xiāo)。
@Slf4j
@Component
public class SessionUpdateListener implements NatsConsumer {
// 【Queue A】接收隊(duì)列:容量極小 (8),核心作用是建立"背壓"
// 當(dāng)消費(fèi)者處理不過(guò)來(lái)時(shí),這個(gè)隊(duì)列會(huì)滿(mǎn),從而阻塞 NATS 客戶(hù)端線(xiàn)程
private static final int RECEIVE_QUEUE_SIZE = 8;
private final BlockingQueue<Message> receiveQueue = new ArrayBlockingQueue<>(RECEIVE_QUEUE_SIZE);
// 【Queue B】防抖容器:LRU 模式,用于合并重復(fù)的 Session 更新
// accessOrder=true 確保我們能在容量滿(mǎn)時(shí)擠出"最老"的數(shù)據(jù)
private static final int PENDING_MAP_SIZE = 1000;
private final LinkedHashMap<String, MessageWrapper> pendingMap =
new LinkedHashMap<>(16, 0.75f, true);
// 單線(xiàn)程處理器:系統(tǒng)的"心臟"
private final Thread processingThread;
public SessionUpdateListener(SessionUpdateService sessionUpdateService) {
this.sessionUpdateService = sessionUpdateService;
// 啟動(dòng)一個(gè)守護(hù)線(xiàn)程,專(zhuān)門(mén)負(fù)責(zé)"消費(fèi) -> 合并 -> 落庫(kù)"的全流程
processingThread = new Thread(this::processLoop, "session-update-processor");
processingThread.setDaemon(true);
processingThread.start();
}
// 包裝類(lèi):持有 NATS 原始句柄,直到入庫(kù)成功才 ACK
private record MessageWrapper(Message message, SessionUpdateEvent event) {}
}
5.2 生產(chǎn)者:優(yōu)雅的阻塞背壓
這是 V2 版本最大的改變。我們不再使用 offer + NAK,而是直接使用 put。
@SneakyThrows
@Override
public void onMessage(Message msg) {
// 【關(guān)鍵點(diǎn)】阻塞式寫(xiě)入
// 如果 receiveQueue 滿(mǎn)了,當(dāng)前線(xiàn)程(NATS Client 線(xiàn)程)會(huì)在此掛起 (WAITING)
// 這會(huì)導(dǎo)致 TCP 接收窗口填滿(mǎn),進(jìn)而讓 NATS Server 自動(dòng)降低推送速率
// 保證了所有消息嚴(yán)格 FIFO,不亂序,不丟棄
receiveQueue.put(msg);
}
5.3 消費(fèi)者:貪婪的事件循環(huán) (Event Loop)
我們移除了 ScheduledExecutor,改用 poll(timeout) 機(jī)制。這既保證了高負(fù)載下的實(shí)時(shí)消費(fèi),又充當(dāng)了低負(fù)載下的心跳機(jī)制。
private void processLoop() {
while (true) {
try {
// 1. 貪婪獲?。簬?500ms 超時(shí)
// 有數(shù)據(jù) -> 立即返回,微秒級(jí)延遲
// 無(wú)數(shù)據(jù) -> 等待 500ms,相當(dāng)于心跳間隔
Message msg = receiveQueue.poll(CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS);
if (msg != null) {
addToPendingMap(msg); // 進(jìn)貨:放入 Map 合并
} else {
flushAll(); // 心跳:超時(shí)沒(méi)數(shù)據(jù),強(qiáng)制刷盤(pán),防止數(shù)據(jù)滯留
}
} catch (Exception e) {
if (e instanceof InterruptedException) {
throw (RuntimeException) e;
}
log.error("Error processing queue", e);
}
}
}
5.4 核心邏輯:智能合并與延遲 ACK
這是解決“亂序”與“丟數(shù)據(jù)”的關(guān)鍵邏輯。注意我們是如何處理 ACK 的:只 ACK 被覆蓋的舊消息,保留最新消息的句柄直到落庫(kù)。
private void addToPendingMap(Message msg) {
// ... 解析代碼略 (ConvertUtils) ...
// 智能合并邏輯 (compute)
pendingMap.compute(sessionKey, (key, existing) -> {
if (existing != null) {
if (existing.event().getTimestamp() > event.getTimestamp()) {
// 【場(chǎng)景 A:亂序】
// 內(nèi)存里的消息比新來(lái)的更"新",說(shuō)明新消息是遲到的舊狀態(tài)
// 策略:直接 ACK 新消息(丟棄),保留內(nèi)存里的現(xiàn)有值
msg.ack();
return existing;
} else {
// 【場(chǎng)景 B:正常更新/覆蓋】
// 新消息是最新的,舊消息已經(jīng)沒(méi)用了
// 策略:ACK 舊消息(它完成了歷史使命),將新消息放入 Map
existing.message().ack();
return new MessageWrapper(msg, event);
}
}
// 【場(chǎng)景 C:新值】直接存入
return new MessageWrapper(msg, event);
});
// 兜底策略:如果 Map 滿(mǎn)了,主動(dòng)擠出最老的數(shù)據(jù)單獨(dú)落庫(kù)
if (pendingMap.size() >= PENDING_MAP_SIZE && !pendingMap.containsKey(sessionKey)) {
evictOldest();
}
}
5.5 提交階段:原子性批量落庫(kù)
最后,在刷盤(pán)階段,我們嚴(yán)格遵循 先寫(xiě)庫(kù),后 ACK 的原則,實(shí)現(xiàn)了 At-Least-Once 語(yǔ)義。
private void flushAll() {
if (pendingMap.isEmpty()) return;
try {
// 1. 數(shù)據(jù)庫(kù)批量寫(xiě)入 (Batch Insert/Update)
int updated = sessionUpdateService.batchCreateOrUpdateSessions(requests);
// 2. 只有入庫(kù)成功,才對(duì)這批消息進(jìn)行 ACK
// 如果 DB 報(bào)錯(cuò),這里不會(huì)執(zhí)行 ACK,NATS 會(huì)在超時(shí)后自動(dòng)重發(fā)所有消息
for (MessageWrapper wrapper : pendingMap.values()) {
wrapper.message().ack();
}
pendingMap.clear();
log.debug("Batch updated {} sessions", updated);
} catch (Exception e) {
// 異常處理:日志記錄
// 注意:這里沒(méi)有顯式調(diào)用 NAK,而是依靠 NATS 的 AckWait 機(jī)制自動(dòng)重試
// 或者也可以在這里顯式調(diào)用 nak() 加速重試
log.error("Batch update failed...", e);
}
}
6. 總結(jié)
從 V1 到 V2 的演進(jìn),體現(xiàn)了架構(gòu)設(shè)計(jì)中從“功能實(shí)現(xiàn)”到“生產(chǎn)高可用”的思維轉(zhuǎn)變:
- 一致性?xún)?yōu)先:在消息順序和數(shù)據(jù)不丟面前,非阻塞的極致吞吐量是可以被權(quán)衡的。阻塞隊(duì)列提供了系統(tǒng)自我保護(hù)的底線(xiàn),防止了雪崩和亂序。
- 事務(wù)延伸:通過(guò)延遲 ACK,將中間件的消息生命周期與數(shù)據(jù)庫(kù)事務(wù)強(qiáng)綁定,實(shí)現(xiàn)了跨系統(tǒng)的最終一致性。
- 架構(gòu)極簡(jiǎn):單線(xiàn)程模型在 I/O 密集型(數(shù)據(jù)庫(kù)寫(xiě))與計(jì)算密集型(Map 操作)混合的場(chǎng)景下,通過(guò)串行化設(shè)計(jì)去除了復(fù)雜的鎖機(jī)制,反而提升了系統(tǒng)的穩(wěn)定性和可維護(hù)性。
通過(guò)這次優(yōu)化,系統(tǒng)成功填補(bǔ)了高并發(fā)下的并發(fā)陷阱,不僅解決了寫(xiě)放大問(wèn)題,更確保了會(huì)話(huà)數(shù)據(jù)的精準(zhǔn)與安全。
本文由mdnice多平臺(tái)發(fā)布