kettle源碼分析 :本次源碼分析 基于 kettle v4.0 分析:
背景:
因最近新增一個需求,需要將原來在 windows平臺上的 kettle應用 遷移到 linux 上,并且 進行 定時調(diào)度,新增前臺管理頁面,可對任務進行 動態(tài)更新,增加,刪除等操作外,還需 設置任務的調(diào)度時間 !
初期遇到的問題:
1.版本選擇:
因 原來windows應用,是選用當時的經(jīng)典版4.4.x版本,目前,kettle 最新版本已更新到8.x,是否仍然選擇4.x版本?
2.數(shù)據(jù)資源庫選擇:
既是遷移,遷移在linux平臺后,當然是以數(shù)據(jù)庫的方式來保存資源!
但 原應用中任務以".kjb",".ktr"等方式保存在磁盤上,任務數(shù)量眾多,以數(shù)據(jù)庫的方式尚無法完全兼容!
3...
4.涉及到發(fā)送郵件,數(shù)據(jù)導出所需的各種依賴 等等,包含windows 和 linux 不同平臺所依賴的不同的jar(這個有點坑,早起kettle的版本依賴) 。
當然,最終的選擇,還是得看老板的選擇,基于對業(yè)務的請求,并發(fā),實際使用的場景,后續(xù)任務的數(shù)量以及 任務的種類 等綜合因素考量!
比如,執(zhí)行一個 "生成日結(jié)" 的任務,可能需要耗費的時間相對較長達到數(shù)小時!
首先對kettle的大致結(jié)構,功能模塊大致熟悉后,當然需要對其 插件注冊原理,資源庫加載初始化等流程需要有一個詳細的了解(后續(xù)很多功能都需要涉及到);
源碼版本,因當前 windows 運行的版本為4.x,所以目前的分析版本也為 kettle 4.x(kettle后續(xù)的幾次更新,改動都較大);
整個源碼結(jié)構 并不算復雜,相對 jstorm內(nèi)核,netty 源碼 而言,其 抽象結(jié)構 還是比較容易看懂的!
插件注冊,資源庫初始化 整個流程還是比較簡單的!
資源庫初始化流程:
一:環(huán)境初始化;
1.1 此處后面分析,kettle環(huán)境初始化,其實就是 初始化 kettle插件模板后,加載指定的各類插件,插件模式!
二:資源庫元數(shù)據(jù):

