17.分區(qū)分桶的區(qū)別,為什么要分區(qū)
分區(qū)表:原來(lái)的一個(gè)大表存儲(chǔ)的時(shí)候分成不同的數(shù)據(jù)目錄進(jìn)行存儲(chǔ)。如果說(shuō)是單分區(qū)表,那么在表的目錄下就只有一級(jí)子目錄,如果說(shuō)是多分區(qū)表,那么在表的目錄下有多少分區(qū)就有多少級(jí)子目錄。不管是單分區(qū)表,還是多分區(qū)表,在表的目錄下,和非最終分區(qū)目錄下是不能直接存儲(chǔ)數(shù)據(jù)文件的
分桶表:原理和hashpartitioner 一樣,將hive中的一張表的數(shù)據(jù)進(jìn)行歸納分類的時(shí)候,歸納分類規(guī)則就是hashpartitioner。(需要指定分桶字段,指定分成多少桶)
分區(qū)表和分桶的區(qū)別除了存儲(chǔ)的格式不同外,最主要的是作用:
分區(qū)表:細(xì)化數(shù)據(jù)管理,縮小mapreduce程序 需要掃描的數(shù)據(jù)量。
分桶表:提高join查詢的效率,在一份數(shù)據(jù)會(huì)被經(jīng)常用來(lái)做連接查詢的時(shí)候建立分桶,分桶字段就是連接字段;提高采樣的效率。
有了分區(qū)為什么還要分桶?
(1)獲得更高的查詢處理效率。桶為表加上了額外的結(jié)構(gòu),Hive在處理有些查詢時(shí)能利用這個(gè)結(jié)構(gòu)。
(2)使取樣( sampling)更高效。在處理大規(guī)模數(shù)據(jù)集時(shí),在開發(fā)和修改査詢的階段,如果能在數(shù)據(jù)集的一小部分?jǐn)?shù)據(jù)上試運(yùn)行查詢,會(huì)帶來(lái)很多方便。
分桶是相對(duì)分區(qū)進(jìn)行更細(xì)粒度的劃分。分桶將表或者分區(qū)的某列值進(jìn)行hash值進(jìn)行區(qū)分,如要安裝name屬性分為3個(gè)桶,就是對(duì)name屬性值的hash值對(duì)3取摸,按照取模結(jié)果對(duì)數(shù)據(jù)分桶。
與分區(qū)不同的是,分區(qū)依據(jù)的不是真實(shí)數(shù)據(jù)表文件中的列,而是我們指定的偽列,但是分桶是依據(jù)數(shù)據(jù)表中真實(shí)的列而不是偽列
18.mapjoin的原理

MapJoin通常用于一個(gè)很小的表和一個(gè)大表進(jìn)行join的場(chǎng)景,具體小表有多小,由參數(shù)hive.mapjoin.smalltable.filesize來(lái)決定,該參數(shù)表示小表的總大小,默認(rèn)值為25000000字節(jié),即25M。
Hive0.7之前,需要使用hint提示 *+ mapjoin(table) */才會(huì)執(zhí)行MapJoin,否則執(zhí)行Common Join,但在0.7版本之后,默認(rèn)自動(dòng)會(huì)轉(zhuǎn)換Map Join,由參數(shù)hive.auto.convert.join來(lái)控制,默認(rèn)為true.
假設(shè)a表為一張大表,b為小表,并且hive.auto.convert.join=true,那么Hive在執(zhí)行時(shí)候會(huì)自動(dòng)轉(zhuǎn)化為MapJoin。
MapJoin簡(jiǎn)單說(shuō)就是在Map階段將小表讀入內(nèi)存,順序掃描大表完成Join。減少昂貴的shuffle操作及reduce操作
MapJoin分為兩個(gè)階段:
通過(guò)MapReduce Local Task,將小表讀入內(nèi)存,生成HashTableFiles上傳至Distributed Cache中,這里會(huì)HashTableFiles進(jìn)行壓縮。
MapReduce Job在Map階段,每個(gè)Mapper從Distributed Cache讀取HashTableFiles到內(nèi)存中,順序掃描大表,在Map階段直接進(jìn)行Join,將數(shù)據(jù)傳遞給下一個(gè)MapReduce任務(wù)。
19.在hive的row_number中distribute by 和 partition by的區(qū)別?
20.hive開發(fā)中遇到什么問(wèn)題?
21.什么時(shí)候使用內(nèi)部表,什么時(shí)候使用外部表
Hive內(nèi)部表外部表區(qū)別及各自使用場(chǎng)景
22.hive都有哪些函數(shù),你平常工作中用到哪些
數(shù)學(xué)函數(shù)
round(DOUBLE a)
floor(DOUBLE a)
ceil(DOUBLE a)
rand()
集合函數(shù)
size(Map<K.V>)
map_keys(Map<K.V>)
map_values(Map<K.V>)
array_contains(Array<T>, value)
sort_array(Array<T>)
類型轉(zhuǎn)換函數(shù)
cast(expr as <type>)
日期函數(shù)
date_format函數(shù)(根據(jù)格式整理日期)
date_add、date_sub函數(shù)(加減日期)
next_day函數(shù)
last_day函數(shù)(求當(dāng)月最后一天日期)
collect_set函數(shù)
get_json_object解析json函數(shù)
from_unixtime(bigint unixtime, string format)
to_date(string timestamp)
year(string date)
month(string date)
hour(string date)
weekofyear(string date)
datediff(string enddate, string startdate)
add_months(string start_date, int num_months)
date_format(date/timestamp/string ts, string fmt)
條件函數(shù)
if(boolean testCondition, T valueTrue, T valueFalseOrNull)
nvl(T value, T default_value)
COALESCE(T v1, T v2, ...)
CASE a WHEN b THEN c [WHEN d THEN e]* [ELSE f] END
isnull( a )
isnotnull ( a )
字符函數(shù)
concat(string|binary A, string|binary B...)
concat_ws(string SEP, string A, string B...)
get_json_object(string json_string, string path)
length(string A)
lower(string A) lcase(string A)
parse_url(string urlString, string partToExtract [, string keyToExtract])
regexp_replace(string INITIAL_STRING, string PATTERN, string REPLACEMENT)
reverse(string A)
split(string str, string pat)
substr(string|binary A, int start) substring(string|binary A, int start)
聚合函數(shù)
count? sum min max avg
表生成函數(shù)
explode(array<TYPE> a)
explode(ARRAY)
json_tuple(jsonStr, k1, k2, ...)
parse_url_tuple(url, p1, p2, ...)
23.手寫sql,連續(xù)活躍用戶
24.left semi?join和left?join區(qū)別
left semi join和left join區(qū)別
25.group by為什么要排序
26.說(shuō)說(shuō)印象最深的一次優(yōu)化場(chǎng)景,hive常見的優(yōu)化思路
Hive調(diào)優(yōu),數(shù)據(jù)工程師成神之路
27.聊聊hive的執(zhí)行引擎,spark和mr的區(qū)別?
28.hive的join底層mr是如何實(shí)現(xiàn)的?
29.sql問(wèn)題,連續(xù)幾天活躍的用戶?
30.建好了外部表,用什么語(yǔ)句把數(shù)據(jù)文件加載到表里
31.Hive的執(zhí)行流程?
32.hive的元數(shù)據(jù)信息存儲(chǔ)在哪?
33.sql語(yǔ)句的執(zhí)行順序from-where-group by-having -select-order by -limit
34.on和where的區(qū)別
35.hive和傳統(tǒng)數(shù)據(jù)庫(kù)之間的區(qū)別
1、寫時(shí)模式和讀時(shí)模式
傳統(tǒng)數(shù)據(jù)庫(kù)是寫時(shí)模式,在load過(guò)程中,提升了査詢性能,因?yàn)轭A(yù)先解析之后可以對(duì)列建立索引,并壓縮,但這樣也會(huì)花費(fèi)更多的加載時(shí)間。
Hive是讀時(shí)模式,1 oad data非常迅速,因?yàn)樗恍枰x取數(shù)據(jù)進(jìn)行解析,僅僅進(jìn)行文件的復(fù)制或者移動(dòng)。
2、數(shù)據(jù)格式。Hive中沒(méi)有定義專門的數(shù)據(jù)格式,由用戶指定,需要指定三個(gè)屬性:列分隔符,行分隔符,以及讀取文件數(shù)據(jù)的方法。數(shù)據(jù)庫(kù)中,存儲(chǔ)引擎定義了自己的數(shù)據(jù)格式。所有數(shù)據(jù)都會(huì)按照一定的組織存儲(chǔ)
3、數(shù)據(jù)更新。Hive的內(nèi)容是讀多寫少的,因此,不支持對(duì)數(shù)據(jù)的改寫和刪除,數(shù)據(jù)都在加載的時(shí)候中確定好的。數(shù)據(jù)庫(kù)中的數(shù)據(jù)通常是需要經(jīng)常進(jìn)行修改
4、執(zhí)行延遲。Hive在查詢數(shù)據(jù)的時(shí)候,需要掃描整個(gè)表(或分區(qū)),因此延遲較高,只有在處理大數(shù)據(jù)是才有優(yōu)勢(shì)。數(shù)據(jù)庫(kù)在處理小數(shù)據(jù)是執(zhí)行延遲較低。
5、索引。Hive比較弱,不適合實(shí)時(shí)查詢。數(shù)據(jù)庫(kù)有。
6、執(zhí)行。Hive是 Mapreduce,數(shù)據(jù)庫(kù)是 Executor
7、可擴(kuò)展性。Hive高,數(shù)據(jù)庫(kù)低
8、數(shù)據(jù)規(guī)模。Hive大,數(shù)據(jù)庫(kù)小
36.hive中導(dǎo)入數(shù)據(jù)的4種方式
從本地導(dǎo)入:load data local inpath home/liuzc into table ods.test
從hdfs導(dǎo)入:load data inpath user/hive/warehouse/a.txt into ods.test
查詢導(dǎo)入:create table tmp_test as select * from ods.test
查詢結(jié)果導(dǎo)入:insert into table tmp.test select * from ods.test
三.Spark
1.rdd的屬性

