面試題2

17.分區(qū)分桶的區(qū)別,為什么要分區(qū)

分區(qū)表:原來(lái)的一個(gè)大表存儲(chǔ)的時(shí)候分成不同的數(shù)據(jù)目錄進(jìn)行存儲(chǔ)。如果說(shuō)是單分區(qū)表,那么在表的目錄下就只有一級(jí)子目錄,如果說(shuō)是多分區(qū)表,那么在表的目錄下有多少分區(qū)就有多少級(jí)子目錄。不管是單分區(qū)表,還是多分區(qū)表,在表的目錄下,和非最終分區(qū)目錄下是不能直接存儲(chǔ)數(shù)據(jù)文件的

分桶表:原理和hashpartitioner 一樣,將hive中的一張表的數(shù)據(jù)進(jìn)行歸納分類的時(shí)候,歸納分類規(guī)則就是hashpartitioner。(需要指定分桶字段,指定分成多少桶)

分區(qū)表和分桶的區(qū)別除了存儲(chǔ)的格式不同外,最主要的是作用:

分區(qū)表:細(xì)化數(shù)據(jù)管理,縮小mapreduce程序 需要掃描的數(shù)據(jù)量

分桶表:提高join查詢的效率,在一份數(shù)據(jù)會(huì)被經(jīng)常用來(lái)做連接查詢的時(shí)候建立分桶,分桶字段就是連接字段;提高采樣的效率。

有了分區(qū)為什么還要分桶?

(1)獲得更高的查詢處理效率。桶為表加上了額外的結(jié)構(gòu),Hive在處理有些查詢時(shí)能利用這個(gè)結(jié)構(gòu)。

(2)使取樣( sampling)更高效。在處理大規(guī)模數(shù)據(jù)集時(shí),在開發(fā)和修改査詢的階段,如果能在數(shù)據(jù)集的一小部分?jǐn)?shù)據(jù)上試運(yùn)行查詢,會(huì)帶來(lái)很多方便。

分桶是相對(duì)分區(qū)進(jìn)行更細(xì)粒度的劃分。分桶將表或者分區(qū)的某列值進(jìn)行hash值進(jìn)行區(qū)分,如要安裝name屬性分為3個(gè)桶,就是對(duì)name屬性值的hash值對(duì)3取摸,按照取模結(jié)果對(duì)數(shù)據(jù)分桶。

與分區(qū)不同的是,分區(qū)依據(jù)的不是真實(shí)數(shù)據(jù)表文件中的列,而是我們指定的偽列,但是分桶是依據(jù)數(shù)據(jù)表中真實(shí)的列而不是偽列

18.mapjoin的原理

MapJoin通常用于一個(gè)很小的表和一個(gè)大表進(jìn)行join的場(chǎng)景,具體小表有多小,由參數(shù)hive.mapjoin.smalltable.filesize來(lái)決定,該參數(shù)表示小表的總大小,默認(rèn)值為25000000字節(jié),即25M。

Hive0.7之前,需要使用hint提示 *+ mapjoin(table) */才會(huì)執(zhí)行MapJoin,否則執(zhí)行Common Join,但在0.7版本之后,默認(rèn)自動(dòng)會(huì)轉(zhuǎn)換Map Join,由參數(shù)hive.auto.convert.join來(lái)控制,默認(rèn)為true.

假設(shè)a表為一張大表,b為小表,并且hive.auto.convert.join=true,那么Hive在執(zhí)行時(shí)候會(huì)自動(dòng)轉(zhuǎn)化為MapJoin。

MapJoin簡(jiǎn)單說(shuō)就是在Map階段將小表讀入內(nèi)存,順序掃描大表完成Join。減少昂貴的shuffle操作及reduce操作

MapJoin分為兩個(gè)階段:

通過(guò)MapReduce Local Task,將小表讀入內(nèi)存,生成HashTableFiles上傳至Distributed Cache中,這里會(huì)HashTableFiles進(jìn)行壓縮。

MapReduce Job在Map階段,每個(gè)Mapper從Distributed Cache讀取HashTableFiles到內(nèi)存中,順序掃描大表,在Map階段直接進(jìn)行Join,將數(shù)據(jù)傳遞給下一個(gè)MapReduce任務(wù)。

19.在hive的row_number中distribute by 和 partition by的區(qū)別?

20.hive開發(fā)中遇到什么問(wèn)題?

21.什么時(shí)候使用內(nèi)部表,什么時(shí)候使用外部表

Hive內(nèi)部表外部表區(qū)別及各自使用場(chǎng)景

22.hive都有哪些函數(shù),你平常工作中用到哪些

數(shù)學(xué)函數(shù)

round(DOUBLE a)

floor(DOUBLE a)

ceil(DOUBLE a)

rand()

集合函數(shù)

size(Map<K.V>)

map_keys(Map<K.V>)

map_values(Map<K.V>)

array_contains(Array<T>, value)

sort_array(Array<T>)

類型轉(zhuǎn)換函數(shù)

cast(expr as <type>)

日期函數(shù)

date_format函數(shù)(根據(jù)格式整理日期)

date_add、date_sub函數(shù)(加減日期)

next_day函數(shù)

last_day函數(shù)(求當(dāng)月最后一天日期)

collect_set函數(shù)

get_json_object解析json函數(shù)

from_unixtime(bigint unixtime, string format)

to_date(string timestamp)

year(string date)

month(string date)

hour(string date)

weekofyear(string date)

datediff(string enddate, string startdate)

add_months(string start_date, int num_months)

date_format(date/timestamp/string ts, string fmt)

條件函數(shù)

if(boolean testCondition, T valueTrue, T valueFalseOrNull)

nvl(T value, T default_value)

COALESCE(T v1, T v2, ...)

CASE a WHEN b THEN c [WHEN d THEN e]* [ELSE f] END

isnull( a )

isnotnull ( a )

字符函數(shù)

concat(string|binary A, string|binary B...)

concat_ws(string SEP, string A, string B...)

get_json_object(string json_string, string path)

length(string A)

lower(string A) lcase(string A)

parse_url(string urlString, string partToExtract [, string keyToExtract])

regexp_replace(string INITIAL_STRING, string PATTERN, string REPLACEMENT)

reverse(string A)

split(string str, string pat)

substr(string|binary A, int start) substring(string|binary A, int start)

聚合函數(shù)

count? sum min max avg

表生成函數(shù)

explode(array<TYPE> a)

explode(ARRAY)

json_tuple(jsonStr, k1, k2, ...)

parse_url_tuple(url, p1, p2, ...)

23.手寫sql,連續(xù)活躍用戶

大廠高頻面試題-連續(xù)登錄問(wèn)題

24.left semi?join和left?join區(qū)別

left semi join和left join區(qū)別

25.group by為什么要排序

26.說(shuō)說(shuō)印象最深的一次優(yōu)化場(chǎng)景,hive常見的優(yōu)化思路

Hive調(diào)優(yōu),數(shù)據(jù)工程師成神之路

27.聊聊hive的執(zhí)行引擎,spark和mr的區(qū)別?

28.hive的join底層mr是如何實(shí)現(xiàn)的?

29.sql問(wèn)題,連續(xù)幾天活躍的用戶?

