spark streaming

一 Spark Streaming

1 介紹

參考資料

2 spark streaming第一例

2.1 導(dǎo)入依賴

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

2.2 Spark Streaming的第一個代碼:單詞統(tǒng)計

package cn.qphone.spark.streaming.day1

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Demo1_Streaming {
    def main(args: Array[String]): Unit = {

        //1. Validate arguments: exactly <hostname> <port> is required
        if (args == null || args.length != 2) {
            println(
                """
                  |Usage : <hostname> <port>
                  |""".stripMargin)
            System.exit(-1)
        }

        // val, not var: the extracted bindings are never reassigned
        val Array(hostname, port) = args

        //2. Build the streaming context: local mode, 5-second batch interval
        val streamContext: StreamingContext = new StreamingContext(new SparkConf()
            .setAppName("Demo1_Streaming").setMaster("local[*]"), Seconds(5))

        //3. Business logic: word count over a socket text stream
        val lines:ReceiverInputDStream[String] = streamContext.socketTextStream(hostname, port.toInt)

        val retDStream:DStream[(String, Int)] = lines.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_+_)
        retDStream.print() // side-effecting call: keep explicit parentheses

        //4. start() actually launches the streaming computation
        streamContext.start()

        //5. awaitTermination() keeps the driver alive until stop() is called or an error occurs
        streamContext.awaitTermination()
    }
}

2.3 配置

001.png

2.4 安裝網(wǎng)絡(luò)工具nc(netcat,用作測試數(shù)據(jù)源,并非web服務(wù)器)

yum -y install nc
nc -lk qphone01 4444

2.5 Receiver

    Receiver,顧名思義,就是數(shù)據(jù)的接收者,這里把資源分成了兩部分,一部分用來接收數(shù)據(jù),一部分用來處理數(shù)據(jù)。Receiver接收到的數(shù)據(jù),說白了就是一個個的batch數(shù)據(jù),是RDD,存儲在Executor內(nèi)存。Receiver就是Executor內(nèi)存中的一部分。
    不是所有的streaming作業(yè)都需要有Receiver。
    通過下圖,來闡述基于Receiver的程序執(zhí)行的流程

2.6 Spark Streaming和HDFS

package cn.qphone.spark.streaming.day1

import cn.qphone.spark.utils.LoggerTrait
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object Demo2_HDFS extends LoggerTrait{
    def main(args: Array[String]): Unit = {
        // Streaming context: local mode with a 5-second batch interval.
        val conf = new SparkConf().setAppName("Demo2_HDFS").setMaster("local[*]")
        val ssc = new StreamingContext(conf, Seconds(5))

        // Word count over new files appearing under the monitored HDFS directory.
        val lines: DStream[String] = ssc.textFileStream("hdfs://192.168.49.111:9000/data")
        val counts: DStream[(String, Int)] = lines
            .flatMap(_.split("\\s+"))
            .map(word => (word, 1))
            .reduceByKey(_ + _)
        counts.print()

        // start() launches the computation; awaitTermination() blocks the driver
        // until the job is stopped or fails.
        ssc.start()
        ssc.awaitTermination()
    }
}

2.7 Spark Streaming和Kafka

2.7.1 導(dǎo)入依賴

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

2.7.2 整合兩種版本的區(qū)別

002.png

2.7.3 ReceiverStream

2.7.3.1 SparkUtils
package cn.qphone.spark.utils

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object SparkUtils {

    /** Default batch interval, in seconds, for streaming contexts. */
    val DEFAULT_STREAMING_INTERVAL:Int = 5

    /** Spark core: local-mode context. */
    def getContext(appName:String):SparkContext = getContext(appName, "local[*]")

    /** Spark core: context for an explicit master URL. */
    def getContext(appName:String, masterUrl:String):SparkContext = {
        val sc = new SparkContext(
            new SparkConf().setMaster(masterUrl).setAppName(appName)
        )
        sc
    }

    /** Spark SQL: local session without Hive support. */
    def getDefaultSession(appName:String):SparkSession = getSessionWithNotHiveSupport(appName, "local[*]")

    /** Spark SQL: local session, optionally Hive-enabled. */
    def getSession(appName:String, isSupportHive:Boolean):SparkSession = {
        if (isSupportHive) getSessionWithHiveSupport(appName, "local[*]")
        else getSessionWithNotHiveSupport(appName, "local[*]")
    }

    /** Spark SQL session with Hive support enabled. */
    def getSessionWithHiveSupport(appName:String, masterUrl:String):SparkSession = SparkSession.builder()
            .appName(appName).master(masterUrl).enableHiveSupport().getOrCreate()

    /** Spark SQL session without Hive support. */
    def getSessionWithNotHiveSupport(appName:String, masterUrl:String):SparkSession = SparkSession.builder().appName(appName).master(masterUrl).getOrCreate()

    /** Spark Streaming: local context with the default batch interval. */
    def getDefaultStreamingContext(appName:String):StreamingContext = getStreamingContext(appName, "local[*]", DEFAULT_STREAMING_INTERVAL)

    /** Spark Streaming: context for an explicit master and interval (seconds). */
    def getStreamingContext(appName:String, master:String, interval:Int):StreamingContext = new StreamingContext(new SparkConf().setAppName(appName).setMaster(master), Seconds(interval))

    // The stop(...) overloads below carry explicit `: Unit` result types:
    // public overloaded methods should not rely on type inference.

    /** Stops the SparkContext if it is non-null and still running. */
    def stop(sparkContext: SparkContext): Unit = {
        if (null != sparkContext && !sparkContext.isStopped) {
            sparkContext.stop()
        }
    }

    /** Stops the SparkSession if its underlying context is still running. */
    def stop(sparkSession: SparkSession): Unit = {
        if (null != sparkSession && !sparkSession.sparkContext.isStopped) {
            sparkSession.stop()
        }
    }

    /** Stops the StreamingContext if its underlying context is still running. */
    def stop(streamingContext:StreamingContext): Unit = {
        if (null != streamingContext && !streamingContext.sparkContext.isStopped) {
            streamingContext.stop()
        }
    }
}
2.7.3.2 Demo3_Receiver
package cn.qphone.spark.streaming.day1

import cn.qphone.spark.utils.{LoggerTrait, SparkUtils}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

