kettle源碼分析之資源庫初始化流程

kettle源碼分析 :本次源碼分析 基于 kettle v4.0 分析:

背景:
因最近新增一個需求,需要將原來在 windows平臺上的 kettle應用 遷移到 linux 上,并且 進行 定時調(diào)度,新增前臺管理頁面,可對任務進行 動態(tài)更新,增加,刪除等操作外,還需 設置任務的調(diào)度時間 !

初期遇到的問題:
1.版本選擇:
因 原來windows應用,是選用當時的經(jīng)典版4.4.x版本,目前,kettle 最新版本已更新到8.x,是否仍然選擇4.x版本?

2.數(shù)據(jù)資源庫選擇:
既是遷移,遷移在linux平臺后,當然是以數(shù)據(jù)庫的方式來保存資源!
但 原應用中任務以".kjb",".ktr"等方式保存在磁盤上,任務數(shù)量眾多,以數(shù)據(jù)庫的方式尚無法完全兼容!

3...

4.涉及到發(fā)送郵件,數(shù)據(jù)導出所需的各種依賴 等等,包含windows 和 linux 不同平臺所依賴的不同的jar(這個有點坑,早起kettle的版本依賴) 。

當然,最終的選擇,還是得看老板的選擇,基于對業(yè)務的請求,并發(fā),實際使用的場景,后續(xù)任務的數(shù)量以及 任務的種類 等綜合因素考量!
比如,執(zhí)行一個 "生成日結(jié)" 的任務,可能需要耗費的時間相對較長達到數(shù)小時!

首先對kettle的大致結(jié)構,功能模塊大致熟悉后,當然需要對其 插件注冊原理,資源庫加載初始化等流程需要有一個詳細的了解(后續(xù)很多功能都需要涉及到);

源碼版本,因當前 windows 運行的版本為4.x,所以目前的分析版本也為 kettle 4.x(kettle后續(xù)的幾次更新,改動都較大);

整個源碼結(jié)構 并不算復雜,相對 jstorm內(nèi)核,netty 源碼 而言,其 抽象結(jié)構 還是比較容易看懂的!
插件注冊,資源庫初始化 整個流程還是比較簡單的!
資源庫初始化流程:
一:環(huán)境初始化;
1.1 此處后面分析,kettle環(huán)境初始化,其實就是 初始化 kettle插件模板后,加載指定的各類插件,插件模式!

二:資源庫元數(shù)據(jù):

image.png

本地磁盤資源庫KettleFileRepositoryMeta 實現(xiàn)了本地文件解析的 getXML()和 loadXML(,),其與 數(shù)據(jù)庫資源庫不同的地方之一!

2.1 無參構造方法

public static String REPOSITORY_TYPE_ID = "KettleFileRepository";
public KettleFileRepositoryMeta() {
    super(REPOSITORY_TYPE_ID);
}

REPOSITORY_TYPE_ID:資源庫ID,默認為"KettleFileRepository" ;
作用:在 生成JobMeta時,"*.kjb"文件結(jié)構 的 解析,XMLHandler ;

2.2 有參構造方法

public KettleFileRepositoryMeta(String id, String name, String description, String baseDirectory) {
    super(id, name, description);
    this.baseDirectory = baseDirectory;
}

參數(shù)解釋:
id: 資源庫Id;
name: 資源庫名稱;
description: 資源庫描述;
dir: 資源庫目錄;
2.2.1 id,name,description作用:
在 加載 kjb文件為document,最終轉(zhuǎn)換為InputStream時 !
在 將kjb文件 轉(zhuǎn)換為xml文檔時!
(后面分析 jobMeta 時會詳細介紹 ! )
2.2.2 dir:

2.3 獲取資源庫信息,以XML的方式;

public String getXML() {
    StringBuffer retval = new StringBuffer(100);
    retval.append("  ").append(XMLHandler.openTag("repository"));
    retval.append(super.getXML());
    retval.append("    ").append(XMLHandler.addTagValue("base_directory", this.baseDirectory));
    retval.append("    ").append(XMLHandler.addTagValue("read_only", this.readOnly));
    retval.append("    ").append(XMLHandler.addTagValue("hides_hidden_files", this.hidingHiddenFiles));
    retval.append("  ").append(XMLHandler.closeTag("repository"));
    return retval.toString();
}

基本邏輯 將 所有的解析實現(xiàn)委托給 XMLHandler 去實現(xiàn)(后面分析XMLhandler)!

2.4 加載 本地文件資源庫,以文檔的方式!

public void loadXML(Node repnode, List<DatabaseMeta> databases) throws KettleException {
    super.loadXML(repnode, databases);

    try {
        this.baseDirectory = XMLHandler.getTagValue(repnode, "base_directory");
        this.readOnly = "Y".equalsIgnoreCase(XMLHandler.getTagValue(repnode, "read_only"));
        this.hidingHiddenFiles = "Y".equalsIgnoreCase(XMLHandler.getTagValue(repnode, "hides_hidden_files"));
    } catch (Exception var4) {
        throw new KettleException("Unable to load Kettle file repository meta object", var4);
    }
}

KettleFileRepositoryMeta的整體結(jié)構還是較為簡單,

2.5 初始化本地資源庫信息;