本地磁盤資源庫KettleFileRepositoryMeta 實現(xiàn)了本地文件解析的 getXML()和 loadXML(,),其與 數(shù)據(jù)庫資源庫不同的地方之一!
2.1 無參構造方法
public static String REPOSITORY_TYPE_ID = "KettleFileRepository";
public KettleFileRepositoryMeta() {
super(REPOSITORY_TYPE_ID);
}
REPOSITORY_TYPE_ID:資源庫ID,默認為"KettleFileRepository" ;
作用:在 生成JobMeta時,"*.kjb"文件結(jié)構 的 解析,XMLHandler ;
2.2 有參構造方法
public KettleFileRepositoryMeta(String id, String name, String description, String baseDirectory) {
super(id, name, description);
this.baseDirectory = baseDirectory;
}
參數(shù)解釋:
id: 資源庫Id;
name: 資源庫名稱;
description: 資源庫描述;
dir: 資源庫目錄;
2.2.1 id,name,description作用:
在 加載 kjb文件為document,最終轉(zhuǎn)換為InputStream時 !
在 將kjb文件 轉(zhuǎn)換為xml文檔時!
(后面分析 jobMeta 時會詳細介紹 ! )
2.2.2 dir:
2.3 獲取資源庫信息,以XML的方式;
public String getXML() {
StringBuffer retval = new StringBuffer(100);
retval.append(" ").append(XMLHandler.openTag("repository"));
retval.append(super.getXML());
retval.append(" ").append(XMLHandler.addTagValue("base_directory", this.baseDirectory));
retval.append(" ").append(XMLHandler.addTagValue("read_only", this.readOnly));
retval.append(" ").append(XMLHandler.addTagValue("hides_hidden_files", this.hidingHiddenFiles));
retval.append(" ").append(XMLHandler.closeTag("repository"));
return retval.toString();
}
基本邏輯 將 所有的解析實現(xiàn)委托給 XMLHandler 去實現(xiàn)(后面分析XMLhandler)!
2.4 加載 本地文件資源庫,以文檔的方式!
public void loadXML(Node repnode, List<DatabaseMeta> databases) throws KettleException {
super.loadXML(repnode, databases);
try {
this.baseDirectory = XMLHandler.getTagValue(repnode, "base_directory");
this.readOnly = "Y".equalsIgnoreCase(XMLHandler.getTagValue(repnode, "read_only"));
this.hidingHiddenFiles = "Y".equalsIgnoreCase(XMLHandler.getTagValue(repnode, "hides_hidden_files"));
} catch (Exception var4) {
throw new KettleException("Unable to load Kettle file repository meta object", var4);
}
}
KettleFileRepositoryMeta的整體結(jié)構還是較為簡單,
2.5 初始化本地資源庫信息;
public void init(RepositoryMeta repositoryMeta) {
this.serviceMap = new HashMap();
this.serviceList = new ArrayList();
this.repositoryMeta = (KettleFileRepositoryMeta)repositoryMeta;
this.securityProvider = new KettleFileRepositorySecurityProvider(repositoryMeta);
this.serviceMap.put(RepositorySecurityProvider.class, this.securityProvider);
this.serviceList.add(RepositorySecurityProvider.class);
this.log = new LogChannel(this);
}
初始化流程:
serviceList :提供安全檢查的接口,此處 使用 RepositorySecurityProvider.class,可根據(jù)具體的自定義實現(xiàn) 安全檢查的接口邏輯!
serviceMap :在對 資源庫 操作時,進行的安全檢查!
主要做了如下幾件事:
1.初始化 repositoryMeta ;
2.定義安全檢查接口,可以自定義安全檢查實現(xiàn) !
3.配置日志輸出信息 !
2.6 一般方法
public LogChannelInterface getLog() {
return this.log;//獲取資源庫日志輸出信息,可搭配 websocket 顯示在前臺上!
}
public boolean isConnected() {
return true;//此為數(shù)據(jù)庫鏈接的方式,此處做一般實現(xiàn),無實際意義;
}
public RepositorySecurityProvider getSecurityProvider() {
return this.securityProvider;//執(zhí)行對資源庫的操作前,判斷資源庫的特征,比如是否只讀等!
}
2.7 加載 資源庫目錄 結(jié)構
public RepositoryDirectoryInterface loadRepositoryDirectoryTree() throws KettleException {
//初始化時,dir and name is all null;
RepositoryDirectory root = new RepositoryDirectory();
//設置 root 目錄為 "/" ;
root.setObjectId(new StringObjectId("/"));
return this.loadRepositoryDirectoryTree(root);
}
public RepositoryDirectoryInterface loadRepositoryDirectoryTree(RepositoryDirectoryInterface dir) throws KettleException {
//獲取 本地資源庫的 目錄名字
//如: E://kettle/data-->E://kettle/data/
String folderName = this.calcDirectoryName(dir);
//VFS加載 資源庫目錄下的所有文件,包括子目錄(不包括子目錄下的文件);
FileObject folder = KettleVFS.getFileObject(folderName);
//獲取資源庫目錄下的所有的文件對象
FileObject[] arr$ = folder.getChildren();
int len$ = arr$.length;
for(int i$ = 0; i$ < len$; ++i$) {
FileObject child = arr$[i$];
//判斷文件類型 以及 文件屬性是否 hidden ,
if (child.getType().equals(FileType.FOLDER) && (!child.isHidden() || !this.repositoryMeta.isHidingHiddenFiles())) {
//獲取文件名稱 或者 子目錄名稱,遞歸使用
// child.getName().getBaseName():獲取文件名稱或 子目錄名
RepositoryDirectory subDir = new RepositoryDirectory(dir, child.getName().getBaseName());
subDir.setObjectId(new StringObjectId(this.calcObjectId((RepositoryDirectoryInterface)subDir)));
dir.addSubdirectory(subDir);
//遞歸調(diào)用,遍歷子目錄下的文件
this.loadRepositoryDirectoryTree(subDir);
}
}
return dir;
}
加載格式化目錄: 指定目錄下的 文件 路徑:相對路徑"subJect/fzaccSub.kjb";
private String calcDirectoryName(RepositoryDirectoryInterface dir) {
StringBuilder directory = new StringBuilder();
String baseDir = this.repositoryMeta.getBaseDirectory();
baseDir = Const.replace(baseDir, "\\", "/");
directory.append(baseDir);
if (!baseDir.endsWith("/")) {
directory.append("/");
}
if (dir != null) {
String path = this.calcRelativeElementDirectory(dir);
if (path.startsWith("/")) {
directory.append(path.substring(1));
} else {
//指定目錄下的 文件 路徑;
directory.append(path);
}
if (!path.endsWith("/")) {
directory.append("/");
}
}
return directory.toString();
}
2.7.3 獲取絕對 資源庫 根據(jù)指定文件路徑下的 格式化后的路徑:
public String calcObjectId(RepositoryDirectoryInterface dir) {
StringBuilder id = new StringBuilder();
String path = this.calcRelativeElementDirectory(dir);
id.append(path);
if (!path.endsWith("/")) {
id.append("/");
}
return id.toString();
}
2.7.4 根據(jù)指定的路徑判斷 是否存在資源庫中
public boolean exists(String name, RepositoryDirectoryInterface repositoryDirectory, RepositoryObjectType objectType) throws KettleException {
try {
FileObject fileObject = KettleVFS.getFileObject(this.calcFilename(repositoryDirectory, name, objectType.getExtension()));
return fileObject.exists();
} catch (Exception var5) {
throw new KettleException(var5);
}
}
2.8 保存任務文件
public void save(RepositoryElementInterface repositoryElement, String versionComment, ProgressMonitorListener monitor, ObjectId parentId, boolean used) throws KettleException {
try {
if (!(repositoryElement instanceof XMLInterface) && !(repositoryElement instanceof SharedObjectInterface)) {
throw new KettleException("Class [" + repositoryElement.getClass().getName() + "] needs to implement the XML Interface in order to save it to disk");
} else {
if (!Const.isEmpty(versionComment)) {
this.insertLogEntry(versionComment);
}
ObjectId objectId = new StringObjectId(this.calcObjectId(repositoryElement));
FileObject fileObject = this.getFileObject(repositoryElement);
//JobMeta,TransMeta繼承XMLInterface的原因
String xml = ((XMLInterface)repositoryElement).getXML();
OutputStream os = KettleVFS.getOutputStream(fileObject, false);
os.write(xml.getBytes("UTF-8"));
os.close();
if (repositoryElement instanceof ChangedFlagInterface) {
//復位狀態(tài)位
((ChangedFlagInterface)repositoryElement).clearChanged();
}
if (repositoryElement.getObjectId() != null && !repositoryElement.getObjectId().equals(objectId)) {
//刪除舊的文件名稱
this.delObject(repositoryElement.getObjectId());
}
repositoryElement.setObjectId(objectId);
}
} catch (Exception var10) {
throw new KettleException("Unable to save repository element [" + repositoryElement + "] to XML file : " + this.calcFilename(repositoryElement), var10);
}
}
2.9 根據(jù) 指定的 路徑 創(chuàng)建資源庫的文件路徑
public RepositoryDirectoryInterface createRepositoryDirectory(RepositoryDirectoryInterface parentDirectory, String directoryPath) throws KettleException {
String folder = this.calcDirectoryName(parentDirectory);
String newFolder;
if (folder.endsWith("/")) {
newFolder = folder + directoryPath;
} else {
newFolder = folder + "/" + directoryPath;
}
FileObject parent = KettleVFS.getFileObject(newFolder);
try {
parent.createFolder();
} catch (FileSystemException var7) {
throw new KettleException("Unable to create folder " + newFolder, var7);
}
RepositoryDirectory newDir = new RepositoryDirectory(parentDirectory, directoryPath);
parentDirectory.addSubdirectory(newDir);
newDir.setObjectId(new StringObjectId(newDir.toString()));
return newDir;
}
KettleDatabaseRepository源碼分析(數(shù)據(jù)庫存儲) :
一:初始化
1.初始化
1.1 無參構造
public KettleDatabaseRepository() {
}
1.2 初始化
1.2.1 指定 參數(shù),即 資源庫元數(shù)據(jù)repositoryMeta:
public void init(RepositoryMeta repositoryMeta) {
this.repositoryMeta = (KettleDatabaseRepositoryMeta)repositoryMeta;
//與kettleFilerepository原理類似,封裝 對資源庫 的操作的安全檢查 和 權限
this.serviceList = new ArrayList();
this.serviceMap = new HashMap();
// 注冊日志組件,定義kettle 日志 輸出級別
this.log = new LogChannel(this);
// 無參 初始化函數(shù)
this.init();
}
1.2.2 資源初始化,加載各種組件;
private void init() {
//操作 TransMeta 的委托模式執(zhí)行者
this.transDelegate = new KettleDatabaseRepositoryTransDelegate(this);
//操作 JobMeta 的委托
this.jobDelegate = new KettleDatabaseRepositoryJobDelegate(this);
//初始化 數(shù)據(jù)庫操作的委托執(zhí)行
this.databaseDelegate = new KettleDatabaseRepositoryDatabaseDelegate(this);
//分布式 工作節(jié)點
this.slaveServerDelegate = new KettleDatabaseRepositorySlaveServerDelegate(this);
//集群模式
this.clusterSchemaDelegate = new KettleDatabaseRepositoryClusterSchemaDelegate(this);
//
this.partitionSchemaDelegate = new KettleDatabaseRepositoryPartitionSchemaDelegate(this);
//資源庫目錄
this.directoryDelegate = new KettleDatabaseRepositoryDirectoryDelegate(this);
//操作數(shù)據(jù)庫的代理執(zhí)行
this.connectionDelegate = new KettleDatabaseRepositoryConnectionDelegate(this, this.repositoryMeta.getConnection());
//資源庫用戶信息,如用于登錄等
this.userDelegate = new KettleDatabaseRepositoryUserDelegate(this);
//行級鎖
this.conditionDelegate = new KettleDatabaseRepositoryConditionDelegate(this);
this.valueDelegate = new KettleDatabaseRepositoryValueDelegate(this);
this.notePadDelegate = new KettleDatabaseRepositoryNotePadDelegate(this);
this.stepDelegate = new KettleDatabaseRepositoryStepDelegate(this);
this.jobEntryDelegate = new KettleDatabaseRepositoryJobEntryDelegate(this);
this.creationHelper = new KettleDatabaseRepositoryCreationHelper(this);
}
1.2.3 創(chuàng)建資源庫元素據(jù)
public RepositoryMeta createRepositoryMeta() {
return new KettleDatabaseRepositoryMeta();
}
- 鏈接 資源 數(shù)據(jù)庫:
2.1
public void connect(String username, String password, boolean upgrade) throws KettleException {
this.connectionDelegate.connect(upgrade, upgrade);
...
}
在鏈接數(shù)據(jù)庫時,將 鏈接操作 Delegate 給 connectionDelegate.
upgrade : 鏈接資源數(shù)據(jù)庫后,是否驗證資源庫版本編號!
2.2 在 connectionDelegate 中,以 驗證版本的方式,鏈接數(shù)據(jù)庫!
public synchronized void connect(boolean no_lookup, boolean ignoreVersion) throws KettleException {
//1.repository資源庫初始化時,connected 默認為false狀態(tài);
//2.在 connection 成功之后,才進行 setConnected(true);
if (this.repository.isConnected()) {
throw new KettleException("Repository is already by class " + this.repository.isConnected());
} else {
try {
//主要操作:1.初始化 系統(tǒng)/應用 配置的屬性,System.getProperties()
//2. 更新initialized 標識為 :已初始化;
this.database.initializeVariablesFrom((VariableSpace)null);
//利用 database 鏈接數(shù)據(jù)庫
this.database.connect();
//鏈接資源庫后,是否需要驗證 資源庫版本
if (!ignoreVersion) {
this.verifyVersion();
}
//是否開啟事務
this.setAutoCommit(false);
// 更新 資源數(shù)據(jù)鏈接庫 狀態(tài)
this.repository.setConnected(true);
if (!no_lookup) {
try {
//
this.repository.connectionDelegate.setLookupStepAttribute();
this.repository.connectionDelegate.setLookupTransAttribute();
this.repository.connectionDelegate.setLookupJobEntryAttribute();
this.repository.connectionDelegate.setLookupJobAttribute();
} catch (KettleException var4) {
throw new KettleException("Error setting lookup prep.statements", var4);
}
}
} catch (KettleException var5) {
throw new KettleException("Error connecting to the repository!", var5);
}
}
}
鏈接資源庫后,校驗資源庫版本:
select * from R_VERSION;
資源庫初始版本---"4.0"
MAJOR_VERSION=4
MINOR_VERSION = 0
反之,拋出:Repository.UpgradeRequired.Message
2.3 在DataBase 中 鏈接數(shù)據(jù)庫;
public synchronized void connect(String group, String partitionId) throws KettleDatabaseException {
//初始化資源數(shù)據(jù)庫時, 分組和 分區(qū)Id為空參!
if (!Const.isEmpty(group)) {
this.connectionGroup = group;
this.partitionId = partitionId;
DatabaseConnectionMap map = DatabaseConnectionMap.getInstance();
Database lookup = map.getDatabase(group, partitionId, this);
if (lookup == null) {
this.normalConnect(partitionId);
++this.opened;
this.copy = this.opened;
map.storeDatabase(group, partitionId, this);
} else {
this.connection = lookup.getConnection();
lookup.setOpened(lookup.getOpened() + 1);
this.copy = lookup.getOpened();
}
} else {
//正常邏輯 鏈接 ,不包含分區(qū)id邏輯
this.normalConnect(partitionId);
}
}
2.4 JDBC的方式連接數(shù)據(jù)庫
public void normalConnect(String partitionId) throws KettleDatabaseException {
if (this.databaseMeta == null) {
throw new KettleDatabaseException("No valid database connection defined!");
} else {
try {
//是否配置使用連接池初始化
//dbAccessTypeCode = new String[]{"Native", "ODBC", "OCI", "Plugin", "JNDI"};
//配置使用連接池初始化步驟:
if (this.databaseMeta.isUsingConnectionPool() && this.databaseMeta.getAccessType() != 4) {
try {
this.connection = ConnectionPoolUtil.getConnection(this.log, this.databaseMeta, partitionId);
} catch (Exception var3) {
throw new KettleDatabaseException("Error occured while trying to connect to the database", var3);
}
} else {
//jdbc 連接 數(shù)據(jù)庫
this.connectUsingClass(this.databaseMeta.getDriverClass(), partitionId);
if (this.log.isDetailed()) {
this.log.logDetailed("Connected to database.");
}
// databaseInterface 反射時,內(nèi)部配置并沒有在初始化時配置
//在 dataBaseMeta時 并未進行賦值,所以此處為空
//此處 ConnectSQL 主要用于 kettle log 顯示,將 query result 顯示在 log plug 中;
String sql = this.environmentSubstitute(this.databaseMeta.getConnectSQL());
if (!Const.isEmpty(sql) && !Const.onlySpaces(sql)) {
this.execStatements(sql);
if (this.log.isDetailed()) {
this.log.logDetailed("Executed connect time SQL statements:" + Const.CR + sql);
}
}
}
} catch (Exception var4) {
throw new KettleDatabaseException("Error occured while trying to connect to the database", var4);
}
}
}
連接數(shù)據(jù)庫:
this.connectUsingClass(this.databaseMeta.getDriverClass(), partitionId);
/**
classname:pluginRaw.dirveClass
partitionId:null
dbAccessTypeCode = new String[]{"Native"-0, "ODBC"-1, "OCI"-2, "Plugin"-3, "JNDI"-4};
**/
private void connectUsingClass(String classname, String partitionId) throws KettleDatabaseException {
//JNDI數(shù)據(jù)源時
if (this.databaseMeta.getAccessType() == 4) {
this.initWithNamedDataSource(this.environmentSubstitute(this.databaseMeta.getDatabaseName()));
} else {
try {
Class var3 = DriverManager.class;
//并發(fā)加載時,此處會產(chǎn)生死鎖等問題,jdk1.8版本并發(fā)加載時應該不會了
synchronized(DriverManager.class) {
Class.forName(classname);
}
} catch (NoClassDefFoundError var10) {
throw new KettleDatabaseException("Exception while loading class", var10);
} catch (ClassNotFoundException var11) {
throw new KettleDatabaseException("Exception while loading class", var11);
} catch (Exception var12) {
throw new KettleDatabaseException("Exception while loading class", var12);
}
try {
String url;
//默認是沒有配置集群,分區(qū)
if (this.databaseMeta.isPartitioned() && !Const.isEmpty(partitionId)) {
url = this.environmentSubstitute(this.databaseMeta.getURL(partitionId));
} else {
//拼接的 connection url
url = this.environmentSubstitute(this.databaseMeta.getURL());
}
String clusterUsername = null;
String clusterPassword = null;
if (this.databaseMeta.isPartitioned() && !Const.isEmpty(partitionId)) {
PartitionDatabaseMeta partition = this.databaseMeta.getPartitionMeta(partitionId);
if (partition != null) {
clusterUsername = partition.getUsername();
clusterPassword = Encr.decryptPasswordOptionallyEncrypted(partition.getPassword());
}
}
String password;
String username;
if (!Const.isEmpty(clusterUsername)) {
username = clusterUsername;
password = clusterPassword;
} else {
//在初始化數(shù)據(jù)庫插件 databaseInterface 時,在 DataBaseMeta 中
username = this.environmentSubstitute(this.databaseMeta.getUsername());
password = Encr.decryptPasswordOptionallyEncrypted(this.environmentSubstitute(this.databaseMeta.getPassword()));
}
//jdbc連接時的邏輯校驗,校驗當前 interface服務 是否支持 url 連接
//默認所有的 數(shù)據(jù)庫插件 都支持此配置
if (this.databaseMeta.supportsOptionsInURL()) {
if (Const.isEmpty(username) && Const.isEmpty(password)) {
this.connection = DriverManager.getConnection(url);
} else if (this.databaseMeta.getDatabaseInterface() instanceof MSSQLServerNativeDatabaseMeta) {
String instance = this.environmentSubstitute(this.databaseMeta.getSQLServerInstance());
if (Const.isEmpty(instance)) {
this.connection = DriverManager.getConnection(url + ";user=" + username + ";password=" + password);
} else {
this.connection = DriverManager.getConnection(url + ";user=" + username + ";password=" + password + ";instanceName=" + instance);
}
} else {
this.connection = DriverManager.getConnection(url, Const.NVL(username, " "), Const.NVL(password, ""));
}
} else {
Properties properties = this.databaseMeta.getConnectionProperties();
if (!Const.isEmpty(username)) {
properties.put("user", username);
}
if (!Const.isEmpty(password)) {
properties.put("password", password);
}
//以 key-value 的形式 連接
this.connection = DriverManager.getConnection(url, properties);
}
} catch (SQLException var13) {
throw new KettleDatabaseException("Error connecting to database: (using class " + classname + ")", var13);
} catch (Throwable var14) {
throw new KettleDatabaseException("Error connecting to database: (using class " + classname + ")", var14);
}
}
}
問題分析:
1.校驗 是否分區(qū),是否配置集群時,在 DataBaseMeta初始化時,并沒有對 Partitioned 以及Clustered初始化
public boolean isPartitioned() {
String isClustered = this.attributes.getProperty("IS_CLUSTERED");
return "Y".equalsIgnoreCase(isClustered);
}
初始化流程:
初始化 DataBaseMeta 時,會首先初始化數(shù)據(jù)庫插件,即 通過反射得到 BaseDatabaseMeta 的實例!
獲取數(shù)據(jù)庫插件實例后,將 root,password等參數(shù),以 DataBaseMeta 的 set方式,將參數(shù) 初始化到 插件中!
2.在 獲取 數(shù)據(jù)庫連接驅(qū)動時,如何獲取到的?
通過 加載數(shù)據(jù)庫插件,以及 指定的"MYSQL"數(shù)據(jù)庫類型,通過反射得到databaseInterface時,直接獲取到的!
并沒有預先賦值!
3.如何根據(jù) 傳入的數(shù)據(jù)庫類型 來 獲得 對應的數(shù)據(jù)庫的模型驅(qū)動?
kettle利用 插件的模式,將 多種類型的數(shù)據(jù)庫驅(qū)動 封裝為 可插拔式的接口調(diào)用!
數(shù)據(jù)庫插件結(jié)構圖:DatabasePluginType;