object Demo3_Kafka_Receiver extends LoggerTrait{
    def main(args: Array[String]): Unit = {
        // Streaming context with the project's default batch interval.
        val ssc: StreamingContext = SparkUtils.getDefaultStreamingContext("Demo3_Kafka_Receiver")

        // Receiver-based consumer configuration: ZooKeeper quorum (with kafka chroot),
        // consumer group, and topic -> receiver-thread-count mapping.
        val zkQuorum = "qphone01,qphone02,qphone03/kafka"
        val groupId = "streaming_receiver_hz2002"
        val topics = Map("receiver_topic" -> 1)

        // Receiver-based Kafka stream; replicated storage level for fault tolerance.
        val messages: ReceiverInputDStream[(String, String)] =
            KafkaUtils.createStream(ssc, zkQuorum, groupId, topics, StorageLevel.MEMORY_AND_DISK_SER_2)

        // Business logic: just dump each batch to stdout.
        messages.print()

        // Launch the computation and block the driver until it is stopped.
        ssc.start()
        ssc.awaitTermination()
    }
}
2.7.3.3 其他步驟
##1. 啟動kafka
##2. 創(chuàng)建指定的主題
##3. 啟動代碼
##4. 利用生產(chǎn)者進程生產(chǎn)數(shù)據(jù)
2.7.3.4 關(guān)于receiver為何被棄用
##1. 
    這種方式使用Receiver來獲取數(shù)據(jù)。Receiver是使用Kafka的高層次Consumer API來實現(xiàn)的。receiver從Kafka中獲取的數(shù)據(jù)都是存儲在Spark Executor的內(nèi)存中的,然后Spark Streaming啟動的job會去處理那些數(shù)據(jù)。
    然而,在默認的配置下,這種方式可能會因為底層的失敗而丟失數(shù)據(jù).
    如果要啟用高可靠機制,讓數(shù)據(jù)零丟失,就必須啟用Spark Streaming的預(yù)寫日志機制(Write Ahead Log,WAL)。該機制會同步地將接收到的Kafka數(shù)據(jù)寫入分布式文件系統(tǒng)(比如HDFS)上的預(yù)寫日志中。所以,即使底層節(jié)點出現(xiàn)了失敗,也可以使用預(yù)寫日志中的數(shù)據(jù)進行恢復(fù)。
##2. 
    需要注意的地方
    1. Kafka的topic分區(qū)和Spark Streaming中生成的RDD分區(qū)沒有關(guān)系。 在KafkaUtils.createStream中增加分區(qū)數(shù)量只會增加單個receiver的線程數(shù),不會增加Spark的并行度
    2. 可以創(chuàng)建多個的Kafka的輸入DStream, 使用不同的group和topic, 使用多個receiver并行接收數(shù)據(jù)。
    3. 如果啟用了HDFS等有容錯的存儲系統(tǒng),并且啟用了寫入日志,則接收到的數(shù)據(jù)已經(jīng)被復(fù)制到日志中。因此,輸入流的存儲級別設(shè)置StorageLevel.MEMORY_AND_DISK_SER(即使用KafkaUtils.createStream(...,StorageLevel.MEMORY_AND_DISK_SER))的存儲級別。
##3. 數(shù)據(jù)會丟失原因
003.png

2.7.4 DirectStream

package cn.qphone.spark.streaming.day2

import java.util.Properties

import cn.qphone.spark.utils.{CommonScalaUtils, CommonUtils, SparkUtils}
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

import scala.collection.JavaConversions

object Demo1_Kafka_Direct{
    def main(args: Array[String]): Unit = {
        //1. Obtain the streaming context (app name fixed to match the object name)
        val ssc: StreamingContext = SparkUtils.getDefaultStreamingContext("Demo1_Kafka_Direct")
        //2. Load the Kafka parameters from kafka.properties on the classpath
        val properties = new Properties()
        properties.load(Demo1_Kafka_Direct.getClass.getClassLoader.getResourceAsStream("kafka.properties"))

//        val kafkaParams = JavaConversions.mapAsScalaMap(CommonUtils.toMap(properties)).toMap
        val kafkaParams = CommonScalaUtils.toMap(properties)
        val topics = Set(
            "hzbigdata2002"
        )
        //3. Direct (receiver-less) Kafka stream
        val msgDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

        //4. Business logic: print each batch, then log the consumed offset ranges
        msgDStream.print()
        msgDStream.foreachRDD((rdd, bTime) => {
            if(!rdd.isEmpty()) {
                val offsetRDD = rdd.asInstanceOf[HasOffsetRanges]
                val offsetRanges = offsetRDD.offsetRanges
                // count() is a Spark action (one job per call): run it once per batch,
                // outside the loop, instead of once per OffsetRange
                val rddcount = rdd.count()
                for(offsetRange <- offsetRanges) {
                    val topic = offsetRange.topic
                    val partition = offsetRange.partition
                    val fromOffset = offsetRange.fromOffset
                    val untilOffset = offsetRange.untilOffset
                    println(s"topic:${topic}\tpartition:${partition}\tstart:${fromOffset}\tend:${untilOffset}\tcount:${rddcount}")
                }
                // (removed a dangling rdd.count() whose result was discarded)
            }
        })

        //5. Launch the computation
        ssc.start()

        //6. Block the driver until stopped
        ssc.awaitTermination()
    }
}
  • kafka.properties
bootstrap.servers=qphone01:9092,qphone02:9092,qphone03:9092
group.id=a_streaming
auto.offset.reset=smallest
  • Java-CommonUtils
public class CommonUtils {

    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
        properties.load(CommonUtils.class.getClassLoader().getResourceAsStream("kafka.properties"));
        Map<String, String> map = toMap(properties);
        System.out.println(map);
    }

    public static Map<String, String> toMap(Properties properties) {
        Map<String, String> map = new HashMap<>();
        Set<Map.Entry<Object, Object>> entries = properties.entrySet();
        for (Map.Entry entry : entries) {
            map.put((String) entry.getKey(), (String) entry.getValue());
        }
        return map;
    }
}
  • Scala-CommonScalaUtils
object CommonScalaUtils {
    /**
     * Converts a java.util.Properties into an immutable Map[String, String].
     *
     * Iterates the Java entry set directly instead of going through the
     * deprecated scala.collection.JavaConversions and a mutable Set/Map
     * round-trip; only entries physically present in the table are copied
     * (defaults are not resolved), matching the original entrySet semantics.
     */
    def toMap(properties: Properties):immutable.Map[String, String] = {
        val builder = immutable.Map.newBuilder[String, String]
        val it = properties.entrySet().iterator()
        while (it.hasNext) {
            val entry = it.next()
            // Properties is Hashtable[AnyRef, AnyRef]; keys/values are Strings by contract
            builder += entry.getKey.asInstanceOf[String] -> entry.getValue.asInstanceOf[String]
        }
        builder.result()
    }
}

