使用ZeroMq源測(cè)試Kuiper吞吐量

EMQ X Kuiper支持各種source(MQTT、ZeroMq、EdgeX),默認(rèn)支持MQTT
1、關(guān)于Kuiper的介紹及簡(jiǎn)單使用,請(qǐng)參考:https://docs.emqx.io/kuiper/latest/cn/
2、關(guān)于Kuiper與EdgeX Foundry集成,請(qǐng)參照:http://www.itdecent.cn/p/0726d41b00bf

本文實(shí)踐ZeroMq作為消息總線(xiàn)源(實(shí)際上EdgeX內(nèi)部使用的正是ZeroMq),Kuiper訂閱其消息,并以此測(cè)試Kuiper的性能,kuiper官方給出性能測(cè)試結(jié)果:<u>https://docs.emqx.io/kuiper/latest/cn/</u>

操作系統(tǒng): Ubuntu 18.04
測(cè)試原理描述:?jiǎn)?dòng)一個(gè)go應(yīng)用向ZeroMq發(fā)送消息,同時(shí)Kuiper訂閱來(lái)自ZeroMq的消息,經(jīng)過(guò)Kuiper規(guī)則處理后輸出消息
操作步驟
1、創(chuàng)建一個(gè)go應(yīng)用,向ZeroMq發(fā)送若干條消息,每條消息類(lèi)似:
{"device":"demo","readings":[{"device":"Temperature device","name":"Temperature","value":"40"},{"device":"Humidity device","name":"Humidity","value":"45"}]},大小大概為157字節(jié)。

go應(yīng)用源碼:https://github.com/emqx/kuiper/blob/master/fvt_scripts/edgex/benchmark/pub.go
go應(yīng)用與kuiper在一臺(tái)機(jī)器上
2、由于Kuiper對(duì)zmq源的支持是以插件的形式支持的,而且必須與Kuiper的版本相配套,故此處kuiper采用源碼編譯打包,同時(shí)編譯zmq的插件

zj@zj-Z390-UD:~/文檔/項(xiàng)目工作$ git clone https://github.com/emqx/kuiper.git
正克隆到 'kuiper'...
remote: Enumerating objects: 28, done.
remote: Counting objects: 100% (28/28), done.
remote: Compressing objects: 100% (19/19), done.
remote: Total 5326 (delta 10), reused 21 (delta 9), pack-reused 5298
接收對(duì)象中: 100% (5326/5326), 17.02 MiB | 597.00 KiB/s, 完成.
處理 delta 中: 100% (3332/3332), 完成.
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作$ cd kuiper/
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper$ make pkg
Build successfully
make[1]: 進(jìn)入目錄“/home/zj/文檔/項(xiàng)目工作/kuiper”
Package build success
make[1]: 離開(kāi)目錄“/home/zj/文檔/項(xiàng)目工作/kuiper”
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper$ ll -h _packages/
總用量 18M
drwxr-xr-x  2 zj zj 4.0K 6月   8 10:51 ./
drwxr-xr-x 15 zj zj 4.0K 6月   8 10:51 ../
-rw-r--r--  1 zj zj 8.8M 6月   8 10:51 kuiper-0.4.1-linux-x86_64.tar.gz
-rw-r--r--  1 zj zj 8.8M 6月   8 10:51 kuiper-0.4.1-linux-x86_64.zip
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper$ go build --buildmode=plugin -o plugins/sources/Zmq.so plugins/sources/zmq.go
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper$ ll -h  plugins/sources
總用量 5.0M
drwxr-xr-x 2 zj zj 4.0K 6月   8 10:54 ./
drwxr-xr-x 6 zj zj 4.0K 6月   8 10:50 ../
-rw-r--r-- 1 zj zj 1.5K 6月   8 10:50 random.go
-rw-r--r-- 1 zj zj 2.1K 6月   8 10:50 zmq.go
-rw-r--r-- 1 zj zj 5.0M 6月   8 10:54 Zmq.so
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper$ cd _packages/
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper/_packages$ tar zxf kuiper-0.4.1-linux-x86_64.tar.gz 
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper/_packages$ cd kuiper-0.4.1-linux-x86_64/
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ cp ../../plugins/sources/Zmq.so plugins/sources/
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ ll plugins/sources/
總用量 5080
drwxr-xr-x 2 zj zj    4096 6月   8 10:57 ./
drwxr-xr-x 5 zj zj    4096 6月   8 10:51 ../
-rw-r--r-- 1 zj zj 5191448 6月   8 10:57 Zmq.so