public void init(RepositoryMeta repositoryMeta) {
    this.serviceMap = new HashMap();
    this.serviceList = new ArrayList();
    this.repositoryMeta = (KettleFileRepositoryMeta)repositoryMeta;
    this.securityProvider = new KettleFileRepositorySecurityProvider(repositoryMeta);
    this.serviceMap.put(RepositorySecurityProvider.class, this.securityProvider);
    this.serviceList.add(RepositorySecurityProvider.class);
    this.log = new LogChannel(this);
}

初始化流程:
serviceList :提供安全檢查的接口,此處 使用 RepositorySecurityProvider.class,可根據(jù)具體的自定義實現(xiàn) 安全檢查的接口邏輯!
serviceMap :在對 資源庫 操作時,進行的安全檢查!
主要做了如下幾件事:
1.初始化 repositoryMeta ;
2.定義安全檢查接口,可以自定義安全檢查實現(xiàn) !
3.配置日志輸出信息 !

2.6 一般方法

public LogChannelInterface getLog() {
    return this.log;//獲取資源庫日志輸出信息,可搭配 websocket 顯示在前臺上!
}

public boolean isConnected() {
    return true;//此為數(shù)據(jù)庫鏈接的方式,此處做一般實現(xiàn),無實際意義;
}

public RepositorySecurityProvider getSecurityProvider() {
    return this.securityProvider;//執(zhí)行對資源庫的操作前,判斷資源庫的特征,比如是否只讀等!
}

2.7 加載 資源庫目錄 結(jié)構

public RepositoryDirectoryInterface loadRepositoryDirectoryTree() throws KettleException {
        //初始化時,dir and name is all null;
        RepositoryDirectory root = new RepositoryDirectory();
        //設置 root 目錄為 "/" ;
        root.setObjectId(new StringObjectId("/"));
        return this.loadRepositoryDirectoryTree(root);
    }

    
    
    public RepositoryDirectoryInterface loadRepositoryDirectoryTree(RepositoryDirectoryInterface dir) throws KettleException {
            //獲取 本地資源庫的 目錄名字
            //如: E://kettle/data-->E://kettle/data/
            String folderName = this.calcDirectoryName(dir);
            //VFS加載 資源庫目錄下的所有文件,包括子目錄(不包括子目錄下的文件);
            FileObject folder = KettleVFS.getFileObject(folderName);
            //獲取資源庫目錄下的所有的文件對象
            FileObject[] arr$ = folder.getChildren();
            int len$ = arr$.length;
                
            for(int i$ = 0; i$ < len$; ++i$) {
                FileObject child = arr$[i$];
                //判斷文件類型 以及 文件屬性是否 hidden ,
                if (child.getType().equals(FileType.FOLDER) && (!child.isHidden() || !this.repositoryMeta.isHidingHiddenFiles())) {
                    //獲取文件名稱 或者 子目錄名稱,遞歸使用
                    // child.getName().getBaseName():獲取文件名稱或 子目錄名
                    RepositoryDirectory subDir = new RepositoryDirectory(dir, child.getName().getBaseName());
                    subDir.setObjectId(new StringObjectId(this.calcObjectId((RepositoryDirectoryInterface)subDir)));
                    dir.addSubdirectory(subDir);
                    
                    //遞歸調(diào)用,遍歷子目錄下的文件
                    this.loadRepositoryDirectoryTree(subDir);
                }
            }

            return dir;
    }

加載格式化目錄: 指定目錄下的 文件 路徑:相對路徑"subJect/fzaccSub.kjb";

private String calcDirectoryName(RepositoryDirectoryInterface dir) {
    StringBuilder directory = new StringBuilder();
    String baseDir = this.repositoryMeta.getBaseDirectory();
    baseDir = Const.replace(baseDir, "\\", "/");
    directory.append(baseDir);
    if (!baseDir.endsWith("/")) {
        directory.append("/");
    }

    if (dir != null) {
        String path = this.calcRelativeElementDirectory(dir);
        if (path.startsWith("/")) {
            directory.append(path.substring(1));
        } else {
            //指定目錄下的 文件 路徑;
            directory.append(path);
        }

        if (!path.endsWith("/")) {
            directory.append("/");
        }
    }

    return directory.toString();
}

2.7.3 獲取絕對 資源庫 根據(jù)指定文件路徑下的 格式化后的路徑:

public String calcObjectId(RepositoryDirectoryInterface dir) {
    StringBuilder id = new StringBuilder();
    String path = this.calcRelativeElementDirectory(dir);
    id.append(path);
    if (!path.endsWith("/")) {
        id.append("/");
    }

    return id.toString();
}

2.7.4 根據(jù)指定的路徑判斷 是否存在資源庫中

public boolean exists(String name, RepositoryDirectoryInterface repositoryDirectory, RepositoryObjectType objectType) throws KettleException {
    try {
        FileObject fileObject = KettleVFS.getFileObject(this.calcFilename(repositoryDirectory, name, objectType.getExtension()));
        return fileObject.exists();
    } catch (Exception var5) {
        throw new KettleException(var5);
    }
}

2.8 保存任務文件