2.8 偏移量保存

2.8.1 使用zk來保存具體的偏移量

  • ZKCuratorUtils
package cn.qphone.spark.utils;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ZKCuratorUtils {

    private static final String ZK_CONNECTIONS = "qphone01,qphone02,qphone03";
    private static final int BASE_SLEEP_TIME_MS = 1000;
    private static final int MAX_RETRIES = 3;

    /**
     * Builds and starts a Curator client for the configured ZooKeeper quorum,
     * using an exponential-backoff retry policy. Each call returns a new,
     * already-started client; the caller owns its lifecycle.
     */
    public static CuratorFramework getClient() {
        ExponentialBackoffRetry retryPolicy =
                new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES);
        CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_CONNECTIONS, retryPolicy);
        client.start();
        return client;
    }
}

  • 主要代碼
package cn.qphone.spark.streaming.day2

import java.util.Properties

import cn.qphone.spark.utils.{CommonScalaUtils, LoggerTrait, SparkUtils, ZKCuratorUtils}
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.zookeeper.data.Stat

import scala.collection._
import scala.collection.JavaConversions

object Demo2_Offset_ZK extends LoggerTrait{

    // Curator client shared by all offset reads/writes (val: never reassigned).
    val client = ZKCuratorUtils.getClient

    def main(args: Array[String]): Unit = {
        //1. Obtain the streaming context
        val ssc: StreamingContext = SparkUtils.getDefaultStreamingContext("Demo2_Offset_ZK")
        //2. Prepare parameters: kafka.properties must be on the classpath
        val properties = new Properties()
        properties.load(Demo2_Offset_ZK.getClass.getClassLoader.getResourceAsStream("kafka.properties"))
        val kafkaParams: Predef.Map[String, String] = CommonScalaUtils.toMap(properties)
        val topics: Predef.Set[String] = "zk_offset".split(",").toSet

        //3. Build the direct Kafka stream, resuming from any offsets stored in ZooKeeper
        val messageStream: InputDStream[(String, String)] = createMsg(ssc, kafkaParams, topics)

        //4. After each non-empty batch, persist the consumed offset ranges back to ZooKeeper
        messageStream.foreachRDD((rdd, btime) => {
            if (!rdd.isEmpty()) {
                val offsetRDD = rdd.asInstanceOf[HasOffsetRanges]
                val offsetRanges: Array[OffsetRange] = offsetRDD.offsetRanges
                storeOffset(offsetRanges, kafkaParams("group.id"))
            }
        })

        ssc.start()
        ssc.awaitTermination()
    }

    /**
     * Creates the Kafka message stream with offsets tracked in ZooKeeper.
     * First run (no stored offsets): consume from the default position; the ZK paths
     * are created as offsets get stored. Otherwise: resume from the stored
     * per-partition offsets under /<topic>/<group>.
     */
    def createMsg(ssc:StreamingContext, kafkaParams:Predef.Map[String, String], topics:Predef.Set[String]):InputDStream[(String, String)] = {
        //1. read the stored offsets (TopicAndPartition -> offset)
        val offsets:Predef.Map[TopicAndPartition, Long]  = getOffset(topics, kafkaParams("group.id"))
        //2. an empty map means this is the first run
        var msgDStream: InputDStream[(String, String)] = null
        if(offsets.isEmpty) {
            msgDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
        }else { // resume from the stored offsets
            val messagehandler = (msgHandler:MessageAndMetadata[String, String]) => (msgHandler.key(), msgHandler.message())
            msgDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, offsets, messagehandler)
        }
        msgDStream
    }

    /**
     * Reads each partition's stored offset for the given topics from ZooKeeper.
     * Creates /<topic>/<groupId> when missing (first run), in which case the
     * returned map is empty.
     */
    def getOffset(topics:Set[String], groupId:String):Predef.Map[TopicAndPartition, Long] = {
        //0. mutable accumulator, frozen to an immutable map at the end
        val map = mutable.Map[TopicAndPartition, Long]()
        //1. per topic
        topics.foreach(topic => {
            //2. ensure the topic/group path exists (created on the first run)
            val path = s"/${topic}/$groupId"
            checkExist(path)
            //3. the children of this path are the partition nodes
            JavaConversions.asScalaBuffer(client.getChildren.forPath(path)).foreach(partition => {
                //4. each partition node stores its offset as a string payload
                val fullPath = s"$path/$partition"
                val offset = new String(client.getData.forPath(fullPath)).toLong
                val tap:TopicAndPartition = new TopicAndPartition(topic, partition.toInt)
                map.put(tap, offset)
            })
        })
        map.toMap
    }


    /**
     * Ensures the given ZooKeeper path exists, creating it (and any missing
     * parents) when absent.
     */
    def checkExist(path:String):Unit = {
        //1. a null Stat means the node does not exist
        val stat: Stat = client.checkExists().forPath(path)
        if (stat == null) {
            //2. create the full path, parents included
            client.create().creatingParentsIfNeeded().forPath(path)
        }
    }

    /**
     * Persists each range's untilOffset to /<topic>/<group>/<partition> and
     * logs the stored range.
     */
    def storeOffset(offsetRanges: Array[OffsetRange], group_id:String) = {
        for(offsetRange <- offsetRanges) {
            val topic = offsetRange.topic
            val partition = offsetRange.partition
            val fromOffset = offsetRange.fromOffset
            val untilOffset = offsetRange.untilOffset
            val path = s"/$topic/$group_id/$partition"
            checkExist(path) // create the partition node on first store
            client.setData().forPath(path, untilOffset.toString.getBytes())
            println(s"topic:${topic}\tpartition:${partition}\tstart:${fromOffset}\tend:${untilOffset}")
        }
    }
}

2.8.2 使用redis保存偏移量

  • JedisUtils
