流式數(shù)據(jù)庫PipelineDB(集成Kafka)

1. 前言

1.1 PipelineDB 介紹

偶然發(fā)現(xiàn)了個流式數(shù)據(jù)庫PipelineDB,它是基于PostgreSQL數(shù)據(jù)庫改造的,允許我們通過sql的方式,對數(shù)據(jù)流做操作,并把操作結(jié)果儲存起來。

這年頭,真是SQL on everything。

其基本的過程是:

  • 創(chuàng)建PipelineDB Stream。
  • 編寫SQL,對Stream做操作。
  • 操作結(jié)果被保存到 continuous view,其背后是物理表在支撐。

1.2 安裝PipelineDB

我們的安裝是在Centos 7上面進(jìn)行的。 PipelineDB不讓用root權(quán)限的用戶操作,請?zhí)崆皠?chuàng)建用戶。

#下載
wget https://s3-us-west-2.amazonaws.com/download.pipelinedb.com/pipelinedb-0.9.6-centos7-x86_64.rpm
# 安裝
rmp -ivh ----prefix=/opt/pipelinedb
# 初始化 pipeline-init -D <data directory>
pipeline-init -D /opt/pipelinedb/dbdata
pipelinedb -D /opt/pipelinedb/dbdata
# 激活 continuous query(僅需執(zhí)行一次,后續(xù)重啟不用再做)
psql -h localhost -p 5432 -d pipeline -c "ACTIVATE"

2. Quick Start例子

本例是關(guān)于 Wikipedia頁面訪問數(shù)據(jù)的統(tǒng)計。每一條訪問記錄,包括以下字段,以英文逗號分割。

hour project page title view count bytes served

2.1 創(chuàng)建continuous視圖

首先,我們創(chuàng)建一個continuous view,使用psql工具。從sql里,我們能夠看到統(tǒng)計方法和訪問記錄的對應(yīng)關(guān)系。

psql -h localhost -p 5432 -d pipeline -c "
CREATE STREAM wiki_stream (hour timestamp, project text, title text, view_count bigint, size bigint);
CREATE CONTINUOUS VIEW wiki_stats AS
SELECT hour, project,
        count(*) AS total_pages,
        sum(view_count) AS total_views,
        min(view_count) AS min_views,
        max(view_count) AS max_views,
        avg(view_count) AS avg_views,
        percentile_cont(0.99) WITHIN GROUP (ORDER BY view_count) AS p99_views,
        sum(size) AS total_bytes_served
FROM wiki_stream
GROUP BY hour, project;"

2.2 創(chuàng)建Stream

我們通過curl工具,獲取wiki的數(shù)據(jù)集,并壓縮數(shù)據(jù),作為一個Stream寫入到stdin。因?yàn)閿?shù)據(jù)集比較大,當(dāng)我們執(zhí)行了幾秒鐘之后,可以使用ctrl+c中斷curl操作。

curl -sL http://pipelinedb.com/data/wiki-pagecounts | gunzip | \
        psql -h localhost -p 5432 -d pipeline -c "
        COPY wiki_stream (hour, project, title, view_count, size) FROM STDIN"

2.3 查看結(jié)果

通過下面的命令,從視圖(continuous view)讀取streaming的統(tǒng)計計算結(jié)果。

psql -h localhost -p 5432 -d pipeline -c "
SELECT * FROM wiki_stats ORDER BY total_views DESC";

3. PipelineDB和kafka的集成

3.1 pipeline_kafka組件安裝

PipelineDB默認(rèn)是沒有pipeline_kafka擴(kuò)展組件的,需要我們自己安裝。安裝需要git,如果沒有g(shù)it,請使用yum -y install git 安裝git。

1.安裝librdkafka

pipeline_kafka依賴librdkafka,需要先安裝librdkafka。

git clone -b 0.9.1 https://github.com/edenhill/librdkafka.git ~/librdkafka
cd ~/librdkafka
./configure --prefix=/usr
make
sudo make install

2.安裝pipeline_kafka

編譯安裝pipeline_kafk。如果有編譯依賴的缺失,請根據(jù)缺失補(bǔ)充安裝依賴。

./configure
make
make install

配置pipeline_kafka

# 編輯配置文件
vi /opt/pipelinedb/dbdata/pipelinedb.conf
# 在結(jié)尾輸入以下內(nèi)容并保存(:wq)
# Add settings for extensions here
shared_preload_libraries = 'pipeline_kafka'

重啟數(shù)據(jù)庫,使得擴(kuò)展組件生效

# pipeline-ctl -D <data directory> start|stop|restart
pipeline-ctl -D /opt/pipelinedb/dbdata restart

3.2 Stream SQL開發(fā)過程

# 連接數(shù)據(jù)庫
psql -h localhost -p 5432 -d pipeline
# 創(chuàng)建pipeline_kafka
CREATE EXTENSION pipeline_kafka;
# 配置kafka broker
SELECT pipeline_kafka.add_broker('192.168.1.170:9092');
# 創(chuàng)建Stream,從kafka里接受三個參數(shù)
CREATE STREAM msg_stream (sjc varchar, thread_name varchar, msg varchar);
# 創(chuàng)建CONTINUOUS VIEW
CREATE CONTINUOUS VIEW msg_result AS SELECT sjc,thread_name,msg FROM msg_stream;
# 開始消費(fèi)kafka消息
# topic是my-topic,連接PipelineDB Stream名是msg_stream,消息類型是text,消息以英文逗號分割。
SELECT pipeline_kafka.consume_begin ( 'my-topic', 'msg_stream', format := 'text', 
            delimiter := ',', quote := NULL, escape := NULL, batchsize := 1000,
            maxbytes := 32000000, parallelism := 1, start_offset := NULL );


# 如果要停止Stream,請使用以下命令。
SELECT pipeline_kafka.consume_end('my-topic', 'msg_stream');

3.3 驗(yàn)證

1.向kafka發(fā)送消息

登錄kafka節(jié)點(diǎn)的服務(wù)器,進(jìn)入到kafka home路徑,使用以下命令發(fā)送消息

# 啟動producer
bin/kafka-console-producer.sh --broker-list 192.168.1.90:9092 --topic my-topic
# 輸入以下數(shù)據(jù)
a,b,c

2.在PipelineDB中查詢收到的消息

從CONTINUOUS VIEW中 查詢數(shù)據(jù),可以看到有一條記錄,即[a,b,c]。

psql -h localhost -p 5432 -d pipeline -c "
SELECT * FROM msg_result";

ps: 當(dāng)我們連接到PipelineDB,我們可以使用PostgreSQL的命令,來查看有那些數(shù)據(jù)庫對象生成。例如通過 \d 可以查看到,當(dāng)我們創(chuàng)建CONTINUOUS VIEW的時候,額外創(chuàng)建了msg_result_mrel、msg_result_seq和msg_result_osrel,實(shí)際的數(shù)據(jù)就存儲在msg_result_mrel中。

Schema Name Type Owner
public msg_result continuous view pipelinedb
public msg_result_mrel table pipelinedb
public msg_result_osrel stream pipelinedb
public msg_result_seq sequence pipelinedb
public msg_stream stream pipelinedb

http://docs.pipelinedb.com/quickstart.html

https://github.com/pipelinedb/pipeline_kafka

(完)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容