06_Flume之各種Channel的介紹及參數(shù)解析

一、Channel介紹

Channel被設(shè)計(jì)為Event中轉(zhuǎn)臨時(shí)緩沖區(qū),存儲(chǔ)Source收集并且沒(méi)有被Sink讀取的Event,為平衡Source收集和Sink讀取數(shù)據(jù)的速度,可視為Flume內(nèi)部的消息隊(duì)列。Channel線(xiàn)程安全并且具有事務(wù)性,支持source寫(xiě)失敗重復(fù)寫(xiě)和sink讀失敗重復(fù)讀等操作。

常用的Channel類(lèi)型有Memory Channel、File Channel、KafkaChannel等。

Channel類(lèi)型 說(shuō)明
MemoryChannel 基于內(nèi)存的channel,實(shí)際就是將event存放于內(nèi)存中一個(gè)固定大小的隊(duì)列中。其優(yōu)點(diǎn)是速度快,缺點(diǎn)是agent掛掉時(shí)會(huì)丟失數(shù)據(jù)。
FileChannel 基于文件的Channel, 在磁盤(pán)上指定一個(gè)目錄用于存放event,同時(shí)也可以指定目錄的大小。優(yōu)點(diǎn)是數(shù)據(jù)可恢復(fù),相對(duì)于memory channel來(lái)說(shuō)缺點(diǎn)是要頻繁的讀取磁盤(pán),速度較慢。
Spillable Memory Channel Event存放在內(nèi)存和磁盤(pán)上,內(nèi)存作為主要存儲(chǔ),當(dāng)內(nèi)存達(dá)到一定臨界點(diǎn)的時(shí)候會(huì)溢寫(xiě)到磁盤(pán)上,兼具M(jìn)emory Channel和File Channel的優(yōu)勢(shì),但不穩(wěn)定,不建議生產(chǎn)環(huán)境使用,并且性能不佳。
JDBC Channel 將event存放于一個(gè)支持JDBC連接的數(shù)據(jù)庫(kù)中,目前官方推薦的是Derby庫(kù),其優(yōu)點(diǎn)是數(shù)據(jù)可以恢復(fù),速度比FileChannel慢
Kafka Channel 將events存儲(chǔ)在Kafka集群中。Kafka提供高可用性和高可靠性,所以當(dāng)agent或者kafka broker 崩潰時(shí),events能馬上被其他sinks可用。Kafka channel可以被多個(gè)場(chǎng)景使用:
Flume source和sink - 它為events提供可靠和高可用的channel;
Flume source和interceptor,但是沒(méi)sink - 它允許寫(xiě)Flume evnets到Kafka topic;
Flume sink,但是沒(méi)source - 這是一種低延遲,容錯(cuò)的方式從Kafka發(fā)送events到Flume sinks

1、Memory Channel

對(duì)比Channel, Memory Channel讀寫(xiě)速度快,但是存儲(chǔ)數(shù)據(jù)量小,F(xiàn)lume進(jìn)程掛掉、服務(wù)器停機(jī)或者重啟都會(huì)導(dǎo)致數(shù)據(jù)丟失。部署Flume Agent的線(xiàn)上服務(wù)器內(nèi)存資源充足、不關(guān)心數(shù)據(jù)丟失的場(chǎng)景下可以使用。

① 配置參數(shù)解析:

配置參數(shù) 默認(rèn)值 描述
type memory 類(lèi)型名稱(chēng)
capacity 100 channel中存儲(chǔ)的最大event數(shù)
transactionCapacity 100 每一次事務(wù)中寫(xiě)入和讀取的event最大數(shù)
keep-alive 3 在Channel中寫(xiě)入或讀取event等待完成的超時(shí)時(shí)間,單位:秒
byteCapacityBufferPercentage 20 緩沖空間占Channel容量(byteCapacity)的百分比,為event中的頭信息保留了空間,單位:百分比
byteCapacity Flume堆內(nèi)存的80% 允許的最大總字節(jié)作為此通道中所有事件的總和。 實(shí)現(xiàn)只計(jì)算Eventbody,這也是提供byteCapacityBufferPercentage配置參數(shù)的原因。 默認(rèn)為計(jì)算值,等于JVM可用的最大內(nèi)存的80%(即命令行傳遞的-Xmx值的80%)。 請(qǐng)注意,如果在單個(gè)JVM上有多個(gè)內(nèi)存通道,并且它們碰巧保持相同的物理事件(即,如果您使用來(lái)自單個(gè)源的復(fù)制通道選擇器),那么這些事件大小可能會(huì)因?yàn)橥ǖ纀yteCapacity目的而被重復(fù)計(jì)算。 將此值設(shè)置為“0”將導(dǎo)致此值回退到大約200 GB的內(nèi)部硬限制。