package cn.qphone.spark.utils;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class JedisUtils {

    private static final String DEFAULT_HOST = "192.168.49.111";
    // typo fixed: DEPAULT_PORT -> DEFAULT_PORT (private constant, no external impact)
    private static final int DEFAULT_PORT = 6379;

    /** Shared pool for the default host/port; now final since nothing reassigns it. */
    private static final JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), DEFAULT_HOST, DEFAULT_PORT);

    /** Returns the shared pool for the default host/port. */
    public static JedisPool getDefaultPool() {
        return jedisPool;
    }

    /**
     * Builds a pool for an arbitrary host/port.
     * Bug fix: previously this reassigned the shared default pool, so any later
     * getJedisWithDefaultPool()/returnJedisWithDefaultPool() call silently used the
     * new host. It now returns an independent pool and leaves the default untouched.
     */
    public static JedisPool getPool(String host, int port) {
        return new JedisPool(new JedisPoolConfig(), host, port);
    }

    /** Borrows an authenticated connection from the default pool. */
    public static Jedis getJedisWithDefaultPool() {
        Jedis jedis = jedisPool.getResource();
        jedis.auth("123456"); // NOTE(review): hard-coded password; should come from configuration
        return jedis;
    }

    /** Borrows a connection from the given pool, or null if the pool is null. */
    public static Jedis getJedisWithPool(JedisPool pool) {
        if (pool != null) return pool.getResource();
        return null;
    }

    /** Returns a connection to the default pool (no-op on null). */
    public static void returnJedisWithDefaultPool(Jedis jedis) {
        if (jedis != null) jedisPool.returnResource(jedis);
    }

    /** Returns a connection to the given pool (no-op on nulls). */
    public static void returnJedisWithPool(Jedis jedis, JedisPool pool) {
        if (jedis != null && pool != null) pool.returnResource(jedis);
    }
}
  • 具體代碼
package cn.qphone.spark.streaming.day2

import java.util
import java.util.Properties

import cn.qphone.spark.utils.{CommonScalaUtils, JedisUtils, LoggerTrait, SparkUtils}
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

import scala.collection.{JavaConversions, Set, mutable}

object Demo3_Offset_Redis extends LoggerTrait{
    def main(args: Array[String]): Unit = {

        // I. Basic setup: StreamingContext, kafkaParams, topics
        //1. obtain the streaming context
        val ssc: StreamingContext = SparkUtils.getDefaultStreamingContext("Demo3_Offset_Redis")
        //2. kafka parameters (fix: load via this object's class, not a copy-pasted Demo2_Offset_ZK reference)
        val properties = new Properties()
        properties.load(Demo3_Offset_Redis.getClass.getClassLoader.getResourceAsStream("kafka.properties"))
        val kafkaParams: Predef.Map[String, String] = CommonScalaUtils.toMap(properties)
        val topics: Predef.Set[String] = "redis_offset".split(",").toSet

        // II. Create the stream, resuming from offsets stored in Redis when present
        val messageStream: InputDStream[(String, String)] = createMsg(ssc, kafkaParams, topics)

        //4. After each non-empty batch, persist the consumed offset ranges back to Redis
        messageStream.foreachRDD((rdd, btime) => {
            if (!rdd.isEmpty()) {
                val offsetRDD = rdd.asInstanceOf[HasOffsetRanges]
                val offsetRanges: Array[OffsetRange] = offsetRDD.offsetRanges
                storeOffset(offsetRanges, kafkaParams("group.id"))
            }
        })

        ssc.start()
        ssc.awaitTermination()
    }

    /**
     * Creates the Kafka message stream with offsets tracked in Redis.
     * First run (no stored offsets): consume from the default position.
     * Otherwise: resume from the per-partition offsets stored in the Redis hash.
     */
    def createMsg(ssc:StreamingContext, kafkaParams:Predef.Map[String, String], topics:Predef.Set[String]):InputDStream[(String, String)] = {
        //1. read the stored offsets (TopicAndPartition -> offset)
        val offsets:Predef.Map[TopicAndPartition, Long]  = getOffset(topics, kafkaParams("group.id"))
        //2. an empty map means this is the first run
        var msgDStream: InputDStream[(String, String)] = null
        if(offsets.isEmpty) {
            msgDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
        }else { // resume from the stored offsets
            val messagehandler = (msgHandler:MessageAndMetadata[String, String]) => (msgHandler.key(), msgHandler.message())
            msgDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, offsets, messagehandler)
        }
        msgDStream
    }

    /**
     * Reads each partition's stored offset for the given topics from Redis.
     * Layout: one hash per topic+group, key "<topic>_<groupId>",
     * field = partition id, value = offset, e.g.
     *   HSET mytopic_mygroup 0 42
     */
    def getOffset(topics:Set[String], groupId:String):Predef.Map[TopicAndPartition, Long] = {
        //0. mutable accumulator, frozen to an immutable map at the end
        val map = mutable.Map[TopicAndPartition, Long]()
        //1. per topic
        topics.foreach(topic => {
            val jedis = JedisUtils.getJedisWithDefaultPool
            val key:String = s"${topic}_${groupId}"
            // Bug fix: KEYS matches key *names* (it would return "<topic>_<group>" itself,
            // whose .toInt below would throw); HKEYS returns the hash's partition fields.
            val partitionSet: util.Set[String] = jedis.hkeys(key)
            val fields: Predef.Set[String] = JavaConversions.asScalaSet(partitionSet).toSet
            //2. read each partition's offset out of the hash
            if(!fields.isEmpty) {
                fields.foreach(partition => {
                    val offset: Long = jedis.hget(key, partition).toLong
                    val top:TopicAndPartition = new TopicAndPartition(topic, partition.toInt)
                    map.put(top, offset)
                })
            }
            JedisUtils.returnJedisWithDefaultPool(jedis)
        })
        map.toMap
    }

    /**
     * Persists each range's untilOffset into the hash "<topic>_<group>"
     * under the partition field, and logs the stored range.
     */
    def storeOffset(offsetRanges: Array[OffsetRange], group_id:String) = {
        for(offsetRange <- offsetRanges) {
            val topic = offsetRange.topic
            val partition = offsetRange.partition
            val fromOffset = offsetRange.fromOffset
            val untilOffset = offsetRange.untilOffset

            val jedis = JedisUtils.getJedisWithDefaultPool
            val key:String = s"${topic}_${group_id}"
            jedis.hset(key, partition.toString, untilOffset.toString)
            println(s"topic:${topic}\tpartition:${partition}\tstart:${fromOffset}\tend:${untilOffset}")
            JedisUtils.returnJedisWithDefaultPool(jedis)
        }
    }
}

  • 導(dǎo)入依賴
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.1.0</version>
</dependency>

