? ? 歡迎來到cep-china!以下文章翻譯
????https://docs.wso2.com/display/SP430/Quick+Start+Guide
? ? 僅供學(xué)習(xí)參考,未經(jīng)許可不得轉(zhuǎn)載。
????WSO2流處理引擎(SP)是一個輕量級、精簡、基于流SQL的處理平臺,可以毫秒級響應(yīng)收集事件、分析事件、模式識別、反饋影響并傳遞結(jié)果。它由Siddhi提供動力,性能極佳。
????首先,讓我們理解在快速開始中用到的概念:
流引擎和復(fù)雜事件處理簡介
????讓我們通過一個例子了解什么是事件。如果我們將通過ATM進(jìn)行的交易視為數(shù)據(jù)流,則可以將其中的一次取款認(rèn)為是一個事件。此事件包含相關(guān)的金額、事件、賬戶等數(shù)據(jù)。許多次這樣的交易形成了一個流。

????流處理引擎允許你創(chuàng)建一個處理圖并將事件注入其中。每個操作處理這個事件并向下一個處理器發(fā)送事件。
????一個復(fù)雜事件是匯總、展現(xiàn)或表示一系列其他事件的事件。復(fù)雜事件處理是流處理的子集,其涉及實(shí)時分析多個事件流,識別特定序列或模式,從相關(guān)事件推斷出重要的業(yè)務(wù)事件。

????WSO2 SP的流處理能力允許你抓去大量數(shù)據(jù)流并實(shí)時處理他們,并以流式方式呈現(xiàn)結(jié)果,同時其復(fù)雜事件處理功能通過Siddhi支持的模式和序列來模式檢測和趨勢預(yù)測。
Siddhi概覽
????WSO2 SP使用Siddhi查詢語言來編寫Siddhi應(yīng)用程序的處理邏輯。Siddhi可以:
????1.從多種不同類型的數(shù)據(jù)源接受事件輸入
????2.處理事件并產(chǎn)生觀點(diǎn)(信息)
????3.推送觀點(diǎn)(信息)到多種類型的接收器
????你需要用Siddhi流SQL語言來編寫Siddhi應(yīng)用程序的邏輯,在編寫并啟動Siddhi應(yīng)用程序以后,它
????1.逐個獲取數(shù)據(jù)作為事件
????2.處理每個事件中的數(shù)據(jù)
????3.根據(jù)目前的處理生成新的高等級事件
????4.將新生成的事件作為輸出發(fā)送到流
????在開始之前,你需要準(zhǔn)備:????
????????a.安裝Oracle Java SE Development Kit (JDK)?version 1.8
? ????? b.設(shè)置JAVA_HOME環(huán)境變量
? ????? c.解壓下載的zip壓縮包到/bin目錄(是解壓目錄)
? ????? d.運(yùn)行如下命令來開始WSO2 SP編輯器
????????????Windows:editor.bat
????????????Linux:./editor.sh
? ? WSO2 SP成功開始后,在CLI中會打印出一條相似的log

????服務(wù)的log將SP編輯器的URL打印在初始化log中:

????譯者注:可以按照How to install WSO2 Stream Processor進(jìn)行安裝
? ??劇本
? ??在這個劇本下,您正在為一家虛構(gòu)的大型航運(yùn)公司Shipping Wave創(chuàng)建一個應(yīng)用程序。貨運(yùn)經(jīng)理Smith需要跟蹤在任何制定事件段內(nèi)裝載到船上貨物的總重量。當(dāng)貨箱裝載到船上時測量貨箱的重量被認(rèn)為是一個事件。
讓我們開始吧!你可以哪找如下步驟寫一個簡單的Siddhi應(yīng)用程序來計算裝載到船上每個貨箱的總重量:
????1.創(chuàng)建一個Sddhi應(yīng)用程序
????2.模擬事件
????3.編輯Siddhi應(yīng)用程序來處理執(zhí)行時間處理
????4.為編輯過的應(yīng)用程序來模擬事件
????5.下一步?
Step 1:創(chuàng)建一個Siddhi應(yīng)用程序
????Smith 需要計算每當(dāng)有貨物裝入船上后貨物總重量。為了這個結(jié)果,讓我們創(chuàng)建一個Siddhi應(yīng)用程序,如下所示:
????1.通過http://<HOST_NAME>:<EDITOR_PORT>/editor的地址來訪問SP Studio
? ????? 注:默認(rèn)地址為http://localhost:9390/editor
????????SP Studio打開后如下

