1-fescar(seata)源碼分析-server端

1-fescar源碼分析-server端

一.官網(wǎng)介紹(以下截取至自官網(wǎng))

https://github.com/alibaba/fescar/wiki/%E6%A6%82%E8%A7%88

  • 1.1 定義一個(gè)分布式事務(wù)

    首先,很自然的,我們可以把一個(gè)分布式事務(wù)理解成一個(gè)包含了若干 分支事務(wù) 的 全局事務(wù)。全局事務(wù) 的職責(zé)是協(xié)調(diào)其下管轄的 分支事務(wù) 達(dá)成一致,要么一起成功提交,要么一起失敗回滾。此外,通常 分支事務(wù) 本身就是一個(gè)滿足 ACID 的 本地事務(wù)。這是我們對(duì)分布式事務(wù)結(jié)構(gòu)的基本認(rèn)識(shí),與 XA 是一致的。

    image.png
  • 定義三個(gè)組件


    image.png
  • Transaction Coordinator (TC): 事務(wù)協(xié)調(diào)器,維護(hù)全局事務(wù)的運(yùn)行狀態(tài),負(fù)責(zé)協(xié)調(diào)并驅(qū)動(dòng)全局事務(wù)的提交或回滾。

  • Transaction Manager (TM): 控制全局事務(wù)的邊界,負(fù)責(zé)開啟一個(gè)全局事務(wù),并最終發(fā)起全局提交或全局回滾的決議。

  • Resource Manager (RM): 控制分支事務(wù),負(fù)責(zé)分支注冊(cè)、狀態(tài)匯報(bào),并接收事務(wù)協(xié)調(diào)器的指令,驅(qū)動(dòng)分支(本地)事務(wù)的提交和回滾。

  • 具體配合


    image.png
  • 具體流程

  • TM 向 TC 申請(qǐng)開啟一個(gè)全局事務(wù),全局事務(wù)創(chuàng)建成功并生成一個(gè)全局唯一的 XID。
  • XID 在微服務(wù)調(diào)用鏈路的上下文中傳播。
  • RM 向 TC 注冊(cè)分支事務(wù),將其納入 XID 對(duì)應(yīng)全局事務(wù)的管轄。
  • TM 向 TC 發(fā)起針對(duì) XID 的全局提交或回滾決議。
  • TC 調(diào)度 XID 下管轄的全部分支事務(wù)完成提交或回滾請(qǐng)求。
  • 提交與回滾


    image.png

--

二、主要流程(模塊)概要

  • 1.fescar-server:TC模塊,server端啟動(dòng):fescar的主服務(wù),主要是netty服務(wù)端,接收TM的提交或者回滾消息以及向RM發(fā)送提交或者回滾消息指令等。
  • 2.fescar-tm:TM模塊,主要提供事務(wù)管理,向TC發(fā)送消息,觸發(fā)事務(wù)的提交或者回滾。
  • 3.rm-datasource: RM模塊,數(shù)據(jù)庫相關(guān),覆蓋rpc服務(wù)的執(zhí)行邏輯,備份腳本執(zhí)行前后結(jié)果,生成回滾腳本,向TC注冊(cè)事務(wù)分支,執(zhí)行正向腳本(邏輯)。
  • 4.fescar-core:核心對(duì)象打包
  • 5.fescar-spring:提供注解,保證對(duì)業(yè)務(wù)的非侵入性
1.服務(wù)啟動(dòng),通過@GlobalTransactional執(zhí)行到GlobalTransactionalInterceptor攔截器,進(jìn)入對(duì)業(yè)務(wù)邏輯進(jìn)行增強(qiáng),執(zhí)行TransactionalTemplate邏輯
2.TransactionalTemplate對(duì)全局事務(wù)的獲取、執(zhí)行業(yè)務(wù)邏輯、異常回滾、事務(wù)提交進(jìn)行統(tǒng)籌,并同時(shí)通過服務(wù)的維持channel將每個(gè)步驟與server的netty進(jìn)行交涉.
3.同時(shí)TransactionalTemplate在執(zhí)行業(yè)務(wù)邏輯的同時(shí),通過spring jdbc的覆蓋,通過直接調(diào)用RM模塊生成相關(guān)的備份及回滾腳本。
4.server接收到相關(guān)服務(wù)的消息后,觸發(fā)相關(guān)的動(dòng)作,并最終觸發(fā)RM去進(jìn)行數(shù)據(jù)庫層面的操作。