2.8.3 保存偏移量到hbase

作業(yè)

2.9 冪等

2.9.1 介紹

    為了實現(xiàn)結(jié)果輸出的一次語義,將數(shù)據(jù)保存到外部數(shù)據(jù)存儲的輸出操作必須是冪等的,或者是保存結(jié)果和偏移量的原子轉(zhuǎn)換
    冪等(idempotent、idempotence)是一個數(shù)學與計算機學概念,常見于抽象代數(shù)中。
    在編程中一個冪等操作的特點是其任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同。冪等函數(shù),或冪等方法,是指可以使用相同參數(shù)重復(fù)執(zhí)行,并能獲得相同結(jié)果的函數(shù)。這些函數(shù)不會影響系統(tǒng)狀態(tài),也不用擔心重復(fù)執(zhí)行會對系統(tǒng)造成改變。例如,“setTrue()”函數(shù)就是一個冪等函數(shù),無論多次執(zhí)行,其結(jié)果都是一樣的.更復(fù)雜的操作冪等保證是利用唯一交易號(流水號)實現(xiàn).
    f(f(x)) = f(x)

2.9.2 測試

  • 需求
## 需求:將kafka中的消息保存到mysql數(shù)據(jù)庫,將偏移量保存到zk
  • KafkaTools
package cn.qphone.spark.utils

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}
import org.apache.zookeeper.data.Stat

import scala.collection.{JavaConversions, _}

object KafkaTools {

    // Curator client shared by all offset reads/writes (val: never reassigned).
    val client = ZKCuratorUtils.getClient

    /**
     * Creates the Kafka message stream with offsets tracked in ZooKeeper.
     * First run (no stored offsets): consume from the default position; the ZK
     * paths get created as offsets are stored. Otherwise: resume from the stored
     * per-partition offsets under /<topic>/<group>.
     */
    def createMsg(ssc:StreamingContext, kafkaParams:Predef.Map[String, String], topics:Predef.Set[String]):InputDStream[(String, String)] = {
        //1. read the stored offsets (TopicAndPartition -> offset)
        val offsets:Predef.Map[TopicAndPartition, Long]  = getOffset(topics, kafkaParams("group.id"))
        //2. an empty map means this is the first run
        var msgDStream: InputDStream[(String, String)] = null
        if(offsets.isEmpty) {
            msgDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
        }else { // resume from the stored offsets
            val messagehandler = (msgHandler:MessageAndMetadata[String, String]) => (msgHandler.key(), msgHandler.message())
            msgDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, offsets, messagehandler)
        }
        msgDStream
    }

    /**
     * Reads each partition's stored offset for the given topics from ZooKeeper.
     * Creates /<topic>/<groupId> when missing (first run), in which case the
     * returned map is empty.
     */
    def getOffset(topics:Set[String], groupId:String):Predef.Map[TopicAndPartition, Long] = {
        //0. mutable accumulator, frozen to an immutable map at the end
        val map = mutable.Map[TopicAndPartition, Long]()
        //1. per topic
        topics.foreach(topic => {
            //2. ensure the topic/group path exists (created on the first run)
            val path = s"/${topic}/$groupId"
            checkExist(path)
            //3. the children of this path are the partition nodes
            JavaConversions.asScalaBuffer(client.getChildren.forPath(path)).foreach(partition => {
                //4. each partition node stores its offset as a string payload
                val fullPath = s"$path/$partition"
                val offset = new String(client.getData.forPath(fullPath)).toLong
                val tap:TopicAndPartition = new TopicAndPartition(topic, partition.toInt)
                map.put(tap, offset)
            })
        })
        map.toMap
    }


    /**
     * Ensures the given ZooKeeper path exists, creating it (and any missing
     * parents) when absent.
     */
    def checkExist(path:String):Unit = {
        //1. a null Stat means the node does not exist
        val stat: Stat = client.checkExists().forPath(path)
        if (stat == null) {
            //2. create the full path, parents included
            client.create().creatingParentsIfNeeded().forPath(path)
        }
    }

    /**
     * Persists each range's untilOffset to /<topic>/<group>/<partition> and
     * logs the stored range.
     */
    def storeOffset(offsetRanges: Array[OffsetRange], group_id:String) = {
        for(offsetRange <- offsetRanges) {
            val topic = offsetRange.topic
            val partition = offsetRange.partition
            val fromOffset = offsetRange.fromOffset
            val untilOffset = offsetRange.untilOffset
            val path = s"/$topic/$group_id/$partition"
            checkExist(path) // create the partition node on first store
            client.setData().forPath(path, untilOffset.toString.getBytes())
            println(s"topic:${topic}\tpartition:${partition}\tstart:${fromOffset}\tend:${untilOffset}")
        }
    }
}
  • 開啟生產(chǎn)者
kafka-console-producer.sh \
--topic mytopic1 \
--broker-list qphone01:9092,qphone02:9092,qphone03:9092

tip:
lixi,1 rocklee,2 lee,3
  • 代碼
package cn.qphone.spark.streaming.day2

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties

import cn.qphone.spark.streaming.day2.Demo2_Offset_ZK.{createMsg, storeOffset}
import cn.qphone.spark.utils.{CommonScalaUtils, KafkaTools, SparkUtils}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

/**
 *
 *
 *    1. 創(chuàng)建測試的mysql數(shù)據(jù)庫
 *         create database test;
 *    2. 建表
 *        create table myorders(name varchar(20), orderid varchar(100) primary key);
 *    3. 新建topic: mytopic1
 *        kafka-topics.sh --zookeeper qphone01,qphone02,qphone03/kafka --create --topic mytopic1 --partitions 3 --replication-factor 1
 *    4. 往mytopic1發(fā)送數(shù)據(jù), 數(shù)據(jù)格式為 "字符,數(shù)字"  比如  abc,3
 **/