3.1 初始化流程:
DatabaseMeta dataMeta = new DatabaseMeta("kl_kettle", "MYSQL", "Native","127.0.0.1", "kettles", "3306","root","dawei");
databaseTypeDesc:"MYSQL";
private static final DatabaseInterface findDatabaseInterface(String databaseTypeDesc) throws KettleDatabaseException {
PluginRegistry registry = PluginRegistry.getInstance();//插件模式
//獲取 數(shù)據(jù)庫 驅(qū)動(注冊驅(qū)動式在哪里呢)
PluginInterface plugin = registry.getPlugin(DatabasePluginType.class, databaseTypeDesc);
if (plugin == null) {
plugin = registry.findPluginWithName(DatabasePluginType.class, databaseTypeDesc);
}
if (plugin == null) {
throw new KettleDatabaseException("database type with plugin id [" + databaseTypeDesc + "] couldn't be found!");
} else {
return (DatabaseInterface)getDatabaseInterfacesMap().get(plugin.getIds()[0]);
}
}
3.2 初始化 數(shù)據(jù)庫插件(此后的分析將不按照初始化流程 ):
synchronized修飾,避免 并發(fā)造成的 getInstance() 初始化時的 雙重檢查的問題!
private static List<PluginTypeInterface> pluginTypes = new ArrayList();
public static synchronized void init() throws KettlePluginException {
PluginRegistry registry = getInstance();
PluginTypeInterface pluginType;
long startScan;
for(Iterator i$ = pluginTypes.iterator(); i$.hasNext(); LogChannel.GENERAL.logDetailed("Registered " + registry.getPlugins(pluginType.getClass()).size() + " plugins of type '" + pluginType.getName() + "' in " + (System.currentTimeMillis() - startScan) + "ms.")) {
pluginType = (PluginTypeInterface)i$.next();
//注冊插件 統(tǒng)一管理
registry.registerPluginType(pluginType.getClass());
startScan = System.currentTimeMillis();
//初始化資源,加載插件,數(shù)據(jù)庫插件加載 ".xml" 文件
pluginType.searchPlugins();
//加載插件方式二:在系統(tǒng)配置中 配置插件加載路徑
String pluginClasses = EnvUtil.getSystemProperty("KETTLE_PLUGIN_CLASSES");
if (!Const.isEmpty(pluginClasses)) {
String[] classNames = pluginClasses.split(",");
String[] arr$ = classNames;
int len$ = classNames.length;
for(int i$ = 0; i$ < len$; ++i$) {
String className = arr$[i$];
try {
PluginAnnotationType annotationType = (PluginAnnotationType)pluginType.getClass().getAnnotation(PluginAnnotationType.class);
Class<? extends Annotation> annotationClass = annotationType.value();
Class<?> clazz = Class.forName(className);
Annotation annotation = clazz.getAnnotation(annotationClass);
if (annotation != null) {
//獲取插件實例,并 初始化(利用反射注解生成)
pluginType.handlePluginAnnotation(clazz, annotation, new ArrayList(), true, (URL)null);
}
} catch (Exception var15) {
LogChannel.GENERAL.logError("Error registring plugin class from KETTLE_PLUGIN_CLASSES: " + className, var15);
}
}
}
}
JarFileCache.getInstance().clear();
}
pluginTypes : 類型為PluginTypeInterface;
所有加載的插件,最終都存放在pluginTypes中,由 PluginRegistry 統(tǒng)一 注冊,加載 和 管理!
PluginTypeInterface:各類插件的接口,所有新增插件,均需實現(xiàn)此接口!
PluginTypeInterface定義如下:

