flume與kafka集成遇到的問題與解決思路

0x00 背景知識

基本上想去用flume的同學(xué)都知道點flume的用途了。flume是一個分布式,可靠的,易用的,可以將不同源的日志進(jìn)行,收集,匯總,或者存儲的中間件。

0x01 使用場景

  • 數(shù)據(jù)來源:系統(tǒng)現(xiàn)有日志,有python腳本源源不斷的從s3上拉下來,每10分鐘拉一次,一次可能會拉取多個日志文件,視日志量而定,每個文件最大是10w行,超過會被分割。
  • 數(shù)據(jù)流轉(zhuǎn):需要及時將上面產(chǎn)生的日志發(fā)往kafka

0x02 flume的使用

flume支持三種不同的agent來發(fā)送數(shù)據(jù),我這里比較符合的是spooldir這種方式.

  • 基本配置如下
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = flume0
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.flume0.type = spooldir
a1.sources.flume0.spoolDir = /data/appdata/download
a1.sources.flume0.fileHeader = false
a1.sources.flume0.ignorePattern = ^(.)*\\.tmp$

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = shopping
a1.sinks.k1.brokerList = 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 100


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 200
a1.channels.c1.transactionCapacity = 200

# Bind the source and sink to the channel
a1.sources.flume0.channels = c1
a1.sinks.k1.channel = c1

0x03 遇到的問題

  • 文件讀寫報錯

SpoolDir Source throws IllegalStateException: File has changed size since being read

運行后報上面的錯,查了資料說是在flume讀這個文件時,該文件不能被繼續(xù)寫入。改了數(shù)據(jù)生成邏輯,在沒有寫完成前,以.tmp結(jié)尾,寫完后,再重命名去掉tmp后綴。涉及到的配置也比較簡單

a1.sources.flume0.ignorePattern = ^(.)*\\.tmp$
  • kafka partition數(shù)據(jù)不均勻問題

發(fā)送kafka partition上面的數(shù)據(jù)不均勻,每次發(fā)送時,只往一個partition上面發(fā),并沒有同時往多個partition上面發(fā)。

查了資料,說是發(fā)送消息時不指定key將會隨機(jī)發(fā),但事實上,并沒有。
這時,自己用python帶的kafka python庫直接發(fā)送測試,數(shù)據(jù)是均勻的。說明kafka集群是沒問題的。這時候問題出在 kafka sink端。

事情到了這里,似乎需要正面剛這個問題了。
去flume官網(wǎng)下載源文件

  • 查看kafka sink源碼,在flume-ng-kafka-sink這個文件夾下面,只有三個文件。
  • 查看最主要的文件 KafkaSink.java
  • 查看構(gòu)造key的過程,核心代碼如下
public static final String KEY_HDR = "key";

eventKey = headers.get(KEY_HDR);
KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>
          (eventTopic, eventKey, eventBody);
messageList.add(data);

說明key是從event中拿到的,我們只需要在event中構(gòu)造一個包含key為 key 的header 鍵值對就能達(dá)到目的。

事情到了這里,似乎只要搞定event中加key就可以搞定了。

查詢官方文檔,發(fā)現(xiàn)還有一個攔截器 Interceptor 的玩意兒。

flume默認(rèn)提供了一些攔截器

  • Timestamp Interceptor
  • Host Interceptor
  • Static Interceptor
  • UUID Interceptor
  • Regex Filtering Interceptor
  • Regex Extractor Interceptor
  • Search and Replace Interceptor
  • orphline Interceptor
  • ...

我們需要一個能配置headerName的攔截器,找了一下,只有uuid攔截器符合要求。

a1.sources.flume0.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.flume0.interceptors.i1.headerName = key

加上上面二行,重啟flume

/usr/local/flume/bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n a1 -Dflume.root.logger=INFO,console

查看kafka-manager中的partition中message的分布,果然妥妥的均勻了。

完整的配置如下:

a1.sources = flume0
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.flume0.type = spooldir
a1.sources.flume0.spoolDir = /data/appdata/download
a1.sources.flume0.fileHeader = false
a1.sources.flume0.ignorePattern = ^(.)*\\.tmp$
a1.sources.flume0.interceptors = i1
a1.sources.flume0.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.flume0.interceptors.i1.headerName = key
a1.sources.flume0.interceptors.i1.preserveExisting = false

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = shopping
a1.sinks.k1.brokerList = 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 100

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 200
a1.channels.c1.transactionCapacity = 200

# Bind the source and sink to the channel
a1.sources.flume0.channels = c1
a1.sinks.k1.channel = c1

使用的版本為flume 1.6

其實,真正沒有隨機(jī)的原因本文并沒有直接去找到,只是另辟蹊徑解決了問題。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容