TX-LCN 核心流程源碼分析

TX-LCN核心源碼解讀

TX-LCN是基于Java編寫(xiě)的分布式事務(wù)解決方案框架,主要提供三種主流的解決方案

  • LCN模式,通過(guò)代理JDBC Connection來(lái)控制協(xié)調(diào)多組本地原子事務(wù)的提交與關(guān)閉
  • TCC模式,屬于框架級(jí)別解決方案,對(duì)業(yè)務(wù)入侵性極大
  • TXC模式,核心方案為查詢(xún) + 分布式鎖的分布式事務(wù)解決方案,由淘寶團(tuán)隊(duì)提出

核心組件

  • TC,作為分布式事務(wù)組件的客戶(hù)端角色,主要作用在于治理本地事務(wù)
  • TM,作為分布式事務(wù)組件的服務(wù)端角色,主要作用在于協(xié)調(diào)事務(wù)組

核心概念

  • 事務(wù)組,group,描述整個(gè)分布式環(huán)境下運(yùn)行的各個(gè)事務(wù)(以一個(gè)request所需要完成的事務(wù))組合而成的一組事務(wù)。
  • 事務(wù)單元,unit,描述一個(gè)事務(wù)組內(nèi)除開(kāi)主事務(wù)之外的從事務(wù),一個(gè)從事務(wù)表示一個(gè)事務(wù)單元。

核心流程

事務(wù)核心流程.png

核心源碼

LCN模式作為實(shí)踐方案,源碼也以LCN 作為解讀對(duì)象。LCN 模式是什么模式?

LCN模式是通過(guò)代理Connection的方式實(shí)現(xiàn)對(duì)本地事務(wù)的操作,然后在由TxManager統(tǒng)一協(xié)調(diào)控制事務(wù)。當(dāng)本地事務(wù)提交回滾或者關(guān)閉連接時(shí)將會(huì)執(zhí)行假操作,該代理的連接將由LCN連接池管理。

創(chuàng)建事務(wù)組

/**
 * Client創(chuàng)建事務(wù)組操作集合
 *
 * @param groupId         groupId
 * @param unitId          unitId
 * @param transactionInfo transactionInfo
 * @param transactionType transactionType
 * @throws TransactionException 創(chuàng)建group失敗時(shí)拋出
 */
public void createGroup(String groupId, String unitId, TransactionInfo transactionInfo, String transactionType)
    throws TransactionException {
    //創(chuàng)建事務(wù)組
    try {
        // 日志
        txLogger.transactionInfo(groupId, unitId,
                                 "create group > {} > groupId: {xid}, unitId: {uid}", transactionType);
        // 創(chuàng)建事務(wù)組消息
        reliableMessenger.createGroup(groupId);
        // 緩存發(fā)起方切面信息
        aspectLogger.trace(groupId, unitId, transactionInfo);
    } catch (RpcException e) {
        // 通訊異常
        dtxExceptionHandler.handleCreateGroupMessageException(groupId, e);
    } catch (LcnBusinessException e) {
        // 創(chuàng)建事務(wù)組業(yè)務(wù)失敗
        dtxExceptionHandler.handleCreateGroupBusinessException(groupId, e.getCause());
    }
    txLogger.transactionInfo(groupId, unitId, "create group over");
}

TC 發(fā)起創(chuàng)建事務(wù)組僅僅是像服務(wù)端發(fā)起請(qǐng)求,參數(shù)為groupId ,核心的代碼在于TM 接到請(qǐng)求后的處理。


public class CreateGroupExecuteService implements RpcExecuteService {

    // ...
    @Override
    public Serializable execute(TransactionCmd transactionCmd) throws TxManagerException {
        // ...
        transactionManager.begin(transactionCmd.getGroupId());
    }
}

public class DefaultDTXContextRegistry implements DTXContextRegistry {

    private final FastStorage fastStorage;

    @Override
    public DTXContext create(String groupId) throws TransactionException {
        // ..
        fastStorage.initGroup(groupId);
    }
}