一組分片(Partition),即數(shù)據(jù)集的基本組成單位。對(duì)于RDD來(lái)說(shuō),每個(gè)分片都會(huì)被一個(gè)計(jì)算任務(wù)處理,并決定并行計(jì)算的粒度。用戶可以在創(chuàng)建RDD時(shí)指定RDD的分片個(gè)數(shù),如果沒(méi)有指定,那么就會(huì)采用默認(rèn)值。默認(rèn)值就是程序所分配到的CPU Core的數(shù)目。
一個(gè)計(jì)算每個(gè)分區(qū)的函數(shù)。Spark中RDD的計(jì)算是以分片為單位的,每個(gè)RDD都會(huì)實(shí)現(xiàn)compute函數(shù)以達(dá)到這個(gè)目的。compute函數(shù)會(huì)對(duì)迭代器進(jìn)行復(fù)合,不需要保存每次計(jì)算的結(jié)果。
RDD之間的依賴關(guān)系。RDD的每次轉(zhuǎn)換都會(huì)生成一個(gè)新的RDD,所以RDD之間就會(huì)形成類似于流水線一樣的前后依賴關(guān)系。在部分分區(qū)數(shù)據(jù)丟失時(shí),Spark可以通過(guò)這個(gè)依賴關(guān)系重新計(jì)算丟失的分區(qū)數(shù)據(jù),而不是對(duì)RDD的所有分區(qū)進(jìn)行重新計(jì)算。
一個(gè)Partitioner,即RDD的分片函數(shù)。當(dāng)前Spark中實(shí)現(xiàn)了兩種類型的分片函數(shù),一個(gè)是基于哈希的HashPartitioner,另外一個(gè)是基于范圍的RangePartitioner。只有對(duì)于于key-value的RDD,才會(huì)有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數(shù)不但決定了RDD本身的分片數(shù)量,也決定了parent RDD Shuffle輸出時(shí)的分片數(shù)量。
一個(gè)列表,存儲(chǔ)存取每個(gè)Partition的優(yōu)先位置(preferred location)。對(duì)于一個(gè)HDFS文件來(lái)說(shuō),這個(gè)列表保存的就是每個(gè)Partition所在的塊的位置。按照“移動(dòng)數(shù)據(jù)不如移動(dòng)計(jì)算”的理念,Spark在進(jìn)行任務(wù)調(diào)度的時(shí)候,會(huì)盡可能地將計(jì)算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲(chǔ)位置。
2.算子分為哪幾類(RDD支持哪幾種類型的操作)
轉(zhuǎn)換(Transformation) ?現(xiàn)有的RDD通過(guò)轉(zhuǎn)換生成一個(gè)新的RDD。lazy模式,延遲執(zhí)行。
轉(zhuǎn)換函數(shù)包括:map,filter,flatMap,groupByKey,reduceByKey,aggregateByKey,union,join, coalesce?等等。
動(dòng)作(Action) ?在RDD上運(yùn)行計(jì)算,并返回結(jié)果給驅(qū)動(dòng)程序(Driver)或?qū)懭胛募到y(tǒng)。
動(dòng)作操作包括:reduce,collect,count,first,take,countByKey以及foreach等等。
collect ?該方法把數(shù)據(jù)收集到driver端 ??Array數(shù)組類型
所有的transformation只有遇到action才能被執(zhí)行。
當(dāng)觸發(fā)執(zhí)行action之后,數(shù)據(jù)類型不再是rdd了,數(shù)據(jù)就會(huì)存儲(chǔ)到指定文件系統(tǒng)中,或者直接打印結(jié)?果或者收集起來(lái)。
3.創(chuàng)建rdd的幾種方式
1.集合并行化創(chuàng)建(有數(shù)據(jù))
val arr = Array(1,2,3,4,5)
val rdd = sc.parallelize(arr)
val rdd =sc.makeRDD(arr)
2.讀取外部文件系統(tǒng),如hdfs,或者讀取本地文件(最常用的方式)(沒(méi)數(shù)據(jù))
val rdd2 = sc.textFile("hdfs://hdp-01:9000/words.txt")
// 讀取本地文件
val rdd2 = sc.textFile(“file:///root/words.txt”)
3.從父RDD轉(zhuǎn)換成新的子RDD
調(diào)用Transformation類的方法,生成新的RDD
4.spark運(yùn)行流程

Worker的功能:定時(shí)和master通信;調(diào)度并管理自身的executor
executor:由Worker啟動(dòng)的,程序最終在executor中運(yùn)行,(程序運(yùn)行的一個(gè)容器)
spark-submit命令執(zhí)行時(shí),會(huì)根據(jù)master地址去向 Master發(fā)送請(qǐng)求,
Master接收到Dirver端的任務(wù)請(qǐng)求之后,根據(jù)任務(wù)的請(qǐng)求資源進(jìn)行調(diào)度,(打散的策略),盡可能的?把任務(wù)資源平均分配,然后向WOrker發(fā)送指令
Worker收到Master的指令之后,就根據(jù)相應(yīng)的資源,啟動(dòng)executor(cores,memory)
executor會(huì)向dirver端建立請(qǐng)求,通知driver,任務(wù)已經(jīng)可以運(yùn)行了
driver運(yùn)行任務(wù)的時(shí)候,會(huì)把任務(wù)發(fā)送到executor中去運(yùn)行。
5.Spark中coalesce與repartition的區(qū)別
1)關(guān)系:
兩者都是用來(lái)改變 RDD 的 partition 數(shù)量的,repartition 底層調(diào)用的就是 coalesce 方法:coalesce(numPartitions, shuffle = true)
2)區(qū)別:
repartition 一定會(huì)發(fā)生 shuffle,coalesce 根據(jù)傳入的參數(shù)來(lái)判斷是否發(fā)生 shuffle
一般情況下增大 rdd 的 partition 數(shù)量使用 repartition,減少 partition 數(shù)量時(shí)使用coalesce
6.sortBy 和 sortByKey的區(qū)別
sortBy既可以作用于RDD[K] ,還可以作用于RDD[(k,v)]
sortByKey ?只能作用于 RDD[K,V] 類型上。
7.map和mapPartitions的區(qū)別