object Demo4_Idempotent {
    def main(args: Array[String]): Unit = {
        //1. obtain the streaming context
        val ssc: StreamingContext = SparkUtils.getDefaultStreamingContext("Demo4_Idempotent")
        //2. kafka parameters from kafka.properties on the classpath
        val properties = new Properties()
        properties.load(Demo2_Offset_ZK.getClass.getClassLoader.getResourceAsStream("kafka.properties"))
        val kafkaParams: Predef.Map[String, String] = CommonScalaUtils.toMap(properties)
        val topics: Predef.Set[String] = "mytopic1".split(",").toSet
        //3. direct stream, resuming from offsets stored in ZooKeeper
        val messageStream: InputDStream[(String, String)] = KafkaTools.createMsg(ssc, kafkaParams, topics)
        //4. per batch: write values to MySQL, then store the offsets
        messageStream.foreachRDD((rdd, btime) => {
            if (!rdd.isEmpty()) {
                val offsetRDD = rdd.asInstanceOf[HasOffsetRanges]
                val offsetRanges: Array[OffsetRange] = offsetRDD.offsetRanges

                // Save the data. The write is idempotent: orderid is the primary key and
                // ON DUPLICATE KEY UPDATE makes replays of the same record a no-op.
                rdd.map(kv => {
                        println(kv + "---->")
                        println(kv._2 + "=====>")
                        kv._2
                    }).foreachPartition(partition =>{
                    val connection: Connection = DriverManager.getConnection("jdbc:mysql://192.168.49.111:3306/test", "root", "123456")
                    val sql = "insert into myorders(orderid, name) values (?, ?) ON DUPLICATE KEY UPDATE orderid=?"
                    val statement: PreparedStatement = connection.prepareStatement(sql)
                    try {
                        partition.foreach(msg => {
                            // record format: "name,id" — split once instead of twice
                            val parts = msg.split(",")
                            val name: String = parts(0)
                            val id: String = parts(1)
                            statement.setString(1, id)
                            statement.setString(2, name)
                            statement.setString(3, id)
                            statement.execute()
                        })
                    } finally {
                        // close JDBC resources even when a record fails, so connections don't leak
                        statement.close()
                        connection.close()
                    }
                })
                //5. store offsets only after the batch's writes have completed
                KafkaTools.storeOffset(offsetRanges, kafkaParams("group.id"))
            }
        })

        ssc.start()
        ssc.awaitTermination()
    }
}

2.10 原子性操作

2.10.1 建表語句

1. 創(chuàng)建測試的mysql數(shù)據(jù)庫
create database test;
2. 新建topic: mytopic1
kafka-topics.sh --zookeeper qphone01,qphone02,qphone03/kafka --create --topic mytopic1 --partitions 3 --replication-factor 1
3. 建表
create table `test`.mytopic(topic varchar(200), groupid varchar(20), partid int, offset bigint);
create table `test`.mydata(data varchar(200), id int);

初始化表:
insert into mytopic(topic, groupid, partid, offset) values('mytopic1','hzbigdata2002',0,0);
insert into mytopic(topic, groupid, partid, offset) values('mytopic1','hzbigdata2002',1,0);
insert into mytopic(topic, groupid, partid, offset) values('mytopic1','hzbigdata2002',2,0);
4. 往mytopic1發(fā)送數(shù)據(jù), 數(shù)據(jù)格式為 "字符,數(shù)字"  比如  abc,3
5. 在pom文件加入依賴
<dependency>
   <groupId>org.scalikejdbc</groupId>
   <artifactId>scalikejdbc_2.11</artifactId>
   <version>3.2.0</version>
</dependency>

2.10.2 代碼

package cn.qphone.spark.streaming.day3

import java.util.Properties

import cn.qphone.spark.streaming.day2.Demo2_Offset_ZK
import cn.qphone.spark.utils.{CommonScalaUtils, LoggerTrait, SparkUtils}
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.TaskContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import scalikejdbc.{ConnectionPool, DB}
import scalikejdbc._

object Demo1_Atometic extends LoggerTrait{
    /**
     * Exactly-once Kafka -> MySQL pipeline: the data insert and the Kafka
     * offset update are committed in ONE local JDBC transaction (scalikejdbc
     * DB.localTx), so a failure mid-batch rolls back both and the batch is
     * safely re-consumed from the stored offsets.
     */
    def main(args: Array[String]): Unit = {
        // 1. Streaming context from the project helper (batch interval defined there)
        val ssc: StreamingContext = SparkUtils.getDefaultStreamingContext("Demo1_Atometic")
        // 2. Kafka consumer parameters loaded from kafka.properties on the classpath
        val properties = new Properties()
        properties.load(Demo2_Offset_ZK.getClass.getClassLoader.getResourceAsStream("kafka.properties"))
        val kafkaParams: Predef.Map[String, String] = CommonScalaUtils.toMap(properties)
        // NOTE(review): `topics` is never used below — the direct stream is driven by
        // `fromOffsets` instead, which must already contain a row per partition.
        val topics: Predef.Set[String] = "mytopic1".split(",").toSet

        // 3. Read the committed (topic, partition) -> offset map back out of MySQL
        val driver = "com.mysql.jdbc.Driver"
        val jdbcUrl =  "jdbc:mysql://192.168.49.111:3306/test"
        val jdbcUser = "root"
        val jdbcPassword = "123456"

        val group_id = "hzbigdata2002"

        // Register the JDBC driver class
        Class.forName(driver)
        // Initialize the global connection pool; scalikejdbc hands out implicit
        // sessions from it, so no connection object needs to be kept here.
        ConnectionPool.singleton(jdbcUrl, jdbcUser, jdbcPassword)

        // Query the offset table (read-only transaction). Assumes `mytopic`
        // was pre-seeded with one row per partition — TODO confirm (see §2.10.1).
        val fromOffsets = DB.readOnly {
            implicit session => sql"select topic, partid, offset from mytopic"
                .map { r =>
                    TopicAndPartition(r.string(1), r.int(2)) -> r.long(3)
                }.list.apply().toMap
        }

        // 4. Direct stream starting from the stored offsets; each record is
        // mapped to (topic, message) by the handler below.
       val messageHandler = (mmd : MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
       val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)

       // 5. Per-batch write: insert data and commit new offsets atomically
        messages.foreachRDD((rdd, btime) => {
            if(!rdd.isEmpty()) {
                val offsetRDD = rdd.asInstanceOf[HasOffsetRanges]
                val offsetRanges: Array[OffsetRange] = offsetRDD.offsetRanges

                rdd.foreachPartition(partition => {
                    // Direct-stream RDD partitions map 1:1 to Kafka partitions,
                    // so the partition id indexes this batch's offset ranges.
                    val pOffsetRange: OffsetRange = offsetRanges(TaskContext.getPartitionId())
                    // 6. One local transaction per partition: all inserts plus
                    // the offset update commit (or roll back) together.
                    DB.localTx {implicit session => {
                        partition.foreach(msg => { // msg = (topic, message) per messageHandler above
                            // Insert one record; message format is "data,id" (e.g. "abc,3")
                            val data = msg._2.split(",")(0)
                            val id =msg._2.split(",")(1)
                            val dataResult =
                                sql"""
                                    insert into mydata(data,id) values (${data},${id})
                                   """.execute().apply()
                        })
                        // Uncomment to simulate a mid-transaction failure and
                        // verify that both the inserts and the offset update roll back:
//                        println(1/0)
                        val offsetResult =
                            sql"""
                                 update
                                 mytopic
                                 set
                                 offset = ${pOffsetRange.untilOffset}
                                 where
                                 topic = ${pOffsetRange.topic}
                                 and
                                 partid = ${pOffsetRange.partition}
                                 and
                                 groupid = ${group_id }
                                 """.update.apply()
                    }}
                })
            }
        })
        ssc.start()
        ssc.awaitTermination()
    }
}

