Kettle插件開發(fā)之Splunk

一、開發(fā)背景

工欲善其事,必先利其器。如果我們把Kettle離線或準實時ETL的工具鏈,那就繞不開Kettle定制化插件開發(fā)的環(huán)節(jié)。比如:我們需要對某個組件流出的數據進行特殊函數處理(如加解密);又或者我現有版本的組件不能滿足我們對源端數據捕獲的需求;再或者現有版本的組件缺失對重復消費的需求。

簡而言之,就是業(yè)務流程的特殊性,kettle原有流程處理組件不能滿足或者完全滿足我們的數據處理需求,就需要我們定制開發(fā)流程處理組件,以滿足數據的管理、數據的驗證、數據的轉換和某些特殊類型數據源的抽取。

二、基本框架

image.png

我們以上圖splunk查詢插件為例,來一步步闡述Kettle轉換插件的工作原理,這四個類構成了基礎的Kettle步驟/節(jié)點。當然,存在即合理,每一個類都扮演者不同的角色及其特定的作用。

SplunkInput:步驟類

繼承了BaseStep父類,并實現了StepInterface接口,在轉換運行時,他的實例即是數據實際處理的位置,每一個執(zhí)行線程都表示一個此類的示例。

image.png

SplunkInputData:數據類

繼承了BaseStepData父類,并實現了StepDataInterface接口,用來存儲數據,當插件執(zhí)行時,對于每一個執(zhí)行線程都是唯一的。執(zhí)行時里面存儲的主要有自定義的元數據對象、數據庫連接、緩存、文件句柄等其他對象信息。

image.png

SplunkInputDialog:對話框類

繼承了BaseStepDialog父類,并實現了StepDialogInterface接口,該類主要實現組件步驟與ETL開發(fā)者交互配置的界面,ETL開發(fā)者按照設定好的輸入和輸出選項配置,來實現個性化ETL開發(fā)。

image.png

SplunkInputMeta:元數據類

繼承了BaseStepMeta父類,并實現了StepMetaInterface接口。他的作用是保存和序列化特定步驟實例的配置,即跟蹤SplunkInputDialog類的開發(fā)者配置。在本例子中,它負責保存開發(fā)者設置的步驟屬性、splunk連接配置屬性和輸出字段的名稱類型等屬性信息。

插件展現配置

負責定義插件步驟在Kettle可視化UI工作臺中的顯示效果。設置插件的唯一ID、名稱及描述,說明kettle插件要加載的元數據類,以及需要預先加載的依賴Jar包列表。當然它有以下兩種實現方式,本例采用第一種。

一、通過@Step注解實現

@Step(id = "KettleSplunkInput", name = "Splunk Input", description = "Read data from Splunk", image = "splunk.svg", categoryDescription = "Input")

id:在Kettle插件中必須保證全局唯一

name:自定義插件Spoon UI中的顯示名稱,無強制唯一性要求

description:描述該插件的具體作用,或者簡要使用說明

