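1. Development background
A craftsman who wants to do good work must first sharpen his tools. If we use Kettle as the toolchain for offline or near-real-time ETL, custom plugin development is unavoidable. For example, we may need to apply a special function (such as encryption or decryption) to the data flowing out of a step; the steps in the current version may not meet our needs for capturing source data; or the existing steps may lack support for re-consuming data.
In short, when a business process is unusual enough that Kettle's built-in steps cannot meet, or cannot fully meet, our data processing needs, we have to develop custom steps for data management, data validation, data transformation, and extraction from certain special types of data sources.
2. Basic framework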

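Taking the Splunk query plugin in the figure above as an example, we explain step by step how a Kettle transformation plugin works. Four classes make up a basic Kettle step (node), and each class exists for a reason and plays its own specific role.
SplunkInput: step class
Extends the BaseStep parent class and implements the StepInterface interface. When the transformation runs, its instances are where the data is actually processed; each execution thread corresponds to one instance of this class.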

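SplunkInputData: data class
Extends the BaseStepData parent class and implements the StepDataInterface interface. It stores data and, while the plugin runs, is unique to each execution thread. At run time it mainly holds custom metadata objects, database connections, caches, file handles, and similar objects.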

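SplunkInputDialog: dialog class
Extends the BaseStepDialog parent class and implements the StepDialogInterface interface. This class implements the UI through which the ETL developer configures the step; the developer fills in the predefined input and output options to tailor the ETL flow.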

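SplunkInputMeta: metadata class
Extends the BaseStepMeta parent class and implements the StepMetaInterface interface. Its job is to hold and serialize the configuration of a particular step instance, that is, to track what the developer configures in SplunkInputDialog. In this example it stores the step properties set by the developer, the Splunk connection properties, and the names, types and other attributes of the output fields.
Plugin display configuration
This defines how the plugin step appears in Kettle's visual UI workbench. It sets the plugin's unique ID, name and description, tells Kettle which metadata class to load, and lists the dependency JARs that must be loaded beforehand. There are two ways to do this; this example uses the first.
1. Via the @Step annotation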
@Step(id = "KettleSplunkInput", name = "Splunk Input", description = "Read data from Splunk", image = "splunk.svg", categoryDescription = "Input")
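id: must be globally unique among Kettle plugins
name: the display name of the custom plugin in the Spoon UI; it is not required to be unique
description: describes what the plugin does, or gives brief usage notes
image: the icon shown in the Spoon UI; prefer SVG images. Icons can be found and downloaded from Alibaba's Iconfont library (https://www.iconfont.cn/)
categoryDescription: the category (folder) the plugin belongs to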
……
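To make the role of the annotation attributes more concrete, here is a minimal plain-Java sketch of annotation-based registration. It does not use Kettle's real @Step annotation or plugin registry: DemoStep, DemoRegistry and the two stub classes are hypothetical names invented for illustration, and the "reject a duplicate id" behaviour is an assumption about what a registry has to do to keep ids unique, not a description of Kettle's internals.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.LinkedHashMap;
import java.util.Map;

class StepAnnotationDemo {

    /** Hypothetical stand-in for Kettle's @Step annotation (not the real one). */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface DemoStep {
        String id();
        String name();
        String categoryDescription() default "Input";
    }

    @DemoStep(id = "KettleSplunkInput", name = "Splunk Input")
    static class SplunkInputMetaStub { }

    /** Reuses the same id on purpose to show the collision. */
    @DemoStep(id = "KettleSplunkInput", name = "Another Splunk Input")
    static class DuplicateIdStub { }

    /** Toy registry keyed by plugin id; refuses a second class with the same id. */
    static class DemoRegistry {
        private final Map<String, Class<?>> byId = new LinkedHashMap<>();

        boolean register(Class<?> metaClass) {
            DemoStep step = metaClass.getAnnotation(DemoStep.class);
            if (step == null || byId.containsKey(step.id())) {
                return false; // not annotated, or id already taken
            }
            byId.put(step.id(), metaClass);
            return true;
        }

        Class<?> lookup(String id) {
            return byId.get(id);
        }
    }

    public static void main(String[] args) {
        DemoRegistry registry = new DemoRegistry();
        System.out.println(registry.register(SplunkInputMetaStub.class));
        System.out.println(registry.register(DuplicateIdStub.class));
        System.out.println(registry.lookup("KettleSplunkInput").getSimpleName());
    }
}
```

The annotation is read reflectively at run time, which is why the metadata class is the natural place to put it: it is the one class the host needs to find first.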
2. Via plugin.xml
<?xml version="1.0" encoding="UTF-8"?>
<plugin
id="KafkaConsumer"
iconfile="logo.png"
description="Apache Kafka Consumer"
tooltip="This plug-in allows reading messages from a specific topic in a Kafka stream"
category="Input"
classname="com.ruckuswireless.pentaho.kafka.consumer.KafkaConsumerMeta">
<libraries>
<library name="pentaho-kafka-consumer.jar"/>
……
<library name="lib/zookeeper-3.4.6.jar"/>
</libraries>
<localized_category>
<category locale="en_US">Input</category>
</localized_category>
<localized_description>
<description locale="en_US">Apache Kafka Consumer</description>
</localized_description>
<localized_tooltip>
<tooltip locale="en_US">This plug-in allows reading messages from a specific topic in a Kafka stream</tooltip>
</localized_tooltip>
</plugin>
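id: must be globally unique among Kettle plugins
iconfile: the icon shown in the Spoon UI; prefer PNG images
description: describes what the plugin does
tooltip: the hint shown when the mouse hovers over the entry in the tree menu
category: the category (folder) the plugin belongs to
classname: the metadata class
libraries: the list of JARs the plugin depends on
3. Implementation code
1. Step class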
public class SplunkInput extends BaseStep implements StepInterface {
private SplunkInputMeta meta;
private SplunkInputData data;
public SplunkInput(StepMeta stepMeta, StepDataInterface stepDataInterface,
int copyNr, TransMeta transMeta, Trans trans) {
super(stepMeta, stepDataInterface, copyNr, transMeta, trans);
}
@Override
public boolean init(StepMetaInterface smi, StepDataInterface sdi) {
meta = (SplunkInputMeta) smi;
data = (SplunkInputData) sdi;
// Is the step getting input?
// List<StepMeta> steps = getTransMeta().findPreviousSteps(
// getStepMeta() );
// Validate the Splunk connection settings before connecting
if (StringUtils.isEmpty(meta.getHost())) {
log.logError("You need to specify a Splunk connection Host to use in this step");
return false;
}
if (StringUtils.isEmpty(meta.getPort())) {
log.logError("You need to specify a Splunk connection Port to use in this step");
return false;
}
if (StringUtils.isEmpty(meta.getUsername())) {
log.logError("You need to specify a Splunk connection Username to use in this step");
return false;
}
if (StringUtils.isEmpty(meta.getPassword())) {
log.logError("You need to specify a Splunk connection Password to use in this step");
return false;
}
// To correct lazy programmers who built certain PDI steps...
//System.setProperty("https.protocols", "TLSv1.2,TLSv1.1,SSLv3");
//Security.setProperty("jdk.tls.disabledAlgorithms","SSLv3, RC4, MD5withRSA, DH keySize < 768");
/* Overriding the static method setSslSecurityProtocol to implement the security protocol of choice */
// HttpService.setSslSecurityProtocol(SSLSecurityProtocol.TLSv1_2);
/* end comment for overriding the method setSslSecurityProtocol */
data.splunkConnection = new SplunkConnection(meta.getName(),
meta.getHost(), meta.getPort(), meta.getUsername(),
meta.getPassword());
data.splunkConnection.initializeVariablesFrom(this);
try {
data.serviceArgs = data.splunkConnection.getServiceArgs();
data.service = Service.connect(data.serviceArgs);
} catch (Exception e) {
log.logError(
"Unable to connect to Splunk using connection '"
+ data.splunkConnection.getName() + "'", e);
return false;
}
return super.init(smi, sdi);
}
@Override
public void dispose(StepMetaInterface smi, StepDataInterface sdi) {
meta = (SplunkInputMeta) smi;
data = (SplunkInputData) sdi;
super.dispose(smi, sdi);
}
@Override
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi)
throws KettleException {
meta = (SplunkInputMeta) smi;
data = (SplunkInputData) sdi;
if (first) {
first = false;
// get the output fields...
data.outputRowMeta = new RowMeta();
meta.getFields(data.outputRowMeta, getStepname(), null,
getStepMeta(), this, repository, data.metaStore);
// Run a one shot search in blocking mode
JobResultsArgs args = new JobResultsArgs();
// No limit on the number of results, i.e. return all events
args.setCount(0);
args.setOutputMode(JobResultsArgs.OutputMode.XML);
data.eventsStream = data.service.oneshotSearch(getTransMeta()
.environmentSubstitute(meta.getQuery()), args);
}
try {
ResultsReaderXml resultsReader = new ResultsReaderXml(
data.eventsStream);
HashMap<String, String> event;
while ((event = resultsReader.getNextEvent()) != null) {
Object[] outputRow = RowDataUtil
.allocateRowData(data.outputRowMeta.size());
for (int i = 0; i < meta.getReturnValues().size(); i++) {
ReturnValue returnValue = meta.getReturnValues().get(i);
String value = event.get(returnValue.getSplunkName());
outputRow[i] = value;
}
incrementLinesInput();
putRow(data.outputRowMeta, outputRow);
}
} catch (Exception e) {
throw new KettleException(
"Error reading from Splunk events stream", e);
} finally {
try {
data.eventsStream.close();
} catch (IOException e) {
throw new KettleException("Unable to close events stream", e);
}
}
setOutputDone();
return false;
}
}
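init(): called before the transformation executes; the transformation only really runs once every step has initialized successfully. In this example it checks that the Splunk connection host, port and other properties are configured and that the Splunk connection can be established.
dispose(): called after the step has finished executing, to release resources such as caches and file handles.
run(): called when the record sets of the data stream are actually processed.
processRow(): processes the rows of the data stream. It usually calls getRow() to fetch the single row to process; this call blocks when necessary, for example when the step wants to slow down its processing. processRow() then performs the transformation work and calls putRow() to pass the processed row to the downstream steps. As an input step, SplunkInput does not call getRow(): it reads all events from Splunk in a single call, emits them with putRow(), and returns false.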
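The order in which these methods are called can be sketched in plain Java without any Kettle dependency. MiniStep, ListInputStep and runStep below are hypothetical names for illustration only; the real StepInterface has more methods and different signatures, and calling dispose() even when init() fails is a choice made for this sketch rather than documented Kettle behaviour.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class StepLifecycleDemo {

    /** Hypothetical, simplified step contract (not Kettle's StepInterface). */
    interface MiniStep {
        boolean init();
        boolean processRow(); // returns false when there is no more work
        void dispose();
    }

    /** Toy input step: emits each source value once, then signals completion. */
    static class ListInputStep implements MiniStep {
        private final List<String> source;
        final List<String> output = new ArrayList<>();
        final List<String> calls = new ArrayList<>();
        private Iterator<String> cursor;

        ListInputStep(List<String> source) {
            this.source = source;
        }

        @Override
        public boolean init() {
            calls.add("init");
            if (source == null) {
                return false; // misconfigured step: refuse to start
            }
            cursor = source.iterator();
            return true;
        }

        @Override
        public boolean processRow() {
            calls.add("processRow");
            if (!cursor.hasNext()) {
                return false;
            }
            output.add(cursor.next()); // stands in for putRow()
            return true;
        }

        @Override
        public void dispose() {
            calls.add("dispose");
            cursor = null; // release resources
        }
    }

    /** Drives one step: init, loop on processRow until it returns false, then dispose. */
    static boolean runStep(MiniStep step) {
        boolean initialized = step.init();
        try {
            if (initialized) {
                while (step.processRow()) {
                    // keep processing rows
                }
            }
        } finally {
            step.dispose();
        }
        return initialized;
    }

    public static void main(String[] args) {
        ListInputStep step = new ListInputStep(Arrays.asList("a", "b"));
        runStep(step);
        System.out.println(step.output);
        System.out.println(step.calls);
    }
}
```

Note that processRow() is called one more time than there are rows: the final call is the one that discovers the input is exhausted and returns false.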
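2. Data class
Most steps need temporary buffers or temporary data during processing, and the data class is the right place to keep them. Each execution thread gets its own instance of the data class, so it can work in its own isolated space.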
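The isolation described here can be illustrated with a small plain-Java sketch: several worker threads share one read-only configuration object but each mutates only its own data object, so no locking is needed. SharedMeta, CopyData and runCopies are hypothetical names, and the threading is deliberately much simpler than what a real transformation does.

```java
class PerCopyDataDemo {

    /** Hypothetical read-only configuration shared by all copies (plays the role of the meta class). */
    static class SharedMeta {
        final int rowsPerCopy;

        SharedMeta(int rowsPerCopy) {
            this.rowsPerCopy = rowsPerCopy;
        }
    }

    /** Hypothetical mutable working state (plays the role of the data class). */
    static class CopyData {
        int rowsSeen;
    }

    /** Starts one thread per copy, each with its own CopyData, and returns the per-copy counts. */
    static int[] runCopies(SharedMeta meta, int copies) {
        CopyData[] data = new CopyData[copies];
        Thread[] threads = new Thread[copies];
        for (int c = 0; c < copies; c++) {
            final CopyData mine = new CopyData(); // one instance per thread
            data[c] = mine;
            threads[c] = new Thread(() -> {
                for (int r = 0; r < meta.rowsPerCopy; r++) {
                    mine.rowsSeen++; // safe: no other thread touches this object
                }
            });
            threads[c].start();
        }
        int[] counts = new int[copies];
        try {
            for (int c = 0; c < copies; c++) {
                threads[c].join(); // join makes the thread's writes visible here
                counts[c] = data[c].rowsSeen;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for copies", e);
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = runCopies(new SharedMeta(1000), 3);
        for (int count : counts) {
            System.out.println(count);
        }
    }
}
```

Had the three threads shared a single CopyData, the unsynchronized increments could race and lose updates; giving each thread its own instance avoids that without any synchronization.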
public class SplunkInputData extends BaseStepData implements StepDataInterface {
public RowMetaInterface outputRowMeta;
public SplunkConnection splunkConnection;// Splunk connection
public int[] fieldIndexes;// array of field indexes
public String query;// Splunk SPL statement (must start with "search")
public IMetaStore metaStore;// metastore object
public ServiceArgs serviceArgs;// Splunk service arguments
public Service service;// Splunk service
public InputStream eventsStream;// input event stream
}
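3. Dialog class
This class mainly sets up listeners on the input properties, validates constraints when the configuration data is initialized, tests the Splunk connection, and previews the field attributes of the returned result set.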
private static Class<?> PKG = SplunkInputMeta.class; // for i18n purposes,
// needed by
// Translator2!!
private Text wStepname;
private Text wHost;// Splunk host name or IP
private Text wPort;// Splunk port
private Text wUsername;// Splunk user name
private Text wPassword;// Splunk password
private Text wQuery;// Splunk SPL statement
private TableView wReturns;// output fields of the Splunk step and their attributes
private SplunkInputMeta input;
……
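4. Metadata class
Below are several key methods of the metadata class for the Splunk query step. The private member variable outputField holds the output stream field passed to the next step.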
@Step(id = "KettleSplunkInput", name = "Splunk Input", description = "Read data from Splunk", image = "splunk.svg", categoryDescription = "Input")
@InjectionSupported(localizationPrefix = "Cypher.Injection.", groups = {
"PARAMETERS", "RETURNS" })
public class SplunkInputMeta extends BaseStepMeta implements StepMetaInterface {
public static final String HOST = "host";
public static final String PORT = "port";
public static final String USERNAME = "username";
public static final String PASSWORD = "password";
public static final String QUERY = "query";
public static final String RETURNS = "returns";
public static final String RETURN = "return";
public static final String RETURN_NAME = "return_name";
public static final String RETURN_SPLUNK_NAME = "return_splunk_name";
public static final String RETURN_TYPE = "return_type";
public static final String RETURN_LENGTH = "return_length";
public static final String RETURN_FORMAT = "return_format";
@Injection(name = HOST)
private String host;
@Injection(name = PORT)
private String port;
@Injection(name = USERNAME)
private String username;
@Injection(name = PASSWORD)
private String password;
@Injection(name = QUERY)
private String query;
@InjectionDeep
private List<ReturnValue> returnValues;
public SplunkInputMeta() {
super();
returnValues = new ArrayList<>();
}
……
}
// Track the step's input and output settings
public String getOutputField()
public void setOutputField(…)
public void setDefault() // initialize the configuration parameters with default values
@Override
public void setDefault() {
host = "127.0.0.1";
port = "8089";
username = "query";
password = "query";
query = "search * | head 100";
}
// Serialize the step settings to XML and load them back from XML
public String getXML()
@Override
public String getXML() {
StringBuilder xml = new StringBuilder();
xml.append(XMLHandler.addTagValue(HOST, host));
xml.append(XMLHandler.addTagValue(PORT, port));
xml.append(XMLHandler.addTagValue(USERNAME, username));
xml.append(XMLHandler.addTagValue(PASSWORD, password));
xml.append(XMLHandler.addTagValue(QUERY, query));
xml.append(XMLHandler.openTag(RETURNS));
for (ReturnValue returnValue : returnValues) {
xml.append(XMLHandler.openTag(RETURN));
xml.append(XMLHandler.addTagValue(RETURN_NAME,
returnValue.getName()));
xml.append(XMLHandler.addTagValue(RETURN_SPLUNK_NAME,
returnValue.getSplunkName()));
xml.append(XMLHandler.addTagValue(RETURN_TYPE,
returnValue.getType()));
xml.append(XMLHandler.addTagValue(RETURN_LENGTH,
returnValue.getLength()));
xml.append(XMLHandler.addTagValue(RETURN_FORMAT,
returnValue.getFormat()));
xml.append(XMLHandler.closeTag(RETURN));
}
xml.append(XMLHandler.closeTag(RETURNS));
return xml.toString();
}
public void loadXML(…)
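The article does not show the body of loadXML, and the sketch below is not a reconstruction of it. It only illustrates the general idea that whatever getXML writes must be readable again, using the JDK's own DOM parser instead of Kettle's XMLHandler. The class and method names are hypothetical, the wrapping step element is an assumption made so the fragment parses as a document, and only the tag names host, query, returns, return and return_name are taken from the constants above.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

class StepXmlRoundTripDemo {

    /** Writes a reduced set of settings; the outer step element is an assumption of this sketch. */
    static String toXml(String host, String query, List<String> returnNames) {
        StringBuilder xml = new StringBuilder("<step>");
        xml.append("<host>").append(escape(host)).append("</host>");
        xml.append("<query>").append(escape(query)).append("</query>");
        xml.append("<returns>");
        for (String name : returnNames) {
            xml.append("<return><return_name>").append(escape(name)).append("</return_name></return>");
        }
        xml.append("</returns></step>");
        return xml.toString();
    }

    /** Minimal escaping for element text. */
    static String escape(String s) {
        return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
    }

    /** Parses the XML with the JDK DOM parser and returns the root element. */
    static Element parse(String xml) {
        try {
            return DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
                    .getDocumentElement();
        } catch (Exception e) {
            throw new IllegalStateException("Unable to parse step XML", e);
        }
    }

    /** Returns the text of the first descendant with the given tag, or null if absent. */
    static String tagValue(Element parent, String tag) {
        NodeList nodes = parent.getElementsByTagName(tag);
        return nodes.getLength() == 0 ? null : nodes.item(0).getTextContent();
    }

    /** Reads the repeated return elements back in document order. */
    static List<String> returnNames(Element step) {
        List<String> names = new ArrayList<>();
        NodeList returns = step.getElementsByTagName("return");
        for (int i = 0; i < returns.getLength(); i++) {
            names.add(tagValue((Element) returns.item(i), "return_name"));
        }
        return names;
    }

    public static void main(String[] args) {
        String xml = toXml("127.0.0.1", "search * | head 100", Arrays.asList("host", "source"));
        Element step = parse(xml);
        System.out.println(tagValue(step, "host"));
        System.out.println(tagValue(step, "query"));
        System.out.println(returnNames(step));
    }
}
```

The point of the round trip is symmetry: every tag written during serialization has a matching read, and repeated groups such as the return fields are read back in the order they were written.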
// Read the step settings from the repository and save them to it
public void readRep(…)
@Override
public void readRep(Repository rep, IMetaStore metaStore, ObjectId stepId,
List<DatabaseMeta> databases) throws KettleException {
host = rep.getStepAttributeString(stepId, HOST);
port = rep.getStepAttributeString(stepId, PORT);
username = rep.getStepAttributeString(stepId, USERNAME);
password = rep.getStepAttributeString(stepId, PASSWORD);
query = rep.getStepAttributeString(stepId, QUERY);
returnValues = new ArrayList<>();
int nrReturns = rep.countNrStepAttributes(stepId, RETURN_NAME);
for (int i = 0; i < nrReturns; i++) {
String name = rep.getStepAttributeString(stepId, i, RETURN_NAME);
String splunkName = rep.getStepAttributeString(stepId, i,
RETURN_SPLUNK_NAME);
String type = rep.getStepAttributeString(stepId, i, RETURN_TYPE);
int length = (int) rep.getStepAttributeInteger(stepId, i,
RETURN_LENGTH);
String format = rep
.getStepAttributeString(stepId, i, RETURN_FORMAT);
returnValues.add(new ReturnValue(name, splunkName, type, length,
format));
}
}
public void saveRep(…)
// Provide information about how the step changes the field structure of the row stream
public void getFields(…)
@Override
public void getFields(RowMetaInterface rowMeta, String name,
RowMetaInterface[] info, StepMeta nextStep, VariableSpace space,
Repository repository, IMetaStore metaStore)
throws KettleStepException {
for (ReturnValue returnValue : returnValues) {
try {
int type = ValueMetaFactory.getIdForValueMeta(returnValue
.getType());
ValueMetaInterface valueMeta = ValueMetaFactory
.createValueMeta(returnValue.getName(), type);
valueMeta.setLength(returnValue.getLength());
valueMeta.setOrigin(name);
rowMeta.addValueMeta(valueMeta);
} catch (KettlePluginException e) {
// Keep the original exception as the cause so the stack trace is not lost
throw new KettleStepException("Unknown data type '"
+ returnValue.getType() + "' for value named '"
+ returnValue.getName() + "'", e);
}
}
}
// Perform extended validation checks on the step
public void check(…)
// Provide Kettle with instances of the step, data and dialog classes
public StepInterface getStep(…)
@Override
public StepInterface getStep(StepMeta stepMeta,
StepDataInterface stepDataInterface, int i, TransMeta transMeta,
Trans trans) {
return new SplunkInput(stepMeta, stepDataInterface, i, transMeta, trans);
}
public StepDataInterface getStepData()
@Override
public StepDataInterface getStepData() {
return new SplunkInputData();
}
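5. Additional classes
The plugin also needs a SplunkConnection class to build the Splunk connection, and a ReturnValue class to standardize the field information of the Splunk output stream.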
public ServiceArgs getServiceArgs() {
ServiceArgs args = new ServiceArgs();
args.setUsername(getRealUsername());
args.setPassword(getRealPassword());
args.setHost(getRealHostname());
args.setPort(Const.toInt(getRealPort(), 8089));
args.setSSLSecurityProtocol(SSLSecurityProtocol.TLSv1_2);// the protocols actually supported depend on the server configuration
return args;
}
public class ReturnValue {
@Injection(name = "RETURN_NAME", group = "RETURNS")
private String name;
@Injection(name = "RETURN_SPLUNK_NAME", group = "RETURNS")
private String splunkName;
@Injection(name = "RETURN_TYPE", group = "RETURNS")
private String type;
@Injection(name = "RETURN_LENGTH", group = "RETURNS")
private int length;
@Injection(name = "RETURN_FORMAT", group = "RETURNS")
private String format;
public ReturnValue(String name, String splunkName, String type, int length,
String format) {
this.name = name;
this.splunkName = splunkName;
this.type = type;
this.length = length;
this.format = format;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ReturnValue that = (ReturnValue) o;
return Objects.equals(name, that.name);
}
@Override
public int hashCode() {
return Objects.hash(name);
}
@Override
public String toString() {
return "ReturnValue{" + "name='" + name + '\'' + '}';
}
/**
* Gets name
*
* @return value of name
*/
public String getName() {
return name;
}
/**
* @param name
* The name to set
*/
public void setName(String name) {
this.name = name;
}
/**
* Gets splunkName
*
* @return value of splunkName
*/
public String getSplunkName() {
return splunkName;
}
/**
* @param splunkName
* The splunkName to set
*/
public void setSplunkName(String splunkName) {
this.splunkName = splunkName;
}
/**
* Gets type
*
* @return value of type
*/
public String getType() {
return type;
}
/**
* @param type
* The type to set
*/
public void setType(String type) {
this.type = type;
}
/**
* Gets length
*
* @return value of length
*/
public int getLength() {
return length;
}
/**
* @param length
* The length to set
*/
public void setLength(int length) {
this.length = length;
}
/**
* Gets format
*
* @return value of format
*/
public String getFormat() {
return format;
}
/**
* @param format
* The format to set
*/
public void setFormat(String format) {
this.format = format;
}
}
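How a list of ReturnValue entries turns one Splunk event into one output row can be sketched without Kettle or the Splunk SDK. The Field class below is a hypothetical, trimmed-down stand-in for ReturnValue (only the output name and the Splunk field name), and toRow mirrors the loop in processRow() under the assumption that a field missing from the event simply yields null.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class EventToRowDemo {

    /** Hypothetical reduced ReturnValue: output field name plus the Splunk field it reads. */
    static class Field {
        final String name;
        final String splunkName;

        Field(String name, String splunkName) {
            this.name = name;
            this.splunkName = splunkName;
        }
    }

    /** Builds a row whose column order follows the configured fields, not the event's key order. */
    static Object[] toRow(Map<String, String> event, List<Field> fields) {
        Object[] row = new Object[fields.size()];
        for (int i = 0; i < fields.size(); i++) {
            row[i] = event.get(fields.get(i).splunkName); // null when the event lacks the field
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, String> event = new HashMap<>();
        event.put("_raw", "GET /index.html 200");
        event.put("host", "web01");

        List<Field> fields = Arrays.asList(
                new Field("server", "host"),
                new Field("message", "_raw"),
                new Field("app", "sourcetype"));

        System.out.println(Arrays.toString(toRow(event, fields)));
    }
}
```

Because the row is positional, the order of the configured fields must match the order in which getFields() adds value metadata to the output row; both iterate over the same list for that reason.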
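4. Summary
With the walkthrough above, we now have a basic grasp of the end-to-end process of developing a custom Kettle plugin. If you need the source code, want to learn more about custom plugins and how to integrate them, or have any questions or suggestions about development or use, please follow the author's account "游走在數據之間".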