Hadoop是個高效的工具
介紹了Hadoop的由來和組成,當(dāng)然提供給你用來分析的數(shù)據(jù),以及最快的方式。更重要的是描述第一個Hadoop工程的詳細(xì)編寫過程。
1.認(rèn)識Hadoop
1.1解決高速增長的存儲空間和讀取速度不匹配的問題
引入了多個磁盤同時(shí)讀取數(shù)據(jù)的技術(shù)。但為實(shí)現(xiàn)這個技術(shù),又有兩個問題需要解決:
- 硬盤故障問題
- 各種分布式系統(tǒng)允許結(jié)合不同來源的數(shù)據(jù)進(jìn)行分析,很難保證其正確性。
而在Hadoop中對這兩個問題都做到處理和解決。對于第一個問題,常用的做法是保存數(shù)據(jù)副本(replica),Hadoop文件系統(tǒng)(HDFS, Hadoop Distributed FileSystem)的使用原理類似,略有不同。第二個問題Hadoop中引入了MapReduce模型,模型抽象出了硬盤讀寫問題并將其轉(zhuǎn)換為對一個數(shù)據(jù)集的計(jì)算,同時(shí)也具備較高的可靠性。
MapReduce 是一種線性的可伸縮編程模型。使用者要寫兩個函數(shù),分別是Map函數(shù)和Reduce函數(shù),每個函數(shù)定義從一個鍵值對集合到另一個鍵值對集合到映射。性能方面,MapReduce盡量在計(jì)算節(jié)點(diǎn)上存儲數(shù)據(jù),以實(shí)現(xiàn)數(shù)據(jù)的本地快速訪問,數(shù)據(jù)本地化是MapReduce的核心特征,從而獲得更好的性能。另外有多種基于MapReduce的高級查詢語言(Pig和Hive)供使用。穩(wěn)定性上,MapReduce采用無共享(shared-nothing)框架,實(shí)現(xiàn)了失敗檢測,所有使用者無需擔(dān)心系統(tǒng)的部分失效問題。
1.2.氣象數(shù)據(jù)下載
書中的數(shù)據(jù)分析實(shí)例使用的是ncdc的氣象數(shù)據(jù),在手動編寫程序之前,首先要準(zhǔn)備好這些數(shù)據(jù)。最開始找到了ncdc的ftp站點(diǎn)ftp://ftp.ncdc.noaa.gov/pub/data/,下載經(jīng)常性的出現(xiàn)斷線,下載速度異常緩慢。所以不得不重新搜索新的源,最終找到了https://www1.ncdc.noaa.gov/pub/data/noaa這個地址,但是在下載時(shí)卻不像ftp可以批量下載。
而只能通過腳本去抓去數(shù)據(jù)。這個腳本實(shí)現(xiàn)的功能是,按年份批量下載對應(yīng)地址的壓縮包,并將這些數(shù)據(jù)按年份保存。值得一說的是這個shell腳本使用了并行下載方式,節(jié)省了大量的時(shí)間。
#! /bin/bash
for i in {1901..2019}
do {
mkdir -p /Users/macos/noaaData/$i
wget --execute robots=off -r -np -nH --cut-dirs=4 -R index.html* https://www1.ncdc.noaa.gov/pub/data/noaa/$i/ -P /Users/macos/noaaData/$i
}&
done
2.第一個Hadoop工程
2.1 安裝并運(yùn)行Hadoop
下載最新2.8.1版本
具體安裝方式和配置過程參考官方文檔 http://hadoop.apache.org/docs/current/
進(jìn)入hadoop-x.x.x/sbin目錄下運(yùn)行star-all腳本(中間需要輸入root密碼)
啟動成功驗(yàn)證:
打開瀏覽器:
http://192.168.8.88:50070 (hdfs管理界面)顯示active活躍狀態(tài)
http://192.168.8.88:8088 (yarn管理界面)
以上兩個地址正常顯示,則說明啟動成功。
2.2 Hadoop程序編寫
MapReduce任務(wù)過程分為兩個處理階段:
- map階段
- reduce階段
每個階段都以鍵值對作為輸入和輸出,類型可供選擇。兩個處理階段需要分別編寫相應(yīng)的函數(shù)方法,并加上運(yùn)行作業(yè)的代碼。
新建Maven項(xiàng)目
- 在pom.xml文件中增加以下依賴關(guān)系
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>2.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>2.8.1</version>
</dependency>
</dependencies>
- 編寫Map函數(shù)、Reduce函數(shù)和調(diào)用執(zhí)行代碼
Map函數(shù)
完成功能:在天氣數(shù)據(jù)中截取溫度數(shù)據(jù)。并寫入到contex中為Reduce方法準(zhǔn)備好數(shù)據(jù)。
// cc MaxTemperatureMapper Mapper for maximum temperature example
// vv MaxTemperatureMapper
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MaxTemperatureMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
// ^^ MaxTemperatureMapper
Reduce函數(shù)
完成功能:根據(jù)Map函數(shù)傳遞來的數(shù)據(jù)計(jì)算最大值,并輸出年份和最高溫度的鍵值對。
// cc MaxTemperatureReducer Reducer for maximum temperature example
// vv MaxTemperatureReducer
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MaxTemperatureReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}
// ^^ MaxTemperatureReducer
main方法:
完成功能:創(chuàng)建運(yùn)行Job,傳遞數(shù)據(jù)目錄并設(shè)置Map和Reduce對應(yīng)class;同時(shí)設(shè)置輸出鍵值對格式。
// cc MaxTemperature Application to find the maximum temperature in the weather dataset
// vv MaxTemperature
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MaxTemperature {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(MaxTemperature.class);
job.setJobName("Max temperature");
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
// ^^ MaxTemperature
- 設(shè)置數(shù)據(jù)輸入輸出目錄
在Run configurations中Program arguments輸入框中,設(shè)置數(shù)據(jù)目錄和輸出目錄的絕對路徑。
運(yùn)行會在輸出目錄下生成兩個文件:
_SUCCESS
part-r-00000
第二個文件為我們需要的運(yùn)行結(jié)果如下:
1948 342
1949 311
...
到此我們對Hadoop工程有了一個初步認(rèn)識,并成功運(yùn)行了我們的第一個項(xiàng)目。好了,這篇分享就到這了,感興趣可以持續(xù)關(guān)注博客更新哦??