前面我們已經(jīng)了解了sparksql的使用,這一節(jié)我們將了解spark當中的流處理即spark-streaming。
1 系統(tǒng)、軟件以及前提約束
- CentOS 7 64 工作站 作者的機子ip是192.168.100.200,請讀者根據(jù)自己實際情況設置
- 已完成spark中的DataFrame編程
http://www.itdecent.cn/nb/37554943 - xshell
- 為去除權(quán)限對操作的影響,所有操作都以root進行
- 確保hadoop,spark已經(jīng)啟動
2 操作
- 1 在linux命令行中執(zhí)行以下命令
# 啟動8888端口,可以輸入值
nc -lk 8888
- 2 啟動另外一個窗口,執(zhí)行以下命令:
# 進入spark的bin目錄
cd /root/spark-2.2.1-bin-hadoop2.7/bin
# 進入scala命令行
./spark-shell
# 在命令行中執(zhí)行以下語句
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel
val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("192.168.100.200", 8888, StorageLevel.MEMORY_AND_DISK_SER)
val wordCounts = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()
ssc.start()
ssc.awaitTermination()
- 3 測試
在前一個nc窗口不斷輸入字符串,回車,我們會看到spark命令行中正在統(tǒng)計這一秒的輸入的字符串的詞頻。
以上,就是sparkstreaming監(jiān)聽端口并進行詞頻統(tǒng)計的過程。