TX-LCN核心源碼解讀
TX-LCN是基于Java編寫(xiě)的分布式事務(wù)解決方案框架,主要提供三種主流的解決方案
- LCN模式,通過(guò)代理JDBC Connection來(lái)控制協(xié)調(diào)多組本地原子事務(wù)的提交與關(guān)閉
- TCC模式,屬于框架級(jí)別解決方案,對(duì)業(yè)務(wù)入侵性極大
- TXC模式,核心方案為查詢(xún) + 分布式鎖的分布式事務(wù)解決方案,由淘寶團(tuán)隊(duì)提出
核心組件
- TC,作為分布式事務(wù)組件的客戶(hù)端角色,主要作用在于治理本地事務(wù)
- TM,作為分布式事務(wù)組件的服務(wù)端角色,主要作用在于協(xié)調(diào)事務(wù)組
核心概念
- 事務(wù)組,
group,描述整個(gè)分布式環(huán)境下運(yùn)行的各個(gè)事務(wù)(以一個(gè)request所需要完成的事務(wù))組合而成的一組事務(wù)。 - 事務(wù)單元,
unit,描述一個(gè)事務(wù)組內(nèi)除開(kāi)主事務(wù)之外的從事務(wù),一個(gè)從事務(wù)表示一個(gè)事務(wù)單元。
核心流程

