一、從kafka讀數(shù)據(jù)保證不丟失的方案
?0.8版本
lines.foreachRDD( rdd =>{
//業(yè)務(wù)邏輯代碼處理
// rdd.map(_._2).flatMap(_.split(","))
rdd.asInstanceOf[HasOffsetRanges].offsetRanges.map( o =>{
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
//更新偏移量就可以了。
})
})
lines對(duì)象里面有偏移量,但是凡事代碼對(duì)這個(gè)對(duì)象做了任何算子的操作(比如map),偏移量就丟失了.所以調(diào)用了createDirectStream之后,只要在多調(diào)用一個(gè)其他的算子,偏移量就丟失了,只能調(diào)用foreachRDD,然后進(jìn)行業(yè)務(wù)邏輯處理.
但是有一個(gè)小小的遺憾,就是KafkaUtils.createDirectStream返回的其實(shí)是一個(gè)DStream,但是調(diào)用了foreachRDD之后變成了RDD把streaming的處理,變成了spark core 去處理了..雖然問(wèn)題不大,但是streaming特有的算子就不能用了,比如 window、updatestatebykey也就是說(shuō),他不能適用100%的場(chǎng)景
?那么怎么寫(xiě)可以100%適用呢?
整體思想:如果我們能在讀取kafka的時(shí)候,傳遞進(jìn)去offset的信息,并且保證這個(gè)offset的更新跟數(shù)據(jù)處理成功與否是原子性的,就能保證數(shù)據(jù)不丟失

