TCP Socket數據流WordCount


安裝nc: yum install nc

WordCount
package cn.spark.streaming;

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

/**
 * Socket-based streaming word count.
 *
 * <p>Reads newline-delimited text from a TCP socket (for example one fed by
 * {@code nc -lk 9999}), splits each line into words, and prints the number of
 * occurrences of every word seen within each one-second batch.
 *
 * <p>Usage: {@code WordCount [host] [port]}. Both arguments are optional and
 * default to {@code hserver-1} and {@code 9999}.
 *
 * <p>No master URL is set on the {@link SparkConf}, so it has to be supplied
 * from outside (e.g. {@code spark-submit --master ...}). NOTE(review): when
 * running locally the socket receiver occupies one thread, so use at least
 * {@code local[2]} - confirm against the deployment in use.
 *
 * @author ThinkVision
 */
public class WordCount {

    /** Host the socket stream connects to when no host argument is given. */
    private static final String DEFAULT_HOST = "hserver-1";

    /** Port the socket stream connects to when no port argument is given. */
    private static final int DEFAULT_PORT = 9999;

    /**
     * Builds the streaming pipeline, starts it, and blocks until it terminates.
     *
     * @param args optional {@code [host, port]} of the text source
     * @throws NumberFormatException if the port argument is not an integer
     * @throws Exception if the streaming job fails or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {

        boolean hasArgs = args != null;
        String host = hasArgs && args.length > 0 ? args[0] : DEFAULT_HOST;
        int port = hasArgs && args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_PORT;

        SparkConf conf = new SparkConf().setAppName("WordCount");

        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        // Release the context even when start()/awaitTermination() throws.
        try {
            // create InputDStream: one element per line received on the socket
            JavaReceiverInputDStream<String> lineInputDStream = jssc.socketTextStream(host, port);

            // transform lineInputDStream into wordDStream
            JavaDStream<String> wordDStream = lineInputDStream.flatMap(

                    new FlatMapFunction<String, String>() {

                        private static final long serialVersionUID = 425086794973670380L;

                        @Override
                        public Iterator<String> call(String line) throws Exception {

                            return splitWords(line);
                        }
                    });

            // transform wordDStream into pairDStream: (word, 1)
            JavaPairDStream<String, Integer> pairDStream = wordDStream.mapToPair(

                    new PairFunction<String, String, Integer>() {

                        private static final long serialVersionUID = -7388596223277641539L;

                        @Override
                        public Tuple2<String, Integer> call(String word) throws Exception {

                            return new Tuple2<String, Integer>(word, 1);
                        }
                    });

            // reduceByKey: sum the ones per word within each batch
            JavaPairDStream<String, Integer> resultDStream = pairDStream.reduceByKey(

                    new Function2<Integer, Integer, Integer>() {

                        private static final long serialVersionUID = 5957698719526594872L;

                        @Override
                        public Integer call(Integer v1, Integer v2) throws Exception {

                            return v1 + v2;
                        }
                    });

            resultDStream.print();

            jssc.start();

            jssc.awaitTermination();
        } finally {
            jssc.close();
        }
    }

    /**
     * Splits a line into its whitespace-separated words.
     *
     * <p>Blank lines yield no words, and leading, trailing, or repeated
     * whitespace never produces an empty-string word (a plain
     * {@code split(" ")} would, and those would then be counted).
     *
     * @param line one line of input text
     * @return an iterator over the non-empty words of {@code line}
     */
    private static Iterator<String> splitWords(String line) {
        String trimmed = line.trim();
        if (trimmed.isEmpty()) {
            return Arrays.<String>asList().iterator();
        }
        // After trim() there is no leading whitespace, so no empty tokens appear.
        return Arrays.asList(trimmed.split("\\s+")).iterator();
    }
}

©著作權歸作者所有,轉載或內容合作請聯系作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

  • 環境: 這里我假設你已經安裝并且配置好了運行spark的環境,本文只記錄官網教程給出的Spark Streamin...
    東皇Amrzs閱讀 747評論 0 0
  • http://blog.csdn.net/mathewsking/article/details/8211273 ...
    liuboxx1閱讀 6,230評論 1 1
  • 圖片在最下面 1.vm安裝center so 7系統 2.配置網絡 宿機與linux在同一個網段 更改vim /e...
    F的平方閱讀 1,494評論 0 0
  • 原來,在剛才森林之中,董宣、呂峰、丁寒、月殘四人已經發現了顏良帶領著軒轅他們所在的方位。 所以施展 神通 驅趕這些...
    和煦晴天閱讀 151評論 0 0

友情鏈接更多精彩內容