配置 capacity 和 值 。默認(rèn)配置規(guī)則為:

channels.capacity >= channels.transactionCapacity >= source.batchSize

官方channels配置示例
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
本案例修改之后的channels 配置
agent.channels.c1.type = memory
agent.channels.c1.capacity=550000
agent.channels.c1.transactionCapacity=520000

提示

舉2個(gè)例子來(lái)幫助理解最后兩個(gè)參數(shù)吧:

兩個(gè)例子都有共同的前提,假設(shè)JVM最大的可用內(nèi)存是100M(或者說(shuō)JVM啟動(dòng)時(shí)指定了-Xmx=100m)。

  • 例子1: byteCapacityBufferPercentage 設(shè)置為20, byteCapacity 設(shè)置為52428800(就是50M),此時(shí)內(nèi)存中所有 Event body 的總大小就被限制為50M *(1-20%)=40M,內(nèi)存channel可用內(nèi)存是50M。

  • 例子2: byteCapacityBufferPercentage 設(shè)置為10, byteCapacity 不設(shè)置,此時(shí)內(nèi)存中所有 Event body 的總大小就被限制為100M * 80% *(1-10%)=72M,內(nèi)存channel可用內(nèi)存是80M。

② 簡(jiǎn)單模板

# 命名 Agent 上的組件
agent_name.sources = source_name
agent_name.channels = channel_name
agent_name.sinks = sink_name

# source
agent_name.sources.source_name.type = avro
XXX
XXX

# channel
# channel中存儲(chǔ)的最大event數(shù)為3000000,一次事務(wù)中可讀取或添加的event數(shù)為20000
agent_name.channels.channel_name.type = memory
agent_name.channels.channel_name.capacity = 10000
agent_name.channels.channel_name.transactionCapacity = 10000

# sink
agent_name.sinks.sink_name.type = hdfs
XXX
XXX

# source | channel | sink 關(guān)聯(lián)
agent_name.sources.source_name.channels = channel_name
agent_name.sinks.sink_name.channel = channel_name

2、File Channel

介紹:

  1. 將 event 寫(xiě)入磁盤(pán)文件,與 Memory Channel 相比存儲(chǔ)容量大,無(wú)數(shù)據(jù)丟失風(fēng)險(xiǎn)。
  2. File Channle 數(shù)據(jù)存儲(chǔ)路徑可以配置多磁盤(pán)文件路徑,通過(guò)磁盤(pán)并行寫(xiě)入提高FileChannel 性能。
  3. Flume 將 Event 順序?qū)懭氲?File Channel 文件的末尾,在配置文件中通過(guò)設(shè)置 maxFileSize 參數(shù)配置數(shù)據(jù)文件大小,當(dāng)被寫(xiě)入的文件大小達(dá)到上限時(shí) Flume 會(huì)重新創(chuàng)建新的文件存儲(chǔ)寫(xiě)入的 Event。
  4. 當(dāng)然數(shù)據(jù)文件數(shù)量也不會(huì)無(wú)限增長(zhǎng),當(dāng)一個(gè)已關(guān)閉的只讀數(shù)據(jù)文件中的 Event 被讀取完成,并且 Sink 已經(jīng)提交讀取完成的事務(wù),則 Flume 將刪除存儲(chǔ)該數(shù)據(jù)的文件。
  5. Flume 通過(guò)設(shè)置檢查點(diǎn)和備份檢查點(diǎn)實(shí)現(xiàn)在 Agent 重啟之后快速將 File Channle 中的數(shù)據(jù)按順序回放到內(nèi)存中,保證在 Agent 失敗重啟后仍然能夠快速安全地提供服務(wù)。

① 配置參數(shù)解析:

配置參數(shù) 默認(rèn)值 描述
type file 類(lèi)型名稱(chēng)
checkpointDir 默認(rèn)在啟動(dòng)flume用戶(hù)目錄下創(chuàng)建 檢查點(diǎn)目錄,建議單獨(dú)配置磁盤(pán)路徑
useDualCheckpoints false 是否開(kāi)啟備份檢查點(diǎn),
建議設(shè)置為true開(kāi)啟備份檢查點(diǎn),備份檢查點(diǎn)的作用是當(dāng)Agent意外出錯(cuò)導(dǎo)致寫(xiě)入檢查點(diǎn)文件異常,在重新啟動(dòng)File Channel時(shí)通過(guò)備份檢查點(diǎn)將數(shù)據(jù)回放到內(nèi)存中,
如果不開(kāi)啟備份檢查點(diǎn),在數(shù)據(jù)回放的過(guò)程中發(fā)現(xiàn)檢查點(diǎn)文件異常會(huì)對(duì)所有數(shù)據(jù)進(jìn)行全回放,全回放的過(guò)程相當(dāng)耗時(shí)
backupCheckpointDir 備份檢查點(diǎn)目錄,最好不要和檢查點(diǎn)目錄(checkpointDir)在同一塊磁盤(pán)上
checkpointInterval 30000 每次寫(xiě)檢查點(diǎn)的時(shí)間間隔,單位:毫秒
dataDirs 存儲(chǔ)event信息磁盤(pán)存儲(chǔ)路徑,建議配置多塊盤(pán)的多個(gè)路徑,通過(guò)磁盤(pán)的并行寫(xiě)入來(lái)提高file channel性能,多個(gè)磁盤(pán)路徑用逗號(hào)隔開(kāi)
transactionCapacity 10000 一次事務(wù)中寫(xiě)入和讀取的event最大數(shù)
maxFileSize 2146435071 每個(gè)數(shù)據(jù)文件的最大大小,單位:字節(jié)
minimumRequiredSpace 磁盤(pán)路徑最小剩余空間,如果磁盤(pán)剩余空間小于設(shè)置值,則不再寫(xiě)入數(shù)據(jù)
capacity filechannel可容納的最大event數(shù)
keep-alive 3 在Channel中寫(xiě)入或讀取event等待完成的超時(shí)時(shí)間,單位:秒

② 簡(jiǎn)單模板

# 命名 Agent 上的組件
agent_name.sources = source_name
agent_name.channels = channel_name
agent_name.sinks = sink_name

# source
agent_name.sources.source_name.type = avro
XXX
XXX

# channel
# channel中存儲(chǔ)的最大event數(shù)為3000000,一次事務(wù)中可讀取或添加的event數(shù)為20000
# 檢查點(diǎn)路徑為/usr/local/flume/checkpoint,數(shù)據(jù)存放路徑為/data1, /data2,開(kāi)啟備份檢查點(diǎn),備份檢查點(diǎn)路徑為/data/flume/backup/checkpoint
agent_name.channels.channel_name.type = file
agent_name.channels.channel_name.dataDirs = ${log_path}/dataDir1, ${log_path}/dataDir2
agent_name.channels.channel_name.checkpointDir = ${exec_log_path}/stat_info_checkpointDir
agent_name.channels.channel_name.useDualCheckpoints = true 
agent_name.channels.channel_name.backupCheckpointDir = /data/flume/backup/checkpoint
agent_name.channels.channel_name.capacity = 3000000
agent_name.channels.channel_name.transactionCapacity = 20000
agent_name.channels.channel_name.keep-alive = 5

# sink
agent_name.sinks.sink_name.type = hdfs
XXX
XXX

# source | channel | sink 關(guān)聯(lián)
agent_name.sources.source_name.channels = channel_name
agent_name.sinks.sink_name.channel = channel_name

3、Kafka Channel

將Kafka作為Channel存儲(chǔ),Kafka是分布式、可擴(kuò)展、高容錯(cuò)、高吞吐的分布式系統(tǒng),Kafka通過(guò)優(yōu)秀的架構(gòu)設(shè)計(jì)充分利用磁盤(pán)順序特性,在廉價(jià)的硬件條件下完成高效的消息發(fā)布和訂閱。