大廠高頻面試題-連續(xù)登錄問(wèn)題

30.建好了外部表,用什么語(yǔ)句把數(shù)據(jù)文件加載到表里

31.Hive的執(zhí)行流程?

32.hive的元數(shù)據(jù)信息存儲(chǔ)在哪?

33.sql語(yǔ)句的執(zhí)行順序from-where-group by-having -select-order by -limit

34.on和where的區(qū)別

left join(on&where)

35.hive和傳統(tǒng)數(shù)據(jù)庫(kù)之間的區(qū)別

1、寫時(shí)模式和讀時(shí)模式

傳統(tǒng)數(shù)據(jù)庫(kù)是寫時(shí)模式,在load過(guò)程中,提升了査詢性能,因?yàn)轭A(yù)先解析之后可以對(duì)列建立索引,并壓縮,但這樣也會(huì)花費(fèi)更多的加載時(shí)間。

Hive是讀時(shí)模式,1 oad data非常迅速,因?yàn)樗恍枰x取數(shù)據(jù)進(jìn)行解析,僅僅進(jìn)行文件的復(fù)制或者移動(dòng)。

2、數(shù)據(jù)格式。Hive中沒(méi)有定義專門的數(shù)據(jù)格式,由用戶指定,需要指定三個(gè)屬性:列分隔符,行分隔符,以及讀取文件數(shù)據(jù)的方法。數(shù)據(jù)庫(kù)中,存儲(chǔ)引擎定義了自己的數(shù)據(jù)格式。所有數(shù)據(jù)都會(huì)按照一定的組織存儲(chǔ)

3、數(shù)據(jù)更新。Hive的內(nèi)容是讀多寫少的,因此,不支持對(duì)數(shù)據(jù)的改寫和刪除,數(shù)據(jù)都在加載的時(shí)候中確定好的。數(shù)據(jù)庫(kù)中的數(shù)據(jù)通常是需要經(jīng)常進(jìn)行修改

4、執(zhí)行延遲。Hive在查詢數(shù)據(jù)的時(shí)候,需要掃描整個(gè)表(或分區(qū)),因此延遲較高,只有在處理大數(shù)據(jù)是才有優(yōu)勢(shì)。數(shù)據(jù)庫(kù)在處理小數(shù)據(jù)是執(zhí)行延遲較低。

5、索引。Hive比較弱,不適合實(shí)時(shí)查詢。數(shù)據(jù)庫(kù)有。

6、執(zhí)行。Hive是 Mapreduce,數(shù)據(jù)庫(kù)是 Executor

7、可擴(kuò)展性。Hive高,數(shù)據(jù)庫(kù)低

8、數(shù)據(jù)規(guī)模。Hive大,數(shù)據(jù)庫(kù)小

36.hive中導(dǎo)入數(shù)據(jù)的4種方式

從本地導(dǎo)入:load data local inpath home/liuzc into table ods.test

從hdfs導(dǎo)入:load data inpath user/hive/warehouse/a.txt into ods.test

查詢導(dǎo)入:create table tmp_test as select * from ods.test

查詢結(jié)果導(dǎo)入:insert into table tmp.test select * from ods.test

三.Spark

1.rdd的屬性

一組分片(Partition),即數(shù)據(jù)集的基本組成單位。對(duì)于RDD來(lái)說(shuō),每個(gè)分片都會(huì)被一個(gè)計(jì)算任務(wù)處理,并決定并行計(jì)算的粒度。用戶可以在創(chuàng)建RDD時(shí)指定RDD的分片個(gè)數(shù),如果沒(méi)有指定,那么就會(huì)采用默認(rèn)值。默認(rèn)值就是程序所分配到的CPU Core的數(shù)目。

一個(gè)計(jì)算每個(gè)分區(qū)的函數(shù)。Spark中RDD的計(jì)算是以分片為單位的,每個(gè)RDD都會(huì)實(shí)現(xiàn)compute函數(shù)以達(dá)到這個(gè)目的。compute函數(shù)會(huì)對(duì)迭代器進(jìn)行復(fù)合,不需要保存每次計(jì)算的結(jié)果。

RDD之間的依賴關(guān)系。RDD的每次轉(zhuǎn)換都會(huì)生成一個(gè)新的RDD,所以RDD之間就會(huì)形成類似于流水線一樣的前后依賴關(guān)系。在部分分區(qū)數(shù)據(jù)丟失時(shí),Spark可以通過(guò)這個(gè)依賴關(guān)系重新計(jì)算丟失的分區(qū)數(shù)據(jù),而不是對(duì)RDD的所有分區(qū)進(jìn)行重新計(jì)算。

一個(gè)Partitioner,即RDD的分片函數(shù)。當(dāng)前Spark中實(shí)現(xiàn)了兩種類型的分片函數(shù),一個(gè)是基于哈希的HashPartitioner,另外一個(gè)是基于范圍的RangePartitioner。只有對(duì)于于key-value的RDD,才會(huì)有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數(shù)不但決定了RDD本身的分片數(shù)量,也決定了parent RDD Shuffle輸出時(shí)的分片數(shù)量。

一個(gè)列表,存儲(chǔ)存取每個(gè)Partition的優(yōu)先位置(preferred location)。對(duì)于一個(gè)HDFS文件來(lái)說(shuō),這個(gè)列表保存的就是每個(gè)Partition所在的塊的位置。按照“移動(dòng)數(shù)據(jù)不如移動(dòng)計(jì)算”的理念,Spark在進(jìn)行任務(wù)調(diào)度的時(shí)候,會(huì)盡可能地將計(jì)算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲(chǔ)位置。

2.算子分為哪幾類(RDD支持哪幾種類型的操作)

轉(zhuǎn)換(Transformation) ?現(xiàn)有的RDD通過(guò)轉(zhuǎn)換生成一個(gè)新的RDD。lazy模式,延遲執(zhí)行。

轉(zhuǎn)換函數(shù)包括:map,filter,flatMap,groupByKey,reduceByKey,aggregateByKey,union,join, coalesce?等等。

動(dòng)作(Action) ?在RDD上運(yùn)行計(jì)算,并返回結(jié)果給驅(qū)動(dòng)程序(Driver)或?qū)懭胛募到y(tǒng)。

動(dòng)作操作包括:reduce,collect,count,first,take,countByKey以及foreach等等。

collect ?該方法把數(shù)據(jù)收集到driver端 ??Array數(shù)組類型

所有的transformation只有遇到action才能被執(zhí)行。

當(dāng)觸發(fā)執(zhí)行action之后,數(shù)據(jù)類型不再是rdd了,數(shù)據(jù)就會(huì)存儲(chǔ)到指定文件系統(tǒng)中,或者直接打印結(jié)?果或者收集起來(lái)。

3.創(chuàng)建rdd的幾種方式

1.集合并行化創(chuàng)建(有數(shù)據(jù))

val arr = Array(1,2,3,4,5)

val rdd = sc.parallelize(arr)

val rdd =sc.makeRDD(arr)

2.讀取外部文件系統(tǒng),如hdfs,或者讀取本地文件(最常用的方式)(沒(méi)數(shù)據(jù))

val rdd2 = sc.textFile("hdfs://hdp-01:9000/words.txt")

// 讀取本地文件