2.11 Spark Streaming常見的轉(zhuǎn)換算子

函數(shù)名稱 描述
map(func) 對DStream中的各個元素進行func函數(shù)參數(shù),返回一個新的DStream
flatMap(func) 與map相似,只不過各個輸入項可以被輸出為零個或者多個輸出項
filter(func) 過濾出所有函數(shù)func返回值為true的Dstream元素并返回一個新的DStream
repartition(numPartitions) 增加或減少DStream中的分區(qū)數(shù),從而改變DStream的并行度
union(otherDStream) 將數(shù)個DStream合并,返回一個新的DStream
count() 通過DStream中的各個RDD中的元素計數(shù),返回只有一個元素的RDD構(gòu)成DStream
reduce(func) 對源DStream中的各個RDD的元素利用func函數(shù)進行聚合操作,然后返回只有一個元數(shù)的RDD構(gòu)成的新的DStream
countByValue() 對于元素類型為K的DStream,返回一個元素為(K,Long)鍵值對形式的新的DStream,Long對應(yīng)的值為源DStream中各RDD的key出現(xiàn)的個數(shù)
reduceByKey(func, [numTasks]) 利用func函數(shù)對源DStream中key進行聚合操作,然會新的(K, V)的新的DStream
join(otherDStream)
cogroup(otherDStream)
transform(func)
updateStateByKey(func)
window

2.11.1 transform

    可以獲取到內(nèi)部rdd,對其進行轉(zhuǎn)換處理
    transform是一個transformation算子,轉(zhuǎn)換算子。
    DStream上述提供的所有的transformation操作,都是DStream-2-DStream操作,每一個DStream和RDD的直接操作,而DStream本質(zhì)上是一系列RDD,所以RDD-2-RDD操作是顯然被需要的,所以此時官方api中提供了一個為了達成此操作的算子——transform操作。

    其最最最經(jīng)典的實現(xiàn)就是DStream和rdd的join操作,還有dstream重分區(qū)(分區(qū)減少,coalesce)。

    也就是說transform主要就是用來自定義官方api沒有提供的一些操作。
  • 黑名單案例
object Demo2_BlackList extends LoggerTrait{
    /**
     * Blacklist filtering via DStream.transform.
     *
     * transform exposes the underlying RDD of every micro-batch, so a
     * DStream can be joined against a static RDD — here a blacklist of
     * source IPs. Records whose IP is blacklisted are dropped.
     */
    def main(args: Array[String]): Unit = {
        val ssc: StreamingContext = SparkUtils.getDefaultStreamingContext("Demo2_BlackList")
        // Static blacklist RDD: only the key (IP) matters for the join.
        /**
         * Sample input lines, fields separated by "##":
         * 110.52.250.126##2016-05-30 17:38:20##GET /data/cache/style_1_widthauto.css?y7a HTTP/1.1##200##1292
         *
         * 110.52.250.127##2016-05-30 17:38:20##GET /data/cache/style_1_widthauto.css?y7a HTTP/1.1##200##1292
         */
        val blacklistRDD:RDD[(String, Boolean)] = ssc.sparkContext.parallelize(List(
            ("27.19.74.143", true),
            ("110.52.250.126", true)
        ))

        // Ingest the external stream and split each line into (ip, rest).
        // NOTE(review): a line without "##" makes indexOf return -1 and
        // substring throw — assumes well-formed input; confirm upstream.
        val lines:DStream[String] = ssc.socketTextStream("192.168.49.111", 9999)
        val ip2OtherDStream:DStream[(String, String)] = lines.map(line => {
            val index = line.indexOf("##")
            val ip = line.substring(0, index)
            val other = line.substring(index + 2)
            (ip, other)
        })

        // leftOuterJoin keeps every stream record; the right side is Some(_)
        // only for blacklisted IPs, so keeping the None cases removes them.
        // collect fuses the original filter(!right.isDefined) + map into one
        // pattern-matching pass with identical results.
        val filteredDStream: DStream[(String, String)] = ip2OtherDStream.transform(rdd => {
            rdd.leftOuterJoin(blacklistRDD).collect {
                case (ip, (other, None)) => (ip, other)
            }
        })

        filteredDStream.print()

        ssc.start()

        ssc.awaitTermination()
    }
}

2.11.2 updateStateByKey

    updateStateByKey(func)  根據(jù)key的前置狀態(tài)和key的新值,對key進行更新,返回一個新狀態(tài)的Dstream。

    人話:統(tǒng)計截止到目前為止key的狀態(tài)。統(tǒng)計全局的數(shù)據(jù)

    通過分析,我們需要清楚:在這個操作中需要兩個數(shù)據(jù),一個是key的前置狀態(tài),一個是key的新增(當前批次的數(shù)據(jù));還有歷史數(shù)據(jù)(前置狀態(tài))得需要存儲在磁盤,不應(yīng)該保存在內(nèi)存中。

    同時key的前置狀態(tài)可能有可能沒有。
  • 代碼
package cn.qphone.spark.streaming.day3

import cn.qphone.spark.utils.{LoggerTrait, SparkUtils}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

