kafka和flume的區(qū)別與

(1)kafka和flume都是日志系統(tǒng)。kafka是分布式消息中間件,自帶存儲,提供push和pull存取數(shù)據(jù)功能。flume分為agent(數(shù)據(jù)采集器)[source channel sink]。
(2)kafka做日志緩存應(yīng)該是更為合適的,但是 flume的數(shù)據(jù)采集部分做的很好,可以定制很多數(shù)據(jù)源,減少開發(fā)量。所以比較流行flume+kafka模式,如果為了利用flume寫hdfs的能力,也可以采用kafka+flume的方式。

采集層 主要可以使用Flume, Kafka兩種技術(shù)。
Flume:Flume 是管道流方式,提供了很多的默認(rèn)實現(xiàn),讓用戶通過參數(shù)部署,及擴(kuò)展API.
Kafka:Kafka是一個可持久化的分布式的消息隊列。
Kafka 是一個非常通用的系統(tǒng)。你可以有許多生產(chǎn)者和很多的消費(fèi)者共享多個主題Topics。相比之下,Flume是一個專用工具被設(shè)計為旨在往HDFS,HBase發(fā)送數(shù)據(jù)。它對HDFS有特殊的優(yōu)化,并且集成了Hadoop的安全特性。所以,Cloudera 建議如果數(shù)據(jù)被多個系統(tǒng)消費(fèi)的話,使用kafka;如果數(shù)據(jù)被設(shè)計給Hadoop使用,使用Flume。
正如你們所知Flume內(nèi)置很多的source和sink組件。Kafka明顯有一個更小的生產(chǎn)消費(fèi)者生態(tài)系統(tǒng),使用Kafka意味著你準(zhǔn)備好了編寫你自己的生產(chǎn)者和消費(fèi)者代碼。如果已經(jīng)存在的Flume Sources和Sinks滿足你的需求,并且你更喜歡不需要任何開發(fā)的系統(tǒng),請使用Flume。
Flume可以使用攔截器實時處理數(shù)據(jù)。這些對數(shù)據(jù)屏蔽或者過量是很有用的。Kafka需要外部的流處理系統(tǒng)才能做到。
Kafka和Flume都是可靠的系統(tǒng),通過適當(dāng)?shù)呐渲媚鼙WC零數(shù)據(jù)丟失。然而,F(xiàn)lume不支持副本事件。于是,如果Flume代理的一個節(jié)點奔潰了,即使使用了可靠的文件管道方式,你也將丟失這些事件直到你恢復(fù)這些磁盤。如果你需要一個高可靠行的管道,那么使用Kafka是個更好的選擇。
Flume和Kafka可以很好地結(jié)合起來使用。如果你的設(shè)計需要從Kafka到Hadoop的流數(shù)據(jù),使用Flume代理并配置Kafka的Source讀取數(shù)據(jù)也是可行的:你沒有必要實現(xiàn)自己的消費(fèi)者。你可以直接利用Flume與HDFS及HBase的結(jié)合的所有好處。你可以使用Cloudera Manager對消費(fèi)者的監(jiān)控,并且你甚至可以添加攔截器進(jìn)行一些流處理。
Flume和Kafka可以結(jié)合起來使用。通常會使用Flume + Kafka的方式。其實如果為了利用Flume已有的寫HDFS功能,也可以使用Kafka + Flume的方式。

flume+kafka flume接收kafka的數(shù)據(jù) 寫入hive和hbase
配置參考:
Kafka Source is an Apache Kafka consumer that reads messages from Kafka topics.If you have multiple Kafka sources running, you can configure them with the same Consumer Groupso each will read a unique set of partitions for the topics.

Property Name Default Description

channels
type The component type name, needs to beorg.apache.flume.source.kafka.KafkaSource
kafka.bootstrap.servers List of brokers in the Kafka cluster used by the source
kafka.consumer.group.id flume Unique identified of consumer group. Setting the same id in multiple sources or agentsindicates that they are part of the same consumer group
kafka.topics Comma-separated list of topics the kafka consumer will read messages from.
kafka.topics.regex Regex that defines set of topics the source is subscribed on. This property has higher prioritythankafka.topics and overrideskafka.topics if exists.
batchSize 1000 Maximum number of messages written to Channel in one batch
batchDurationMillis 1000 Maximum time (in ms) before a batch will be written to ChannelThe batch will be written whenever the first of size and time will be reached.
..........
Other Kafka Consumer Properties These properties are used to configure the Kafka Consumer. Any consumer property supportedby Kafka can be used. The only requirement is to prepend the property name with the prefixkafka.consumer .eg:kafka.consumer.auto.offset.reset

