1-fescar源碼分析-server端
一.官網(wǎng)介紹(以下截取至自官網(wǎng))
-
1.1 定義一個(gè)分布式事務(wù)
首先,很自然的,我們可以把一個(gè)分布式事務(wù)理解成一個(gè)包含了若干 分支事務(wù) 的 全局事務(wù)。全局事務(wù) 的職責(zé)是協(xié)調(diào)其下管轄的 分支事務(wù) 達(dá)成一致,要么一起成功提交,要么一起失敗回滾。此外,通常 分支事務(wù) 本身就是一個(gè)滿足 ACID 的 本地事務(wù)。這是我們對(duì)分布式事務(wù)結(jié)構(gòu)的基本認(rèn)識(shí),與 XA 是一致的。
image.png
-
定義三個(gè)組件
image.png
Transaction Coordinator (TC): 事務(wù)協(xié)調(diào)器,維護(hù)全局事務(wù)的運(yùn)行狀態(tài),負(fù)責(zé)協(xié)調(diào)并驅(qū)動(dòng)全局事務(wù)的提交或回滾。
Transaction Manager (TM): 控制全局事務(wù)的邊界,負(fù)責(zé)開啟一個(gè)全局事務(wù),并最終發(fā)起全局提交或全局回滾的決議。
Resource Manager (RM): 控制分支事務(wù),負(fù)責(zé)分支注冊(cè)、狀態(tài)匯報(bào),并接收事務(wù)協(xié)調(diào)器的指令,驅(qū)動(dòng)分支(本地)事務(wù)的提交和回滾。
-
具體配合
image.png 具體流程
- TM 向 TC 申請(qǐng)開啟一個(gè)全局事務(wù),全局事務(wù)創(chuàng)建成功并生成一個(gè)全局唯一的 XID。
- XID 在微服務(wù)調(diào)用鏈路的上下文中傳播。
- RM 向 TC 注冊(cè)分支事務(wù),將其納入 XID 對(duì)應(yīng)全局事務(wù)的管轄。
- TM 向 TC 發(fā)起針對(duì) XID 的全局提交或回滾決議。
- TC 調(diào)度 XID 下管轄的全部分支事務(wù)完成提交或回滾請(qǐng)求。
-
提交與回滾
image.png
--
二、主要流程(模塊)概要
- 1.fescar-server:TC模塊,server端啟動(dòng):fescar的主服務(wù),主要是netty服務(wù)端,接收TM的提交或者回滾消息以及向RM發(fā)送提交或者回滾消息指令等。
- 2.fescar-tm:TM模塊,主要提供事務(wù)管理,向TC發(fā)送消息,觸發(fā)事務(wù)的提交或者回滾。
- 3.rm-datasource: RM模塊,數(shù)據(jù)庫相關(guān),覆蓋rpc服務(wù)的執(zhí)行邏輯,備份腳本執(zhí)行前后結(jié)果,生成回滾腳本,向TC注冊(cè)事務(wù)分支,執(zhí)行正向腳本(邏輯)。
- 4.fescar-core:核心對(duì)象打包
- 5.fescar-spring:提供注解,保證對(duì)業(yè)務(wù)的非侵入性
1.服務(wù)啟動(dòng),通過@GlobalTransactional執(zhí)行到GlobalTransactionalInterceptor攔截器,進(jìn)入對(duì)業(yè)務(wù)邏輯進(jìn)行增強(qiáng),執(zhí)行TransactionalTemplate邏輯
2.TransactionalTemplate對(duì)全局事務(wù)的獲取、執(zhí)行業(yè)務(wù)邏輯、異常回滾、事務(wù)提交進(jìn)行統(tǒng)籌,并同時(shí)通過服務(wù)的維持channel將每個(gè)步驟與server的netty進(jìn)行交涉.
3.同時(shí)TransactionalTemplate在執(zhí)行業(yè)務(wù)邏輯的同時(shí),通過spring jdbc的覆蓋,通過直接調(diào)用RM模塊生成相關(guān)的備份及回滾腳本。
4.server接收到相關(guān)服務(wù)的消息后,觸發(fā)相關(guān)的動(dòng)作,并最終觸發(fā)RM去進(jìn)行數(shù)據(jù)庫層面的操作。
--
三、(原理)源碼分析
其實(shí)細(xì)看上面官網(wǎng)的介紹,基本能清楚fescar的大致思想。下面從源碼角度出發(fā)一步一步的將流程以代碼的形式讀完。
3.1 demo
-
先看下官網(wǎng)的結(jié)構(gòu)圖:
image.png
項(xiàng)目中存在官方的example模塊,里面就模擬了上圖的相關(guān)流程:先回到本節(jié)主題:server
3.2.TC server
-
3.2.1.Server服務(wù)端啟動(dòng)
public class Server { private static final ThreadPoolExecutor WORKING_THREADS = new ThreadPoolExecutor(100, 500, 500, TimeUnit.SECONDS, new LinkedBlockingQueue(20000), new ThreadPoolExecutor.CallerRunsPolicy()); public static void main(String[] args) throws IOException { // netty實(shí)現(xiàn)的一個(gè)簡(jiǎn)單的rpc服務(wù)端 RpcServer rpcServer = new RpcServer(WORKING_THREADS); int port = 8091; if (args.length == 0) { rpcServer.setListenPort(port); } if (args.length > 0) { try { port = Integer.parseInt(args[0]); } catch (NumberFormatException e) { System.err.println("Usage: sh fescar-server.sh $LISTEN_PORT $PATH_FOR_PERSISTENT_DATA"); System.exit(0); } rpcServer.setListenPort(port); } String dataDir = null; if (args.length > 1) { dataDir = args[1]; } /** * SessionHolder主要負(fù)責(zé)事務(wù)信息存儲(chǔ),對(duì)應(yīng)的ROOT_SESSION_MANAGER,ASYNC_COMMITTING_SESSION_MANAGER,RETRY_COMMITTING_SESSION_MANAGER,RETRY_ROLLBACKING_SESSION_MANAGER分別對(duì)應(yīng)相應(yīng)的文件存儲(chǔ)到本地 */ SessionHolder.init(dataDir); //協(xié)調(diào)者初始化 /** * DefaultCoordinator 繼承至 AbstractTCInboundHandler, AbstractTCInboundHandler handle方法會(huì)調(diào)用doGlobalRollback()方法,doGlobalRollback()方法會(huì)調(diào)用DefaultCore的rollback() */ DefaultCoordinator coordinator = new DefaultCoordinator(rpcServer); coordinator.init(); rpcServer.setHandler(new DefaultCoordinator(rpcServer)); //全局事務(wù)發(fā)號(hào)器初始化 UUIDGenerator.init(1); XID.setIpAddress(NetUtil.getLocalIp()); XID.setPort(rpcServer.getListenPort()); /** * 完成服務(wù)的啟動(dòng)及監(jiān)聽 * RpcServer啟動(dòng)之后就會(huì)監(jiān)聽來自TM的消息,如上代碼所示,在開啟RpcServer之前會(huì)注冊(cè)一個(gè)DefaultServerMessageListenerImpl用于對(duì)TM發(fā)過來的消息進(jìn)行監(jiān)聽。 * */ rpcServer.init(); System.exit(0); } } -
RpcServer就是netty一個(gè)服務(wù)端
public RpcServer(ThreadPoolExecutor messageExecutor) { super(new NettyServerConfig(), messageExecutor); } #AbstractRpcRemotingServer public AbstractRpcRemotingServer(final NettyServerConfig nettyServerConfig, final ThreadPoolExecutor messageExecutor, final ChannelHandler... handlers) { super(messageExecutor); this.serverBootstrap = new ServerBootstrap(); this.nettyServerConfig = nettyServerConfig; if (NettyServerConfig.enableEpoll()) { this.eventLoopGroupBoss = new EpollEventLoopGroup(nettyServerConfig.getBossThreadSize(), new NamedThreadFactory(nettyServerConfig.getBossThreadPrefix(), nettyServerConfig.getBossThreadSize())); this.eventLoopGroupWorker = new EpollEventLoopGroup(nettyServerConfig.getServerWorkerThreads(), new NamedThreadFactory(nettyServerConfig.getWorkerThreadPrefix(), nettyServerConfig.getServerWorkerThreads())); } else { this.eventLoopGroupBoss = new NioEventLoopGroup(nettyServerConfig.getBossThreadSize(), new NamedThreadFactory(nettyServerConfig.getBossThreadPrefix(), nettyServerConfig.getBossThreadSize())); this.eventLoopGroupWorker = new NioEventLoopGroup(nettyServerConfig.getServerWorkerThreads(), new NamedThreadFactory(nettyServerConfig.getWorkerThreadPrefix(), nettyServerConfig.getServerWorkerThreads())); } if (null != handlers) { channelHandlers = handlers; } // init listenPort in constructor so that getListenPort() will always get the exact port setListenPort(nettyServerConfig.getDefaultListenPort()); }看到這里就是netty服務(wù)端構(gòu)造的慣用邏輯了。
-
緊接著初始化SessionHolder:
SessionHolder.init(dataDir); # SessionHolder public static void init(String sessionStorePath) throws IOException { if (sessionStorePath == null) { ROOT_SESSION_MANAGER = new DefaultSessionManager(ROOT_SESSION_MANAGER_NAME); ASYNC_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(ASYNC_COMMITTING_SESSION_MANAGER_NAME); RETRY_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(RETRY_COMMITTING_SESSION_MANAGER_NAME); RETRY_ROLLBACKING_SESSION_MANAGER = new DefaultSessionManager(RETRY_ROLLBACKING_SESSION_MANAGER_NAME); } else { if (!sessionStorePath.endsWith("/")) { sessionStorePath = sessionStorePath + "/"; } ROOT_SESSION_MANAGER = new FileBasedSessionManager(ROOT_SESSION_MANAGER_NAME, sessionStorePath); ASYNC_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(ASYNC_COMMITTING_SESSION_MANAGER_NAME); RETRY_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(RETRY_COMMITTING_SESSION_MANAGER_NAME); RETRY_ROLLBACKING_SESSION_MANAGER = new DefaultSessionManager(RETRY_ROLLBACKING_SESSION_MANAGER_NAME); } }無非就是暫存各種DefaultSessionManager,共后續(xù)任務(wù)輪訓(xùn)時(shí)使用。DefaultSessionManager繼承至
AbstractSessionManager,看下面父類屬性sessionMap,無非就是管理GlobalSession了。
而GlobalSession就是server根據(jù)XID緩存全局事務(wù)的對(duì)象,里面屬性branchSessions記錄了全局事務(wù)里對(duì)應(yīng)的所有分支事務(wù),這樣就能根據(jù)TM的相關(guān)觸發(fā)消息去處理全局事務(wù),進(jìn)而處理分支事務(wù)了。#AbstractSessionManager protected Map<Long, GlobalSession> sessionMap = new ConcurrentHashMap<>();public class GlobalSession implements SessionLifecycle, SessionStorable { private private long transactionId; private GlobalStatus status; private String applicationId; private String transactionServiceGroup; private String transactionName; private int timeout; private long beginTime; private boolean active; ArrayList<BranchSession> branchSessions = new ArrayList<>(); ... }
-
再者初始化coordinator:
coordinator.init(); public void init() { retryRollbacking.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { handleRetryRollbacking(); } catch (Exception e) { LOGGER.info("Exception retry rollbacking ... ", e); } } }, 0, 5, TimeUnit.MILLISECONDS); retryCommitting.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { handleRetryCommitting(); } catch (Exception e) { LOGGER.info("Exception retry committing ... ", e); } } }, 0, 5, TimeUnit.MILLISECONDS); asyncCommitting.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { handleAsyncCommitting(); } catch (Exception e) { LOGGER.info("Exception async committing ... ", e); } } }, 0, 10, TimeUnit.MILLISECONDS); timeoutCheck.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { timeoutCheck(); } catch (Exception e) { LOGGER.info("Exception timeout checking ... ", e); } } }, 0, 2, TimeUnit.MILLISECONDS); }這里無非就是啟動(dòng)一些異步任務(wù)處理邏輯,其實(shí)這里最主要的就是commit異步任務(wù)刪除回滾日志的邏輯。因?yàn)檫@里涉及到服務(wù)的相關(guān)觸發(fā)邏輯,暫時(shí)先暫停到此。
-
繼續(xù)rpcServer
rpcServer.init(); #init @Override public void init() { super.init(); setChannelHandlers(RpcServer.this); DefaultServerMessageListenerImpl defaultServerMessageListenerImpl = new DefaultServerMessageListenerImpl(transactionMessageHandler); defaultServerMessageListenerImpl.setServerMessageSender(this); this.setServerMessageListener(defaultServerMessageListenerImpl); super.start(); }
1.父類初始化邏輯
2.設(shè)置處理器handler
3.構(gòu)造消息設(shè)置監(jiān)聽器并監(jiān)聽
-
4.啟動(dòng)服務(wù)
這里最重要的就是設(shè)置監(jiān)聽器了,因?yàn)檫@里關(guān)乎如何監(jiān)聽TM的事務(wù)提交或者回滾消息的。直接看實(shí)現(xiàn):
#DefaultServerMessageListenerImpl @Override public void onTrxMessage(long msgId, ChannelHandlerContext ctx, Object message, ServerMessageSender sender) { RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("server received:" + message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:" + rpcContext.getTransactionServiceGroup()); } else { messageStrings.offer(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:" + rpcContext.getTransactionServiceGroup()); } if (!(message instanceof AbstractMessage)) { return; } if (message instanceof MergedWarpMessage) { AbstractResultMessage[] results = new AbstractResultMessage[((MergedWarpMessage)message).msgs.size()]; for (int i = 0; i < results.length; i++) { final AbstractMessage subMessage = ((MergedWarpMessage)message).msgs.get(i); results[i] = transactionMessageHandler.onRequest(subMessage, rpcContext); } MergeResultMessage resultMessage = new MergeResultMessage(); resultMessage.setMsgs(results); sender.sendResponse(msgId, ctx.channel(), resultMessage); } else if (message instanceof AbstractResultMessage) { transactionMessageHandler.onResponse((AbstractResultMessage)message, rpcContext); } }這里就是監(jiān)聽事務(wù)消息的觸發(fā)點(diǎn)了,看下調(diào)用鏈:
@Override public void dispatch(long msgId, ChannelHandlerContext ctx, Object msg) { if (msg instanceof RegisterRMRequest) { serverMessageListener.onRegRmMessage(msgId, ctx, (RegisterRMRequest)msg, this, checkAuthHandler); } else { if (ChannelManager.isRegistered(ctx.channel())) { serverMessageListener.onTrxMessage(msgId, ctx, msg, this); } else { try { closeChannelHandlerContext(ctx); } catch (Exception exx) { LOGGER.error(exx.getMessage()); } if (LOGGER.isInfoEnabled()) { LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString())); } } } }繼續(xù)往上追:
#AbstractRpcRemoting @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof RpcMessage) { final RpcMessage rpcMessage = (RpcMessage)msg; if (rpcMessage.isRequest()) { if (LOGGER.isDebugEnabled()) { LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody())); } try { AbstractRpcRemoting.this.messageExecutor.execute(new Runnable() { @Override public void run() { try { dispatch(rpcMessage.getId(), ctx, rpcMessage.getBody()); } catch (Throwable th) { LOGGER.error(FrameworkErrorCode.NetDispatch.errCode, th.getMessage(), th); } } }); } } }
到了netty的channel層了,SocketChannel就是netty本身的消息讀寫通道,一切消息交換由此追溯,即netty完成通信后觸發(fā)了監(jiān)聽器邏輯,進(jìn)而繼續(xù)TC服務(wù)邏輯的處理。
-
啟動(dòng)服務(wù)
super.start(); #AbstractRpcRemotingServer @Override public void start() { this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker) .channel(nettyServerConfig.SERVER_CHANNEL_CLAZZ) .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize()) .option(ChannelOption.SO_REUSEADDR, true) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize()) .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize()) .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, nettyServerConfig.getWriteBufferHighWaterMark()) .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, nettyServerConfig.getWriteBufferLowWaterMark()) .localAddress(new InetSocketAddress(listenPort)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0)) .addLast(new MessageCodecHandler()); if (null != channelHandlers) { addChannelPipelineLast(ch, channelHandlers); } } }); if (nettyServerConfig.isEnableServerPooledByteBufAllocator()) { this.serverBootstrap.childOption(ChannelOption.ALLOCATOR, NettyServerConfig.DIRECT_BYTE_BUF_ALLOCATOR); } try { ChannelFuture future = this.serverBootstrap.bind(listenPort).sync(); LOGGER.info("Server started ... "); future.channel().closeFuture().sync(); } catch (InterruptedException exx) { throw new RuntimeException(exx); } }
慣用的netty服務(wù)啟動(dòng)方式,至此,TC Server服務(wù)啟動(dòng)OK,下面總結(jié)下。
--
四.TC Server服務(wù)端啟動(dòng)總結(jié)
- 1.構(gòu)造RpcServer的netty服務(wù)端
- 2.初始化SessionHolder,用于初始化各種DefaultSessionManager,DefaultSessionManager就是管理GlobalSession和BranchSession的管理器
- 3.初始化DefaultCoordinator,DefaultCoordinator就是具體執(zhí)行提交、回滾、注冊(cè)分支事務(wù)等邏輯的類,同時(shí)啟動(dòng)異步執(zhí)行任務(wù),比如提交全局事務(wù)后,異步刪除回滾sql。
- 4.初始化RpcServer,設(shè)置事務(wù)消息監(jiān)聽器
- 5.啟動(dòng)RpcServer的netty服務(wù)端。
--
五.未完待續(xù)。。。
后續(xù)分析主要還是根據(jù)example官方實(shí)例分為:全局事務(wù)的獲取、事務(wù)邏輯執(zhí)行、事務(wù)回滾、事務(wù)提交進(jìn)行。
同時(shí)后續(xù)每一流程都緊密關(guān)聯(lián)Server,因此還會(huì)頻繁回到上敘server啟動(dòng)后,收到消息被觸發(fā)的后續(xù)邏輯。




