一、原理介紹
- 概述
Hadoop Streaming是Hadoop提供的一個(gè)編程工具,它允許用戶使用任何可執(zhí)行文件或者腳本文件作為Mapper和Reducer,例如:
采用shell腳本語(yǔ)言中的一些命令作為mapper和reducer(cat作為mapper,wc作為reducer)
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper cat \
-reducer wc
- Hadoop Streaming原理
Streaming的原理是用Java實(shí)現(xiàn)一個(gè)包裝用戶程序的MapReduce程序,該程序負(fù)責(zé)調(diào)用MapReduce Java接口獲取key/value對(duì)輸入,創(chuàng)建一個(gè)新的進(jìn)程啟動(dòng)包裝的用戶程序,將數(shù)據(jù)通過(guò)管道傳遞給包裝的用戶程序處理,然后調(diào)用MapReduce Java接口將用戶程序的輸出切分成key/value對(duì)輸出。
mapper和reducer會(huì)從標(biāo)準(zhǔn)輸入中讀取用戶數(shù)據(jù),一行一行處理后發(fā)送給標(biāo)準(zhǔn)輸出。Streaming工具會(huì)創(chuàng)建MapReduce作業(yè),發(fā)送給各個(gè)tasktracker,同時(shí)監(jiān)控整個(gè)作業(yè)的執(zhí)行過(guò)程。
如果一個(gè)文件(可執(zhí)行或者腳本)作為mapper,mapper初始化時(shí),每一個(gè)mapper任務(wù)會(huì)把該文件作為一個(gè)單獨(dú)進(jìn)程啟動(dòng),mapper任務(wù)運(yùn)行時(shí),它把輸入切分成行并把每一行提供給可執(zhí)行文件進(jìn)程的標(biāo)準(zhǔn)輸入。 同時(shí),mapper收集可執(zhí)行文件進(jìn)程標(biāo)準(zhǔn)輸出的內(nèi)容,并把收到的每一行內(nèi)容轉(zhuǎn)化成key/value對(duì),作為mapper的輸出。 默認(rèn)情況下,一行中第一個(gè)tab之前的部分作為key,之后的(不包括tab)作為value。如果沒(méi)有tab,整行作為key值,value值為null。
對(duì)于reducer,類似。
以上是Map/Reduce框架和streaming mapper/reducer之間的基本通信協(xié)議。 - Streaming優(yōu)點(diǎn)
發(fā)效率高,便于移植。只要按照標(biāo)準(zhǔn)輸入輸出格式進(jìn)行編程,就可以滿足hadoop要求。因此單機(jī)程序稍加改動(dòng)就可以在集群上進(jìn)行使用。 同樣便于測(cè)試-只要按照 cat input | mapper | sort | reducer > output 進(jìn)行單機(jī)測(cè)試即可。如果單機(jī)測(cè)試通過(guò),大多數(shù)情況是可以在集群上成功運(yùn)行的,只要控制好內(nèi)存就好了。
提高程序效率-有些程序?qū)?nèi)存要求較高,如果用java控制內(nèi)存畢竟不如C/C++。
Streaming不足
1.Streaming中的mapper和reducer默認(rèn)只能向標(biāo)準(zhǔn)輸出寫(xiě)數(shù)據(jù),不能方便地處理多路輸出Hadoop Streaming用法
Usage: $HADOOP_HOME/bin/hadoop jar \
$HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar [options]
options:
(1)-input:輸入文件路徑
(2)-output:輸出文件路徑
(3)-mapper:用戶自己寫(xiě)的mapper程序,可以是可執(zhí)行文件或者腳本
(4)-reducer:用戶自己寫(xiě)的reducer程序,可以是可執(zhí)行文件或者腳本
(5)-file:打包文件到提交的作業(yè)中,可以是mapper或者reducer要用的輸入文件,如配置文件,字典等。
(6)-partitioner:用戶自定義的partitioner程序
(7)-combiner:用戶自定義的combiner程序(必須用java實(shí)現(xiàn))
(8)-D:作業(yè)的一些屬性(以前用的是-jonconf),具體有:
1)mapred.map.tasks:map task數(shù)目
2)mapred.reduce.tasks:reduce task數(shù)目
3)stream.map.input.field.separator/stream.map.output.field.separator: map task輸入/輸出數(shù)
據(jù)的分隔符,默認(rèn)均為\t。
4)stream.num.map.output.key.fields:指定map task輸出記錄中key所占的域數(shù)目
5)stream.reduce.input.field.separator/stream.reduce.output.field.separator:reduce task輸入/輸出數(shù)據(jù)的分隔符,默認(rèn)均為\t。
6)stream.num.reduce.output.key.fields:指定reduce task輸出記錄中key所占的域數(shù)目
另外,Hadoop本身還自帶一些好用的Mapper和Reducer:
(1) Hadoop聚集功能
Aggregate提供一個(gè)特殊的reducer類和一個(gè)特殊的combiner類,并且有一系列的“聚合器”(例如“sum”,“max”,“min”等)用于聚合一組value的序列。用戶可以使用Aggregate定義一個(gè)mapper插件類,這個(gè)類用于為mapper輸入的每個(gè)key/value對(duì)產(chǎn)生“可聚合項(xiàng)”。Combiner/reducer利用適當(dāng)?shù)木酆掀骶酆线@些可聚合項(xiàng)。要使用Aggregate,只需指定“-reducer aggregate”。
(2)字段的選?。愃朴赨nix中的‘cut’)
Hadoop的工具類org.apache.hadoop.mapred.lib.FieldSelectionMapReduc幫助用戶高效處理文本數(shù)據(jù),就像unix中的“cut”工具。工具類中的map函數(shù)把輸入的key/value對(duì)看作字段的列表。 用戶可以指定字段的分隔符(默認(rèn)是tab),可以選擇字段列表中任意一段(由列表中一個(gè)或多個(gè)字段組成)作為map輸出的key或者value。 同樣,工具類中的reduce函數(shù)也把輸入的key/value對(duì)看作字段的列表,用戶可以選取任意一段作為reduce輸出的key或value。
二、shell腳本實(shí)現(xiàn)實(shí)例
- 首先準(zhǔn)備測(cè)試文件
用下面的腳本生成NG大小的test.txt
#! /bin/sh
while [ "1" == "1" ]
do
echo "noe two three" >> test.txt
done
- 將測(cè)試文件上傳到HDFS文件系統(tǒng)
[hadoop@master ~]$ hdfs dfs -mkdir /test
[hadoop@master ~]$ hdfs dfs -put test.txt /test/
[hadoop@master ~]$ hdfs dfs -ls /test
Found 1 items
-rw-r--r-- 2 hadoop supergroup 4880841000 2017-03-20 10:45 /test/test.txt
- mapper腳本代碼
#! /bin/bash
while read LINE; do
for word in $LINE
do
#-e使得\t轉(zhuǎn)義(escape)為tab
echo -e "$word\t1"
done
done
- reducer腳本代碼
#! /bin/sh
count=0
started=0
word=""
while read LINE;do
newword=`echo $LINE | cut -d ' ' -f 1`
if [ "$word" != "$newword" ];then
[ $started -ne 0 ] && echo -e "$word\t$count"
word=$newword
count=1
started=1
else
count=$(( $count + 1 ))
fi
done
echo -e "$word\t$count"
- 執(zhí)行任務(wù)
hadoop jar hadoop-2.7.3/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar -input test -output output2 -mapper mapper.sh -reducer reducer.sh -file mapper.sh -file reducer.sh
- 可以在slave節(jié)點(diǎn)上看到相關(guān)的進(jìn)程,這些mapper進(jìn)程在master節(jié)點(diǎn)上是不存在的
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
7822 hadoop 20 0 103m 1628 1076 R 100.0 0.0 2:53.34 mapper.sh
7819 hadoop 20 0 103m 1632 1076 R 100.0 0.0 2:53.33 mapper.sh
7836 hadoop 20 0 103m 1636 1076 R 100.0 0.0 2:53.21 mapper.sh
7833 hadoop 20 0 103m 1628 1076 R 100.0 0.0 2:53.23 mapper.sh
29993 root 20 0 164g 346m 10m R 100.2 0.3 113:27.07 gpu_executor
7653 hadoop 20 0 912m 224m 18m S 81.4 0.2 2:34.85 java
7650 hadoop 20 0 899m 229m 18m S 80.4 0.2 2:33.40 java
7651 hadoop 20 0 922m 235m 18m S 79.4 0.2 2:32.60 java
7652 hadoop 20 0 915m 237m 18m S 79.1 0.2 2:31.61 java
28961 root 20 0 564m 9m 6252 S 18.8 0.0 23:43.42 TaskTracker
31606 hadoop 20 0 1838m 338m 18m S 4.3 0.3 0:25.58 java
6102 liuhao 20 0 931m 28m 18m S 1.6 0.0 132:25.21 knotify4
- map完成后,reduce階段會(huì)看到reduce進(jìn)程
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
29993 root 20 0 164g 346m 10m R 97.7 0.3 222:36.66 gpu_executor
28961 root 20 0 564m 9m 6252 S 19.7 0.0 46:10.45 TaskTracker
17016 hadoop 20 0 103m 1664 1100 S 11.8 0.0 0:33.40 reducer.sh
- 相關(guān)問(wèn)題
- 首先遇到“找不到或無(wú)法加載類hadoop-streaming-2.7.3.jar”的問(wèn)題,后來(lái)發(fā)現(xiàn)是因?yàn)槊顏G了“jar”這個(gè)參數(shù)
- 然后又遇到j(luò)ob發(fā)不下去,一直卡在
INFO mapreduce.Job: Running job: job_1489999749396_0004
后來(lái)將slaves節(jié)點(diǎn)的hostname也修正為IP映射表內(nèi)對(duì)應(yīng)的名字,解決?
- 運(yùn)行階段卡在reduce
17/03/20 19:50:29 INFO mapreduce.Job: map 74% reduce 0%
17/03/20 19:50:30 INFO mapreduce.Job: map 77% reduce 0%
17/03/20 19:50:33 INFO mapreduce.Job: map 84% reduce 0%
17/03/20 19:50:36 INFO mapreduce.Job: map 91% reduce 0%
17/03/20 19:50:39 INFO mapreduce.Job: map 97% reduce 0%
17/03/20 19:50:40 INFO mapreduce.Job: map 98% reduce 0%
17/03/20 19:50:41 INFO mapreduce.Job: map 100% reduce 0%
17/03/20 19:50:50 INFO mapreduce.Job: map 100% reduce 67%
根據(jù)一位外國(guó)友人的說(shuō)明,在reduce階段 ,0-33%階段是 shuffle 階段,就是根據(jù)鍵值 來(lái)講本條記錄發(fā)送到指定的reduce,這個(gè)階段應(yīng)該是在map還沒(méi)有完全完成的時(shí)候就已經(jīng)開(kāi)始了,因?yàn)槲覀儠?huì)看到map在執(zhí)行到一個(gè)百分比后reduce也啟動(dòng)了,這樣做也提高了程序的執(zhí)行效率。
34%-65%階段是sort階段,就是reduce根據(jù)收到的鍵值進(jìn)行排序。map階段也會(huì)發(fā)生排序,map的輸出結(jié)果是以鍵值為順序排序后輸出,可以通過(guò)只有map階段處理的輸出來(lái)驗(yàn)證(以前自己驗(yàn)證過(guò),貌似確有這么回事,大家自己再驗(yàn)證下,免得我誤人子弟?。?br>
66%-100%階段是處理階段,這個(gè)階段才是真正的處理階段,如果程序卡在這里,估計(jì)就是你的reduce程序有問(wèn)題了。
索性等了一晚上,第二天終于有動(dòng)靜了
17/03/21 07:01:53 INFO mapreduce.Job: map 100% reduce 77%
和上面的記錄對(duì)比發(fā)現(xiàn),從%67到%77用了11個(gè)小時(shí)!這明顯是reduce程序效率太慢了。也可能是數(shù)據(jù)傾斜問(wèn)題。中間也試過(guò)增加reducer的數(shù)量,但無(wú)果。最終我索性減少了輸入文件的行數(shù),使其只有三行:
one two three
one two three
one two three
然后重新運(yùn)行程序,瞬間得到了結(jié)果:
[hadoop@master ~]$ hdfs dfs -cat output2/part-00002
one\t3
three\t3
two\t3
可見(jiàn),結(jié)果是正確的。
Python版本
- mapper腳本
#!/usr/bin/env python
import sys
# input comes from STDIN (standard input)
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# split the line into words while removing any empty strings
words = filter(lambda word: word, line.split())
# increase counters
for word in words:
# write the results to STDOUT (standard output);
# what we output here will be the input for the
# Reduce step, i.e. the input for reducer.py
#
# tab-delimited; the trivial word count is 1
print '%s\t%s' % (word, 1)
- reducer腳本
#!/usr/bin/env python
from operator import itemgetter
import sys
# maps words to their counts
word2count = {}
# input comes from STDIN
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# parse the input we got from mapper.py
word, count = line.split()
# convert count (currently a string) to int
try:
count = int(count)
word2count[word] = word2count.get(word, 0) + count
except ValueError:
# count was not a number, so silently
# ignore/discard this line
pass
# sort the words lexigraphically;
#
# this step is NOT required, we just do it so that our
# final output will look more like the official Hadoop
# word count examples
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))
# write the results to STDOUT (standard output)
for word, count in sorted_word2count:
print '%s\t%s'% (word, count)
- 單機(jī)測(cè)試
[hadoop@master ~]$ echo "one two three" | ./mapper.py | sort | ./reducer.py
one 1
three 1
two 1
- 運(yùn)行
[hadoop@master ~]$ hadoop jar hadoop-2.7.3/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar -D mapred.reduce.tasks=3 -input test -output output3 -mapper mapper.py -reducer reducer.py -file mapper.py -file reducer.py
令人詫異的是很快就執(zhí)行完了,難道真的是shell腳本不適合做類似統(tǒng)計(jì)這樣的事情嗎?
[hadoop@master ~]$ hdfs dfs -ls output3
Found 4 items
-rw-r--r-- 2 hadoop supergroup 0 2017-03-22 10:12 output3/_SUCCESS
-rw-r--r-- 2 hadoop supergroup 14 2017-03-22 10:12 output3/part-00000
-rw-r--r-- 2 hadoop supergroup 0 2017-03-22 10:12 output3/part-00001
-rw-r--r-- 2 hadoop supergroup 24 2017-03-22 10:12 output3/part-00002
[hadoop@master ~]$ hdfs dfs -cat output3/part-00002
noe 1166202
two 1166202
[hadoop@master ~]$ hdfs dfs -cat output3/part-00000
three 1166202
[hadoop@master ~]$