Spring Boot 事務(wù)實(shí)戰(zhàn):如何優(yōu)雅解決 DB 與 MQ 的“雙寫不一致”?

摘要:在分布式系統(tǒng)中,“先存數(shù)據(jù)庫還是先發(fā)消息”是一個(gè)經(jīng)典的架構(gòu)難題。特別是在 IM 系統(tǒng)的多媒體消息處理場景中,如果處理順序不當(dāng),不僅會(huì)導(dǎo)致對(duì)象存儲(chǔ)(OSS)中產(chǎn)生無法回收的“孤兒文件”,還會(huì)引發(fā)并發(fā)重復(fù)處理的問題。本文結(jié)合代碼案例,探討如何利用 Spring 的 TransactionSynchronizationManager 實(shí)現(xiàn) 事務(wù)提交后觸發(fā) (Trigger After Commit) 機(jī)制,優(yōu)雅解決數(shù)據(jù)庫與消息隊(duì)列的“雙寫一致性”問題。


1. 引言:一個(gè)看似簡單的順序問題

在“信令與媒體分離”的架構(gòu)中,核心流程通常如下:API 服務(wù)收到消息 -> 落庫(標(biāo)記為 Pending) -> 異步通知 Worker 搬運(yùn)文件。

這一流程涉及兩個(gè)異構(gòu)系統(tǒng)的寫操作:

  1. DB Write:將消息元數(shù)據(jù)寫入 MySQL。
  2. MQ Write:將搬運(yùn)任務(wù)發(fā)布到 NATS。

在實(shí)際開發(fā)中,直覺性的代碼編寫往往會(huì)陷入以下誤區(qū):

誤區(qū)一:先發(fā)消息,后入庫

// ? 錯(cuò)誤示范
natsPublisher.publish(task); // 1. 消息發(fā)出,Worker 開始下載轉(zhuǎn)存
messageRepository.save(message); // 2. 數(shù)據(jù)庫報(bào)錯(cuò)(如字段超長、唯一鍵沖突)
// 后果:DB 回滾,業(yè)務(wù)無記錄,但 OSS 中產(chǎn)生了一個(gè)永遠(yuǎn)無法被引用的“孤兒文件”。

誤區(qū)二:在事務(wù)內(nèi)發(fā)消息

// ? 錯(cuò)誤示范
@Transactional
public void handle() {
    messageRepository.save(message);
    natsPublisher.publish(task); 
    // 3. 代碼執(zhí)行完畢,但在事務(wù)提交(Commit)的一瞬間數(shù)據(jù)庫連接斷開
}
// 后果:Worker 收到任務(wù)并完成處理,但在回調(diào)更新狀態(tài)時(shí)發(fā)現(xiàn) DB 中不存在該記錄。

2. 核心方案:事務(wù)提交后的“驚險(xiǎn)一躍”

為了保證 “只有數(shù)據(jù)庫確確實(shí)實(shí)持久化成功了,才去觸發(fā)異步任務(wù)”,最佳實(shí)踐是利用 Spring 框架提供的事務(wù)同步機(jī)制。

以下是優(yōu)化后的代碼實(shí)現(xiàn):

2.1 主業(yè)務(wù)邏輯

// 1. 準(zhǔn)備階段:預(yù)生成任務(wù)(純內(nèi)存操作,無副作用)
// 此時(shí)并沒有真正發(fā)送 NATS 消息,只是構(gòu)建了對(duì)象
List<MediaTransferTask> mediaTasks = prepareMediaTransferTasks(msg, ids.sessionId());

// 2. 構(gòu)建消息實(shí)體
WxMessage message = buildMessage(msg, accountId, ids.sessionId(), ids.senderId());

try {
    // 【核心步驟 A】數(shù)據(jù)庫落庫 (Source of Truth)
    // 這是唯一的“事實(shí)來源”。如果這里失敗,后續(xù)一切都不應(yīng)發(fā)生。
    messageRepository.save(message);
    log.info("Message saved: id={}, wxid={}", message.getId(), message.getWxid());

    // 3. 發(fā)布會(huì)話更新事件 (內(nèi)存事件或 MQ)
    SessionUpdateEvent event = SessionUpdateEvent.builder()
        .accountId(accountId)
        // ... build params
        .build();
    sessionEventPublisher.publishSessionUpdate(event);

    // 【核心步驟 B】注冊(cè)事務(wù)回調(diào)
    // 關(guān)鍵點(diǎn):這里不是立即發(fā)送,而是“預(yù)約”發(fā)送
    publishMediaTransferTasksAfterCommit(mediaTasks);

    return ProcessResult.success();

} catch (DataIntegrityViolationException e) {
    // 【并發(fā)場景的保護(hù)】
    // 如果兩個(gè)線程同時(shí)處理同一條消息(如網(wǎng)絡(luò)重放或客戶端重試),
    // 數(shù)據(jù)庫的唯一索引會(huì)拋出此異常。
    // 由于消息發(fā)送邏輯在事務(wù)提交后執(zhí)行,失敗的線程事務(wù)回滾,
    // 因此“afterCommit”鉤子不會(huì)被觸發(fā),完美避免了 Worker 重復(fù)搬運(yùn)文件。
    log.debug("Duplicate message (concurrent): {}", msg.getMessageId());
    return ProcessResult.duplicate();
}

