源碼分析
本地事務(wù)部分依賴(lài)于spring事務(wù)
@TxTransactional注解和spring自帶的Transactional注解沒(méi)聯(lián)系,是手動(dòng)調(diào)用TransactionManager的一些方法來(lái)使用spring的事務(wù)的。具體是在攔截@TxTransactional注解時(shí),調(diào)用getTransaction方法中,其內(nèi)部調(diào)用doGetTransaction時(shí),若使用jdbc會(huì)將AutoCommit置為false,執(zhí)行切面方法,然后阻塞。這里就是兩階段提交中的第一階段,這時(shí)候并沒(méi)有真正的提交數(shù)據(jù)庫(kù),需要等待協(xié)調(diào)者的反饋。然后再根據(jù)協(xié)調(diào)者即raincat-manager返回的狀態(tài),手動(dòng)調(diào)用commit或者rollback方法來(lái)使用spring的事務(wù)。
事務(wù)的協(xié)調(diào)者是raincat-manager
raincat-manager也作為EurekaServer兼職了服務(wù)注冊(cè)的作用。
事務(wù)協(xié)調(diào)者和事務(wù)參與者通信是通過(guò)netty
事務(wù)參與者在發(fā)起事務(wù)、加入事務(wù)組、預(yù)提交、事務(wù)完成、回滾等操作時(shí)都會(huì)通過(guò)netty-handler發(fā)送心跳數(shù)據(jù)。協(xié)調(diào)者即raincat-manager在收到心跳包后會(huì)判斷行為來(lái)進(jìn)行協(xié)調(diào)的操作,并回發(fā)心跳來(lái)決定參與者的下一步操作。
事務(wù)組的管理部分全部在協(xié)調(diào)者即raincat-manager。比如說(shuō),事務(wù)發(fā)起者在執(zhí)行本地業(yè)務(wù)前會(huì)發(fā)送CREATE_GROUP的心跳給協(xié)調(diào)者,協(xié)調(diào)者在收到心跳包后會(huì)創(chuàng)建事務(wù)組,若是創(chuàng)建失敗則會(huì)通知事務(wù)發(fā)起者不必執(zhí)行本地業(yè)務(wù),若是創(chuàng)建成功則通知事務(wù)發(fā)起者可以進(jìn)行下一步操作。加入事務(wù)組之類(lèi)的邏輯同理。
協(xié)調(diào)者根據(jù)事務(wù)組來(lái)協(xié)調(diào)事務(wù)所有的參與者
事務(wù)發(fā)起者會(huì)在業(yè)務(wù)代碼執(zhí)行前生成一個(gè)事務(wù)組Id,將其放入ThreadLocal中,在使用rpc時(shí),因?yàn)槭峭粋€(gè)線程,可以從ThreadLocal中獲取事務(wù)組Id,再將事務(wù)組Id放進(jìn)請(qǐng)求頭,調(diào)用其他服務(wù)時(shí)在rpc的提供者即事務(wù)的參與者就可以獲取同一個(gè)事務(wù)組Id,這樣就可以將發(fā)起者和參與者放進(jìn)同一個(gè)事務(wù)組,事務(wù)管理器會(huì)協(xié)調(diào)事務(wù)組的整體提交。
分布式事務(wù)發(fā)起者在使用rpc組件調(diào)用其他服務(wù)時(shí),會(huì)將生成的事務(wù)Id放在請(qǐng)求頭中,主要代碼是在RpcMediator類(lèi)中,通過(guò)transmit方法添加,通過(guò)aquire方法獲取。不同的rpc放入的位置不同,若是feign就是通過(guò)RequestInterceprot放在requestTemplate::header,若是dubbo就是通過(guò)Filter放在RpcContext.getContext()::setAttachment。在服務(wù)提供端會(huì)在調(diào)用業(yè)務(wù)代碼前從請(qǐng)求頭中取出,具體代碼是在各個(gè)rpc對(duì)TxTransactionInterceptor的實(shí)現(xiàn)中。
通過(guò)是否存在txGroupId來(lái)判斷當(dāng)前服務(wù)是事務(wù)的發(fā)起者還是事務(wù)的參與者
因?yàn)閠xGroupId是事務(wù)發(fā)起者創(chuàng)建的,因此在請(qǐng)求事務(wù)發(fā)起者時(shí),請(qǐng)求頭中不存在txGroupId,而發(fā)起者在調(diào)用參與者時(shí)將txGroupId放進(jìn)了請(qǐng)求頭。具體的判斷代碼在TxTransactionFactoryServiceImpl.factoryOf(txTransactionInfo)。
參與者使用ReentrantLock和Condition來(lái)阻塞自己
參與者在發(fā)送心跳包后會(huì)阻塞自己,因?yàn)樾枰却齾f(xié)調(diào)者的反饋。得到協(xié)調(diào)者的反饋后會(huì)根據(jù)回寫(xiě)的心跳包中的key找到對(duì)應(yīng)的鎖,解鎖后繼續(xù)執(zhí)行。鎖的實(shí)現(xiàn)代碼是在BlockTask類(lèi)中,鎖的使用代碼主要是在NettyClientMessageHandler.sendTxManagerMessage(heartBeat)
事務(wù)參與者的提交和回滾需要獲取到對(duì)應(yīng)的Channel
事務(wù)發(fā)起者在發(fā)起事務(wù)或事務(wù)參與者在加入事務(wù)組時(shí)會(huì)將自己的remoteAddress保存起來(lái)。事務(wù)協(xié)調(diào)者在發(fā)起提交或者回滾時(shí)會(huì)根據(jù)上面保存的remoteAddress找到對(duì)應(yīng)的Channel,代碼在SocketManager.getChannelByModelName(name),找到對(duì)應(yīng)的Channel后就會(huì)向?qū)?yīng)的參與者的Channel發(fā)送心跳包。
攔截器部分
因?yàn)橐谡{(diào)用業(yè)務(wù)方法前找到當(dāng)前操作所在的事務(wù)組,并將當(dāng)前操作加入事務(wù)組,因此在最外面對(duì)于TxTransaction的Aspect攔截部分就做了rpc的抽象,要根據(jù)特定的rpc找到請(qǐng)求頭中的事務(wù)組。
不同的rpc實(shí)現(xiàn)通過(guò)spi調(diào)用。
要分清客戶(hù)端和服務(wù)端
客戶(hù)端只連一個(gè)服務(wù)端,因此在NetyClientMessageHandler中對(duì)當(dāng)前連接的服務(wù)端的ctx做了保存,在和服務(wù)端主動(dòng)通信時(shí)直接使用這個(gè)ctx來(lái)write。
源碼疑問(wèn)
在org.dromara.raincat.core.service.impl.TxManagerLocator中存儲(chǔ)對(duì)象的部分使用的是AtomicReference。這里的修改部分又加了鎖,如果不用原子類(lèi),直接用volatile不可以嗎。