public void save(RepositoryElementInterface repositoryElement, String versionComment, ProgressMonitorListener monitor, ObjectId parentId, boolean used) throws KettleException {
    try {
        if (!(repositoryElement instanceof XMLInterface) && !(repositoryElement instanceof SharedObjectInterface)) {
            throw new KettleException("Class [" + repositoryElement.getClass().getName() + "] needs to implement the XML Interface in order to save it to disk");
        } else {
            if (!Const.isEmpty(versionComment)) {
                this.insertLogEntry(versionComment);
            }

            ObjectId objectId = new StringObjectId(this.calcObjectId(repositoryElement));
            FileObject fileObject = this.getFileObject(repositoryElement);
            //JobMeta,TransMeta繼承XMLInterface的原因
            String xml = ((XMLInterface)repositoryElement).getXML();
            OutputStream os = KettleVFS.getOutputStream(fileObject, false);
            os.write(xml.getBytes("UTF-8"));
            os.close();
            if (repositoryElement instanceof ChangedFlagInterface) {
                //復位狀態(tài)位
                ((ChangedFlagInterface)repositoryElement).clearChanged();
            }

            if (repositoryElement.getObjectId() != null && !repositoryElement.getObjectId().equals(objectId)) {
                //刪除舊的文件名稱
                this.delObject(repositoryElement.getObjectId());
            }

            repositoryElement.setObjectId(objectId);
        }
    } catch (Exception var10) {
        throw new KettleException("Unable to save repository element [" + repositoryElement + "] to XML file : " + this.calcFilename(repositoryElement), var10);
    }
}

2.9 根據(jù) 指定的 路徑 創(chuàng)建資源庫的文件路徑

public RepositoryDirectoryInterface createRepositoryDirectory(RepositoryDirectoryInterface parentDirectory, String directoryPath) throws KettleException {
    String folder = this.calcDirectoryName(parentDirectory);
    String newFolder;
    if (folder.endsWith("/")) {
        newFolder = folder + directoryPath;
    } else {
        newFolder = folder + "/" + directoryPath;
    }

    FileObject parent = KettleVFS.getFileObject(newFolder);

    try {
        parent.createFolder();
    } catch (FileSystemException var7) {
        throw new KettleException("Unable to create folder " + newFolder, var7);
    }

    RepositoryDirectory newDir = new RepositoryDirectory(parentDirectory, directoryPath);
    parentDirectory.addSubdirectory(newDir);
    newDir.setObjectId(new StringObjectId(newDir.toString()));
    return newDir;
}

KettleDatabaseRepository源碼分析(數(shù)據(jù)庫存儲) :
一:初始化
1.初始化
1.1 無參構造

public KettleDatabaseRepository() {
}

1.2 初始化
1.2.1 指定 參數(shù),即 資源庫元數(shù)據(jù)repositoryMeta:

public void init(RepositoryMeta repositoryMeta) {
    this.repositoryMeta = (KettleDatabaseRepositoryMeta)repositoryMeta;


    //與kettleFilerepository原理類似,封裝 對資源庫 的操作的安全檢查 和 權限
    this.serviceList = new ArrayList();
    this.serviceMap = new HashMap();

    // 注冊日志組件,定義kettle 日志 輸出級別
    this.log = new LogChannel(this);

    // 無參 初始化函數(shù)
    this.init();
}

1.2.2 資源初始化,加載各種組件;

private void init() {
    //操作 TransMeta 的委托模式執(zhí)行者
    this.transDelegate = new KettleDatabaseRepositoryTransDelegate(this);

    //操作 JobMeta 的委托
    this.jobDelegate = new KettleDatabaseRepositoryJobDelegate(this);

    //初始化 數(shù)據(jù)庫操作的委托執(zhí)行
    this.databaseDelegate = new KettleDatabaseRepositoryDatabaseDelegate(this);

    //分布式 工作節(jié)點
    this.slaveServerDelegate = new KettleDatabaseRepositorySlaveServerDelegate(this);

    //集群模式
    this.clusterSchemaDelegate = new KettleDatabaseRepositoryClusterSchemaDelegate(this);

    //
    this.partitionSchemaDelegate = new KettleDatabaseRepositoryPartitionSchemaDelegate(this);

    //資源庫目錄
    this.directoryDelegate = new KettleDatabaseRepositoryDirectoryDelegate(this);

    //操作數(shù)據(jù)庫的代理執(zhí)行
    this.connectionDelegate = new KettleDatabaseRepositoryConnectionDelegate(this, this.repositoryMeta.getConnection());

    //資源庫用戶信息,如用于登錄等
    this.userDelegate = new KettleDatabaseRepositoryUserDelegate(this);

    //行級鎖
    this.conditionDelegate = new KettleDatabaseRepositoryConditionDelegate(this);

    
    this.valueDelegate = new KettleDatabaseRepositoryValueDelegate(this);
    this.notePadDelegate = new KettleDatabaseRepositoryNotePadDelegate(this);

    this.stepDelegate = new KettleDatabaseRepositoryStepDelegate(this);


    this.jobEntryDelegate = new KettleDatabaseRepositoryJobEntryDelegate(this);


    this.creationHelper = new KettleDatabaseRepositoryCreationHelper(this);
}

1.2.3 創(chuàng)建資源庫元素據(jù)

public RepositoryMeta createRepositoryMeta() {
    return new KettleDatabaseRepositoryMeta();
}
  1. 鏈接 資源 數(shù)據(jù)庫:
    2.1
