[TOC]
SparkStreaming基于kafka獲取數(shù)據(jù)的方式,主要有倆種,即Receiver和Direct,基于Receiver的方式,是SparkStreaming給我們提供了kafka訪問的高層api的封裝,而基于Direct的方式,就是直接訪問,在SparkSteaming中直接去操作kafka中的數(shù)據(jù),不需要前面的高層api的封裝。而Direct的方式,可以對(duì)kafka進(jìn)行更好的控制!同時(shí)性能也更好。
1.Spark streaming基于kafka以Receiver方式獲取數(shù)據(jù)
實(shí)際上做kafka receiver的時(shí)候,通過receiver來獲取數(shù)據(jù),這個(gè)時(shí)候,kafka receiver是使用的kafka高層次的comsumer api來實(shí)現(xiàn)的。receiver會(huì)從kafka中獲取數(shù)據(jù),然后把它存儲(chǔ)到我們具體的Executor內(nèi)存中。然后Spark streaming也就是driver中,會(huì)根據(jù)這獲取到的數(shù)據(jù),啟動(dòng)job去處理。

1)在通過kafka receiver去獲取kafka的數(shù)據(jù),在正在獲取數(shù)據(jù)的過程中,這臺(tái)機(jī)器有可能崩潰了。如果來不及做備份,數(shù)據(jù)就會(huì)丟失,切換到另外一臺(tái)機(jī)器上,也沒有相關(guān)數(shù)據(jù)。這時(shí)候,為了數(shù)據(jù)安全,采用WAL的方式。write ahead log,預(yù)寫日志的方式會(huì)同步的將接收到的kafka數(shù)據(jù),寫入到分布式文件系統(tǒng)中。但是預(yù)寫日志的方式消耗時(shí)間,所以存儲(chǔ)時(shí)建議Memory_and_Disc2.如果是寫到hdfs,會(huì)自動(dòng)做副本。如果是寫到本地,這其實(shí)有個(gè)風(fēng)險(xiǎn),就是如果這臺(tái)機(jī)器崩潰了,再想恢復(fù)過來,這個(gè)是需要時(shí)間的。
2)我們的kafka receiver接收數(shù)據(jù)的時(shí)候,通過線程或者多線程的方式,kafka中的topic是以partition的方式存在的。sparkstreaming中的kafka receiver接收kafka中topic中的數(shù)據(jù),也是通過線程并發(fā)的方式去獲取的不同的partition,例如用五條線程同時(shí)去讀取kafka中的topics中的不同的partition數(shù)據(jù),這時(shí)你這個(gè)讀取數(shù)據(jù)的并發(fā)線程數(shù),和RDD實(shí)際處理數(shù)據(jù)的并發(fā)線程數(shù)是沒任何關(guān)系的。因?yàn)楂@取數(shù)據(jù)時(shí)都還沒產(chǎn)生RDD呢。RDD是Driver端決定產(chǎn)生RDD的。
3)默認(rèn)情況下,一個(gè)Executor中是不是只有一個(gè)receiver去接收kafka中的數(shù)據(jù)。那能不能多找一些Executor去更高的并發(fā)度,就是使用更多的機(jī)器去接收數(shù)據(jù),當(dāng)然可以,基于kafa的api去創(chuàng)建更多的Dstream就可以了。很多的Dstream接收kafka不同topics中的不同的數(shù)據(jù),最后你計(jì)算的時(shí)候,再把他優(yōu)聯(lián)就行了。其實(shí)這是非常靈活的,因?yàn)榭梢宰杂傻慕M合。
【代碼實(shí)戰(zhàn)】
producer端:
public class SparkStreamingDataManuallyProducerForKafka extends Thread
{
static String[] channelNames = new String[]{
"Spark","Scala","Kafka","Flink","Hadoop","Storm",
"Hive","Impala","HBase","ML"
};
static String[] actionNames = new String[]{"View", "Register"};
private String topic; //發(fā)送給Kafka的數(shù)據(jù)的類別
private Producer<Integer, String> producerForKafka;
private static String dateToday;
private static Random random;
public SparkStreamingDataManuallyProducerForKafka(String topic){
dateToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
this.topic = topic;
random = new Random();
Properties conf = new Properties();
conf.put("metadata.broker.list","node03:9092,node04:9092,node05:9092");
conf.put("serializer.class", StringEncoder.class.getName());
producerForKafka = new Producer<Integer, String>
(new ProducerConfig(conf)) ;
}
@Override
public void run() {
int counter = 0;
while(true){
counter++;
String userLog = userlogs();
//System.out.println("product:"+userLog+" ");
producerForKafka.send(new KeyedMessage<Integer, String>
(topic, userLog));
if(0 == counter%500){
counter = 0;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main( String[] args )
{
new SparkStreamingDataManuallyProducerForKafka("kfk").start();
}
private static String userlogs() {
StringBuffer userLogBuffer = new StringBuffer("");
int[] unregisteredUsers = new int[]{1, 2, 3, 4, 5, 6, 7, 8};
long timestamp = new Date().getTime();
Long userID = 0L;
long pageID = 0L;
//隨機(jī)生成的用戶ID
if(unregisteredUsers[random.nextInt(8)] == 1) {
userID = null;
} else {
userID = (long) random.nextInt((int) 2000);
}
//隨機(jī)生成的頁面ID
pageID = random.nextInt((int) 2000);
//隨機(jī)生成Channel
String channel = channelNames[random.nextInt(10)];
//隨機(jī)生成action行為
String action = actionNames[random.nextInt(2)];
userLogBuffer.append(dateToday)
.append("\t")
.append(timestamp)
.append("\t")
.append(userID)
.append("\t")
.append(pageID)
.append("\t")
.append(channel)
.append("\t")
.append(action);
// .append("\n");
return userLogBuffer.toString();
}
}
consumer端:
public class SparkStreamingOnKafkaReceiver {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("SparkStreamingOnKafkaReceiver")
.setMaster("local[2]")
.set("spark.streaming.receiver.writeAheadLog.enable","true");
JavaStreamingContext jsc = new JavaStreamingContext(conf,
Durations.seconds(5));
jsc.checkpoint("hdfs://node02:8020/checkpoint");
Map<String, Integer> topicConsumerConcurrency =
new HashMap<String, Integer>();
topicConsumerConcurrency.put("kfk", 1);
JavaPairReceiverInputDStream<String,String> lines =
KafkaUtils.createStream(jsc,
"node02:2181,node03:2181,node04:2181",
"MyFiestConsumerGroup", topicConsumerConcurrency);
JavaDStream<String> words = lines.flatMap(new
FlatMapFunction<Tuple2<String,String>, String>() {
//如果是Scala,由于SAM轉(zhuǎn)換,所以可以寫成val words =
//lines.flatMap { line => line.split(" ")}
private static final long serialVersionUID = 1L;
@Override
public Iterable<String> call(Tuple2<String,String> tuple)
throws Exception {
return Arrays.asList(tuple._2.split("\t"));
}
});
JavaPairDStream<String, Integer> pairs = words.mapToPair
(new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
});
JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey
(new Function2<Integer, Integer, Integer>() {
//對(duì)相同的Key,進(jìn)行Value的累計(jì)(包括Local和Reducer級(jí)別同時(shí)Reduce)
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
wordsCount.print();
jsc.start();
jsc.awaitTermination();
jsc.close();
}
}
【結(jié)果展示】
-------------------------------------------
Time: 1488815425000 ms
-------------------------------------------
(273,1)
(1148,4)
(1119,2)
(1816,1)
(312,1)
(62,1)
(1184,1)
(1625,1)
(1566,1)
(1488815421523,1)
...
2.sparkStreaming基于kafka的Direct詳解
Direct方式特點(diǎn):
1)Direct的方式是會(huì)直接操作kafka底層的元數(shù)據(jù)信息,這樣如果計(jì)算失敗了,可以把數(shù)據(jù)重新讀一下,重新處理。即數(shù)據(jù)一定會(huì)被處理。拉數(shù)據(jù),是RDD在執(zhí)行的時(shí)候直接去拉數(shù)據(jù)。
2)由于直接操作的是kafka,kafka就相當(dāng)于你底層的文件系統(tǒng)。這個(gè)時(shí)候能保證嚴(yán)格的事務(wù)一致性,即一定會(huì)被處理,而且只會(huì)被處理一次。而Receiver的方式則不能保證,因?yàn)镽eceiver和ZK中的數(shù)據(jù)可能不同步,Spark Streaming可能會(huì)重復(fù)消費(fèi)數(shù)據(jù),這個(gè)調(diào)優(yōu)可以解決,但顯然沒有Direct方便。而Direct api直接是操作kafka的,
spark streaming自己負(fù)責(zé)追蹤消費(fèi)這個(gè)數(shù)據(jù)的偏移量或者offset,并且自己保存到checkpoint,所以它的數(shù)據(jù)一定是同步的,一定不會(huì)被重復(fù)。即使重啟也不會(huì)重復(fù),因?yàn)閏heckpoint了,但是程序升級(jí)的時(shí)候,不能讀取原先的checkpoint,面對(duì)升級(jí)checkpoint無效這個(gè)問題,怎么解決呢?升級(jí)的時(shí)候讀取我指定的備份就可以了,即手動(dòng)的指定checkpoint也是可以的,這就再次完美的確保了事務(wù)性,有且僅有一次的事務(wù)機(jī)制。那么怎么手動(dòng)checkpoint呢?構(gòu)建SparkStreaming的時(shí)候,有g(shù)etorCreate這個(gè)api,它就會(huì)獲取checkpoint的內(nèi)容,具體指定下這個(gè)checkpoint在哪就好了?;蛘呷缦聢D:

