03_一個(gè)基于java,hadoop的數(shù)據(jù)采集程序demo

1.文件分布

文件分布

2. 環(huán)境搭建

參考:02_在window環(huán)境開(kāi)發(fā)hadoop_HDFS

3 需要采集的目錄

image.png

4 說(shuō)明:

  1. 需要采集的文件在accesslog里面并且以.log結(jié)尾
  2. 采集的思路是
    1. 先從accesslog移動(dòng)進(jìn)temp文件夾
    2. 從temp向HDFS上傳
    3. 將temp里面的文件移動(dòng)進(jìn)backup文件夾
  3. 此demo是采用讀取配置文件來(lái)得到路徑以及配置信息
  4. 讀取配置文件采用單列模式的懶漢式并考慮了線程安全

5. code

5.1 config.properties

LOG_SOURCE_DIR=H:/logs/accesslog/
LOG_TEMP_DIR=H:/logs/temp/
LOG_BACKUP_DIR=H:/logs/backup/

LOG_TIMEOUT=24
LOG_NAME=.log

HDFS_URL=hdfs://vm01:9000/
HDFS_DIR=/logs/
HDFS_F_NAME=access_log_
HDFS_L_NAME=.log

5.2 PropertiesHolderLaze

package com.looc.D02數(shù)據(jù)采集demo;

import java.io.IOException;
import java.util.Properties;

/**
 * 單列模式:懶漢式——并考慮線程安全
 * @author chenPeng
 * @version 1.0.0
 * @ClassName PropertiesHolderLaze.java
 * @Description TODO
 * @createTime 2019年01月28日 19:47:00
 */
public class PropertiesHolderLaze {
    private static Properties pros = null;

    public static Properties getPros() throws IOException {
        if (pros==null){
            synchronized (PropertiesHolderLaze.class){
                if (pros==null){
                    pros = new Properties();
                    pros.load(PropertiesHolderLaze.class.getClassLoader().getResourceAsStream(
                            "config.properties"));
                }
            }
        }
        return pros;
    }
}

5.3 CollectionData

package com.looc.D02數(shù)據(jù)采集demo;


import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Properties;
import java.util.TimerTask;
import java.util.UUID;

/**
 * @author chenPeng
 * @version 1.0.0
 * @ClassName CollectionData.java
 * @Description TODO
 * @createTime 2019年01月28日 19:18:00
 */
public class CollectionData extends TimerTask {

    private String LOG_SOURCE_DIR;
    private String LOG_TEMP_DIR;
    private String LOG_BACKUP_DIR;
    private String LOG_TIMEOUT;
    private String LOG_NAME;
    private String HDFS_URL;
    private String HDFS_DIR;
    private String HDFS_F_NAME;
    private String HDFS_L_NAME;
    private String TIME_MATE = "yyyy-MM-dd";


