04_hadoop_讀取hdfs在本地統(tǒng)計(jì)單詞并將結(jié)果放回hdfs

1 文件目錄

image.png

2 wordConfig.properties

配置文件

CLASS_BUSINESS=com.looc.D04HDFS單詞計(jì)數(shù).WordCountRealize
HDFS_URL=hdfs://vm01:9000/
HDFS_USER=root
OUT_PUT_SRC=/wordCount/result/
RESOURCE_SRC=/wordCount/resource/

3 WordCount

package com.looc.D04HDFS單詞計(jì)數(shù);

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
 * @author chenPeng
 * @version 1.0.0
 * @ClassName WordCount.java
 * @Description TODO
 * @createTime 
 */
public class WordCount {
    private String CLASS_BUSINESS;
    private String HDFS_URL;
    private String HDFS_USER;
    private String OUT_PUT_SRC;
    private String RESOURCE_SRC;

    private String PROS_CONFIG="wordConfig.properties";

    private FileSystem fileSystem;
    private WordCountInterface wordCountInt;

    public static void main(String[] args) throws IllegalAccessException, InterruptedException,
            IOException, InstantiationException, URISyntaxException, ClassNotFoundException {
        WordCount wordCount = new WordCount();
        wordCount.doMain();

    }
    public void doMain() throws IllegalAccessException, InterruptedException, IOException,
            InstantiationException, URISyntaxException, ClassNotFoundException {
        init();
        doIt();
        readerResult();
        finallys();
    }
    /**
     * 初始化
     * 拿到配置文件
     * @Author chenpeng
     * @Description //TODO
     * @Date 19:07
     * @Param []
     * @return void
     **/
    public void init() throws ClassNotFoundException, IOException, URISyntaxException,
            InterruptedException, IllegalAccessException, InstantiationException {
        //讀取配置文件
        Properties pros = new Properties();
        pros.load(new InputStreamReader(
                WordCount.class.getClassLoader().getResourceAsStream(PROS_CONFIG),"GBK"));

        CLASS_BUSINESS = pros.getProperty("CLASS_BUSINESS");
        HDFS_URL = pros.getProperty("HDFS_URL");
        HDFS_USER = pros.getProperty("HDFS_USER");
        OUT_PUT_SRC = pros.getProperty("OUT_PUT_SRC");
        RESOURCE_SRC = pros.getProperty("RESOURCE_SRC");

        //加載業(yè)務(wù)邏輯類
        Class<?> tClass = Class.forName(CLASS_BUSINESS);
        wordCountInt = (WordCountRealize) tClass.newInstance();

        //拿到HDFS鏈接
        fileSystem = FileSystem.get(new URI(HDFS_URL), new Configuration(), HDFS_USER);
    }

    /**
     * 執(zhí)行
     * @Author chenpeng
     * @Description //TODO
     * @Date 19:08
     * @Param []
     * @return void
     **/
    public void doIt() throws IOException {
        //拿到全部的文件
        FileStatus[] fileStatuses = fileSystem.listStatus(new Path(RESOURCE_SRC));

        //記錄信息
        Context context = new Context();

        for (FileStatus fileStatus : fileStatuses) {
            FSDataInputStream fsDataInputStream = fileSystem.open(fileStatus.getPath());
            BufferedReader bfr = new BufferedReader(new InputStreamReader(fsDataInputStream));

            String temp = null;
            while ((temp=bfr.readLine())!=null){
                //執(zhí)行業(yè)務(wù)邏輯
                wordCountInt.wordCountBusiness(temp, context);
            }

            bfr.close();
            fsDataInputStream.close();
        }

        //輸出結(jié)果
        HashMap<Object, Object> map = context.getMap();
        Set<Map.Entry<Object, Object>> entries = map.entrySet();
        StringBuffer stringBuffer = new StringBuffer();

        for (Map.Entry<Object, Object> entry : entries) {
            stringBuffer.append(entry.getKey()+"====="+entry.getValue()+"\n");
        }

        //寫入文件
        FSDataOutputStream fsDataOutputStream = fileSystem.create(
                new Path(OUT_PUT_SRC+"wordCount.txt"), true);
        
        fsDataOutputStream.write(stringBuffer.toString().getBytes());
        fsDataOutputStream.close();
    }

    /**
     * 讀取結(jié)果
     * @Author chenpeng
     * @Description //TODO
     * @Date 19:59
     * @Param []
     * @return void
     **/
    public void readerResult() throws IOException {
        FSDataInputStream open = fileSystem.open(new Path(OUT_PUT_SRC+"wordCount.txt"));
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(open));
        String temp = null;
        while ((temp = bufferedReader.readLine())!=null){
            System.out.println(temp);
        }
    }
    /**
     *
     * @Author chenpeng
     * @Description //TODO
     * @Date 19:08
     * @Param []
     * @return void
     **/
    public void finallys() throws IOException {
        fileSystem.close();
        System.out.println("HDFS流正常關(guān)閉");
    }
}

4 WordCountInterface

package com.looc.D04HDFS單詞計(jì)數(shù);

/**
 * @author chenPeng
 * @version 1.0.0
 * @ClassName WordCountInterface.java
 * @Description TODO
 * @createTime 
 */
public interface WordCountInterface {
    /**
     * 業(yè)務(wù)接口
     * @Author chenpeng
     * @Description //TODO
     * @Date 19:27
     * @Param [str, context]
     * @Param str
     * @Param context
     * @return void
     **/
    void wordCountBusiness(String str,Context context);
}

5 WordCountRealize

package com.looc.D04HDFS單詞計(jì)數(shù);

/**
 * @author chenPeng
 * @version 1.0.0
 * @ClassName WordCountRealize.java
 * @Description TODO
 * @createTime 
 */
public class WordCountRealize implements WordCountInterface{

    /**
     * 業(yè)務(wù)接口
     * @param str
     * @param context
     * @return void
     * @Author chenpeng
     * @Description //TODO
     * @Date 18:29
     * @Param [str, context]
     */
    @Override
    public void wordCountBusiness(String str, Context context) {
        String[] wordArray = str.toLowerCase().split(" ");
        for (String word : wordArray) {
            Object number = context.getValue(word);
            if (number!=null){
                int nub = (int)number + 1;
                context.add(word, nub);
                continue;
            }
            context.add(word, 1);
        }
    }
}

6 Context

package com.looc.D04HDFS單詞計(jì)數(shù);

import org.junit.Test;

import java.util.HashMap;

/**
 * 一個(gè)上下文對(duì)象
 * @author chenPeng
 * @version 1.0.0
 * @ClassName Context.java
 * @Description TODO
 * @createTime 
 */
public class Context {
    private HashMap<Object,Object> hashMap = new HashMap<>();

    /**
     * 寫入值
     * @Author chenpeng
     * @Description //TODO
     * @Date 19:25
     * @Param [obj, obj]
     * @return void
     **/
    public void add(Object objK,Object objV){
        hashMap.put(objK, objV);
    }

    /**
     * 取值
     * @Author chenpeng
     * @Description //TODO
     * @Date 19:25
     * @Param [obj]
     * @return java.lang.Object
     **/
    public Object getValue(Object obj){
        return hashMap.get(obj);
    }


    /**
     * 獲取map對(duì)象
     * @Author chenpeng
     * @Description //TODO
     * @Date 19:26
     * @Param []
     * @return java.util.HashMap<java.lang.Object,java.lang.Object>
     **/
    public HashMap<Object,Object> getMap(){
            return hashMap;
    }

}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容