一:在linux環(huán)境下安裝部署好mysql
1 :開啟binlog
? ? ?sudo vi /etc/my.cnf (Mysql的配置文件)
2: mysql的binlog格式有3種,為了把binlog解析成json數(shù)據(jù)格式,要設(shè)置binlog的格式為row(binlog有三種格式:Statement、Row以及Mixed)
server-id=1
log-bin=/data/mysql/log/binlog(這一步開啟binlog)
binlog_format=row
具體如下圖:

3:重啟msyql服務(wù)
sudo service mysqld restart
4:查看是否已經(jīng)開啟binlog
Mysql>show variables like '%log_bin%';

此時(shí)在/data/mysql/log/binlog目錄下可以看到生成了相應(yīng)的binlog監(jiān)聽日志文件,如圖,binlog.000004文件,每次重啟msyql服務(wù),就會(huì)生成一個(gè)新的監(jiān)聽文件,這里日志文件名稱跟步驟2中配置的log-bin=/data/mysql/log/binlog有關(guān),如果log-bin=master配置這樣,那么日志文件名稱就是master.000004。

二:配置Maxwell相關(guān)的部署工作
1:下載Maxwell
官網(wǎng):http://maxwells-daemon.io/
組件下載鏈接
https://github.com/zendesk/maxwell/releases/download/v1.11.0/maxwell-1.11.0.tar.gz
2:上傳maxwell-1.11.0.tar.gz
? ?通過winscp等FTP工具把maxwell-1.11.0.tar.gz到/usr/local/maxwell/ 目錄下
3:解壓maxwell-1.11.0.tar.gz
? ? tar -zxvf maxwell-1.11.0.tar.gz
4:創(chuàng)建maxwell使用的mysql用戶并賦權(quán)
mysql> GRANT ALL on maxwell.* to'maxwell'@'%' identified by 'XXXXXX';
mysql> GRANT SELECT, REPLICATION CLIENT,REPLICATION SLAVE on *.* to 'maxwell'@'%';
以上圖片為官網(wǎng)參考
以我自己的為例:
GRANT ALL on *.* to 'user01'@'%' identified by 'test123';
把所有數(shù)據(jù)庫的所有表授權(quán)給user01用戶以密碼test123登錄
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'user01'@'%';
flush privileges;
(mysql 新設(shè)置用戶或更改密碼后需用flush privileges刷新MySQL的系統(tǒng)權(quán)限相關(guān)表,否則會(huì)出現(xiàn)拒絕訪問,還有一種方法,就是重新啟動(dòng)mysql服務(wù)器,來使新設(shè)置生效。-)
4::開啟maxwell命令行(注意,如果沒有設(shè)置,maxwell默認(rèn)是把監(jiān)聽的mysql的binlog日志發(fā)送到kafka的主題叫maxwell的topic上的)
bin/maxwell --user='maxwell' --password='xxxx' --host='數(shù)據(jù)庫IP地址' --producer=kafka --kafka.bootstrap.servers=bi-master:9092
解釋:數(shù)據(jù)庫IP地址參數(shù)是安裝mysql的那臺(tái)主機(jī),最后的kafka.bootstrap.servers是安裝kafka集群的節(jié)點(diǎn)主機(jī)名(最好不要用IP地址)和端口號(hào)。
三:kafka相關(guān)配置
說明(以下我的kafka是安裝在主機(jī)名叫bi-master,注意kafka里的配置文件端口號(hào)要和命令行里給的端口號(hào)一致)
1:首先啟動(dòng)zookeeper
$sbin/zkServer.sh start
2:開啟kafka命令行
bin/kafka-server-start.shconfig/server.properties
3:在kafka中創(chuàng)建一個(gè)主題叫maxwell以便于接受數(shù)據(jù)
bin/kafka-topics.sh--create --zookeeper bi-master:2181 --replication-factor 1 --partitions 1 --topic maxwell
4:?jiǎn)?dòng)kafka消息生產(chǎn)者窗口
bin/kafka-console-producer.sh --broker-list bi-master:9092 --topic maxwell
5:?jiǎn)?dòng)kafka消息消費(fèi)者窗口
bin/kafka-console-consumer.sh --zookeeper bi-master:2181 --topic maxwell --from-beginning
四:測(cè)試
1:在Mysql客戶工具 中添加一個(gè)數(shù)據(jù)


消費(fèi)窗口接收到的消息:
{"database":"binlogTest","table":"myTest","type":"insert","ts":1515494531,"xid":7693,"commit":true,"data":{"name":"小梅","sex":"女","age":18,"address":"深圳市南山區(qū)海岸城"}}
2:修改一條數(shù)據(jù):

消費(fèi)窗口接收到的消息:
{"database":"binlogTest","table":"myTest","type":"update","ts":1515494707,"xid":7756,"commit":true,"data":{"name":"小梅","sex":"男","age":18,"address":"深圳市福田區(qū)"},"old":{"sex":"女","address":"深圳市南山區(qū)海岸城"}}
3:刪除一條數(shù)據(jù)

消費(fèi)窗口接收到的消息:
{"database":"binlogTest","table":"myTest","type":"delete","ts":1515494807,"xid":7799,"commit":true,"data":{"name":"小梅","sex":"男","age":18,"address":"深圳市福田區(qū)"}}
五:通過java程序去消費(fèi)kafka消息數(shù)據(jù)
1:jar包添加

2:代碼實(shí)現(xiàn)
第一種實(shí)現(xiàn):



第二種實(shí)現(xiàn):