public void connect(String username, String password, boolean upgrade) throws KettleException {
        this.connectionDelegate.connect(upgrade, upgrade);
        ...
}

在鏈接數(shù)據(jù)庫時,將 鏈接操作 Delegate 給 connectionDelegate.
upgrade : 鏈接資源數(shù)據(jù)庫后,是否驗證資源庫版本編號!

2.2 在 connectionDelegate 中,以 驗證版本的方式,鏈接數(shù)據(jù)庫!

public synchronized void connect(boolean no_lookup, boolean ignoreVersion) throws KettleException {
    
    //1.repository資源庫初始化時,connected 默認為false狀態(tài);
    //2.在 connection 成功之后,才進行 setConnected(true);
    if (this.repository.isConnected()) {
        throw new KettleException("Repository is already by class " + this.repository.isConnected());
    } else {
        try {
            //主要操作:1.初始化 系統(tǒng)/應用 配置的屬性,System.getProperties()
            //2. 更新initialized 標識為 :已初始化;
            this.database.initializeVariablesFrom((VariableSpace)null);
            //利用 database 鏈接數(shù)據(jù)庫
            this.database.connect();
            //鏈接資源庫后,是否需要驗證 資源庫版本
            if (!ignoreVersion) {
                this.verifyVersion();
            }

            //是否開啟事務
            this.setAutoCommit(false);
            // 更新 資源數(shù)據(jù)鏈接庫 狀態(tài)
            this.repository.setConnected(true);

            if (!no_lookup) {
                try {
                    //
                    this.repository.connectionDelegate.setLookupStepAttribute();
                    this.repository.connectionDelegate.setLookupTransAttribute();
                    this.repository.connectionDelegate.setLookupJobEntryAttribute();
                    this.repository.connectionDelegate.setLookupJobAttribute();
                } catch (KettleException var4) {
                    throw new KettleException("Error setting lookup prep.statements", var4);
                }
            }

        } catch (KettleException var5) {
            throw new KettleException("Error connecting to the repository!", var5);
        }
    }
}

鏈接資源庫后,校驗資源庫版本:
select * from R_VERSION;
資源庫初始版本---"4.0"
MAJOR_VERSION=4
MINOR_VERSION = 0
反之,拋出:Repository.UpgradeRequired.Message

2.3 在DataBase 中 鏈接數(shù)據(jù)庫;

public synchronized void connect(String group, String partitionId) throws KettleDatabaseException {
    //初始化資源數(shù)據(jù)庫時, 分組和 分區(qū)Id為空參!
    if (!Const.isEmpty(group)) {
        this.connectionGroup = group;
        this.partitionId = partitionId;
        DatabaseConnectionMap map = DatabaseConnectionMap.getInstance();
        Database lookup = map.getDatabase(group, partitionId, this);
        if (lookup == null) {
            this.normalConnect(partitionId);
            ++this.opened;
            this.copy = this.opened;
            map.storeDatabase(group, partitionId, this);
        } else {
            this.connection = lookup.getConnection();
            lookup.setOpened(lookup.getOpened() + 1);
            this.copy = lookup.getOpened();
        }
    } else {
        //正常邏輯 鏈接 ,不包含分區(qū)id邏輯
        this.normalConnect(partitionId);
    }

}

2.4 JDBC的方式連接數(shù)據(jù)庫

public void normalConnect(String partitionId) throws KettleDatabaseException {
        
        if (this.databaseMeta == null) {
            throw new KettleDatabaseException("No valid database connection defined!");
        } else {
            try {
                //是否配置使用連接池初始化
                //dbAccessTypeCode = new String[]{"Native", "ODBC", "OCI", "Plugin", "JNDI"};
                //配置使用連接池初始化步驟:
                if (this.databaseMeta.isUsingConnectionPool() && this.databaseMeta.getAccessType() != 4) {
                    try {
                        this.connection = ConnectionPoolUtil.getConnection(this.log, this.databaseMeta, partitionId);
                    } catch (Exception var3) {
                        throw new KettleDatabaseException("Error occured while trying to connect to the database", var3);
                    }
                } else {
                    //jdbc 連接 數(shù)據(jù)庫
                    this.connectUsingClass(this.databaseMeta.getDriverClass(), partitionId);
                    if (this.log.isDetailed()) {
                        this.log.logDetailed("Connected to database.");
                    }
                    // databaseInterface 反射時,內(nèi)部配置并沒有在初始化時配置
                    //在 dataBaseMeta時 并未進行賦值,所以此處為空
                    //此處 ConnectSQL 主要用于 kettle log 顯示,將 query result 顯示在 log plug 中;
                    String sql = this.environmentSubstitute(this.databaseMeta.getConnectSQL());
                    if (!Const.isEmpty(sql) && !Const.onlySpaces(sql)) {
                        this.execStatements(sql);
                        if (this.log.isDetailed()) {
                            this.log.logDetailed("Executed connect time SQL statements:" + Const.CR + sql);
                        }
                    }
                }

            } catch (Exception var4) {
                throw new KettleDatabaseException("Error occured while trying to connect to the database", var4);
            }
        }
    }

連接數(shù)據(jù)庫:

this.connectUsingClass(this.databaseMeta.getDriverClass(), partitionId);