val rdd2 = sc.textFile(“file:///root/words.txt”)

3.從父RDD轉(zhuǎn)換成新的子RDD

調(diào)用Transformation類的方法,生成新的RDD

4.spark運(yùn)行流程

Worker的功能:定時(shí)和master通信;調(diào)度并管理自身的executor

executor:由Worker啟動(dòng)的,程序最終在executor中運(yùn)行,(程序運(yùn)行的一個(gè)容器)

spark-submit命令執(zhí)行時(shí),會(huì)根據(jù)master地址去向 Master發(fā)送請(qǐng)求,

Master接收到Dirver端的任務(wù)請(qǐng)求之后,根據(jù)任務(wù)的請(qǐng)求資源進(jìn)行調(diào)度,(打散的策略),盡可能的?把任務(wù)資源平均分配,然后向WOrker發(fā)送指令

Worker收到Master的指令之后,就根據(jù)相應(yīng)的資源,啟動(dòng)executor(cores,memory)

executor會(huì)向dirver端建立請(qǐng)求,通知driver,任務(wù)已經(jīng)可以運(yùn)行了

driver運(yùn)行任務(wù)的時(shí)候,會(huì)把任務(wù)發(fā)送到executor中去運(yùn)行。

5.Spark中coalesce與repartition的區(qū)別

1)關(guān)系:

兩者都是用來(lái)改變 RDD 的 partition 數(shù)量的,repartition 底層調(diào)用的就是 coalesce 方法:coalesce(numPartitions, shuffle = true)

2)區(qū)別:

repartition 一定會(huì)發(fā)生 shuffle,coalesce 根據(jù)傳入的參數(shù)來(lái)判斷是否發(fā)生 shuffle

一般情況下增大 rdd 的 partition 數(shù)量使用 repartition,減少 partition 數(shù)量時(shí)使用coalesce

6.sortBy 和 sortByKey的區(qū)別

sortBy既可以作用于RDD[K] ,還可以作用于RDD[(k,v)]

sortByKey ?只能作用于 RDD[K,V] 類型上。

7.map和mapPartitions的區(qū)別

8.數(shù)據(jù)存入Redis ?優(yōu)先使用map mapPartitions ?foreach ?foreachPartions哪個(gè)

使用 foreachPartition

???* 1,map mapPartition ??是轉(zhuǎn)換類的算子, 有返回值

???* 2, 寫mysql,redis?的連接

???foreach ?* 100萬(wàn) ????????100萬(wàn)次的連接

???foreachPartions * 200 個(gè)分區(qū) ????200次連接 ?一個(gè)分區(qū)中的數(shù)據(jù),共用一個(gè)連接

foreachParititon 每次迭代一個(gè)分區(qū),foreach每次迭代一個(gè)元素。

該方法沒(méi)有返回值,或者Unit

主要作用于,沒(méi)有返回值類型的操作(打印結(jié)果,寫入到mysql數(shù)據(jù)庫(kù)中)

在寫入到redis,mysql的時(shí)候,優(yōu)先使用foreachPartititon

9.reduceByKey和groupBykey的區(qū)別

reduceByKey會(huì)傳一個(gè)聚合函數(shù), 相當(dāng)于 ?groupByKey?+ mapValues

reduceByKey 會(huì)有一個(gè)分區(qū)內(nèi)聚合,而groupByKey沒(méi)有 ?最核心的區(qū)別 ?

結(jié)論:reduceByKey有分區(qū)內(nèi)聚合,更高效,優(yōu)先選擇使用reduceByKey。

10.cache和checkPoint的比較

都是做 RDD 持久化的

1.緩存,是在觸發(fā)action之后,把數(shù)據(jù)寫入到內(nèi)存或者磁盤中。不會(huì)截?cái)嘌夑P(guān)系

(設(shè)置緩存級(jí)別為memory_only:內(nèi)存不足,只會(huì)部分緩存或者沒(méi)有緩存,緩存會(huì)丟失,memory_and_disk :內(nèi)存不足,會(huì)使用磁盤)

2.checkpoint 也是在觸發(fā)action之后,執(zhí)行任務(wù)。單獨(dú)再啟動(dòng)一個(gè)job,負(fù)責(zé)寫入數(shù)據(jù)到hdfs中。(把rdd中的數(shù)據(jù),以二進(jìn)制文本的方式寫入到hdfs中,有幾個(gè)分區(qū),就有幾個(gè)二進(jìn)制文件)

3.某一個(gè)RDD被checkpoint之后,他的父依賴關(guān)系會(huì)被刪除,血緣關(guān)系被截?cái)?,該RDD轉(zhuǎn)換成了CheckPointRDD,以后再對(duì)該rdd的所有操作,都是從hdfs中的checkpoint的具體目錄來(lái)讀取數(shù)據(jù)。緩存之后,rdd的依賴關(guān)系還是存在的。

11.spark streaming流式統(tǒng)計(jì)單詞數(shù)量代碼

object WordCountAll {

? newValues當(dāng)前批次的出現(xiàn)的單詞次數(shù), runningCount表示之前運(yùn)行的單詞出現(xiàn)的結(jié)果

* def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {

? ? val newCount =? newValues.sum + runningCount.getOrElse(0)// 將歷史前幾個(gè)批次的值和當(dāng)前批次的值進(jìn)行累加返回當(dāng)前批次最終的結(jié)果

? ? Some(newCount)

? }*/

? **

? ? * String : 單詞 hello

? ? * Seq[Int] :?jiǎn)卧~在當(dāng)前批次出現(xiàn)的次數(shù)

? ? * Option[Int] :歷史結(jié)果

? ? */

? val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {

? ? iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x)))

? ? iter.flatMap{case(x,y,z)=>Some(y.sum + z.getOrElse(0)).map(m=>(x, m))}

? }

? 屏蔽日志

? Logger.getLogger("org.apache").setLevel(Level.ERROR)

? def main(args: Array[String]) {

? ? 必須要開啟2個(gè)以上的線程,一個(gè)線程用來(lái)接收數(shù)據(jù),另外一個(gè)線程用來(lái)計(jì)算

? ? val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

? ? ? 設(shè)置sparkjob計(jì)算時(shí)所采用的序列化方式

? ? ? .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

? ? ? .set("spark.rdd.compress", "true")? 節(jié)約大量的內(nèi)存內(nèi)容

? ? 如果你的程序出現(xiàn)垃圾回收時(shí)間過(guò)程,可以設(shè)置一下java的垃圾回收參數(shù)

? ? 同時(shí)也會(huì)創(chuàng)建sparkContext對(duì)象

? ? 批次時(shí)間 >= 批次處理的總時(shí)間 (批次數(shù)據(jù)量,集群的計(jì)算節(jié)點(diǎn)數(shù)量和配置)

? ? val ssc = new StreamingContext(conf, Seconds(5))

? ? 做checkpoint 寫入共享存儲(chǔ)中

? ? ssc.checkpoint("c://aaa")

? ? 創(chuàng)建一個(gè)將要連接到 hostname:port 的 DStream,如 localhost:9999

? ? val lines: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.175.101", 44444)

? ? updateStateByKey結(jié)果可以累加但是需要傳入一個(gè)自定義的累加函數(shù):updateFunc

? ? val results = lines.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)