    /**
     * The action to be performed by this timer task.
     */
    @Override
    public void run() {
        //移動(dòng)數(shù)據(jù)到temp
        //temp上傳到hdfs
        //temp移動(dòng)到backup
        try {
            init();
            //移動(dòng)之前判斷是否有文件
            if (!isNullDir()){
                moveToTemp();
                setUpToHdfs();
                tempToBackup();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
    }

    /**
     * 初始化得到配置文件
     * @Author chenpeng
     * @Description //TODO
     * @Date 20:19
     * @Param []
     * @return void
     **/
    public void init() throws IOException {
        Properties pros = PropertiesHolderLaze.getPros();

        LOG_SOURCE_DIR = pros.getProperty("LOG_SOURCE_DIR");
        LOG_TEMP_DIR = pros.getProperty("LOG_TEMP_DIR");
        LOG_BACKUP_DIR = pros.getProperty("LOG_BACKUP_DIR");
        LOG_TIMEOUT = pros.getProperty("LOG_TIMEOUT");
        LOG_NAME = pros.getProperty("LOG_NAME");
        HDFS_URL = pros.getProperty("HDFS_URL");
        HDFS_DIR = pros.getProperty("HDFS_DIR");
        HDFS_F_NAME = pros.getProperty("HDFS_F_NAME");
        HDFS_L_NAME = pros.getProperty("HDFS_L_NAME");
    }


    /**
     * 判斷文件夾是否為空
     * @Author chenpeng
     * @Description //TODO
     * @Date 21:19
     * @Param []
     * @return boolean
     **/
    public boolean isNullDir(){
        if (new File(LOG_SOURCE_DIR).list().length==0){
            return true;
        }
        return false;
    }
    /**
     * 移動(dòng)數(shù)據(jù)到temp
     * @Author chenpeng
     * @Description //TODO
     * @Date 20:17
     * @Param []
     * @return void
     **/
    public void moveToTemp() throws IOException {
        //拿到文件
        File[] logSourceArray = new File(LOG_SOURCE_DIR).listFiles();

        //移動(dòng)文件
        for (File file : logSourceArray) {
            FileUtils.moveFileToDirectory(file,new File(LOG_TEMP_DIR),true);
        }
    }

    /**
     * temp上傳到hdfs
     * @Author chenpeng
     * @Description //TODO
     * @Date 20:18
     * @Param []
     * @return void
     **/
    public void setUpToHdfs() throws URISyntaxException, IOException, InterruptedException {
        //拿到全部的文件
        File[] logTempArray = new File(LOG_TEMP_DIR).listFiles();

        //獲取hdfs鏈接
        FileSystem fileSystem =
                FileSystem.get(new URI(HDFS_URL),new Configuration(),"root");

        //獲取一個(gè)時(shí)間
        SimpleDateFormat sdf = new SimpleDateFormat(TIME_MATE);
        Calendar calendar = Calendar.getInstance();
        String time = sdf.format(calendar.getTime());

        //上傳文件
        for (File file : logTempArray) {
            fileSystem.copyFromLocalFile(
                    new Path(file.getAbsolutePath()),
                    new Path(HDFS_DIR+"/"+time+"/"+
                            HDFS_F_NAME+time+UUID.randomUUID()+HDFS_L_NAME));
        }

        //關(guān)閉流
        fileSystem.close();
    }

    /**
     * temp移動(dòng)到backup
     * @Author chenpeng
     * @Description //TODO
     * @Date 20:18
     * @Param []
     * @return void
     **/
    public void tempToBackup() throws IOException {
        //拿到全部的文件
        File[] logTempArray = new File(LOG_TEMP_DIR).listFiles();

        //拿到時(shí)間
        SimpleDateFormat sdf = new SimpleDateFormat(TIME_MATE);
        Calendar calendar = Calendar.getInstance();
        String time = sdf.format(calendar.getTime());

        for (File file : logTempArray) {
            FileUtils.moveFileToDirectory(
                    file,new File(LOG_BACKUP_DIR+"/"+time),true);
        }
    }
}

5.4 ClearDate

package com.looc.D02數(shù)據(jù)采集demo;

import org.apache.commons.io.FileUtils;

import java.io.File;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Properties;
import java.util.TimerTask;

/**
 * @author chenPeng
 * @version 1.0.0
 * @ClassName ClearDate.java
 * @Description TODO
 * @createTime 2019年01月28日 19:19:00
 */
public class ClearDate extends TimerTask {

    private String TIME_MATE = "yyyy-MM-dd";
    private String LOG_BACKUP_DIR;

    /**
     * The action to be performed by this timer task.
     */
    @Override
    public void run() {
        try {
            init();
            fondTimeOutFile();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }

    /**
     * 初始化路徑
     * @Author chenpeng
     * @Description //TODO
     * @Date 21:25
     * @Param []
     * @return void
     **/
    public void init() throws IOException {
        Properties properties = PropertiesHolderLaze.getPros();

        LOG_BACKUP_DIR = properties.getProperty("LOG_BACKUP_DIR");
    }

    /**
     * 掃描文件夾是否存在超時(shí)文件
     * @Author chenpeng
     * @Description //TODO
     * @Date 21:26
     * @Param []
     * @return void
     **/
    public void fondTimeOutFile() throws IOException, ParseException {
        //拿到路徑
        File[] fileArray = new File(LOG_BACKUP_DIR).listFiles();

        //拿到時(shí)間
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.DAY_OF_MONTH,-1);
        SimpleDateFormat sdf = new SimpleDateFormat(TIME_MATE);

        for (File file : fileArray) {

            if (calendar.getTime().getTime() > sdf.parse(file.getName()).getTime()){
                //刪除
                FileUtils.deleteDirectory(file);
            }
        }
    }
}

5.5 Main

package com.looc.D02數(shù)據(jù)采集demo;


import java.io.IOException;
import java.util.Properties;
import java.util.Timer;

/**
 * 程序入口
 * @author chenPeng
 * @version 1.0.0
 * @ClassName Main.java
 * @Description TODO
 * @createTime 2019年01月28日 20:08:00
 */
public class Main {
    private static Integer LOG_TIMEOUT;

    public static void init() throws IOException {
        Properties properties = PropertiesHolderLaze.getPros();
        LOG_TIMEOUT = Integer.parseInt(properties.getProperty("LOG_TIMEOUT"));
    }

    public static void main(String[] args){
        Timer timer = new Timer();
        timer.schedule(new CollectionData(),0,LOG_TIMEOUT*60*60*1000L);
        timer.schedule(new ClearDate(),0,LOG_TIMEOUT*60*60*1000L);
    }
}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 國(guó)家電網(wǎng)公司企業(yè)標(biāo)準(zhǔn)(Q/GDW)- 面向?qū)ο蟮挠秒娦畔?shù)據(jù)交換協(xié)議 - 報(bào)批稿:20170802 前言: 排版 ...
    庭說(shuō)閱讀 12,417評(píng)論 6 13
  • 關(guān)于Mongodb的全面總結(jié) MongoDB的內(nèi)部構(gòu)造《MongoDB The Definitive Guide》...
    中v中閱讀 32,305評(píng)論 2 89
  • 注:本文是我學(xué)習(xí)Hadoop權(quán)威指南的時(shí)候一些關(guān)鍵點(diǎn)的記錄,并不是全面的知識(shí)點(diǎn) Hadoop 避免數(shù)據(jù)丟失的方法:...
    利伊奧克兒閱讀 805評(píng)論 0 2
  • 你好!陳俊生 文 秋雨 最近熱播的《我的前半生》話題特別多,人們一邊替羅子君捏了一把汗,一邊大罵陳俊...
    陽(yáng)光普照1閱讀 496評(píng)論 0 0
  • 盜走浮華的夢(mèng)境 淺留楚門(mén)的幻影 掠過(guò)南飛的白燕 驚起靜坐的魚(yú)鷹 碰碎死寂的鏡湖 觸發(fā)千層的圓渦 騰躍成群的銀魚(yú) 揭...
    桐月九閱讀 261評(píng)論 0 3

友情鏈接更多精彩內(nèi)容