Introduction to Maxwell
Maxwell is a daemon, an application that reads MySQL binlogs and parses them into JSON output. It can send the data to Kafka and supports filtering by table and database.
→ Reference: http://maxwells-daemon.io
→ Download: https://github.com/zendesk/maxwell/releases/download/v1.10.3/maxwell-1.10.3.tar.gz
→ Source: https://github.com/zendesk/maxwell
Configuring MySQL->Maxwell->Kafka->Flume->HDFS
1) MySQL configuration requirements
Configuration requirements (my.cnf)
[mysqld]
server-id=1
log-bin=master
binlog_format=row
binlog_row_image=FULL
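These settings give Maxwell a binary log (log-bin), a server id, and row-based events with full row images. A minimal Python sketch that checks an option file for them before Maxwell is started; it assumes a my.cnf-style file with a [mysqld] section, and `missing_binlog_settings` is a hypothetical helper, not part of MySQL or Maxwell. It only inspects the file; the values on the running server can be confirmed with SHOW VARIABLES LIKE 'binlog%'.

```python
import configparser

# Options Maxwell relies on; None means "must be present, any value".
# Keys use underscores because MySQL treats '-' and '_' in option names alike.
REQUIRED = {
    "log_bin": None,
    "server_id": None,
    "binlog_format": "row",
    "binlog_row_image": "full",
}


def missing_binlog_settings(cnf_text: str) -> list[str]:
    """Return a list of problems found in the [mysqld] section of an option file."""
    # allow_no_value: option files may contain bare flags such as "log-bin"
    parser = configparser.ConfigParser(allow_no_value=True, interpolation=None)
    # Normalise option names: lower-case and '-' -> '_'
    parser.optionxform = lambda name: name.lower().replace("-", "_")
    parser.read_string(cnf_text)
    if not parser.has_section("mysqld"):
        return ["no [mysqld] section"]
    section = parser["mysqld"]
    problems = []
    for key, expected in REQUIRED.items():
        if key not in section:
            problems.append(f"{key} is not set")
        elif expected is not None and (section[key] or "").strip().lower() != expected:
            problems.append(f"{key}={section[key]} (expected {expected})")
    return problems
```

An empty list means the file contains everything listed above; values are compared case-insensitively, so binlog_row_image=FULL passes.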
Privilege requirements
GRANT ALL on maxwell.* to 'maxwell'@'%' identified by 'maxwell';
GRANT ALL on maxwell.* to 'maxwell'@'localhost' identified by 'maxwell';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'localhost';
2) Install and configure Kafka
Make sure a Java runtime is installed; Kafka then only needs to be unpacked:
$ tar xvf kafka_2.10-0.10.2.1.tgz -C /usr/local/elk
After unpacking, edit the configuration file (its contents are shown below):
$ cat /usr/local/elk/kafka_2.10-0.10.2.1/config/server.properties
############################# Server Basics #############################
broker.id=0
delete.topic.enable=true
############################# Socket Server Settings #############################
listeners=PLAINTEXT://0.0.0.0:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
############################# Log Basics #############################
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
############################# Log Flush Policy #############################
log.flush.interval.messages=10000
log.flush.interval.ms=1000
############################# Log Retention Policy #############################
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
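server.properties is a Java properties file made of key=value lines. The two values the rest of this walkthrough depends on are the listener port (9092, which Maxwell's --kafka.bootstrap.servers points at later) and zookeeper.connect (2181, used by kafka-topics.sh and the Flume source). A minimal Python sketch that reads them out; it handles only the simple subset of the properties format used here, and `load_properties` and `first_listener_port` are hypothetical helpers.

```python
def load_properties(text: str) -> dict[str, str]:
    """Parse simple key=value lines, skipping blank lines and # or ! comments.

    Simplified: no line continuations, escapes, or ':' separators.
    """
    props: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:  # lines without '=' are ignored by this sketch
            props[key.strip()] = value.strip()
    return props


def first_listener_port(props: dict[str, str]) -> int:
    """Port of the first entry in listeners, e.g. PLAINTEXT://0.0.0.0:9092 -> 9092."""
    first = props["listeners"].split(",")[0]
    return int(first.rsplit(":", 1)[1])


sample = """\
broker.id=0
listeners=PLAINTEXT://0.0.0.0:9092
############################# Zookeeper #############################
zookeeper.connect=localhost:2181
"""
props = load_properties(sample)
print(first_listener_port(props), props["zookeeper.connect"])
```

For the full syntax (':' separators, escapes, continuation lines), java.util.Properties is the reference implementation.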
Kafka depends on ZooKeeper, so start ZooKeeper first.
$ nohup /usr/local/elk/kafka_2.10-0.10.2.1/bin/zookeeper-server-start.sh /usr/local/elk/kafka_2.10-0.10.2.1/config/zookeeper.properties &
Start the Kafka server. To let Kafka-manager collect statistics over JMX, export a JMX_PORT environment variable first; the command below leaves it unset:
$ nohup /usr/local/elk/kafka_2.10-0.10.2.1/bin/kafka-server-start.sh /usr/local/elk/kafka_2.10-0.10.2.1/config/server.properties &
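Both processes run in the background under nohup, so a quick way to confirm they came up is to check that ZooKeeper (port 2181, from zookeeper.connect) and the broker (port 9092, from listeners) accept TCP connections. A minimal Python sketch; `is_listening` is a hypothetical helper, and a successful connection only shows that something is listening on the port, not that Kafka is healthy.

```python
import socket


def is_listening(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # connection refused, host unreachable, or timed out
        return False


if __name__ == "__main__":
    for name, port in (("zookeeper", 2181), ("kafka", 9092)):
        state = "up" if is_listening("127.0.0.1", port) else "DOWN"
        print(f"{name:<10} 127.0.0.1:{port} {state}")
```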
3) Install and configure Flume
Download the Flume binary package from the Apache website and unpack it:
$ tar xvf apache-flume-1.7.0-bin.tar.gz -C /usr/local/
$ ln -sv /usr/local/apache-flume-1.7.0-bin/ /usr/local/flume
Set the environment variables
$ cat /etc/profile.d/flume.sh
export FLUME_HOME=/usr/local/flume
export FLUME_CONF_DIR=$FLUME_HOME/conf
export PATH=$PATH:$FLUME_HOME/bin
Check the Flume version
$ flume-ng version
Flume 1.7.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 511d868555dd4d16e6ce4fedc72c2d1454546707
Compiled by bessbd on Wed Oct 12 20:51:10 CEST 2016
Create the configuration file and the environment script from the templates
$ cp -fr /usr/local/flume/conf/flume-conf.properties.template /usr/local/flume/conf/flume.conf
$ cp -fr /usr/local/flume/conf/flume-env.sh.template /usr/local/flume/conf/flume-env.sh
If JAVA_HOME is already set in your environment, flume-env.sh does not really need to be edited; configuring it is optional.
$ cat /usr/local/flume/conf/flume-env.sh
# Environment variables can be set here.
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.131-3.b12.el7_3.x86_64/jre
# Give Flume more memory and pre-allocate, enable remote monitoring via JMX
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
# Log raw event data and the parsed configuration for debugging; not recommended in production, as it may result in logging sensitive user information or encryption secrets.
export JAVA_OPTS="$JAVA_OPTS -Dorg.apache.flume.log.rawdata=true -Dorg.apache.flume.log.printconfig=true "
# Note that the Flume conf directory is always included in the classpath.
#FLUME_CLASSPATH=""
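4) Install and configure Maxwell
Maxwell stores all the state it needs in the MySQL server itself, in the database named by the schema_database option; by default that database is named maxwell. To try it out, first run Maxwell with the stdout producer, which prints events to the console: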
$ cd /usr/local/maxwell/;./bin/maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --port='3306' --producer=stdout
Create some data in MySQL
mysql> create database hadoop charset utf8;
Query OK, 1 row affected (0.02 sec)
mysql> use hadoop;
Database changed
mysql> create table test(id int,name varchar(10),address varchar(20));
Query OK, 0 rows affected (0.00 sec)
mysql> insert into test values(1,'dkey','ShangHai');
Query OK, 1 row affected (0.01 sec)
Maxwell then prints the following: the schema changes are logged, and the inserted row appears as JSON.
04:16:48,341 INFO OpenReplicator - starting replication at mysql-bin.000004:6777
04:18:18,654 INFO AbstractSchemaStore - storing schema @Position[BinlogPosition[mysql-bin.000004:136974], lastHeartbeat=1497601097500]
after applying "create database hadoop charset utf8" to hadoop, new schema id is 2
04:20:24,430 INFO AbstractSchemaStore - storing schema @Position[BinlogPosition[mysql-bin.000004:255163], lastHeartbeat=1497601224355]
after applying "create table test(id int,name varchar(10),address varchar(20))" to hadoop, new schema id is 3
{"database":"hadoop","table":"test","type":"insert","ts":1497601280,"xid":929,"commit":true,"data":{"id":1,"name":"dkey","address":"ShangHai"}}
5) Output the data to HDFS
Create the Kafka topic
$ /usr/local/kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic maxwell --partitions 20 --replication-factor 1
List the topics
$ /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
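View the topic details. (The sample output below shows PartitionCount:1, which does not match the --partitions 20 used above; for a 20-partition topic, --describe lists 20 partition lines.)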
$ /usr/local/kafka/bin/kafka-topics.sh --zookeeper=127.0.0.1:2181 --describe --topic maxwell
Topic:maxwell PartitionCount:1 ReplicationFactor:1 Configs:
Topic: maxwell Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Here is a Flume configuration file that collects data from Kafka into HDFS:
$ cat /usr/local/flume/conf/mysql-flume-hdfs.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.zookeeperConnect = 127.0.0.1:2181
a1.sources.r1.topic = maxwell
a1.sources.r1.groupId = flume
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.kafka.consumer.timeout.ms = 100
# Describe the sink
a1.sinks.k1.type = hdfs
#a1.sinks.k1.hdfs.path = /mysql/%{topic}/%y-%m-%d
a1.sinks.k1.hdfs.path = hdfs://10.10.0.186:8020/mysql/%{topic}/%y-%m-%d
a1.sinks.k1.hdfs.rollInterval = 5
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
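The hdfs.path above uses Flume escape sequences: %{topic} is replaced with the event's topic header set by the Kafka source, and %y-%m-%d with the two-digit year, month and day taken from the event's timestamp header (here stamped by the timestamp interceptor i1). A Python sketch that imitates just these escapes to predict the target directory; `expand_hdfs_path` is hypothetical and covers only %{header}, %y, %m and %d, not Flume's full escape set. Flume resolves the date in the local time zone unless hdfs.timeZone is set; the sketch defaults to UTC so the result is reproducible.

```python
import re
from datetime import datetime, timezone, tzinfo

# %{name} -> value of the event header "name" (e.g. the Kafka source's "topic")
HEADER_ESCAPE = re.compile(r"%\{([^}]+)\}")
# %y, %m, %d -> two-digit year, month and day of the event timestamp
DATE_ESCAPE = re.compile(r"%([ymd])")


def expand_hdfs_path(pattern: str, headers: dict[str, str],
                     tz: tzinfo = timezone.utc) -> str:
    """Imitate the subset of Flume escapes used in this tutorial's hdfs.path."""
    path = HEADER_ESCAPE.sub(lambda m: headers[m.group(1)], pattern)
    # The "timestamp" header holds epoch milliseconds.
    millis = int(headers["timestamp"])
    when = datetime.fromtimestamp(millis / 1000, tz=tz)
    return DATE_ESCAPE.sub(lambda m: when.strftime("%" + m.group(1)), path)


print(expand_hdfs_path("hdfs://10.10.0.186:8020/mysql/%{topic}/%y-%m-%d",
                       {"topic": "maxwell", "timestamp": "1497859019506"}))
```

With a sample timestamp from 19 June 2017 this resolves to a .../mysql/maxwell/17-06-19 directory, the same layout that appears in the HDFS listing.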
Start Flume
$ nohup flume-ng agent --conf /usr/local/flume/conf --conf-file /usr/local/flume/conf/mysql-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,console &
If Flume fails to start with: ERROR - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:146)] Failed to start agent because dependencies were not found in classpath. Error follows. java.lang.NoClassDefFoundError: org/apache/hadoop/io/SequenceFile$CompressionType
the likely cause is that Flume is deployed on its own host and cannot find the Hadoop HDFS jars it depends on. The fix is simple: unpack the Hadoop binary distribution on the Flume host and export the Hadoop environment variables; Flume then locates the required jars through those variables. For details, see: Hadoop in practice: fixing Flume errors when writing logs to HDFS
Also, once flume-ng is running normally, writing to HDFS may fail with:
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=root, access=WRITE, inode="/":hadoop:supergroup:drwxr-xr-x
The message is clear: the user running flume-ng (root here) is not the Hadoop user and has no write permission. The fix is equally simple: run the flume-ng command as the Hadoop user, or configure HDFS to allow all users to write files, which is probably not enabled by default. For details, see: Hadoop in practice: fixing Flume errors when writing logs to HDFS
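Start Maxwell, this time with the Kafka producer (Maxwell publishes to a topic named maxwell by default, which is why the topic above uses that name):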
$ cd /usr/local/maxwell/;./bin/maxwell \
--user='maxwell' \
--password='maxwell' \
--host='127.0.0.1' \
--port='3306' \
--producer=kafka \
--kafka.bootstrap.servers=127.0.0.1:9092
Test MySQL->Maxwell->Kafka->Flume->HDFS
All the components are now running, so it is time to test. Insert a row into MySQL:
mysql> insert into hadoop.test values(5,'dkey5','Shanghai');
Query OK, 1 row affected (0.00 sec)
Check the Kafka topic
$ /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --from-beginning --topic maxwell
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
{"database":"hadoop","table":"test","type":"insert","ts":1497607783,"xid":2414,"commit":true,"data":{"id":5,"name":"dkey5","address":"Shanghai"}}
Then check HDFS:
[hadoop@hadoop-nn ~]$ hdfs dfs -ls /mysql/maxwell/17-06-19/
Found 1 items
-rw-r--r--   3 hadoop supergroup        148 2017-06-19 03:57 /mysql/maxwell/17-06-19/FlumeData.1497859019506
[hadoop@hadoop-nn ~]$ hdfs dfs -cat /mysql/maxwell/17-06-19/FlumeData.1497859019506
{"database":"hadoop","table":"test","type":"insert","ts":1497859014,"xid":372064,"commit":true,"data":{"id":5,"name":"dkey5","address":"Shanghai"}}
The relevant directories are created automatically, and a data file is generated.
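With hdfs.fileType = DataStream the file holds the raw event bodies, one Maxwell JSON document per line, as the -cat output shows. Since usually only part of each record is needed, here is a minimal Python sketch that keeps insert events for one table and projects selected columns; `select_columns` is a hypothetical helper, the column list is only an example, and update and delete events are deliberately skipped.

```python
import json
from typing import Iterable, Iterator


def select_columns(lines: Iterable[str], database: str, table: str,
                   columns: list[str]) -> Iterator[dict]:
    """Yield only the chosen columns of insert events for one table.

    Each input line is assumed to be one Maxwell JSON record, as written by
    the HDFS sink with hdfs.fileType = DataStream.
    """
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines
        record = json.loads(line)
        if (record.get("database"), record.get("table")) != (database, table):
            continue
        if record.get("type") != "insert":
            continue  # update/delete handling is out of scope for this sketch
        yield {col: record["data"].get(col) for col in columns}


sample = ['{"database":"hadoop","table":"test","type":"insert","ts":1497859014,'
          '"xid":372064,"commit":true,"data":{"id":5,"name":"dkey5","address":"Shanghai"}}']
for row in select_columns(sample, "hadoop", "test", ["id", "name"]):
    print(json.dumps(row))
```

To run it against the real file, the output of hdfs dfs -cat /mysql/maxwell/17-06-19/FlumeData.* can be piped into a script that passes sys.stdin as lines.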
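Summary
The whole MySQL->Maxwell->Kafka->Flume->HDFS flow now works end to end, but at this stage it is only good for experimenting; both Flume and Kafka need deeper study. Also, the data written to HDFS is JSON, so we may still need to extract just the fields we need, and it is not yet clear how to handle update or delete operations. Using this in production would be quite difficult.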