【分布式事務(wù)Seata源碼解讀一】Server端啟動流程

實現(xiàn)分布式事務(wù)的核心要點:

  1. 事務(wù)的持久化,事務(wù)所處的各種狀態(tài)事務(wù)參與方的各種狀態(tài)都需要持久化,當實例宕機時才能基于持久化的數(shù)據(jù)對事務(wù)回滾或提交,實現(xiàn)最終一致性
  2. 定時對超時未完成事務(wù)的處理(繼續(xù)嘗試提交或回滾),即通過重試機制實現(xiàn)事務(wù)的最終一致性
  3. 分布式事務(wù)的跨服務(wù)實例傳播,當分布式事務(wù)跨多個實例時需要實現(xiàn)事務(wù)的傳播,一般需要適配不同的rpc框架
  4. 事務(wù)的隔離級別:大多數(shù)分布式事務(wù)為了性能,默認的隔離級別是讀未提交
  5. 冪等性:對于XA或者seata的AT這樣的分布式事務(wù)來說,都已經(jīng)默認實現(xiàn)了冪等性,而TCC、Saga這種接口級別實現(xiàn)的分布式事務(wù)都還需要業(yè)務(wù)開發(fā)者自己實現(xiàn)冪等性。

本片文章主要從seata-server的啟動流程的角度介紹一下seata-server的源碼,啟動流程圖如下:


Seata啟動流程.png

1. 啟動類Server

seata-server的入口類在Server類中,源碼如下:

public static void main(String[] args) throws IOException {
        //解析啟動以及配置文件的各種配置參數(shù)
        ParameterParser parameterParser = new ParameterParser(args);

        //metrics相關(guān),暫時不關(guān)心
        MetricsManager.get().init();
        // 把從配置文件中讀取到的storeMode重新寫入SystemProperty中
        System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());
                
        //創(chuàng)建RpcServer實例,此時并沒有初始化,RpcServer負責與客戶端SDK中的TM、RM進行網(wǎng)絡(luò)通信
        RpcServer rpcServer = new RpcServer(WORKING_THREADS);
        //server port
        rpcServer.setListenPort(parameterParser.getPort());
                
        //UUIDGenerator初始化,UUIDGenerator用于生成全局事務(wù)、分支事務(wù)的id,
        //多個Server實例配置不同的ServerNode,保證id的唯一性
        UUIDGenerator.init(parameterParser.getServerNode());
                
        // SessionHodler負責事務(wù)日志(狀態(tài))的持久化存儲,
        // 當前支持file和db的存儲,集群部署模式要使用db模式
        SessionHolder.init(parameterParser.getStoreMode());
        
        // 創(chuàng)建初始化DefaultCoordinator實例,DefaultCoordinator是TC的核心事務(wù)邏輯處理類,
        // 底層包含了AT、TCC、SAGA等不同事務(wù)類型的邏輯處理。
        DefaultCoordinator coordinator = new DefaultCoordinator(rpcServer);
        coordinator.init();
        rpcServer.setHandler(coordinator);
        // register ShutdownHook
        ShutdownHook.getInstance().addDisposable(coordinator);
        ShutdownHook.getInstance().addDisposable(rpcServer);

        //127.0.0.1 and 0.0.0.0 are not valid here.
        if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
            XID.setIpAddress(parameterParser.getHost());
        } else {
            XID.setIpAddress(NetUtil.getLocalIp());
        }
        XID.setPort(rpcServer.getListenPort());
    
        try {
            rpcServer.init();
        } catch (Throwable e) {
            LOGGER.error("rpcServer init error:{}", e.getMessage(), e);
            System.exit(-1);
        }

        System.exit(0);
    }

2. 解析配置

參數(shù)解析的實現(xiàn)代碼在ParameterParser類中,init方法源碼如下:

