Spark Streaming基于kafka獲取數(shù)據(jù)

[TOC]
SparkStreaming基于kafka獲取數(shù)據(jù)的方式,主要有倆種,即Receiver和Direct,基于Receiver的方式,是SparkStreaming給我們提供了kafka訪問的高層api的封裝,而基于Direct的方式,就是直接訪問,在SparkSteaming中直接去操作kafka中的數(shù)據(jù),不需要前面的高層api的封裝。而Direct的方式,可以對(duì)kafka進(jìn)行更好的控制!同時(shí)性能也更好。

1.Spark streaming基于kafka以Receiver方式獲取數(shù)據(jù)
實(shí)際上做kafka receiver的時(shí)候,通過receiver來獲取數(shù)據(jù),這個(gè)時(shí)候,kafka receiver是使用的kafka高層次的comsumer api來實(shí)現(xiàn)的。receiver會(huì)從kafka中獲取數(shù)據(jù),然后把它存儲(chǔ)到我們具體的Executor內(nèi)存中。然后Spark streaming也就是driver中,會(huì)根據(jù)這獲取到的數(shù)據(jù),啟動(dòng)job去處理。

image.png

1)在通過kafka receiver去獲取kafka的數(shù)據(jù),在正在獲取數(shù)據(jù)的過程中,這臺(tái)機(jī)器有可能崩潰了。如果來不及做備份,數(shù)據(jù)就會(huì)丟失,切換到另外一臺(tái)機(jī)器上,也沒有相關(guān)數(shù)據(jù)。這時(shí)候,為了數(shù)據(jù)安全,采用WAL的方式。write ahead log,預(yù)寫日志的方式會(huì)同步的將接收到的kafka數(shù)據(jù),寫入到分布式文件系統(tǒng)中。但是預(yù)寫日志的方式消耗時(shí)間,所以存儲(chǔ)時(shí)建議Memory_and_Disc2.如果是寫到hdfs,會(huì)自動(dòng)做副本。如果是寫到本地,這其實(shí)有個(gè)風(fēng)險(xiǎn),就是如果這臺(tái)機(jī)器崩潰了,再想恢復(fù)過來,這個(gè)是需要時(shí)間的。

2)我們的kafka receiver接收數(shù)據(jù)的時(shí)候,通過線程或者多線程的方式,kafka中的topic是以partition的方式存在的。sparkstreaming中的kafka receiver接收kafka中topic中的數(shù)據(jù),也是通過線程并發(fā)的方式去獲取的不同的partition,例如用五條線程同時(shí)去讀取kafka中的topics中的不同的partition數(shù)據(jù),這時(shí)你這個(gè)讀取數(shù)據(jù)的并發(fā)線程數(shù),和RDD實(shí)際處理數(shù)據(jù)的并發(fā)線程數(shù)是沒任何關(guān)系的。因?yàn)楂@取數(shù)據(jù)時(shí)都還沒產(chǎn)生RDD呢。RDD是Driver端決定產(chǎn)生RDD的。

3)默認(rèn)情況下,一個(gè)Executor中是不是只有一個(gè)receiver去接收kafka中的數(shù)據(jù)。那能不能多找一些Executor去更高的并發(fā)度,就是使用更多的機(jī)器去接收數(shù)據(jù),當(dāng)然可以,基于kafa的api去創(chuàng)建更多的Dstream就可以了。很多的Dstream接收kafka不同topics中的不同的數(shù)據(jù),最后你計(jì)算的時(shí)候,再把他優(yōu)聯(lián)就行了。其實(shí)這是非常靈活的,因?yàn)榭梢宰杂傻慕M合。

【代碼實(shí)戰(zhàn)】

producer端:

public class SparkStreamingDataManuallyProducerForKafka extends Thread
{
   static String[] channelNames = new  String[]{
    "Spark","Scala","Kafka","Flink","Hadoop","Storm",
    "Hive","Impala","HBase","ML"
   };
   static String[] actionNames = new String[]{"View", "Register"};
   private String topic; //發(fā)送給Kafka的數(shù)據(jù)的類別
   private Producer<Integer, String> producerForKafka;
   
