Shell腳本實(shí)現(xiàn)MapReduce統(tǒng)計(jì)單詞數(shù)程序

一、原理介紹


  • 概述
    Hadoop Streaming是Hadoop提供的一個(gè)編程工具,它允許用戶使用任何可執(zhí)行文件或者腳本文件作為Mapper和Reducer,例如:
    采用shell腳本語(yǔ)言中的一些命令作為mapper和reducer(cat作為mapper,wc作為reducer)
$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper cat \
-reducer wc
  • Hadoop Streaming原理
    Streaming的原理是用Java實(shí)現(xiàn)一個(gè)包裝用戶程序的MapReduce程序,該程序負(fù)責(zé)調(diào)用MapReduce Java接口獲取key/value對(duì)輸入,創(chuàng)建一個(gè)新的進(jìn)程啟動(dòng)包裝的用戶程序,將數(shù)據(jù)通過(guò)管道傳遞給包裝的用戶程序處理,然后調(diào)用MapReduce Java接口將用戶程序的輸出切分成key/value對(duì)輸出。
    mapper和reducer會(huì)從標(biāo)準(zhǔn)輸入中讀取用戶數(shù)據(jù),一行一行處理后發(fā)送給標(biāo)準(zhǔn)輸出。Streaming工具會(huì)創(chuàng)建MapReduce作業(yè),發(fā)送給各個(gè)tasktracker,同時(shí)監(jiān)控整個(gè)作業(yè)的執(zhí)行過(guò)程。
    如果一個(gè)文件(可執(zhí)行或者腳本)作為mapper,mapper初始化時(shí),每一個(gè)mapper任務(wù)會(huì)把該文件作為一個(gè)單獨(dú)進(jìn)程啟動(dòng),mapper任務(wù)運(yùn)行時(shí),它把輸入切分成行并把每一行提供給可執(zhí)行文件進(jìn)程的標(biāo)準(zhǔn)輸入。 同時(shí),mapper收集可執(zhí)行文件進(jìn)程標(biāo)準(zhǔn)輸出的內(nèi)容,并把收到的每一行內(nèi)容轉(zhuǎn)化成key/value對(duì),作為mapper的輸出。 默認(rèn)情況下,一行中第一個(gè)tab之前的部分作為key,之后的(不包括tab)作為value。如果沒(méi)有tab,整行作為key值,value值為null。
    對(duì)于reducer,類似。
    以上是Map/Reduce框架和streaming mapper/reducer之間的基本通信協(xié)議。
  • Streaming優(yōu)點(diǎn)
  1. 發(fā)效率高,便于移植。只要按照標(biāo)準(zhǔn)輸入輸出格式進(jìn)行編程,就可以滿足hadoop要求。因此單機(jī)程序稍加改動(dòng)就可以在集群上進(jìn)行使用。 同樣便于測(cè)試-只要按照 cat input | mapper | sort | reducer > output 進(jìn)行單機(jī)測(cè)試即可。如果單機(jī)測(cè)試通過(guò),大多數(shù)情況是可以在集群上成功運(yùn)行的,只要控制好內(nèi)存就好了。

  2. 提高程序效率-有些程序?qū)?nèi)存要求較高,如果用java控制內(nèi)存畢竟不如C/C++。

  • Streaming不足
    1.Streaming中的mapper和reducer默認(rèn)只能向標(biāo)準(zhǔn)輸出寫(xiě)數(shù)據(jù),不能方便地處理多路輸出

  • Hadoop Streaming用法

Usage: $HADOOP_HOME/bin/hadoop jar \
$HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar [options]

options:
(1)-input:輸入文件路徑
(2)-output:輸出文件路徑
(3)-mapper:用戶自己寫(xiě)的mapper程序,可以是可執(zhí)行文件或者腳本
(4)-reducer:用戶自己寫(xiě)的reducer程序,可以是可執(zhí)行文件或者腳本
(5)-file:打包文件到提交的作業(yè)中,可以是mapper或者reducer要用的輸入文件,如配置文件,字典等。
(6)-partitioner:用戶自定義的partitioner程序
(7)-combiner:用戶自定義的combiner程序(必須用java實(shí)現(xiàn))
(8)-D:作業(yè)的一些屬性(以前用的是-jonconf),具體有:
1)mapred.map.tasks:map task數(shù)目
2)mapred.reduce.tasks:reduce task數(shù)目
3)stream.map.input.field.separator/stream.map.output.field.separator: map task輸入/輸出數(shù)
據(jù)的分隔符,默認(rèn)均為\t。
4)stream.num.map.output.key.fields:指定map task輸出記錄中key所占的域數(shù)目
5)stream.reduce.input.field.separator/stream.reduce.output.field.separator:reduce task輸入/輸出數(shù)據(jù)的分隔符,默認(rèn)均為\t。
6)stream.num.reduce.output.key.fields:指定reduce task輸出記錄中key所占的域數(shù)目
另外,Hadoop本身還自帶一些好用的Mapper和Reducer:
(1) Hadoop聚集功能
Aggregate提供一個(gè)特殊的reducer類和一個(gè)特殊的combiner類,并且有一系列的“聚合器”(例如“sum”,“max”,“min”等)用于聚合一組value的序列。用戶可以使用Aggregate定義一個(gè)mapper插件類,這個(gè)類用于為mapper輸入的每個(gè)key/value對(duì)產(chǎn)生“可聚合項(xiàng)”。Combiner/reducer利用適當(dāng)?shù)木酆掀骶酆线@些可聚合項(xiàng)。要使用Aggregate,只需指定“-reducer aggregate”。
(2)字段的選?。愃朴赨nix中的‘cut’)
Hadoop的工具類org.apache.hadoop.mapred.lib.FieldSelectionMapReduc幫助用戶高效處理文本數(shù)據(jù),就像unix中的“cut”工具。工具類中的map函數(shù)把輸入的key/value對(duì)看作字段的列表。 用戶可以指定字段的分隔符(默認(rèn)是tab),可以選擇字段列表中任意一段(由列表中一個(gè)或多個(gè)字段組成)作為map輸出的key或者value。 同樣,工具類中的reduce函數(shù)也把輸入的key/value對(duì)看作字段的列表,用戶可以選取任意一段作為reduce輸出的key或value。

