Hadoop學(xué)習(xí)筆記

參考:http://www.cnblogs.com/heavenwang/p/3988033.html

1. 基本概念

Hadoop 是Apache基金會下一個開源的分布式計算平臺,它以分布式文件系統(tǒng)HDFS和MapReduce算法為核心,為用戶提供了系統(tǒng)底層細(xì)節(jié)透明的分布式基礎(chǔ)架構(gòu)。
如下圖Hadoop集群中有很多并行的機器來存儲和分析數(shù)據(jù),客戶端把任務(wù)提交到集群,集群計算返回結(jié)果。

Hadoop強調(diào)把代碼向數(shù)據(jù)遷移,即Hadoop集群中既包含數(shù)據(jù)又包含運算環(huán)境,并且盡可能讓一段數(shù)據(jù)的計算發(fā)生在同一臺機器上,代碼比數(shù)據(jù)更加容易移動,Hadoop的設(shè)計理念即是把要執(zhí)行的計算代碼移動到數(shù)據(jù)所在的機器上去。


  • HDFS是一種分布式文件系統(tǒng),數(shù)據(jù)被保存在計算機集群上,HDFS為HBase等工具提供了基礎(chǔ)。
  • MapReduce,它是一個分布式、并行處理的編程模型,MapReduce把任務(wù)分為map(映射)階段和reduce(化簡)。實現(xiàn)并行。
  • Hive類似于SQL高級語言,用于運行存儲在Hadoop上的查詢語句。





2. Hadoop與SQL數(shù)據(jù)庫

從總體上看,現(xiàn)在大多數(shù)數(shù)據(jù)應(yīng)用處理的主力是關(guān)系型數(shù)據(jù)庫,即SQL面向的是結(jié)構(gòu)化的數(shù)據(jù),而Hadoop則針對的是非結(jié)構(gòu)化的數(shù)據(jù),從這一角度看,Hadoop提供了對數(shù)據(jù)處理的一種更為通用的方式。

  1. 用scale-out代替scale-up
    拓展商用服務(wù)器的代價是非常昂貴的。要運行一個更大的數(shù)據(jù)庫,就要一個更大的服務(wù)器,事實上,各服務(wù)器廠商往往會把其昂貴的高端機標(biāo)稱為“數(shù)據(jù)庫級服務(wù)器”,不過有時候有可能需要處理更大的數(shù)據(jù)集,但卻找不到更大的機器,而更為重要的是,高端機對于許多應(yīng)用并不經(jīng)濟。

  2. 用鍵值對代替關(guān)系表
    關(guān)系型數(shù)據(jù)庫需要將數(shù)據(jù)按照某種模式存放到具有關(guān)系型數(shù)據(jù)結(jié)構(gòu)表中,但是許多當(dāng)前的數(shù)據(jù)模型并不能很好的適應(yīng)這些模型,如文本、圖片、xml等,此外,大型數(shù)據(jù)集往往是非結(jié)構(gòu)化或半結(jié)構(gòu)化的。而Hadoop以鍵值對作為最基本的數(shù)據(jù)單元,能夠靈活的處理較少結(jié)構(gòu)化的數(shù)據(jù)類

  3. 用函數(shù)式編程(MapReduce)代替聲明式查詢(SQL)
    SQL從根本上說是一個高級聲明式語言,它的手段是聲明你想要的結(jié)果,并讓數(shù)據(jù)庫引擎判斷如何獲取數(shù)據(jù)。而在MapReduce程序中,實際的數(shù)據(jù)處理步驟是由你指定的。SQL使用查詢語句,而MapReduce使用程序和腳本。MapReduce還可以建立復(fù)雜的數(shù)據(jù)統(tǒng)計模型,或者改變圖像數(shù)據(jù)的處理格式。

  4. 用離線批量處理代替在線處理
    Hadoop并不適合處理那種對幾條記錄讀寫的在線事務(wù)處理模式,而適合一次寫入多次讀取的數(shù)據(jù)需求。




3. HDFS Hadoop Distributed File System

特點:高訪問量,高容錯性,線性拓展

HDFS采用了主從(Master/Slave)結(jié)構(gòu)模型,一個HDFS集群是由一個NameNode和若干個DataNode組成的。其中NameNode作為主服務(wù)器,管理文件系統(tǒng)的命名空間和客戶端對文件的訪問操作;集群中的DataNode管理存儲的數(shù)據(jù)。NameNode執(zhí)行文件系統(tǒng)的命名操作,比如打開、關(guān)閉、重命名文件或目錄等,它也負(fù)責(zé)數(shù)據(jù)塊到具體DataNode的映射。DataNode負(fù)責(zé)處理文件系統(tǒng)客戶端的文件讀寫請求,并在NameNode的同意調(diào)度下進行數(shù)據(jù)塊的創(chuàng)建、刪除和復(fù)制工作


HDFS是Master和Slave的結(jié)構(gòu)