--

三、(原理)源碼分析

其實(shí)細(xì)看上面官網(wǎng)的介紹,基本能清楚fescar的大致思想。下面從源碼角度出發(fā)一步一步的將流程以代碼的形式讀完。

3.1 demo
  • 先看下官網(wǎng)的結(jié)構(gòu)圖:


    image.png
項(xiàng)目中存在官方的example模塊,里面就模擬了上圖的相關(guān)流程:先回到本節(jié)主題:server
3.2.TC server
  • 3.2.1.Server服務(wù)端啟動(dòng)

    public class Server {
    
        private static final ThreadPoolExecutor WORKING_THREADS = new ThreadPoolExecutor(100, 500, 500, TimeUnit.SECONDS, new LinkedBlockingQueue(20000), new ThreadPoolExecutor.CallerRunsPolicy());
        public static void main(String[] args) throws IOException {
    
            // netty實(shí)現(xiàn)的一個(gè)簡(jiǎn)單的rpc服務(wù)端
            RpcServer rpcServer = new RpcServer(WORKING_THREADS);
            
            int port = 8091;
            if (args.length == 0) {
                rpcServer.setListenPort(port);
            }
    
            if (args.length > 0) {
                try {
                    port = Integer.parseInt(args[0]);
                } catch (NumberFormatException e) {
                    System.err.println("Usage: sh fescar-server.sh $LISTEN_PORT $PATH_FOR_PERSISTENT_DATA");
                    System.exit(0);
                }
                rpcServer.setListenPort(port);
            }
            
            String dataDir = null;
            if (args.length > 1) {
                dataDir = args[1];
            }
    
            /**
             * SessionHolder主要負(fù)責(zé)事務(wù)信息存儲(chǔ),對(duì)應(yīng)的ROOT_SESSION_MANAGER,ASYNC_COMMITTING_SESSION_MANAGER,RETRY_COMMITTING_SESSION_MANAGER,RETRY_ROLLBACKING_SESSION_MANAGER分別對(duì)應(yīng)相應(yīng)的文件存儲(chǔ)到本地
             */
            SessionHolder.init(dataDir);
    
            //協(xié)調(diào)者初始化
            /**
             * DefaultCoordinator 繼承至 AbstractTCInboundHandler, AbstractTCInboundHandler handle方法會(huì)調(diào)用doGlobalRollback()方法,doGlobalRollback()方法會(huì)調(diào)用DefaultCore的rollback()
             */
            DefaultCoordinator coordinator = new DefaultCoordinator(rpcServer);
            coordinator.init();
            rpcServer.setHandler(new DefaultCoordinator(rpcServer));
    
            //全局事務(wù)發(fā)號(hào)器初始化
            UUIDGenerator.init(1);
    
            XID.setIpAddress(NetUtil.getLocalIp());
            XID.setPort(rpcServer.getListenPort());
    
            /**
             * 完成服務(wù)的啟動(dòng)及監(jiān)聽
             * RpcServer啟動(dòng)之后就會(huì)監(jiān)聽來自TM的消息,如上代碼所示,在開啟RpcServer之前會(huì)注冊(cè)一個(gè)DefaultServerMessageListenerImpl用于對(duì)TM發(fā)過來的消息進(jìn)行監(jiān)聽。
             * */
            rpcServer.init();
            System.exit(0);
        }
    }
    
  • RpcServer就是netty一個(gè)服務(wù)端

    public RpcServer(ThreadPoolExecutor messageExecutor) {
        super(new NettyServerConfig(), messageExecutor);
    }
    
    #AbstractRpcRemotingServer
    public AbstractRpcRemotingServer(final NettyServerConfig nettyServerConfig,
                                     final ThreadPoolExecutor messageExecutor, final ChannelHandler... handlers) {
        super(messageExecutor);
        this.serverBootstrap = new ServerBootstrap();
        this.nettyServerConfig = nettyServerConfig;
        if (NettyServerConfig.enableEpoll()) {
            this.eventLoopGroupBoss = new EpollEventLoopGroup(nettyServerConfig.getBossThreadSize(),
                new NamedThreadFactory(nettyServerConfig.getBossThreadPrefix(), nettyServerConfig.getBossThreadSize()));
            this.eventLoopGroupWorker = new EpollEventLoopGroup(nettyServerConfig.getServerWorkerThreads(),
                new NamedThreadFactory(nettyServerConfig.getWorkerThreadPrefix(),
                    nettyServerConfig.getServerWorkerThreads()));
        } else {
            this.eventLoopGroupBoss = new NioEventLoopGroup(nettyServerConfig.getBossThreadSize(),
                new NamedThreadFactory(nettyServerConfig.getBossThreadPrefix(), nettyServerConfig.getBossThreadSize()));
            this.eventLoopGroupWorker = new NioEventLoopGroup(nettyServerConfig.getServerWorkerThreads(),
                new NamedThreadFactory(nettyServerConfig.getWorkerThreadPrefix(),
                    nettyServerConfig.getServerWorkerThreads()));
        }
        if (null != handlers) {
            channelHandlers = handlers;
        }
        // init listenPort in constructor so that getListenPort() will always get the exact port
        setListenPort(nettyServerConfig.getDefaultListenPort());
    }
    

    看到這里就是netty服務(wù)端構(gòu)造的慣用邏輯了。

  • 緊接著初始化SessionHolder:

    SessionHolder.init(dataDir);
    
    # SessionHolder
    public static void init(String sessionStorePath) throws IOException {
        if (sessionStorePath == null) {
            ROOT_SESSION_MANAGER = new DefaultSessionManager(ROOT_SESSION_MANAGER_NAME);
            ASYNC_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(ASYNC_COMMITTING_SESSION_MANAGER_NAME);
            RETRY_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(RETRY_COMMITTING_SESSION_MANAGER_NAME);
            RETRY_ROLLBACKING_SESSION_MANAGER = new DefaultSessionManager(RETRY_ROLLBACKING_SESSION_MANAGER_NAME);
        } else {
            if (!sessionStorePath.endsWith("/")) {
                sessionStorePath = sessionStorePath + "/";
            }
            ROOT_SESSION_MANAGER = new FileBasedSessionManager(ROOT_SESSION_MANAGER_NAME, sessionStorePath);
            ASYNC_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(ASYNC_COMMITTING_SESSION_MANAGER_NAME);
            RETRY_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(RETRY_COMMITTING_SESSION_MANAGER_NAME);
            RETRY_ROLLBACKING_SESSION_MANAGER = new DefaultSessionManager(RETRY_ROLLBACKING_SESSION_MANAGER_NAME);
        }
    }
    

    無非就是暫存各種DefaultSessionManager,共后續(xù)任務(wù)輪訓(xùn)時(shí)使用。DefaultSessionManager繼承至
    AbstractSessionManager,看下面父類屬性sessionMap,無非就是管理GlobalSession了。
    而GlobalSession就是server根據(jù)XID緩存全局事務(wù)的對(duì)象,里面屬性branchSessions記錄了全局事務(wù)里對(duì)應(yīng)的所有分支事務(wù),這樣就能根據(jù)TM的相關(guān)觸發(fā)消息去處理全局事務(wù),進(jìn)而處理分支事務(wù)了。

    #AbstractSessionManager
    protected Map<Long, GlobalSession> sessionMap = new ConcurrentHashMap<>();
    
    public class GlobalSession implements SessionLifecycle, SessionStorable {
        private private long transactionId;
        private GlobalStatus status;
        private String applicationId;
        private String transactionServiceGroup;
        private String transactionName;
        private int timeout;
        private long beginTime;
        private boolean active;
        ArrayList<BranchSession> branchSessions = new ArrayList<>();
        ...
    }
    
  • 再者初始化coordinator:

    coordinator.init();
    public void init() {
        retryRollbacking.scheduleAtFixedRate(new Runnable() {
    
            @Override
            public void run() {
                try {
                    handleRetryRollbacking();
                } catch (Exception e) {
                    LOGGER.info("Exception retry rollbacking ... ", e);
                }
            }
        }, 0, 5, TimeUnit.MILLISECONDS);
    
        retryCommitting.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    handleRetryCommitting();
                } catch (Exception e) {
                    LOGGER.info("Exception retry committing ... ", e);
                }
            }
        }, 0, 5, TimeUnit.MILLISECONDS);
    
        asyncCommitting.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    handleAsyncCommitting();
                } catch (Exception e) {
                    LOGGER.info("Exception async committing ... ", e);
                }
            }
        }, 0, 10, TimeUnit.MILLISECONDS);
    
        timeoutCheck.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    timeoutCheck();
                } catch (Exception e) {
                    LOGGER.info("Exception timeout checking ... ", e);
                }
            }
        }, 0, 2, TimeUnit.MILLISECONDS);
    }
    

    這里無非就是啟動(dòng)一些異步任務(wù)處理邏輯,其實(shí)這里最主要的就是commit異步任務(wù)刪除回滾日志的邏輯。因?yàn)檫@里涉及到服務(wù)的相關(guān)觸發(fā)邏輯,暫時(shí)先暫停到此。

  • 繼續(xù)rpcServer

    rpcServer.init();
    
    #init
    @Override
    public void init() {
        super.init();
        setChannelHandlers(RpcServer.this);
        DefaultServerMessageListenerImpl defaultServerMessageListenerImpl = new DefaultServerMessageListenerImpl(transactionMessageHandler);
        defaultServerMessageListenerImpl.setServerMessageSender(this);
        this.setServerMessageListener(defaultServerMessageListenerImpl);
        super.start();
    }
    
  • 1.父類初始化邏輯

  • 2.設(shè)置處理器handler

  • 3.構(gòu)造消息設(shè)置監(jiān)聽器并監(jiān)聽

  • 4.啟動(dòng)服務(wù)

    這里最重要的就是設(shè)置監(jiān)聽器了,因?yàn)檫@里關(guān)乎如何監(jiān)聽TM的事務(wù)提交或者回滾消息的。直接看實(shí)現(xiàn):

    #DefaultServerMessageListenerImpl
    @Override
    public void onTrxMessage(long msgId, ChannelHandlerContext ctx, Object message, ServerMessageSender sender) {
        RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("server received:" + message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:" + rpcContext.getTransactionServiceGroup());
        } else {
            messageStrings.offer(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:" + rpcContext.getTransactionServiceGroup());
        }
        if (!(message instanceof AbstractMessage)) { return; }
        if (message instanceof MergedWarpMessage) {
            AbstractResultMessage[] results = new AbstractResultMessage[((MergedWarpMessage)message).msgs.size()];
            for (int i = 0; i < results.length; i++) {
                final AbstractMessage subMessage = ((MergedWarpMessage)message).msgs.get(i);
                results[i] = transactionMessageHandler.onRequest(subMessage, rpcContext);
            }
            MergeResultMessage resultMessage = new MergeResultMessage();
            resultMessage.setMsgs(results);
            sender.sendResponse(msgId, ctx.channel(), resultMessage);
        } else if (message instanceof AbstractResultMessage) {
            transactionMessageHandler.onResponse((AbstractResultMessage)message, rpcContext);
        }
    }
    

    這里就是監(jiān)聽事務(wù)消息的觸發(fā)點(diǎn)了,看下調(diào)用鏈:

    @Override
    public void dispatch(long msgId, ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof RegisterRMRequest) {
            serverMessageListener.onRegRmMessage(msgId, ctx, (RegisterRMRequest)msg, this,
                checkAuthHandler);
        } else {
            if (ChannelManager.isRegistered(ctx.channel())) {
                serverMessageListener.onTrxMessage(msgId, ctx, msg, this);
            } else {
                try {
                    closeChannelHandlerContext(ctx);
                } catch (Exception exx) {
                    LOGGER.error(exx.getMessage());
                }
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));
                }
            }
        }
    }
    

    繼續(xù)往上追:

    #AbstractRpcRemoting
    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof RpcMessage) {
            final RpcMessage rpcMessage = (RpcMessage)msg;
            if (rpcMessage.isRequest()) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
                }
                try {
                    AbstractRpcRemoting.this.messageExecutor.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                dispatch(rpcMessage.getId(), ctx, rpcMessage.getBody());
                            } catch (Throwable th) {
                                LOGGER.error(FrameworkErrorCode.NetDispatch.errCode, th.getMessage(), th);
                            }
                        }
                    });
                }
            }
        }
    