Memory Channel在使用的過(guò)程中受內(nèi)存容量的限制不能緩存大量的消息,并且如果Memory Channel中的消息沒(méi)來(lái)得及寫(xiě)入Sink,此時(shí)Agent出現(xiàn)故障就會(huì)造成數(shù)據(jù)丟失。File Channel雖然能夠緩存更多的消息,但如果緩存下來(lái)的消息還沒(méi)有寫(xiě)入Sink,此時(shí)Agent出現(xiàn)故障則File Channel中的消息不能被繼續(xù)使用,直到該Agent重新恢復(fù)才能夠繼續(xù)使用File Channel中的消息。Kafka Channel相對(duì)于Memory Channel和File Channel存儲(chǔ)容量更大、容錯(cuò)能力更強(qiáng),彌補(bǔ)了其他兩種Channel的短板,如果合理利用Kafka的性能,能夠達(dá)到事半功倍的效果。

有了Kafka Channel可以在日志收集層只配置Source組件和Kafka Channel組件,不需要再配置Sink組件,減少了日志收集層啟動(dòng)的進(jìn)程數(shù)并且有效降低服務(wù)器內(nèi)存、磁盤(pán)等資源使用率,日志匯聚層可以只配置Kafka Channel和Sink,不需要再配置Source,減少日志匯聚層的進(jìn)程數(shù),這樣的配置既能降低服務(wù)器的資源使用率又能減少Event在網(wǎng)絡(luò)之間的傳輸,有效提高日志采集系統(tǒng)的性能。

① 配置參數(shù)解析:

參數(shù) 默認(rèn)值 描述
type org.apache.flume.channel.kafka.KafkaChannel 類(lèi)型名稱(chēng)
kafka.bootstrap.servers Kafka broker列表,格式為ip1:port1,ip2:port2…,建議配置多個(gè)值提高容錯(cuò)能力,多個(gè)值之間用逗號(hào)隔開(kāi)
kafka.topic flume-channel topic名稱(chēng)
kafka.consumer.group.id flume Consumer組id, Kafka Channel使用 consumer.group.id 注冊(cè)到Kafka,該值是連接kafka集群的唯一值,同一組內(nèi)可以有多個(gè)Consumer,多個(gè)Consumer之間是互不干擾的,一個(gè)主題下的一條消息只能被同一組內(nèi)的一個(gè)Consumer消費(fèi),其中的一個(gè)Consumer消費(fèi)失敗其他的Consumer會(huì)繼續(xù)消費(fèi)
基于這個(gè)特性,可以有多個(gè)Agent的KafkaChannel使用相同的consumer.group.id,當(dāng)一個(gè)Agent運(yùn)行失敗則其他Agent可以繼續(xù)消費(fèi),很容易地提高了消息的容錯(cuò)能力
parseAsFlumeEvent true 是否以Avro FlumeEvent模式寫(xiě)入到Kafka Channel中
如果寫(xiě)入到Kafka Channel中主題的Producer只有Flume Source,則該參數(shù)應(yīng)該設(shè)置為true
如果有其他Producer也同時(shí)在向同一主題寫(xiě)數(shù)據(jù)則該參數(shù)應(yīng)該設(shè)置為false
Flume Source寫(xiě)入到Kafka的消息在Kafka外部需要使用flume-ng-sdk提供的org.apache. flume.source.avro.AvroFlumeEvent類(lèi)解析
migrateZookeeperOffsets true 是否遷移Zookeeper中存儲(chǔ)的Consumer消費(fèi)的偏移量到Kafka中,主要是為了兼容Kafka0.9以下版本的Kafka
Kafka 0.9以下版本Consumer消費(fèi)的偏移量保存在Zookeeper中
Kafka 0.9之后的版本開(kāi)始將偏移量保存到Kafka的一個(gè)主題中
pollTimeout 500毫秒 輪詢(xún)超時(shí)時(shí)間
kafka.consumer.auto.offset.reset latest 當(dāng)Kafka中沒(méi)有Consumer消費(fèi)的初始偏移量或者當(dāng)前偏移量在Kafka中不存在(比如數(shù)據(jù)已經(jīng)被刪除)情況下,Consumer選擇從Kafka拉取消息的方式
earliest表示從最早的偏移量開(kāi)始拉取
latest表示從最新的偏移量開(kāi)始拉取
none表示如果沒(méi)有發(fā)現(xiàn)該Consumer組之前拉取的偏移量則拋出異常
kafka.enable.auto.commit alse Consumer是否自動(dòng)提交偏移量