? ? 打印結(jié)果到控制臺(tái)

? ? results.print()

? ? 開始計(jì)算

? ? ssc.start()

? ? 等待停止

? ? ssc.awaitTermination()

? }

}

12.簡(jiǎn)述map和flatMap的區(qū)別和應(yīng)用場(chǎng)景

map是對(duì)每一個(gè)元素進(jìn)行操作,flatmap是對(duì)每一個(gè)元素操作后并壓平

13.計(jì)算曝光數(shù)和點(diǎn)擊數(shù)

14.分別列出幾個(gè)常用的transformation和action算子

轉(zhuǎn)換算子:map,map,filter,reduceByKey,groupByKey,groupBy

行動(dòng)算子:foreach,foreachpartition,collect,collectAsMap,take,top,first,count,countByKey

15.按照需求使用spark編寫以下程序,要求使用scala語(yǔ)言

當(dāng)前文件a.txt的格式,請(qǐng)統(tǒng)計(jì)每個(gè)單詞出現(xiàn)的次數(shù)

A,b,c

B,b,f,e

objectWordCount {

def main(args: Array[String]):Unit= {

valconf = new SparkConf()

.setAppName(this.getClass.getSimpleName)

.setMaster("local[*]")

valsc = new SparkContext(conf)

varsData: RDD[String] = sc.textFile("a.txt")

valsortData: RDD[(String,Int)] = sData.flatMap(_.split(",")).map((_,1)).reduceByKey(_+_)

? ? sortData.foreach(print)

? }

}

16.spark應(yīng)用程序的執(zhí)行命令是什么?

/usr/local/spark-current2.3/bin/spark-submit \

--class com.wedoctor.Application \

--master yarn \

--deploy-mode client \

--driver-memory 1g \

--executor-memory 2g \

--queue root.wedw \

--num-executors 200 \

--jars home/pgxl/liuzc/config-1.3.0.jar,/home/pgxl/liuzc/hadoop-lzo-0.4.20.jar,/home/pgxl/liuzc/elasticsearch-hadoop-hive-2.3.4.jar?\

/home/pgxl/liuzc/sen.jar

17.Spark應(yīng)用執(zhí)行有哪些模式,其中哪幾種是集群模式

本地local模式

standalone模式

spark on yarn模式

spark on mesos模式

其中,standalone模式,spark on yarn模式,spark on mesos模式是集群模式

18.請(qǐng)說(shuō)明spark中廣播變量的用途

使用廣播變量,每個(gè) Executor 的內(nèi)存中,只駐留一份變量副本,而不是對(duì) 每個(gè) task 都傳輸一次大變量,省了很多的網(wǎng)絡(luò)傳輸, 對(duì)性能提升具有很大幫助, 而且會(huì)通過(guò)高效的廣播算法來(lái)減少傳輸代價(jià)。

19.以下代碼會(huì)報(bào)錯(cuò)嗎?如果會(huì)怎么解決 val arr = new ArrayList[String]; arr.foreach(println)

val arr = new ArrayList[String]; 這里會(huì)報(bào)錯(cuò),需要改成 val arr: Array[String] = new Array[String](10)

arr.foreach(println)打印不會(huì)報(bào)空指針

20.寫出你用過(guò)的spark中的算子,其中哪些會(huì)產(chǎn)生shuffle過(guò)程

reduceBykey:

groupByKey:

…ByKey:

21.Spark中rdd與partition的區(qū)別

22.請(qǐng)寫出創(chuàng)建Dateset的幾種方式

23.描述一下RDD,DataFrame,DataSet的區(qū)別?

1)RDD

優(yōu)點(diǎn):

編譯時(shí)類型安全

編譯時(shí)就能檢查出類型錯(cuò)誤

面向?qū)ο蟮木幊田L(fēng)格

直接通過(guò)類名點(diǎn)的方式來(lái)操作數(shù)據(jù)

缺點(diǎn):

序列化和反序列化的性能開銷

無(wú)論是集群間的通信, 還是 IO 操作都需要對(duì)對(duì)象的結(jié)構(gòu)和數(shù)據(jù)進(jìn)行序列化和反序列化。

GC 的性能開銷,頻繁的創(chuàng)建和銷毀對(duì)象, 勢(shì)必會(huì)增加 GC

2)DataFrame

DataFrame 引入了 schema 和 off-heap

schema : RDD 每一行的數(shù)據(jù), 結(jié)構(gòu)都是一樣的,這個(gè)結(jié)構(gòu)就存儲(chǔ)在 schema 中。Spark 通過(guò) schema 就能夠讀懂?dāng)?shù)據(jù), 因此在通信和 IO 時(shí)就只需要序列化和反序列化數(shù)據(jù), 而結(jié)構(gòu)的部分就可以省略了。

3)DataSet

DataSet 結(jié)合了 RDD 和 DataFrame 的優(yōu)點(diǎn),并帶來(lái)的一個(gè)新的概念 Encoder。

當(dāng)序列化數(shù)據(jù)時(shí),Encoder 產(chǎn)生字節(jié)碼與 off-heap 進(jìn)行交互,能夠達(dá)到按需訪問(wèn)數(shù)據(jù)的效果,而不用反序列化整個(gè)對(duì)象。Spark 還沒(méi)有提供自定義 Encoder 的 API,但是未來(lái)會(huì)加入。

三者之間的轉(zhuǎn)換:

24.描述一下Spark中stage是如何劃分的?描述一下shuffle的概念

25.Spark 在yarn上運(yùn)行需要做哪些關(guān)鍵的配置工作?如何kill -個(gè)Spark在yarn運(yùn)行中Application

26.通常來(lái)說(shuō),Spark與MapReduce相比,Spark運(yùn)行效率更高。請(qǐng)說(shuō)明效率更高來(lái)源于Spark內(nèi)置的哪些機(jī)制?并請(qǐng)列舉常見spark的運(yùn)行模式?

27.RDD中的數(shù)據(jù)在哪?

RDD中的數(shù)據(jù)在數(shù)據(jù)源,RDD只是一個(gè)抽象的數(shù)據(jù)集,我們通過(guò)對(duì)RDD的操作就相當(dāng)于對(duì)數(shù)據(jù)進(jìn)行操作。

28.如果對(duì)RDD進(jìn)行cache操作后,數(shù)據(jù)在哪里?

數(shù)據(jù)在第一執(zhí)行cache算子時(shí)會(huì)被加載到各個(gè)Executor進(jìn)程的內(nèi)存中,第二次就會(huì)直接從內(nèi)存中讀取而不會(huì)區(qū)磁盤。

29.Spark中Partition的數(shù)量由什么決定

和Mr一樣,但是Spark默認(rèn)最少有兩個(gè)分區(qū)。

30.Scala里面的函數(shù)和方法有什么區(qū)別

31.SparkStreaming怎么進(jìn)行監(jiān)控?

32.Spark判斷Shuffle的依據(jù)?

?父RDD的一個(gè)分區(qū)中的數(shù)據(jù)有可能被分配到子RDD的多個(gè)分區(qū)中

33.Scala有沒(méi)有多繼承?可以實(shí)現(xiàn)多繼承么?

34.Sparkstreaming和flink做實(shí)時(shí)處理的區(qū)別

