1. 概述
在2.x中,spark有兩個用來與kafka整合的代碼,版本代號為0.8和0.10,由于在0.8,kafka有兩套消費者api,根據(jù)高級api得到了Receiver-based Approach,根據(jù)低級api得到了Direct Approach,而在0.10由于kafka只有一套消費者api了,所以也只有Direct Approach
2. Direct Approach
由于0.8與0.10的Direct Approach的實現(xiàn)差不多,這里使用0.10來進行描述。該途徑由DirectKafkaInputDStream實現(xiàn)
private[spark] class DirectKafkaInputDStream[K, V](
_ssc: StreamingContext,
locationStrategy: LocationStrategy,
consumerStrategy: ConsumerStrategy[K, V],
ppc: PerPartitionConfig
)
其中,LocationStrategy表示的是分配topic partition的consumer到executor的策略,有3個取值
PreferConsistent: 均勻的分布在executor中。原理是使用TopicPartition的hash值來決定到哪個可用的executor上執(zhí)行PreferBrokers:偏向于將consumer分配到指定的topic partition所在的leader上;如果leader集中,會造成負載不均。如果可用的executor中沒有l(wèi)eader所在的主機,使用PreferConsistentPreferFixed: 指定某些topic partition的consumer分配到指定的機器上,沒有指定的topic partition使用PreferConsistent
ConsumerStrategy用來指定消費的topic partition,也有三個子類來表示不同的策略:
Subscribe:精確指定用來消費的topicSubscribePattern:使用正則表達式來指定消費的topicAssign: 精確指定用來消費的topic partition。上述2個都是消費topic的所有分區(qū),這個可以選擇性的消費部分分區(qū)
PerPartitionConfig只有一個實現(xiàn)類DefaultPerPartitionConfig,用來執(zhí)行反壓,控制每個分區(qū)的消費速率
2.1 執(zhí)行過程
分為2步,一是DirectKafkaInputDStream計算每個分區(qū)及其消費開始的offset與結(jié)束的offset(不包含),二是將上述信息封裝到KafkaRDD中消費每個topic partition,一個topic partition對應(yīng)一個task
2.2 常用的配置
配置分為兩種,一種是傳給ConsumerStrategy,為Kafka Consumer相關(guān)的配置,還有一種是傳給SparkConf的。
傳給ConsumerStrategy的kafkaParams
| 變量名 | 描述 | 其它 |
|---|---|---|
| heartbeat.interval.ms | 客戶端與broker的心跳間隔 | 不小于batchDuration,在有window操作時,要不小于slideDuration
|
| session.timeout.ms | 在該時間內(nèi),如果收不到客戶端心跳,會移除該客戶端 | 推薦3 * heartbeat.interval.ms,但要在范圍(group.min.session.timeout.ms, group.max.session.timeout.ms]
|
| metadata.broker.list | kafka集群的broker列表,用來獲取整個集群拓撲,不用完整 | 等價于bootstrap.servers |
| enable.auto.commit | 是否自動提交消費的offset | 在executor端,為false,通常也要設(shè)置為false |
| auto.commit.interval.ms | 自動提交消費的offset的周期 | 默認為5s |
| group.id | 消費組的id | 盡量設(shè)置 |
| auto.offset.reset | 當(dāng)指定消費的offset不存在時,采取的行為 | 在executor端為none |
| receive.buffer.bytes | The size of the TCP receive buffer (SO_RCVBUF) to use when reading data | 默認65536,設(shè)置的時候要大于該值 |
注意上述的默認值要根據(jù)使用的版本來確定。
傳給SparkConf的
| 變量名 | 描述 | 其它 |
|---|---|---|
| spark.streaming.kafka.consumer.poll.ms | 消費者獲取消息等待的最大時間 | 默認值spark.network.timeout |
| spark.streaming.kafka.allowNonConsecutiveOffsets | 是否允許消費的offset不連續(xù),通常都是連續(xù)的,除非中間的message被刪掉了 | 默認值false |
| spark.streaming.kafka.consumer.cache.enabled | 在executor端將消費指定topic partition的consumer緩存起來,這個是executor級別的緩存,如果沒有使用動態(tài)資源分配,可以用來重用consumer | 默認值true |
| spark.streaming.kafka.consumer.cache.maxCapacity | 每個executor上能緩存的consumer的最大值 | 默認值64 |
| spark.streaming.kafka.consumer.cache.minCapacity | 每個executor上能緩存的consumer的最大值 | 默認值16 |
2.3 at-least-once與 exactly-once語義
用戶有三種方式來保存每個分區(qū)消費的offset,分別是checkpoint、kafka保存、用戶自己處理
- checkpoint:driver端的容錯,最不靠譜,由于任務(wù)執(zhí)行與checkpoint是異步執(zhí)行的,如果消息沒有消費完程序報錯了,但是已經(jīng)checkpoint,恢復(fù)后則會丟失數(shù)據(jù)
- kafka保存:設(shè)置
enable.auto.commit為false,需要調(diào)用在業(yè)務(wù)邏輯后加上如下代碼。下面的代碼注入的offsetRanges會在下次生成任務(wù)的時候被提交。這種方式不會丟數(shù)據(jù),當(dāng)輸出端不是冪等的,就只能保證at-least-once,因為輸出與保存offset不是一個事務(wù)
val kafkaDStream = KafkaUtils.createDirectStream(...)
// 寫法一開始
// 一大堆流式處理代碼
// 最后
kafkaDStream .foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offstietRanges
kafkaDStream .asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
// 寫法一結(jié)束
// 寫法二
kafkaDStream .transform{ rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offstietRanges
kafkaDStream .asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
rdd
}.一大堆流式處理代碼
- 用戶自己處理,當(dāng)輸出端不冪等,但是支持事務(wù)時,將保存offset與輸出封裝到一個事務(wù)中可以實現(xiàn)
exactly-once,官方文檔streaming-kafka-0-10-integration里提供一種寫法,筆者在這里也提供一種,供讀者參考
// 故障恢復(fù)
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap
val stream = KafkaUtils.createDirectStream(...)
var offsetBroadCast: Broadcast[Array[OffsetRange]] = null
messages.transform(rdd => {
val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
offsetBroadCast = ssc.sparkContext.broadcast(ranges)
rdd
}).map(t => (t.key(), t.value())).foreachRDD(rdd => {
val partitions = rdd.getNumPartitions
val offset = offsetBroadCast.value
rdd.mapPartitionsWithIndex((mapId, it) => Iterator((mapId, it)))
.foreachPartition(it => {
val (mapId, realIt) = it.buffered.head
val commitOffRange = offset(mapid)
// 開啟事務(wù)
// 用 realIt 更新結(jié)果
// 用 commitOffRange 更新對應(yīng)分區(qū)的offset
// 提交事務(wù)
})
offsetBroadCast.destroy(true)
})
3. 與receiver-based的差別
這里的receiver指的是ReliableKafkaReceiver并且開啟了WAL,如果不開啟WAL則可能會導(dǎo)致數(shù)據(jù)丟失,存儲數(shù)據(jù)的過程如下:
- 添加到
BlockGenerator的buffer中- 將buffer中的數(shù)據(jù)寫入到WAL,寫入到
BlockManager- 通知給
ReceiverTracker,并把元數(shù)據(jù)寫入到WAL- 更新offset到zookeeper
整個過程能保證數(shù)據(jù)不丟,但是只能保證at-least-once語義。差別如下:
- receiver-based需要存儲一份數(shù)據(jù),kafka里面也有一份,這樣的數(shù)據(jù)冗余沒有必要
Direct approach少了receiver,整個架構(gòu)更簡單- receiver-based的topic partition offset只能由zookeeper管理,
Direct approach更靈活