利用maxwell 組件實(shí)時(shí)監(jiān)聽Mysql的Binlog日志,并且把解析的json格式數(shù)據(jù)發(fā)送到kafka窗口供實(shí)時(shí)消費(fèi)

一:在linux環(huán)境下安裝部署好mysql


1 :開啟binlog

? ? ?sudo vi /etc/my.cnf (Mysql的配置文件)

2: mysql的binlog格式有3種,為了把binlog解析成json數(shù)據(jù)格式,要設(shè)置binlog的格式為row(binlog有三種格式:Statement、Row以及Mixed)

server-id=1

log-bin=/data/mysql/log/binlog(這一步開啟binlog)

binlog_format=row

具體如下圖:


3:重啟msyql服務(wù)

sudo service mysqld restart

4:查看是否已經(jīng)開啟binlog

Mysql>show variables like '%log_bin%';


此時(shí)在/data/mysql/log/binlog目錄下可以看到生成了相應(yīng)的binlog監(jiān)聽日志文件,如圖,binlog.000004文件,每次重啟msyql服務(wù),就會(huì)生成一個(gè)新的監(jiān)聽文件,這里日志文件名稱跟步驟2中配置的log-bin=/data/mysql/log/binlog有關(guān),如果log-bin=master配置這樣,那么日志文件名稱就是master.000004。


二:配置Maxwell相關(guān)的部署工作


1:下載Maxwell

官網(wǎng):http://maxwells-daemon.io/

組件下載鏈接

https://github.com/zendesk/maxwell/releases/download/v1.11.0/maxwell-1.11.0.tar.gz

2:上傳maxwell-1.11.0.tar.gz

? ?通過winscp等FTP工具把maxwell-1.11.0.tar.gz到/usr/local/maxwell/ 目錄下

3:解壓maxwell-1.11.0.tar.gz

? ? tar -zxvf maxwell-1.11.0.tar.gz

4:創(chuàng)建maxwell使用的mysql用戶并賦權(quán)

mysql> GRANT ALL on maxwell.* to'maxwell'@'%' identified by 'XXXXXX';

mysql> GRANT SELECT, REPLICATION CLIENT,REPLICATION SLAVE on *.* to 'maxwell'@'%';

以上圖片為官網(wǎng)參考

以我自己的為例:

GRANT ALL on *.* to 'user01'@'%' identified by 'test123';

把所有數(shù)據(jù)庫的所有表授權(quán)給user01用戶以密碼test123登錄

GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'user01'@'%';

flush privileges;

(mysql 新設(shè)置用戶或更改密碼后需用flush privileges刷新MySQL的系統(tǒng)權(quán)限相關(guān)表,否則會(huì)出現(xiàn)拒絕訪問,還有一種方法,就是重新啟動(dòng)mysql服務(wù)器,來使新設(shè)置生效。-)

4::開啟maxwell命令行(注意,如果沒有設(shè)置,maxwell默認(rèn)是把監(jiān)聽的mysql的binlog日志發(fā)送到kafka的主題叫maxwell的topic上的)

bin/maxwell --user='maxwell' --password='xxxx' --host='數(shù)據(jù)庫IP地址' --producer=kafka --kafka.bootstrap.servers=bi-master:9092

解釋:數(shù)據(jù)庫IP地址參數(shù)是安裝mysql的那臺(tái)主機(jī),最后的kafka.bootstrap.servers是安裝kafka集群的節(jié)點(diǎn)主機(jī)名(最好不要用IP地址)和端口號(hào)。

三:kafka相關(guān)配置


說明(以下我的kafka是安裝在主機(jī)名叫bi-master,注意kafka里的配置文件端口號(hào)要和命令行里給的端口號(hào)一致)

1:首先啟動(dòng)zookeeper

$sbin/zkServer.sh start

2:開啟kafka命令行

bin/kafka-server-start.shconfig/server.properties

3:在kafka中創(chuàng)建一個(gè)主題叫maxwell以便于接受數(shù)據(jù)

bin/kafka-topics.sh--create --zookeeper bi-master:2181 --replication-factor 1 --partitions 1 --topic maxwell

4:?jiǎn)?dòng)kafka消息生產(chǎn)者窗口

bin/kafka-console-producer.sh --broker-list bi-master:9092 --topic maxwell

5:?jiǎn)?dòng)kafka消息消費(fèi)者窗口

bin/kafka-console-consumer.sh --zookeeper bi-master:2181 --topic maxwell --from-beginning

四:測(cè)試



1:在Mysql客戶工具 中添加一個(gè)數(shù)據(jù)


測(cè)試的表結(jié)構(gòu),表名稱:myTest


消費(fèi)窗口接收到的消息:

{"database":"binlogTest","table":"myTest","type":"insert","ts":1515494531,"xid":7693,"commit":true,"data":{"name":"小梅","sex":"女","age":18,"address":"深圳市南山區(qū)海岸城"}}


2:修改一條數(shù)據(jù):

消費(fèi)窗口接收到的消息:

{"database":"binlogTest","table":"myTest","type":"update","ts":1515494707,"xid":7756,"commit":true,"data":{"name":"小梅","sex":"男","age":18,"address":"深圳市福田區(qū)"},"old":{"sex":"女","address":"深圳市南山區(qū)海岸城"}}

3:刪除一條數(shù)據(jù)


消費(fèi)窗口接收到的消息:

{"database":"binlogTest","table":"myTest","type":"delete","ts":1515494807,"xid":7799,"commit":true,"data":{"name":"小梅","sex":"男","age":18,"address":"深圳市福田區(qū)"}}


五:通過java程序去消費(fèi)kafka消息數(shù)據(jù)

1:jar包添加

2:代碼實(shí)現(xiàn)

第一種實(shí)現(xiàn):


第二種實(shí)現(xiàn):


?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容