8.數(shù)據(jù)存入Redis ?優(yōu)先使用map mapPartitions ?foreach ?foreachPartions哪個(gè)
使用 foreachPartition
???* 1,map mapPartition ??是轉(zhuǎn)換類的算子, 有返回值
???* 2, 寫mysql,redis?的連接
???foreach ?* 100萬(wàn) ????????100萬(wàn)次的連接
???foreachPartions * 200 個(gè)分區(qū) ????200次連接 ?一個(gè)分區(qū)中的數(shù)據(jù),共用一個(gè)連接
foreachParititon 每次迭代一個(gè)分區(qū),foreach每次迭代一個(gè)元素。
該方法沒(méi)有返回值,或者Unit
主要作用于,沒(méi)有返回值類型的操作(打印結(jié)果,寫入到mysql數(shù)據(jù)庫(kù)中)
在寫入到redis,mysql的時(shí)候,優(yōu)先使用foreachPartititon
9.reduceByKey和groupBykey的區(qū)別

reduceByKey會(huì)傳一個(gè)聚合函數(shù), 相當(dāng)于 ?groupByKey?+ mapValues
reduceByKey 會(huì)有一個(gè)分區(qū)內(nèi)聚合,而groupByKey沒(méi)有 ?最核心的區(qū)別 ?
結(jié)論:reduceByKey有分區(qū)內(nèi)聚合,更高效,優(yōu)先選擇使用reduceByKey。
10.cache和checkPoint的比較
都是做 RDD 持久化的
1.緩存,是在觸發(fā)action之后,把數(shù)據(jù)寫入到內(nèi)存或者磁盤中。不會(huì)截?cái)嘌夑P(guān)系
(設(shè)置緩存級(jí)別為memory_only:內(nèi)存不足,只會(huì)部分緩存或者沒(méi)有緩存,緩存會(huì)丟失,memory_and_disk :內(nèi)存不足,會(huì)使用磁盤)
2.checkpoint 也是在觸發(fā)action之后,執(zhí)行任務(wù)。單獨(dú)再啟動(dòng)一個(gè)job,負(fù)責(zé)寫入數(shù)據(jù)到hdfs中。(把rdd中的數(shù)據(jù),以二進(jìn)制文本的方式寫入到hdfs中,有幾個(gè)分區(qū),就有幾個(gè)二進(jìn)制文件)
3.某一個(gè)RDD被checkpoint之后,他的父依賴關(guān)系會(huì)被刪除,血緣關(guān)系被截?cái)?,該RDD轉(zhuǎn)換成了CheckPointRDD,以后再對(duì)該rdd的所有操作,都是從hdfs中的checkpoint的具體目錄來(lái)讀取數(shù)據(jù)。緩存之后,rdd的依賴關(guān)系還是存在的。
11.spark streaming流式統(tǒng)計(jì)單詞數(shù)量代碼
object WordCountAll {
? newValues當(dāng)前批次的出現(xiàn)的單詞次數(shù), runningCount表示之前運(yùn)行的單詞出現(xiàn)的結(jié)果
* def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
? ? val newCount =? newValues.sum + runningCount.getOrElse(0)// 將歷史前幾個(gè)批次的值和當(dāng)前批次的值進(jìn)行累加返回當(dāng)前批次最終的結(jié)果
? ? Some(newCount)
? }*/
? **
? ? * String : 單詞 hello
? ? * Seq[Int] :?jiǎn)卧~在當(dāng)前批次出現(xiàn)的次數(shù)
? ? * Option[Int] :歷史結(jié)果
? ? */
? val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
? ? iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x)))
? ? iter.flatMap{case(x,y,z)=>Some(y.sum + z.getOrElse(0)).map(m=>(x, m))}
? }
? 屏蔽日志
? Logger.getLogger("org.apache").setLevel(Level.ERROR)
? def main(args: Array[String]) {
? ? 必須要開啟2個(gè)以上的線程,一個(gè)線程用來(lái)接收數(shù)據(jù),另外一個(gè)線程用來(lái)計(jì)算
? ? val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
? ? ? 設(shè)置sparkjob計(jì)算時(shí)所采用的序列化方式
? ? ? .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
? ? ? .set("spark.rdd.compress", "true")? 節(jié)約大量的內(nèi)存內(nèi)容
? ? 如果你的程序出現(xiàn)垃圾回收時(shí)間過(guò)程,可以設(shè)置一下java的垃圾回收參數(shù)
? ? 同時(shí)也會(huì)創(chuàng)建sparkContext對(duì)象
? ? 批次時(shí)間 >= 批次處理的總時(shí)間 (批次數(shù)據(jù)量,集群的計(jì)算節(jié)點(diǎn)數(shù)量和配置)
? ? val ssc = new StreamingContext(conf, Seconds(5))
? ? 做checkpoint 寫入共享存儲(chǔ)中
? ? ssc.checkpoint("c://aaa")
? ? 創(chuàng)建一個(gè)將要連接到 hostname:port 的 DStream,如 localhost:9999
? ? val lines: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.175.101", 44444)
? ? updateStateByKey結(jié)果可以累加但是需要傳入一個(gè)自定義的累加函數(shù):updateFunc
? ? val results = lines.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
? ? 打印結(jié)果到控制臺(tái)
? ? results.print()
? ? 開始計(jì)算
? ? ssc.start()
? ? 等待停止
? ? ssc.awaitTermination()
? }
}
12.簡(jiǎn)述map和flatMap的區(qū)別和應(yīng)用場(chǎng)景
map是對(duì)每一個(gè)元素進(jìn)行操作,flatmap是對(duì)每一個(gè)元素操作后并壓平
13.計(jì)算曝光數(shù)和點(diǎn)擊數(shù)