35.Sparkcontext的作用

36.Sparkstreaming讀取kafka數(shù)據(jù)為什么選擇直連方式

37.離線分析什么時(shí)候用sparkcore和sparksq

38.Sparkstreaming實(shí)時(shí)的數(shù)據(jù)不丟失的問(wèn)題

39.簡(jiǎn)述寬依賴和窄依賴概念,groupByKey,reduceByKey,map,filter,union五種操作哪些會(huì)導(dǎo)致寬依賴,哪些會(huì)導(dǎo)致窄依賴

40.數(shù)據(jù)傾斜可能會(huì)導(dǎo)致哪些問(wèn)題,如何監(jiān)控和排查,在設(shè)計(jì)之初,要考慮哪些來(lái)避免

41.有一千萬(wàn)條短信,有重復(fù),以文本文件的形式保存,一行一條數(shù)據(jù),請(qǐng)用五分鐘時(shí)間,找出重復(fù)出現(xiàn)最多的前10條

42.現(xiàn)有一文件,格式如下,請(qǐng)用spark統(tǒng)計(jì)每個(gè)單詞出現(xiàn)的次數(shù)

18619304961,18619304064,186193008,186193009

18619304962,18619304065,186193007,186193008

18619304963,18619304066,186193006,186193010

43.共享變量和累加器

累加器(accumulator)是 Spark 中提供的一種分布式的變量機(jī)制,其原理類似于mapreduce,即分布式的改變,然后聚合這些改變。累加器的一個(gè)常見用途是在調(diào)試時(shí)對(duì)作業(yè)執(zhí)行過(guò)程中的事件進(jìn)行計(jì)數(shù)。而廣播變量用來(lái)高效分發(fā)較大的對(duì)象。

共享變量出現(xiàn)的原因:

通常在向 Spark 傳遞函數(shù)時(shí),比如使用 map() 函數(shù)或者用 filter() 傳條件時(shí),可以使用驅(qū)動(dòng)器程序中定義的變量,但是集群中運(yùn)行的每個(gè)任務(wù)都會(huì)得到這些變量的一份新的副本,更新這些副本的值也不會(huì)影響驅(qū)動(dòng)器中的對(duì)應(yīng)變量。

Spark 的兩個(gè)共享變量,累加器與廣播變量,分別為結(jié)果聚合與廣播這兩種常見的通信模式突破了這一限制。

44.當(dāng) Spark 涉及到數(shù)據(jù)庫(kù)的操作時(shí),如何減少 Spark 運(yùn)行中的數(shù)據(jù)庫(kù)連接數(shù)?

使用 foreachPartition 代替 foreach,在 foreachPartition 內(nèi)獲取數(shù)據(jù)庫(kù)的連接。

45.特別大的數(shù)據(jù),怎么發(fā)送到excutor中?

46.spark調(diào)優(yōu)都做過(guò)哪些方面?

47.spark任務(wù)為什么會(huì)被yarn kill掉?

48.Spark on Yarn作業(yè)執(zhí)行流程?yarn-client和yarn-cluster有什么區(qū)別?

你們公司還在用SparkOnYan嗎?

49.Flatmap底層編碼實(shí)現(xiàn)?

Spark flatMap 源碼:

/**

? *? Return a new RDD by first applying a function to all elements of this

? *? RDD, and then flattening the results.

? */

? def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {

? ? val cleanF = sc.clean(f)

? ? new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))

? }

Scala flatMap 源碼:

/** Creates a new iterator by applying a function to all values produced by this iterator

? *? and concatenating the results.

? *

? *? @param f the function to apply on each element.

? *? @return? the iterator resulting from applying the given iterator-valued function

? *? ? ? ? ? `f` to each value produced by this iterator and concatenating the results.

? *? @note? ? Reuse: $consumesAndProducesIterator

? */

? def flatMap[B](f: A => GenTraversableOnce[B]): Iterator[B] = new AbstractIterator[B] {

? ? private var cur: Iterator[B] = empty

? ? private def nextCur() { cur = f(self.next()).toIterator }

? ? def hasNext: Boolean = {

? ? ? Equivalent to cur.hasNext || self.hasNext && { nextCur(); hasNext }

? ? ? but slightly shorter bytecode (better JVM inlining!)

? ? ? while (!cur.hasNext) {

? ? ? ? if (!self.hasNext) return false

? ? ? ? nextCur()

? ? ? }

? ? ? true

? ? }

? ? def next(): B =<span style="color:#ffffff"> <span style="background-color:rgb(255,0,0)">(if (hasNext) cur else empty).next()</span></span>

? }

flatMap其實(shí)就是將RDD里的每一個(gè)元素執(zhí)行自定義函數(shù)f,這時(shí)這個(gè)元素的結(jié)果轉(zhuǎn)換成iterator,最后將這些再拼接成一個(gè)

新的RDD,也可以理解成原本的每個(gè)元素由橫向執(zhí)行函數(shù)f后再變?yōu)榭v向。畫紅部分一直在回調(diào),當(dāng)RDD內(nèi)沒(méi)有元素為止。

50.spark_1.X與spark_2.X區(qū)別?

51.說(shuō)說(shuō)spark與flink

52.spark streaming如何保證7*24小時(shí)運(yùn)行機(jī)制?

53.spark streaming是Exactly-Once嗎?

四.Kafka

1.Kafka名詞解釋和工作方式

Producer :消息生產(chǎn)者,就是向kafka broker發(fā)消息的客戶端。

Consumer :消息消費(fèi)者,向kafka broker取消息的客戶端

Topic :咋們可以理解為一個(gè)隊(duì)列。

Consumer Group (CG):這是kafka用來(lái)實(shí)現(xiàn)一個(gè)topic消息的廣播(發(fā)給所有的consumer)和單播(發(fā)給任意一個(gè)consumer)的手段。一個(gè)topic可以有多個(gè)CG。topic的消息會(huì)復(fù)制(不是真的復(fù)制,是概念上的)到所有的CG,但每個(gè)partion只會(huì)把消息發(fā)給該CG中的一個(gè)consumer。如果需要實(shí)現(xiàn)廣播,只要每個(gè)consumer有一個(gè)獨(dú)立的CG就可以了。要實(shí)現(xiàn)單播只要所有的consumer在同一個(gè)CG。用CG還可以將consumer進(jìn)行自由的分組而不需要多次發(fā)送消息到不同的topic。

Broker :一臺(tái)kafka服務(wù)器就是一個(gè)broker。一個(gè)集群由多個(gè)broker組成。一個(gè)broker可以容納多個(gè)topic。

Partition:為了實(shí)現(xiàn)擴(kuò)展性,一個(gè)非常大的topic可以分布到多個(gè)broker(即服務(wù)器)上,一個(gè)topic可以分為多個(gè)partition,每個(gè)partition是一個(gè)有序的隊(duì)列。partition中的每條消息都會(huì)被分配一個(gè)有序的id(offset)。kafka只保證按一個(gè)partition中的順序?qū)⑾l(fā)給consumer,不保證一個(gè)topic的整體(多個(gè)partition間)的順序。

Offset:kafka的存儲(chǔ)文件都是按照offset.kafka來(lái)命名,用offset做名字的好處是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。當(dāng)然the first offset就是00000000000.kafka