private void init(String[] args) {
        try {
            // 判斷是否運行在容器中
            boolean inContainer = this.isRunningInContainer();

            if (inContainer) {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("The server is running in container.");
                }
                // 如果是運行在容器中,則從環(huán)境變量中獲取啟動配置參數(shù)
                this.seataEnv = StringUtils.trimToNull(System.getenv(ENV_SYSTEM_KEY));
                this.host = StringUtils.trimToNull(System.getenv(ENV_SEATA_IP_KEY));
                this.serverNode = NumberUtils.toLong(System.getenv(ENV_SERVER_NODE_KEY), SERVER_DEFAULT_NODE);
                this.port = NumberUtils.toInt(System.getenv(ENV_SEATA_PORT_KEY), SERVER_DEFAULT_PORT);
                this.storeMode = StringUtils.trimToNull(System.getenv(ENV_STORE_MODE_KEY));
            } else {
                // 基于JCommander獲取啟動應(yīng)用程序時配置的參數(shù),
                // JCommande通過注解、反射的方式把參數(shù)賦值到當前類的字段上。
                JCommander jCommander = JCommander.newBuilder().addObject(this).build();
                jCommander.parse(args);
                if (help) {
                    jCommander.setProgramName(PROGRAM_NAME);
                    jCommander.usage();
                    System.exit(0);
                }
            }
            if (StringUtils.isNotBlank(seataEnv)) {
                System.setProperty(ENV_PROPERTY_KEY, seataEnv);
            }
            if (StringUtils.isBlank(storeMode)) {
                // 這里牽扯到一個重要的Configuration類,ParameterParser只負責獲取ip、port、storeMode
                // 等核心參數(shù),其他的參數(shù)都是從Configuration中獲取的。
                // 這里如果沒有啟動參數(shù)沒有指定storeMode,就從Configuration類中獲取。
                storeMode = ConfigurationFactory.getInstance().getConfig(ConfigurationKeys.STORE_MODE,
                    SERVER_DEFAULT_STORE_MODE);
            }
        } catch (ParameterException e) {
            printError(e);
        }
    }

在ParameterParser的init方法中第一次調(diào)用了ConfigurationFactory.getInstance(),初始化了一個單例的Configuration對象,Configuration負責初始化所有的其他配置參數(shù)數(shù)據(jù)信息。配置文件中的file.conf、registry.conf都是在這里被處理的。
ConfigurationFactory.getInstance方法其實就是獲取一個單例對象,核心在buildConfiguration方法中,不過在buidlConfiguration方法前,ConfigurationFactory類有一段static代碼塊會先執(zhí)行。

public static Configuration getInstance() {
        if (instance == null) {
            synchronized (Configuration.class) {
                if (instance == null) {
                    instance = buildConfiguration();
                }
            }
        }
        return instance;
    }

ConfigurationFactory有static代碼塊,下面的代碼看起來很多,其實只是從registry.conf中讀取配置信息。registry.conf中有兩個配置信息,注冊中心和配置源。registry.conf中指定其他配置項是file.conf或者是apollo等其他配置源)

static {
    String seataConfigName = System.getProperty(SYSTEM_PROPERTY_SEATA_CONFIG_NAME);
    if (null == seataConfigName) {
            seataConfigName = System.getenv(ENV_SEATA_CONFIG_NAME);
    }
    if (null == seataConfigName) {
            seataConfigName = REGISTRY_CONF_PREFIX;
    }
    String envValue = System.getProperty(ENV_PROPERTY_KEY);
    if (null == envValue) {
            envValue = System.getenv(ENV_SYSTEM_KEY);
    }
    Configuration configuration = (null == envValue) 
            ? new FileConfiguration(seataConfigName + REGISTRY_CONF_SUFFIX,false) 
            : new FileConfiguration(seataConfigName + "-" + envValue + REGISTRY_CONF_SUFFIX, false);
    Configuration extConfiguration = null;
    try {
            extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class)
                               .provide(configuration);
            if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("load Configuration:{}", extConfiguration == null 
                            ? configuration.getClass().getSimpleName()
                            : extConfiguration.getClass().getSimpleName());
            }
    } catch (EnhancedServiceNotFoundException ignore) {

    } catch (Exception e) {
            LOGGER.error("failed to load extConfiguration:{}", e.getMessage(), e);
    }
    CURRENT_FILE_INSTANCE = null == extConfiguration ? configuration : extConfiguration;
}

ConfigurationFactory.buildConfiguration。buildConfiguration方法主要是根據(jù)registry.conf文件中配置的其他配置項的配置源來加載更多的配置項。當前的配置源已經(jīng)支持:file、zk、apollo、nacos、etcd3等。