   private static String dateToday;
   private static Random random;
   public SparkStreamingDataManuallyProducerForKafka(String topic){
    dateToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
       this.topic = topic;
       random = new Random();
       Properties conf = new Properties();
       conf.put("metadata.broker.list","node03:9092,node04:9092,node05:9092");
       conf.put("serializer.class",  StringEncoder.class.getName());
       producerForKafka = new Producer<Integer, String>
       (new ProducerConfig(conf)) ;
   }  
   @Override
   public void run() {
    int counter = 0;
    while(true){
        counter++;
        String userLog = userlogs();
        //System.out.println("product:"+userLog+"   ");
        producerForKafka.send(new KeyedMessage<Integer, String>
        (topic, userLog));
        if(0 == counter%500){
            counter = 0;
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        }
    }
   public static void main( String[] args )
   {
    new SparkStreamingDataManuallyProducerForKafka("kfk").start();
   }
private static String userlogs() {  
    StringBuffer userLogBuffer = new StringBuffer("");
    int[] unregisteredUsers = new int[]{1, 2, 3, 4, 5, 6, 7, 8};
    long timestamp = new Date().getTime();
        Long userID = 0L;
        long pageID = 0L;
        
        //隨機(jī)生成的用戶ID 
        if(unregisteredUsers[random.nextInt(8)] == 1) {
            userID = null;
        } else {
            userID = (long) random.nextInt((int) 2000);
        }           
        //隨機(jī)生成的頁面ID
        pageID =  random.nextInt((int) 2000);
        
        //隨機(jī)生成Channel
        String channel = channelNames[random.nextInt(10)];
        
        //隨機(jī)生成action行為
        String action = actionNames[random.nextInt(2)];
        
        
        userLogBuffer.append(dateToday)
                    .append("\t")
                    .append(timestamp)
                    .append("\t")
                    .append(userID)
                    .append("\t")
                    .append(pageID)
                    .append("\t")
                    .append(channel)
                    .append("\t")
                    .append(action);
//                      .append("\n");              
    return userLogBuffer.toString();    
}
}

consumer端:

public class SparkStreamingOnKafkaReceiver {
public static void main(String[] args) {
    SparkConf conf = new SparkConf()
            .setAppName("SparkStreamingOnKafkaReceiver")
            .setMaster("local[2]")
        .set("spark.streaming.receiver.writeAheadLog.enable","true");    
    JavaStreamingContext jsc = new JavaStreamingContext(conf, 
            Durations.seconds(5));
        jsc.checkpoint("hdfs://node02:8020/checkpoint");
    Map<String, Integer> topicConsumerConcurrency =
            new HashMap<String, Integer>();
    topicConsumerConcurrency.put("kfk", 1);
    
    JavaPairReceiverInputDStream<String,String> lines = 
            KafkaUtils.createStream(jsc,
            "node02:2181,node03:2181,node04:2181", 
            "MyFiestConsumerGroup", topicConsumerConcurrency);
    
    
    JavaDStream<String> words = lines.flatMap(new
            FlatMapFunction<Tuple2<String,String>, String>() { 
        //如果是Scala,由于SAM轉(zhuǎn)換,所以可以寫成val words = 
        //lines.flatMap { line => line.split(" ")}
        private static final long serialVersionUID = 1L;
        @Override
        public Iterable<String> call(Tuple2<String,String> tuple) 
                throws Exception {
            return Arrays.asList(tuple._2.split("\t"));
        }
    });
  
    JavaPairDStream<String, Integer> pairs = words.mapToPair
            (new PairFunction<String, String, Integer>() {
        private static final long serialVersionUID = 1L;
        @Override
        public Tuple2<String, Integer> call(String word) throws Exception {
            return new Tuple2<String, Integer>(word, 1);
        }
    });
 
    JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey
            (new Function2<Integer, Integer, Integer>() { 
    //對(duì)相同的Key,進(jìn)行Value的累計(jì)(包括Local和Reducer級(jí)別同時(shí)Reduce)
        private static final long serialVersionUID = 1L;
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    });
    wordsCount.print();     
    jsc.start();
    jsc.awaitTermination();
    jsc.close();
}
}

【結(jié)果展示】

-------------------------------------------
Time: 1488815425000 ms
-------------------------------------------
(273,1)
(1148,4)
(1119,2)
(1816,1)
(312,1)
(62,1)
(1184,1)
(1625,1)
(1566,1)
(1488815421523,1)
...

2.sparkStreaming基于kafka的Direct詳解

Direct方式特點(diǎn):

1)Direct的方式是會(huì)直接操作kafka底層的元數(shù)據(jù)信息,這樣如果計(jì)算失敗了,可以把數(shù)據(jù)重新讀一下,重新處理。即數(shù)據(jù)一定會(huì)被處理。拉數(shù)據(jù),是RDD在執(zhí)行的時(shí)候直接去拉數(shù)據(jù)。