2.Consumer與topic關(guān)系

本質(zhì)上kafka只支持Topic;

每個(gè)group中可以有多個(gè)consumer,每個(gè)consumer屬于一個(gè)consumer group;

通常情況下,一個(gè)group中會(huì)包含多個(gè)consumer,這樣不僅可以提高topic中消息的并發(fā)消費(fèi)能力,而且還能提高"故障容錯(cuò)"性,如果group中的某個(gè)consumer失效那么其消費(fèi)的partitions將會(huì)有其他consumer自動(dòng)接管。

對(duì)于Topic中的一條特定的消息,只會(huì)被訂閱此Topic的每個(gè)group中的其中一個(gè)consumer消費(fèi),此消息不會(huì)發(fā)送給一個(gè)group的多個(gè)consumer;

那么一個(gè)group中所有的consumer將會(huì)交錯(cuò)的消費(fèi)整個(gè)Topic,每個(gè)group中consumer消息消費(fèi)互相獨(dú)立,我們可以認(rèn)為一個(gè)group是一個(gè)"訂閱"者。

在kafka中,一個(gè)partition中的消息只會(huì)被group中的一個(gè)consumer消費(fèi)(同一時(shí)刻);

一個(gè)Topic中的每個(gè)partions,只會(huì)被一個(gè)"訂閱者"中的一個(gè)consumer消費(fèi),不過(guò)一個(gè)consumer可以同時(shí)消費(fèi)多個(gè)partitions中的消息。

kafka的設(shè)計(jì)原理決定,對(duì)于一個(gè)topic,同一個(gè)group中不能有多于partitions個(gè)數(shù)的consumer同時(shí)消費(fèi),否則將意味著某些consumer將無(wú)法得到消息。

kafka只能保證一個(gè)partition中的消息被某個(gè)consumer消費(fèi)時(shí)是順序的;事實(shí)上,從Topic角度來(lái)說(shuō),當(dāng)有多個(gè)partitions時(shí),消息仍不是全局有序的。

3.kafka中生產(chǎn)數(shù)據(jù)的時(shí)候,如何保證寫入的容錯(cuò)性?

設(shè)置發(fā)送數(shù)據(jù)是否需要服務(wù)端的反饋,有三個(gè)值0,1,-1

0: producer不會(huì)等待broker發(fā)送ack

1: 當(dāng)leader接收到消息之后發(fā)送ack

-1: 當(dāng)所有的follower都同步消息成功后發(fā)送ack

request.required.acks=0

4.如何保證kafka消費(fèi)者消費(fèi)數(shù)據(jù)是全局有序的

偽命題

每個(gè)分區(qū)內(nèi),每條消息都有一個(gè)offset,故只能保證分區(qū)內(nèi)有序。

如果要全局有序的,必須保證生產(chǎn)有序,存儲(chǔ)有序,消費(fèi)有序。

由于生產(chǎn)可以做集群,存儲(chǔ)可以分片,消費(fèi)可以設(shè)置為一個(gè)consumerGroup,要保證全局有序,就需要保證每個(gè)環(huán)節(jié)都有序。

只有一個(gè)可能,就是一個(gè)生產(chǎn)者,一個(gè)partition,一個(gè)消費(fèi)者。這種場(chǎng)景和大數(shù)據(jù)應(yīng)用場(chǎng)景相悖。

5.有兩個(gè)數(shù)據(jù)源,一個(gè)記錄的是廣告投放給用戶的日志,一個(gè)記錄用戶訪問(wèn)日志,另外還有一個(gè)固定的用戶基礎(chǔ)表記錄用戶基本信息(比如學(xué)歷,年齡等等)。現(xiàn)在要分析廣告投放對(duì)與哪類用戶更有效,請(qǐng)采用熟悉的技術(shù)描述解決思路。另外如果兩個(gè)數(shù)據(jù)源都是實(shí)時(shí)數(shù)據(jù)源(比如來(lái)自kafka),他們數(shù)據(jù)在時(shí)間上相差5分鐘,需要哪些調(diào)整來(lái)解決實(shí)時(shí)分析問(wèn)題?

6.Kafka和SparkStreaing如何集成?

7.列舉Kafka的優(yōu)點(diǎn),簡(jiǎn)述Kafka為什么可以做到每秒數(shù)十萬(wàn)甚至上百萬(wàn)消息的高效分發(fā)?

8.為什么離線分析要用kafka?

Kafka的作用是解耦,如果直接從日志服務(wù)器上采集的話,實(shí)時(shí)離線都要采集,等于要采集兩份數(shù)據(jù),而使用了kafka的話,只需要從日志服務(wù)器上采集一份數(shù)據(jù),然后在kafka中使用不同的兩個(gè)組讀取就行了

9.Kafka怎么進(jìn)行監(jiān)控?

Kafka?Manager

10.Kafka與傳統(tǒng)的消息隊(duì)列服務(wù)有很么不同

11.Kafka api ?low-level與high-level有什么區(qū)別,使用low-level需要處理哪些細(xì)節(jié)

12.Kafka的ISR副本同步隊(duì)列

ISR(In-Sync Replicas),副本同步隊(duì)列。ISR中包括Leader和Follower。如果Leader進(jìn)程掛掉,會(huì)在ISR隊(duì)列中選擇一個(gè)服務(wù)作為新的Leader。有replica.lag.max.messages(延遲條數(shù))和replica.lag.time.max.ms(延遲時(shí)間)兩個(gè)參數(shù)決定一臺(tái)服務(wù)是否可以加入ISR副本隊(duì)列,在0.10版本移除了replica.lag.max.messages參數(shù),防止服務(wù)頻繁的進(jìn)去隊(duì)列。

任意一個(gè)維度超過(guò)閾值都會(huì)把Follower剔除出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的Follower也會(huì)先存放在OSR中。

13.Kafka消息數(shù)據(jù)積壓,Kafka消費(fèi)能力不足怎么處理?

1)如果是Kafka消費(fèi)能力不足,則可以考慮增加Topic的分區(qū)數(shù),并且同時(shí)提升消費(fèi)組的消費(fèi)者數(shù)量,消費(fèi)者數(shù)=分區(qū)數(shù)。(兩者缺一不可)

2)如果是下游的數(shù)據(jù)處理不及時(shí):提高每批次拉取的數(shù)量。批次拉取數(shù)據(jù)過(guò)少(拉取數(shù)據(jù)/處理時(shí)間<生產(chǎn)速度),使處理的數(shù)據(jù)小于生產(chǎn)的數(shù)據(jù),也會(huì)造成數(shù)據(jù)積壓。

14.Kafka中的ISR、AR又代表什么?

?ISR:in-sync replica set (ISR),與leader保持同步的follower集合

??? AR:分區(qū)的所有副本

15.Kafka中的HW、LEO等分別代表什么?

LEO:每個(gè)副本的最后條消息的offset

??? HW:一個(gè)分區(qū)中所有副本最小的offset

16.哪些情景會(huì)造成消息漏消費(fèi)?

先提交offset,后消費(fèi),有可能造成數(shù)據(jù)的重復(fù)

17.當(dāng)你使用kafka-topics.sh創(chuàng)建了一個(gè)topic之后,Kafka背后會(huì)執(zhí)行什么邏輯?