public class RedisStorage implements FastStorage {

    // ...
    @Override
    public void initGroup(String groupId) {
        // 將groupId存入Redis
        redisTemplate.opsForHash().put(REDIS_GROUP_PREFIX + groupId, "root", "");
        redisTemplate.expire(REDIS_GROUP_PREFIX + groupId, managerConfig.getDtxTime() + 10000, TimeUnit.MILLISECONDS);
    }
}

加入事務(wù)組


/**
 * Client加入事務(wù)組操作集合
 *
 * @param groupId         groupId
 * @param unitId          unitId
 * @param transactionType transactionType
 * @param transactionInfo transactionInfo
 * @throws TransactionException 加入事務(wù)組失敗時(shí)拋出
 */
public void joinGroup(String groupId, String unitId, String transactionType, TransactionInfo transactionInfo) throws TransactionException {
    
    // 詢(xún)問(wèn)TM加入事務(wù)組
    // 該groupId由遠(yuǎn)程RPC通過(guò)header方式攜帶到從事務(wù)
    reliableMessenger.joinGroup(groupId, unitId, transactionType, DTXLocalContext.transactionState());

    // 異步檢測(cè)
    dtxChecking.startDelayCheckingAsync(groupId, unitId, transactionType);
    
    // ...
}

// 
@Override
public void startDelayCheckingAsync(String groupId, String unitId, String transactionType) {
    txLogger.taskInfo(groupId, unitId, "start delay checking task");
    // 異步阻塞的方式
    ScheduledFuture scheduledFuture = scheduledExecutorService.schedule(() -> {
        try {
            TxContext txContext = globalContext.txContext(groupId);
            if (Objects.nonNull(txContext)) {
                synchronized (txContext.getLock()) {
                    txLogger.info(groupId, unitId, Transactions.TAG_TASK,
                            "checking waiting for business code finish.");
                    txContext.getLock().wait();
                }
            }
            int state = reliableMessenger.askTransactionState(groupId, unitId);// 詢(xún)問(wèn)事務(wù)組是否成功
            txLogger.taskInfo(groupId, unitId, "ask transaction state {}", state);
            if (state == -1) {
                txLogger.error(this.getClass().getSimpleName(), "delay clean transaction error.");
                onAskTransactionStateException(groupId, unitId, transactionType);
            } else {
                transactionCleanTemplate.clean(groupId, unitId, transactionType, state);// 事務(wù)清理,即commit or rollback
                aspectLogger.clearLog(groupId, unitId);
            }

        } catch (RpcException e) {
            onAskTransactionStateException(groupId, unitId, transactionType);
        } catch (TransactionClearException | InterruptedException e) {
            txLogger.error(this.getClass().getSimpleName(), "{} clean transaction error.", transactionType);
        }
    }, clientConfig.getDtxTime(), TimeUnit.MILLISECONDS);// 時(shí)間為最大事務(wù)時(shí)間,該時(shí)間由TM配置,在TC初始化時(shí)從TM放取到
    delayTasks.put(groupId + unitId, scheduledFuture);
}

//  詢(xún)問(wèn)事務(wù)是否成功
private void onAskTransactionStateException(String groupId, String unitId, String transactionType) {
    try {
        // 通知TxManager事務(wù)補(bǔ)償
        txMangerReporter.reportTransactionState(groupId, unitId, TxExceptionParams.ASK_ERROR, 0);
        log.warn("{} > has compensation info!", transactionType);

        // 事務(wù)回滾, 保留適當(dāng)?shù)难a(bǔ)償信息
        transactionCleanTemplate.compensationClean(groupId, unitId, transactionType, 0);
    } catch (TransactionClearException e) {
        log.error("{} > clean transaction error.", transactionType);
    }
}

TC 核心工作是申請(qǐng)加入事務(wù)組,并啟動(dòng)異步任務(wù)在事務(wù)最大時(shí)間后訪問(wèn)TM 事務(wù)組的全局事務(wù)狀態(tài)來(lái)進(jìn)行事務(wù)協(xié)調(diào)。