2)由于直接操作的是kafka,kafka就相當(dāng)于你底層的文件系統(tǒng)。這個(gè)時(shí)候能保證嚴(yán)格的事務(wù)一致性,即一定會(huì)被處理,而且只會(huì)被處理一次。而Receiver的方式則不能保證,因?yàn)镽eceiver和ZK中的數(shù)據(jù)可能不同步,Spark Streaming可能會(huì)重復(fù)消費(fèi)數(shù)據(jù),這個(gè)調(diào)優(yōu)可以解決,但顯然沒有Direct方便。而Direct api直接是操作kafka的,

spark streaming自己負(fù)責(zé)追蹤消費(fèi)這個(gè)數(shù)據(jù)的偏移量或者offset,并且自己保存到checkpoint,所以它的數(shù)據(jù)一定是同步的,一定不會(huì)被重復(fù)。即使重啟也不會(huì)重復(fù),因?yàn)閏heckpoint了,但是程序升級(jí)的時(shí)候,不能讀取原先的checkpoint,面對(duì)升級(jí)checkpoint無效這個(gè)問題,怎么解決呢?升級(jí)的時(shí)候讀取我指定的備份就可以了,即手動(dòng)的指定checkpoint也是可以的,這就再次完美的確保了事務(wù)性,有且僅有一次的事務(wù)機(jī)制。那么怎么手動(dòng)checkpoint呢?構(gòu)建SparkStreaming的時(shí)候,有g(shù)etorCreate這個(gè)api,它就會(huì)獲取checkpoint的內(nèi)容,具體指定下這個(gè)checkpoint在哪就好了?;蛘呷缦聢D:

image.png

而如果從checkpoint恢復(fù)后,如果數(shù)據(jù)累積太多處理不過來,怎么辦?1)限速2)增強(qiáng)機(jī)器的處理能力3)放到數(shù)據(jù)緩沖池中。

3)由于底層是直接讀數(shù)據(jù),沒有所謂的Receiver,直接是周期性(Batch Intervel)的查詢kafka,處理數(shù)據(jù)的時(shí)候,我們會(huì)使用基于kafka原生的Consumer api來獲取kafka中特定范圍(offset范圍)中的數(shù)據(jù)。這個(gè)時(shí)候,Direct Api訪問kafka帶來的一個(gè)顯而易見的性能上的好處就是,如果你要讀取多個(gè)partition,Spark也會(huì)創(chuàng)建RDD的partition,這個(gè)時(shí)候RDD的partition和kafka的partition是一致的。而Receiver的方式,這2個(gè)partition是沒任何關(guān)系的。這個(gè)優(yōu)勢(shì)是你的RDD,其實(shí)本質(zhì)上講在底層讀取kafka的時(shí)候,kafka的partition就相當(dāng)于原先hdfs上的一個(gè)block。這就符合了數(shù)據(jù)本地性。RDD和kafka數(shù)據(jù)都在這邊。所以讀數(shù)據(jù)的地方,處理數(shù)據(jù)的地方和驅(qū)動(dòng)數(shù)據(jù)處理的程序都在同樣的機(jī)器上,這樣就可以極大的提高性能。不足之處是由于RDD和kafka的patition是一對(duì)一的,想提高并行度就會(huì)比較麻煩。提高并行度還是repartition,即重新分區(qū),因?yàn)楫a(chǎn)生shuffle,很耗時(shí)。這個(gè)問題,以后也許新版本可以自由配置比例,不是一對(duì)一。因?yàn)樘岣卟⑿卸?,可以更好的利用集群的?jì)算資源,這是很有意義的。

4)不需要開啟wal機(jī)制,從數(shù)據(jù)零丟失的角度來看,極大的提升了效率,還至少能節(jié)省一倍的磁盤空間。從kafka獲取數(shù)據(jù),比從hdfs獲取數(shù)據(jù),因?yàn)閦ero copy的方式,速度肯定更快。

【代碼實(shí)戰(zhàn)】

