會(huì)話(huà)更新的防抖進(jìn)化 —— 填補(bǔ)“亂序”與“丟數(shù)據(jù)”的深坑

摘要:在上一篇文章中,我們?cè)O(shè)計(jì)了一個(gè)基于 Actor 模式的“寫(xiě)緩沖(Write-Behind)”防抖系統(tǒng),看似美好,但是還是有消息亂序與數(shù)據(jù)丟失的隱患。本文將詳細(xì)記錄 V2 版本的重構(gòu)思路:通過(guò)引入 阻塞背壓 (Blocking Backpressure)、延遲確認(rèn) (Deferred ACK)事件循環(huán) (Event Loop),構(gòu)建一個(gè)更加健壯、嚴(yán)謹(jǐn)?shù)姆蓝断到y(tǒng)。


1. 背景與挑戰(zhàn):從“跑通”到“跑穩(wěn)”

在構(gòu)建即時(shí)通訊系統(tǒng)的會(huì)話(huà)列表時(shí),核心矛盾在于海量消息吞吐數(shù)據(jù)庫(kù)有限寫(xiě)入能力之間的沖突。業(yè)務(wù)要求每條消息都能更新會(huì)話(huà)的 last_active_timedigest(摘要),但直接的數(shù)據(jù)庫(kù) UPDATE 操作會(huì)導(dǎo)致嚴(yán)重的寫(xiě)放大(Write Amplification)。

上期的 V1 采用了“流水線(xiàn) + Actor”模式:利用內(nèi)存隊(duì)列暫存消息,通過(guò) Map 進(jìn)行去重合并,最后批量落庫(kù)。這種方案有三個(gè)隱患:

  1. 消息亂序風(fēng)險(xiǎn):利用 NAK(拒絕消息)進(jìn)行限流,會(huì)導(dǎo)致 NATS 將消息重新投遞。在重試過(guò)程中,舊消息可能排在新消息之后,破壞 FIFO 順序,導(dǎo)致會(huì)話(huà)狀態(tài)“時(shí)光倒流”。
  2. 數(shù)據(jù)丟失風(fēng)險(xiǎn):消息一旦執(zhí)行 ACK,若此時(shí)服務(wù)發(fā)生 OOM 或斷電,內(nèi)存中未落庫(kù)的數(shù)據(jù)將永久丟失,違背了數(shù)據(jù)可靠性原則。
  3. 驅(qū)動(dòng)模型冗余:依賴(lài)定時(shí)器(ScheduledExecutor)輪詢(xún),在低負(fù)載時(shí)存在空轉(zhuǎn)資源浪費(fèi),在高負(fù)載時(shí)又難以做到極致的“貪婪消費(fèi)”。

針對(duì)上述問(wèn)題,V2 版本對(duì)核心數(shù)據(jù)流轉(zhuǎn)邏輯進(jìn)行了深度重構(gòu)。


2. 核心重構(gòu)一:流量控制 —— 棄用 NAK,擁抱阻塞 (Blocking Backpressure)

2.1 隱患分析:NAK 帶來(lái)的“亂序風(fēng)暴”

初始版本為了保持消費(fèi)者線(xiàn)程的非阻塞特性,在隊(duì)列滿(mǎn)時(shí)選擇直接 NAK 消息。NATS Server 收到 NAK 后會(huì)將消息重新加入待發(fā)送隊(duì)列。

這引發(fā)了一個(gè)嚴(yán)重的邏輯漏洞:重試打破了順序。
假設(shè)用戶(hù)先后發(fā)送了消息 A(內(nèi)容:Hello)和消息 B(內(nèi)容:Bye)。若 A 被 NAK 而 B 成功入隊(duì),A 稍后被重試投遞時(shí),將排在 B 之后。系統(tǒng)處理時(shí)會(huì)誤認(rèn)為 A 是最新?tīng)顟B(tài),導(dǎo)致會(huì)話(huà)摘要錯(cuò)誤地顯示為 "Hello" 而非 "Bye"。此外,頻繁的 NAK 還會(huì)增加中間件的負(fù)載,甚至觸發(fā)最大投遞次數(shù)限制導(dǎo)致消息被丟棄。

2.2 解決方案:基于 TCP 的自然背壓