? 1)會(huì)在zookeeper中的/brokers/topics節(jié)點(diǎn)下創(chuàng)建一個(gè)新的topic節(jié)點(diǎn),如:/brokers/topics/first

????2)觸發(fā)Controller的監(jiān)聽程序

????3)kafka Controller 負(fù)責(zé)topic的創(chuàng)建工作,并更新metadata cache

18.topic的分區(qū)數(shù)可不可以增加?如果可以怎么增加?如果不可以,那又是為什么?

可以增加

bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic topic-config --partitions 3

19.topic的分區(qū)數(shù)可不可以減少?如果可以怎么減少?如果不可以,那又是為什么?

不可以減少,被刪除的分區(qū)數(shù)據(jù)難以處理。

20.Kafka有內(nèi)部的topic嗎?如果有是什么?有什么所用?

?__consumer_offsets,保存消費(fèi)者offset

21.聊一聊Kafka Controller的作用?

負(fù)責(zé)管理集群broker的上下線,所有topic的分區(qū)副本分配和leader選舉等工作。

22.失效副本是指什么?有那些應(yīng)對(duì)措施?

不能及時(shí)與leader同步,暫時(shí)踢出ISR,等其追上leader之后再重新加入

23.Kafka 都有哪些特點(diǎn)?

高吞吐量、低延遲:kafka每秒可以處理幾十萬(wàn)條消息,它的延遲最低只有幾毫秒,每個(gè)topic可以分多個(gè)partition, consumer group 對(duì)partition進(jìn)行consume操作。

可擴(kuò)展性:kafka集群支持熱擴(kuò)展

持久性、可靠性:消息被持久化到本地磁盤,并且支持?jǐn)?shù)據(jù)備份防止數(shù)據(jù)丟失

容錯(cuò)性:允許集群中節(jié)點(diǎn)失?。ㄈ舾北緮?shù)量為n,則允許n-1個(gè)節(jié)點(diǎn)失敗)

高并發(fā):支持?jǐn)?shù)千個(gè)客戶端同時(shí)讀寫

24.請(qǐng)簡(jiǎn)述下你在哪些場(chǎng)景下會(huì)選擇 Kafka?

日志收集:一個(gè)公司可以用Kafka可以收集各種服務(wù)的log,通過(guò)kafka以統(tǒng)一接口服務(wù)的方式開放給各種consumer,例如hadoop、HBase、Solr等。

消息系統(tǒng):解耦和生產(chǎn)者和消費(fèi)者、緩存消息等。

用戶活動(dòng)跟蹤:Kafka經(jīng)常被用來(lái)記錄web用戶或者app用戶的各種活動(dòng),如瀏覽網(wǎng)頁(yè)、搜索、點(diǎn)擊等活動(dòng),這些活動(dòng)信息被各個(gè)服務(wù)器發(fā)布到kafka的topic中,然后訂閱者通過(guò)訂閱這些topic來(lái)做實(shí)時(shí)的監(jiān)控分析,或者裝載到hadoop、數(shù)據(jù)倉(cāng)庫(kù)中做離線分析和挖掘。

運(yùn)營(yíng)指標(biāo):Kafka也經(jīng)常用來(lái)記錄運(yùn)營(yíng)監(jiān)控?cái)?shù)據(jù)。包括收集各種分布式應(yīng)用的數(shù)據(jù),生產(chǎn)各種操作的集中反饋,比如報(bào)警和報(bào)告。

流式處理:比如spark streaming和 Flink

25.Kafka 的設(shè)計(jì)架構(gòu)你知道嗎?

簡(jiǎn)單架構(gòu)如下

詳細(xì)如下

Kafka 架構(gòu)分為以下幾個(gè)部分

Producer :消息生產(chǎn)者,就是向 kafka broker 發(fā)消息的客戶端。

Consumer :消息消費(fèi)者,向 kafka broker 取消息的客戶端。

Topic :可以理解為一個(gè)隊(duì)列,一個(gè) Topic 又分為一個(gè)或多個(gè)分區(qū)。

Consumer Group:這是 kafka 用來(lái)實(shí)現(xiàn)一個(gè) topic 消息的廣播(發(fā)給所有的 consumer)和單播(發(fā)給任意一個(gè) consumer)的手段。一個(gè) topic 可以有多個(gè) Consumer Group。

Broker :一臺(tái) kafka 服務(wù)器就是一個(gè) broker。一個(gè)集群由多個(gè) broker 組成。一個(gè) broker 可以容納多個(gè) topic。

Partition:為了實(shí)現(xiàn)擴(kuò)展性,一個(gè)非常大的 topic 可以分布到多個(gè) broker上,每個(gè) partition 是一個(gè)有序的隊(duì)列。partition 中的每條消息都會(huì)被分配一個(gè)有序的id(offset)。將消息發(fā)給 consumer,kafka 只保證按一個(gè) partition 中的消息的順序,不保證一個(gè) topic 的整體(多個(gè) partition 間)的順序。

Offset:kafka 的存儲(chǔ)文件都是按照 offset.kafka 來(lái)命名,用 offset 做名字的好處是方便查找。例如你想找位于 2049 的位置,只要找到 2048.kafka 的文件即可。當(dāng)然 the first offset 就是 00000000000.kafka。

26.Kafka 分區(qū)的目的?

分區(qū)對(duì)于 Kafka 集群的好處是:實(shí)現(xiàn)負(fù)載均衡。分區(qū)對(duì)于消費(fèi)者來(lái)說(shuō),可以提高并發(fā)度,提高效率。

27.你知道 Kafka 是如何做到消息的有序性?

kafka 中的每個(gè) partition 中的消息在寫入時(shí)都是有序的,而且消息帶有offset偏移量,消費(fèi)者按偏移量的順序從前往后消費(fèi),從而保證了消息的順序性。但是分區(qū)之間的消息是不保證有序的。

28.Kafka 的高可靠性是怎么實(shí)現(xiàn)的?

kafka通過(guò)分區(qū)的多副本機(jī)制來(lái)保證消息的可靠性。1. 每個(gè)分區(qū)可以設(shè)置多個(gè)副本,這些副本分布在不同的broker上;2. 相同partition的多個(gè)副本能動(dòng)態(tài)選舉leader來(lái)對(duì)外服務(wù)和管理內(nèi)部數(shù)據(jù)同步。這樣,即使有broker出現(xiàn)故障,受影響的partition也會(huì)在其他broker上重新選舉出新的leader來(lái)繼續(xù)服務(wù)

更具體來(lái)說(shuō),可參看下文:

Kafka 的分區(qū)多副本架構(gòu)是 Kafka 可靠性保證的核心,把消息寫入多個(gè)副本可以使 Kafka 在發(fā)生崩潰時(shí)仍能保證消息的持久性。

Producer 往 Broker 發(fā)送消息