結(jié)構(gòu):

  • NameNode:master節(jié)點,管理HDFS的名稱空間和數(shù)據(jù)塊映射信息、配置副本策略和處理客戶端請求
  • Secondary NameNode: 輔助master節(jié)點,備份namespace
  • DataNod: slave節(jié)點,實際存儲數(shù)據(jù)、執(zhí)行數(shù)據(jù)塊的讀寫并匯報存儲信息給NameNode



操作:

  • hadoop fs: 實際存儲數(shù)據(jù)、執(zhí)行數(shù)據(jù)塊的讀寫并匯報存儲信息給NameNode;
hadoop fs -ls / 
hadoop fs -lsr 
hadoop fs -mkdir /user/hadoop 
hadoop fs -put a.txt /user/hadoop/ 
hadoop fs -get /user/hadoop/a.txt / 
hadoop fs -cp src dst 
hadoop fs -mv src dst 
hadoop fs -cat /user/hadoop/a.txt 
hadoop fs -rm /user/hadoop/a.txt 
hadoop fs -rmr /user/hadoop/a.txt 
hadoop fs -text /user/hadoop/a.txt 
hadoop fs -copyFromLocal localsrc dst 與hadoop fs -put功能類似。 
hadoop fs -moveFromLocal localsrc dst
  • hadoop fsadmin
    dfsadmin是一個多任務(wù)的工具,我們可以使用它來獲取HDFS的狀態(tài)信息,以及在HDFS上執(zhí)行的一系列管理操作。

    -report:查看文件系統(tǒng)的基本信息和統(tǒng)計信息。
    -safeadmin enter | leave | get | wait:安全模式命令。安全模式是NameNode的一種狀態(tài),在這種狀態(tài)下,NameNode不接受對名字空間的更改(只讀);不復(fù)制或刪除塊。NameNode在啟動時自動進入安全模式,當(dāng)配置塊的最小百分?jǐn)?shù)滿足最小副本數(shù)的條件時,會自動離開安全模式。enter是進入,leave是離開。
    -refreshNodes:重新讀取hosts和exclude文件,使新的節(jié)點或需要退出集群的節(jié)點能夠被NameNode重新識別。這個命令在新增節(jié)點或注銷節(jié)點時用到。

  • hadoop fsck
    HDFS支持fsck命令用以檢查各種不一致。fsck用以報告各種文件問題,如block丟失或缺少block等。



4. MapReduce

是一個分布式計算框架,用來并行計算海量數(shù)據(jù)。

MapReduce 框架的核心步驟主要分兩部分:Map 和Reduce。

Hadoop的MapReduce模型是通過輸入key/value對進行運算得到輸出key/value對。其分為Map過程和Reduce過程。

  • Map主要的工作是接收一個key/value對,產(chǎn)生一個中間key/value對,之后MapReduce把集合中所有相同key值的value放在一起并傳遞給Reduce函數(shù)。
  • Reduce函數(shù)接收key和相關(guān)的value集合并合并這些value值,得到一個較小的value集合。

下圖是MapReduce的數(shù)據(jù)流圖,體現(xiàn)了MapReduce處理大數(shù)據(jù)集的過程。這個過程就是將大數(shù)據(jù)分解為成百上千個小數(shù)據(jù)集,每個(或若干個)數(shù)據(jù)集分別由集群中的一個節(jié)點進行處理并生成的中間結(jié)果,然后這些中間結(jié)果又由大量的節(jié)點合并,形成最終結(jié)果。




例子1:Hadoop Word Count

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
      
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }
  
  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}



例子2:下載氣象數(shù)據(jù),求每年的最低溫度
Min Temperature

 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MinTemperature {

        public static void main(String[] args) throws Exception {
            if(args.length != 2) {
                System.err.println("Usage: MinTemperature<input path> <output path>");
                System.exit(-1);
            }

            Job job = new Job();
            job.setJarByClass(MinTemperature.class);
            job.setJobName("Min temperature");
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            job.setMapperClass(MinTemperatureMapper.class);
            job.setReducerClass(MinTemperatureReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

MinTemperatureMapper

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

    public class MinTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

        private static final int MISSING = 9999;

        @Override 
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String line = value.toString();
            String year = line.substring(15, 19);

            int airTemperature;
            if(line.charAt(87) == '+') {
                airTemperature = Integer.parseInt(line.substring(88, 92));
            } else {
                airTemperature = Integer.parseInt(line.substring(87, 92));
            }

            String quality = line.substring(92, 93);
            if(airTemperature != MISSING && quality.matches("[01459]")) {
                context.write(new Text(year), new IntWritable(airTemperature));
            }
        }
    }

MinTemperatureReducer

  import java.io.IOException;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Reducer;

    public class MinTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

            int minValue = Integer.MAX_VALUE;
            for(IntWritable value : values) {
                minValue = Math.min(minValue, value.get());
            }
            context.write(key, new IntWritable(minValue));
        }
    }
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容