創(chuàng)建異步任務(wù)對(duì)象并將其緩存在本地內(nèi)存的delayTasks, 如果在事務(wù)最大時(shí)間內(nèi)已經(jīng)完成并調(diào)用則將該任務(wù)取消。

@Override
public void stopDelayChecking(String groupId, String unitId) {
    ScheduledFuture scheduledFuture = delayTasks.get(groupId + unitId);
    if (Objects.nonNull(scheduledFuture)) {
        txLogger.taskInfo(groupId, unitId, "cancel {}:{} checking.", groupId, unitId);
        scheduledFuture.cancel(true); // 取消任務(wù)
    }
}

而在TM 端則進(jìn)行如下處理


// 加入事務(wù)組
// RedisStorage
public void saveTransactionUnitToGroup(String groupId, TransactionUnit transactionUnit) throws FastStorageException {
    if (Optional.ofNullable(redisTemplate.hasKey(REDIS_GROUP_PREFIX + groupId)).orElse(false)) {
        redisTemplate.opsForHash().put(REDIS_GROUP_PREFIX + groupId, transactionUnit.getUnitId(), transactionUnit);
        return;
    }
    throw new FastStorageException("attempts to the non-existent transaction group " + groupId, FastStorageException.EX_CODE_NON_GROUP);
}

// TC 詢(xún)問(wèn)該事務(wù)組全局事務(wù)狀態(tài)
public int transactionState(String groupId) {

    
    int state = exceptionService.transactionState(groupId); // 查詢(xún)數(shù)據(jù)庫(kù)t_tx_exception得到該事務(wù)的狀態(tài)
    
    //存在數(shù)據(jù)時(shí)返回?cái)?shù)據(jù)狀態(tài)
    if (state != -1) {
        return state;
    }
    
    // 查詢(xún)r(jià)edis該事務(wù)組的全局狀態(tài)
    return dtxContextRegistry.transactionState(groupId);
    
    // 為什么會(huì)先查數(shù)據(jù)庫(kù),再查redis ? 后文會(huì)有說(shuō)明,主要是考慮事務(wù)補(bǔ)償時(shí)的問(wèn)題
}

// 返回的最終狀態(tài)
public class AskTransactionStateExecuteService implements RpcExecuteService {

    @Override
    public Serializable execute(TransactionCmd transactionCmd) {
        int state = transactionManager.transactionState(transactionCmd.getGroupId());
        return state == -1 ? 0 : state;
    }
}

通知事務(wù)組

主事務(wù)的業(yè)務(wù)代碼執(zhí)行完畢,最終必須調(diào)用通知事務(wù)組進(jìn)行全局事務(wù)協(xié)調(diào)。通知完成后進(jìn)行事務(wù)清理


/**
 * Client通知事務(wù)組操作集合
 *
 * @param groupId         groupId
 * @param unitId          unitId
 * @param transactionType transactionType
 * @param state           transactionState
 */
public void notifyGroup(String groupId, String unitId, String transactionType, int state) {
    // ...
    // 事務(wù)通知
    reliableMessenger.notifyGroup(groupId, state);
    // 事務(wù)清理
    transactionCleanTemplate.clean(groupId, unitId, transactionType, state);
    // 通知異常(RPC調(diào)用異常)
    dtxExceptionHandler.handleNotifyGroupMessageException(Arrays.asList(groupId, state, unitId, transactionType), e);
    // ...
    
}

// 當(dāng)TC調(diào)用TM拋出異常時(shí),會(huì)正常的按照當(dāng)前事務(wù)的狀態(tài)進(jìn)行提交,并將結(jié)果上報(bào)到TM