如果我們要往 Kafka 對(duì)應(yīng)的主題發(fā)送消息,我們需要通過(guò) Producer 完成。前面我們講過(guò) Kafka 主題對(duì)應(yīng)了多個(gè)分區(qū),每個(gè)分區(qū)下面又對(duì)應(yīng)了多個(gè)副本;為了讓用戶設(shè)置數(shù)據(jù)可靠性, Kafka 在 Producer 里面提供了消息確認(rèn)機(jī)制。也就是說(shuō)我們可以通過(guò)配置來(lái)決定消息發(fā)送到對(duì)應(yīng)分區(qū)的幾個(gè)副本才算消息發(fā)送成功。可以在定義 Producer 時(shí)通過(guò) acks 參數(shù)指定(在 0.8.2.X 版本之前是通過(guò) request.required.acks 參數(shù)設(shè)置的,詳見 KAFKA-3043)。這個(gè)參數(shù)支持以下三種值:

acks = 0:意味著如果生產(chǎn)者能夠通過(guò)網(wǎng)絡(luò)把消息發(fā)送出去,那么就認(rèn)為消息已成功寫入 Kafka 。在這種情況下還是有可能發(fā)生錯(cuò)誤,比如發(fā)送的對(duì)象無(wú)能被序列化或者網(wǎng)卡發(fā)生故障,但如果是分區(qū)離線或整個(gè)集群長(zhǎng)時(shí)間不可用,那就不會(huì)收到任何錯(cuò)誤。在 acks=0 模式下的運(yùn)行速度是非常快的(這就是為什么很多基準(zhǔn)測(cè)試都是基于這個(gè)模式),你可以得到驚人的吞吐量和帶寬利用率,不過(guò)如果選擇了這種模式, 一定會(huì)丟失一些消息。

acks = 1:意味若 Leader 在收到消息并把它寫入到分區(qū)數(shù)據(jù)文件(不一定同步到磁盤上)時(shí)會(huì)返回確認(rèn)或錯(cuò)誤響應(yīng)。在這個(gè)模式下,如果發(fā)生正常的 Leader 選舉,生產(chǎn)者會(huì)在選舉時(shí)收到一個(gè) LeaderNotAvailableException 異常,如果生產(chǎn)者能恰當(dāng)?shù)靥幚磉@個(gè)錯(cuò)誤,它會(huì)重試發(fā)送悄息,最終消息會(huì)安全到達(dá)新的 Leader 那里。不過(guò)在這個(gè)模式下仍然有可能丟失數(shù)據(jù),比如消息已經(jīng)成功寫入 Leader,但在消息被復(fù)制到 follower 副本之前 Leader發(fā)生崩潰。

acks = all(這個(gè)和 request.required.acks = -1 含義一樣):意味著 Leader 在返回確認(rèn)或錯(cuò)誤響應(yīng)之前,會(huì)等待所有同步副本都收到悄息。如果和 min.insync.replicas 參數(shù)結(jié)合起來(lái),就可以決定在返回確認(rèn)前至少有多少個(gè)副本能夠收到悄息,生產(chǎn)者會(huì)一直重試直到消息被成功提交。不過(guò)這也是最慢的做法,因?yàn)樯a(chǎn)者在繼續(xù)發(fā)送其他消息之前需要等待所有副本都收到當(dāng)前的消息。

根據(jù)實(shí)際的應(yīng)用場(chǎng)景,我們?cè)O(shè)置不同的 acks,以此保證數(shù)據(jù)的可靠性。

另外,Producer 發(fā)送消息還可以選擇同步(默認(rèn),通過(guò) producer.type=sync 配置) 或者異步(producer.type=async)模式。如果設(shè)置成異步,雖然會(huì)極大的提高消息發(fā)送的性能,但是這樣會(huì)增加丟失數(shù)據(jù)的風(fēng)險(xiǎn)。如果需要確保消息的可靠性,必須將 producer.type 設(shè)置為 sync。

Leader 選舉

在介紹 Leader 選舉之前,讓我們先來(lái)了解一下 ISR(in-sync replicas)列表。每個(gè)分區(qū)的 leader 會(huì)維護(hù)一個(gè) ISR 列表,ISR 列表里面就是 follower 副本的 Borker 編號(hào),只有跟得上 Leader 的 follower 副本才能加入到 ISR 里面,這個(gè)是通過(guò) replica.lag.time.max.ms 參數(shù)配置的,具體可以參見?《一文了解 Kafka 的副本復(fù)制機(jī)制》。只有 ISR 里的成員才有被選為 leader 的可能。

所以當(dāng) Leader 掛掉了,而且 unclean.leader.election.enable=false 的情況下,Kafka 會(huì)從 ISR 列表中選擇第一個(gè) follower 作為新的 Leader,因?yàn)檫@個(gè)分區(qū)擁有最新的已經(jīng) committed 的消息。通過(guò)這個(gè)可以保證已經(jīng) committed 的消息的數(shù)據(jù)可靠性。

綜上所述,為了保證數(shù)據(jù)的可靠性,我們最少需要配置一下幾個(gè)參數(shù):

producer 級(jí)別:acks=all(或者 request.required.acks=-1),同時(shí)發(fā)生模式為同步 producer.type=sync

topic 級(jí)別:設(shè)置 replication.factor>=3,并且 min.insync.replicas>=2;

broker 級(jí)別:關(guān)閉不完全的 Leader 選舉,即 unclean.leader.election.enable=false;

29.請(qǐng)談一談 Kafka 數(shù)據(jù)一致性原理

一致性就是說(shuō)不論是老的 Leader 還是新選舉的 Leader,Consumer 都能讀到一樣的數(shù)據(jù)。

假設(shè)分區(qū)的副本為3,其中副本0是 Leader,副本1和副本2是 follower,并且在 ISR 列表里面。雖然副本0已經(jīng)寫入了 Message4,但是 Consumer 只能讀取到 Message2。因?yàn)樗械?ISR 都同步了 Message2,只有 High Water Mark 以上的消息才支持 Consumer 讀取,而 High Water Mark 取決于 ISR 列表里面偏移量最小的分區(qū),對(duì)應(yīng)于上圖的副本2,這個(gè)很類似于木桶原理。

這樣做的原因是還沒(méi)有被足夠多副本復(fù)制的消息被認(rèn)為是“不安全”的,如果 Leader 發(fā)生崩潰,另一個(gè)副本成為新 Leader,那么這些消息很可能丟失了。如果我們?cè)试S消費(fèi)者讀取這些消息,可能就會(huì)破壞一致性。試想,一個(gè)消費(fèi)者從當(dāng)前 Leader(副本0) 讀取并處理了 Message4,這個(gè)時(shí)候 Leader 掛掉了,選舉了副本1為新的 Leader,這時(shí)候另一個(gè)消費(fèi)者再去從新的 Leader 讀取消息,發(fā)現(xiàn)這個(gè)消息其實(shí)并不存在,這就導(dǎo)致了數(shù)據(jù)不一致性問(wèn)題。

當(dāng)然,引入了 High Water Mark 機(jī)制,會(huì)導(dǎo)致 Broker 間的消息復(fù)制因?yàn)槟承┰蜃兟?,那么消息到達(dá)消費(fèi)者的時(shí)間也會(huì)隨之變長(zhǎng)(因?yàn)槲覀儠?huì)先等待消息復(fù)制完畢)。延遲時(shí)間可以通過(guò)參數(shù) replica.lag.time.max.ms 參數(shù)配置,它指定了副本在復(fù)制消息時(shí)可被允許的最大延遲時(shí)間。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容