1、擴(kuò)展createDirectStream方法,新建KafkaManager
2、設(shè)置一個(gè)監(jiān)聽(tīng)器,幫我們完成偏移量的提交(因?yàn)?.8默認(rèn)offset保存在zk,所以我們需要調(diào)用kafka往zk寫(xiě)offset的方法,重點(diǎn)是什么時(shí)候?qū)?
3、在讀取kafka的數(shù)據(jù)之前,先把偏移量更新到zk(setOrUpdateOffsets),然后在讀取偏移量(kc.getConsumerOffsets)
?注意0.8版本是保存在zk的,所以我們只需要把offset寫(xiě)入zk,然后調(diào)用kafka的方法去zk讀取就好了
4、保證提交偏移量和數(shù)據(jù)處理邏輯的原子性
NOTE:最后這個(gè)方案,在架構(gòu)設(shè)計(jì)上有一個(gè)缺點(diǎn):
如果企業(yè)中spark streaming太多(上千個(gè)-->幾十個(gè)其實(shí)是沒(méi)問(wèn)題的),會(huì)頻繁的往zk寫(xiě)數(shù)據(jù),這對(duì)zk來(lái)說(shuō)是一個(gè)高并發(fā)的場(chǎng)景,而zk是不適合高并發(fā)的,所以用redis代替是可以的
分別解釋一下:
新建KafkaManager類,新建一個(gè)createDirectStream方法和setOrUpdateOffsets方法
class KakfaManager {
def createDirectStream(){
// 在zookeeper上讀取offsets前先根據(jù)實(shí)際情況更新offsets
setOrUpdateOffsets()
...
//獲取offset
val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
//還是調(diào)用的sparkStermaing的API
KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
}
}
setOrUpdateOffsets(){
...
//根據(jù)實(shí)際消費(fèi)情況更新消費(fèi)offsets
kc.setConsumerOffsets(groupId, offsets)
...
}
監(jiān)聽(tīng)器(我們只需要把offset放入zk就可以了)
class MyListener implements StreamingListener(){
@Override
//batchCompleted對(duì)象里面是有任務(wù)的offset信息
public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
//提交偏移量
//這個(gè)方法是 0.8的依賴提供的API
//這樣話,就自動(dòng)幫我把offset存到了ZK
kc.setConsumerOffsets(kafkaParams.get("group.id").get(), topicAndPartitionObjectMap);
}
/**
* 批次完成時(shí)調(diào)用的方法
* @param batchCompleted
*
* batchCompleted 對(duì)象里面帶有了偏移量對(duì)信息,所以我提交偏移量對(duì)時(shí)候
* 就是從這個(gè)對(duì)象里面讀取offset就可以了。
*
* 注意:任務(wù)運(yùn)行完了,但是有可能是成功的,也有可能是失敗的
*/
@Override
public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {//這個(gè)對(duì)象里面是有任務(wù)的offset信息
//任務(wù)運(yùn)行完了以后會(huì)(任務(wù)成功,)提交偏移量
//
//如果本批次里面有任務(wù)失敗了,那么就終止偏移量提交
scala.collection.immutable.Map<Object, OutputOperationInfo> opsMap = batchCompleted.batchInfo().outputOperationInfos();
//
Map<Object, OutputOperationInfo> javaOpsMap = JavaConversions.mapAsJavaMap(opsMap);
for (Map.Entry<Object, OutputOperationInfo> entry : javaOpsMap.entrySet()) {
//failureReason不等于None(是scala中的None),說(shuō)明有異常,不保存offset
//OutputOperationInfo task
//task -> failureReason
if (!"None".equalsIgnoreCase(entry.getValue().failureReason().toString())) {
return;
}
}
// long batchTime = batchCompleted.batchInfo().batchTime().milliseconds();
/**
* topic,分區(qū),偏移量
*/
Map<String, Map<Integer, Long>> offset = getOffset(batchCompleted);
for (Map.Entry<String, Map<Integer, Long>> entry : offset.entrySet()) {
String topic = entry.getKey();
Map<Integer, Long> paritionToOffset = entry.getValue();
//我只需要在這里把offset放入到zookeeper就可以了。
for(Map.Entry<Integer,Long> p2o : paritionToOffset.entrySet()){
Map<TopicAndPartition, Object> map = new HashMap<TopicAndPartition, Object>();
TopicAndPartition topicAndPartition =
new TopicAndPartition(topic,p2o.getKey());
map.put(topicAndPartition,p2o.getValue());
scala.collection.immutable.Map<TopicAndPartition, Object>
topicAndPartitionObjectMap = TypeHelper.toScalaImmutableMap(map);
//提交偏移量
//這個(gè)方法是 0.8的依賴提供的API
//這樣話,就自動(dòng)幫我把offset存到了ZK
kc.setConsumerOffsets(kafkaParams.get("group.id").get(), topicAndPartitionObjectMap);
}
}
}
private Map<String, Map<Integer, Long>> getOffset(StreamingListenerBatchCompleted batchCompleted) {
Map<String, Map<Integer, Long>> map = new HashMap<>();
scala.collection.immutable.Map<Object, StreamInputInfo> inputInfoMap =
batchCompleted.batchInfo().streamIdToInputInfo();
Map<Object, StreamInputInfo> infos = JavaConversions.mapAsJavaMap(inputInfoMap);
infos.forEach((k, v) -> {
Option<Object> optOffsets = v.metadata().get("offsets");
if (!optOffsets.isEmpty()) {
Object objOffsets = optOffsets.get();
if (List.class.isAssignableFrom(objOffsets.getClass())) {
List<OffsetRange> scalaRanges = (List<OffsetRange>) objOffsets;
Iterable<OffsetRange> ranges = JavaConversions.asJavaIterable(scalaRanges);
for (OffsetRange range : ranges) {
if (!map.containsKey(range.topic())) {
map.put(range.topic(), new HashMap<>());
}
map.get(range.topic()).put(range.partition(), range.untilOffset());
}
}
}
});
return map;
}
}
0.10版本
subscribe方式
1、創(chuàng)建一個(gè)監(jiān)聽(tīng)器,繼承StreamingListener,在onBatchCompleted方法中去提交offset
//獲取數(shù)據(jù)源(主題里面讀取offset)
val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
//設(shè)置監(jiān)聽(tīng)器
ssc.addStreamingListener(new MyListener(stream))
val result = stream.map(_.value()).flatMap(_.split(","))
.map((_, 1))
.reduceByKey(_ + _)
result.foreachRDD( rdd =>{
/**
* 其它的操作
*/
})
ssc.start()
ssc.awaitTermination()
ssc.stop()
if(offsetRanges != null){
提交偏移量(Kafka)
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges);
}
因?yàn)?.10版本的kafka的offset默認(rèn)已經(jīng)不下zk,而是寫(xiě)kafka了(_consumer_offset)
如何保證數(shù)據(jù)exactly-once語(yǔ)義
我們程序是通過(guò)kafka的offset去讀取數(shù)據(jù),我們一般是處理數(shù)據(jù),然后寫(xiě)offset,
正常情況是沒(méi)問(wèn)題的,但是如果處理完了,但是offset沒(méi)寫(xiě)成功,或者處理失敗,offset寫(xiě)成功了
就出現(xiàn)了,重復(fù)消費(fèi),或者丟數(shù)據(jù)的情況;
所以原則上就是保證事務(wù)或者保證原子性,就是處理寫(xiě)庫(kù)操作(保存mysql、HBase)和寫(xiě)offset操作要么一起成功,要么一起失敗就行了
還有就是設(shè)計(jì)成冪等操作,重復(fù)了也無(wú)所謂
import scalikejdbc.{ConnectionPool, DB, _}
//第一步:在Driver端創(chuàng)建數(shù)據(jù)庫(kù)連接池
ConnectionPool.singleton("jdbc:mysql://**", "", "")
//第二步:從數(shù)據(jù)庫(kù)讀取offset
//1)初次啟動(dòng)或重啟時(shí),從指定的Partition、Offset構(gòu)建TopicPartition
//2)運(yùn)行過(guò)程中,每個(gè)Partition、Offset保存在內(nèi)部currentOffsets = Map[TopicPartition, Long]()變量中
//3)后期Kafka Topic分區(qū)動(dòng)態(tài)擴(kuò)展,在運(yùn)行過(guò)程中不能自動(dòng)感知
val initOffset=DB.readOnly(implicit session=>{
sql"select `partition`,offset from kafka_topic_offset where topic =${topic} and `group`=${group}"
.map(item=> new TopicPartition(topic, item.get[Int]("partition")) -> item.get[Long]("offset"))
.list().apply().toMap
})
//第三步:CreateDirectStream
//從指定的Topic、Partition、Offset開(kāi)始消費(fèi)
val sourceDStream =KafkaUtils.createDirectStream[String,String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Assign[String,String](initOffset.keys,kafkaParams,initOffset)
)
sourceDStream.foreachRDD(rdd =>{
//業(yè)務(wù)處理
//在Driver端存儲(chǔ)數(shù)據(jù)、提交Offset
//結(jié)果存儲(chǔ)與Offset提交在同一事務(wù)中原子執(zhí)行
//這里將偏移量保存在Mysql中
//事務(wù)
DB.localTx(
//儲(chǔ)存結(jié)果到mysql
//儲(chǔ)存offset到mysql
)
})
參考文檔:奈學(xué)教育