14.分別列出幾個(gè)常用的transformation和action算子
轉(zhuǎn)換算子:map,map,filter,reduceByKey,groupByKey,groupBy
行動(dòng)算子:foreach,foreachpartition,collect,collectAsMap,take,top,first,count,countByKey
15.按照需求使用spark編寫以下程序,要求使用scala語(yǔ)言
當(dāng)前文件a.txt的格式,請(qǐng)統(tǒng)計(jì)每個(gè)單詞出現(xiàn)的次數(shù)
A,b,c
B,b,f,e
objectWordCount {
def main(args: Array[String]):Unit= {
valconf = new SparkConf()
.setAppName(this.getClass.getSimpleName)
.setMaster("local[*]")
valsc = new SparkContext(conf)
varsData: RDD[String] = sc.textFile("a.txt")
valsortData: RDD[(String,Int)] = sData.flatMap(_.split(",")).map((_,1)).reduceByKey(_+_)
? ? sortData.foreach(print)
? }
}
16.spark應(yīng)用程序的執(zhí)行命令是什么?
/usr/local/spark-current2.3/bin/spark-submit \
--class com.wedoctor.Application \
--master yarn \
--deploy-mode client \
--driver-memory 1g \
--executor-memory 2g \
--queue root.wedw \
--num-executors 200 \
--jars home/pgxl/liuzc/config-1.3.0.jar,/home/pgxl/liuzc/hadoop-lzo-0.4.20.jar,/home/pgxl/liuzc/elasticsearch-hadoop-hive-2.3.4.jar?\
/home/pgxl/liuzc/sen.jar
17.Spark應(yīng)用執(zhí)行有哪些模式,其中哪幾種是集群模式
本地local模式
standalone模式
spark on yarn模式
spark on mesos模式
其中,standalone模式,spark on yarn模式,spark on mesos模式是集群模式
18.請(qǐng)說(shuō)明spark中廣播變量的用途
使用廣播變量,每個(gè) Executor 的內(nèi)存中,只駐留一份變量副本,而不是對(duì) 每個(gè) task 都傳輸一次大變量,省了很多的網(wǎng)絡(luò)傳輸, 對(duì)性能提升具有很大幫助, 而且會(huì)通過(guò)高效的廣播算法來(lái)減少傳輸代價(jià)。
19.以下代碼會(huì)報(bào)錯(cuò)嗎?如果會(huì)怎么解決 val arr = new ArrayList[String]; arr.foreach(println)
val arr = new ArrayList[String]; 這里會(huì)報(bào)錯(cuò),需要改成 val arr: Array[String] = new Array[String](10)
arr.foreach(println)打印不會(huì)報(bào)空指針
20.寫出你用過(guò)的spark中的算子,其中哪些會(huì)產(chǎn)生shuffle過(guò)程
reduceBykey:
groupByKey:
…ByKey:
21.Spark中rdd與partition的區(qū)別
22.請(qǐng)寫出創(chuàng)建Dateset的幾種方式
23.描述一下RDD,DataFrame,DataSet的區(qū)別?
1)RDD
優(yōu)點(diǎn):
編譯時(shí)類型安全
編譯時(shí)就能檢查出類型錯(cuò)誤
面向?qū)ο蟮木幊田L(fēng)格
直接通過(guò)類名點(diǎn)的方式來(lái)操作數(shù)據(jù)
缺點(diǎn):
序列化和反序列化的性能開銷
無(wú)論是集群間的通信, 還是 IO 操作都需要對(duì)對(duì)象的結(jié)構(gòu)和數(shù)據(jù)進(jìn)行序列化和反序列化。
GC 的性能開銷,頻繁的創(chuàng)建和銷毀對(duì)象, 勢(shì)必會(huì)增加 GC
2)DataFrame
DataFrame 引入了 schema 和 off-heap
schema : RDD 每一行的數(shù)據(jù), 結(jié)構(gòu)都是一樣的,這個(gè)結(jié)構(gòu)就存儲(chǔ)在 schema 中。Spark 通過(guò) schema 就能夠讀懂?dāng)?shù)據(jù), 因此在通信和 IO 時(shí)就只需要序列化和反序列化數(shù)據(jù), 而結(jié)構(gòu)的部分就可以省略了。
3)DataSet
DataSet 結(jié)合了 RDD 和 DataFrame 的優(yōu)點(diǎn),并帶來(lái)的一個(gè)新的概念 Encoder。
當(dāng)序列化數(shù)據(jù)時(shí),Encoder 產(chǎn)生字節(jié)碼與 off-heap 進(jìn)行交互,能夠達(dá)到按需訪問(wèn)數(shù)據(jù)的效果,而不用反序列化整個(gè)對(duì)象。Spark 還沒(méi)有提供自定義 Encoder 的 API,但是未來(lái)會(huì)加入。
三者之間的轉(zhuǎn)換:

24.描述一下Spark中stage是如何劃分的?描述一下shuffle的概念
25.Spark 在yarn上運(yùn)行需要做哪些關(guān)鍵的配置工作?如何kill -個(gè)Spark在yarn運(yùn)行中Application
26.通常來(lái)說(shuō),Spark與MapReduce相比,Spark運(yùn)行效率更高。請(qǐng)說(shuō)明效率更高來(lái)源于Spark內(nèi)置的哪些機(jī)制?并請(qǐng)列舉常見spark的運(yùn)行模式?
27.RDD中的數(shù)據(jù)在哪?
RDD中的數(shù)據(jù)在數(shù)據(jù)源,RDD只是一個(gè)抽象的數(shù)據(jù)集,我們通過(guò)對(duì)RDD的操作就相當(dāng)于對(duì)數(shù)據(jù)進(jìn)行操作。
28.如果對(duì)RDD進(jìn)行cache操作后,數(shù)據(jù)在哪里?
數(shù)據(jù)在第一執(zhí)行cache算子時(shí)會(huì)被加載到各個(gè)Executor進(jìn)程的內(nèi)存中,第二次就會(huì)直接從內(nèi)存中讀取而不會(huì)區(qū)磁盤。
29.Spark中Partition的數(shù)量由什么決定
和Mr一樣,但是Spark默認(rèn)最少有兩個(gè)分區(qū)。
30.Scala里面的函數(shù)和方法有什么區(qū)別
31.SparkStreaming怎么進(jìn)行監(jiān)控?
32.Spark判斷Shuffle的依據(jù)?
?父RDD的一個(gè)分區(qū)中的數(shù)據(jù)有可能被分配到子RDD的多個(gè)分區(qū)中
33.Scala有沒(méi)有多繼承?可以實(shí)現(xiàn)多繼承么?
34.Sparkstreaming和flink做實(shí)時(shí)處理的區(qū)別
35.Sparkcontext的作用
36.Sparkstreaming讀取kafka數(shù)據(jù)為什么選擇直連方式
37.離線分析什么時(shí)候用sparkcore和sparksq
38.Sparkstreaming實(shí)時(shí)的數(shù)據(jù)不丟失的問(wèn)題
39.簡(jiǎn)述寬依賴和窄依賴概念,groupByKey,reduceByKey,map,filter,union五種操作哪些會(huì)導(dǎo)致寬依賴,哪些會(huì)導(dǎo)致窄依賴
40.數(shù)據(jù)傾斜可能會(huì)導(dǎo)致哪些問(wèn)題,如何監(jiān)控和排查,在設(shè)計(jì)之初,要考慮哪些來(lái)避免
41.有一千萬(wàn)條短信,有重復(fù),以文本文件的形式保存,一行一條數(shù)據(jù),請(qǐng)用五分鐘時(shí)間,找出重復(fù)出現(xiàn)最多的前10條
42.現(xiàn)有一文件,格式如下,請(qǐng)用spark統(tǒng)計(jì)每個(gè)單詞出現(xiàn)的次數(shù)
18619304961,18619304064,186193008,186193009
18619304962,18619304065,186193007,186193008
18619304963,18619304066,186193006,186193010
43.共享變量和累加器
累加器(accumulator)是 Spark 中提供的一種分布式的變量機(jī)制,其原理類似于mapreduce,即分布式的改變,然后聚合這些改變。累加器的一個(gè)常見用途是在調(diào)試時(shí)對(duì)作業(yè)執(zhí)行過(guò)程中的事件進(jìn)行計(jì)數(shù)。而廣播變量用來(lái)高效分發(fā)較大的對(duì)象。
共享變量出現(xiàn)的原因:
通常在向 Spark 傳遞函數(shù)時(shí),比如使用 map() 函數(shù)或者用 filter() 傳條件時(shí),可以使用驅(qū)動(dòng)器程序中定義的變量,但是集群中運(yùn)行的每個(gè)任務(wù)都會(huì)得到這些變量的一份新的副本,更新這些副本的值也不會(huì)影響驅(qū)動(dòng)器中的對(duì)應(yīng)變量。
Spark 的兩個(gè)共享變量,累加器與廣播變量,分別為結(jié)果聚合與廣播這兩種常見的通信模式突破了這一限制。
44.當(dāng) Spark 涉及到數(shù)據(jù)庫(kù)的操作時(shí),如何減少 Spark 運(yùn)行中的數(shù)據(jù)庫(kù)連接數(shù)?
使用 foreachPartition 代替 foreach,在 foreachPartition 內(nèi)獲取數(shù)據(jù)庫(kù)的連接。
45.特別大的數(shù)據(jù),怎么發(fā)送到excutor中?
46.spark調(diào)優(yōu)都做過(guò)哪些方面?
47.spark任務(wù)為什么會(huì)被yarn kill掉?
48.Spark on Yarn作業(yè)執(zhí)行流程?yarn-client和yarn-cluster有什么區(qū)別?
49.Flatmap底層編碼實(shí)現(xiàn)?
Spark flatMap 源碼:
/**
? *? Return a new RDD by first applying a function to all elements of this
? *? RDD, and then flattening the results.
? */
? def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
? ? val cleanF = sc.clean(f)
? ? new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
? }
Scala flatMap 源碼:
/** Creates a new iterator by applying a function to all values produced by this iterator
? *? and concatenating the results.
? *
? *? @param f the function to apply on each element.
? *? @return? the iterator resulting from applying the given iterator-valued function
? *? ? ? ? ? `f` to each value produced by this iterator and concatenating the results.
? *? @note? ? Reuse: $consumesAndProducesIterator
? */
? def flatMap[B](f: A => GenTraversableOnce[B]): Iterator[B] = new AbstractIterator[B] {
? ? private var cur: Iterator[B] = empty
? ? private def nextCur() { cur = f(self.next()).toIterator }
? ? def hasNext: Boolean = {
? ? ? Equivalent to cur.hasNext || self.hasNext && { nextCur(); hasNext }
? ? ? but slightly shorter bytecode (better JVM inlining!)
? ? ? while (!cur.hasNext) {
? ? ? ? if (!self.hasNext) return false
? ? ? ? nextCur()
? ? ? }
? ? ? true
? ? }
? ? def next(): B =<span style="color:#ffffff"> <span style="background-color:rgb(255,0,0)">(if (hasNext) cur else empty).next()</span></span>
? }
flatMap其實(shí)就是將RDD里的每一個(gè)元素執(zhí)行自定義函數(shù)f,這時(shí)這個(gè)元素的結(jié)果轉(zhuǎn)換成iterator,最后將這些再拼接成一個(gè)
新的RDD,也可以理解成原本的每個(gè)元素由橫向執(zhí)行函數(shù)f后再變?yōu)榭v向。畫紅部分一直在回調(diào),當(dāng)RDD內(nèi)沒(méi)有元素為止。
50.spark_1.X與spark_2.X區(qū)別?
51.說(shuō)說(shuō)spark與flink
52.spark streaming如何保證7*24小時(shí)運(yùn)行機(jī)制?
53.spark streaming是Exactly-Once嗎?
四.Kafka
1.Kafka名詞解釋和工作方式
Producer :消息生產(chǎn)者,就是向kafka broker發(fā)消息的客戶端。
Consumer :消息消費(fèi)者,向kafka broker取消息的客戶端
Topic :咋們可以理解為一個(gè)隊(duì)列。
Consumer Group (CG):這是kafka用來(lái)實(shí)現(xiàn)一個(gè)topic消息的廣播(發(fā)給所有的consumer)和單播(發(fā)給任意一個(gè)consumer)的手段。一個(gè)topic可以有多個(gè)CG。topic的消息會(huì)復(fù)制(不是真的復(fù)制,是概念上的)到所有的CG,但每個(gè)partion只會(huì)把消息發(fā)給該CG中的一個(gè)consumer。如果需要實(shí)現(xiàn)廣播,只要每個(gè)consumer有一個(gè)獨(dú)立的CG就可以了。要實(shí)現(xiàn)單播只要所有的consumer在同一個(gè)CG。用CG還可以將consumer進(jìn)行自由的分組而不需要多次發(fā)送消息到不同的topic。
Broker :一臺(tái)kafka服務(wù)器就是一個(gè)broker。一個(gè)集群由多個(gè)broker組成。一個(gè)broker可以容納多個(gè)topic。
Partition:為了實(shí)現(xiàn)擴(kuò)展性,一個(gè)非常大的topic可以分布到多個(gè)broker(即服務(wù)器)上,一個(gè)topic可以分為多個(gè)partition,每個(gè)partition是一個(gè)有序的隊(duì)列。partition中的每條消息都會(huì)被分配一個(gè)有序的id(offset)。kafka只保證按一個(gè)partition中的順序?qū)⑾l(fā)給consumer,不保證一個(gè)topic的整體(多個(gè)partition間)的順序。
Offset:kafka的存儲(chǔ)文件都是按照offset.kafka來(lái)命名,用offset做名字的好處是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。當(dāng)然the first offset就是00000000000.kafka
2.Consumer與topic關(guān)系
本質(zhì)上kafka只支持Topic;
每個(gè)group中可以有多個(gè)consumer,每個(gè)consumer屬于一個(gè)consumer group;
通常情況下,一個(gè)group中會(huì)包含多個(gè)consumer,這樣不僅可以提高topic中消息的并發(fā)消費(fèi)能力,而且還能提高"故障容錯(cuò)"性,如果group中的某個(gè)consumer失效那么其消費(fèi)的partitions將會(huì)有其他consumer自動(dòng)接管。
對(duì)于Topic中的一條特定的消息,只會(huì)被訂閱此Topic的每個(gè)group中的其中一個(gè)consumer消費(fèi),此消息不會(huì)發(fā)送給一個(gè)group的多個(gè)consumer;
那么一個(gè)group中所有的consumer將會(huì)交錯(cuò)的消費(fèi)整個(gè)Topic,每個(gè)group中consumer消息消費(fèi)互相獨(dú)立,我們可以認(rèn)為一個(gè)group是一個(gè)"訂閱"者。
在kafka中,一個(gè)partition中的消息只會(huì)被group中的一個(gè)consumer消費(fèi)(同一時(shí)刻);
一個(gè)Topic中的每個(gè)partions,只會(huì)被一個(gè)"訂閱者"中的一個(gè)consumer消費(fèi),不過(guò)一個(gè)consumer可以同時(shí)消費(fèi)多個(gè)partitions中的消息。
kafka的設(shè)計(jì)原理決定,對(duì)于一個(gè)topic,同一個(gè)group中不能有多于partitions個(gè)數(shù)的consumer同時(shí)消費(fèi),否則將意味著某些consumer將無(wú)法得到消息。
kafka只能保證一個(gè)partition中的消息被某個(gè)consumer消費(fèi)時(shí)是順序的;事實(shí)上,從Topic角度來(lái)說(shuō),當(dāng)有多個(gè)partitions時(shí),消息仍不是全局有序的。
3.kafka中生產(chǎn)數(shù)據(jù)的時(shí)候,如何保證寫入的容錯(cuò)性?
設(shè)置發(fā)送數(shù)據(jù)是否需要服務(wù)端的反饋,有三個(gè)值0,1,-1
0: producer不會(huì)等待broker發(fā)送ack
1: 當(dāng)leader接收到消息之后發(fā)送ack
-1: 當(dāng)所有的follower都同步消息成功后發(fā)送ack
request.required.acks=0
4.如何保證kafka消費(fèi)者消費(fèi)數(shù)據(jù)是全局有序的
偽命題
每個(gè)分區(qū)內(nèi),每條消息都有一個(gè)offset,故只能保證分區(qū)內(nèi)有序。
如果要全局有序的,必須保證生產(chǎn)有序,存儲(chǔ)有序,消費(fèi)有序。
由于生產(chǎn)可以做集群,存儲(chǔ)可以分片,消費(fèi)可以設(shè)置為一個(gè)consumerGroup,要保證全局有序,就需要保證每個(gè)環(huán)節(jié)都有序。
只有一個(gè)可能,就是一個(gè)生產(chǎn)者,一個(gè)partition,一個(gè)消費(fèi)者。這種場(chǎng)景和大數(shù)據(jù)應(yīng)用場(chǎng)景相悖。
5.有兩個(gè)數(shù)據(jù)源,一個(gè)記錄的是廣告投放給用戶的日志,一個(gè)記錄用戶訪問(wèn)日志,另外還有一個(gè)固定的用戶基礎(chǔ)表記錄用戶基本信息(比如學(xué)歷,年齡等等)。現(xiàn)在要分析廣告投放對(duì)與哪類用戶更有效,請(qǐng)采用熟悉的技術(shù)描述解決思路。另外如果兩個(gè)數(shù)據(jù)源都是實(shí)時(shí)數(shù)據(jù)源(比如來(lái)自kafka),他們數(shù)據(jù)在時(shí)間上相差5分鐘,需要哪些調(diào)整來(lái)解決實(shí)時(shí)分析問(wèn)題?
6.Kafka和SparkStreaing如何集成?
7.列舉Kafka的優(yōu)點(diǎn),簡(jiǎn)述Kafka為什么可以做到每秒數(shù)十萬(wàn)甚至上百萬(wàn)消息的高效分發(fā)?
8.為什么離線分析要用kafka?
Kafka的作用是解耦,如果直接從日志服務(wù)器上采集的話,實(shí)時(shí)離線都要采集,等于要采集兩份數(shù)據(jù),而使用了kafka的話,只需要從日志服務(wù)器上采集一份數(shù)據(jù),然后在kafka中使用不同的兩個(gè)組讀取就行了
9.Kafka怎么進(jìn)行監(jiān)控?
Kafka?Manager
10.Kafka與傳統(tǒng)的消息隊(duì)列服務(wù)有很么不同
11.Kafka api ?low-level與high-level有什么區(qū)別,使用low-level需要處理哪些細(xì)節(jié)
12.Kafka的ISR副本同步隊(duì)列
ISR(In-Sync Replicas),副本同步隊(duì)列。ISR中包括Leader和Follower。如果Leader進(jìn)程掛掉,會(huì)在ISR隊(duì)列中選擇一個(gè)服務(wù)作為新的Leader。有replica.lag.max.messages(延遲條數(shù))和replica.lag.time.max.ms(延遲時(shí)間)兩個(gè)參數(shù)決定一臺(tái)服務(wù)是否可以加入ISR副本隊(duì)列,在0.10版本移除了replica.lag.max.messages參數(shù),防止服務(wù)頻繁的進(jìn)去隊(duì)列。
任意一個(gè)維度超過(guò)閾值都會(huì)把Follower剔除出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的Follower也會(huì)先存放在OSR中。
13.Kafka消息數(shù)據(jù)積壓,Kafka消費(fèi)能力不足怎么處理?
1)如果是Kafka消費(fèi)能力不足,則可以考慮增加Topic的分區(qū)數(shù),并且同時(shí)提升消費(fèi)組的消費(fèi)者數(shù)量,消費(fèi)者數(shù)=分區(qū)數(shù)。(兩者缺一不可)
2)如果是下游的數(shù)據(jù)處理不及時(shí):提高每批次拉取的數(shù)量。批次拉取數(shù)據(jù)過(guò)少(拉取數(shù)據(jù)/處理時(shí)間<生產(chǎn)速度),使處理的數(shù)據(jù)小于生產(chǎn)的數(shù)據(jù),也會(huì)造成數(shù)據(jù)積壓。
14.Kafka中的ISR、AR又代表什么?
?ISR:in-sync replica set (ISR),與leader保持同步的follower集合
??? AR:分區(qū)的所有副本
15.Kafka中的HW、LEO等分別代表什么?
LEO:每個(gè)副本的最后條消息的offset
??? HW:一個(gè)分區(qū)中所有副本最小的offset
16.哪些情景會(huì)造成消息漏消費(fèi)?
先提交offset,后消費(fèi),有可能造成數(shù)據(jù)的重復(fù)
17.當(dāng)你使用kafka-topics.sh創(chuàng)建了一個(gè)topic之后,Kafka背后會(huì)執(zhí)行什么邏輯?
? 1)會(huì)在zookeeper中的/brokers/topics節(jié)點(diǎn)下創(chuàng)建一個(gè)新的topic節(jié)點(diǎn),如:/brokers/topics/first
????2)觸發(fā)Controller的監(jiān)聽程序
????3)kafka Controller 負(fù)責(zé)topic的創(chuàng)建工作,并更新metadata cache
18.topic的分區(qū)數(shù)可不可以增加?如果可以怎么增加?如果不可以,那又是為什么?
可以增加
bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic topic-config --partitions 3
19.topic的分區(qū)數(shù)可不可以減少?如果可以怎么減少?如果不可以,那又是為什么?
不可以減少,被刪除的分區(qū)數(shù)據(jù)難以處理。
20.Kafka有內(nèi)部的topic嗎?如果有是什么?有什么所用?
?__consumer_offsets,保存消費(fèi)者offset
21.聊一聊Kafka Controller的作用?
負(fù)責(zé)管理集群broker的上下線,所有topic的分區(qū)副本分配和leader選舉等工作。
22.失效副本是指什么?有那些應(yīng)對(duì)措施?
不能及時(shí)與leader同步,暫時(shí)踢出ISR,等其追上leader之后再重新加入
23.Kafka 都有哪些特點(diǎn)?
高吞吐量、低延遲:kafka每秒可以處理幾十萬(wàn)條消息,它的延遲最低只有幾毫秒,每個(gè)topic可以分多個(gè)partition, consumer group 對(duì)partition進(jìn)行consume操作。
可擴(kuò)展性:kafka集群支持熱擴(kuò)展
持久性、可靠性:消息被持久化到本地磁盤,并且支持?jǐn)?shù)據(jù)備份防止數(shù)據(jù)丟失
容錯(cuò)性:允許集群中節(jié)點(diǎn)失?。ㄈ舾北緮?shù)量為n,則允許n-1個(gè)節(jié)點(diǎn)失敗)
高并發(fā):支持?jǐn)?shù)千個(gè)客戶端同時(shí)讀寫
24.請(qǐng)簡(jiǎn)述下你在哪些場(chǎng)景下會(huì)選擇 Kafka?
日志收集:一個(gè)公司可以用Kafka可以收集各種服務(wù)的log,通過(guò)kafka以統(tǒng)一接口服務(wù)的方式開放給各種consumer,例如hadoop、HBase、Solr等。
消息系統(tǒng):解耦和生產(chǎn)者和消費(fèi)者、緩存消息等。
用戶活動(dòng)跟蹤:Kafka經(jīng)常被用來(lái)記錄web用戶或者app用戶的各種活動(dòng),如瀏覽網(wǎng)頁(yè)、搜索、點(diǎn)擊等活動(dòng),這些活動(dòng)信息被各個(gè)服務(wù)器發(fā)布到kafka的topic中,然后訂閱者通過(guò)訂閱這些topic來(lái)做實(shí)時(shí)的監(jiān)控分析,或者裝載到hadoop、數(shù)據(jù)倉(cāng)庫(kù)中做離線分析和挖掘。
運(yùn)營(yíng)指標(biāo):Kafka也經(jīng)常用來(lái)記錄運(yùn)營(yíng)監(jiān)控?cái)?shù)據(jù)。包括收集各種分布式應(yīng)用的數(shù)據(jù),生產(chǎn)各種操作的集中反饋,比如報(bào)警和報(bào)告。
流式處理:比如spark streaming和 Flink
25.Kafka 的設(shè)計(jì)架構(gòu)你知道嗎?
簡(jiǎn)單架構(gòu)如下