Kafka Channel相關(guān)操作在org.apache.flume.channel.kafka包的KafkaChannel類(lèi)定義,
kafka相關(guān)參數(shù)的默認(rèn)值在org.apache.kafka.clients.CommonClientConfigs包中的KafkaChannel-Configuration中。
Kafka的通用配置參數(shù)在配置文件中都以“kafka.”為前綴,針對(duì)Producer或者Consumer的相關(guān)配置以“kafka.producer. ”或者“kafka.consumer. ”為前綴,
源碼 KafkaChannelConfiguration 中相關(guān)默認(rèn)配置參數(shù)定義如下:

KAFKA_PREFIX = "kafka.";
KAFKA_CONSUMER_PREFIX = KAFKA_PREFIX + "consumer.";
KAFKA_PRODUCER_PREFIX = KAFKA_PREFIX + "producer.";
DEFAULT_ACKS = "all";
DEFAULT_KEY_SERIALIZER ="org.apache.kafka.common.serialization.StringSerializer";
DEFAULT_VALUE_SERIAIZER ="org.apache.kafka.common.serialization.ByteArraySerializer";
DEFAULT_KEY_DESERIALIZER ="org.apache.kafka.common.serialization.StringDeserializer";
DEFAULT_VALUE_DESERIAIZER ="org.apache.kafka.common.serialization.ByteArrayDeserializer";
TOPIC_CONFIG = KAFKA_PREFIX + "topic";
BOOTSTRAP_SERVERS_CONFIG =KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
DEFAULT_TOPIC = "flume-channel";
DEFAULT_GROUP_ID = "flume";
POLL_TIMEOUT = KAFKA_PREFIX + "pollTimeout";
DEFAULT_POLL_TIMEOUT = 500;
KEY_HEADER = "key";
DEFAULT_AUTO_OFFSET_RESET = "earliest";
PARSE_AS_FLUME_EVENT = "parseAsFlumeEvent";
DEFAULT_PARSE_AS_FLUME_EVENT = true;
PARTITION_HEADER_NAME = "partitionIdHeader";
STATIC_PARTITION_CONF = "defaultPartitionId";
MIGRATE_ZOOKEEPER_OFFSETS = "migrateZookeeperOffsets";
public static final boolean DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS = true;/*** Flume1.7以前版本默認(rèn)參數(shù)****/
BROKER_LIST_KEY = "metadata.broker.list";
REQUIRED_ACKS_KEY = "request.required.acks";
BROKER_LIST_FLUME_KEY = "brokerList";
//TOPIC = "topic";
GROUP_ID_FLUME = "groupId";
AUTO_COMMIT_ENABLED = "auto.commit.enable";
ZOOKEEPER_CONNECT = "zookeeper.connect";
ZOOKEEPER_CONNECT_FLUME_KEY = "zookeeperConnect";
TIMEOUT = "timeout";
DEFAULT_TIMEOUT = "100";
CONSUMER_TIMEOUT = "consumer.timeout.ms";
READ_SMALLEST_OFFSET = "readSmallestOffset";
DEFAULT_READ_SMALLEST_OFFSET = false;

② 簡(jiǎn)單模板

# 命名 Agent 上的組件
agent_name.channels = channel_name
agent_name.sinks = sink_name

# channel
agent_name.channels.channel_name.type = org.apache.flume.channel.kafka.KafkaChannel
agent_name.channels.channel_name.kafka.bootstrap.servers = zkServer01:9092, zkServer02:9092 
agent_name.channels.channel_name.kafka.topic = test_channel
agent_name.channels.channel_name.kafka.consumer.group.id = test-consumer

# sink
agent_name.sinks.sink_name.type = hdfs
XXX
XXX

# source | channel | sink 關(guān)聯(lián)
agent_name.sources.source_name.channels = channel_name
agent_name.sinks.sink_name.channel = channel_name

說(shuō)明:agent_name 沒(méi)有配置Source,只配置了Channel和Sink,使用的Channel類(lèi)型為Kafka Channel,主題名稱(chēng)為“test_channel”, consumer組id為“test-consumer”, Sink類(lèi)型為 hdfs 滾動(dòng)生成文件,對(duì)接的Channel為KafkaChannel channel_name。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容