/**
classname:pluginRaw.dirveClass
partitionId:null
dbAccessTypeCode = new String[]{"Native"-0, "ODBC"-1, "OCI"-2, "Plugin"-3, "JNDI"-4};
**/
private void connectUsingClass(String classname, String partitionId) throws KettleDatabaseException {
        //JNDI數(shù)據(jù)源時
        if (this.databaseMeta.getAccessType() == 4) {
            this.initWithNamedDataSource(this.environmentSubstitute(this.databaseMeta.getDatabaseName()));
        } else {
            try {
                Class var3 = DriverManager.class;
                //并發(fā)加載時,此處會產(chǎn)生死鎖等問題,jdk1.8版本并發(fā)加載時應該不會了
                synchronized(DriverManager.class) {
                    Class.forName(classname);
                }
            } catch (NoClassDefFoundError var10) {
                throw new KettleDatabaseException("Exception while loading class", var10);
            } catch (ClassNotFoundException var11) {
                throw new KettleDatabaseException("Exception while loading class", var11);
            } catch (Exception var12) {
                throw new KettleDatabaseException("Exception while loading class", var12);
            }

            try {
                String url;
                //默認是沒有配置集群,分區(qū)
                if (this.databaseMeta.isPartitioned() && !Const.isEmpty(partitionId)) {
                    url = this.environmentSubstitute(this.databaseMeta.getURL(partitionId));
                } else {
                    //拼接的 connection url
                    url = this.environmentSubstitute(this.databaseMeta.getURL());
                }
            
                String clusterUsername = null;
                String clusterPassword = null;
                if (this.databaseMeta.isPartitioned() && !Const.isEmpty(partitionId)) {
                    PartitionDatabaseMeta partition = this.databaseMeta.getPartitionMeta(partitionId);
                    if (partition != null) {
                        clusterUsername = partition.getUsername();
                        clusterPassword = Encr.decryptPasswordOptionallyEncrypted(partition.getPassword());
                    }
                }

                String password;
                String username;
                if (!Const.isEmpty(clusterUsername)) {
                    username = clusterUsername;
                    password = clusterPassword;
                } else {
                    
                    //在初始化數(shù)據(jù)庫插件 databaseInterface 時,在 DataBaseMeta 中
                    username = this.environmentSubstitute(this.databaseMeta.getUsername());
                    password = Encr.decryptPasswordOptionallyEncrypted(this.environmentSubstitute(this.databaseMeta.getPassword()));
                }

                //jdbc連接時的邏輯校驗,校驗當前 interface服務 是否支持 url 連接
                //默認所有的 數(shù)據(jù)庫插件 都支持此配置
                if (this.databaseMeta.supportsOptionsInURL()) {
                    if (Const.isEmpty(username) && Const.isEmpty(password)) {
                        this.connection = DriverManager.getConnection(url);
                    } else if (this.databaseMeta.getDatabaseInterface() instanceof MSSQLServerNativeDatabaseMeta) {
                        String instance = this.environmentSubstitute(this.databaseMeta.getSQLServerInstance());
                        if (Const.isEmpty(instance)) {
                            this.connection = DriverManager.getConnection(url + ";user=" + username + ";password=" + password);
                        } else {
                            this.connection = DriverManager.getConnection(url + ";user=" + username + ";password=" + password + ";instanceName=" + instance);
                        }
                    } else {
                        this.connection = DriverManager.getConnection(url, Const.NVL(username, " "), Const.NVL(password, ""));
                    }
                } else {
                    
                    Properties properties = this.databaseMeta.getConnectionProperties();
                    if (!Const.isEmpty(username)) {
                        properties.put("user", username);
                    }

                    if (!Const.isEmpty(password)) {
                        properties.put("password", password);
                    }

                    //以 key-value 的形式 連接
                    this.connection = DriverManager.getConnection(url, properties);
                }

            } catch (SQLException var13) {
                throw new KettleDatabaseException("Error connecting to database: (using class " + classname + ")", var13);
            } catch (Throwable var14) {
                throw new KettleDatabaseException("Error connecting to database: (using class " + classname + ")", var14);
            }
        }
    }

問題分析:
1.校驗 是否分區(qū),是否配置集群時,在 DataBaseMeta初始化時,并沒有對 Partitioned 以及Clustered初始化

public boolean isPartitioned() {
    String isClustered = this.attributes.getProperty("IS_CLUSTERED");
    return "Y".equalsIgnoreCase(isClustered);
}

初始化流程:
初始化 DataBaseMeta 時,會首先初始化數(shù)據(jù)庫插件,即 通過反射得到 BaseDatabaseMeta 的實例!
獲取數(shù)據(jù)庫插件實例后,將 root,password等參數(shù),以 DataBaseMeta 的 set方式,將參數(shù) 初始化到 插件中!

2.在 獲取 數(shù)據(jù)庫連接驅(qū)動時,如何獲取到的?
通過 加載數(shù)據(jù)庫插件,以及 指定的"MYSQL"數(shù)據(jù)庫類型,通過反射得到databaseInterface時,直接獲取到的!
并沒有預先賦值!