public class SparkStreamingOnKafkaDirected {
    public static void main(String[] args) {    
        SparkConf conf = new SparkConf().
                setAppName("SparkStreamingOnKafkaDirected")
                .setMaster("local");    
        JavaStreamingContext jsc = new JavaStreamingContext
                (conf, Durations.seconds(10));  
        Map<String, String> kafkaParameters = new HashMap
                <String, String>();
        kafkaParameters.put("metadata.broker.list", 
                "node03:9092,node04:9092,node05:9092"); 
        HashSet<String> topics = new HashSet<String>();
        topics.add("kfk");
        JavaPairInputDStream<String,String> lines = 
                KafkaUtils.createDirectStream(jsc,
                String.class, 
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParameters,
                topics);    
        JavaDStream<String> words = lines.flatMap(
                new FlatMapFunction<Tuple2<String,String>, String>() { 
                    //如果是Scala,由于SAM轉(zhuǎn)換,所以可以寫成val words = 
                    //lines.flatMap { line => line.split(" ")}
            @Override
            public Iterable<String> call(Tuple2<String,String> tuple) 
                    throws Exception {
                return Arrays.asList(tuple._2.split("\t"));
            }
        });     
        JavaPairDStream<String, Integer> pairs = words.mapToPair(
                new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) 
                    throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });
            
        JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(
                new Function2<Integer, Integer, Integer>() { 
    //對(duì)相同的Key,進(jìn)行Value的累計(jì)(包括Local和Reducer級(jí)別同時(shí)Reduce)
            
            @Override
            public Integer call(Integer v1, Integer v2) 
                    throws Exception {
                return v1 + v2;
            }
        }); 
        wordsCount.print(); 
        jsc.start();    
        jsc.awaitTermination();
        jsc.close();
    }
}

并行度問題:

并行度和RDD中的partition個(gè)數(shù)有關(guān)系??梢酝ㄟ^repartition增加。

1、receiver模式:并行度與block Interval(默認(rèn)200ms)有關(guān)系,但是建議不低于50ms。

2、direct模式:并行度與kafak中topic的partition數(shù)據(jù)有關(guān)。增加kafka中的topic的partition數(shù)量可以提高并行度。

receive模式和direct模式最大不同是消費(fèi)偏移量管理者不同,一個(gè)是zookeeper,另一個(gè)是SparkStreaming(checkpoint)

direct模式和receive模式對(duì)比:

1.簡化并行性:無需創(chuàng)建多個(gè)輸入Kafka流并且結(jié)合它們。 使用directStream,Spark Streaming將創(chuàng)建與要消費(fèi)的Kafkatopic中partition分區(qū)一樣多的RDD分區(qū),這將從Kafka并行讀取數(shù)據(jù)。 因此,在Kafka和RDD分區(qū)之間存在一對(duì)一映射,這更容易理解和調(diào)整。

2.效率:在第一種方法中實(shí)現(xiàn)零數(shù)據(jù)丟失需要將數(shù)據(jù)存儲(chǔ)在預(yù)寫日志中,該日志進(jìn)一步復(fù)制數(shù)據(jù)。 這實(shí)際上是低效的,因?yàn)閿?shù)據(jù)有效地被復(fù)制兩次 - 一次是Kafka,另一次是寫入提前日志。 第二種方法消除了問題,因?yàn)闆]有接收器(zookeeper),因此不需要預(yù)寫日志。 將元數(shù)據(jù)信息直接保存在kafka中,可以從Kafka恢復(fù)消息。

3.Exactly-once語義:第一種方法使用Kafka的高級(jí)API在Zookeeper中存儲(chǔ)消耗的偏移量。這是傳統(tǒng)上消費(fèi)Kafka數(shù)據(jù)的方式。雖然這種方法(與預(yù)寫日志結(jié)合)可以確保零數(shù)據(jù)丟失(即至少一次語義),但是一些記錄在一些故障下可能被消耗兩次。這是因?yàn)镾park Streaming可靠接收的數(shù)據(jù)與Zookeeper跟蹤的偏移之間存在不一致。因此,在第二種方法中,我們使用簡單的Kafka API,不使用Zookeeper的。偏移由Spark Streaming在其檢查點(diǎn)內(nèi)跟蹤。這消除了Spark Streaming和Zookeeper / Kafka之間的不一致,所以每個(gè)記錄被Spark Streaming有效地精確接收一次,盡管失敗了。為了實(shí)現(xiàn)輸出結(jié)果的一次性語義,將數(shù)據(jù)保存到外部數(shù)據(jù)存儲(chǔ)的輸出操作必須是冪等的,或者是保存結(jié)果和偏移量的原子事務(wù)。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容