public void handleNotifyGroupMessageException(Object params, Throwable ex) {
    
    // 參數(shù)中取出事務(wù)的狀態(tài)
    // ....

    // 按狀態(tài)正常結(jié)束事務(wù)(切面補(bǔ)償記錄將保留)
    // TxManager 存在請(qǐng)求異?;蛘唔憫?yīng)異常兩種情況。當(dāng)請(qǐng)求異常時(shí)這里的業(yè)務(wù)需要補(bǔ)償,當(dāng)響應(yīng)異常的時(shí)候需要做狀態(tài)的事務(wù)清理。
    // 請(qǐng)求異常時(shí)
    //     參與放會(huì)根據(jù)上報(bào)補(bǔ)償記錄做事務(wù)的提交。
    // 響應(yīng)異常時(shí)
    //     參與反會(huì)正常提交事務(wù),本地業(yè)務(wù)提示事務(wù)。

    // 該兩種情況下補(bǔ)償信息均可以忽略,可直接把本地補(bǔ)償記錄數(shù)據(jù)刪除。


    String unitId = (String) paramList.get(2);
    String transactionType = (String) paramList.get(3);
    try {
        transactionCleanTemplate.compensationClean(groupId, unitId, transactionType, state);// 本地事務(wù)提交
    } catch (TransactionClearException e) {
        log.error("{} > compensationClean transaction error.", transactionType);
    }

    // 上報(bào)Manager,上報(bào)直到成功.
    txMangerReporter.reportTransactionState(groupId, null, TxExceptionParams.NOTIFY_GROUP_ERROR, state);
    // 提交的事務(wù)記錄到t_tx_exception表中,所以會(huì)看到前文TC詢(xún)問(wèn)事務(wù)狀態(tài)時(shí),會(huì)優(yōu)先查詢(xún)數(shù)據(jù)庫(kù),而不是直接查redis
}


通知事務(wù)組的概念,應(yīng)該理解為,主事務(wù)告知TM 進(jìn)行全部的事務(wù)協(xié)調(diào),即TM 僅會(huì)通知各個(gè)從事務(wù)進(jìn)行commit or rollback,而不會(huì)通知主事務(wù)進(jìn)行commit or rollback 。因?yàn)樵谇拔目吹絼?chuàng)建事務(wù)組時(shí),TM 并沒(méi)有將主事務(wù)unitId 記錄下來(lái)。而從事務(wù)加入事務(wù)組時(shí),除了記錄全局事務(wù)組Id,還包括事務(wù)單元unitId .


public Serializable execute(TransactionCmd transactionCmd) throws TxManagerException {
    try {
        // 從redis取事務(wù)狀態(tài)
        int transactionState = transactionManager.transactionStateFromFastStorage(transactionCmd.getGroupId());
        boolean hasThrow = false;
        if (transactionState == 0) {
            commitState = 0;
            hasThrow = true;
        }
        // 事務(wù)狀態(tài)為1進(jìn)行全局事務(wù)提交
        if (commitState == 1) {
            transactionManager.commit(dtxContext);
        } else if (commitState == 0) {
            transactionManager.rollback(dtxContext);
        }
        // ...
    } catch (TransactionException e) {
        throw new TxManagerException(e);
    } finally {
        transactionManager.close(transactionCmd.getGroupId());
        // 系統(tǒng)日志
        txLogger.transactionInfo(transactionCmd.getGroupId(), "", "notify group over");
    }
    return null;
}