private static Configuration buildConfiguration() {
        // 從registry中讀取其他配置項的配置源類型
        ConfigType configType;
        String configTypeName = null;
        try {
            configTypeName = CURRENT_FILE_INSTANCE.getConfig(
                ConfigurationKeys.FILE_ROOT_CONFIG + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR
                    + ConfigurationKeys.FILE_ROOT_TYPE);

            if (StringUtils.isBlank(configTypeName)) {
                throw new NotSupportYetException("config type can not be null");
            }

            configType = ConfigType.getType(configTypeName);
        } catch (Exception e) {
            throw e;
        }
        // 文件的配置源方式,默認讀取file.conf文件
        if (ConfigType.File == configType) {
            String pathDataId = String.join(ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR,
ConfigurationKeys.FILE_ROOT_CONFIG, FILE_TYPE, NAME_KEY);
            String name = CURRENT_FILE_INSTANCE.getConfig(pathDataId);
            Configuration configuration = new FileConfiguration(name);
            Configuration extConfiguration = null;
            try {
                extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class)
                                  .provide(configuration);
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("load Configuration:{}",
                        extConfiguration == null ? configuration.getClass().getSimpleName()
                            : extConfiguration.getClass().getSimpleName());
                }
            } catch (EnhancedServiceNotFoundException ignore) {

            } catch (Exception e) {
                LOGGER.error("failed to load extConfiguration:{}", e.getMessage(), e);
            }
            return null == extConfiguration ? configuration : extConfiguration;
        } else {
            //通過SPI的方式加載其他配置源的實現(xiàn)類。在seata-server源代碼中可以看到
            // 很多這樣通過單例和SPI的方式來獲取對象的場景。
            return EnhancedServiceLoader.load(ConfigurationProvider.class,
                       Objects.requireNonNull(configType).name())
                   .provide();
        }
    }

3. 初始化UUIDGenerator

UUIDGenertor初始化接收一個serverNode參數(shù),UUIDGenertor當前是使用了雪花算法來生成唯一Id,該serverNode用來保證多個seata-server實例生成的唯一id不重復(fù)。

public class UUIDGenerator {

    /**
     * Generate uuid long.
     *
     * @return the long
     */
    public static long generateUUID() {
        return IdWorker.getInstance().nextId();
    }

    /**
     * Init.
     *
     * @param serverNode the server node id
     */
    public static void init(Long serverNode) {
        IdWorker.init(serverNode);
    }
}

UUIDGenerator是對IdWorker做了封裝,唯一id的核心實現(xiàn)邏輯在IdWoker類中,IdWorker是一個雪花算法實現(xiàn)的。此處的IdWorker又是一個單例

public class IdWorker
/**
     * Constructor
     *
     * @param workerId就是上面提到的ServerNode, 取值范圍在0·1023,也就是在64位的UUID中占10位
     */
    public IdWorker(long workerId) {
        if (workerId > maxWorkerId || workerId < 0) {
            throw new IllegalArgumentException(
                String.format("worker Id can't be greater than %d or less than 0",
 maxWorkerId));
        }
        this.workerId = workerId;
    }

    /**
     * Get the next ID (the method is thread-safe)
     *
     * @return SnowflakeId
     */
    public long nextId() {
        long timestamp = timeGen();

        if (timestamp < lastTimestamp) {
            throw new RuntimeException(String.format(
                "clock moved backwards.  Refusing to generate id for %d milliseconds",
 lastTimestamp - timestamp));
        }

        synchronized (this) {
            if (lastTimestamp == timestamp) {
                sequence = (sequence + 1) & sequenceMask;
                if (sequence == 0) {
                    timestamp = tilNextMillis(lastTimestamp);
                }
            } else {
                sequence = 0L;
            }
            lastTimestamp = timestamp;
        }
        // 雪花算法64位唯一id組成:
        // 第一位0 + 41位時間戳 + 10位workerId + 12位自增序列化(同一時間戳內(nèi)自增)
        return ((timestamp - twepoch) << timestampLeftShift) | (workerId << workerIdShift) 
| sequence;
    }

4. SessionHolder初始化

SessionHolder負責Session的持久化,一個Session對象對應(yīng)一個事務(wù),事務(wù)分為兩種:全局事務(wù)(GlobalSession)和分支事務(wù)(BranchSession)。
SessionHolder支持file和db兩種持久化方式,其中db支持集群模式,推薦使用db。SessionHolder中最主要的四個字段如下:

// ROOT_SESSION_MANAGER用于獲取所有的Setssion,以及Session的創(chuàng)建、更新、刪除等。
private static SessionManager ROOT_SESSION_MANAGER;
// 用于獲取、更新所有的異步commit的Session
private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
// 用于獲取、更新所有需要重試commit的Session
private static SessionManager RETRY_COMMITTING_SESSION_MANAGER;
// 用于獲取、更新所有需要重試rollback的Session
private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;

SessionHolder的init方法

    public static void init(String mode) throws IOException {
        if (StringUtils.isBlank(mode)) {
            mode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE);
        }
        StoreMode storeMode = StoreMode.get(mode);
        if (StoreMode.DB.equals(storeMode)) {
            // 這里又用到了SPI的方式加載SessionManager,其實下面獲取的四個
            // SessionManager實例都是同一個類DataBaseSessionManager的不同實例,
            // 只是給DataBaseSessionManager的構(gòu)造函數(shù)傳參不同。
            ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());
            ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
                new Object[] {ASYNC_COMMITTING_SESSION_MANAGER_NAME});
            RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
                new Object[] {RETRY_COMMITTING_SESSION_MANAGER_NAME});
            RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
                new Object[] {RETRY_ROLLBACKING_SESSION_MANAGER_NAME});
        } else if (StoreMode.FILE.equals(storeMode)) {
            //file模式可以先不關(guān)心
            ...
        } else {
            throw new IllegalArgumentException("unknown store mode:" + mode);
        }
        // reload方法對于db模式可以忽略
        reload();
    }

上面看到SessionHolder中的四個SessionManager本質(zhì)都是類DataBaseSessionManager的實例,只是給構(gòu)造函數(shù)傳參不同,看下DataBaseSessionManager的定義:

public DataBaseSessionManager(String name) {
        super();
        this.taskName = name;
}
// 根據(jù)實例的taskName來決定allSessions返回的事務(wù)列表,
// 如taskName等于ASYNC_COMMITTING_SESSION_MANAGER_NAME的就返回所有狀態(tài)為AsyncCommitting的事務(wù)。
public Collection<GlobalSession> allSessions() {
    // get by taskName
    if (SessionHolder.ASYNC_COMMITTING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
            return findGlobalSessions(new SessionCondition(GlobalStatus.AsyncCommitting));
    } else if (SessionHolder.RETRY_COMMITTING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
            return findGlobalSessions(new SessionCondition(new GlobalStatus[] {GlobalStatus.CommitRetrying}));
    } else if (SessionHolder.RETRY_ROLLBACKING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
            return findGlobalSessions(new SessionCondition(new GlobalStatus[] {GlobalStatus.RollbackRetrying,
                    GlobalStatus.Rollbacking, GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbackRetrying}));
    } else {
            // taskName為null,則對應(yīng)ROOT_SESSION_MANAGER,即獲取所有狀態(tài)的事務(wù)
            return findGlobalSessions(new SessionCondition(new GlobalStatus[] {
                    GlobalStatus.UnKnown, GlobalStatus.Begin,
                    GlobalStatus.Committing, GlobalStatus.CommitRetrying, GlobalStatus.Rollbacking,
                    GlobalStatus.RollbackRetrying,
                    GlobalStatus.TimeoutRollbacking,
                    GlobalStatus.TimeoutRollbackRetrying,
                    GlobalStatus.AsyncCommitting}));
    }
}

5. 初始化DefaultCoordinator

DefaultCoordinator是事務(wù)協(xié)調(diào)器的核心,如:開啟、提交、回滾全局事務(wù),注冊、提交、回滾分支事務(wù)都是由DefaultCoordinator負責協(xié)調(diào)處理的。DefaultCoordinato通過RpcServer與遠程的TM、RM通信來實現(xiàn)分支事務(wù)的提交、回滾等。

public DefaultCoordinator(ServerMessageSender messageSender) {
        // 接口messageSender的實現(xiàn)類就是上文提到的RpcServer
        this.messageSender = messageSender;
        // DefaultCore封裝了AT、TCC、Saga等分布式事務(wù)模式的具體實現(xiàn)類
        this.core = new DefaultCore(messageSender);
}

