上面兩篇大部分介紹的都是理論知識(shí),希望看到前兩篇的都讀讀。讀一遍 不容易理解現(xiàn)在這一篇是介紹api操作的。相對(duì)來(lái)說(shuō)容易些也是方便我自己記憶。
RDD的兩種類(lèi)型操作。
有哪兩種操作呢?分別是transformation ,action 也是我們上面所說(shuō)的轉(zhuǎn)換 和行動(dòng)。
Transformations 使用的是常用的api操作還有很多可能介紹不到
- map():將原來(lái)的RDD的每個(gè)數(shù)據(jù)想根據(jù)自定義函數(shù)進(jìn)行映射,轉(zhuǎn)換成一個(gè)新的RDD。
SparkConf conf = new SparkConf().setMaster("local").setAppName("My App Test");
JavaRDD<String> pairRDD = scContext.parallelize(Arrays.asList("a","b","c"));
pairRDD.map(result -> result.split(" "));
- filter(): 使用該函數(shù) 對(duì)RDD數(shù)據(jù)進(jìn)行過(guò)濾。將符合條件的RDD中的數(shù)據(jù) 組成新的RDD返回。
JavaRDD<String> pairRDD = scContext.parallelize(Arrays.asList("a","b","c"));
// pairRDD.map(result -> result.split(" "));
JavaRDD<String> resultRdd=pairRDD.filter( content -> {
return content.equals('s') ;
}) ;
}
- flatMap()類(lèi)似與Map(),不過(guò)這個(gè)map,返回值是一個(gè)數(shù)據(jù)項(xiàng)集合,而不是一個(gè)單項(xiàng)的數(shù)據(jù)項(xiàng)。
- mapPartitions:類(lèi)似于Map,不過(guò)該操作是在每個(gè)分區(qū)上分別執(zhí)行的,所以當(dāng)操作一個(gè)類(lèi)型為T(mén)的RDD必須是Iterator =>Iterator 。
- sample():對(duì)數(shù)據(jù)進(jìn)行采樣用戶可以設(shè)定,是否有放回,采樣的百分比,隨機(jī)種子等。
- union():聚合操作??梢杂脕?lái)合并多個(gè)集合。但是使用union函數(shù)時(shí)必須抱枕RDD的理性是相同。
- distinct();去重操作。將重復(fù)的內(nèi)容排除掉。
- intersection() : 返回兩個(gè)數(shù)據(jù)集的交集。
- groupByKey(): 進(jìn)行分組。默認(rèn)情況下并行情況是根據(jù)父RDD的分區(qū)數(shù)來(lái)確定的。但是可以自己指定任務(wù)數(shù)。
-
reduceByKey():聚合操作:函數(shù)格式必須是(V,V)=>V。也可指定任務(wù)數(shù)。
參考例子:
PairRdd的轉(zhuǎn)化操作
Action
reduce(func)
使用函數(shù)func聚集數(shù)據(jù)集中的元素,這個(gè)函數(shù)func輸入為兩個(gè)元素,返回為一個(gè)元素。這個(gè)函數(shù)應(yīng)該符合結(jié)合律和交換了,這樣才能保證數(shù)據(jù)集中各個(gè)元素計(jì)算的正確性。
collect()
在驅(qū)動(dòng)程序中,以數(shù)組的形式返回?cái)?shù)據(jù)集的所有元素。通常用于filter或其它產(chǎn)生了大量小數(shù)據(jù)集的情況。
count()
返回?cái)?shù)據(jù)集中元素的個(gè)數(shù)。
first()
返回?cái)?shù)據(jù)集中的第一個(gè)元素(類(lèi)似于take(1))。
take(n)
返回?cái)?shù)據(jù)集中的前n個(gè)元素。
takeSample(withReplacement,num, [seed])
對(duì)一個(gè)數(shù)據(jù)集隨機(jī)抽樣,返回一個(gè)包含num個(gè)隨機(jī)抽樣元素的數(shù)組,參數(shù)withReplacement指定是否有放回抽樣,參數(shù)seed指定生成隨機(jī)數(shù)的種子。
takeOrdered(n, [ordering])
返回RDD按自然順序或自定義順序排序后的前n個(gè)元素。
saveAsTextFile(path)
將數(shù)據(jù)集中的元素以文本文件(或文本文件集合)的形式保存到指定的本地文件系統(tǒng)、HDFS或其它Hadoop支持的文件系統(tǒng)中。Spark將在每個(gè)元素上調(diào)用toString方法,將數(shù)據(jù)元素轉(zhuǎn)換為文本文件中的一行記錄。
saveAsSequenceFile(path) (Java and Scala)
將數(shù)據(jù)集中的元素以Hadoop Sequence文件的形式保存到指定的本地文件系統(tǒng)、HDFS或其它Hadoop支持的文件系統(tǒng)中。該操作只支持對(duì)實(shí)現(xiàn)了Hadoop的Writable接口的鍵值對(duì)RDD進(jìn)行操作。在Scala中,還支持隱式轉(zhuǎn)換為Writable的類(lèi)型(Spark包括了基本類(lèi)型的轉(zhuǎn)換,例如Int、Double、String等等)。
saveAsObjectFile(path) (Java and Scala)
將數(shù)據(jù)集中的元素以簡(jiǎn)單的Java序列化的格式寫(xiě)入指定的路徑。這些保存該數(shù)據(jù)的文件,可以使用SparkContext.objectFile()進(jìn)行加載。
countByKey()
僅支持對(duì)(K,V)格式的鍵值對(duì)類(lèi)型的RDD進(jìn)行操作。返回(K,Int)格式的Hashmap,(K,Int)為每個(gè)key值對(duì)應(yīng)的記錄數(shù)目。
foreach(func)
對(duì)數(shù)據(jù)集中每個(gè)元素使用函數(shù)func進(jìn)行處理。該操作通常用于更新一個(gè)累加器(Accumulator)或與外部數(shù)據(jù)源進(jìn)行交互。注意:在foreach()之外修改累加器變量可能引起不確定的后果。詳細(xì)介紹請(qǐng)閱讀