2.2 事務(wù)同步器的實(shí)現(xiàn)

publishMediaTransferTasksAfterCommit 方法利用了 Spring 的 TransactionSynchronizationManager 來掛載回調(diào)。

private void publishMediaTransferTasksAfterCommit(List<MediaTransferTask> tasks) {
    if (CollectionUtils.isEmpty(tasks)) {
        return;
    }
    
    // 判斷當(dāng)前是否在事務(wù)中
    if (TransactionSynchronizationManager.isActualTransactionActive()) {
        // 注冊(cè)同步器
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
            @Override
            public void afterCommit() {
                // 【真正發(fā)送的時(shí)機(jī)】
                // 只有當(dāng) DB 事務(wù)成功 Commit 后,這一行才會(huì)執(zhí)行
                // 此時(shí) DB 里一定有數(shù)據(jù),Worker 回調(diào)一定能成功
                tasks.forEach(mediaTransferPublisher::publishMediaTransfer);
                log.debug("Async media tasks published after commit: size={}", tasks.size());
            }
        });
    } else {
        // 如果不在事務(wù)中(比如非事務(wù)方法調(diào)用),則立即發(fā)送(降級(jí)策略)
        tasks.forEach(mediaTransferPublisher::publishMediaTransfer);
    }
}

3. 深度解析:方案優(yōu)勢

3.1 杜絕“孤兒資源”

通過 afterCommit 鉤子,嚴(yán)格保證了因果關(guān)系:因(DB落庫成功) -> 果(觸發(fā)搬運(yùn))。
如果 messageRepository.save(message) 因?yàn)槿魏卧颍I(yè)務(wù)校驗(yàn)失敗、數(shù)據(jù)庫異常)導(dǎo)致事務(wù)回滾,afterCommit 回調(diào)將永遠(yuǎn)不會(huì)被執(zhí)行,NATS 消息也就不會(huì)發(fā)出,從而從源頭上避免了 OSS 資源的浪費(fèi)。

3.2 天然的冪等性防護(hù)

代碼中對(duì) DataIntegrityViolationException 的捕獲處理是該方案的另一大亮點(diǎn)。
在分布式場景下,消息重復(fù)投遞是常見現(xiàn)象。

  • 無保護(hù)模式:若未加控制,兩個(gè)線程可能都會(huì)發(fā)出 NATS 消息,導(dǎo)致 Worker 下載上傳兩次同樣的圖片,浪費(fèi)帶寬和計(jì)算資源。
  • 事務(wù)同步模式:數(shù)據(jù)庫的唯一約束(Unique Key)充當(dāng)了“守門員”。第二個(gè)線程在 save 時(shí)會(huì)因沖突被拒絕,隨之事務(wù)回滾。由于事務(wù)未成功提交,其注冊(cè)的 afterCommit 鉤子自動(dòng)失效。最終,只有搶鎖成功的線程才會(huì)發(fā)出唯一的一條異步任務(wù)。

4. 兜底策略:應(yīng)對(duì)“反向不一致”

雖然該方案解決了“有文件沒記錄”的問題,但理論上仍存在極低概率的“反向不一致”:DB 提交成功了,但在執(zhí)行 afterCommit 發(fā)送 NATS 消息的一瞬間,服務(wù)宕機(jī)或斷電。

此時(shí),數(shù)據(jù)庫中存在一條狀態(tài)為 PENDING 的記錄,但永遠(yuǎn)不會(huì)有 Worker 來處理它。

為了達(dá)到金融級(jí)的一致性,系統(tǒng)應(yīng)補(bǔ)充一個(gè)兜底補(bǔ)償機(jī)制

  1. 定時(shí)任務(wù) (Compensation Job):每隔一定周期(如 5 分鐘)掃描一次消息表。
  2. 篩選條件create_time < 5分鐘前 AND media_status = 'PENDING'。
  3. 補(bǔ)償動(dòng)作:重新構(gòu)建 MediaTransferTask 并補(bǔ)發(fā)到 NATS。

5. 總結(jié)

在處理“數(shù)據(jù)庫事務(wù)”與“外部系統(tǒng)調(diào)用(MQ/RPC)”混合的業(yè)務(wù)場景時(shí),“事務(wù)同步器(Transaction Synchronization)” 是 Spring 體系中解決雙寫一致性問題的利器。

通過這一模式的重構(gòu),系統(tǒng)實(shí)現(xiàn)了:

  1. 資源一致性:杜絕了 OSS 孤兒文件。
  2. 并發(fā)安全性:利用數(shù)據(jù)庫鎖自動(dòng)解決并發(fā)任務(wù)重復(fù)發(fā)布問題。
  3. 邏輯嚴(yán)密性:確保狀態(tài)流轉(zhuǎn)嚴(yán)格遵循業(yè)務(wù)時(shí)序。

核心原則:先落庫,再提交,回調(diào)之中發(fā)消息。

本文由mdnice多平臺(tái)發(fā)布

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容