3.如何根據(jù) 傳入的數(shù)據(jù)庫類型 來 獲得 對應的數(shù)據(jù)庫的模型驅(qū)動?
kettle利用 插件的模式,將 多種類型的數(shù)據(jù)庫驅(qū)動 封裝為 可插拔式的接口調(diào)用!
數(shù)據(jù)庫插件結(jié)構圖:DatabasePluginType;


image.png

3.1 初始化流程:

DatabaseMeta dataMeta = new DatabaseMeta("kl_kettle", "MYSQL", "Native","127.0.0.1", "kettles", "3306","root","dawei");

databaseTypeDesc:"MYSQL";
private static final DatabaseInterface findDatabaseInterface(String databaseTypeDesc) throws KettleDatabaseException {
    PluginRegistry registry = PluginRegistry.getInstance();//插件模式
    //獲取 數(shù)據(jù)庫 驅(qū)動(注冊驅(qū)動式在哪里呢)
    PluginInterface plugin = registry.getPlugin(DatabasePluginType.class, databaseTypeDesc);
    if (plugin == null) {
        plugin = registry.findPluginWithName(DatabasePluginType.class, databaseTypeDesc);
    }
    if (plugin == null) {
        throw new KettleDatabaseException("database type with plugin id [" + databaseTypeDesc + "] couldn't be found!");
    } else {
        return (DatabaseInterface)getDatabaseInterfacesMap().get(plugin.getIds()[0]);
    }
}

3.2 初始化 數(shù)據(jù)庫插件(此后的分析將不按照初始化流程 ):

synchronized修飾,避免 并發(fā)造成的 getInstance() 初始化時的 雙重檢查的問題!


private static List<PluginTypeInterface> pluginTypes = new ArrayList();

public static synchronized void init() throws KettlePluginException {
    PluginRegistry registry = getInstance();

    PluginTypeInterface pluginType;
    long startScan;

    for(Iterator i$ = pluginTypes.iterator(); i$.hasNext(); LogChannel.GENERAL.logDetailed("Registered " + registry.getPlugins(pluginType.getClass()).size() + " plugins of type '" + pluginType.getName() + "' in " + (System.currentTimeMillis() - startScan) + "ms.")) {
        pluginType = (PluginTypeInterface)i$.next();
        
        //注冊插件 統(tǒng)一管理
        registry.registerPluginType(pluginType.getClass());
        startScan = System.currentTimeMillis();
        //初始化資源,加載插件,數(shù)據(jù)庫插件加載 ".xml" 文件
        pluginType.searchPlugins();
        //加載插件方式二:在系統(tǒng)配置中 配置插件加載路徑
        String pluginClasses = EnvUtil.getSystemProperty("KETTLE_PLUGIN_CLASSES");
        if (!Const.isEmpty(pluginClasses)) {
            String[] classNames = pluginClasses.split(",");
            String[] arr$ = classNames;
            int len$ = classNames.length;

            for(int i$ = 0; i$ < len$; ++i$) {
                String className = arr$[i$];

                try {
                    PluginAnnotationType annotationType = (PluginAnnotationType)pluginType.getClass().getAnnotation(PluginAnnotationType.class);
                    Class<? extends Annotation> annotationClass = annotationType.value();
                    Class<?> clazz = Class.forName(className);
                    Annotation annotation = clazz.getAnnotation(annotationClass);
                    if (annotation != null) {
                        //獲取插件實例,并 初始化(利用反射注解生成)
                        pluginType.handlePluginAnnotation(clazz, annotation, new ArrayList(), true, (URL)null);
                    }
                } catch (Exception var15) {
                    LogChannel.GENERAL.logError("Error registring plugin class from KETTLE_PLUGIN_CLASSES: " + className, var15);
                }
            }
        }
    }

    JarFileCache.getInstance().clear();
}

pluginTypes : 類型為PluginTypeInterface;
所有加載的插件,最終都存放在pluginTypes中,由 PluginRegistry 統(tǒng)一 注冊,加載 和 管理!
PluginTypeInterface:各類插件的接口,所有新增插件,均需實現(xiàn)此接口!
PluginTypeInterface定義如下:


image.png

包含一個插件的最基本的功能定義 !

根據(jù)如上信息,可以 擴展自定義實現(xiàn)我們自己的插件功能,只需實現(xiàn) PluginTypeInterface 接口,
并在初始環(huán)境中注冊此插件即可!

3.3 初始化資源,加載插件:

代碼如下:
pluginType.searchPlugins();
-->DatabasePluginType.registerNatives();
----------------------------------------
加載 kettle-database-types.xml 配置文件
kettle-database-types.xml配置文件 定義了二十多種數(shù)據(jù)庫類型,包括很多常見的數(shù)據(jù)庫 !

protected void registerNatives() throws KettlePluginException {
    String xmlFile = "kettle-database-types.xml" ;

    try {
        InputStream inputStream = this.getClass().getResourceAsStream(xmlFile);
        if (inputStream == null) {
            inputStream = this.getClass().getResourceAsStream("/" + xmlFile);
        }

        if (inputStream == null) {
            throw new KettlePluginException("Unable to find native kettle database types definition file: " + xmlFile);
        } else {
            Document document = XMLHandler.loadXMLFile(inputStream, (String)null, true, false);
            Node repsNode = XMLHandler.getSubNode(document, "database-types");
            List<Node> repsNodes = XMLHandler.getNodes(repsNode, "database-type");
            Iterator i$ = repsNodes.iterator();

            while(i$.hasNext()) {
                Node repNode = (Node)i$.next();
                //每一個 database-type 作為一個Node遍歷,初始化為一個 數(shù)據(jù)庫接口服務;
                this.registerPluginFromXmlResource(repNode, "./", this.getClass(), true, (URL)null);
            }
        }
    } catch (KettleXMLException var8) {
        throw new KettlePluginException("Unable to read the kettle database types XML config file: " + xmlFile, var8);
    }
}