flume-ng-node Application main方法解析shell命令 加載configuration 形成context上下文,執(zhí)行LifecycleAware接口的start方法,kafkasource 繼承了abstractsource 實現(xiàn)了 configurable和pollablesource(輪詢),重寫了LifecycleAware的start和stop方法,configurable的configure方法,以及pollablesource的process方法。執(zhí)行LifecycleAware的方法時實際上運(yùn)行的是kafkasource的start方法,根據(jù)kafka配置建立消費(fèi)者連接和kafkastream流。在start之前加載了配置文件時已經(jīng)將重載后的configure執(zhí)行了一遍,更改了一些source的配置,如果沒有的話會有默認(rèn)配置。
之后由sourceRunner的子類PollableSourceRunner驅(qū)動kafkasource運(yùn)行(PollableSourceRunner啟動了一個PollingRunner線程,該線程調(diào)用了kafkasource的process方法),process方法將stream的流數(shù)據(jù)讀取轉(zhuǎn)換為channel所需的event,寫入channel中。
當(dāng)時間達(dá)到一定的時間間隔或者批處理的事件條數(shù)達(dá)到一定數(shù)目時,將eventlist一次性發(fā)往channel(由absractsource的getChannelProcessor返回的processor調(diào)用processEventBatch方法發(fā)送)。
核心代碼為:

while (eventList.size() < batchUpperLimit &&
              System.currentTimeMillis() < maxBatchEndTime) {
.....
event = EventBuilder.withBody(eventBody, headers);
        eventList.add(event);
....
}
if (eventList.size() > 0) {
        counter.addToKafkaEventGetTimer((System.nanoTime() - nanoBatchStartTime) / (1000 * 1000));
        counter.addToEventReceivedCount((long) eventList.size());
        getChannelProcessor().processEventBatch(eventList);
        counter.addToEventAcceptedCount(eventList.size());
        if (log.isDebugEnabled()) {
          log.debug("Wrote {} events to channel", eventList.size());
        }
        eventList.clear();

基本配置為:

public class KafkaSourceConstants {

  public static final String KAFKA_PREFIX = "kafka.";
  public static final String KAFKA_CONSUMER_PREFIX = KAFKA_PREFIX + "consumer.";
  public static final String DEFAULT_KEY_DESERIALIZER =
      "org.apache.kafka.common.serialization.StringDeserializer";
  public static final String DEFAULT_VALUE_DESERIALIZER =
      "org.apache.kafka.common.serialization.ByteArrayDeserializer";
  public static final String BOOTSTRAP_SERVERS =
      KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
  public static final String TOPICS = KAFKA_PREFIX + "topics";
  public static final String TOPICS_REGEX = TOPICS + "." + "regex";
  public static final String DEFAULT_AUTO_COMMIT =  "false";
  public static final String BATCH_SIZE = "batchSize";
  public static final String BATCH_DURATION_MS = "batchDurationMillis";
  public static final int DEFAULT_BATCH_SIZE = 1000;
  public static final int DEFAULT_BATCH_DURATION = 1000;
  public static final String DEFAULT_GROUP_ID = "flume";

  public static final String MIGRATE_ZOOKEEPER_OFFSETS = "migrateZookeeperOffsets";
  public static final boolean DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS = true;

  public static final String AVRO_EVENT = "useFlumeEventFormat";
  public static final boolean DEFAULT_AVRO_EVENT = false;

  /* Old Properties */
  public static final String ZOOKEEPER_CONNECT_FLUME_KEY = "zookeeperConnect";
  public static final String TOPIC = "topic";
  public static final String OLD_GROUP_ID = "groupId";

  // flume event headers
  public static final String TOPIC_HEADER = "topic";
  public static final String KEY_HEADER = "key";
  public static final String TIMESTAMP_HEADER = "timestamp";
  public static final String PARTITION_HEADER = "partition";

}

kafkasource根據(jù)消費(fèi)者建立的數(shù)據(jù)流datastream讀取,每隔一段批處理時間或者數(shù)據(jù)達(dá)到一定數(shù)目將會往channel寫數(shù)據(jù)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,537評論 19 139
  • kafka的定義:是一個分布式消息系統(tǒng),由LinkedIn使用Scala編寫,用作LinkedIn的活動流(Act...
    時待吾閱讀 5,538評論 1 15
  • 本文轉(zhuǎn)載自http://dataunion.org/?p=9307 背景介紹Kafka簡介Kafka是一種分布式的...
    Bottle丶Fish閱讀 5,583評論 0 34
  • Kafka入門經(jīng)典教程-Kafka-about云開發(fā) http://www.aboutyun.com/threa...
    葡萄喃喃囈語閱讀 10,981評論 4 54
  • 愛是一件很美好的事兒, 很愛很愛卻是一件很糟糕的事兒。 有時候,雖然能想明白, 但心里就是接受不了。 大道理人人都...
    夕子寧洋閱讀 133評論 0 0

友情鏈接更多精彩內(nèi)容