包含一個插件的最基本的功能定義 !
根據(jù)如上信息,可以 擴展自定義實現(xiàn)我們自己的插件功能,只需實現(xiàn) PluginTypeInterface 接口,
并在初始環(huán)境中注冊此插件即可!
3.3 初始化資源,加載插件:
代碼如下:
pluginType.searchPlugins();
-->DatabasePluginType.registerNatives();
----------------------------------------
加載 kettle-database-types.xml 配置文件
kettle-database-types.xml配置文件 定義了二十多種數(shù)據(jù)庫類型,包括很多常見的數(shù)據(jù)庫 !
protected void registerNatives() throws KettlePluginException {
String xmlFile = "kettle-database-types.xml" ;
try {
InputStream inputStream = this.getClass().getResourceAsStream(xmlFile);
if (inputStream == null) {
inputStream = this.getClass().getResourceAsStream("/" + xmlFile);
}
if (inputStream == null) {
throw new KettlePluginException("Unable to find native kettle database types definition file: " + xmlFile);
} else {
Document document = XMLHandler.loadXMLFile(inputStream, (String)null, true, false);
Node repsNode = XMLHandler.getSubNode(document, "database-types");
List<Node> repsNodes = XMLHandler.getNodes(repsNode, "database-type");
Iterator i$ = repsNodes.iterator();
while(i$.hasNext()) {
Node repNode = (Node)i$.next();
//每一個 database-type 作為一個Node遍歷,初始化為一個 數(shù)據(jù)庫接口服務;
this.registerPluginFromXmlResource(repNode, "./", this.getClass(), true, (URL)null);
}
}
} catch (KettleXMLException var8) {
throw new KettlePluginException("Unable to read the kettle database types XML config file: " + xmlFile, var8);
}
}
XML:
<database-type id="MYSQL">
<description>MySQL</description>
<classname>org.pentaho.di.core.database.MySQLDatabaseMeta</classname>
</database-type>
主要事件:
1.讀取配置數(shù)據(jù)庫驅(qū)動的".xml”文件 !
2.讀取database-type節(jié)點,遍歷每一個數(shù)據(jù)庫驅(qū)動節(jié)點 repNode!
3.生成數(shù)據(jù)庫插件接庫服務,注冊到Registry統(tǒng)一 加載,初始化!
List<PluginInterface> list = (List)this.pluginMap.get(pluginType);
if (list == null) {
list = new ArrayList();
this.pluginMap.put(pluginType, list);
}
//魔鬼藏在細節(jié)中啊,細節(jié),細節(jié)
int index = ((List)list).indexOf(plugin);
if (index < 0) {
((List)list).add(plugin);
} else {
((List)list).set(index, plugin);
}
此段邏輯,為 整個插件 加載初始化 最重要的一步,到這里,僅僅只是將 數(shù)據(jù)庫插件服務 初始化完畢!
生成具體的數(shù)據(jù)庫驅(qū)動實例 在 loadClass中完成的!
3.4 初始化數(shù)據(jù)庫驅(qū)動實例
代碼片段一:
try {
DatabaseInterface databaseInterface = (DatabaseInterface)registry.loadClass(plugin);
databaseInterface.setPluginId(plugin.getIds()[0]);
databaseInterface.setPluginName(plugin.getName());
tmpAllDatabaseInterfaces.put(plugin.getIds()[0], databaseInterface);
}
代碼片段二(摘選核心實現(xiàn)代碼):
Class<? extends T> cl = null;
if (plugin.isNativePlugin()) {
cl = Class.forName(className);
return cl.newInstance();
} else {
//此處 體現(xiàn) kettle的強大之處;
}
綜上整個流程為 kettle 加載數(shù)據(jù)庫驅(qū)動的完整實現(xiàn)!
其結(jié)構實現(xiàn),不得不說,體現(xiàn)kettle的強大之處,在抽象插件功能模塊時:
1.本地代碼方式配置插件,也可以 在java 系統(tǒng)屬性配置插件進行加載初始化!
2.可以根據(jù)自身的功能需求,對 kettle 現(xiàn)有的插件進行擴展重寫;
3.可以加載本地已有的插件,也可以將插件封裝為jar包的形式網(wǎng)絡調(diào)用加載 !
僅僅只是一個 插件配置,加載的設計,所涉及到的使用場景,其考慮的周全,細膩,不得不佩服,足以體現(xiàn)了作者的抽象設計能力,這就是差距,看來還是差的很遠啦!
4.kettle 的配置文件是如何加載的 ?
4.1.框架的配置文件(這塊比較簡單):
框架的配置文件 是 在 環(huán)境初始化時,注冊插件時,根據(jù)插件類型去 加載指定的配置文件 !
// 注冊原生類型和各個所需的插件
PluginRegistry.addPluginType(StepPluginType.getInstance());
PluginRegistry.addPluginType(PartitionerPluginType.getInstance());
PluginRegistry.addPluginType(JobEntryPluginType.getInstance());
PluginRegistry.addPluginType(RepositoryPluginType.getInstance());
代碼片段:
protected void registerNatives() throws KettlePluginException {
String xmlFile = "kettle-repositories.xml";
InputStream inputStream = this.getClass().getResourceAsStream(xmlFile);
if (inputStream == null) {
inputStream = this.getClass().getResourceAsStream("/" + xmlFile);
}
Document document = XMLHandler.loadXMLFile(inputStream, (String)null, true, false);
Node repsNode = XMLHandler.getSubNode(document, "repositories");
List<Node> repsNodes = XMLHandler.getNodes(repsNode, "repository");
Iterator i$ = repsNodes.iterator();
while(i$.hasNext()) {
Node repNode = (Node)i$.next();
this.registerPluginFromXmlResource(repNode, (String)null, this.getClass(), true, (URL)null);
}
}
后續(xù)加載的邏輯,與 數(shù)據(jù)庫插件初始化 流程相同,有興趣的同學可以自己扒源碼看看~ ~。
4.2.自定義配置
自定義的一些配置,比如數(shù)據(jù)庫鏈接屬性等,這個也比較簡單!
目前封裝的結(jié)構來說,所有與數(shù)據(jù)庫有關的配置被映射在 數(shù)據(jù)庫驅(qū)動插件類中:如MYSQL:MySQLDatabaseMeta類中!