//init方法初始化了5個定時器,主要用于分布式事務(wù)的重試機制,
// 因為分布式環(huán)境的不穩(wěn)定性會造成事務(wù)處于中間狀態(tài),
// 所以要通過不斷的重試機制來實現(xiàn)事務(wù)的最終一致性。
// 下面的定時器除了undoLogDelete之外,其他的定時任務(wù)默認都是1秒執(zhí)行一次。
public void init() {
       //處理處于回滾狀態(tài)可重試的事務(wù)
        retryRollbacking.scheduleAtFixedRate(() -> {
                try {
                        handleRetryRollbacking();
                } catch (Exception e) {
                        LOGGER.info("Exception retry rollbacking ... ", e);
                }
        }, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
        
    //處理二階段可以重試提交的狀態(tài)可重試的事務(wù)
        retryCommitting.scheduleAtFixedRate(() -> {
                try {
                        handleRetryCommitting();
                } catch (Exception e) {
                        LOGGER.info("Exception retry committing ... ", e);
                }
        }, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    //處理異步提交的事務(wù)
        asyncCommitting.scheduleAtFixedRate(() -> {
                try {
                        handleAsyncCommitting();
                } catch (Exception e) {
                        LOGGER.info("Exception async committing ... ", e);
                }
        }, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
    
        //檢查事務(wù)的第一階段已經(jīng)超時的事務(wù),設(shè)置為TimeoutRollbacking,
        // 由其他定時任務(wù)執(zhí)行回滾操作
        timeoutCheck.scheduleAtFixedRate(() -> {
                try {
                        timeoutCheck();
                } catch (Exception e) {
                        LOGGER.info("Exception timeout checking ... ", e);
                }
        }, 0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);
    
        // 根據(jù)unlog的保存天數(shù)調(diào)用RM刪除unlog
        undoLogDelete.scheduleAtFixedRate(() -> {
                try {
                        undoLogDelete();
                } catch (Exception e) {
                        LOGGER.info("Exception undoLog deleting ... ", e);
                }
        }, UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
}

6. 初始化RpcServer

RpcServer是基于Netty實現(xiàn)的簡化版的Rpc服務(wù)端,RpcServer初始化時主要做了兩件事:

  1. 初始化Netty,設(shè)置ChannelHandler,啟動Netty
  2. 把當前實例的IP端口注冊到注冊中心中(根據(jù)registry中的注冊中心類型以及地址配置注冊)
    public void init() {
            // 響應(yīng)Rpc客戶端的邏輯處理
        DefaultServerMessageListenerImpl defaultServerMessageListenerImpl =
            new DefaultServerMessageListenerImpl(getTransactionMessageHandler());
        defaultServerMessageListenerImpl.init();
        defaultServerMessageListenerImpl.setServerMessageSender(this);
        super.setServerMessageListener(defaultServerMessageListenerImpl);
        super.setChannelHandlers(new ServerHandler());
        super.init();
    }
        
        @Override
    public void init() {
        super.init();
        // 調(diào)用Netty初始化邏輯
        serverBootstrap.start();
    }
        
        // Netty初始化邏輯
        public void start() {
        //netty初始化邏輯
        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
            .channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ)
            .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize())
            .option(ChannelOption.SO_REUSEADDR, true)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize())
            .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize())
            .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
                new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(),
                    nettyServerConfig.getWriteBufferHighWaterMark()))
            .localAddress(new InetSocketAddress(listenPort))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) {
                    ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig
                                         .getChannelMaxReadIdleSeconds(), 0, 0))
                        .addLast(new ProtocolV1Decoder())
                        .addLast(new ProtocolV1Encoder());
                    if (null != channelHandlers) {
                        addChannelPipelineLast(ch, channelHandlers);
                    }

                }
            });

        try {
            ChannelFuture future = this.serverBootstrap.bind(listenPort).sync();
            LOGGER.info("Server started ... ");
            //向注冊中心注冊當前實例
            RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort()));
            initialized.set(true);
            future.channel().closeFuture().sync();
        } catch (Exception exx) {
            throw new RuntimeException(exx);
        }
    }

本文由博客一文多發(fā)平臺 OpenWrite 發(fā)布!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容