1:使用命令將文本上傳到HDFS中
1.1 創(chuàng)建文件夾
hdfs dfs -mkdir -p? input
1.2 將*.txt 放到input目錄
hdfs dfs -put *.txt input
1.3查看input目錄中的文件
hdfs dfs -ls input
結(jié)果:
Found 1 items
-rw-r--r--? 1 hegh supergroup? ? ? ? 53 2020-11-30 22:42 input/text.txt
1.4 Hadoop Streaming使用:
hadoop streaming的工作方式如下圖(在這里我們只談跟hadoop streaming相關(guān)的部分,至于MapReduce的細(xì)節(jié)不予贅述)。與標(biāo)準(zhǔn)的MapReduce(以下簡(jiǎn)稱MR)一樣的是整個(gè)MR過(guò)程依然由mapper、[combiner]、reducer組成(其中combiner為可選加入)。用戶像使用java一樣去用其他語(yǔ)言編寫(xiě)MR,只不過(guò)Mapper/Reducer的輸入和輸出并不是和java API打交道,而是通過(guò)該語(yǔ)言下的標(biāo)準(zhǔn)輸入輸出函數(shù)來(lái)進(jìn)行。我在圖中尤其標(biāo)注了綠色的框框,是你應(yīng)該關(guān)注并自己編寫(xiě)的mapper和reducer的位置

mapper的角色:hadoop將用戶提交的mapper可執(zhí)行程序或腳本作為一個(gè)單獨(dú)的進(jìn)程加載起來(lái),這個(gè)進(jìn)程我們稱之為mapper進(jìn)程,hadoop不斷地將文件片段轉(zhuǎn)換為行,傳遞到我們的mapper進(jìn)程中,mapper進(jìn)程通過(guò)標(biāo)準(zhǔn)輸入的方式一行一行地獲取這些數(shù)據(jù),然后設(shè)法將其轉(zhuǎn)換為鍵值對(duì),再通過(guò)標(biāo)準(zhǔn)輸出的形式將這些鍵值對(duì)按照一對(duì)兒一行的方式輸出出去。
雖然在我們的mapper函數(shù)中,我們自己能分得清key/value(比方說(shuō)有可能在我們的代碼中使用的是string key,int value),但是當(dāng)我們采用標(biāo)準(zhǔn)輸出之后,key value是打印到一行作為結(jié)果輸出的(比如sys.stdout.write("%s\t%s\n"%(birthyear,gender))),因此我們?yōu)榱吮WChadoop能從中鑒別出我們的鍵值對(duì),鍵值對(duì)中一定要以分隔符'\t'即Tab(也可自定義分隔符)字符分隔,這樣才能保證hadoop正確地為我們進(jìn)行partitoner、shuffle等等過(guò)程。
reducer的角色:hadoop將用戶提交的reducer可執(zhí)行程序或腳本同樣作為一個(gè)單獨(dú)的進(jìn)程加載起來(lái),這個(gè)進(jìn)程我們稱之為reducer進(jìn)程,hadoop不斷地將鍵值對(duì)(按鍵排序)按照一對(duì)兒一行的方式傳遞到reducer進(jìn)程中,reducer進(jìn)程同樣通過(guò)標(biāo)準(zhǔn)輸入的方式按行獲取這些鍵值對(duì)兒,進(jìn)行自定義計(jì)算后將結(jié)果通過(guò)標(biāo)準(zhǔn)輸出的形式輸出出去。
在reducer這個(gè)過(guò)程中需要注意的是:傳遞進(jìn)reducer的鍵值對(duì)是按照鍵排過(guò)序的,這點(diǎn)是由MR框架的sort過(guò)程保證的,因此如果讀到一個(gè)鍵與前一個(gè)鍵不同,我們就可以知道當(dāng)前key對(duì)應(yīng)的pairs已經(jīng)結(jié)束了,接下來(lái)將是新的key對(duì)應(yīng)的pairs
mapper.py:
import sys
for linein sys.stdin:
????line = line.strip()
????words = line.split()
????for wordin words:
????????print("%s\t%s" % (word, 1))

reducer.py:
import sys
countMap = {}
for linein sys.stdin:
????line = line.strip()
????word, count = line.split('\t')
????try:
????????count =int(count)
????except ValueError:#count如果不是數(shù)字的話,直接忽略掉
? ? ? ? ????continue
? ? if wordnot in countMap:
????????countMap[word] = count
????else:
????????countMap[word] = countMap[word] + count
????for keyin countMap:
????????print("%s\t%s" % (key, countMap[key]))

Hadoop Streaming的使用方式
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/.../hadoop-streaming.jar [genericOptions] [streamingOptions]