0x00 背景知識
基本上想去用flume的同學(xué)都知道點flume的用途了。flume是一個分布式,可靠的,易用的,可以將不同源的日志進(jìn)行,收集,匯總,或者存儲的中間件。
0x01 使用場景
- 數(shù)據(jù)來源:系統(tǒng)現(xiàn)有日志,有python腳本源源不斷的從s3上拉下來,每10分鐘拉一次,一次可能會拉取多個日志文件,視日志量而定,每個文件最大是10w行,超過會被分割。
- 數(shù)據(jù)流轉(zhuǎn):需要及時將上面產(chǎn)生的日志發(fā)往kafka
0x02 flume的使用
flume支持三種不同的agent來發(fā)送數(shù)據(jù),我這里比較符合的是spooldir這種方式.
- 基本配置如下
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = flume0
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.flume0.type = spooldir
a1.sources.flume0.spoolDir = /data/appdata/download
a1.sources.flume0.fileHeader = false
a1.sources.flume0.ignorePattern = ^(.)*\\.tmp$
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = shopping
a1.sinks.k1.brokerList = 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 100
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 200
a1.channels.c1.transactionCapacity = 200
# Bind the source and sink to the channel
a1.sources.flume0.channels = c1
a1.sinks.k1.channel = c1
0x03 遇到的問題
- 文件讀寫報錯
SpoolDir Source throws IllegalStateException: File has changed size since being read
運行后報上面的錯,查了資料說是在flume讀這個文件時,該文件不能被繼續(xù)寫入。改了數(shù)據(jù)生成邏輯,在沒有寫完成前,以.tmp結(jié)尾,寫完后,再重命名去掉tmp后綴。涉及到的配置也比較簡單
a1.sources.flume0.ignorePattern = ^(.)*\\.tmp$
- kafka partition數(shù)據(jù)不均勻問題
發(fā)送kafka partition上面的數(shù)據(jù)不均勻,每次發(fā)送時,只往一個partition上面發(fā),并沒有同時往多個partition上面發(fā)。
查了資料,說是發(fā)送消息時不指定key將會隨機(jī)發(fā),但事實上,并沒有。
這時,自己用python帶的kafka python庫直接發(fā)送測試,數(shù)據(jù)是均勻的。說明kafka集群是沒問題的。這時候問題出在 kafka sink端。
事情到了這里,似乎需要正面剛這個問題了。
去flume官網(wǎng)下載源文件
- 查看kafka sink源碼,在flume-ng-kafka-sink這個文件夾下面,只有三個文件。
- 查看最主要的文件 KafkaSink.java
- 查看構(gòu)造key的過程,核心代碼如下
public static final String KEY_HDR = "key";
eventKey = headers.get(KEY_HDR);
KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>
(eventTopic, eventKey, eventBody);
messageList.add(data);
說明key是從event中拿到的,我們只需要在event中構(gòu)造一個包含key為 key 的header 鍵值對就能達(dá)到目的。
事情到了這里,似乎只要搞定event中加key就可以搞定了。
查詢官方文檔,發(fā)現(xiàn)還有一個攔截器 Interceptor 的玩意兒。
flume默認(rèn)提供了一些攔截器
- Timestamp Interceptor
- Host Interceptor
- Static Interceptor
- UUID Interceptor
- Regex Filtering Interceptor
- Regex Extractor Interceptor
- Search and Replace Interceptor
- orphline Interceptor
- ...
我們需要一個能配置headerName的攔截器,找了一下,只有uuid攔截器符合要求。
a1.sources.flume0.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.flume0.interceptors.i1.headerName = key
加上上面二行,重啟flume
/usr/local/flume/bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n a1 -Dflume.root.logger=INFO,console
查看kafka-manager中的partition中message的分布,果然妥妥的均勻了。
完整的配置如下:
a1.sources = flume0
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.flume0.type = spooldir
a1.sources.flume0.spoolDir = /data/appdata/download
a1.sources.flume0.fileHeader = false
a1.sources.flume0.ignorePattern = ^(.)*\\.tmp$
a1.sources.flume0.interceptors = i1
a1.sources.flume0.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.flume0.interceptors.i1.headerName = key
a1.sources.flume0.interceptors.i1.preserveExisting = false
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = shopping
a1.sinks.k1.brokerList = 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 100
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 200
a1.channels.c1.transactionCapacity = 200
# Bind the source and sink to the channel
a1.sources.flume0.channels = c1
a1.sinks.k1.channel = c1
使用的版本為flume 1.6
其實,真正沒有隨機(jī)的原因本文并沒有直接去找到,只是另辟蹊徑解決了問題。