object Demo3_WordCount extends LoggerTrait{
    /**
     * Stateful word count: accumulates per-word totals across batches with
     * updateStateByKey. Checkpointing is mandatory — the previous per-key
     * state must be persisted outside executor memory.
     */
    def main(args: Array[String]): Unit = {
        val ssc: StreamingContext = SparkUtils.getDefaultStreamingContext("Demo3_WordCount")
        val lines:DStream[String] = ssc.socketTextStream("192.168.49.111", 9999)

        ssc.checkpoint("file:/d:/data/chk") // required: persists per-key state between batches

        // Second argument (1) = number of partitions for the state RDD.
        val retDStream:DStream[(String, Int)] =
            lines.flatMap(_.split("\\s+")).map((_, 1)).updateStateByKey(updateFunc _, 1)

        retDStream.print

        ssc.start()
        ssc.awaitTermination()
    }

    /**
     * State-merge function for updateStateByKey.
     *
     * @param seq    the NEW values seen for this key in the current batch
     * @param option the PREVIOUS accumulated state for this key (None the
     *               first time the key appears)
     * @return the updated state: previous total plus this batch's sum
     */
    def updateFunc(seq:Seq[Int], option:Option[Int]):Option[Int] = {
        println(s"seq : ${seq.mkString(",")}")
        println(s"current : ${option.getOrElse("empty")}")
        Option(seq.sum + option.getOrElse(0))
    }
}

2.11.3 window算子

window操作其實就是窗口函數(shù)。Spark Streaming提供了滑動窗口操作的支持,從而讓我們可以對一個滑動窗口內(nèi)的數(shù)據(jù)執(zhí)行計算操作。每次掉落在窗口內(nèi)的RDD的數(shù)據(jù),會被聚合起來執(zhí)行計算操作,然后生成的RDD,會作為window DStream的一個RDD。比如下圖中,每3秒鐘的數(shù)據(jù)會執(zhí)行一次滑動窗口計算,這3秒內(nèi)的3個RDD會被聚合起來進行處理,然后過了2秒鐘,又會對最近3秒的數(shù)據(jù)執(zhí)行滑動窗口計算。所以每個滑動窗口操作,都必須指定兩個參數(shù):窗口長度,以及滑動間隔。而且這兩個參數(shù)必須是batch間隔的整數(shù)倍。
004.png
object Demo4_Window extends LoggerTrait{
    /**
     * Sliding-window word count over a socket text stream.
     * Window length and slide interval must both be integer multiples of
     * the batch interval.
     */
    def main(args: Array[String]): Unit = {
        // Streaming context with the project's default batch settings
        val ssc = SparkUtils.getDefaultStreamingContext("Demo4_Window")

        // Source: one text line per record from qphone01:9999
        val lines:DStream[String] = ssc.socketTextStream("qphone01", 9999)

        // Tokenize on whitespace and pair every word with an initial count of 1
        val wordPairs:DStream[(String, Int)] = lines
            .flatMap(line => line.split("\\s+"))
            .map(word => (word, 1))

        // Aggregate counts over a window of 3 batches, sliding by 2 batches
        val batchInterval = 2
        val windowedCounts = wordPairs.reduceByKeyAndWindow(
            (a: Int, b: Int) => a + b,
            windowDuration = Seconds(batchInterval * 3),
            slideDuration = Seconds(batchInterval * 2)
        )

        windowedCounts.print()

        ssc.start()
        ssc.awaitTermination()
    }
}

2.12 spark sql 結(jié)合 spark streaming

package cn.qphone.spark.streaming.day3

import cn.qphone.spark.utils.SparkUtils
import org.apache.spark.streaming.dstream.DStream

object Demo5_sparksql_streaming {
    /**
     * Spark SQL + Spark Streaming integration: maintains running sales
     * counts per (category, brand) with updateStateByKey, then ranks the
     * top 3 brands per category each batch with a window function.
     */
    def main(args: Array[String]): Unit = {
        // 1. Entry points
        // 1.1 SparkSession for the SQL side
        val spark = SparkUtils.getDefaultSession("Demo5_sparksql_streaming")
        // 1.2 StreamingContext sharing the same SparkContext; second arg is
        // presumably the batch interval in seconds — TODO confirm in SparkUtils
        val ssc = SparkUtils.getStreamingContext(spark.sparkContext, 2)
        ssc.checkpoint("file:///d:/data/out")
        // 2. Read records shaped like "001 mi mobile" (id, brand, category)
        val lines:DStream[String] = ssc.socketTextStream("qphone01", 9999)

        // 3. Parse and filter: malformed lines become ("", -1) and are dropped
        val pairs:DStream[(String, Int)] = lines.map(line => {
            val fields = line.split("\\s+")
            if(fields == null || fields.length != 3) {
                ("", -1)
            }else {
                val brand = fields(1)
                val category = fields(2)
                (s"${category}_${brand}", 1)
            }
        }).filter(t => t._2 != -1)
        // e.g. (mobile_mi,1)(mobile_mi,1)(mobile_huawei,1)


        // 4. Running total per "category_brand" key across all batches
        val usb:DStream[(String, Int)] = pairs.updateStateByKey(updateFunc)

        // e.g. (mobile_mi,2)(mobile_huawei,1)

        // 5. Per batch: convert state to a DataFrame and rank with SQL
        usb.foreachRDD((rdd, bTime) => {
            if(!rdd.isEmpty()) {
                // 5.1 Split the composite key back into columns for the DataFrame
                import spark.implicits._

                val df = rdd.map { case (cb, cnt) => {
                    val category = cb.substring(0, cb.indexOf("_"))
                    val brand = cb.substring(cb.indexOf("_") + 1)
                    (category, brand, cnt)
                }}.toDF("category", "brand", "sales")

                // 5.2 Register a temp view and query: top 3 brands per
                // category by sales (row_number over a per-category window)
                df.createOrReplaceTempView("tmp_category_brand_sales")
                val sql =
                    """
                      |select
                      |t.category,
                      |t.brand,
                      |t.sales,
                      |t.rank
                      |from
                      |(
                      |select
                      |category,
                      |brand,
                      |sales,
                      |row_number() over(partition by category order by sales desc) rank
                      |from
                      |tmp_category_brand_sales
                      |) t
                      |where
                      |rank < 4
                      |""".stripMargin
                spark.sql(sql).show()
            }
        })

        ssc.start()
        ssc.awaitTermination()
    }

    /**
     * State-merge function for updateStateByKey.
     *
     * @param seq    the NEW values for this key in the current batch
     * @param option the PREVIOUS accumulated state (None on first sight)
     * @return updated state: previous total plus this batch's sum
     */
    def updateFunc(seq:Seq[Int], option:Option[Int]):Option[Int] = Option(seq.sum + option.getOrElse(0))
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容