XML:
<database-type id="MYSQL">
  <description>MySQL</description>
  <classname>org.pentaho.di.core.database.MySQLDatabaseMeta</classname>
</database-type>   

主要事件:
1.讀取配置數(shù)據(jù)庫驅(qū)動的".xml”文件 !
2.讀取database-type節(jié)點,遍歷每一個數(shù)據(jù)庫驅(qū)動節(jié)點 repNode!
3.生成數(shù)據(jù)庫插件接庫服務,注冊到Registry統(tǒng)一 加載,初始化!

List<PluginInterface> list = (List)this.pluginMap.get(pluginType);
if (list == null) {
    list = new ArrayList();
    this.pluginMap.put(pluginType, list);
}
//魔鬼藏在細節(jié)中啊,細節(jié),細節(jié)
int index = ((List)list).indexOf(plugin);
if (index < 0) {
    ((List)list).add(plugin);
} else {
    ((List)list).set(index, plugin);
}

此段邏輯,為 整個插件 加載初始化 最重要的一步,到這里,僅僅只是將 數(shù)據(jù)庫插件服務 初始化完畢!
生成具體的數(shù)據(jù)庫驅(qū)動實例 在 loadClass中完成的!

3.4 初始化數(shù)據(jù)庫驅(qū)動實例

代碼片段一:
try {
    DatabaseInterface databaseInterface = (DatabaseInterface)registry.loadClass(plugin);
    databaseInterface.setPluginId(plugin.getIds()[0]);
    databaseInterface.setPluginName(plugin.getName());
    tmpAllDatabaseInterfaces.put(plugin.getIds()[0], databaseInterface);
}

代碼片段二(摘選核心實現(xiàn)代碼):
Class<? extends T> cl = null;
if (plugin.isNativePlugin()) {
    cl = Class.forName(className);
    return cl.newInstance();
} else {
    //此處 體現(xiàn) kettle的強大之處;
}

綜上整個流程為 kettle 加載數(shù)據(jù)庫驅(qū)動的完整實現(xiàn)!
其結(jié)構實現(xiàn),不得不說,體現(xiàn)kettle的強大之處,在抽象插件功能模塊時:
1.本地代碼方式配置插件,也可以 在java 系統(tǒng)屬性配置插件進行加載初始化!
2.可以根據(jù)自身的功能需求,對 kettle 現(xiàn)有的插件進行擴展重寫;
3.可以加載本地已有的插件,也可以將插件封裝為jar包的形式網(wǎng)絡調(diào)用加載 !

僅僅只是一個 插件配置,加載的設計,所涉及到的使用場景,其考慮的周全,細膩,不得不佩服,足以體現(xiàn)了作者的抽象設計能力,這就是差距,看來還是差的很遠啦!

4.kettle 的配置文件是如何加載的 ?
4.1.框架的配置文件(這塊比較簡單):
框架的配置文件 是 在 環(huán)境初始化時,注冊插件時,根據(jù)插件類型去 加載指定的配置文件 !

// 注冊原生類型和各個所需的插件
PluginRegistry.addPluginType(StepPluginType.getInstance());
PluginRegistry.addPluginType(PartitionerPluginType.getInstance());
PluginRegistry.addPluginType(JobEntryPluginType.getInstance());
PluginRegistry.addPluginType(RepositoryPluginType.getInstance());

代碼片段:
protected void registerNatives() throws KettlePluginException {
        String xmlFile = "kettle-repositories.xml";

            InputStream inputStream = this.getClass().getResourceAsStream(xmlFile);
            if (inputStream == null) {
                inputStream = this.getClass().getResourceAsStream("/" + xmlFile);
            }
                Document document = XMLHandler.loadXMLFile(inputStream, (String)null, true, false);
                Node repsNode = XMLHandler.getSubNode(document, "repositories");
                List<Node> repsNodes = XMLHandler.getNodes(repsNode, "repository");
                Iterator i$ = repsNodes.iterator();

                while(i$.hasNext()) {
                    Node repNode = (Node)i$.next();
                    this.registerPluginFromXmlResource(repNode, (String)null, this.getClass(), true, (URL)null);
                }
    }

后續(xù)加載的邏輯,與 數(shù)據(jù)庫插件初始化 流程相同,有興趣的同學可以自己扒源碼看看~ ~。

4.2.自定義配置
自定義的一些配置,比如數(shù)據(jù)庫鏈接屬性等,這個也比較簡單!
目前封裝的結(jié)構來說,所有與數(shù)據(jù)庫有關的配置被映射在 數(shù)據(jù)庫驅(qū)動插件類中:如MYSQL:MySQLDatabaseMeta類中!


image.png

