yarn
- Yet Another Resource Negotiator, 另一種資源協(xié)調(diào)者
- 通用資源管理系統(tǒng)
- 為上層應(yīng)用提供統(tǒng)一的資源管理和調(diào)度,為集群在利用率、資源統(tǒng)一管理和數(shù)據(jù)共享等方面帶來了巨大好處
YARN的架構(gòu)和執(zhí)行流程
ResourceManager: RM 資源管理器
1、整個集群同一時間提供服務(wù)的RM只有一個,負(fù)責(zé)集群資源的統(tǒng)一管理和調(diào)度, 2、處理客戶端的請求: submit, kill監(jiān)控我們的NM,一旦某個NM掛了,那么該NM上運行的任務(wù)需要告訴我們的AM來如何進行處理
NodeManager: NM 節(jié)點管理器
整個集群中有多個,負(fù)責(zé)自己本身節(jié)點資源管理和使用,定時向RM匯報本節(jié)點的資源使用情況,接收并處理來自RM的各種命令:啟動Container處理來自AM的命令
ApplicationMaster: AM
每個應(yīng)用程序?qū)?yīng)一個:MR、Spark,負(fù)責(zé)應(yīng)用程序的管理
為應(yīng)用程序向RM申請資源(core、memory),分配給內(nèi)部task, 需要與NM通信:啟動/停止task,task是運行在container里面,AM也是運行在container里面
- Container 容器: 封裝了CPU、Memory等資源的一個容器,是一個任務(wù)運行環(huán)境的抽象, Client: 提交作業(yè) 查詢作業(yè)的運行進度,殺死作業(yè)
啟動YARN相關(guān)的進程
sbin目錄下
./ start-yarn.sh
驗證
[hadoop@hadoop000 sbin]$ jps
13000 ResourceManager
13199 Jps
13104 NodeManager
停止
./ stop-yarn.sh
yarn提交MapReduce作業(yè)演示,我們接下來使用hadoop進行π值的計算
在/home/hadoop/app/hadoop-2.6.0-cdh5.7.0/share/hadoop/mapreduce找到示例jar包hadoop-mapreduce-examples-2.6.0-cdh5.7.0.jar
執(zhí)行以下命令進行π值計算
[hadoop@hadoop000 mapreduce]$ hadoop jar hadoop-mapreduce-examples-2.6.0-cdh5.7.0.jar pi 2 3
2 是執(zhí)行任務(wù)次數(shù) 3是執(zhí)行每次任務(wù)投遞次數(shù)
可視化yarn和MapReduce
http://10.25.187.18:8088/cluster
分布式處理框架 MapReduce
什么是MapReduce
- 源于Google的MapReduce論文(2004年12月)
- Hadoop的MapReduce是Google論文的開源實現(xiàn)
- MapReduce優(yōu)點: 海量數(shù)據(jù)離線處理&易開發(fā)
- MapReduce缺點: 實時流式計算
MapReduce編程模型
MapReduce分而治之的思想
- 數(shù)錢實例:一堆鈔票,各種面值分別是多少
- 單點策略
- 一個人數(shù)所有的鈔票,數(shù)出各種面值有多少張
- 分治策略
- 每個人分得一堆鈔票,數(shù)出各種面值有多少張
- 匯總,每個人負(fù)責(zé)統(tǒng)計一種面值
- 解決數(shù)據(jù)可以切割進行計算的應(yīng)用
- 單點策略
- MapReduce編程分Map和Reduce階段
- 將作業(yè)拆分成Map階段和Reduce階段
- Map階段 Map Tasks 分:把復(fù)雜的問題分解為若干"簡單的任務(wù)"
- Reduce階段: Reduce Tasks 合:reduce
- MapReduce編程執(zhí)行步驟
- 準(zhǔn)備MapReduce的輸入數(shù)據(jù)
- 準(zhǔn)備Mapper數(shù)據(jù)
- Shuffle
- Reduce處理
- 結(jié)果輸出
- 編程模型
- 借鑒函數(shù)式編程方式
- 用戶只需要實現(xiàn)兩個函數(shù)接口:
- Map(in_key,in_value)
--->(out_key,intermediate_value) list - Reduce(out_key,intermediate_value) list
--->out_value list
- Map(in_key,in_value)
-
Word Count 詞頻統(tǒng)計案例
Java實現(xiàn)WordCount
編寫WordCountApp
package com.neusoft.hadoop.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.FileInputStream;
import java.io.IOException;
/**
* @author Eric Lee
* @date 2020/10/28 11:16
* 使用Mapreduce 開發(fā) WordCountApp
*/
public class WordCountApp {
/**
* Map階段: 讀取輸入文件
*/
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
LongWritable one = new LongWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 接受每一行數(shù)據(jù)
String line = value.toString();
// 按照指定分隔符進行拆分
String[] words = line.split(" ");
// 遍歷
for(String word: words){
// 通過上下文把map的處理結(jié)果進行輸出
context.write(new Text(word), one);
}
}
}
/**
* Reduce階段: 歸并操作
*/
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long sum = 0;
for (LongWritable value: values){
// 求出key出現(xiàn)的次數(shù)總和
sum += value.get();
}
// 最終的統(tǒng)計結(jié)果進行輸出
context.write(key, new LongWritable(sum));
}
}
/**
* 定義main函數(shù) 使用Driver封裝Maoreduce作業(yè)的所有信息
*/
public static void main(String[] args) throws Exception {
// 創(chuàng)建Configuration對象
Configuration configuration = new Configuration();
// 創(chuàng)建Job
Job job = Job.getInstance(configuration, "wordcount");
// 設(shè)置job處理類
job.setJarByClass(WordCountApp.class);
// 設(shè)置作業(yè)的輸入路徑 args[0] 將命令行的第一個值傳給 Path
FileInputFormat.setInputPaths(job, new Path(args[0]));
// 設(shè)置 map 相關(guān)參數(shù)
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 設(shè)置 reduce 相關(guān)參數(shù)
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 設(shè)置作業(yè)的輸出路徑 args[1] 將命令行的第二個值傳給 Path
FileOutputFormat.setOutputPath(job, new Path(args[2]));
System.exit(job.waitForCompletion(true)? 0: 1);
}
}
打包程序
- mvn打包命令,需要設(shè)置mvn環(huán)境變量
mvn clean package -DskipTests
-
2.使用idea的mave插件
上傳jar包
使用 maven進行打包 hadoop_java_op_hdfs-1.0-SNAPSHOT.jar 上傳到 用戶目錄的lib文件夾下進行命令操作
接下來在命令行下進行操作(類似計算π值)
hadoop jar 主函數(shù)全限定名 輸入 輸出
hadoop jar shenyangbig_data_hadoop-1.0-SNAPSHOT.jar com.neusoft.hadoop.mapreduce.WordCountApp 輸入 輸出
注意
相同的腳本和代碼在這里執(zhí)行會報錯;
- 通過shell方式將輸出文件夾刪除
hadoop fs -rm -r /output/wc
參考命令
hadoop jar shenyangbig_data_hadoop-1.0-SNAPSHOT.jar com.neusoft.hadoop.mapreduce.WordCountApp hdfs://hadoop000:8020/hello.txt hdfs://hadoop000:8020/output/wc
打開http://192.168.1.109:8088/cluster

查看詞頻統(tǒng)計結(jié)果


