I. Spark Streaming
1 Introduction
References
2 A first Spark Streaming example
2.1 Importing the dependency
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>
</dependency>
2.2 The first Spark Streaming program: word count
package cn.qphone.spark.streaming.day1
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object Demo1_Streaming {
def main(args: Array[String]): Unit = {
//1. Validate arguments
if (args == null || args.length != 2) {
println(
"""
|Usage : <hostname> <port>
|""".stripMargin)
System.exit(-1)
}
val Array(hostname, port) = args
//2. Create the core StreamingContext
val streamContext: StreamingContext = new StreamingContext(new SparkConf()
.setAppName("Demo1_Streaming").setMaster("local[*]"), Seconds(5))
//3. Business logic: word count
val lines:ReceiverInputDStream[String] = streamContext.socketTextStream(hostname, port.toInt)
val retDStream:DStream[(String, Int)] = lines.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_+_)
retDStream.print
//4. start() must be called to launch the streaming computation
streamContext.start()
//5. awaitTermination() keeps the program from exiting right after start(); it blocks until stop() is called after the work is done, or an exception terminates the job
streamContext.awaitTermination()
}
}
2.3 Configuration

(figure: 001.png)
2.4 Installing a test data server (netcat)
yum -y install nc
nc -lk qphone01 4444
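nc just listens on a port and forwards whatever is typed to the connected streaming job. If netcat is unavailable, a rough Scala stand-in (a hypothetical helper; the port mirrors the nc example above) can serve test lines over a socket:

import java.io.PrintWriter
import java.net.ServerSocket

object SocketWordServer {
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(4444)   // same port as the nc example
    val socket = server.accept()          // wait for the streaming job to connect
    val out = new PrintWriter(socket.getOutputStream, true)
    while (true) {
      out.println("hello spark streaming") // one test line per second
      Thread.sleep(1000)
    }
  }
}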
2.5 Receiver
A Receiver, as the name implies, is the component that receives data. Resources are split into two parts: one part receives data, the other processes it. What a Receiver collects is, plainly, batch after batch of data in the form of RDDs, stored in Executor memory; the Receiver itself occupies part of an Executor.
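One practical consequence of this resource split: a receiver permanently occupies one core, so a receiver-based job run locally needs at least two threads; with local[1] data would be received but never processed. A minimal sketch of a safe local master setting:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReceiverCoresDemo {
  def main(args: Array[String]): Unit = {
    // local[2] at minimum: one thread for the receiver, one for processing
    val ssc = new StreamingContext(
      new SparkConf().setAppName("ReceiverCoresDemo").setMaster("local[2]"), Seconds(5))
    ssc.socketTextStream("qphone01", 4444).print() // any receiver-based source
    ssc.start()
    ssc.awaitTermination()
  }
}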
Not every streaming job needs a Receiver.
The figure below illustrates the execution flow of a receiver-based program.
2.6 Spark Streaming and HDFS
package cn.qphone.spark.streaming.day1
import cn.qphone.spark.utils.LoggerTrait
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
object Demo2_HDFS extends LoggerTrait{
def main(args: Array[String]): Unit = {
//2. Create the core StreamingContext
val streamContext: StreamingContext = new StreamingContext(new SparkConf()
.setAppName("Demo2_HDFS").setMaster("local[*]"), Seconds(5))
//3. Business logic: word count
val lines: DStream[String] = streamContext.textFileStream("hdfs://192.168.49.111:9000/data")
val retDStream:DStream[(String, Int)] = lines.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_+_)
retDStream.print
//4. start() must be called to launch the streaming computation
streamContext.start()
//5. awaitTermination() keeps the program from exiting right after start(); it blocks until stop() is called or an exception terminates the job
streamContext.awaitTermination()
}
}
2.7 Spark Streaming and Kafka
2.7.1 Importing the dependency
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.2.0</version>
</dependency>
2.7.2 Differences between the two integration versions

(figure: 002.png)
2.7.3 ReceiverStream
2.7.3.1 SparkUtils
package cn.qphone.spark.utils
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object SparkUtils {
val DEFAULT_STREAMING_INTERVAL:Int = 5
/*
* spark core
* */
def getContext(appName:String):SparkContext = getContext(appName, "local[*]")
def getContext(appName:String, masterUrl:String):SparkContext = {
val sc = new SparkContext(
new SparkConf().setMaster(masterUrl).setAppName(appName)
)
sc
}
/*
* spark sql
* */
def getDefaultSession(appName:String):SparkSession = getSessionWithNotHiveSupport(appName, "local[*]")
def getSession(appName:String, isSupportHive:Boolean):SparkSession = {
if (isSupportHive) getSessionWithHiveSupport(appName, "local[*]")
else getSessionWithNotHiveSupport(appName, "local[*]")
}
def getSessionWithHiveSupport(appName:String, masterUrl:String):SparkSession = SparkSession.builder()
.appName(appName).master(masterUrl).enableHiveSupport().getOrCreate() // with Hive support
def getSessionWithNotHiveSupport(appName:String, masterUrl:String):SparkSession = SparkSession.builder().appName(appName).master(masterUrl).getOrCreate()
/*
* spark streaming
* */
def getDefaultStreamingContext(appName:String):StreamingContext = getStreamingContext(appName, "local[*]", DEFAULT_STREAMING_INTERVAL)
def getStreamingContext(appName:String, master:String, interval:Int):StreamingContext = new StreamingContext(new SparkConf().setAppName(appName).setMaster(master), Seconds(interval))
def stop(sparkContext: SparkContext) = {
if (null != sparkContext && !sparkContext.isStopped) {
sparkContext.stop()
}
}
def stop(sparkSession: SparkSession) = {
if (null != sparkSession && !sparkSession.sparkContext.isStopped) {
sparkSession.stop()
}
}
def stop(streamingContext:StreamingContext) = {
if (null != streamingContext && !streamingContext.sparkContext.isStopped) {
streamingContext.stop()
}
}
}
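A quick usage sketch of these helpers (the object name and the socket source are illustrative only):

import cn.qphone.spark.utils.SparkUtils

object SparkUtilsUsage {
  def main(args: Array[String]): Unit = {
    val ssc = SparkUtils.getDefaultStreamingContext("SparkUtilsUsage") // 5s batch interval
    ssc.socketTextStream("qphone01", 4444).print()
    ssc.start()
    ssc.awaitTermination() // returns once stop() is called or the job fails
    SparkUtils.stop(ssc)
  }
}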
2.7.3.2 Demo3_Receiver
package cn.qphone.spark.streaming.day1
import cn.qphone.spark.utils.{LoggerTrait, SparkUtils}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
object Demo3_Kafka_Receiver extends LoggerTrait{
def main(args: Array[String]): Unit = {
//1. Obtain the context
val ssc: StreamingContext = SparkUtils.getDefaultStreamingContext("Demo3_Kafka_Receiver")
//2. Prepare the parameters
val zkQuorum:String = "qphone01,qphone02,qphone03/kafka"
val groupId = "streaming_receiver_hz2002"
val topics = Map(
"receiver_topic" -> 1
)
//3. Create the discretized stream over the Kafka data from these parameters
val msgDStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
//4. Business logic
msgDStream.print
//5. Start the computation
ssc.start()
//6. Wait for termination
ssc.awaitTermination()
}
}
2.7.3.3 Remaining steps
##1. Start Kafka
##2. Create the topic (receiver_topic)
##3. Run the code above
##4. Produce data with a producer process
2.7.3.4 Why the receiver approach was deprecated
##1.
This approach uses a Receiver to fetch the data; the Receiver is implemented with Kafka's high-level consumer API. Everything the receiver pulls from Kafka is stored in Spark Executor memory, and the jobs launched by Spark Streaming then process that data.
However, under the default configuration this approach can lose data when something fails underneath.
For zero data loss, Spark Streaming's write-ahead log (WAL) must be enabled. It synchronously writes the received Kafka data into a log on a distributed file system such as HDFS, so even after a node failure the data can be recovered from the log.
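A hedged sketch of turning the WAL on for a receiver-based job: the config key spark.streaming.receiver.writeAheadLog.enable is the standard one, while the app name and paths are illustrative. The WAL segments are written under the checkpoint directory, so one must be set:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReceiverWithWAL {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ReceiverWithWAL")
      .setMaster("local[2]")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true") // enable the WAL
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("hdfs://192.168.49.111:9000/chk") // WAL data lives under the checkpoint dir
    // ... create a receiver-based stream and start the job as usual ...
  }
}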
##2. Things to note
1. Kafka topic partitions have no relationship to the RDD partitions Spark Streaming produces. Raising the partition count in KafkaUtils.createStream only increases the number of threads inside a single receiver; it does not increase Spark's processing parallelism.
2. Multiple Kafka input DStreams can be created with different groups and topics, so that several receivers ingest data in parallel.
3. If a fault-tolerant store such as HDFS is in place and the write-ahead log is enabled, the received data is already replicated in the log, so set the input stream's storage level to StorageLevel.MEMORY_AND_DISK_SER (i.e. KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)) rather than a replicated level.
##3. Why data can be lost

(figure: 003.png)
2.7.4 DirectStream
package cn.qphone.spark.streaming.day2
import java.util.Properties
import cn.qphone.spark.utils.{CommonScalaUtils, CommonUtils, SparkUtils}
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
import scala.collection.JavaConversions
object Demo1_Kafka_Direct{
def main(args: Array[String]): Unit = {
//1. Obtain the context
val ssc: StreamingContext = SparkUtils.getDefaultStreamingContext("Demo1_Kafka_Direct")
//2. Prepare the parameters
val properties = new Properties()
properties.load(Demo1_Kafka_Direct.getClass.getClassLoader.getResourceAsStream("kafka.properties"))
// val kafkaParams = JavaConversions.mapAsScalaMap(CommonUtils.toMap(properties)).toMap
val kafkaParams = CommonScalaUtils.toMap(properties)
val topics = Set(
"hzbigdata2002"
)
//3. Create the discretized stream over the Kafka data from these parameters
val msgDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
//4. Business logic
msgDStream.print
msgDStream.foreachRDD((rdd, bTime) => {
if(!rdd.isEmpty()) {
val offsetRDD = rdd.asInstanceOf[HasOffsetRanges]
val offsetRanges = offsetRDD.offsetRanges
for(offsetRange <- offsetRanges) {
val topic = offsetRange.topic
val partition = offsetRange.partition
val fromOffset = offsetRange.fromOffset
val untilOffset = offsetRange.untilOffset
val rddcount = rdd.count()
println(s"topic:${topic}\tpartition:${partition}\tstart:${fromOffset}\tend:${untilOffset}\tcount:${rddcount}")
}
}
})
//5. Start the computation
ssc.start()
//6. Wait for termination
ssc.awaitTermination()
}
}
- kafka.properties
bootstrap.servers=qphone01:9092,qphone02:9092,qphone03:9092
group.id=a_streaming
auto.offset.reset=smallest
- Java-CommonUtils
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class CommonUtils {
public static void main(String[] args) throws IOException {
Properties properties = new Properties();
properties.load(CommonUtils.class.getClassLoader().getResourceAsStream("kafka.properties"));
Map<String, String> map = toMap(properties);
System.out.println(map);
}
public static Map<String, String> toMap(Properties properties) {
Map<String, String> map = new HashMap<>();
Set<Map.Entry<Object, Object>> entries = properties.entrySet();
for (Map.Entry entry : entries) {
map.put((String) entry.getKey(), (String) entry.getValue());
}
return map;
}
}
- Scala-CommonScalaUtils
import java.util
import java.util.{Map, Properties}
import scala.collection.{JavaConversions, immutable, mutable}

object CommonScalaUtils {
def toMap(properties: Properties):immutable.Map[String, String] = { // scala.collection.immutable.Map
val entries: util.Set[Map.Entry[AnyRef, AnyRef]] = properties.entrySet() // the Properties' kv pairs live in a java.util.Set
val set: mutable.Set[Map.Entry[AnyRef, AnyRef]] = JavaConversions.asScalaSet(entries) // convert java.util.Set to scala.collection.mutable.Set
var map = mutable.Map[String, String]() // scala.collection.mutable.Map
set.foreach(entry => map.put(entry.getKey.asInstanceOf[String], entry.getValue.asInstanceOf[String]))
map.toMap
}
}
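For reference, the same conversion is a one-liner with scala.collection.JavaConverters (available in Scala 2.11), shown here as an alternative sketch rather than a drop-in replacement:

import java.util.Properties
import scala.collection.JavaConverters._

object CommonScalaUtils2 {
  // Properties is viewed through asScala as a mutable Map[String, String]
  def toMap(properties: Properties): Map[String, String] = properties.asScala.toMap
}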
2.8 Saving offsets
2.8.1 Saving offsets in ZooKeeper
- ZKCuratorUtils
package cn.qphone.spark.utils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class ZKCuratorUtils {
private static final String ZK_CONNECTIONS = "qphone01,qphone02,qphone03";
private static final int BASE_SLEEP_TIME_MS = 1000;
private static final int MAX_RETRIES = 3;
public static CuratorFramework getClient() {
CuratorFramework client = CuratorFrameworkFactory
.newClient(ZK_CONNECTIONS, new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES));
client.start();
return client;
}
}
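A small sanity-check sketch for the Curator helper above; the znode path and payload are illustrative only:

import cn.qphone.spark.utils.ZKCuratorUtils

object ZKCuratorUtilsDemo {
  def main(args: Array[String]): Unit = {
    val client = ZKCuratorUtils.getClient
    val path = "/demo_topic/demo_group/0"
    if (client.checkExists().forPath(path) == null)
      client.create().creatingParentsIfNeeded().forPath(path) // create parent znodes as needed
    client.setData().forPath(path, "42".getBytes())           // store an offset as bytes
    println(new String(client.getData.forPath(path)))         // prints 42
    client.close()
  }
}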
- Main code
package cn.qphone.spark.streaming.day2
import java.util.Properties
import cn.qphone.spark.utils.{CommonScalaUtils, LoggerTrait, SparkUtils, ZKCuratorUtils}
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.zookeeper.data.Stat
import scala.collection._
import scala.collection.JavaConversions
object Demo2_Offset_ZK extends LoggerTrait{
var client = ZKCuratorUtils.getClient
def main(args: Array[String]): Unit = {
//1. Obtain the context
val ssc: StreamingContext = SparkUtils.getDefaultStreamingContext("Demo2_Offset_ZK")
//2. Prepare the parameters
val properties = new Properties()
properties.load(Demo2_Offset_ZK.getClass.getClassLoader.getResourceAsStream("kafka.properties"))
val kafkaParams: Predef.Map[String, String] = CommonScalaUtils.toMap(properties)
val topics: Predef.Set[String] = "zk_offset".split(",").toSet
//3. Create the discretized stream over the Kafka data from these parameters
val messageStream: InputDStream[(String, String)] = createMsg(ssc, kafkaParams, topics)
//4. Iterate over each batch RDD
messageStream.foreachRDD((rdd, btime) => {
if (!rdd.isEmpty()) {
val offsetRDD = rdd.asInstanceOf[HasOffsetRanges]
val offsetRanges: Array[OffsetRange] = offsetRDD.offsetRanges
storeOffset(offsetRanges, kafkaParams("group.id"))
}
})
ssc.start()
ssc.awaitTermination()
}
/**
* Create the message stream.
* Offsets are kept in zk.
* On the very first read, consume from the beginning and create the directory in zk.
* Otherwise, read the saved offsets from the zk directory and resume from them.
*/
def createMsg(ssc:StreamingContext, kafkaParams:Predef.Map[String, String], topics:Predef.Set[String]):InputDStream[(String, String)] = {
//1. Read the offsets (TopicAndPartition -> offset)
val offsets:Predef.Map[TopicAndPartition, Long] = getOffset(topics, kafkaParams("group.id"))
//2. No saved offsets means this is the first read
var msgDStream: InputDStream[(String, String)] = null
if(offsets.isEmpty) {
msgDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
}else { // otherwise resume from the saved offsets
val messagehandler = (msgHandler:MessageAndMetadata[String, String]) => (msgHandler.key(), msgHandler.message())
msgDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, offsets, messagehandler)
}
msgDStream
}
/**
* Get the offset of each partition of the given topics.
*/
def getOffset(topics:Set[String], groupId:String):Predef.Map[TopicAndPartition, Long] = {
//0. Create a mutable map
val map = mutable.Map[TopicAndPartition, Long]()
//1. Iterate over the topics
topics.foreach(topic => {
//2. Check in zk whether the topic directory exists; if it does not, this is the first run and the directory must be created
val path = s"/${topic}/$groupId"
checkExist(path)
//3. List the children of path in zk; each child directory holds one partition's offset
JavaConversions.asScalaBuffer(client.getChildren.forPath(path)).foreach(partition => {
//4. Read the offset
val fullPath = s"$path/$partition"
val offset = new String(client.getData.forPath(fullPath)).toLong
val tap:TopicAndPartition = new TopicAndPartition(topic, partition.toInt)
map.put(tap, offset)
})
})
map.toMap
}
/**
* Check whether the path exists; create it if it does not.
*/
def checkExist(path:String):Unit = {
//1. Check whether the path exists
val stat: Stat = client.checkExists().forPath(path)
if (stat == null) { // 2. it does not exist
//3. Create the path
client.create().creatingParentsIfNeeded().forPath(path)
}
}
/**
* Store the offsets.
*/
def storeOffset(offsetRanges: Array[OffsetRange], group_id:String) = {
for(offsetRange <- offsetRanges) {
val topic = offsetRange.topic
val partition = offsetRange.partition
val fromOffset = offsetRange.fromOffset
val untilOffset = offsetRange.untilOffset
val path = s"/$topic/$group_id/$partition"
checkExist(path) // create the partition directory if needed
client.setData().forPath(path, untilOffset.toString.getBytes())
//store the offset
println(s"topic:${topic}\tpartition:${partition}\tstart:${fromOffset}\tend:${untilOffset}")
}
}
}
2.8.2 Saving offsets in Redis
- JedisUtils
package cn.qphone.spark.utils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class JedisUtils {
private static final String DEFAULT_HOST = "192.168.49.111";
private static final int DEFAULT_PORT = 6379;
private static JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), DEFAULT_HOST, DEFAULT_PORT);
public static JedisPool getDefaultPool() {
return jedisPool;
}
public static JedisPool getPool(String host, int port) {
jedisPool = new JedisPool(new JedisPoolConfig(), host, port);
return jedisPool;
}
public static Jedis getJedisWithDefaultPool() {
Jedis jedis = jedisPool.getResource();
jedis.auth("123456");
return jedis;
}
public static Jedis getJedisWithPool(JedisPool pool) {
if (pool != null) return pool.getResource();
return null;
}
public static void returnJedisWithDefaultPool(Jedis jedis) {
if(jedis != null) jedisPool.returnResource(jedis);
}
public static void returnJedisWithPool(Jedis jedis, JedisPool pool) {
if (jedis != null && pool != null) pool.returnResource(jedis);
}
}
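A minimal sketch of the hash layout used below for offsets (key = ${topic}_${groupId}, field = partition, value = offset); the topic and group names here are illustrative:

import cn.qphone.spark.utils.JedisUtils

object JedisOffsetDemo {
  def main(args: Array[String]): Unit = {
    val jedis = JedisUtils.getJedisWithDefaultPool
    jedis.hset("redis_offset_a_streaming", "0", "100")   // partition 0 -> offset 100
    println(jedis.hget("redis_offset_a_streaming", "0")) // "100"
    JedisUtils.returnJedisWithDefaultPool(jedis)
  }
}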
- Full code
package cn.qphone.spark.streaming.day2
import java.util
import java.util.Properties
import cn.qphone.spark.utils.{CommonScalaUtils, JedisUtils, LoggerTrait, SparkUtils}
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import scala.collection.{JavaConversions, Set, mutable}
object Demo3_Offset_Redis extends LoggerTrait{
def main(args: Array[String]): Unit = {
// I. Prepare the basics: StreamingContext, kafkaParams, topics
//1. Obtain the context
val ssc: StreamingContext = SparkUtils.getDefaultStreamingContext("Demo3_Offset_Redis")
//2. Prepare the parameters
val properties = new Properties()
properties.load(Demo3_Offset_Redis.getClass.getClassLoader.getResourceAsStream("kafka.properties"))
val kafkaParams: Predef.Map[String, String] = CommonScalaUtils.toMap(properties)
val topics: Predef.Set[String] = "redis_offset".split(",").toSet
// II. Create the discretized stream, handling both the no-offset and the saved-offset case
//3. Create the discretized stream over the Kafka data from these parameters
val messageStream: InputDStream[(String, String)] = createMsg(ssc, kafkaParams, topics)
//4. Iterate over each batch RDD
messageStream.foreachRDD((rdd, btime) => {
if (!rdd.isEmpty()) {
val offsetRDD = rdd.asInstanceOf[HasOffsetRanges]
val offsetRanges: Array[OffsetRange] = offsetRDD.offsetRanges
storeOffset(offsetRanges, kafkaParams("group.id"))
}
})
ssc.start()
ssc.awaitTermination()
}
/**
* Create the message stream.
* Offsets are kept in redis.
* On the very first read, consume from the beginning and create the hash in redis.
* Otherwise, read the saved offsets from the redis key and resume from them.
*/
def createMsg(ssc:StreamingContext, kafkaParams:Predef.Map[String, String], topics:Predef.Set[String]):InputDStream[(String, String)] = {
//1. Read the offsets (TopicAndPartition -> offset)
val offsets:Predef.Map[TopicAndPartition, Long] = getOffset(topics, kafkaParams("group.id"))
//2. No saved offsets means this is the first read
var msgDStream: InputDStream[(String, String)] = null
if(offsets.isEmpty) {
msgDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
}else { // otherwise resume from the saved offsets
val messagehandler = (msgHandler:MessageAndMetadata[String, String]) => (msgHandler.key(), msgHandler.message())
msgDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, offsets, messagehandler)
}
msgDStream
}
/**
* Get the offset of each partition of the given topics.
* Redis layout, one hash per topic and group:
*   key:   ${topic}_${groupId}
*   field: partition
*   value: offset
* e.g. 127.0.0.1:6379> HSET mytopic1_a_streaming 0 1
*/
def getOffset(topics:Set[String], groupId:String):Predef.Map[TopicAndPartition, Long] = {
//0. Create a mutable map
val map = mutable.Map[TopicAndPartition, Long]()
//1. Iterate over the topics
topics.foreach(topic => {
//2. Look up this topic's hash in redis; if it has no fields, this is the first run
val jedis = JedisUtils.getJedisWithDefaultPool
val key:String = s"${topic}_${groupId}"
val partitionSet: util.Set[String] = jedis.hkeys(key) // hkeys, not keys: the hash fields are the partitions
val fields: Predef.Set[String] = JavaConversions.asScalaSet(partitionSet).toSet
//3. Each field of the hash is a partition id; its value is the saved offset
if(!fields.isEmpty) {
fields.foreach(partition => {
val offset: Long = jedis.hget(key, partition).toLong
val top:TopicAndPartition = new TopicAndPartition(topic, partition.toInt)
map.put(top, offset)
})
}
JedisUtils.returnJedisWithDefaultPool(jedis)
})
map.toMap
}
/**
* Store the offsets.
*/
def storeOffset(offsetRanges: Array[OffsetRange], group_id:String) = {
for(offsetRange <- offsetRanges) {
val topic = offsetRange.topic
val partition = offsetRange.partition
val fromOffset = offsetRange.fromOffset
val untilOffset = offsetRange.untilOffset
val jedis = JedisUtils.getJedisWithDefaultPool
val key:String = s"${topic}_${group_id}"
jedis.hset(key, partition.toString, untilOffset.toString)
//store the offset
println(s"topic:${topic}\tpartition:${partition}\tstart:${fromOffset}\tend:${untilOffset}")
JedisUtils.returnJedisWithDefaultPool(jedis)
}
}
}
- Importing the dependency
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.1.0</version>
</dependency>
2.8.3 Saving offsets to HBase
Left as an exercise.
2.9 Idempotence
2.9.1 Introduction
To achieve exactly-once output semantics, the operation that saves data to external storage must either be idempotent, or be an atomic transaction that saves the results and the offsets together.
Idempotence (idempotent, idempotence) is a concept from mathematics and computer science, common in abstract algebra.
In programming, an idempotent operation is one whose effect after any number of executions is the same as the effect of a single execution. An idempotent function or method can be called repeatedly with the same arguments and always produce the same result; such calls do not change system state, so repeated execution is harmless. For example, a "setTrue()" function is idempotent: however many times it runs, the outcome is the same. For more complex operations, idempotence is usually guaranteed with a unique transaction id (serial number).
f(f(x)) = f(x)
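A toy illustration of the definition in Scala: applying the operation twice leaves the result exactly as applying it once.

object IdempotenceDemo {
  def main(args: Array[String]): Unit = {
    val setTrue: Boolean => Boolean = _ => true       // always yields the same result
    assert(setTrue(setTrue(false)) == setTrue(false)) // f(f(x)) == f(x)
    println("setTrue is idempotent")
  }
}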
2.9.2 Test
- Requirement
## Requirement: save the messages from Kafka into a MySQL database and save the offsets to zk
- KafkaTools
package cn.qphone.spark.utils
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}
import org.apache.zookeeper.data.Stat
import scala.collection.{JavaConversions, _}
object KafkaTools {
var client = ZKCuratorUtils.getClient
/**
* Create the message stream.
* Offsets are kept in zk.
* On the very first read, consume from the beginning and create the directory in zk.
* Otherwise, read the saved offsets from the zk directory and resume from them.
*/
def createMsg(ssc:StreamingContext, kafkaParams:Predef.Map[String, String], topics:Predef.Set[String]):InputDStream[(String, String)] = {
//1. Read the offsets (TopicAndPartition -> offset)
val offsets:Predef.Map[TopicAndPartition, Long] = getOffset(topics, kafkaParams("group.id"))
//2. No saved offsets means this is the first read
var msgDStream: InputDStream[(String, String)] = null
if(offsets.isEmpty) {
msgDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
}else { // otherwise resume from the saved offsets
val messagehandler = (msgHandler:MessageAndMetadata[String, String]) => (msgHandler.key(), msgHandler.message())
msgDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, offsets, messagehandler)
}
msgDStream
}
/**
* Get the offset of each partition of the given topics.
*/
def getOffset(topics:Set[String], groupId:String):Predef.Map[TopicAndPartition, Long] = {
//0. Create a mutable map
val map = mutable.Map[TopicAndPartition, Long]()
//1. Iterate over the topics
topics.foreach(topic => {
//2. Check in zk whether the topic directory exists; if it does not, this is the first run and the directory must be created
val path = s"/${topic}/$groupId"
checkExist(path)
//3. List the children of path in zk; each child directory holds one partition's offset
JavaConversions.asScalaBuffer(client.getChildren.forPath(path)).foreach(partition => {
//4. Read the offset
val fullPath = s"$path/$partition"
val offset = new String(client.getData.forPath(fullPath)).toLong
val tap:TopicAndPartition = new TopicAndPartition(topic, partition.toInt)
map.put(tap, offset)
})
})
map.toMap
}
/**
* Check whether the path exists; create it if it does not.
*/
def checkExist(path:String):Unit = {
//1. Check whether the path exists
val stat: Stat = client.checkExists().forPath(path)
if (stat == null) { // 2. it does not exist
//3. Create the path
client.create().creatingParentsIfNeeded().forPath(path)
}
}
/**
* Store the offsets.
*/
def storeOffset(offsetRanges: Array[OffsetRange], group_id:String) = {
for(offsetRange <- offsetRanges) {
val topic = offsetRange.topic
val partition = offsetRange.partition
val fromOffset = offsetRange.fromOffset
val untilOffset = offsetRange.untilOffset
val path = s"/$topic/$group_id/$partition"
checkExist(path) // create the partition directory if needed
client.setData().forPath(path, untilOffset.toString.getBytes())
//store the offset
println(s"topic:${topic}\tpartition:${partition}\tstart:${fromOffset}\tend:${untilOffset}")
}
}
}
- Start a producer
kafka-console-producer.sh \
--topic mytopic1 \
--broker-list qphone01:9092,qphone02:9092,qphone03:9092
tip:
lixi,1 rocklee,2 lee,3
- Code
package cn.qphone.spark.streaming.day2
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties
import cn.qphone.spark.utils.{CommonScalaUtils, KafkaTools, SparkUtils}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}
/**
*
*
* 1. Create the test mysql database
* create database test;
* 2. Create the table
* create table myorders(name varchar(20), orderid varchar(100) primary key);
* 3. Create the topic mytopic1
* kafka-topics.sh --zookeeper qphone01,qphone02,qphone03/kafka --create --topic mytopic1 --partitions 3 --replication-factor 1
* 4. Send data to mytopic1 in the form "string,number", e.g. abc,3
**/
object Demo4_Idempotent {
def main(args: Array[String]): Unit = {
//1. Obtain the context
val ssc: StreamingContext = SparkUtils.getDefaultStreamingContext("Demo4_Idempotent")
//2. Prepare the parameters
val properties = new Properties()
properties.load(Demo4_Idempotent.getClass.getClassLoader.getResourceAsStream("kafka.properties"))
val kafkaParams: Predef.Map[String, String] = CommonScalaUtils.toMap(properties)
val topics: Predef.Set[String] = "mytopic1".split(",").toSet
//3. Create the discretized stream over the Kafka data from these parameters
val messageStream: InputDStream[(String, String)] = KafkaTools.createMsg(ssc, kafkaParams, topics)
//4. Iterate over each batch RDD
messageStream.foreachRDD((rdd, btime) => {
if (!rdd.isEmpty()) {
val offsetRDD = rdd.asInstanceOf[HasOffsetRanges]
val offsetRanges: Array[OffsetRange] = offsetRDD.offsetRanges
//4. Save the data (idempotent upsert keyed on orderid)
rdd.map(kv => {
println(kv + "---->")
println(kv._2 + "=====>")
kv._2
}).foreachPartition(partition =>{
val connection: Connection = DriverManager.getConnection("jdbc:mysql://192.168.49.111:3306/test", "root", "123456")
val sql = "insert into myorders(orderid, name) values (?, ?) ON DUPLICATE KEY UPDATE orderid=?"
val statement: PreparedStatement = connection.prepareStatement(sql)
partition.foreach(msg => {
val name:String = msg.split(",")(0)
val id:String = msg.split(",")(1)
statement.setString(1, id)
statement.setString(2, name)
statement.setString(3, id)
statement.execute()
})
connection.close()
})
//5. Save the offsets
KafkaTools.storeOffset(offsetRanges, kafkaParams("group.id"))
}
})
ssc.start()
ssc.awaitTermination()
}
}
2.10 Atomic operations
2.10.1 Setup (DDL and topic)
1. Create the test mysql database
create database test;
2. Create the topic mytopic1
kafka-topics.sh --zookeeper qphone01,qphone02,qphone03/kafka --create --topic mytopic1 --partitions 3 --replication-factor 1
3. Create the tables
create table `test`.mytopic(topic varchar(200), groupid varchar(20), partid int, offset bigint);
create table `test`.mydata(data varchar(200), id int);
Initialize the offset table:
insert into mytopic(topic, groupid, partid, offset) values('mytopic1','hzbigdata2002',0,0);
insert into mytopic(topic, groupid, partid, offset) values('mytopic1','hzbigdata2002',1,0);
insert into mytopic(topic, groupid, partid, offset) values('mytopic1','hzbigdata2002',2,0);
4. Send data to mytopic1 in the form "string,number", e.g. abc,3
5. Add the dependency to the pom file
<dependency>
<groupId>org.scalikejdbc</groupId>
<artifactId>scalikejdbc_2.11</artifactId>
<version>3.2.0</version>
</dependency>
2.10.2 Code
package cn.qphone.spark.streaming.day3
import java.util.Properties
import cn.qphone.spark.utils.{CommonScalaUtils, LoggerTrait, SparkUtils}
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.TaskContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import scalikejdbc.{ConnectionPool, DB}
import scalikejdbc._
object Demo1_Atometic extends LoggerTrait{
def main(args: Array[String]): Unit = {
//1. Obtain the context
val ssc: StreamingContext = SparkUtils.getDefaultStreamingContext("Demo1_Atometic")
//2. Prepare the parameters
val properties = new Properties()
properties.load(Demo1_Atometic.getClass.getClassLoader.getResourceAsStream("kafka.properties"))
val kafkaParams: Predef.Map[String, String] = CommonScalaUtils.toMap(properties)
val topics: Predef.Set[String] = "mytopic1".split(",").toSet
//3. First read the saved partitions and offsets of the topic via JDBC
val driver = "com.mysql.jdbc.Driver"
val jdbcUrl = "jdbc:mysql://192.168.49.111:3306/test"
val jdbcUser = "root"
val jdbcPassword = "123456"
val group_id = "hzbigdata2002"
// Register the driver
Class.forName(driver)
// Set up the connection pool; nothing needs to be returned because scalikejdbc creates connections implicitly underneath
ConnectionPool.singleton(jdbcUrl, jdbcUser, jdbcPassword)
// Query the saved offsets with scalikejdbc
val fromOffsets = DB.readOnly {
implicit session => sql"select topic, partid, offset from mytopic"
.map { r =>
TopicAndPartition(r.string(1), r.int(2)) -> r.long(3)
}.list.apply().toMap
}
//4. Create the discretized stream
val messageHandler = (mmd : MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
//5. Write path
messages.foreachRDD((rdd, btime) => {
if(!rdd.isEmpty()) {
val offsetRDD = rdd.asInstanceOf[HasOffsetRanges]
val offsetRanges: Array[OffsetRange] = offsetRDD.offsetRanges
rdd.foreachPartition(partition => {
//Get the offset range of the current partition
val pOffsetRange: OffsetRange = offsetRanges(TaskContext.getPartitionId())
//6. Open a transaction
DB.localTx {implicit session => {
partition.foreach(msg => { // (key, value)
//1. Insert the data
val data = msg._2.split(",")(0)
val id =msg._2.split(",")(1)
val dataResult =
sql"""
insert into mydata(data,id) values (${data},${id})
""".execute().apply()
})
// println(1/0) // uncomment to force an exception and verify that the whole transaction rolls back
val offsetResult =
sql"""
update
mytopic
set
offset = ${pOffsetRange.untilOffset}
where
topic = ${pOffsetRange.topic}
and
partid = ${pOffsetRange.partition}
and
groupid = ${group_id }
""".update.apply()
}}
})
}
})
ssc.start()
ssc.awaitTermination()
}
}
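What ties the data insert and the offset update together above is scalikejdbc's DB.localTx: if anything inside the block throws, both statements roll back. A standalone sketch of that behavior (connection settings copied from the code above; the forced exception is purely illustrative):

import scalikejdbc._

object LocalTxDemo {
  def main(args: Array[String]): Unit = {
    ConnectionPool.singleton("jdbc:mysql://192.168.49.111:3306/test", "root", "123456")
    try {
      DB.localTx { implicit session =>
        sql"insert into mydata(data, id) values ('a', 1)".update.apply()
        throw new RuntimeException("boom") // forces a rollback: the insert above is not committed
      }
    } catch {
      case _: Exception => println("transaction rolled back")
    }
  }
}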
2.11 Common Spark Streaming transformation operators
| Operator | Description |
| --- | --- |
| map(func) | Applies func to each element of the DStream and returns a new DStream |
| flatMap(func) | Like map, except each input item can be mapped to zero or more output items |
| filter(func) | Returns a new DStream containing only the elements for which func returns true |
| repartition(numPartitions) | Increases or decreases the number of partitions in the DStream, changing its parallelism |
| union(otherDStream) | Merges the source DStream with another and returns a new DStream |
| count() | Counts the elements in each RDD of the DStream, returning a DStream of single-element RDDs |
| reduce(func) | Aggregates the elements of each RDD of the source DStream with func, returning a new DStream of single-element RDDs |
| countByValue() | For a DStream of elements of type K, returns a new DStream of (K, Long) pairs, where the Long is the number of occurrences of each key in each RDD of the source DStream |
| reduceByKey(func, [numTasks]) | Aggregates the values of each key in the source DStream with func, returning a new DStream of (K, V) pairs |
| join(otherDStream) | On DStreams of (K, V) and (K, W) pairs, returns a new DStream of (K, (V, W)) pairs |
| cogroup(otherDStream) | On DStreams of (K, V) and (K, W) pairs, returns a new DStream of (K, (Seq[V], Seq[W])) tuples |
| transform(func) | Applies an arbitrary RDD-to-RDD function to every RDD of the DStream (see 2.11.1) |
| updateStateByKey(func) | Updates each key's state from its previous state and its new values (see 2.11.2) |
| window(windowLength, slideInterval) | Returns a new DStream computed over windowed batches of the source DStream (see 2.11.3) |
2.11.1 transform
transform gives access to the RDD inside each batch so it can be manipulated directly.
transform is itself a transformation operator.
All of the DStream transformations listed above are DStream-to-DStream operations, yet a DStream is at bottom a sequence of RDDs, so RDD-to-RDD operations are clearly needed as well; for exactly that purpose the official API provides the transform operator.
Its classic uses are joining a DStream with an RDD and repartitioning a DStream (reducing partitions with coalesce).
In short, transform is the hook for building operations the official API does not provide out of the box.
- Blacklist example
object Demo2_BlackList extends LoggerTrait{
def main(args: Array[String]): Unit = {
val ssc: StreamingContext = SparkUtils.getDefaultStreamingContext("Demo2_BlackList")
//Blacklist RDD
/**
* 110.52.250.126##2016-05-30 17:38:20##GET /data/cache/style_1_widthauto.css?y7a HTTP/1.1##200##1292
*
* 110.52.250.127##2016-05-30 17:38:20##GET /data/cache/style_1_widthauto.css?y7a HTTP/1.1##200##1292
*/
val blacklistRDD:RDD[(String, Boolean)] = ssc.sparkContext.parallelize(List(
("27.19.74.143", true),
("110.52.250.126", true)
))
//Connect to the external data stream
val lines:DStream[String] = ssc.socketTextStream("192.168.49.111", 9999)
val ip2OtherDStream:DStream[(String, String)] = lines.map(line => {
val index = line.indexOf("##")
val ip = line.substring(0, index)
val other = line.substring(index + 2)
(ip, other)
})
val filteredDStream: DStream[(String, String)] = ip2OtherDStream.transform(rdd => {
val join = rdd.leftOuterJoin(blacklistRDD)
join.filter {case (ip, (left, right)) => {
right.isEmpty // keep records with no match on the right, i.e. not blacklisted
}}.map{case (ip, (left, right)) => {
(ip, left)
}}
})
filteredDStream.print()
ssc.start()
ssc.awaitTermination()
}
}
2.11.2 updateStateByKey
updateStateByKey(func) updates each key from its previous state and the key's new values, returning a DStream of the new states.
In plain terms: it tracks the cumulative state of every key up to the current batch, i.e. global statistics.
Two pieces of data are needed per key: the key's previous state and the key's new values (the current batch); the historical data (the previous state) must be persisted to disk via checkpointing rather than kept only in memory.
Also note that a key's previous state may or may not exist yet.
- Code
package cn.qphone.spark.streaming.day3
import cn.qphone.spark.utils.{LoggerTrait, SparkUtils}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
object Demo3_WordCount extends LoggerTrait{
def main(args: Array[String]): Unit = {
val ssc: StreamingContext = SparkUtils.getDefaultStreamingContext("Demo3_WordCount")
val lines:DStream[String] = ssc.socketTextStream("192.168.49.111", 9999)
ssc.checkpoint("file:/d:/data/chk") // 必須,持久化
val retDStream:DStream[(String, Int)] =
lines.flatMap(_.split("\\s+")).map((_, 1)).updateStateByKey(updateFunc _, 1)
retDStream.print
ssc.start()
ssc.awaitTermination()
}
/**
*
* @param seq    the new values for the key in the current batch
* @param option the previously accumulated state of the key
*/
def updateFunc(seq:Seq[Int], option:Option[Int]):Option[Int] = {
println(s"seq : ${seq.mkString(",")}")
println(s"currnt : ${option.getOrElse("empty")}")
Option(seq.sum + option.getOrElse(0))
}
}
2.11.3 The window operator
window operations are Spark Streaming's support for sliding windows, letting us run a computation over all the data that falls inside a window. Each time the window slides, the RDDs inside it are aggregated and processed to produce one RDD of the windowed DStream. In the figure below, for example, the window computation covers the last 3 seconds of data: the 3 RDDs inside the window are aggregated together, and 2 seconds later the computation runs again over the most recent 3 seconds. Every sliding-window operation therefore takes two parameters, the window length and the slide interval, and both must be integer multiples of the batch interval.

(figure: 004.png)
object Demo4_Window extends LoggerTrait{
def main(args: Array[String]): Unit = {
//1. Entry point
val ssc = SparkUtils.getStreamingContext("Demo4_Window", "local[*]", 2) // 2s batches so the window/slide below are multiples of the batch interval
//2. Read the data
val lines:DStream[String] = ssc.socketTextStream("qphone01", 9999)
//3.
val pairs:DStream[(String, Int)] = lines.flatMap(_.split("\\s+")).map((_, 1))
//4. window: reduceByKey over a sliding window
val batchInterval = 2
val ret = pairs.reduceByKeyAndWindow(_+_,
windowDuration = Seconds(batchInterval * 3),
slideDuration = Seconds(batchInterval * 2)
)
ret.print()
ssc.start()
ssc.awaitTermination()
}
}
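A hedged variant of the window job above using the inverse-function overload of reduceByKeyAndWindow, which incrementally subtracts the batches that slide out of the window instead of recomputing the whole window; this overload requires a checkpoint directory (the path here is illustrative):

import cn.qphone.spark.utils.SparkUtils
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream

object Demo4_Window_Inverse {
  def main(args: Array[String]): Unit = {
    val ssc = SparkUtils.getDefaultStreamingContext("Demo4_Window_Inverse") // 5s batches
    ssc.checkpoint("file:///d:/data/chk_window") // required by the inverse-function overload
    val pairs: DStream[(String, Int)] =
      ssc.socketTextStream("qphone01", 9999).flatMap(_.split("\\s+")).map((_, 1))
    val ret = pairs.reduceByKeyAndWindow(
      _ + _,       // fold in the batches entering the window
      _ - _,       // subtract the batches leaving the window
      Seconds(15), // window length, a multiple of the batch interval
      Seconds(10)  // slide interval, a multiple of the batch interval
    )
    ret.print()
    ssc.start()
    ssc.awaitTermination()
  }
}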
2.12 Integrating Spark SQL with Spark Streaming
package cn.qphone.spark.streaming.day3
import cn.qphone.spark.utils.SparkUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
object Demo5_sparksql_streaming {
def main(args: Array[String]): Unit = {
//1. Entry points
//1.1 spark sql
val spark = SparkUtils.getDefaultSession("Demo5_sparksql_streaming")
//1.2 spark streaming, built on the session's SparkContext (SparkUtils has no such overload, so construct it directly)
val ssc = new StreamingContext(spark.sparkContext, Seconds(2))
ssc.checkpoint("file:///d:/data/out")
//2. Read data, e.g. "001 mi mobile"
val lines:DStream[String] = ssc.socketTextStream("qphone01", 9999)
//3. Split and filter the data
val pairs:DStream[(String, Int)] = lines.map(line => {
val fields = line.split("\\s+")
if(fields == null || fields.length != 3) {
("", -1)
}else {
val brand = fields(1)
val category = fields(2)
(s"${category}_${brand}", 1)
}
}).filter(t => t._2 != -1)
//(001_Mi,1)(001_Mi,1)(002_Huawei,1)
//4. Aggregate
val usb:DStream[(String, Int)] = pairs.updateStateByKey(updateFunc)
//(001_Mi,2)(002_Huawei,1)
//5. Iterate over each batch RDD
usb.foreachRDD((rdd, bTime) => {
if(!rdd.isEmpty()) {
//5.1 Reshape the data and build a DataFrame
import spark.implicits._
val df = rdd.map { case (cb, cnt) => {
val category = cb.substring(0, cb.indexOf("_"))
val brand = cb.substring(cb.indexOf("_") + 1)
(category, brand, cnt)
}}.toDF("category", "brand", "sales")
//5.2 Register a temp view and query it
df.createOrReplaceTempView("tmp_category_brand_sales")
val sql =
"""
|select
|t.category,
|t.brand,
|t.sales,
|t.rank
|from
|(
|select
|category,
|brand,
|sales,
|row_number() over(partition by category order by sales desc) rank
|from
|tmp_category_brand_sales
|) t
|where
|rank < 4
|""".stripMargin
spark.sql(sql).show()
}
})
ssc.start()
ssc.awaitTermination()
}
/**
* seq    : the new values aggregated for the key in the current batch
* option : the key's previously accumulated state
*/
def updateFunc(seq:Seq[Int], option:Option[Int]):Option[Int] = Option(seq.sum + option.getOrElse(0))
}