到了netty的channel層了,SocketChannel就是netty本身的消息讀寫通道,一切消息交換由此追溯,即netty完成通信后觸發(fā)了監(jiān)聽器邏輯,進(jìn)而繼續(xù)TC服務(wù)邏輯的處理。

  • 啟動(dòng)服務(wù)

    super.start();
    
    #AbstractRpcRemotingServer
    @Override
    public void start() {
        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
            .channel(nettyServerConfig.SERVER_CHANNEL_CLAZZ)
            .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize())
            .option(ChannelOption.SO_REUSEADDR, true)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize())
            .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize())
            .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK,
                nettyServerConfig.getWriteBufferHighWaterMark())
            .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, nettyServerConfig.getWriteBufferLowWaterMark())
            .localAddress(new InetSocketAddress(listenPort))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) {
                    ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
                        .addLast(new MessageCodecHandler());
                    if (null != channelHandlers) {
                        addChannelPipelineLast(ch, channelHandlers);
                    }
    
                }
            });
    
        if (nettyServerConfig.isEnableServerPooledByteBufAllocator()) {
            this.serverBootstrap.childOption(ChannelOption.ALLOCATOR, NettyServerConfig.DIRECT_BYTE_BUF_ALLOCATOR);
        }
    
        try {
            ChannelFuture future = this.serverBootstrap.bind(listenPort).sync();
            LOGGER.info("Server started ... ");
            future.channel().closeFuture().sync();
        } catch (InterruptedException exx) {
            throw new RuntimeException(exx);
        }
    }
    

