一、Channel介紹
Channel被設(shè)計(jì)為Event中轉(zhuǎn)臨時(shí)緩沖區(qū),存儲(chǔ)Source收集并且沒(méi)有被Sink讀取的Event,為平衡Source收集和Sink讀取數(shù)據(jù)的速度,可視為Flume內(nèi)部的消息隊(duì)列。Channel線(xiàn)程安全并且具有事務(wù)性,支持source寫(xiě)失敗重復(fù)寫(xiě)和sink讀失敗重復(fù)讀等操作。
常用的Channel類(lèi)型有Memory Channel、File Channel、KafkaChannel等。
| Channel類(lèi)型 | 說(shuō)明 |
|---|---|
| MemoryChannel | 基于內(nèi)存的channel,實(shí)際就是將event存放于內(nèi)存中一個(gè)固定大小的隊(duì)列中。其優(yōu)點(diǎn)是速度快,缺點(diǎn)是agent掛掉時(shí)會(huì)丟失數(shù)據(jù)。 |
| FileChannel | 基于文件的Channel, 在磁盤(pán)上指定一個(gè)目錄用于存放event,同時(shí)也可以指定目錄的大小。優(yōu)點(diǎn)是數(shù)據(jù)可恢復(fù),相對(duì)于memory channel來(lái)說(shuō)缺點(diǎn)是要頻繁的讀取磁盤(pán),速度較慢。 |
| Spillable Memory Channel | Event存放在內(nèi)存和磁盤(pán)上,內(nèi)存作為主要存儲(chǔ),當(dāng)內(nèi)存達(dá)到一定臨界點(diǎn)的時(shí)候會(huì)溢寫(xiě)到磁盤(pán)上,兼具M(jìn)emory Channel和File Channel的優(yōu)勢(shì),但不穩(wěn)定,不建議生產(chǎn)環(huán)境使用,并且性能不佳。 |
| JDBC Channel | 將event存放于一個(gè)支持JDBC連接的數(shù)據(jù)庫(kù)中,目前官方推薦的是Derby庫(kù),其優(yōu)點(diǎn)是數(shù)據(jù)可以恢復(fù),速度比FileChannel慢 |
| Kafka Channel | 將events存儲(chǔ)在Kafka集群中。Kafka提供高可用性和高可靠性,所以當(dāng)agent或者kafka broker 崩潰時(shí),events能馬上被其他sinks可用。Kafka channel可以被多個(gè)場(chǎng)景使用: Flume source和sink - 它為events提供可靠和高可用的channel; Flume source和interceptor,但是沒(méi)sink - 它允許寫(xiě)Flume evnets到Kafka topic; Flume sink,但是沒(méi)source - 這是一種低延遲,容錯(cuò)的方式從Kafka發(fā)送events到Flume sinks |
1、Memory Channel
對(duì)比Channel, Memory Channel讀寫(xiě)速度快,但是存儲(chǔ)數(shù)據(jù)量小,F(xiàn)lume進(jìn)程掛掉、服務(wù)器停機(jī)或者重啟都會(huì)導(dǎo)致數(shù)據(jù)丟失。部署Flume Agent的線(xiàn)上服務(wù)器內(nèi)存資源充足、不關(guān)心數(shù)據(jù)丟失的場(chǎng)景下可以使用。
① 配置參數(shù)解析:
| 配置參數(shù) | 默認(rèn)值 | 描述 |
|---|---|---|
| type | memory | 類(lèi)型名稱(chēng) |
| capacity | 100 | channel中存儲(chǔ)的最大event數(shù) |
| transactionCapacity | 100 | 每一次事務(wù)中寫(xiě)入和讀取的event最大數(shù) |
| keep-alive | 3 | 在Channel中寫(xiě)入或讀取event等待完成的超時(shí)時(shí)間,單位:秒 |
| byteCapacityBufferPercentage | 20 | 緩沖空間占Channel容量(byteCapacity)的百分比,為event中的頭信息保留了空間,單位:百分比 |
| byteCapacity | Flume堆內(nèi)存的80% | 允許的最大總字節(jié)作為此通道中所有事件的總和。 實(shí)現(xiàn)只計(jì)算Eventbody,這也是提供byteCapacityBufferPercentage配置參數(shù)的原因。 默認(rèn)為計(jì)算值,等于JVM可用的最大內(nèi)存的80%(即命令行傳遞的-Xmx值的80%)。 請(qǐng)注意,如果在單個(gè)JVM上有多個(gè)內(nèi)存通道,并且它們碰巧保持相同的物理事件(即,如果您使用來(lái)自單個(gè)源的復(fù)制通道選擇器),那么這些事件大小可能會(huì)因?yàn)橥ǖ纀yteCapacity目的而被重復(fù)計(jì)算。 將此值設(shè)置為“0”將導(dǎo)致此值回退到大約200 GB的內(nèi)部硬限制。 |
配置 capacity 和 值 。默認(rèn)配置規(guī)則為:
channels.capacity >= channels.transactionCapacity >= source.batchSize
官方channels配置示例
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
本案例修改之后的channels 配置
agent.channels.c1.type = memory
agent.channels.c1.capacity=550000
agent.channels.c1.transactionCapacity=520000
提示
舉2個(gè)例子來(lái)幫助理解最后兩個(gè)參數(shù)吧:
兩個(gè)例子都有共同的前提,假設(shè)JVM最大的可用內(nèi)存是100M(或者說(shuō)JVM啟動(dòng)時(shí)指定了-Xmx=100m)。
例子1: byteCapacityBufferPercentage 設(shè)置為20, byteCapacity 設(shè)置為52428800(就是50M),此時(shí)內(nèi)存中所有 Event body 的總大小就被限制為50M *(1-20%)=40M,內(nèi)存channel可用內(nèi)存是50M。
例子2: byteCapacityBufferPercentage 設(shè)置為10, byteCapacity 不設(shè)置,此時(shí)內(nèi)存中所有 Event body 的總大小就被限制為100M * 80% *(1-10%)=72M,內(nèi)存channel可用內(nèi)存是80M。
② 簡(jiǎn)單模板
# 命名 Agent 上的組件
agent_name.sources = source_name
agent_name.channels = channel_name
agent_name.sinks = sink_name
# source
agent_name.sources.source_name.type = avro
XXX
XXX
# channel
# channel中存儲(chǔ)的最大event數(shù)為3000000,一次事務(wù)中可讀取或添加的event數(shù)為20000
agent_name.channels.channel_name.type = memory
agent_name.channels.channel_name.capacity = 10000
agent_name.channels.channel_name.transactionCapacity = 10000
# sink
agent_name.sinks.sink_name.type = hdfs
XXX
XXX
# source | channel | sink 關(guān)聯(lián)
agent_name.sources.source_name.channels = channel_name
agent_name.sinks.sink_name.channel = channel_name
2、File Channel
介紹:
- 將 event 寫(xiě)入磁盤(pán)文件,與 Memory Channel 相比存儲(chǔ)容量大,無(wú)數(shù)據(jù)丟失風(fēng)險(xiǎn)。
- File Channle 數(shù)據(jù)存儲(chǔ)路徑可以配置多磁盤(pán)文件路徑,通過(guò)磁盤(pán)并行寫(xiě)入提高FileChannel 性能。
- Flume 將 Event 順序?qū)懭氲?File Channel 文件的末尾,在配置文件中通過(guò)設(shè)置 maxFileSize 參數(shù)配置數(shù)據(jù)文件大小,當(dāng)被寫(xiě)入的文件大小達(dá)到上限時(shí) Flume 會(huì)重新創(chuàng)建新的文件存儲(chǔ)寫(xiě)入的 Event。
- 當(dāng)然數(shù)據(jù)文件數(shù)量也不會(huì)無(wú)限增長(zhǎng),當(dāng)一個(gè)已關(guān)閉的只讀數(shù)據(jù)文件中的 Event 被讀取完成,并且 Sink 已經(jīng)提交讀取完成的事務(wù),則 Flume 將刪除存儲(chǔ)該數(shù)據(jù)的文件。
- Flume 通過(guò)設(shè)置檢查點(diǎn)和備份檢查點(diǎn)實(shí)現(xiàn)在 Agent 重啟之后快速將 File Channle 中的數(shù)據(jù)按順序回放到內(nèi)存中,保證在 Agent 失敗重啟后仍然能夠快速安全地提供服務(wù)。
① 配置參數(shù)解析:
| 配置參數(shù) | 默認(rèn)值 | 描述 |
|---|---|---|
| type | file | 類(lèi)型名稱(chēng) |
| checkpointDir | 默認(rèn)在啟動(dòng)flume用戶(hù)目錄下創(chuàng)建 | 檢查點(diǎn)目錄,建議單獨(dú)配置磁盤(pán)路徑 |
| useDualCheckpoints | false | 是否開(kāi)啟備份檢查點(diǎn), 建議設(shè)置為true開(kāi)啟備份檢查點(diǎn),備份檢查點(diǎn)的作用是當(dāng)Agent意外出錯(cuò)導(dǎo)致寫(xiě)入檢查點(diǎn)文件異常,在重新啟動(dòng)File Channel時(shí)通過(guò)備份檢查點(diǎn)將數(shù)據(jù)回放到內(nèi)存中, 如果不開(kāi)啟備份檢查點(diǎn),在數(shù)據(jù)回放的過(guò)程中發(fā)現(xiàn)檢查點(diǎn)文件異常會(huì)對(duì)所有數(shù)據(jù)進(jìn)行全回放,全回放的過(guò)程相當(dāng)耗時(shí) |
| backupCheckpointDir | 備份檢查點(diǎn)目錄,最好不要和檢查點(diǎn)目錄(checkpointDir)在同一塊磁盤(pán)上 | |
| checkpointInterval | 30000 | 每次寫(xiě)檢查點(diǎn)的時(shí)間間隔,單位:毫秒 |
| dataDirs | 存儲(chǔ)event信息磁盤(pán)存儲(chǔ)路徑,建議配置多塊盤(pán)的多個(gè)路徑,通過(guò)磁盤(pán)的并行寫(xiě)入來(lái)提高file channel性能,多個(gè)磁盤(pán)路徑用逗號(hào)隔開(kāi) | |
| transactionCapacity | 10000 | 一次事務(wù)中寫(xiě)入和讀取的event最大數(shù) |
| maxFileSize | 2146435071 | 每個(gè)數(shù)據(jù)文件的最大大小,單位:字節(jié) |
| minimumRequiredSpace | 磁盤(pán)路徑最小剩余空間,如果磁盤(pán)剩余空間小于設(shè)置值,則不再寫(xiě)入數(shù)據(jù) | |
| capacity | filechannel可容納的最大event數(shù) | |
| keep-alive | 3 | 在Channel中寫(xiě)入或讀取event等待完成的超時(shí)時(shí)間,單位:秒 |
② 簡(jiǎn)單模板
# 命名 Agent 上的組件
agent_name.sources = source_name
agent_name.channels = channel_name
agent_name.sinks = sink_name
# source
agent_name.sources.source_name.type = avro
XXX
XXX
# channel
# channel中存儲(chǔ)的最大event數(shù)為3000000,一次事務(wù)中可讀取或添加的event數(shù)為20000
# 檢查點(diǎn)路徑為/usr/local/flume/checkpoint,數(shù)據(jù)存放路徑為/data1, /data2,開(kāi)啟備份檢查點(diǎn),備份檢查點(diǎn)路徑為/data/flume/backup/checkpoint
agent_name.channels.channel_name.type = file
agent_name.channels.channel_name.dataDirs = ${log_path}/dataDir1, ${log_path}/dataDir2
agent_name.channels.channel_name.checkpointDir = ${exec_log_path}/stat_info_checkpointDir
agent_name.channels.channel_name.useDualCheckpoints = true
agent_name.channels.channel_name.backupCheckpointDir = /data/flume/backup/checkpoint
agent_name.channels.channel_name.capacity = 3000000
agent_name.channels.channel_name.transactionCapacity = 20000
agent_name.channels.channel_name.keep-alive = 5
# sink
agent_name.sinks.sink_name.type = hdfs
XXX
XXX
# source | channel | sink 關(guān)聯(lián)
agent_name.sources.source_name.channels = channel_name
agent_name.sinks.sink_name.channel = channel_name
3、Kafka Channel
將Kafka作為Channel存儲(chǔ),Kafka是分布式、可擴(kuò)展、高容錯(cuò)、高吞吐的分布式系統(tǒng),Kafka通過(guò)優(yōu)秀的架構(gòu)設(shè)計(jì)充分利用磁盤(pán)順序特性,在廉價(jià)的硬件條件下完成高效的消息發(fā)布和訂閱。
Memory Channel在使用的過(guò)程中受內(nèi)存容量的限制不能緩存大量的消息,并且如果Memory Channel中的消息沒(méi)來(lái)得及寫(xiě)入Sink,此時(shí)Agent出現(xiàn)故障就會(huì)造成數(shù)據(jù)丟失。File Channel雖然能夠緩存更多的消息,但如果緩存下來(lái)的消息還沒(méi)有寫(xiě)入Sink,此時(shí)Agent出現(xiàn)故障則File Channel中的消息不能被繼續(xù)使用,直到該Agent重新恢復(fù)才能夠繼續(xù)使用File Channel中的消息。Kafka Channel相對(duì)于Memory Channel和File Channel存儲(chǔ)容量更大、容錯(cuò)能力更強(qiáng),彌補(bǔ)了其他兩種Channel的短板,如果合理利用Kafka的性能,能夠達(dá)到事半功倍的效果。
有了Kafka Channel可以在日志收集層只配置Source組件和Kafka Channel組件,不需要再配置Sink組件,減少了日志收集層啟動(dòng)的進(jìn)程數(shù)并且有效降低服務(wù)器內(nèi)存、磁盤(pán)等資源使用率,日志匯聚層可以只配置Kafka Channel和Sink,不需要再配置Source,減少日志匯聚層的進(jìn)程數(shù),這樣的配置既能降低服務(wù)器的資源使用率又能減少Event在網(wǎng)絡(luò)之間的傳輸,有效提高日志采集系統(tǒng)的性能。
① 配置參數(shù)解析:
| 參數(shù) | 默認(rèn)值 | 描述 |
|---|---|---|
| type | org.apache.flume.channel.kafka.KafkaChannel | 類(lèi)型名稱(chēng) |
| kafka.bootstrap.servers | Kafka broker列表,格式為ip1:port1,ip2:port2…,建議配置多個(gè)值提高容錯(cuò)能力,多個(gè)值之間用逗號(hào)隔開(kāi) | |
| kafka.topic | flume-channel | topic名稱(chēng) |
| kafka.consumer.group.id | flume | Consumer組id, Kafka Channel使用 consumer.group.id 注冊(cè)到Kafka,該值是連接kafka集群的唯一值,同一組內(nèi)可以有多個(gè)Consumer,多個(gè)Consumer之間是互不干擾的,一個(gè)主題下的一條消息只能被同一組內(nèi)的一個(gè)Consumer消費(fèi),其中的一個(gè)Consumer消費(fèi)失敗其他的Consumer會(huì)繼續(xù)消費(fèi) 基于這個(gè)特性,可以有多個(gè)Agent的KafkaChannel使用相同的consumer.group.id,當(dāng)一個(gè)Agent運(yùn)行失敗則其他Agent可以繼續(xù)消費(fèi),很容易地提高了消息的容錯(cuò)能力 |
| parseAsFlumeEvent | true | 是否以Avro FlumeEvent模式寫(xiě)入到Kafka Channel中 如果寫(xiě)入到Kafka Channel中主題的Producer只有Flume Source,則該參數(shù)應(yīng)該設(shè)置為true 如果有其他Producer也同時(shí)在向同一主題寫(xiě)數(shù)據(jù)則該參數(shù)應(yīng)該設(shè)置為false Flume Source寫(xiě)入到Kafka的消息在Kafka外部需要使用flume-ng-sdk提供的org.apache. flume.source.avro.AvroFlumeEvent類(lèi)解析 |
| migrateZookeeperOffsets | true | 是否遷移Zookeeper中存儲(chǔ)的Consumer消費(fèi)的偏移量到Kafka中,主要是為了兼容Kafka0.9以下版本的Kafka Kafka 0.9以下版本Consumer消費(fèi)的偏移量保存在Zookeeper中 Kafka 0.9之后的版本開(kāi)始將偏移量保存到Kafka的一個(gè)主題中 |
| pollTimeout | 500毫秒 | 輪詢(xún)超時(shí)時(shí)間 |
| kafka.consumer.auto.offset.reset | latest | 當(dāng)Kafka中沒(méi)有Consumer消費(fèi)的初始偏移量或者當(dāng)前偏移量在Kafka中不存在(比如數(shù)據(jù)已經(jīng)被刪除)情況下,Consumer選擇從Kafka拉取消息的方式 earliest表示從最早的偏移量開(kāi)始拉取 latest表示從最新的偏移量開(kāi)始拉取 none表示如果沒(méi)有發(fā)現(xiàn)該Consumer組之前拉取的偏移量則拋出異常 |
| kafka.enable.auto.commit | alse | Consumer是否自動(dòng)提交偏移量 |
Kafka Channel相關(guān)操作在org.apache.flume.channel.kafka包的KafkaChannel類(lèi)定義,
kafka相關(guān)參數(shù)的默認(rèn)值在org.apache.kafka.clients.CommonClientConfigs包中的KafkaChannel-Configuration中。
Kafka的通用配置參數(shù)在配置文件中都以“kafka.”為前綴,針對(duì)Producer或者Consumer的相關(guān)配置以“kafka.producer. ”或者“kafka.consumer. ”為前綴,
源碼 KafkaChannelConfiguration 中相關(guān)默認(rèn)配置參數(shù)定義如下:
KAFKA_PREFIX = "kafka.";
KAFKA_CONSUMER_PREFIX = KAFKA_PREFIX + "consumer.";
KAFKA_PRODUCER_PREFIX = KAFKA_PREFIX + "producer.";
DEFAULT_ACKS = "all";
DEFAULT_KEY_SERIALIZER ="org.apache.kafka.common.serialization.StringSerializer";
DEFAULT_VALUE_SERIAIZER ="org.apache.kafka.common.serialization.ByteArraySerializer";
DEFAULT_KEY_DESERIALIZER ="org.apache.kafka.common.serialization.StringDeserializer";
DEFAULT_VALUE_DESERIAIZER ="org.apache.kafka.common.serialization.ByteArrayDeserializer";
TOPIC_CONFIG = KAFKA_PREFIX + "topic";
BOOTSTRAP_SERVERS_CONFIG =KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
DEFAULT_TOPIC = "flume-channel";
DEFAULT_GROUP_ID = "flume";
POLL_TIMEOUT = KAFKA_PREFIX + "pollTimeout";
DEFAULT_POLL_TIMEOUT = 500;
KEY_HEADER = "key";
DEFAULT_AUTO_OFFSET_RESET = "earliest";
PARSE_AS_FLUME_EVENT = "parseAsFlumeEvent";
DEFAULT_PARSE_AS_FLUME_EVENT = true;
PARTITION_HEADER_NAME = "partitionIdHeader";
STATIC_PARTITION_CONF = "defaultPartitionId";
MIGRATE_ZOOKEEPER_OFFSETS = "migrateZookeeperOffsets";
public static final boolean DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS = true;/*** Flume1.7以前版本默認(rèn)參數(shù)****/
BROKER_LIST_KEY = "metadata.broker.list";
REQUIRED_ACKS_KEY = "request.required.acks";
BROKER_LIST_FLUME_KEY = "brokerList";
//TOPIC = "topic";
GROUP_ID_FLUME = "groupId";
AUTO_COMMIT_ENABLED = "auto.commit.enable";
ZOOKEEPER_CONNECT = "zookeeper.connect";
ZOOKEEPER_CONNECT_FLUME_KEY = "zookeeperConnect";
TIMEOUT = "timeout";
DEFAULT_TIMEOUT = "100";
CONSUMER_TIMEOUT = "consumer.timeout.ms";
READ_SMALLEST_OFFSET = "readSmallestOffset";
DEFAULT_READ_SMALLEST_OFFSET = false;
② 簡(jiǎn)單模板
# 命名 Agent 上的組件
agent_name.channels = channel_name
agent_name.sinks = sink_name
# channel
agent_name.channels.channel_name.type = org.apache.flume.channel.kafka.KafkaChannel
agent_name.channels.channel_name.kafka.bootstrap.servers = zkServer01:9092, zkServer02:9092
agent_name.channels.channel_name.kafka.topic = test_channel
agent_name.channels.channel_name.kafka.consumer.group.id = test-consumer
# sink
agent_name.sinks.sink_name.type = hdfs
XXX
XXX
# source | channel | sink 關(guān)聯(lián)
agent_name.sources.source_name.channels = channel_name
agent_name.sinks.sink_name.channel = channel_name
說(shuō)明:agent_name 沒(méi)有配置Source,只配置了Channel和Sink,使用的Channel類(lèi)型為Kafka Channel,主題名稱(chēng)為“test_channel”, consumer組id為“test-consumer”, Sink類(lèi)型為 hdfs 滾動(dòng)生成文件,對(duì)接的Channel為KafkaChannel channel_name。