最近在做數據倉庫相關的項目,需要對各業(yè)務系統(tǒng)異構數據進行統(tǒng)一歸集,不同業(yè)務系統(tǒng)以及相同異構系統(tǒng)不同功能模塊用到的數據存儲方式不盡相同,其中有一些功能模塊采用的數據存儲方式為Mongodb,為了能夠實時對Mongodb發(fā)生變更的數據進行歸集,通過調研發(fā)現(xiàn)阿里的開源系統(tǒng)MongoShake可以實現(xiàn)該需求,整個環(huán)境通過docker搭建,本文將整個搭建過程進行記錄,方便有需求的朋友使用。
處理流程

環(huán)境準備
| 環(huán)境 | 服務器 | 說明 |
|---|---|---|
| 宿主機 | 172.21.48.1 | |
| Zookeeper | 172.17.0.2 | Kafka的Broker由Zookeeper進行管理,所以需先安裝Zookeeper環(huán)境 |
| Kafka | 172.17.0.3 | |
| Kafka-Manager | 172.17.0.4 | Kafka的web端管理界面 |
| Mongodb | 172.21.48.1 | windows宿主機進行搭建,大家也可通過docker進行搭建 |
| MongoShake | 172.17.0.5 | 使用Centos進行Mongoshake的安裝 |
安裝步驟
為了方便測試,本文中docker網絡采用默認網絡,宿主機可以和容器進行通信,大家在實際環(huán)境中可自定義網絡進行網絡隔離,保證系統(tǒng)安全。
安裝Zookeeper
docker run -d --name zookeeper-mongo -p 2181:2181 wurstmeister/zookeeper
安裝完成后在宿主機可通過telnet 172.21.48.1 2181進行測試
安裝Kafka
docker run -d --name kafka-mongo -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=172.21.48.1:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.21.48.1:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
參數說明
| 參數 | 說明 |
|---|---|
| KAFKA_BROKER_ID | 每個Kafka都有一個Broker_Id來進行區(qū)分 |
| KAFKA_ZOOKEEPER_CONNECT | Zookeeper管理Kafka的路徑 |
| KAFKA_ADVERTISED_LISTENERS | 把Kafka地址注冊給Zookeeper |
| KAFKA_LISTENERS | Kafka的監(jiān)聽端口配置 |
安裝完成后在宿主機可通過telnet 172.21.48.1 9092進行測試
安裝Kafka-Manager
docker run -d --name kafka-manager -p 9000:9000 -e ZK_HOSTS=172.21.48.1:2181 -e KAFKA_BROKERS=172.21.48.1:9092 sheepkiller/kafka-manager
| 參數 | 說明 |
|---|---|
| ZK_HOSTS | Zookeeper地址,如果是集群配置集群地址 |
| KAFKA_BROKERS | Kafka地址,如果是集群配置集群地址 |
安裝完成后在宿主機可通過訪問http://localhost:9000并添加Cluster進行測試,成功界面如下圖:

安裝Mongodb
Mongodb安裝在宿主機上,可參考windows安裝mongodb,安裝成功后請設置管理員賬戶名密碼。
<font color=red >注意:Mongodb的綁定地址設置為0.0.0.0</font>
MongoShake運行依賴與Mongodb的oplog,oplog相當于mysql的binlog,極大方便了Mongodb副本集各節(jié)點的數據同步。本文為單機版Mongodb,需手動開啟oplog,開啟方式如下:
添加副本集配置
修改mongod.cfg文件,增加如下配置后重啟服務:
replication:
oplogSizeMB: 50
replSetName: rs
# 初始化副本集
use admin
# 權限驗證
db.auth("賬戶名","密碼")
rs.initiate({ _id: "rs", members: [{_id:0,host:"127.0.0.1:27017"}]})
登錄Mongodb數據庫可以查看到oplog.rs的Collection。
安裝MongoShake
安裝Centos容器
docker run -itd --name centos-mongo centos:7 /bin/bash
# 進入容器cli界面安裝wget(下載MongoShake用)
yum install wget -y
下載MongoShake
# 切換到/home目錄
cd /home
# 下載文件
wget https://github.com/alibaba/MongoShake/releases/download/release-v2.6.4-20210414/mongo-shake-v2.6.4_2.tar.gz
# 解壓文件
tar -xvzf mongo-shake-v2.6.4_2.tar.gz
# 重命名文件夾
mv mongo-shake-v2.6.4 mongo-shake
配置MongoShake
# 切換到mongo-shake目錄
cd mongo-shake
# 修改collector.conf文件
vi collector.conf
配置參數大部分默認就好,修改項如下:
mongo_urls = mongodb://username:password@172.21.48.1:27017
tunnel = kafka
tunnel.address = test@172.21.48.1:9092 #test為kafka的topic
tunnel.message = json
啟動MongoShake
./collector.linux -conf=collector.conf
如果沒報錯,那么正常是啟動成功了
驗證
進入mongodb,在數據庫中刪除一條數據,然后打開Kafka-Manager的管理頁面,看是否有名為test的topic,有的話說明啟動成功。
從Kafka消費數據
以python為例,消費代碼如下:
import sys
from pykafka import KafkaClient
import logging
logging.basicConfig(stream=sys.stdout, level=logging.NOTSET)
logging.info('程序啟動...........')
client = KafkaClient(zookeeper_hosts="192.168.3.20:2181")
logging.info(client.topics)
topic = client.topics['test']
consumer = topic.get_simple_consumer(consumer_group="test",
reset_offset_on_start=True)
for message in consumer:
if message is not None:
logging.info(message.value)
啟動程序,然后對MongoDB的數據庫進行數據變更,如果順利,控制臺應該會打印出變更的數據內容,如下圖:

至此,整個環(huán)境就搭建成功了,希望對大家有幫助,謝謝!