image:Spoon UI中的顯示圖標,盡量使用svg格式圖片,可到阿里Icon圖庫查找和下載所需的圖標(https://www.iconfont.cn/

categoryDescription:插件歸屬目錄

……

二、通過plugin.xml實現

<?xml version="1.0" encoding="UTF-8"?>
<plugin
    id="KafkaConsumer"
    iconfile="logo.png"
    description="Apache Kafka Consumer"
    tooltip="This plug-in allows reading messages from a specific topic in a Kafka stream"
    category="Input"
    classname="com.ruckuswireless.pentaho.kafka.consumer.KafkaConsumerMeta">
    
    <libraries>
        <library name="pentaho-kafka-consumer.jar"/>
        ……
        <library name="lib/zookeeper-3.4.6.jar"/>
    </libraries>
    
    <localized_category>
        <category locale="en_US">Input</category>
    </localized_category>
    <localized_description>
        <description locale="en_US">Apache Kafka Consumer</description>
    </localized_description>
    <localized_tooltip>
        <tooltip locale="en_US">This plug-in allows reading messages from a specific topic in a Kafka stream</tooltip>
    </localized_tooltip>
</plugin>

id:在Kettle插件中必須保證全局唯一
iconfile:Spoon UI中的顯示圖標,盡量使用png格式圖片
description:描述該插件的具體作用
tooltip:樹形菜單中,鼠標滑過顯示的提示信息
category:插件歸屬目錄
classname:元數據類
libraries:插件依賴Jar包列表

三、實現代碼

一、步驟類

public class SplunkInput extends BaseStep implements StepInterface {
    private SplunkInputMeta meta;
    private SplunkInputData data;
    public SplunkInput(StepMeta stepMeta, StepDataInterface stepDataInterface,
            int copyNr, TransMeta transMeta, Trans trans) {
        super(stepMeta, stepDataInterface, copyNr, transMeta, trans);
    }
    @Override
    public boolean init(StepMetaInterface smi, StepDataInterface sdi) {
        meta = (SplunkInputMeta) smi;
        data = (SplunkInputData) sdi;
        // Is the step getting input?
        // List<StepMeta> steps = getTransMeta().findPreviousSteps(
        // getStepMeta() );
        // Connect to Neo4j
        if (StringUtils.isEmpty(meta.getHost())) {
            log.logError("You need to specify a Splunk connection Host to use in this step");
            return false;
        }
        if (StringUtils.isEmpty(meta.getPort())) {
            log.logError("You need to specify a Splunk connection Port to use in this step");
            return false;
        }
        if (StringUtils.isEmpty(meta.getUsername())) {
            log.logError("You need to specify a Splunk connection Username to use in this step");
            return false;
        }
        if (StringUtils.isEmpty(meta.getPassword())) {
            log.logError("You need to specify a Splunk connection Password to use in this step");
            return false;
        }
        // To correct lazy programmers who built certain PDI steps...
        //System.setProperty("https.protocols", "TLSv1.2,TLSv1.1,SSLv3");
        
        //Security.setProperty("jdk.tls.disabledAlgorithms","SSLv3, RC4, MD5withRSA, DH keySize < 768");
         /* Overriding the static method setSslSecurityProtocol to implement the security protocol of choice */
        // HttpService.setSslSecurityProtocol(SSLSecurityProtocol.TLSv1_2);
        /* end comment for overriding the method setSslSecurityProtocol */
        
        data.splunkConnection = new SplunkConnection(meta.getName(),
                meta.getHost(), meta.getPort(), meta.getUsername(),
                meta.getPassword());
        data.splunkConnection.initializeVariablesFrom(this);
        try {
            data.serviceArgs = data.splunkConnection.getServiceArgs();
            
            data.service = Service.connect(data.serviceArgs);
        } catch (Exception e) {
            log.logError(
                    "Unable to get or create Neo4j database driver for database '"
                            + data.splunkConnection.getName() + "'", e);
            return false;
        }
        return super.init(smi, sdi);
    }
    @Override
    public void dispose(StepMetaInterface smi, StepDataInterface sdi) {
        meta = (SplunkInputMeta) smi;
        data = (SplunkInputData) sdi;
        super.dispose(smi, sdi);
    }
    @Override
    public boolean processRow(StepMetaInterface smi, StepDataInterface sdi)
            throws KettleException {
        meta = (SplunkInputMeta) smi;
        data = (SplunkInputData) sdi;
        if (first) {
            first = false;
            // get the output fields...
            data.outputRowMeta = new RowMeta();
            meta.getFields(data.outputRowMeta, getStepname(), null,
                    getStepMeta(), this, repository, data.metaStore);
            // Run a one shot search in blocking mode
            JobResultsArgs args = new JobResultsArgs();
            //不限制返回數據量,即返回所有數據
            args.setCount(0);
            args.setOutputMode(JobResultsArgs.OutputMode.XML);
            data.eventsStream = data.service.oneshotSearch(getTransMeta()
                    .environmentSubstitute(meta.getQuery()), args);
        }
        try {
            ResultsReaderXml resultsReader = new ResultsReaderXml(
                    data.eventsStream);
            HashMap<String, String> event;
            while ((event = resultsReader.getNextEvent()) != null) {
                Object[] outputRow = RowDataUtil
                        .allocateRowData(data.outputRowMeta.size());
                for (int i = 0; i < meta.getReturnValues().size(); i++) {
                    ReturnValue returnValue = meta.getReturnValues().get(i);
                    String value = event.get(returnValue.getSplunkName());
                    outputRow[i] = value;
                }
                incrementLinesInput();
                putRow(data.outputRowMeta, outputRow);
            }
        } catch (Exception e) {
            throw new KettleException(
                    "Error reading from Splunk events stream", e);
        } finally {
            try {
                data.eventsStream.close();
            } catch (IOException e) {
                throw new KettleException("Unable to close events stream", e);
            }
        }
        setOutputDone();
        return false;
    }
}

init(),該方法是在轉換執(zhí)行前被調用,只有所有步驟的初始化成功時,轉換才會真正被執(zhí)行。此例是檢查splunk連接ip、port等屬性是否配置,以及splunk連接初始化是否成功
dispose(),該方法作用是在轉換步驟執(zhí)行完后執(zhí)行,完成如緩存、文件句柄等資源的關閉操作。
run(),該方法是在實際處理數據流記錄集時調用。
processRow(),該方法作用是處理所有數據流。通常通過調用getRow()來獲取需要處理的單條記錄。 這個方法如果有需要將會被阻塞,例如當此步驟希望放慢腳步處理數據時。processRow()隨后的流程將執(zhí)行轉換工作并調用putRow()方法將處理過的記錄放到它的下游步驟。

二、數據類

開發(fā)中,大多數環(huán)節(jié)都需要臨時的緩沖或者臨時的數據。數據類就是這些數據合適的存放位置。每一個執(zhí)行線程將得到其擁有的數據類實例,所以它能在獨立的空間里面運行。

public class SplunkInputData extends BaseStepData implements StepDataInterface {
    public RowMetaInterface outputRowMeta;
    public SplunkConnection splunkConnection;//splunk連接
    public int[] fieldIndexes;//字段索引數組
    public String query;//splunk spl語句(必須以search開頭)
    public IMetaStore metaStore;//元數據倉庫對象
    public ServiceArgs serviceArgs;//splunk查詢服務參數
    public Service service;//splunk查詢服務
    public InputStream eventsStream;//輸入事件流
}

三、對話框類

此處主要實現輸入屬性監(jiān)聽器配置,配置數據初始化約束檢查,splunk連接校驗以及返回結果集字段屬性預覽等功能模塊

private static Class<?> PKG = SplunkInputMeta.class; // for i18n purposes,
                                                            // needed by
                                                            // Translator2!!
    private Text wStepname;
    private Text wHost;//splunk域名或IP
    private Text wPort;//splunk端口
    private Text wUsername;//splunk用戶名
    private Text wPassword;//splunk密碼
    private Text wQuery;//splunk spl語句
    private TableView wReturns;//splunk組件輸出字段及屬性
    private SplunkInputMeta input;
    
    ……

四、元數據類

下面針對splunk查詢組件,列舉了元數據類的幾個關鍵的方法,其中私有成員變量outputField 存放了下一個步驟的輸出流字段。

@Step(id = "KettleSplunkInput", name = "Splunk Input", description = "Read data from Splunk", image = "splunk.svg", categoryDescription = "Input")
@InjectionSupported(localizationPrefix = "Cypher.Injection.", groups = {
        "PARAMETERS", "RETURNS" })
public class SplunkInputMeta extends BaseStepMeta implements StepMetaInterface {
    public static final String HOST = "host";
    public static final String PORT = "port";
    public static final String USERNAME = "username";
    public static final String PASSWORD = "password";
    public static final String QUERY = "query";
    public static final String RETURNS = "returns";
    public static final String RETURN = "return";
    public static final String RETURN_NAME = "return_name";
    public static final String RETURN_SPLUNK_NAME = "return_splunk_name";
    public static final String RETURN_TYPE = "return_type";
    public static final String RETURN_LENGTH = "return_length";
    public static final String RETURN_FORMAT = "return_format";
    @Injection(name = HOST)
    private String host;
    @Injection(name = PORT)
    private String port;
    @Injection(name = USERNAME)
    private String username;
    @Injection(name = PASSWORD)
    private String password;
    @Injection(name = QUERY)
    private String query;
    @InjectionDeep
    private List<ReturnValue> returnValues;
    public SplunkInputMeta() {
        super();
        returnValues = new ArrayList<>();
    }
    ……
}

// 跟蹤組件步驟輸入和輸出設置
public String getOutputField()
public void setOutputField(…)
public void setDefault() //配置參數初始化

@Override
    public void setDefault() {
        host = "127.0.0.1";
        port = "8089";
        username = "query";
        password = "query";
        query = "search * | head 100";
    }

// 依賴獲取和載入xml,序列化步驟屬性設置
public String getXML()

@Override
    public String getXML() {
        StringBuilder xml = new StringBuilder();
        xml.append(XMLHandler.addTagValue(HOST, host));
        xml.append(XMLHandler.addTagValue(PORT, port));
        xml.append(XMLHandler.addTagValue(USERNAME, username));
        xml.append(XMLHandler.addTagValue(PASSWORD, password));
        xml.append(XMLHandler.addTagValue(QUERY, query));
        xml.append(XMLHandler.openTag(RETURNS));
        for (ReturnValue returnValue : returnValues) {
            xml.append(XMLHandler.openTag(RETURN));
            xml.append(XMLHandler.addTagValue(RETURN_NAME,
                    returnValue.getName()));
            xml.append(XMLHandler.addTagValue(RETURN_SPLUNK_NAME,
                    returnValue.getSplunkName()));
            xml.append(XMLHandler.addTagValue(RETURN_TYPE,
                    returnValue.getType()));
            xml.append(XMLHandler.addTagValue(RETURN_LENGTH,
                    returnValue.getLength()));
            xml.append(XMLHandler.addTagValue(RETURN_FORMAT,
                    returnValue.getFormat()));
            xml.append(XMLHandler.closeTag(RETURN));
        }
        xml.append(XMLHandler.closeTag(RETURNS));
        return xml.toString();
    }

public void loadXML(…)
// 從資源庫讀取和保存步驟屬性設置
public void readRep(…)

@Override
    public void readRep(Repository rep, IMetaStore metaStore, ObjectId stepId,
            List<DatabaseMeta> databases) throws KettleException {
        host = rep.getStepAttributeString(stepId, HOST);
        port = rep.getStepAttributeString(stepId, PORT);
        username = rep.getStepAttributeString(stepId, USERNAME);
        password = rep.getStepAttributeString(stepId, PASSWORD);
        query = rep.getStepAttributeString(stepId, QUERY);
        returnValues = new ArrayList<>();
        int nrReturns = rep.countNrStepAttributes(stepId, RETURN_NAME);
        for (int i = 0; i < nrReturns; i++) {
            String name = rep.getStepAttributeString(stepId, i, RETURN_NAME);
            String splunkName = rep.getStepAttributeString(stepId, i,
                    RETURN_SPLUNK_NAME);
            String type = rep.getStepAttributeString(stepId, i, RETURN_TYPE);
            int length = (int) rep.getStepAttributeInteger(stepId, i,
                    RETURN_LENGTH);
            String format = rep
                    .getStepAttributeString(stepId, i, RETURN_FORMAT);
            returnValues.add(new ReturnValue(name, splunkName, type, length,
                    format));
        }
    }

public void saveRep(…)
// 提供有關步驟如何處理數據流行的字段結構的信息
public void getFields(…)

@Override
    public void getFields(RowMetaInterface rowMeta, String name,
            RowMetaInterface[] info, StepMeta nextStep, VariableSpace space,
            Repository repository, IMetaStore metaStore)
            throws KettleStepException {
        for (ReturnValue returnValue : returnValues) {
            try {
                int type = ValueMetaFactory.getIdForValueMeta(returnValue
                        .getType());
                ValueMetaInterface valueMeta = ValueMetaFactory
                        .createValueMeta(returnValue.getName(), type);
                valueMeta.setLength(returnValue.getLength());
                valueMeta.setOrigin(name);
                rowMeta.addValueMeta(valueMeta);
            } catch (KettlePluginException e) {
                throw new KettleStepException("Unknown data type '"
                        + returnValue.getType() + "' for value named '"
                        + returnValue.getName() + "'");
            }
        }
    }

// 對步驟執(zhí)行擴展驗證檢查
public void check(…)
// 向Kettle提供步驟、數據和對話框類的實例
public StepInterface getStep(…)

@Override
    public StepInterface getStep(StepMeta stepMeta,
            StepDataInterface stepDataInterface, int i, TransMeta transMeta,
            Trans trans) {
        return new SplunkInput(stepMeta, stepDataInterface, i, transMeta, trans);
    }

public StepDataInterface getStepData()

@Override
    public StepDataInterface getStepData() {
        return new SplunkInputData();
    }

五、附加類

該插件還需要SplunkConnection類來構建splunk連接,ReturnValue類來標準化splunk輸出流字段信息。

public ServiceArgs getServiceArgs() {
        ServiceArgs args = new ServiceArgs();
        args.setUsername(getRealUsername());
        args.setPassword(getRealPassword());
        args.setHost(getRealHostname());
        args.setPort(Const.toInt(getRealPort(), 8089));
        args.setSSLSecurityProtocol(SSLSecurityProtocol.TLSv1_2);//具體支持協議依賴于服務端配置
        return args;
    }
public class ReturnValue {
    @Injection(name = "RETURN_NAME", group = "RETURNS")
    private String name;
    @Injection(name = "RETURN_SPLUNK_NAME", group = "RETURNS")
    private String splunkName;
    @Injection(name = "RETURN_TYPE", group = "RETURNS")
    private String type;
    @Injection(name = "RETURN_LENGTH", group = "RETURNS")
    private int length;
    @Injection(name = "RETURN_FORMAT", group = "RETURNS")
    private String format;
    public ReturnValue(String name, String splunkName, String type, int length,
            String format) {
        this.name = name;
        this.splunkName = splunkName;
        this.type = type;
        this.length = length;
        this.format = format;
    }
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        ReturnValue that = (ReturnValue) o;
        return Objects.equals(name, that.name);
    }
    @Override
    public int hashCode() {
        return Objects.hash(name);
    }
    @Override
    public String toString() {
        return "ReturnValue{" + "name='" + name + '\'' + '}';
    }
    /**
     * Gets name
     *
     * @return value of name
     */
    public String getName() {
        return name;
    }
    /**
     * @param name
     *            The name to set
     */
    public void setName(String name) {
        this.name = name;
    }
    /**
     * Gets splunkName
     *
     * @return value of splunkName
     */
    public String getSplunkName() {
        return splunkName;
    }
    /**
     * @param splunkName
     *            The splunkName to set
     */
    public void setSplunkName(String splunkName) {
        this.splunkName = splunkName;
    }
    /**
     * Gets type
     *
     * @return value of type
     */
    public String getType() {
        return type;
    }
    /**
     * @param type
     *            The type to set
     */
    public void setType(String type) {
        this.type = type;
    }
    /**
     * Gets length
     *
     * @return value of length
     */
    public int getLength() {
        return length;
    }
    /**
     * @param length
     *            The length to set
     */
    public void setLength(int length) {
        this.length = length;
    }
    /**
     * Gets format
     *
     * @return value of format
     */
    public String getFormat() {
        return format;
    }
    /**
     * @param format
     *            The format to set
     */
    public void setFormat(String format) {
        this.format = format;
    }
}

四、總結

通過以上對于Kettle自定義插件開發(fā)的闡述,我們基本上可以掌握端到端的開發(fā)流程,如果你需要源碼或者了解更多自定義插件及集成方式,抑或有開發(fā)過程或者使用過程中的任何疑問或建議,請關注小編"游走在數據之間"

?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容