????2.輸入你的Siddhi應(yīng)用程序的名字,在這個劇本中,給這個應(yīng)用用CargoWeightApp來命名,如下:
????? ??@App:name("CargoWeightApp")
????3.定義輸入流。流需要有一個名字和模式,來定義每個傳入事件應(yīng)包含的數(shù)據(jù)。事件數(shù)據(jù)屬性表示為名稱和類型。在這個例子中:
? ? ????a.輸入流的名稱:CargoStream
? ? ????b.誒個事件中數(shù)據(jù)的名稱:weight
? ? ????c.接收到weight的數(shù)據(jù)類型:int????
????define stream CargoStream(weight?int);
????4.定義一個輸出流。它與先前定義的信息相同,并有一個額外的totalWeight屬性,來表示迄今為止總的重量。這里,我們需要添加接收器來記錄輸出流中的事件,方便我們觀察輸出值。
????注:通過Siddhi,接收器定義了一個方法將流推送到其他額外的系統(tǒng)。在這個場景下,接收器增加了log類型,它將輸出流以log的形式推送到CLI。
? ??@sink(type='log',prefix='LOGGER')
? ??define stream OutputStream(weightint,totalWeight long);
????5.輸入按照如下方式的Siddhi查詢:
????????a.查詢的名稱(例如cargoWeightQuery)
????????b.被處理事件的輸入流(例如CargoStream)
????????c.需要被發(fā)送到輸出流的數(shù)據(jù)(例如weight和totalWeight)
????????d.輸出需要如何被計算(例如計算所有事件的總和)
????????e.輸出數(shù)據(jù)需要被發(fā)送到的流(例如OutputStream)
????這個查詢?nèi)缦拢?/p>
? ??@info(name='CargoWeightQuery')
? ??fromCargoStream
? ??selectweight,sum(weight)astotalWeight
? ??insertintoOutputStream;
????Siddhi最終的文件如下:
? ??App:name("CargoWeightApp")
? ??define stream CargoStream(weightint);
? ??@sink(type='log',prefix='LOGGER')
? ??define stream OutputStream(weightint,totalWeight long);
? ??@info(name='CargoWeightQuery')
? ??fromCargoStream
? ??selectweight,sum(weight)astotalWeight
? ??insertintoOutputStream;
????6.保存Diddhi文件,點(diǎn)擊File=>Save。這將打開Save to Workspace對話框,點(diǎn)擊Save來保存這個文件在/wso2/editor/deployment/workspace路徑(這個事默認(rèn)的Siddhi應(yīng)用保存地址)

Step 2:模擬事件
????SP Studio內(nèi)置支持模擬事件。為了測試創(chuàng)建的CargoWeightApp是否按照預(yù)期工作,請按照下面給出的步驟模擬事件。
????1.開始CargoWeight的Sddhi應(yīng)用程序,點(diǎn)擊開始按鈕

????如果應(yīng)用程序成功開始,在SP Studio console中會打出如下日志:

????2.在SP Studio點(diǎn)擊下面的標(biāo)志 來打開事件模擬面板。

????3.在面板的Single Simulation tab中,選擇如下值:

????因此,CargoStream流的Weight屬性顯示如下:

????4.在weight欄中,輸入1000然后點(diǎn)擊Start開始Siddhi程序。然后點(diǎn)擊Send發(fā)送事件。這個事件在CLI中輸出日志如下:

????5.用下面的數(shù)據(jù)發(fā)送5個更多的事件

????事件輸出日志如下:

????每個新增的重量都會添加到重量中。因此,發(fā)送6個事件之后,總重量為10500.
Step 3:編輯Siddhi應(yīng)用程序執(zhí)行時間處理
????本節(jié)演示如何使用Siddhi執(zhí)行時間窗口處理。
????在上一個場景中,通過僅在內(nèi)存中運(yùn)行累加值處理,這個過程中沒有存儲任何時間。
????口處理事一種允許我們在內(nèi)存中存儲在給定時間段或指定數(shù)量的事件,以便我們可以執(zhí)行平均數(shù)、最大值等操作。
????考慮一下Simth有一個額外計算每次裝載貨物最后三個貨箱的平均重量,以平衡船上的重量。在這里,我們正考慮一個由三個事件組成的窗口,如下圖所示:

????為了實(shí)現(xiàn)這個,按照下面步驟編輯應(yīng)用程序
????????a.將名為averageWeight的新屬性添加到OutputStream流定義中,以便除了加載心曠的權(quán)重和總重量,每個輸出事件還顯示平均權(quán)重。
????????define stream OutputStream(weight?int,totalWeight long,averageWeightdouble);
????????b.為了定義如何計算平均值,在select語句中對weight屬性使用Siddhi的avg方法。這表示對傳入事件的weight屬性計算平均值。
? ? ????select?weight,sum(weight)as?totalWeight,avg(weight)?as?averageWeight
????????c.要指定此查詢僅應(yīng)用于收到的最后三個事件,需要長度窗口應(yīng)用于輸入流,如下所示:
? ? ????from?CargoStream#window.length(3)
????注:這個窗口應(yīng)用于所有從CargoStream?stream提取的事件之行的所有計算。因此,添加此窗口還會導(dǎo)致居于最后三個事件計算總weight值。
? ? 完整的查詢語句如下:
????????@info(name='CargoWeightQuery')
????????from?CargoStream#window.length(3)
????????select?weight,?sum(weight)?as?totalWeight,?avg(weight)?as?averageWeight
????????insert?into?OutputStream;
? ???完整的CargoWeight?Siddhi 應(yīng)用程序如下:
????????@App:name("CargoWeight")
????????define stream CargoStream (weight?int);
????????@sink(type='log', prefix='LOGGER')
????????define stream OutputStream(weight?int, totalWeight long, averageWeight?double);
????????@info(name='CargoWeightQuery')
????????from?CargoStream#window.length(3)
????????select?weight,?sum(weight)?as?totalWeight,?avg(weight)?as?averageWeight
????????insert?into?OutputStream;
Step 4:為編輯后的Siddhi應(yīng)用程序模擬輸入
在這步中,開始編輯后的Siddhi應(yīng)用程序然后模擬6個事件。

輸出的日志如下:

下一步?
了解更多Siddhi以及如何對于你的用戶場景更有用,請參看Tutorials?
學(xué)習(xí)如何開發(fā)部署管理Siddhi應(yīng)用,請參看User Guide????????????????????