V2 版本將接收隊(duì)列替換為阻塞隊(duì)列 (BlockingQueue),并使用 put() 操作替代 offer()。

  • 機(jī)制:設(shè)置極小的隊(duì)列容量(如 Size=8)。當(dāng)消費(fèi)者處理速度低于生產(chǎn)者時(shí),隊(duì)列瞬間填滿(mǎn)。此時(shí),NATS 消費(fèi)線(xiàn)程在嘗試 put 時(shí)被操作系統(tǒng)掛起(Block)。
  • 連鎖反應(yīng):消費(fèi)線(xiàn)程停止從 Socket 讀取數(shù)據(jù) -> 系統(tǒng)的 TCP 接收窗口(Receive Window)被填滿(mǎn) -> NATS Server 感知擁塞 -> 自動(dòng)降低對(duì)該客戶(hù)端的推送速率。
  • 收益
    • 嚴(yán)格順序:消息在服務(wù)端排隊(duì)等待,進(jìn)入系統(tǒng)的順序永遠(yuǎn)是絕對(duì)的 FIFO。
    • 零丟包:不再有 NAK,也就消除了因“重試次數(shù)超限”而被丟棄的風(fēng)險(xiǎn)。

3. 核心重構(gòu)二:數(shù)據(jù)安全 —— 延遲確認(rèn) (Deferred ACK) 與原子性提交

3.1 隱患分析:過(guò)早 ACK 導(dǎo)致的數(shù)據(jù)“裸奔”

初始版本遵循 接收 -> 轉(zhuǎn)換 -> ACK -> 入隊(duì) 的流程。ACK 意味著“責(zé)任轉(zhuǎn)移完成”。但在 Write-Behind 模式下,數(shù)據(jù)此時(shí)僅僅存在于易失性的內(nèi)存中。一旦發(fā)生服務(wù)宕機(jī),這部分已確認(rèn)但未持久化的數(shù)據(jù)將徹底丟失。

3.2 解決方案:持有句柄的事務(wù)性處理

為了實(shí)現(xiàn) At-Least-Once(至少一次) 的投遞保證,ACK 的時(shí)機(jī)必須后移至數(shù)據(jù)庫(kù)事務(wù)提交之后。這要求內(nèi)存中的防抖容器(Map)不僅要存儲(chǔ)業(yè)務(wù)數(shù)據(jù),還必須持有原始消息的引用(Handle)。

  • 數(shù)據(jù)結(jié)構(gòu)升級(jí):Map 的 Value 包裝為 MessageWrapper,包含 SessionUpdateEvent(業(yè)務(wù)對(duì)象)和 Message(NATS 原生句柄)。
  • 智能合并策略
    • 覆蓋場(chǎng)景:當(dāng)新消息更新了同一個(gè) Session,舊消息的數(shù)據(jù)價(jià)值失效。此時(shí)應(yīng)立即 ACK 舊消息,僅保留新消息在 Map 中。
    • 持久化場(chǎng)景:僅當(dāng) flushAll() 方法成功執(zhí)行完數(shù)據(jù)庫(kù) batchUpdate 后,才遍歷當(dāng)前批次的所有 Message 執(zhí)行 ack()
    • 異常兜底:若入庫(kù)失敗,則對(duì)該批次所有 Message 執(zhí)行 nak(),觸發(fā) NATS 重發(fā),等待下一次處理。

4. 核心重構(gòu)三:驅(qū)動(dòng)模型 —— 單線(xiàn)程事件循環(huán) (Event Loop)

4.1 隱患分析:定時(shí)器的局限性

使用 ScheduledExecutorService 存在天然的滯后性。即使隊(duì)列瞬間被打滿(mǎn),消費(fèi)者也必須等待定時(shí)器觸發(fā)。且此前嘗試的虛擬線(xiàn)程模型并不適合這種“單例、長(zhǎng)期駐留、CPU 密集型(Map 操作)”的任務(wù)。

4.2 解決方案:貪婪的單線(xiàn)程 Loop

V2 版本采用了類(lèi)似 Redis 或 Node.js 的 Single Thread Event Loop 模型。創(chuàng)建一個(gè)長(zhǎng)期駐留的平臺(tái)線(xiàn)程,運(yùn)行一個(gè)永不停止的 while 循環(huán)。

  • 貪婪消費(fèi):使用 queue.poll(timeout)。一旦有數(shù)據(jù),立即喚醒處理;處理完一條后,繼續(xù)循環(huán)嘗試獲取,最大化吞吐。
  • 自帶心跳poll 的超時(shí)時(shí)間(如 500ms)即為“最大落庫(kù)延遲”。如果系統(tǒng)空閑,線(xiàn)程休眠;一旦超時(shí)醒來(lái),強(qiáng)制檢查是否需要刷盤(pán)。
  • 無(wú)鎖設(shè)計(jì):由于數(shù)據(jù)讀取、Map 合并、數(shù)據(jù)庫(kù)寫(xiě)入均在同一個(gè)線(xiàn)程內(nèi)串行執(zhí)行,徹底消除了并發(fā)競(jìng)爭(zhēng),既安全又高效。

