解決Spark Streaming寫入HDFS的小文件問題

今天仍然處于感冒狀態(tài),打開電腦隨便寫一篇,然后滾回床上休息。

我們都知道,在HDFS中不宜存儲大量的小文件。所謂小文件,就是大小遠小于dfs.block.size的文件。如果有大量小文件的話,會浪費block,使元數(shù)據(jù)增加,擠占寶貴的NameNode內(nèi)存。另外,大文件能夠發(fā)揮磁盤順序讀寫的優(yōu)勢,小文件會產(chǎn)生很多隨機讀寫,性能下降。

在我們的數(shù)倉體系中,有一部分業(yè)務(wù)的日志數(shù)據(jù)來源是RocketMQ。我們編寫了Spark Streaming程序作為consumer,將這些日志下沉到以天分區(qū)的Hive外部表中,批次間隔(batch duration)為1分鐘。久而久之,產(chǎn)生了很多小文件。直覺上講可以通過增長batch duration來減少輸出,但這肯定是下下策。

實在更不動了,明天繼續(xù)吧(╯‵□′)╯︵┻━┻


感覺稍微好了一些,繼續(xù)寫。我們用兩種方法合并解決該問題,十分有效,下面簡要敘述下。

利用coalesce()和repartition()算子

在真正落盤之前,可以對RDD做如下兩種操作之一:

rdd.coalesce(1, true)
rdd.repartition(1)

Spark Streaming在將結(jié)果輸出到HDFS時是按分區(qū)來的,分區(qū)越多,產(chǎn)生的小文件自然也越多。coalesce()算子就用來為RDD重新分區(qū),其源碼如下,位于RDD類中。

  def coalesce(numPartitions: Int, shuffle: Boolean = false,
               partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
              (implicit ord: Ordering[T] = null)
      : RDD[T] = withScope {
    require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
    if (shuffle) {
      /** Distributes elements evenly across output partitions, starting from a random partition. */
      val distributePartition = (index: Int, items: Iterator[T]) => {
        var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
        items.map { t =>
          // Note that the hash code of the key will just be the key itself. The HashPartitioner
          // will mod it with the number of total partitions.
          position = position + 1
          (position, t)
        }
      } : Iterator[(Int, T)]

      // include a shuffle step so that our upstream tasks are still distributed
      new CoalescedRDD(
        new ShuffledRDD[Int, T, T](
          mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
          new HashPartitioner(numPartitions)),
        numPartitions,
        partitionCoalescer).values
    } else {
      new CoalescedRDD(this, numPartitions, partitionCoalescer)
    }
  }

該算子主要參數(shù)有兩個:numPartitions表示目標分區(qū)數(shù),shuffle表示重分區(qū)過程中是否Shuffle。

如果shuffle參數(shù)為true的話,會從一個隨機分區(qū)開始,利用HashPartitioner將所有數(shù)據(jù)重新均勻分布到numPartitions個分區(qū)上,返回一個由CoalescedRDD包裝的ShuffleRDD,父子RDD之間為寬依賴。如果shuffle參數(shù)為false,就直接返回CoalescedRDD,其內(nèi)部就只是簡單地將多個分區(qū)的數(shù)據(jù)flatMap之后合并為一個分區(qū),父子RDD之間為窄依賴。

由上面的分析可知,若numPartitions大于原分區(qū)數(shù),那么shuffle參數(shù)一定要設(shè)為true才可以。若numPartitions小于原分區(qū)數(shù),那么又有兩種情況要考慮:

  • 分區(qū)數(shù)之間的比例不太懸殊。比如原有1000個分區(qū),減少到200個分區(qū),這時可以將shuffle設(shè)為false,因為子RDD中的一個分區(qū)只對應(yīng)父RDD的5個分區(qū),壓力不大。

  • 分區(qū)數(shù)之間的比例懸殊。比如原有500個分區(qū),減少到1個分區(qū),就要將shuffle設(shè)為true,保證生成CoalescedRDD之前的操作有足夠的并行度,防止Executor出現(xiàn)單點問題。這也就是本節(jié)開頭的做法了。

repartition()算子是借助coalesce()實現(xiàn)的,就是shuffle參數(shù)默認為true的版本。

  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }

這種方法非常簡單,只需要一句話就可以使每批次輸出只有一個文件。不過它會增加批次處理時長,如果數(shù)據(jù)量巨大,可能會造成數(shù)據(jù)堆積,因此需要觀察之后再使用。

利用copyMerge()方法

Hadoop的FileUtil工具類中提供了copyMerge()方法,它專門用來將一個HDFS目錄下的所有文件合并成一個文件并輸出,其源碼如下。

  public static boolean copyMerge(FileSystem srcFS, Path srcDir, 
                                  FileSystem dstFS, Path dstFile, 
                                  boolean deleteSource,
                                  Configuration conf, String addString) throws IOException {
    dstFile = checkDest(srcDir.getName(), dstFS, dstFile, false);

    if (!srcFS.getFileStatus(srcDir).isDirectory())
      return false;
   
    OutputStream out = dstFS.create(dstFile);
    
    try {
      FileStatus contents[] = srcFS.listStatus(srcDir);
      Arrays.sort(contents);
      for (int i = 0; i < contents.length; i++) {
        if (contents[i].isFile()) {
          InputStream in = srcFS.open(contents[i].getPath());
          try {
            IOUtils.copyBytes(in, out, conf, false);
            if (addString!=null)
              out.write(addString.getBytes("UTF-8"));
                
          } finally {
            in.close();
          } 
        }
      }
    } finally {
      out.close();
    }
    
    if (deleteSource) {
      return srcFS.delete(srcDir, true);
    } else {
      return true;
    }
  }

我們就可以寫一個簡單的程序,通過調(diào)用copyMerge()方法合并Hive外部表對應(yīng)分區(qū)的文件,并且按照分區(qū)的時間粒度(天、小時等)調(diào)度。源數(shù)據(jù)的文件夾可以通過參數(shù)來指定,并且設(shè)置deleteSource參數(shù)為true,就能在合并完成后刪除原來的小文件。需要注意的是,為了避免將當(dāng)前正在寫入的文件也合并進去,調(diào)度需要有一點延時。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • The Hadoop Distributed Filesystem 1. Why HDFS ? When a da...
    須臾之北閱讀 919評論 0 1
  • HDFS是Hadoop Distribute File System 的簡稱,也就是Hadoop的一個分布式文件系...
    大佛愛讀書閱讀 975評論 0 0
  • 1.背景 HDFS最初是參考谷歌GFS論文原理開發(fā)的一個開源產(chǎn)品,由Lucene開源項目的創(chuàng)始人Doug Cutt...
    架構(gòu)禪話閱讀 1,705評論 0 2
  • 從“涸澤而漁”的字面意思去理解,而不考慮它引申的意義,談?wù)劇敦懹^長歌》中的一個小故事。很多事情,換個考慮的角度,就...
    殘劍閱讀 701評論 0 1
  • 昨天我參加了一場婚禮,婚禮的主人公是媽媽朋友的兒子。正當(dāng)我興致勃勃的看新郎和新娘喝交杯酒的時候,坐在我旁邊的王阿姨...
    韓雅潔閱讀 1,445評論 4 5

友情鏈接更多精彩內(nèi)容