3、修改source配置

zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ cat etc/sources/zmq.yaml 
#Global Zmq configurations
default:
  server: tcp://10.0.105.143:5563
  topic: events

10.0.105.143是我的go應(yīng)用所在的主機(jī)IP

4、啟動(dòng)Kuiper server進(jìn)程

zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ ./bin/server
Serving kuiper (version - 0.4.1) on port 20498, and restful api on port 9081.

5、設(shè)置Kuiper訂閱的數(shù)據(jù)流及過(guò)濾規(guī)則

zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ bin/cli create stream demo '() WITH (FORMAT="JSON", TYPE="zmq", DATASOURCE="events")'
Connecting to 127.0.0.1:20498... 
Stream demo is created.

zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ vi rule.txt
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ cat rule.txt
{
  "sql": "SELECT * from demo",
  "actions": [
    {
      "log": {
          "concurrency": 50,
          "bufferLength": 10240,
          "cacheLength": 102400,
          "runAsync": true
       }
    }
  ],
  "options": {
    "concurrency": 30,
    "bufferLength": 10240
  }
}
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ bin/cli create rule rule1 -f rule.txt
Connecting to 127.0.0.1:20498... 
Creating a new rule from file rule.txt.
Rule rule1 was created successfully, please use 'bin/cli getstatus rule rule1' command to get rule status.
zj@zj-Z390-UD:~/文檔/項(xiàng)目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ cat log/stream.log 
time="2020-06-08T11:00:26+08:00" level=info msg="db location is /home/zj/文檔/項(xiàng)目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64/data/" file="server.go:31"
time="2020-06-08T11:00:26+08:00" level=info msg="Starting rules" file="server.go:48"
time="2020-06-08T11:00:26+08:00" level=info msg="Serving kuiper (version - 0.4.1) on port 20498, and restful api on port 9081. \n" file="server.go:101"
time="2020-06-08T11:02:36+08:00" level=info msg="Stream demo is created." file="xsql_processor.go:75"
time="2020-06-08T11:05:52+08:00" level=info msg="Rule rule1 is created." file="xsql_processor.go:226"
time="2020-06-08T11:05:52+08:00" level=info msg="Init rule with options {isEventTime: false, lateTolerance: 0, concurrency: 30, bufferLength: 10240" file="xsql_processor.go:399"
time="2020-06-08T11:05:52+08:00" level=info msg="Opening stream" file="streams.go:89" rule=rule1
time="2020-06-08T11:05:52+08:00" level=info msg="open source node demo with option map[DATASOURCE:events FORMAT:JSON TYPE:zmq]" file="source_node.go:59" rule=rule1
time="2020-06-08T11:05:52+08:00" level=info msg="open source node 1 instances" file="source_node.go:78" rule=rule1
time="2020-06-08T11:05:52+08:00" level=info msg="open sink node 50 instances" file="sink_node.go:143" rule=rule1
time="2020-06-08T11:05:52+08:00" level=info msg="Opening func collector" file="func.go:36" rule=rule1
time="2020-06-08T11:05:52+08:00" level=info msg="Opening func collector" file="func.go:36" rule=rule1
time="2020-06-08T11:05:52+08:00" level=info msg="Opening func collector" file="func.go:36" rule=rule1
time="2020-06-08T11:05:52+08:00" level=info msg="Start source demo instance 0 successfully" file="source_node.go:115" rule=rule1

Kuiper規(guī)則配置如下:

{
  "sql": "SELECT * from demo",
  "actions": [
    {
      "log": {
          "concurrency": 50,
          "bufferLength": 10240,
          "cacheLength": 102400,
          "runAsync": true
       }
    }
  ],
  "options": {
    "concurrency": 30,
    "bufferLength": 10240
  }
}

以上參數(shù)含義:https://github.com/emqx/kuiper/blob/master/docs/zh_CN/rules/overview.md

6、go應(yīng)用發(fā)送消息耗時(shí):0.344352s


image.png

同時(shí)kuiper側(cè)訂閱zeroMq主題為events的消息,過(guò)濾了所有消息


image.png

說(shuō)明,kuiper在0.344352s處理了9676條數(shù)據(jù)

經(jīng)測(cè)試:


image.png

注:測(cè)試發(fā)現(xiàn)當(dāng)數(shù)據(jù)量大時(shí),明顯丟包嚴(yán)重

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容