二、shell腳本實(shí)現(xiàn)實(shí)例


  • 首先準(zhǔn)備測(cè)試文件
    用下面的腳本生成NG大小的test.txt
#! /bin/sh
while [ "1" == "1" ]
do
    echo "noe two three" >> test.txt
done
  • 將測(cè)試文件上傳到HDFS文件系統(tǒng)
[hadoop@master ~]$ hdfs dfs -mkdir /test
[hadoop@master ~]$ hdfs dfs -put test.txt /test/
[hadoop@master ~]$ hdfs dfs -ls /test
Found 1 items
-rw-r--r--   2 hadoop supergroup 4880841000 2017-03-20 10:45 /test/test.txt
  • mapper腳本代碼
#! /bin/bash
while read LINE; do
  for word in $LINE
  do
    #-e使得\t轉(zhuǎn)義(escape)為tab
    echo -e "$word\t1"
  done
done
  • reducer腳本代碼
#! /bin/sh
count=0
started=0
word=""
while read LINE;do
  newword=`echo $LINE | cut -d ' '  -f 1`
  if [ "$word" != "$newword" ];then
    [ $started -ne 0 ] && echo -e "$word\t$count"
    word=$newword
    count=1
    started=1
  else
    count=$(( $count + 1 ))
  fi
done
echo -e "$word\t$count"
  • 執(zhí)行任務(wù)
hadoop jar hadoop-2.7.3/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar -input test -output output2 -mapper mapper.sh -reducer reducer.sh -file mapper.sh -file reducer.sh
  • 可以在slave節(jié)點(diǎn)上看到相關(guān)的進(jìn)程,這些mapper進(jìn)程在master節(jié)點(diǎn)上是不存在的
  PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND                                                           
 7822 hadoop    20   0  103m 1628 1076 R 100.0  0.0   2:53.34 mapper.sh                                                         
 7819 hadoop    20   0  103m 1632 1076 R 100.0  0.0   2:53.33 mapper.sh                                                         
 7836 hadoop    20   0  103m 1636 1076 R 100.0  0.0   2:53.21 mapper.sh                                                         
 7833 hadoop    20   0  103m 1628 1076 R 100.0  0.0   2:53.23 mapper.sh                                                         
29993 root      20   0  164g 346m  10m R 100.2  0.3 113:27.07 gpu_executor                                                      
 7653 hadoop    20   0  912m 224m  18m S 81.4  0.2   2:34.85 java                                                               
 7650 hadoop    20   0  899m 229m  18m S 80.4  0.2   2:33.40 java                                                               
 7651 hadoop    20   0  922m 235m  18m S 79.4  0.2   2:32.60 java                                                               
 7652 hadoop    20   0  915m 237m  18m S 79.1  0.2   2:31.61 java                                                               
28961 root      20   0  564m   9m 6252 S 18.8  0.0  23:43.42 TaskTracker                                                        
31606 hadoop    20   0 1838m 338m  18m S  4.3  0.3   0:25.58 java                                                               
 6102 liuhao    20   0  931m  28m  18m S  1.6  0.0 132:25.21 knotify4      
  • map完成后,reduce階段會(huì)看到reduce進(jìn)程
  PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND                                                           
29993 root      20   0  164g 346m  10m R 97.7  0.3 222:36.66 gpu_executor                                                       
28961 root      20   0  564m   9m 6252 S 19.7  0.0  46:10.45 TaskTracker                                                        
17016 hadoop    20   0  103m 1664 1100 S 11.8  0.0   0:33.40 reducer.sh 
  • 相關(guān)問(wèn)題
  1. 首先遇到“找不到或無(wú)法加載類hadoop-streaming-2.7.3.jar”的問(wèn)題,后來(lái)發(fā)現(xiàn)是因?yàn)槊顏G了“jar”這個(gè)參數(shù)
  2. 然后又遇到j(luò)ob發(fā)不下去,一直卡在