慣用的netty服務(wù)啟動(dòng)方式,至此,TC Server服務(wù)啟動(dòng)OK,下面總結(jié)下。

--

四.TC Server服務(wù)端啟動(dòng)總結(jié)
  • 1.構(gòu)造RpcServer的netty服務(wù)端
  • 2.初始化SessionHolder,用于初始化各種DefaultSessionManager,DefaultSessionManager就是管理GlobalSession和BranchSession的管理器
  • 3.初始化DefaultCoordinator,DefaultCoordinator就是具體執(zhí)行提交、回滾、注冊(cè)分支事務(wù)等邏輯的類,同時(shí)啟動(dòng)異步執(zhí)行任務(wù),比如提交全局事務(wù)后,異步刪除回滾sql。
  • 4.初始化RpcServer,設(shè)置事務(wù)消息監(jiān)聽器
  • 5.啟動(dòng)RpcServer的netty服務(wù)端。

--

五.未完待續(xù)。。。

后續(xù)分析主要還是根據(jù)example官方實(shí)例分為:全局事務(wù)的獲取、事務(wù)邏輯執(zhí)行、事務(wù)回滾、事務(wù)提交進(jìn)行。
同時(shí)后續(xù)每一流程都緊密關(guān)聯(lián)Server,因此還會(huì)頻繁回到上敘server啟動(dòng)后,收到消息被觸發(fā)的后續(xù)邏輯。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容