詳細(xì)如下

Kafka 架構(gòu)分為以下幾個(gè)部分
Producer :消息生產(chǎn)者,就是向 kafka broker 發(fā)消息的客戶端。
Consumer :消息消費(fèi)者,向 kafka broker 取消息的客戶端。
Topic :可以理解為一個(gè)隊(duì)列,一個(gè) Topic 又分為一個(gè)或多個(gè)分區(qū)。
Consumer Group:這是 kafka 用來(lái)實(shí)現(xiàn)一個(gè) topic 消息的廣播(發(fā)給所有的 consumer)和單播(發(fā)給任意一個(gè) consumer)的手段。一個(gè) topic 可以有多個(gè) Consumer Group。
Broker :一臺(tái) kafka 服務(wù)器就是一個(gè) broker。一個(gè)集群由多個(gè) broker 組成。一個(gè) broker 可以容納多個(gè) topic。
Partition:為了實(shí)現(xiàn)擴(kuò)展性,一個(gè)非常大的 topic 可以分布到多個(gè) broker上,每個(gè) partition 是一個(gè)有序的隊(duì)列。partition 中的每條消息都會(huì)被分配一個(gè)有序的id(offset)。將消息發(fā)給 consumer,kafka 只保證按一個(gè) partition 中的消息的順序,不保證一個(gè) topic 的整體(多個(gè) partition 間)的順序。
Offset:kafka 的存儲(chǔ)文件都是按照 offset.kafka 來(lái)命名,用 offset 做名字的好處是方便查找。例如你想找位于 2049 的位置,只要找到 2048.kafka 的文件即可。當(dāng)然 the first offset 就是 00000000000.kafka。
26.Kafka 分區(qū)的目的?
分區(qū)對(duì)于 Kafka 集群的好處是:實(shí)現(xiàn)負(fù)載均衡。分區(qū)對(duì)于消費(fèi)者來(lái)說(shuō),可以提高并發(fā)度,提高效率。
27.你知道 Kafka 是如何做到消息的有序性?
kafka 中的每個(gè) partition 中的消息在寫入時(shí)都是有序的,而且消息帶有offset偏移量,消費(fèi)者按偏移量的順序從前往后消費(fèi),從而保證了消息的順序性。但是分區(qū)之間的消息是不保證有序的。
28.Kafka 的高可靠性是怎么實(shí)現(xiàn)的?
kafka通過(guò)分區(qū)的多副本機(jī)制來(lái)保證消息的可靠性。1. 每個(gè)分區(qū)可以設(shè)置多個(gè)副本,這些副本分布在不同的broker上;2. 相同partition的多個(gè)副本能動(dòng)態(tài)選舉leader來(lái)對(duì)外服務(wù)和管理內(nèi)部數(shù)據(jù)同步。這樣,即使有broker出現(xiàn)故障,受影響的partition也會(huì)在其他broker上重新選舉出新的leader來(lái)繼續(xù)服務(wù)
更具體來(lái)說(shuō),可參看下文:
Kafka 的分區(qū)多副本架構(gòu)是 Kafka 可靠性保證的核心,把消息寫入多個(gè)副本可以使 Kafka 在發(fā)生崩潰時(shí)仍能保證消息的持久性。
Producer 往 Broker 發(fā)送消息
如果我們要往 Kafka 對(duì)應(yīng)的主題發(fā)送消息,我們需要通過(guò) Producer 完成。前面我們講過(guò) Kafka 主題對(duì)應(yīng)了多個(gè)分區(qū),每個(gè)分區(qū)下面又對(duì)應(yīng)了多個(gè)副本;為了讓用戶設(shè)置數(shù)據(jù)可靠性, Kafka 在 Producer 里面提供了消息確認(rèn)機(jī)制。也就是說(shuō)我們可以通過(guò)配置來(lái)決定消息發(fā)送到對(duì)應(yīng)分區(qū)的幾個(gè)副本才算消息發(fā)送成功。可以在定義 Producer 時(shí)通過(guò) acks 參數(shù)指定(在 0.8.2.X 版本之前是通過(guò) request.required.acks 參數(shù)設(shè)置的,詳見 KAFKA-3043)。這個(gè)參數(shù)支持以下三種值:
acks = 0:意味著如果生產(chǎn)者能夠通過(guò)網(wǎng)絡(luò)把消息發(fā)送出去,那么就認(rèn)為消息已成功寫入 Kafka 。在這種情況下還是有可能發(fā)生錯(cuò)誤,比如發(fā)送的對(duì)象無(wú)能被序列化或者網(wǎng)卡發(fā)生故障,但如果是分區(qū)離線或整個(gè)集群長(zhǎng)時(shí)間不可用,那就不會(huì)收到任何錯(cuò)誤。在 acks=0 模式下的運(yùn)行速度是非常快的(這就是為什么很多基準(zhǔn)測(cè)試都是基于這個(gè)模式),你可以得到驚人的吞吐量和帶寬利用率,不過(guò)如果選擇了這種模式, 一定會(huì)丟失一些消息。
acks = 1:意味若 Leader 在收到消息并把它寫入到分區(qū)數(shù)據(jù)文件(不一定同步到磁盤上)時(shí)會(huì)返回確認(rèn)或錯(cuò)誤響應(yīng)。在這個(gè)模式下,如果發(fā)生正常的 Leader 選舉,生產(chǎn)者會(huì)在選舉時(shí)收到一個(gè) LeaderNotAvailableException 異常,如果生產(chǎn)者能恰當(dāng)?shù)靥幚磉@個(gè)錯(cuò)誤,它會(huì)重試發(fā)送悄息,最終消息會(huì)安全到達(dá)新的 Leader 那里。不過(guò)在這個(gè)模式下仍然有可能丟失數(shù)據(jù),比如消息已經(jīng)成功寫入 Leader,但在消息被復(fù)制到 follower 副本之前 Leader發(fā)生崩潰。
acks = all(這個(gè)和 request.required.acks = -1 含義一樣):意味著 Leader 在返回確認(rèn)或錯(cuò)誤響應(yīng)之前,會(huì)等待所有同步副本都收到悄息。如果和 min.insync.replicas 參數(shù)結(jié)合起來(lái),就可以決定在返回確認(rèn)前至少有多少個(gè)副本能夠收到悄息,生產(chǎn)者會(huì)一直重試直到消息被成功提交。不過(guò)這也是最慢的做法,因?yàn)樯a(chǎn)者在繼續(xù)發(fā)送其他消息之前需要等待所有副本都收到當(dāng)前的消息。
根據(jù)實(shí)際的應(yīng)用場(chǎng)景,我們?cè)O(shè)置不同的 acks,以此保證數(shù)據(jù)的可靠性。
另外,Producer 發(fā)送消息還可以選擇同步(默認(rèn),通過(guò) producer.type=sync 配置) 或者異步(producer.type=async)模式。如果設(shè)置成異步,雖然會(huì)極大的提高消息發(fā)送的性能,但是這樣會(huì)增加丟失數(shù)據(jù)的風(fēng)險(xiǎn)。如果需要確保消息的可靠性,必須將 producer.type 設(shè)置為 sync。
Leader 選舉
在介紹 Leader 選舉之前,讓我們先來(lái)了解一下 ISR(in-sync replicas)列表。每個(gè)分區(qū)的 leader 會(huì)維護(hù)一個(gè) ISR 列表,ISR 列表里面就是 follower 副本的 Borker 編號(hào),只有跟得上 Leader 的 follower 副本才能加入到 ISR 里面,這個(gè)是通過(guò) replica.lag.time.max.ms 參數(shù)配置的,具體可以參見?《一文了解 Kafka 的副本復(fù)制機(jī)制》。只有 ISR 里的成員才有被選為 leader 的可能。
所以當(dāng) Leader 掛掉了,而且 unclean.leader.election.enable=false 的情況下,Kafka 會(huì)從 ISR 列表中選擇第一個(gè) follower 作為新的 Leader,因?yàn)檫@個(gè)分區(qū)擁有最新的已經(jīng) committed 的消息。通過(guò)這個(gè)可以保證已經(jīng) committed 的消息的數(shù)據(jù)可靠性。
綜上所述,為了保證數(shù)據(jù)的可靠性,我們最少需要配置一下幾個(gè)參數(shù):
producer 級(jí)別:acks=all(或者 request.required.acks=-1),同時(shí)發(fā)生模式為同步 producer.type=sync
topic 級(jí)別:設(shè)置 replication.factor>=3,并且 min.insync.replicas>=2;
broker 級(jí)別:關(guān)閉不完全的 Leader 選舉,即 unclean.leader.election.enable=false;
29.請(qǐng)談一談 Kafka 數(shù)據(jù)一致性原理
一致性就是說(shuō)不論是老的 Leader 還是新選舉的 Leader,Consumer 都能讀到一樣的數(shù)據(jù)。