5. 核心代碼實(shí)現(xiàn) (V2)

Talk is cheap. 下面是經(jīng)過(guò) V2 重構(gòu)后的核心代碼實(shí)現(xiàn)。請(qǐng)注意代碼是如何通過(guò) BlockingQueueLinkedHashMap 的組合來(lái)實(shí)現(xiàn)上述流控與安全邏輯的。

5.1 整體結(jié)構(gòu)與雙隊(duì)列定義

我們摒棄了復(fù)雜的第三方緩存組件,直接使用 JVM 內(nèi)部數(shù)據(jù)結(jié)構(gòu),減少網(wǎng)絡(luò)開(kāi)銷(xiāo)。

@Slf4j
@Component
public class SessionUpdateListener implements NatsConsumer {

  // 【Queue A】接收隊(duì)列:容量極小 (8),核心作用是建立"背壓"
  // 當(dāng)消費(fèi)者處理不過(guò)來(lái)時(shí),這個(gè)隊(duì)列會(huì)滿(mǎn),從而阻塞 NATS 客戶(hù)端線(xiàn)程
  private static final int RECEIVE_QUEUE_SIZE = 8;
  private final BlockingQueue<Message> receiveQueue = new ArrayBlockingQueue<>(RECEIVE_QUEUE_SIZE);

  // 【Queue B】防抖容器:LRU 模式,用于合并重復(fù)的 Session 更新
  // accessOrder=true 確保我們能在容量滿(mǎn)時(shí)擠出"最老"的數(shù)據(jù)
  private static final int PENDING_MAP_SIZE = 1000;
  private final LinkedHashMap<String, MessageWrapper> pendingMap =
    new LinkedHashMap<>(16, 0.75f, true);

  // 單線(xiàn)程處理器:系統(tǒng)的"心臟"
  private final Thread processingThread;

  public SessionUpdateListener(SessionUpdateService sessionUpdateService) {
    this.sessionUpdateService = sessionUpdateService;
    // 啟動(dòng)一個(gè)守護(hù)線(xiàn)程,專(zhuān)門(mén)負(fù)責(zé)"消費(fèi) -> 合并 -> 落庫(kù)"的全流程
    processingThread = new Thread(this::processLoop, "session-update-processor");
    processingThread.setDaemon(true);
    processingThread.start();
  }
  
  // 包裝類(lèi):持有 NATS 原始句柄,直到入庫(kù)成功才 ACK
  private record MessageWrapper(Message message, SessionUpdateEvent event) {}
}

5.2 生產(chǎn)者:優(yōu)雅的阻塞背壓

這是 V2 版本最大的改變。我們不再使用 offer + NAK,而是直接使用 put。

  @SneakyThrows
  @Override
  public void onMessage(Message msg) {
    // 【關(guān)鍵點(diǎn)】阻塞式寫(xiě)入
    // 如果 receiveQueue 滿(mǎn)了,當(dāng)前線(xiàn)程(NATS Client 線(xiàn)程)會(huì)在此掛起 (WAITING)
    // 這會(huì)導(dǎo)致 TCP 接收窗口填滿(mǎn),進(jìn)而讓 NATS Server 自動(dòng)降低推送速率
    // 保證了所有消息嚴(yán)格 FIFO,不亂序,不丟棄
    receiveQueue.put(msg);
  }

5.3 消費(fèi)者:貪婪的事件循環(huán) (Event Loop)

我們移除了 ScheduledExecutor,改用 poll(timeout) 機(jī)制。這既保證了高負(fù)載下的實(shí)時(shí)消費(fèi),又充當(dāng)了低負(fù)載下的心跳機(jī)制。

  private void processLoop() {
    while (true) {
      try {
        // 1. 貪婪獲?。簬?500ms 超時(shí)
        // 有數(shù)據(jù) -> 立即返回,微秒級(jí)延遲
        // 無(wú)數(shù)據(jù) -> 等待 500ms,相當(dāng)于心跳間隔
        Message msg = receiveQueue.poll(CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS);
        
        if (msg != null) {
          addToPendingMap(msg); // 進(jìn)貨:放入 Map 合并
        } else {
          flushAll(); // 心跳:超時(shí)沒(méi)數(shù)據(jù),強(qiáng)制刷盤(pán),防止數(shù)據(jù)滯留
        }
      } catch (Exception e) {
         if (e instanceof InterruptedException) {
          throw (RuntimeException) e;
        }
        log.error("Error processing queue", e);
      }
    }
  }

5.4 核心邏輯:智能合并與延遲 ACK

