1. 前言
1.1 PipelineDB 介紹
偶然發(fā)現(xiàn)了個流式數(shù)據(jù)庫PipelineDB,它是基于PostgreSQL數(shù)據(jù)庫改造的,允許我們通過sql的方式,對數(shù)據(jù)流做操作,并把操作結(jié)果儲存起來。
這年頭,真是SQL on everything。
其基本的過程是:
- 創(chuàng)建PipelineDB Stream。
- 編寫SQL,對Stream做操作。
- 操作結(jié)果被保存到 continuous view,其背后是物理表在支撐。
1.2 安裝PipelineDB
我們的安裝是在Centos 7上面進(jìn)行的。 PipelineDB不讓用root權(quán)限的用戶操作,請?zhí)崆皠?chuàng)建用戶。
#下載
wget https://s3-us-west-2.amazonaws.com/download.pipelinedb.com/pipelinedb-0.9.6-centos7-x86_64.rpm
# 安裝
rmp -ivh ----prefix=/opt/pipelinedb
# 初始化 pipeline-init -D <data directory>
pipeline-init -D /opt/pipelinedb/dbdata
pipelinedb -D /opt/pipelinedb/dbdata
# 激活 continuous query(僅需執(zhí)行一次,后續(xù)重啟不用再做)
psql -h localhost -p 5432 -d pipeline -c "ACTIVATE"
2. Quick Start例子
本例是關(guān)于 Wikipedia頁面訪問數(shù)據(jù)的統(tǒng)計。每一條訪問記錄,包括以下字段,以英文逗號分割。
| hour | project | page title | view count | bytes served |
|---|
2.1 創(chuàng)建continuous視圖
首先,我們創(chuàng)建一個continuous view,使用psql工具。從sql里,我們能夠看到統(tǒng)計方法和訪問記錄的對應(yīng)關(guān)系。
psql -h localhost -p 5432 -d pipeline -c "
CREATE STREAM wiki_stream (hour timestamp, project text, title text, view_count bigint, size bigint);
CREATE CONTINUOUS VIEW wiki_stats AS
SELECT hour, project,
count(*) AS total_pages,
sum(view_count) AS total_views,
min(view_count) AS min_views,
max(view_count) AS max_views,
avg(view_count) AS avg_views,
percentile_cont(0.99) WITHIN GROUP (ORDER BY view_count) AS p99_views,
sum(size) AS total_bytes_served
FROM wiki_stream
GROUP BY hour, project;"
2.2 創(chuàng)建Stream
我們通過curl工具,獲取wiki的數(shù)據(jù)集,并壓縮數(shù)據(jù),作為一個Stream寫入到stdin。因?yàn)閿?shù)據(jù)集比較大,當(dāng)我們執(zhí)行了幾秒鐘之后,可以使用ctrl+c中斷curl操作。
curl -sL http://pipelinedb.com/data/wiki-pagecounts | gunzip | \
psql -h localhost -p 5432 -d pipeline -c "
COPY wiki_stream (hour, project, title, view_count, size) FROM STDIN"
2.3 查看結(jié)果
通過下面的命令,從視圖(continuous view)讀取streaming的統(tǒng)計計算結(jié)果。
psql -h localhost -p 5432 -d pipeline -c "
SELECT * FROM wiki_stats ORDER BY total_views DESC";
3. PipelineDB和kafka的集成
3.1 pipeline_kafka組件安裝
PipelineDB默認(rèn)是沒有pipeline_kafka擴(kuò)展組件的,需要我們自己安裝。安裝需要git,如果沒有g(shù)it,請使用yum -y install git 安裝git。
1.安裝librdkafka
pipeline_kafka依賴librdkafka,需要先安裝librdkafka。
git clone -b 0.9.1 https://github.com/edenhill/librdkafka.git ~/librdkafka
cd ~/librdkafka
./configure --prefix=/usr
make
sudo make install
2.安裝pipeline_kafka
編譯安裝pipeline_kafk。如果有編譯依賴的缺失,請根據(jù)缺失補(bǔ)充安裝依賴。
./configure
make
make install
配置pipeline_kafka
# 編輯配置文件
vi /opt/pipelinedb/dbdata/pipelinedb.conf
# 在結(jié)尾輸入以下內(nèi)容并保存(:wq)
# Add settings for extensions here
shared_preload_libraries = 'pipeline_kafka'
重啟數(shù)據(jù)庫,使得擴(kuò)展組件生效
# pipeline-ctl -D <data directory> start|stop|restart
pipeline-ctl -D /opt/pipelinedb/dbdata restart
3.2 Stream SQL開發(fā)過程
# 連接數(shù)據(jù)庫
psql -h localhost -p 5432 -d pipeline
# 創(chuàng)建pipeline_kafka
CREATE EXTENSION pipeline_kafka;
# 配置kafka broker
SELECT pipeline_kafka.add_broker('192.168.1.170:9092');
# 創(chuàng)建Stream,從kafka里接受三個參數(shù)
CREATE STREAM msg_stream (sjc varchar, thread_name varchar, msg varchar);
# 創(chuàng)建CONTINUOUS VIEW
CREATE CONTINUOUS VIEW msg_result AS SELECT sjc,thread_name,msg FROM msg_stream;
# 開始消費(fèi)kafka消息
# topic是my-topic,連接PipelineDB Stream名是msg_stream,消息類型是text,消息以英文逗號分割。
SELECT pipeline_kafka.consume_begin ( 'my-topic', 'msg_stream', format := 'text',
delimiter := ',', quote := NULL, escape := NULL, batchsize := 1000,
maxbytes := 32000000, parallelism := 1, start_offset := NULL );
# 如果要停止Stream,請使用以下命令。
SELECT pipeline_kafka.consume_end('my-topic', 'msg_stream');
3.3 驗(yàn)證
1.向kafka發(fā)送消息
登錄kafka節(jié)點(diǎn)的服務(wù)器,進(jìn)入到kafka home路徑,使用以下命令發(fā)送消息
# 啟動producer
bin/kafka-console-producer.sh --broker-list 192.168.1.90:9092 --topic my-topic
# 輸入以下數(shù)據(jù)
a,b,c
2.在PipelineDB中查詢收到的消息
從CONTINUOUS VIEW中 查詢數(shù)據(jù),可以看到有一條記錄,即[a,b,c]。
psql -h localhost -p 5432 -d pipeline -c "
SELECT * FROM msg_result";
ps: 當(dāng)我們連接到PipelineDB,我們可以使用PostgreSQL的命令,來查看有那些數(shù)據(jù)庫對象生成。例如通過 \d 可以查看到,當(dāng)我們創(chuàng)建CONTINUOUS VIEW的時候,額外創(chuàng)建了msg_result_mrel、msg_result_seq和msg_result_osrel,實(shí)際的數(shù)據(jù)就存儲在msg_result_mrel中。
| Schema | Name | Type | Owner |
|---|---|---|---|
| public | msg_result | continuous view | pipelinedb |
| public | msg_result_mrel | table | pipelinedb |
| public | msg_result_osrel | stream | pipelinedb |
| public | msg_result_seq | sequence | pipelinedb |
| public | msg_stream | stream | pipelinedb |
(完)