而如果從checkpoint恢復(fù)后,如果數(shù)據(jù)累積太多處理不過來,怎么辦?1)限速2)增強(qiáng)機(jī)器的處理能力3)放到數(shù)據(jù)緩沖池中。
3)由于底層是直接讀數(shù)據(jù),沒有所謂的Receiver,直接是周期性(Batch Intervel)的查詢kafka,處理數(shù)據(jù)的時(shí)候,我們會(huì)使用基于kafka原生的Consumer api來獲取kafka中特定范圍(offset范圍)中的數(shù)據(jù)。這個(gè)時(shí)候,Direct Api訪問kafka帶來的一個(gè)顯而易見的性能上的好處就是,如果你要讀取多個(gè)partition,Spark也會(huì)創(chuàng)建RDD的partition,這個(gè)時(shí)候RDD的partition和kafka的partition是一致的。而Receiver的方式,這2個(gè)partition是沒任何關(guān)系的。這個(gè)優(yōu)勢(shì)是你的RDD,其實(shí)本質(zhì)上講在底層讀取kafka的時(shí)候,kafka的partition就相當(dāng)于原先hdfs上的一個(gè)block。這就符合了數(shù)據(jù)本地性。RDD和kafka數(shù)據(jù)都在這邊。所以讀數(shù)據(jù)的地方,處理數(shù)據(jù)的地方和驅(qū)動(dòng)數(shù)據(jù)處理的程序都在同樣的機(jī)器上,這樣就可以極大的提高性能。不足之處是由于RDD和kafka的patition是一對(duì)一的,想提高并行度就會(huì)比較麻煩。提高并行度還是repartition,即重新分區(qū),因?yàn)楫a(chǎn)生shuffle,很耗時(shí)。這個(gè)問題,以后也許新版本可以自由配置比例,不是一對(duì)一。因?yàn)樘岣卟⑿卸?,可以更好的利用集群的?jì)算資源,這是很有意義的。
4)不需要開啟wal機(jī)制,從數(shù)據(jù)零丟失的角度來看,極大的提升了效率,還至少能節(jié)省一倍的磁盤空間。從kafka獲取數(shù)據(jù),比從hdfs獲取數(shù)據(jù),因?yàn)閦ero copy的方式,速度肯定更快。
【代碼實(shí)戰(zhàn)】
public class SparkStreamingOnKafkaDirected {
public static void main(String[] args) {
SparkConf conf = new SparkConf().
setAppName("SparkStreamingOnKafkaDirected")
.setMaster("local");
JavaStreamingContext jsc = new JavaStreamingContext
(conf, Durations.seconds(10));
Map<String, String> kafkaParameters = new HashMap
<String, String>();
kafkaParameters.put("metadata.broker.list",
"node03:9092,node04:9092,node05:9092");
HashSet<String> topics = new HashSet<String>();
topics.add("kfk");
JavaPairInputDStream<String,String> lines =
KafkaUtils.createDirectStream(jsc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParameters,
topics);
JavaDStream<String> words = lines.flatMap(
new FlatMapFunction<Tuple2<String,String>, String>() {
//如果是Scala,由于SAM轉(zhuǎn)換,所以可以寫成val words =
//lines.flatMap { line => line.split(" ")}
@Override
public Iterable<String> call(Tuple2<String,String> tuple)
throws Exception {
return Arrays.asList(tuple._2.split("\t"));
}
});
JavaPairDStream<String, Integer> pairs = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String word)
throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
});
JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(
new Function2<Integer, Integer, Integer>() {
//對(duì)相同的Key,進(jìn)行Value的累計(jì)(包括Local和Reducer級(jí)別同時(shí)Reduce)
@Override
public Integer call(Integer v1, Integer v2)
throws Exception {
return v1 + v2;
}
});
wordsCount.print();
jsc.start();
jsc.awaitTermination();
jsc.close();
}
}
并行度問題:
并行度和RDD中的partition個(gè)數(shù)有關(guān)系??梢酝ㄟ^repartition增加。
1、receiver模式:并行度與block Interval(默認(rèn)200ms)有關(guān)系,但是建議不低于50ms。
2、direct模式:并行度與kafak中topic的partition數(shù)據(jù)有關(guān)。增加kafka中的topic的partition數(shù)量可以提高并行度。
receive模式和direct模式最大不同是消費(fèi)偏移量管理者不同,一個(gè)是zookeeper,另一個(gè)是SparkStreaming(checkpoint)
direct模式和receive模式對(duì)比:
1.簡化并行性:無需創(chuàng)建多個(gè)輸入Kafka流并且結(jié)合它們。 使用directStream,Spark Streaming將創(chuàng)建與要消費(fèi)的Kafkatopic中partition分區(qū)一樣多的RDD分區(qū),這將從Kafka并行讀取數(shù)據(jù)。 因此,在Kafka和RDD分區(qū)之間存在一對(duì)一映射,這更容易理解和調(diào)整。
2.效率:在第一種方法中實(shí)現(xiàn)零數(shù)據(jù)丟失需要將數(shù)據(jù)存儲(chǔ)在預(yù)寫日志中,該日志進(jìn)一步復(fù)制數(shù)據(jù)。 這實(shí)際上是低效的,因?yàn)閿?shù)據(jù)有效地被復(fù)制兩次 - 一次是Kafka,另一次是寫入提前日志。 第二種方法消除了問題,因?yàn)闆]有接收器(zookeeper),因此不需要預(yù)寫日志。 將元數(shù)據(jù)信息直接保存在kafka中,可以從Kafka恢復(fù)消息。
3.Exactly-once語義:第一種方法使用Kafka的高級(jí)API在Zookeeper中存儲(chǔ)消耗的偏移量。這是傳統(tǒng)上消費(fèi)Kafka數(shù)據(jù)的方式。雖然這種方法(與預(yù)寫日志結(jié)合)可以確保零數(shù)據(jù)丟失(即至少一次語義),但是一些記錄在一些故障下可能被消耗兩次。這是因?yàn)镾park Streaming可靠接收的數(shù)據(jù)與Zookeeper跟蹤的偏移之間存在不一致。因此,在第二種方法中,我們使用簡單的Kafka API,不使用Zookeeper的。偏移由Spark Streaming在其檢查點(diǎn)內(nèi)跟蹤。這消除了Spark Streaming和Zookeeper / Kafka之間的不一致,所以每個(gè)記錄被Spark Streaming有效地精確接收一次,盡管失敗了。為了實(shí)現(xiàn)輸出結(jié)果的一次性語義,將數(shù)據(jù)保存到外部數(shù)據(jù)存儲(chǔ)的輸出操作必須是冪等的,或者是保存結(jié)果和偏移量的原子事務(wù)。