MySQLDatabaseMeta 類 被封裝在 DataBaseMeta中使用!自定義的配置,最終都將在DataBaseMeta中被映射到數(shù)據(jù)庫驅(qū)動類中!
5.數(shù)據(jù)庫插件初始化,加載驅(qū)動時,為甚么會加載"org.gjt.mm.mysql.Driver"包 ?
5.1 MySQLDatabaseMeta源碼:
public String getDriverClass() {
return this.getAccessType() == 1 ? "sun.jdbc.odbc.JdbcOdbcDriver" : "org.gjt.mm.mysql.Driver";
}
dbAccessTypeCode = new String[]{"Native"-0, "ODBC"-1, "OCI"-2, "Plugin"-3, "JNDI"-4};
5.2 org.gjt.mm.mysql.Driver 驅(qū)動包為甚么可以加載成功?
引用:
"org.gjt.mm.mysql.Driver 是當時最好的MySQL JDBC,但不是MySQL公司推出的,然后MySQL將 MM公司的 JDBC驅(qū)動收為官方的JDBC驅(qū)動,所以將驅(qū)動的package也改了,
但還保留了org.gjt.mm.mysql.Driver這個路徑的引用,也就是你使用新版的JDBC驅(qū)動時還可以通過這個來引用,你打開下載的新版JDBC驅(qū)動的jar文件可以看到,
只有一個文件的目錄是org.gjt.mm.mysql,就是為了兼容而設計的