假設(shè)分區(qū)的副本為3,其中副本0是 Leader,副本1和副本2是 follower,并且在 ISR 列表里面。雖然副本0已經(jīng)寫入了 Message4,但是 Consumer 只能讀取到 Message2。因?yàn)樗械?ISR 都同步了 Message2,只有 High Water Mark 以上的消息才支持 Consumer 讀取,而 High Water Mark 取決于 ISR 列表里面偏移量最小的分區(qū),對(duì)應(yīng)于上圖的副本2,這個(gè)很類似于木桶原理。
這樣做的原因是還沒(méi)有被足夠多副本復(fù)制的消息被認(rèn)為是“不安全”的,如果 Leader 發(fā)生崩潰,另一個(gè)副本成為新 Leader,那么這些消息很可能丟失了。如果我們?cè)试S消費(fèi)者讀取這些消息,可能就會(huì)破壞一致性。試想,一個(gè)消費(fèi)者從當(dāng)前 Leader(副本0) 讀取并處理了 Message4,這個(gè)時(shí)候 Leader 掛掉了,選舉了副本1為新的 Leader,這時(shí)候另一個(gè)消費(fèi)者再去從新的 Leader 讀取消息,發(fā)現(xiàn)這個(gè)消息其實(shí)并不存在,這就導(dǎo)致了數(shù)據(jù)不一致性問(wèn)題。
當(dāng)然,引入了 High Water Mark 機(jī)制,會(huì)導(dǎo)致 Broker 間的消息復(fù)制因?yàn)槟承┰蜃兟?,那么消息到達(dá)消費(fèi)者的時(shí)間也會(huì)隨之變長(zhǎng)(因?yàn)槲覀儠?huì)先等待消息復(fù)制完畢)。延遲時(shí)間可以通過(guò)參數(shù) replica.lag.time.max.ms 參數(shù)配置,它指定了副本在復(fù)制消息時(shí)可被允許的最大延遲時(shí)間。