EMQ X Kuiper支持各種source(MQTT、ZeroMq、EdgeX),默認(rèn)支持MQTT
1、關(guān)于Kuiper的介紹及簡(jiǎn)單使用,請(qǐng)參考:https://docs.emqx.io/kuiper/latest/cn/
2、關(guān)于Kuiper與EdgeX Foundry集成,請(qǐng)參照:http://www.itdecent.cn/p/0726d41b00bf
本文實(shí)踐ZeroMq作為消息總線(xiàn)源(實(shí)際上EdgeX內(nèi)部使用的正是ZeroMq),Kuiper訂閱其消息,并以此測(cè)試Kuiper的性能,kuiper官方給出性能測(cè)試結(jié)果:<u>https://docs.emqx.io/kuiper/latest/cn/</u>
操作系統(tǒng): Ubuntu 18.04
測(cè)試原理描述:?jiǎn)?dòng)一個(gè)go應(yīng)用向ZeroMq發(fā)送消息,同時(shí)Kuiper訂閱來(lái)自ZeroMq的消息,經(jīng)過(guò)Kuiper規(guī)則處理后輸出消息
操作步驟:
1、創(chuàng)建一個(gè)go應(yīng)用,向ZeroMq發(fā)送若干條消息,每條消息類(lèi)似:
{"device":"demo","readings":[{"device":"Temperature device","name":"Temperature","value":"40"},{"device":"Humidity device","name":"Humidity","value":"45"}]},大小大概為157字節(jié)。
go應(yīng)用源碼:https://github.com/emqx/kuiper/blob/master/fvt_scripts/edgex/benchmark/pub.go
go應(yīng)用與kuiper在一臺(tái)機(jī)器上
2、由于Kuiper對(duì)zmq源的支持是以插件的形式支持的,而且必須與Kuiper的版本相配套,故此處kuiper采用源碼編譯打包,同時(shí)編譯zmq的插件
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作$ git clone https://github.com/emqx/kuiper.git
正克隆到 'kuiper'...
remote: Enumerating objects: 28, done.
remote: Counting objects: 100% (28/28), done.
remote: Compressing objects: 100% (19/19), done.
remote: Total 5326 (delta 10), reused 21 (delta 9), pack-reused 5298
接收對(duì)象中: 100% (5326/5326), 17.02 MiB | 597.00 KiB/s, 完成.
處理 delta 中: 100% (3332/3332), 完成.
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作$ cd kuiper/
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper$ make pkg
Build successfully
make[1]: 進(jìn)入目錄“/home/zj/文檔/項(xiàng)目工作/kuiper”
Package build success
make[1]: 離開(kāi)目錄“/home/zj/文檔/項(xiàng)目工作/kuiper”
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper$ ll -h _packages/
總用量 18M
drwxr-xr-x 2 zj zj 4.0K 6月 8 10:51 ./
drwxr-xr-x 15 zj zj 4.0K 6月 8 10:51 ../
-rw-r--r-- 1 zj zj 8.8M 6月 8 10:51 kuiper-0.4.1-linux-x86_64.tar.gz
-rw-r--r-- 1 zj zj 8.8M 6月 8 10:51 kuiper-0.4.1-linux-x86_64.zip
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper$ go build --buildmode=plugin -o plugins/sources/Zmq.so plugins/sources/zmq.go
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper$ ll -h plugins/sources
總用量 5.0M
drwxr-xr-x 2 zj zj 4.0K 6月 8 10:54 ./
drwxr-xr-x 6 zj zj 4.0K 6月 8 10:50 ../
-rw-r--r-- 1 zj zj 1.5K 6月 8 10:50 random.go
-rw-r--r-- 1 zj zj 2.1K 6月 8 10:50 zmq.go
-rw-r--r-- 1 zj zj 5.0M 6月 8 10:54 Zmq.so
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper$ cd _packages/
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper/_packages$ tar zxf kuiper-0.4.1-linux-x86_64.tar.gz
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper/_packages$ cd kuiper-0.4.1-linux-x86_64/
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ cp ../../plugins/sources/Zmq.so plugins/sources/
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ ll plugins/sources/
總用量 5080
drwxr-xr-x 2 zj zj 4096 6月 8 10:57 ./
drwxr-xr-x 5 zj zj 4096 6月 8 10:51 ../
-rw-r--r-- 1 zj zj 5191448 6月 8 10:57 Zmq.so
3、修改source配置
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ cat etc/sources/zmq.yaml
#Global Zmq configurations
default:
server: tcp://10.0.105.143:5563
topic: events
10.0.105.143是我的go應(yīng)用所在的主機(jī)IP
4、啟動(dòng)Kuiper server進(jìn)程
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ ./bin/server
Serving kuiper (version - 0.4.1) on port 20498, and restful api on port 9081.
5、設(shè)置Kuiper訂閱的數(shù)據(jù)流及過(guò)濾規(guī)則
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ bin/cli create stream demo '() WITH (FORMAT="JSON", TYPE="zmq", DATASOURCE="events")'
Connecting to 127.0.0.1:20498...
Stream demo is created.
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ vi rule.txt
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ cat rule.txt
{
"sql": "SELECT * from demo",
"actions": [
{
"log": {
"concurrency": 50,
"bufferLength": 10240,
"cacheLength": 102400,
"runAsync": true
}
}
],
"options": {
"concurrency": 30,
"bufferLength": 10240
}
}
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ bin/cli create rule rule1 -f rule.txt
Connecting to 127.0.0.1:20498...
Creating a new rule from file rule.txt.
Rule rule1 was created successfully, please use 'bin/cli getstatus rule rule1' command to get rule status.
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ cat log/stream.log
time="2020-06-08T11:00:26+08:00" level=info msg="db location is /home/zj/文檔/項(xiàng)目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64/data/" file="server.go:31"
time="2020-06-08T11:00:26+08:00" level=info msg="Starting rules" file="server.go:48"
time="2020-06-08T11:00:26+08:00" level=info msg="Serving kuiper (version - 0.4.1) on port 20498, and restful api on port 9081. \n" file="server.go:101"
time="2020-06-08T11:02:36+08:00" level=info msg="Stream demo is created." file="xsql_processor.go:75"
time="2020-06-08T11:05:52+08:00" level=info msg="Rule rule1 is created." file="xsql_processor.go:226"
time="2020-06-08T11:05:52+08:00" level=info msg="Init rule with options {isEventTime: false, lateTolerance: 0, concurrency: 30, bufferLength: 10240" file="xsql_processor.go:399"
time="2020-06-08T11:05:52+08:00" level=info msg="Opening stream" file="streams.go:89" rule=rule1
time="2020-06-08T11:05:52+08:00" level=info msg="open source node demo with option map[DATASOURCE:events FORMAT:JSON TYPE:zmq]" file="source_node.go:59" rule=rule1
time="2020-06-08T11:05:52+08:00" level=info msg="open source node 1 instances" file="source_node.go:78" rule=rule1
time="2020-06-08T11:05:52+08:00" level=info msg="open sink node 50 instances" file="sink_node.go:143" rule=rule1
time="2020-06-08T11:05:52+08:00" level=info msg="Opening func collector" file="func.go:36" rule=rule1
time="2020-06-08T11:05:52+08:00" level=info msg="Opening func collector" file="func.go:36" rule=rule1
time="2020-06-08T11:05:52+08:00" level=info msg="Opening func collector" file="func.go:36" rule=rule1
time="2020-06-08T11:05:52+08:00" level=info msg="Start source demo instance 0 successfully" file="source_node.go:115" rule=rule1
Kuiper規(guī)則配置如下:
{
"sql": "SELECT * from demo",
"actions": [
{
"log": {
"concurrency": 50,
"bufferLength": 10240,
"cacheLength": 102400,
"runAsync": true
}
}
],
"options": {
"concurrency": 30,
"bufferLength": 10240
}
}
以上參數(shù)含義:https://github.com/emqx/kuiper/blob/master/docs/zh_CN/rules/overview.md
6、go應(yīng)用發(fā)送消息耗時(shí):0.344352s

同時(shí)kuiper側(cè)訂閱zeroMq主題為events的消息,過(guò)濾了所有消息

說(shuō)明,kuiper在0.344352s處理了9676條數(shù)據(jù)
經(jīng)測(cè)試:

注:測(cè)試發(fā)現(xiàn)當(dāng)數(shù)據(jù)量大時(shí),明顯丟包嚴(yán)重