MySQLDatabaseMeta 類 被封裝在 DataBaseMeta中使用!自定義的配置,最終都將在DataBaseMeta中被映射到數(shù)據(jù)庫驅(qū)動類中!

5.數(shù)據(jù)庫插件初始化,加載驅(qū)動時,為甚么會加載"org.gjt.mm.mysql.Driver"包 ?
5.1 MySQLDatabaseMeta源碼:

public String getDriverClass() {
    return this.getAccessType() == 1 ? "sun.jdbc.odbc.JdbcOdbcDriver" : "org.gjt.mm.mysql.Driver";
}
dbAccessTypeCode = new String[]{"Native"-0, "ODBC"-1, "OCI"-2, "Plugin"-3, "JNDI"-4};

5.2 org.gjt.mm.mysql.Driver 驅(qū)動包為甚么可以加載成功?
引用:

"org.gjt.mm.mysql.Driver 是當時最好的MySQL JDBC,但不是MySQL公司推出的,然后MySQL將 MM公司的 JDBC驅(qū)動收為官方的JDBC驅(qū)動,所以將驅(qū)動的package也改了,
但還保留了org.gjt.mm.mysql.Driver這個路徑的引用,也就是你使用新版的JDBC驅(qū)動時還可以通過這個來引用,你打開下載的新版JDBC驅(qū)動的jar文件可以看到,
只有一個文件的目錄是org.gjt.mm.mysql,就是為了兼容而設計的
image.png

6.既然kettle底層使用jdbc的方式去鏈接數(shù)據(jù)庫,那以什么樣的方式 保證connection的有效性 和 避免OOM?
6.1 kettle底層,以 jdbc的方式,去訪問數(shù)據(jù)庫 !

6.2 如何避免大數(shù)據(jù)下產(chǎn)生的OOM?(后續(xù))
通常而言,若不采用第三方框架,而使用JDBC操作數(shù)據(jù)庫,OOM總是不可避免的(Mybatis是另一種方式實現(xiàn)的),在生產(chǎn)環(huán)境中,它會是最隱形,最頻繁的一種bug !
DataBase是如何盡可能的避免這種情況的呢?
一直在想,為什么沒有采用common-dbutils jar呢?
下面的代碼可以當作 JDBC的模板來使用(整個邏輯代碼,要比dbutils實現(xiàn)的更好一些):

6.3 并發(fā)場景下(后續(xù)):

DataBase 中封裝著 DataBaseMeta 和 connection:
前者用于對 參數(shù)的初始化,驅(qū)動的加載 以及 與數(shù)據(jù)庫有關的邏輯校驗。
后者用于 jdbc的鏈接 和 與數(shù)據(jù)庫有關的操作!
整個結(jié)構還是非常清晰!

總結(jié):
kettle 支持數(shù)據(jù)源,默認支持JNDI,但是可以通過 插件功能,自定義擴展,重寫 kettle 數(shù)據(jù)源配置!
從最初 kettle 就被作者定義為一個開放的工具,代碼結(jié)構設計中,處處體現(xiàn)著這一點!比如:插件功能(很強大)!

遷移的幾點思考:
1.遷移到Linux平臺后,對原有的kettle任務進行調(diào)度,調(diào)度邏輯如何實現(xiàn)?
當前國內(nèi)主流的分布式調(diào)度框架,比如當當?shù)膃lastic-job,淘寶的TBSchedule,唯品會的Saturn ,其工作節(jié)點定時調(diào)度邏輯,多基于Quartz(你懂得)+zk的模式!
SpringBoot:
1.1.利用spring對Quartz的高度支持,其定時調(diào)度實現(xiàn)的非常輕量級,利用時間驅(qū)動,實現(xiàn)的非常巧妙!另:不得不說spring對jdk線程池的優(yōu)化來支持其定時調(diào)度!

1.2.可能出現(xiàn)的瓶頸在于 執(zhí)行器上,對于不同的任務,耗時較長 或者 任務過多時,springBoot當前的實現(xiàn),會造成任務異常中斷且后續(xù)不再執(zhí)行等!
1.3.spring完善的生態(tài)體系 和 功能強大的框架組合,雖然提高了開發(fā)效率,但其冗雜無續(xù)的依賴,也是讓人頭疼的!

Quartz:
...

時間輪:
當前,以netty4.x版本中,Hash***的實現(xiàn),其理想狀態(tài),可將延時控制在1s以內(nèi)(最多不會超過1s)。
適合于 時間精度要求不是非常高,且 任務量巨大的場景!
阿里內(nèi)部rocketMq版本,其 定時調(diào)度 就以時間輪實現(xiàn),利用鏈表分區(qū) 以及 鏈表并行,提高并發(fā)效率!

2.初期的結(jié)構設想:
調(diào)度器:定時調(diào)度應用中所有的任務,任務在運行之前,必須將任務的信息注冊到調(diào)度器中,由調(diào)度器 進行統(tǒng)一的調(diào)度!
kettle-task:動態(tài)的創(chuàng)建,更新 task,
Executor:并發(fā)的執(zhí)行已調(diào)度中的任務;

當然實際的場景,也許比這更復雜,需要實時前臺展示執(zhí)行的日志,異常的處理,任務版本的變化監(jiān)控等等!
最終的功能需求,設計結(jié)構 還需后續(xù)!

image.png
fdsf
最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容