利用MongoShake進行Mongodb數據的實時變更處理

最近在做數據倉庫相關的項目,需要對各業(yè)務系統(tǒng)異構數據進行統(tǒng)一歸集,不同業(yè)務系統(tǒng)以及相同異構系統(tǒng)不同功能模塊用到的數據存儲方式不盡相同,其中有一些功能模塊采用的數據存儲方式為Mongodb,為了能夠實時對Mongodb發(fā)生變更的數據進行歸集,通過調研發(fā)現(xiàn)阿里的開源系統(tǒng)MongoShake可以實現(xiàn)該需求,整個環(huán)境通過docker搭建,本文將整個搭建過程進行記錄,方便有需求的朋友使用。

處理流程

avatar

環(huán)境準備

環(huán)境 服務器 說明
宿主機 172.21.48.1
Zookeeper 172.17.0.2 Kafka的Broker由Zookeeper進行管理,所以需先安裝Zookeeper環(huán)境
Kafka 172.17.0.3
Kafka-Manager 172.17.0.4 Kafka的web端管理界面
Mongodb 172.21.48.1 windows宿主機進行搭建,大家也可通過docker進行搭建
MongoShake 172.17.0.5 使用Centos進行Mongoshake的安裝

安裝步驟

為了方便測試,本文中docker網絡采用默認網絡,宿主機可以和容器進行通信,大家在實際環(huán)境中可自定義網絡進行網絡隔離,保證系統(tǒng)安全。

安裝Zookeeper

docker run -d --name zookeeper-mongo -p 2181:2181 wurstmeister/zookeeper

安裝完成后在宿主機可通過telnet 172.21.48.1 2181進行測試

安裝Kafka

docker run -d --name kafka-mongo -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=172.21.48.1:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.21.48.1:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
參數說明
參數 說明
KAFKA_BROKER_ID 每個Kafka都有一個Broker_Id來進行區(qū)分
KAFKA_ZOOKEEPER_CONNECT Zookeeper管理Kafka的路徑
KAFKA_ADVERTISED_LISTENERS 把Kafka地址注冊給Zookeeper
KAFKA_LISTENERS Kafka的監(jiān)聽端口配置

安裝完成后在宿主機可通過telnet 172.21.48.1 9092進行測試

安裝Kafka-Manager

docker run -d --name kafka-manager -p 9000:9000 -e ZK_HOSTS=172.21.48.1:2181 -e KAFKA_BROKERS=172.21.48.1:9092 sheepkiller/kafka-manager
參數 說明
ZK_HOSTS Zookeeper地址,如果是集群配置集群地址
KAFKA_BROKERS Kafka地址,如果是集群配置集群地址

安裝完成后在宿主機可通過訪問http://localhost:9000并添加Cluster進行測試,成功界面如下圖:

avatar

安裝Mongodb

Mongodb安裝在宿主機上,可參考windows安裝mongodb,安裝成功后請設置管理員賬戶名密碼。

<font color=red >注意:Mongodb的綁定地址設置為0.0.0.0</font>

MongoShake運行依賴與Mongodb的oplog,oplog相當于mysql的binlog,極大方便了Mongodb副本集各節(jié)點的數據同步。本文為單機版Mongodb,需手動開啟oplog,開啟方式如下:

添加副本集配置

修改mongod.cfg文件,增加如下配置后重啟服務:

replication:
  oplogSizeMB: 50
  replSetName: rs
# 初始化副本集
use admin
# 權限驗證
db.auth("賬戶名","密碼")
rs.initiate({ _id: "rs", members: [{_id:0,host:"127.0.0.1:27017"}]})

登錄Mongodb數據庫可以查看到oplog.rs的Collection。

安裝MongoShake

安裝Centos容器
docker run -itd --name centos-mongo centos:7 /bin/bash
# 進入容器cli界面安裝wget(下載MongoShake用)
yum install wget -y
下載MongoShake
# 切換到/home目錄
cd /home
# 下載文件
wget https://github.com/alibaba/MongoShake/releases/download/release-v2.6.4-20210414/mongo-shake-v2.6.4_2.tar.gz
# 解壓文件
tar -xvzf mongo-shake-v2.6.4_2.tar.gz
# 重命名文件夾
mv mongo-shake-v2.6.4 mongo-shake
配置MongoShake
# 切換到mongo-shake目錄
cd mongo-shake
# 修改collector.conf文件
vi collector.conf

配置參數大部分默認就好,修改項如下:

mongo_urls = mongodb://username:password@172.21.48.1:27017
tunnel = kafka
tunnel.address = test@172.21.48.1:9092 #test為kafka的topic
tunnel.message = json
啟動MongoShake
./collector.linux -conf=collector.conf

如果沒報錯,那么正常是啟動成功了

驗證

進入mongodb,在數據庫中刪除一條數據,然后打開Kafka-Manager的管理頁面,看是否有名為test的topic,有的話說明啟動成功。

從Kafka消費數據

以python為例,消費代碼如下:

import sys
from pykafka import KafkaClient
import logging
logging.basicConfig(stream=sys.stdout, level=logging.NOTSET)

logging.info('程序啟動...........')
client = KafkaClient(zookeeper_hosts="192.168.3.20:2181")
logging.info(client.topics)
topic = client.topics['test']
consumer = topic.get_simple_consumer(consumer_group="test",
                                     reset_offset_on_start=True)

for message in consumer:
    if message is not None:
        logging.info(message.value)

啟動程序,然后對MongoDB的數據庫進行數據變更,如果順利,控制臺應該會打印出變更的數據內容,如下圖:


avatar

至此,整個環(huán)境就搭建成功了,希望對大家有幫助,謝謝!

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容