6.既然kettle底層使用jdbc的方式去鏈接數(shù)據(jù)庫,那以什么樣的方式 保證connection的有效性 和 避免OOM?
6.1 kettle底層,以 jdbc的方式,去訪問數(shù)據(jù)庫 !
6.2 如何避免大數(shù)據(jù)下產(chǎn)生的OOM?(后續(xù))
通常而言,若不采用第三方框架,而使用JDBC操作數(shù)據(jù)庫,OOM總是不可避免的(Mybatis是另一種方式實現(xiàn)的),在生產(chǎn)環(huán)境中,它會是最隱形,最頻繁的一種bug !
DataBase是如何盡可能的避免這種情況的呢?
一直在想,為什么沒有采用common-dbutils jar呢?
下面的代碼可以當作 JDBC的模板來使用(整個邏輯代碼,要比dbutils實現(xiàn)的更好一些):
6.3 并發(fā)場景下(后續(xù)):
DataBase 中封裝著 DataBaseMeta 和 connection:
前者用于對 參數(shù)的初始化,驅(qū)動的加載 以及 與數(shù)據(jù)庫有關的邏輯校驗。
后者用于 jdbc的鏈接 和 與數(shù)據(jù)庫有關的操作!
整個結(jié)構還是非常清晰!
總結(jié):
kettle 支持數(shù)據(jù)源,默認支持JNDI,但是可以通過 插件功能,自定義擴展,重寫 kettle 數(shù)據(jù)源配置!
從最初 kettle 就被作者定義為一個開放的工具,代碼結(jié)構設計中,處處體現(xiàn)著這一點!比如:插件功能(很強大)!
遷移的幾點思考:
1.遷移到Linux平臺后,對原有的kettle任務進行調(diào)度,調(diào)度邏輯如何實現(xiàn)?
當前國內(nèi)主流的分布式調(diào)度框架,比如當當?shù)膃lastic-job,淘寶的TBSchedule,唯品會的Saturn ,其工作節(jié)點定時調(diào)度邏輯,多基于Quartz(你懂得)+zk的模式!
SpringBoot:
1.1.利用spring對Quartz的高度支持,其定時調(diào)度實現(xiàn)的非常輕量級,利用時間驅(qū)動,實現(xiàn)的非常巧妙!另:不得不說spring對jdk線程池的優(yōu)化來支持其定時調(diào)度!
1.2.可能出現(xiàn)的瓶頸在于 執(zhí)行器上,對于不同的任務,耗時較長 或者 任務過多時,springBoot當前的實現(xiàn),會造成任務異常中斷且后續(xù)不再執(zhí)行等!
1.3.spring完善的生態(tài)體系 和 功能強大的框架組合,雖然提高了開發(fā)效率,但其冗雜無續(xù)的依賴,也是讓人頭疼的!
Quartz:
...
時間輪:
當前,以netty4.x版本中,Hash***的實現(xiàn),其理想狀態(tài),可將延時控制在1s以內(nèi)(最多不會超過1s)。
適合于 時間精度要求不是非常高,且 任務量巨大的場景!
阿里內(nèi)部rocketMq版本,其 定時調(diào)度 就以時間輪實現(xiàn),利用鏈表分區(qū) 以及 鏈表并行,提高并發(fā)效率!
2.初期的結(jié)構設想:
調(diào)度器:定時調(diào)度應用中所有的任務,任務在運行之前,必須將任務的信息注冊到調(diào)度器中,由調(diào)度器 進行統(tǒng)一的調(diào)度!
kettle-task:動態(tài)的創(chuàng)建,更新 task,
Executor:并發(fā)的執(zhí)行已調(diào)度中的任務;
當然實際的場景,也許比這更復雜,需要實時前臺展示執(zhí)行的日志,異常的處理,任務版本的變化監(jiān)控等等!
最終的功能需求,設計結(jié)構 還需后續(xù)!

fdsf