這是解決“亂序”與“丟數(shù)據(jù)”的關(guān)鍵邏輯。注意我們是如何處理 ACK 的:只 ACK 被覆蓋的舊消息,保留最新消息的句柄直到落庫(kù)。

  private void addToPendingMap(Message msg) {
    // ... 解析代碼略 (ConvertUtils) ...

    // 智能合并邏輯 (compute)
    pendingMap.compute(sessionKey, (key, existing) -> {
      if (existing != null) {
        if (existing.event().getTimestamp() > event.getTimestamp()) {
          // 【場(chǎng)景 A:亂序】
          // 內(nèi)存里的消息比新來(lái)的更"新",說(shuō)明新消息是遲到的舊狀態(tài)
          // 策略:直接 ACK 新消息(丟棄),保留內(nèi)存里的現(xiàn)有值
          msg.ack();
          return existing;
        } else {
          // 【場(chǎng)景 B:正常更新/覆蓋】
          // 新消息是最新的,舊消息已經(jīng)沒(méi)用了
          // 策略:ACK 舊消息(它完成了歷史使命),將新消息放入 Map
          existing.message().ack();
          return new MessageWrapper(msg, event);
        }
      }
      // 【場(chǎng)景 C:新值】直接存入
      return new MessageWrapper(msg, event);
    });
    
    // 兜底策略:如果 Map 滿(mǎn)了,主動(dòng)擠出最老的數(shù)據(jù)單獨(dú)落庫(kù)
    if (pendingMap.size() >= PENDING_MAP_SIZE && !pendingMap.containsKey(sessionKey)) {
        evictOldest();
    }
  }

5.5 提交階段:原子性批量落庫(kù)

最后,在刷盤(pán)階段,我們嚴(yán)格遵循 先寫(xiě)庫(kù),后 ACK 的原則,實(shí)現(xiàn)了 At-Least-Once 語(yǔ)義。

  private void flushAll() {
    if (pendingMap.isEmpty()) return;

    try {
      // 1. 數(shù)據(jù)庫(kù)批量寫(xiě)入 (Batch Insert/Update)
      int updated = sessionUpdateService.batchCreateOrUpdateSessions(requests);
      
      // 2. 只有入庫(kù)成功,才對(duì)這批消息進(jìn)行 ACK
      // 如果 DB 報(bào)錯(cuò),這里不會(huì)執(zhí)行 ACK,NATS 會(huì)在超時(shí)后自動(dòng)重發(fā)所有消息
      for (MessageWrapper wrapper : pendingMap.values()) {
        wrapper.message().ack();
      }
      pendingMap.clear();
      log.debug("Batch updated {} sessions", updated);
    } catch (Exception e) {
      // 異常處理:日志記錄
      // 注意:這里沒(méi)有顯式調(diào)用 NAK,而是依靠 NATS 的 AckWait 機(jī)制自動(dòng)重試
      // 或者也可以在這里顯式調(diào)用 nak() 加速重試
      log.error("Batch update failed...", e);
    }
  }

6. 總結(jié)

從 V1 到 V2 的演進(jìn),體現(xiàn)了架構(gòu)設(shè)計(jì)中從“功能實(shí)現(xiàn)”到“生產(chǎn)高可用”的思維轉(zhuǎn)變:

  1. 一致性?xún)?yōu)先:在消息順序和數(shù)據(jù)不丟面前,非阻塞的極致吞吐量是可以被權(quán)衡的。阻塞隊(duì)列提供了系統(tǒng)自我保護(hù)的底線(xiàn),防止了雪崩和亂序。
  2. 事務(wù)延伸:通過(guò)延遲 ACK,將中間件的消息生命周期與數(shù)據(jù)庫(kù)事務(wù)強(qiáng)綁定,實(shí)現(xiàn)了跨系統(tǒng)的最終一致性。
  3. 架構(gòu)極簡(jiǎn)單線(xiàn)程模型在 I/O 密集型(數(shù)據(jù)庫(kù)寫(xiě))與計(jì)算密集型(Map 操作)混合的場(chǎng)景下,通過(guò)串行化設(shè)計(jì)去除了復(fù)雜的鎖機(jī)制,反而提升了系統(tǒng)的穩(wěn)定性和可維護(hù)性。

通過(guò)這次優(yōu)化,系統(tǒng)成功填補(bǔ)了高并發(fā)下的并發(fā)陷阱,不僅解決了寫(xiě)放大問(wèn)題,更確保了會(huì)話(huà)數(shù)據(jù)的精準(zhǔn)與安全。

本文由mdnice多平臺(tái)發(fā)布

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容