核心源碼
以LCN模式作為實(shí)踐方案,源碼也以LCN 作為解讀對(duì)象。LCN 模式是什么模式?
LCN模式是通過(guò)代理Connection的方式實(shí)現(xiàn)對(duì)本地事務(wù)的操作,然后在由TxManager統(tǒng)一協(xié)調(diào)控制事務(wù)。當(dāng)本地事務(wù)提交回滾或者關(guān)閉連接時(shí)將會(huì)執(zhí)行假操作,該代理的連接將由LCN連接池管理。
創(chuàng)建事務(wù)組
/**
* Client創(chuàng)建事務(wù)組操作集合
*
* @param groupId groupId
* @param unitId unitId
* @param transactionInfo transactionInfo
* @param transactionType transactionType
* @throws TransactionException 創(chuàng)建group失敗時(shí)拋出
*/
public void createGroup(String groupId, String unitId, TransactionInfo transactionInfo, String transactionType)
throws TransactionException {
//創(chuàng)建事務(wù)組
try {
// 日志
txLogger.transactionInfo(groupId, unitId,
"create group > {} > groupId: {xid}, unitId: {uid}", transactionType);
// 創(chuàng)建事務(wù)組消息
reliableMessenger.createGroup(groupId);
// 緩存發(fā)起方切面信息
aspectLogger.trace(groupId, unitId, transactionInfo);
} catch (RpcException e) {
// 通訊異常
dtxExceptionHandler.handleCreateGroupMessageException(groupId, e);
} catch (LcnBusinessException e) {
// 創(chuàng)建事務(wù)組業(yè)務(wù)失敗
dtxExceptionHandler.handleCreateGroupBusinessException(groupId, e.getCause());
}
txLogger.transactionInfo(groupId, unitId, "create group over");
}
TC 發(fā)起創(chuàng)建事務(wù)組僅僅是像服務(wù)端發(fā)起請(qǐng)求,參數(shù)為groupId ,核心的代碼在于TM 接到請(qǐng)求后的處理。
public class CreateGroupExecuteService implements RpcExecuteService {
// ...
@Override
public Serializable execute(TransactionCmd transactionCmd) throws TxManagerException {
// ...
transactionManager.begin(transactionCmd.getGroupId());
}
}
public class DefaultDTXContextRegistry implements DTXContextRegistry {
private final FastStorage fastStorage;
@Override
public DTXContext create(String groupId) throws TransactionException {
// ..
fastStorage.initGroup(groupId);
}
}
public class RedisStorage implements FastStorage {
// ...
@Override
public void initGroup(String groupId) {
// 將groupId存入Redis
redisTemplate.opsForHash().put(REDIS_GROUP_PREFIX + groupId, "root", "");
redisTemplate.expire(REDIS_GROUP_PREFIX + groupId, managerConfig.getDtxTime() + 10000, TimeUnit.MILLISECONDS);
}
}
加入事務(wù)組
/**
* Client加入事務(wù)組操作集合
*
* @param groupId groupId
* @param unitId unitId
* @param transactionType transactionType
* @param transactionInfo transactionInfo
* @throws TransactionException 加入事務(wù)組失敗時(shí)拋出
*/
public void joinGroup(String groupId, String unitId, String transactionType, TransactionInfo transactionInfo) throws TransactionException {
// 詢(xún)問(wèn)TM加入事務(wù)組
// 該groupId由遠(yuǎn)程RPC通過(guò)header方式攜帶到從事務(wù)
reliableMessenger.joinGroup(groupId, unitId, transactionType, DTXLocalContext.transactionState());
// 異步檢測(cè)
dtxChecking.startDelayCheckingAsync(groupId, unitId, transactionType);
// ...
}
//
@Override
public void startDelayCheckingAsync(String groupId, String unitId, String transactionType) {
txLogger.taskInfo(groupId, unitId, "start delay checking task");
// 異步阻塞的方式
ScheduledFuture scheduledFuture = scheduledExecutorService.schedule(() -> {
try {
TxContext txContext = globalContext.txContext(groupId);
if (Objects.nonNull(txContext)) {
synchronized (txContext.getLock()) {
txLogger.info(groupId, unitId, Transactions.TAG_TASK,
"checking waiting for business code finish.");
txContext.getLock().wait();
}
}
int state = reliableMessenger.askTransactionState(groupId, unitId);// 詢(xún)問(wèn)事務(wù)組是否成功
txLogger.taskInfo(groupId, unitId, "ask transaction state {}", state);
if (state == -1) {
txLogger.error(this.getClass().getSimpleName(), "delay clean transaction error.");
onAskTransactionStateException(groupId, unitId, transactionType);
} else {
transactionCleanTemplate.clean(groupId, unitId, transactionType, state);// 事務(wù)清理,即commit or rollback
aspectLogger.clearLog(groupId, unitId);
}
} catch (RpcException e) {
onAskTransactionStateException(groupId, unitId, transactionType);
} catch (TransactionClearException | InterruptedException e) {
txLogger.error(this.getClass().getSimpleName(), "{} clean transaction error.", transactionType);
}
}, clientConfig.getDtxTime(), TimeUnit.MILLISECONDS);// 時(shí)間為最大事務(wù)時(shí)間,該時(shí)間由TM配置,在TC初始化時(shí)從TM放取到
delayTasks.put(groupId + unitId, scheduledFuture);
}
// 詢(xún)問(wèn)事務(wù)是否成功
private void onAskTransactionStateException(String groupId, String unitId, String transactionType) {
try {
// 通知TxManager事務(wù)補(bǔ)償
txMangerReporter.reportTransactionState(groupId, unitId, TxExceptionParams.ASK_ERROR, 0);
log.warn("{} > has compensation info!", transactionType);
// 事務(wù)回滾, 保留適當(dāng)?shù)难a(bǔ)償信息
transactionCleanTemplate.compensationClean(groupId, unitId, transactionType, 0);
} catch (TransactionClearException e) {
log.error("{} > clean transaction error.", transactionType);
}
}
TC 核心工作是申請(qǐng)加入事務(wù)組,并啟動(dòng)異步任務(wù)在事務(wù)最大時(shí)間后訪問(wèn)TM 事務(wù)組的全局事務(wù)狀態(tài)來(lái)進(jìn)行事務(wù)協(xié)調(diào)。
創(chuàng)建異步任務(wù)對(duì)象并將其緩存在本地內(nèi)存的delayTasks, 如果在事務(wù)最大時(shí)間內(nèi)已經(jīng)完成并調(diào)用則將該任務(wù)取消。
@Override
public void stopDelayChecking(String groupId, String unitId) {
ScheduledFuture scheduledFuture = delayTasks.get(groupId + unitId);
if (Objects.nonNull(scheduledFuture)) {
txLogger.taskInfo(groupId, unitId, "cancel {}:{} checking.", groupId, unitId);
scheduledFuture.cancel(true); // 取消任務(wù)
}
}
而在TM 端則進(jìn)行如下處理
// 加入事務(wù)組
// RedisStorage
public void saveTransactionUnitToGroup(String groupId, TransactionUnit transactionUnit) throws FastStorageException {
if (Optional.ofNullable(redisTemplate.hasKey(REDIS_GROUP_PREFIX + groupId)).orElse(false)) {
redisTemplate.opsForHash().put(REDIS_GROUP_PREFIX + groupId, transactionUnit.getUnitId(), transactionUnit);
return;
}
throw new FastStorageException("attempts to the non-existent transaction group " + groupId, FastStorageException.EX_CODE_NON_GROUP);
}
// TC 詢(xún)問(wèn)該事務(wù)組全局事務(wù)狀態(tài)
public int transactionState(String groupId) {
int state = exceptionService.transactionState(groupId); // 查詢(xún)數(shù)據(jù)庫(kù)t_tx_exception得到該事務(wù)的狀態(tài)
//存在數(shù)據(jù)時(shí)返回?cái)?shù)據(jù)狀態(tài)
if (state != -1) {
return state;
}
// 查詢(xún)r(jià)edis該事務(wù)組的全局狀態(tài)
return dtxContextRegistry.transactionState(groupId);
// 為什么會(huì)先查數(shù)據(jù)庫(kù),再查redis ? 后文會(huì)有說(shuō)明,主要是考慮事務(wù)補(bǔ)償時(shí)的問(wèn)題
}
// 返回的最終狀態(tài)
public class AskTransactionStateExecuteService implements RpcExecuteService {
@Override
public Serializable execute(TransactionCmd transactionCmd) {
int state = transactionManager.transactionState(transactionCmd.getGroupId());
return state == -1 ? 0 : state;
}
}
通知事務(wù)組
主事務(wù)的業(yè)務(wù)代碼執(zhí)行完畢,最終必須調(diào)用通知事務(wù)組進(jìn)行全局事務(wù)協(xié)調(diào)。通知完成后進(jìn)行事務(wù)清理
/**
* Client通知事務(wù)組操作集合
*
* @param groupId groupId
* @param unitId unitId
* @param transactionType transactionType
* @param state transactionState
*/
public void notifyGroup(String groupId, String unitId, String transactionType, int state) {
// ...
// 事務(wù)通知
reliableMessenger.notifyGroup(groupId, state);
// 事務(wù)清理
transactionCleanTemplate.clean(groupId, unitId, transactionType, state);
// 通知異常(RPC調(diào)用異常)
dtxExceptionHandler.handleNotifyGroupMessageException(Arrays.asList(groupId, state, unitId, transactionType), e);
// ...
}
// 當(dāng)TC調(diào)用TM拋出異常時(shí),會(huì)正常的按照當(dāng)前事務(wù)的狀態(tài)進(jìn)行提交,并將結(jié)果上報(bào)到TM
public void handleNotifyGroupMessageException(Object params, Throwable ex) {
// 參數(shù)中取出事務(wù)的狀態(tài)
// ....
// 按狀態(tài)正常結(jié)束事務(wù)(切面補(bǔ)償記錄將保留)
// TxManager 存在請(qǐng)求異?;蛘唔憫?yīng)異常兩種情況。當(dāng)請(qǐng)求異常時(shí)這里的業(yè)務(wù)需要補(bǔ)償,當(dāng)響應(yīng)異常的時(shí)候需要做狀態(tài)的事務(wù)清理。
// 請(qǐng)求異常時(shí)
// 參與放會(huì)根據(jù)上報(bào)補(bǔ)償記錄做事務(wù)的提交。
// 響應(yīng)異常時(shí)
// 參與反會(huì)正常提交事務(wù),本地業(yè)務(wù)提示事務(wù)。
// 該兩種情況下補(bǔ)償信息均可以忽略,可直接把本地補(bǔ)償記錄數(shù)據(jù)刪除。
String unitId = (String) paramList.get(2);
String transactionType = (String) paramList.get(3);
try {
transactionCleanTemplate.compensationClean(groupId, unitId, transactionType, state);// 本地事務(wù)提交
} catch (TransactionClearException e) {
log.error("{} > compensationClean transaction error.", transactionType);
}
// 上報(bào)Manager,上報(bào)直到成功.
txMangerReporter.reportTransactionState(groupId, null, TxExceptionParams.NOTIFY_GROUP_ERROR, state);
// 提交的事務(wù)記錄到t_tx_exception表中,所以會(huì)看到前文TC詢(xún)問(wèn)事務(wù)狀態(tài)時(shí),會(huì)優(yōu)先查詢(xún)數(shù)據(jù)庫(kù),而不是直接查redis
}
通知事務(wù)組的概念,應(yīng)該理解為,主事務(wù)告知TM 進(jìn)行全部的事務(wù)協(xié)調(diào),即TM 僅會(huì)通知各個(gè)從事務(wù)進(jìn)行commit or rollback,而不會(huì)通知主事務(wù)進(jìn)行commit or rollback 。因?yàn)樵谇拔目吹絼?chuàng)建事務(wù)組時(shí),TM 并沒(méi)有將主事務(wù)unitId 記錄下來(lái)。而從事務(wù)加入事務(wù)組時(shí),除了記錄全局事務(wù)組Id,還包括事務(wù)單元unitId .
public Serializable execute(TransactionCmd transactionCmd) throws TxManagerException {
try {
// 從redis取事務(wù)狀態(tài)
int transactionState = transactionManager.transactionStateFromFastStorage(transactionCmd.getGroupId());
boolean hasThrow = false;
if (transactionState == 0) {
commitState = 0;
hasThrow = true;
}
// 事務(wù)狀態(tài)為1進(jìn)行全局事務(wù)提交
if (commitState == 1) {
transactionManager.commit(dtxContext);
} else if (commitState == 0) {
transactionManager.rollback(dtxContext);
}
// ...
} catch (TransactionException e) {
throw new TxManagerException(e);
} finally {
transactionManager.close(transactionCmd.getGroupId());
// 系統(tǒng)日志
txLogger.transactionInfo(transactionCmd.getGroupId(), "", "notify group over");
}
return null;
}
// 事務(wù)通知
private void notifyTransaction(DTXContext dtxContext, int transactionState) throws TransactionException {
for (TransactionUnit transUnit : dtxContext.transactionUnits()) {
NotifyUnitParams notifyUnitParams = new NotifyUnitParams();
notifyUnitParams.setGroupId(dtxContext.getGroupId());
notifyUnitParams.setUnitId(transUnit.getUnitId());
notifyUnitParams.setUnitType(transUnit.getUnitType());
notifyUnitParams.setState(transactionState);
txLogger.info(dtxContext.getGroupId(),
notifyUnitParams.getUnitId(), Transactions.TAG_TRANSACTION, "notify %s's unit: %s",
transUnit.getModId(), transUnit.getUnitId());
try {
// 這里在5.0.1會(huì)出現(xiàn)信道問(wèn)題,什么是信道問(wèn)題?比如此時(shí)有兩臺(tái)push注冊(cè)到TM上,而某一刻的全局事務(wù)所在的本地事務(wù)只在其中一臺(tái),而通知的時(shí)候如果modId一致則會(huì)取到第一個(gè)
// 如下get(0) . 解決的辦法是生成modId時(shí)去的是Mac地址+端口+服務(wù)名稱(chēng),保證了不同實(shí)例的全局唯一
List<String> modChannelKeys = rpcClient.remoteKeys(transUnit.getModId());
if (modChannelKeys.isEmpty()) {
// record exception
throw new RpcException("offline mod.");
}
MessageDto respMsg =
rpcClient.request(modChannelKeys.get(0), MessageCreator.notifyUnit(notifyUnitParams));
if (!MessageUtils.statusOk(respMsg)) {
// 提交/回滾失敗的消息處理
List<Object> params = Arrays.asList(notifyUnitParams, transUnit.getModId());
rpcExceptionHandler.handleNotifyUnitBusinessException(params, respMsg.loadBean(Throwable.class));
}
} catch (RpcException e) {
// 提交/回滾通訊失敗
List<Object> params = Arrays.asList(notifyUnitParams, transUnit.getModId());
rpcExceptionHandler.handleNotifyUnitMessageException(params, e);
} finally {
txLogger.transactionInfo(dtxContext.getGroupId(), notifyUnitParams.getUnitId(), "notify unit over");
}
}
}
// 當(dāng)通知出現(xiàn)異常時(shí),將信息記錄到t_tx_exception表
public class ManagerRpcExceptionHandler implements RpcExceptionHandler {
@Override
public void handleNotifyUnitMessageException(Object params, Throwable e) {
// notify unit message error, write txEx
List paramList = ((List) params);
String modName = (String) paramList.get(1);
NotifyUnitParams notifyUnitParams = (NotifyUnitParams) paramList.get(0);
WriteTxExceptionDTO writeTxExceptionReq = new WriteTxExceptionDTO(notifyUnitParams.getGroupId(),
notifyUnitParams.getUnitId(), modName, notifyUnitParams.getState());
writeTxExceptionReq.setRegistrar((short) 0);
compensationService.writeTxException(writeTxExceptionReq);// 記錄到t_tx_exception
// 記住客戶(hù)端主動(dòng)查詢(xún)時(shí),優(yōu)先查數(shù)據(jù)庫(kù),再查redis的事務(wù)狀態(tài)
}
}
總結(jié)
TX-LCN作為分布式解決方案是比較優(yōu)秀的方案,代碼邏輯也比較簡(jiǎn)單,但是如果應(yīng)用Crash,就可能出現(xiàn)數(shù)據(jù)不一致的情況,而且這種數(shù)據(jù)不一致的情況必須人肉修復(fù)。
比如主事務(wù)在進(jìn)行NotifyGroup 時(shí)出現(xiàn)RpcException 主事務(wù)會(huì)根據(jù)當(dāng)前事務(wù)的狀態(tài)進(jìn)行commit or rollback ,之后會(huì)上報(bào)TM 記錄補(bǔ)償信息,假如在記錄補(bǔ)償時(shí)失敗了(應(yīng)用在這個(gè)點(diǎn)Crash)了,那么主事務(wù)提交了,并且TM 并不能完整地協(xié)調(diào)好從事務(wù) 的全局事務(wù)狀態(tài)。
為什么需要人肉修復(fù)呢?其實(shí)從源碼上可以分析出,TX-LCN解決的場(chǎng)景時(shí)將本地的事務(wù)通過(guò)事務(wù)協(xié)調(diào)器進(jìn)行協(xié)調(diào),但是本質(zhì)上并沒(méi)有將事務(wù)分布式節(jié)點(diǎn)化,即本地事務(wù)的成功與失敗無(wú)法在不同的節(jié)點(diǎn)進(jìn)行處理。