spark-streaming與kafka的整合

1. 概述

在2.x中,spark有兩個用來與kafka整合的代碼,版本代號為0.80.10,由于在0.8,kafka有兩套消費者api,根據(jù)高級api得到了Receiver-based Approach,根據(jù)低級api得到了Direct Approach,而在0.10由于kafka只有一套消費者api了,所以也只有Direct Approach

2. Direct Approach

由于0.80.10Direct Approach的實現(xiàn)差不多,這里使用0.10來進行描述。該途徑由DirectKafkaInputDStream實現(xiàn)

private[spark] class DirectKafkaInputDStream[K, V](
    _ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V],
    ppc: PerPartitionConfig
  ) 

其中,LocationStrategy表示的是分配topic partition的consumer到executor的策略,有3個取值

  1. PreferConsistent: 均勻的分布在executor中。原理是使用TopicPartition的hash值來決定到哪個可用的executor上執(zhí)行
  2. PreferBrokers:偏向于將consumer分配到指定的topic partition所在的leader上;如果leader集中,會造成負載不均。如果可用的executor中沒有l(wèi)eader所在的主機,使用 PreferConsistent
  3. PreferFixed: 指定某些topic partition的consumer分配到指定的機器上,沒有指定的topic partition使用PreferConsistent

ConsumerStrategy用來指定消費的topic partition,也有三個子類來表示不同的策略:

  1. Subscribe:精確指定用來消費的topic
  2. SubscribePattern:使用正則表達式來指定消費的topic
  3. Assign: 精確指定用來消費的topic partition。上述2個都是消費topic的所有分區(qū),這個可以選擇性的消費部分分區(qū)

PerPartitionConfig只有一個實現(xiàn)類DefaultPerPartitionConfig,用來執(zhí)行反壓,控制每個分區(qū)的消費速率

2.1 執(zhí)行過程

分為2步,一是DirectKafkaInputDStream計算每個分區(qū)及其消費開始的offset與結(jié)束的offset(不包含),二是將上述信息封裝到KafkaRDD中消費每個topic partition,一個topic partition對應(yīng)一個task

2.2 常用的配置

配置分為兩種,一種是傳給ConsumerStrategy,為Kafka Consumer相關(guān)的配置,還有一種是傳給SparkConf的。
傳給ConsumerStrategykafkaParams

變量名 描述 其它
heartbeat.interval.ms 客戶端與broker的心跳間隔 不小于batchDuration,在有window操作時,要不小于slideDuration
session.timeout.ms 在該時間內(nèi),如果收不到客戶端心跳,會移除該客戶端 推薦3 * heartbeat.interval.ms,但要在范圍(group.min.session.timeout.ms, group.max.session.timeout.ms]
metadata.broker.list kafka集群的broker列表,用來獲取整個集群拓撲,不用完整 等價于bootstrap.servers
enable.auto.commit 是否自動提交消費的offset 在executor端,為false,通常也要設(shè)置為false
auto.commit.interval.ms 自動提交消費的offset的周期 默認為5s
group.id 消費組的id 盡量設(shè)置
auto.offset.reset 當(dāng)指定消費的offset不存在時,采取的行為 在executor端為none
receive.buffer.bytes The size of the TCP receive buffer (SO_RCVBUF) to use when reading data 默認65536,設(shè)置的時候要大于該值

注意上述的默認值要根據(jù)使用的版本來確定。
傳給SparkConf

變量名 描述 其它
spark.streaming.kafka.consumer.poll.ms 消費者獲取消息等待的最大時間 默認值spark.network.timeout
spark.streaming.kafka.allowNonConsecutiveOffsets 是否允許消費的offset不連續(xù),通常都是連續(xù)的,除非中間的message被刪掉了 默認值false
spark.streaming.kafka.consumer.cache.enabled 在executor端將消費指定topic partition的consumer緩存起來,這個是executor級別的緩存,如果沒有使用動態(tài)資源分配,可以用來重用consumer 默認值true
spark.streaming.kafka.consumer.cache.maxCapacity 每個executor上能緩存的consumer的最大值 默認值64
spark.streaming.kafka.consumer.cache.minCapacity 每個executor上能緩存的consumer的最大值 默認值16
2.3 at-least-once與 exactly-once語義

用戶有三種方式來保存每個分區(qū)消費的offset,分別是checkpoint、kafka保存、用戶自己處理

  1. checkpoint:driver端的容錯,最不靠譜,由于任務(wù)執(zhí)行與checkpoint是異步執(zhí)行的,如果消息沒有消費完程序報錯了,但是已經(jīng)checkpoint,恢復(fù)后則會丟失數(shù)據(jù)
  1. kafka保存:設(shè)置enable.auto.commit為false,需要調(diào)用在業(yè)務(wù)邏輯后加上如下代碼。下面的代碼注入的offsetRanges會在下次生成任務(wù)的時候被提交。這種方式不會丟數(shù)據(jù),當(dāng)輸出端不是冪等的,就只能保證at-least-once,因為輸出與保存offset不是一個事務(wù)
val kafkaDStream = KafkaUtils.createDirectStream(...)
// 寫法一開始
// 一大堆流式處理代碼
// 最后
kafkaDStream .foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offstietRanges
  kafkaDStream .asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
// 寫法一結(jié)束
// 寫法二
kafkaDStream .transform{ rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offstietRanges
  kafkaDStream .asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  rdd
}.一大堆流式處理代碼
  1. 用戶自己處理,當(dāng)輸出端不冪等,但是支持事務(wù)時,將保存offset與輸出封裝到一個事務(wù)中可以實現(xiàn)exactly-once,官方文檔streaming-kafka-0-10-integration里提供一種寫法,筆者在這里也提供一種,供讀者參考
// 故障恢復(fù)
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
  new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap

val stream = KafkaUtils.createDirectStream(...)
var offsetBroadCast: Broadcast[Array[OffsetRange]] = null
    messages.transform(rdd => {
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetBroadCast = ssc.sparkContext.broadcast(ranges)
      rdd
    }).map(t => (t.key(), t.value())).foreachRDD(rdd => {
      val partitions = rdd.getNumPartitions
      val offset = offsetBroadCast.value
      rdd.mapPartitionsWithIndex((mapId, it) => Iterator((mapId, it)))
        .foreachPartition(it => {
          val (mapId, realIt) = it.buffered.head
          val commitOffRange = offset(mapid)
          // 開啟事務(wù)
          // 用 realIt 更新結(jié)果
          // 用 commitOffRange 更新對應(yīng)分區(qū)的offset
          // 提交事務(wù)
        })
        offsetBroadCast.destroy(true)
    })

3. 與receiver-based的差別

這里的receiver指的是ReliableKafkaReceiver并且開啟了WAL,如果不開啟WAL則可能會導(dǎo)致數(shù)據(jù)丟失,存儲數(shù)據(jù)的過程如下:

  1. 添加到BlockGenerator的buffer中
  2. 將buffer中的數(shù)據(jù)寫入到WAL,寫入到BlockManager
  3. 通知給ReceiverTracker,并把元數(shù)據(jù)寫入到WAL
  4. 更新offset到zookeeper

整個過程能保證數(shù)據(jù)不丟,但是只能保證at-least-once語義。差別如下:

  1. receiver-based需要存儲一份數(shù)據(jù),kafka里面也有一份,這樣的數(shù)據(jù)冗余沒有必要
  2. Direct approach少了receiver,整個架構(gòu)更簡單
  3. receiver-based的topic partition offset只能由zookeeper管理,Direct approach更靈活
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容