maxRecordsPerFile 使用
在實(shí)現(xiàn)一個(gè)需求的時(shí)候發(fā)現(xiàn)的問題;要求是將數(shù)據(jù)寫入到cos中,要求每個(gè)文件的固定條數(shù)是5千萬條且文件名字后綴以數(shù)字遞增例如001,002..,使用該參數(shù)發(fā)現(xiàn),不符合該需求,存在多個(gè)文件里不是5千萬的,而要求應(yīng)當(dāng)是最多一個(gè)文件不滿足5千萬條
探查數(shù)據(jù)塊發(fā)現(xiàn)數(shù)據(jù)塊存在倆中命名結(jié)構(gòu) part-00001和part-c0001
maxRecordsPerFile 是「Task 級別」的參數(shù),需要加入repartition(1),通過 Shuffle 將數(shù)據(jù)均勻分配到指定數(shù)量的分區(qū),一個(gè)分區(qū)對應(yīng)一個(gè) Task,從源頭控制數(shù)據(jù)分布。
解釋 spark文件中part-0000結(jié)構(gòu)和part-c00001結(jié)果的區(qū)別
當(dāng)你使用 repartition(num_partitions) 后,數(shù)據(jù)被分配到 num_partitions 個(gè)分區(qū),一個(gè)分區(qū)對應(yīng)一個(gè) Task。如果某個(gè) Task 處理的數(shù)據(jù)量超過了 maxRecordsPerFile 的限制(比如 50000 條),Spark 會在這個(gè) Task 內(nèi)自動拆分出多個(gè)文件,此時(shí)文件名會出現(xiàn) c 前綴和遞增的編號:
part-c00000-...:該 Task 生成的第 1 個(gè)文件;part-c00001-...:該 Task 生成的第 2 個(gè)文件;以此類推。
總數(shù)據(jù)量:120000 條;
repartition(2):2 個(gè)分區(qū)(Task),每個(gè) Task 約 60000 條;maxRecordsPerFile=50000:每個(gè) Task 超過 50000 條,需拆分。
最終輸出文件會是:
- Task 0(分區(qū) 0):60000 條 → 拆分為
part-c00000-...(50000 條)、part-c00001-...(10000 條); - Task 1(分區(qū) 1):60000 條 → 拆分為
part-c00002-...(50000 條)、part-c00003-...(10000 條);
如果只用 maxRecordsPerFile 不用 repartition,會出現(xiàn)以下問題:
問題 1:數(shù)據(jù)傾斜:若原始分區(qū)數(shù)據(jù)不均(比如一個(gè)分區(qū) 100 萬條,另一個(gè)分區(qū) 1 萬條),會導(dǎo)致大分區(qū)生成很多文件,小分區(qū)生成小文件,整體文件大小不均;
問題 2:小文件過多:若原始分區(qū)數(shù)過多(比如 1000 個(gè)分區(qū),每個(gè)分區(qū) 1000 條),即使 maxRecordsPerFile=50000,也會生成 1000 個(gè)小文件(每個(gè) 1000 條);
- 無
maxRecordsPerFile:part-[分區(qū) ID]-UUID.格式; - 有
maxRecordsPerFile:part-c[文件索引]-[分區(qū) ID]-UUID.格式;
image.png