INFO mapreduce.Job: Running job: job_1489999749396_0004

后來(lái)將slaves節(jié)點(diǎn)的hostname也修正為IP映射表內(nèi)對(duì)應(yīng)的名字,解決?

  1. 運(yùn)行階段卡在reduce
17/03/20 19:50:29 INFO mapreduce.Job:  map 74% reduce 0%
17/03/20 19:50:30 INFO mapreduce.Job:  map 77% reduce 0%
17/03/20 19:50:33 INFO mapreduce.Job:  map 84% reduce 0%
17/03/20 19:50:36 INFO mapreduce.Job:  map 91% reduce 0%
17/03/20 19:50:39 INFO mapreduce.Job:  map 97% reduce 0%
17/03/20 19:50:40 INFO mapreduce.Job:  map 98% reduce 0%
17/03/20 19:50:41 INFO mapreduce.Job:  map 100% reduce 0%
17/03/20 19:50:50 INFO mapreduce.Job:  map 100% reduce 67%

根據(jù)一位外國(guó)友人的說(shuō)明,在reduce階段 ,0-33%階段是 shuffle 階段,就是根據(jù)鍵值 來(lái)講本條記錄發(fā)送到指定的reduce,這個(gè)階段應(yīng)該是在map還沒(méi)有完全完成的時(shí)候就已經(jīng)開(kāi)始了,因?yàn)槲覀儠?huì)看到map在執(zhí)行到一個(gè)百分比后reduce也啟動(dòng)了,這樣做也提高了程序的執(zhí)行效率。
34%-65%階段是sort階段,就是reduce根據(jù)收到的鍵值進(jìn)行排序。map階段也會(huì)發(fā)生排序,map的輸出結(jié)果是以鍵值為順序排序后輸出,可以通過(guò)只有map階段處理的輸出來(lái)驗(yàn)證(以前自己驗(yàn)證過(guò),貌似確有這么回事,大家自己再驗(yàn)證下,免得我誤人子弟?。?br> 66%-100%階段是處理階段,這個(gè)階段才是真正的處理階段,如果程序卡在這里,估計(jì)就是你的reduce程序有問(wèn)題了。
索性等了一晚上,第二天終于有動(dòng)靜了

17/03/21 07:01:53 INFO mapreduce.Job:  map 100% reduce 77%

和上面的記錄對(duì)比發(fā)現(xiàn),從%67到%77用了11個(gè)小時(shí)!這明顯是reduce程序效率太慢了。也可能是數(shù)據(jù)傾斜問(wèn)題。中間也試過(guò)增加reducer的數(shù)量,但無(wú)果。最終我索性減少了輸入文件的行數(shù),使其只有三行:

one two three
one two three
one two three

然后重新運(yùn)行程序,瞬間得到了結(jié)果:

[hadoop@master ~]$ hdfs dfs -cat output2/part-00002
one\t3  
three\t3    
two\t3  

可見(jiàn),結(jié)果是正確的。


Python版本

  • mapper腳本
#!/usr/bin/env python
 
import sys
 
# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words while removing any empty strings
    words = filter(lambda word: word, line.split())
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1)
  • reducer腳本
#!/usr/bin/env python
 
from operator import itemgetter
import sys
 
# maps words to their counts
word2count = {}
 
# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
 
    # parse the input we got from mapper.py
    word, count = line.split()
    # convert count (currently a string) to int
    try:
        count = int(count)
        word2count[word] = word2count.get(word, 0) + count
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        pass
 
# sort the words lexigraphically;
#
# this step is NOT required, we just do it so that our
# final output will look more like the official Hadoop
# word count examples
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))
 
# write the results to STDOUT (standard output)
for word, count in sorted_word2count:
    print '%s\t%s'% (word, count)
  • 單機(jī)測(cè)試
[hadoop@master ~]$ echo "one two three" | ./mapper.py | sort | ./reducer.py 
one 1
three   1
two 1
  • 運(yùn)行
[hadoop@master ~]$ hadoop jar hadoop-2.7.3/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar -D mapred.reduce.tasks=3 -input test -output output3 -mapper mapper.py -reducer reducer.py -file mapper.py -file reducer.py 

令人詫異的是很快就執(zhí)行完了,難道真的是shell腳本不適合做類似統(tǒng)計(jì)這樣的事情嗎?

[hadoop@master ~]$ hdfs dfs -ls output3
Found 4 items
-rw-r--r--   2 hadoop supergroup          0 2017-03-22 10:12 output3/_SUCCESS
-rw-r--r--   2 hadoop supergroup         14 2017-03-22 10:12 output3/part-00000
-rw-r--r--   2 hadoop supergroup          0 2017-03-22 10:12 output3/part-00001
-rw-r--r--   2 hadoop supergroup         24 2017-03-22 10:12 output3/part-00002
[hadoop@master ~]$ hdfs dfs -cat output3/part-00002
noe 1166202
two 1166202
[hadoop@master ~]$ hdfs dfs -cat output3/part-00000
three   1166202
[hadoop@master ~]$ 

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容