// 事務(wù)通知
private void notifyTransaction(DTXContext dtxContext, int transactionState) throws TransactionException {
    for (TransactionUnit transUnit : dtxContext.transactionUnits()) {
        NotifyUnitParams notifyUnitParams = new NotifyUnitParams();
        notifyUnitParams.setGroupId(dtxContext.getGroupId());
        notifyUnitParams.setUnitId(transUnit.getUnitId());
        notifyUnitParams.setUnitType(transUnit.getUnitType());
        notifyUnitParams.setState(transactionState);
        txLogger.info(dtxContext.getGroupId(),
                notifyUnitParams.getUnitId(), Transactions.TAG_TRANSACTION, "notify %s's unit: %s",
                transUnit.getModId(), transUnit.getUnitId());
        try {
            // 這里在5.0.1會(huì)出現(xiàn)信道問(wèn)題,什么是信道問(wèn)題?比如此時(shí)有兩臺(tái)push注冊(cè)到TM上,而某一刻的全局事務(wù)所在的本地事務(wù)只在其中一臺(tái),而通知的時(shí)候如果modId一致則會(huì)取到第一個(gè)
            // 如下get(0) . 解決的辦法是生成modId時(shí)去的是Mac地址+端口+服務(wù)名稱(chēng),保證了不同實(shí)例的全局唯一
            List<String> modChannelKeys = rpcClient.remoteKeys(transUnit.getModId());
            if (modChannelKeys.isEmpty()) {
                // record exception
                throw new RpcException("offline mod.");
            }
            MessageDto respMsg =
                    rpcClient.request(modChannelKeys.get(0), MessageCreator.notifyUnit(notifyUnitParams));
            if (!MessageUtils.statusOk(respMsg)) {
                // 提交/回滾失敗的消息處理
                List<Object> params = Arrays.asList(notifyUnitParams, transUnit.getModId());
                rpcExceptionHandler.handleNotifyUnitBusinessException(params, respMsg.loadBean(Throwable.class));
            }
        } catch (RpcException e) {
            // 提交/回滾通訊失敗
            List<Object> params = Arrays.asList(notifyUnitParams, transUnit.getModId());
            rpcExceptionHandler.handleNotifyUnitMessageException(params, e);
        } finally {
            txLogger.transactionInfo(dtxContext.getGroupId(), notifyUnitParams.getUnitId(), "notify unit over");
        }
    }
}

// 當(dāng)通知出現(xiàn)異常時(shí),將信息記錄到t_tx_exception表
public class ManagerRpcExceptionHandler implements RpcExceptionHandler {

    @Override
    public void handleNotifyUnitMessageException(Object params, Throwable e) {
        // notify unit message error, write txEx
        List paramList = ((List) params);
        String modName = (String) paramList.get(1);

        NotifyUnitParams notifyUnitParams = (NotifyUnitParams) paramList.get(0);
        WriteTxExceptionDTO writeTxExceptionReq = new WriteTxExceptionDTO(notifyUnitParams.getGroupId(),
                notifyUnitParams.getUnitId(), modName, notifyUnitParams.getState());
        writeTxExceptionReq.setRegistrar((short) 0);
        compensationService.writeTxException(writeTxExceptionReq);// 記錄到t_tx_exception
        // 記住客戶(hù)端主動(dòng)查詢(xún)時(shí),優(yōu)先查數(shù)據(jù)庫(kù),再查redis的事務(wù)狀態(tài)
    }
}

總結(jié)

TX-LCN作為分布式解決方案是比較優(yōu)秀的方案,代碼邏輯也比較簡(jiǎn)單,但是如果應(yīng)用Crash,就可能出現(xiàn)數(shù)據(jù)不一致的情況,而且這種數(shù)據(jù)不一致的情況必須人肉修復(fù)。

比如主事務(wù)在進(jìn)行NotifyGroup 時(shí)出現(xiàn)RpcException 主事務(wù)會(huì)根據(jù)當(dāng)前事務(wù)的狀態(tài)進(jìn)行commit or rollback ,之后會(huì)上報(bào)TM 記錄補(bǔ)償信息,假如在記錄補(bǔ)償時(shí)失敗了(應(yīng)用在這個(gè)點(diǎn)Crash)了,那么主事務(wù)提交了,并且TM 并不能完整地協(xié)調(diào)好從事務(wù) 的全局事務(wù)狀態(tài)。

為什么需要人肉修復(fù)呢?其實(shí)從源碼上可以分析出,TX-LCN解決的場(chǎng)景時(shí)將本地的事務(wù)通過(guò)事務(wù)協(xié)調(diào)器進(jìn)行協(xié)調(diào),但是本質(zhì)上并沒(méi)有將事務(wù)分布式節(jié)點(diǎn)化,即本地事務(wù)的成功與失敗無(wú)法在不同的節(jié)點(diǎn)進(jìn)行處理。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容