HDFS-3.1.1 分布式文件系統(tǒng) 實驗示例

大家好,我是Iggi。

今天我給大家分享的是HDFS-3.1.1版本的實驗示例。

首先用一段文字簡介HDFS:

HDFS是英文Hadoop Distributed File System的縮寫。作為Hadoop的核心組件之一,它的設計思路參考于Google的GFS論文,是GFS的開源實現。由于HDFS自身的成熟穩(wěn)定,加之擁有眾多用戶,現在已經成為當前分布式存儲的事實標準。HDFS為Hadoop生態(tài)圈中的其它組件提供最基本的存儲功能。它具有高容錯性、高可靠性、高擴展性、高獲得性、高吞吐率等特征為大數據存儲和處理提供了強大的底層存儲架構,可以說它是一切大數據平臺的基礎。HDFS采用Master/Slave架構,主服務器運行Master進程NameNode,從服務器Slave進程DataNode。它將集群中所有服務器的存儲空間連接到一起,構成了一個統(tǒng)一的、海量的存儲空間。下圖是HDFS的架構圖。

image

如果大家還想了解更多的有關HDFS的知識請訪問:http://hadoop.apache.org/docs/r3.1.1/hadoop-project-dist/hadoop-hdfs/HdfsUserGuide.html

好,下面進入正題。介紹Java操作HDFS組件完成對文件系統(tǒng)的操作。

第一步:使用IDE建立Maven工程,建立工程時沒有特殊說明,按照向導提示點擊完成即可。重要的是在pom.xml文件中添加依賴包,內容如下圖。

image.png

等待系統(tǒng)下載好依賴的jar包后便可以編寫程序了。

以下代碼段是操作HDFS的測試類:

package linose.hdfs;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

/**
 * HDFS 服務類
 * @author Iggi
 *
 */
public class HdfsService {

    protected String user;
    protected String defaultFS;
    protected Configuration conf;
    protected FileSystem fileSystem;
    
    /**
     * 構造函數
     * @param defaultFS hdfs://ip:port
     * @throws IOException 
     */
    public HdfsService(String defaultFS) throws IOException {
        this.defaultFS = defaultFS;
        conf = new Configuration();
        conf.set("fs.defaultFS", defaultFS);
        fileSystem = FileSystem.get(conf);
    }
    /**
     * 構造函數
     * @param user hdfs用戶名
     * @param defaultFS hdfs://ip:port
     * @throws URISyntaxException 
     * @throws InterruptedException 
     * @throws IOException 
     */
    public HdfsService(String user, String defaultFS) throws IOException, InterruptedException, URISyntaxException {
        this.user = user;
        this.defaultFS = defaultFS;
        conf = new Configuration();
        conf.set("fs.defaultFS", defaultFS);
        fileSystem = FileSystem.get(new URI(defaultFS), conf, user);
    }
    
    /**
     * 獲取HDFS文件系統(tǒng)對象,拿到該對象后可以跳過封裝,直接調用HDFS提供的方法。
     * @return HDFS文件系統(tǒng)對象
     */
    public FileSystem getFileSystem() {
        return fileSystem;
    }
    
    /**
     * 析構資源
     */
    public void clear() {
        fileSystem = null;
        conf = null;
    }
    
    /**
     * 將輸入路徑構建成hdfs路徑
     * @param path 輸入路徑
     * @return hdfs路徑
     */
    protected Path buildHdfsPath(String path) {
        String hdfsPath = defaultFS;
        if (path.startsWith("/")) {
            hdfsPath += path;
        } else {
            hdfsPath += ("/" + path);
        }
        return new Path(hdfsPath);
    }
    
    /**
     * 輸入路徑是否存在
     * @param path 輸入路徑
     * @return 是否存在
     * @throws IOException
     * @throws URISyntaxException 
     * @throws InterruptedException 
     */
    public boolean IsExists(String path) throws IOException, InterruptedException, URISyntaxException {
        Path hdfsPath = buildHdfsPath(path);
        return fileSystem.exists(hdfsPath);
    }
    
    /**
     * 創(chuàng)建路徑,如果該路徑存在返回成功,否則創(chuàng)建新目錄
     * @param path 路徑
     * @return 成功或失敗
     * @throws IOException
     * @throws URISyntaxException 
     * @throws InterruptedException 
     */
    public boolean createDirection(String path) throws IOException, InterruptedException, URISyntaxException {
        if (IsExists(path)) {
            return true;
        }
        Path hdfsPath = buildHdfsPath(path);
        return fileSystem.mkdirs(hdfsPath);
    }
    
    /**
     * 創(chuàng)建文件返回流對象,如果該文件已存在就已追加形式打開并返回流對象
     * @param path 文件路徑
     * @return 成功或失敗
     * @throws IOException
     * @throws InterruptedException
     * @throws URISyntaxException
     */
    public FSDataOutputStream createFile(String path) throws IOException, InterruptedException, URISyntaxException {
        if (IsExists(path)) {
            Path hdfsPath = buildHdfsPath(path);
            return fileSystem.append(hdfsPath);
        }
        Path hdfsPath = buildHdfsPath(path);
        return fileSystem.create(hdfsPath);
    }
    
    /**
     * 創(chuàng)建新文件,沒有寫文件操作。
     * @param path 文件路徑
     * @return 成功或失敗
     * @throws IOException
     * @throws InterruptedException
     * @throws URISyntaxException
     */
    public boolean createNewFile(String path) throws IOException, InterruptedException, URISyntaxException {
        if (IsExists(path)) {
            return true;
        }
        Path hdfsPath = buildHdfsPath(path);
        return fileSystem.createNewFile(hdfsPath);
    }
    
    /**
     * 顯示制定路徑下的文件與目錄名稱
     * @param path 制定路徑
     * @return 制定目錄內的文件與文件夾名稱鏈表
     * @throws FileNotFoundException
     * @throws IOException
     * @throws InterruptedException
     * @throws URISyntaxException
     */
    public RemoteIterator<LocatedFileStatus> showFiles(String path) throws FileNotFoundException, IOException  {
        Path hdfsPath = buildHdfsPath(path);
        RemoteIterator<LocatedFileStatus> fsIterator = fileSystem.listFiles(hdfsPath, true);
        return fsIterator;
    }
    
    /**
     * 重命名文件
     * @param srcName 源文件名稱 /test/t1
     * @param dstName 目標文件名稱 /test/t2
     * @return 成功或失敗
     * @throws IOException
     * @throws InterruptedException
     * @throws URISyntaxException
     */
    public boolean rename(String fileName, String newFileName) throws IOException {
        Path hdfsPath = buildHdfsPath(fileName);
        Path newHdfsPath = buildHdfsPath(newFileName);
        return fileSystem.rename(hdfsPath, newHdfsPath);
    }
    
    /**
     * 刪除文件
     * @param path 文件路徑
     * @return 成功或失敗
     * @throws IOException
     * @throws InterruptedException
     * @throws URISyntaxException
     */
    public boolean delete(String path) throws IOException {
        Path hdfsPath = buildHdfsPath(path);
        return fileSystem.delete(hdfsPath, true);
    }
    
    /**
     * 打開文件,獲取流對象后自行轉換要使用的格式例如byte[]或者String,使用后需要關閉流對象
     * @param path
     * @return 流對象
     * @throws IOException
     * @throws InterruptedException
     * @throws URISyntaxException
     */
    public FSDataInputStream open(String path) throws IOException {
        Path hdfsPath = buildHdfsPath(path);
        return fileSystem.open(hdfsPath);
    }
    
    /**
     * 獲取文件或文件夾路徑在集群中的位置
     * @param path 文件或文件夾路徑
     * @return 節(jié)點對象
     * @throws IOException
     * @throws InterruptedException
     * @throws URISyntaxException
     */
    public BlockLocation[] getFileBlockLocation(String path) throws IOException, InterruptedException, URISyntaxException {
        if (!IsExists(path)) {
            return null;
        }
        Path hdfsPath = buildHdfsPath(path);
        FileStatus fileStatus = fileSystem.getFileStatus(hdfsPath);
        return fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
    }
    
    /**
     * 關閉文件系統(tǒng)
     * @throws IOException
     * @throws InterruptedException
     * @throws URISyntaxException
     */
    public void close() throws IOException {
        fileSystem.close();
    }
    
    /**
     * 上傳文件到HDFS
     * @param localFile 本地文件
     * @param hdfsDirection HDFS目錄
     * @throws IOException
     * @throws InterruptedException
     * @throws URISyntaxException
     */
    public void uploadFileToHdfs(String localFile, String hdfsDirection) throws IOException {
        Path localPath = new Path(localFile);
        Path hdfsPath = buildHdfsPath(hdfsDirection);
        fileSystem.copyFromLocalFile(localPath, hdfsPath);
    }
    
    /**
     * 拷貝文件輸出簡單的進度條,如果需要百分比,請自己實現
     * @param localFile 本地文件
     * @param hdfsDirection HDFS目錄
     * @throws IOException
     */
    public void uploadFileToHdfsWithProgress(String localFile, String hdfsDirection) throws IOException {
        Path localPath = new Path(localFile); 
        Path hdfsPath = buildHdfsPath(hdfsDirection);
        FileInputStream inputStream = new FileInputStream(localPath.toString());
        InputStream input = new BufferedInputStream(inputStream);
        FSDataOutputStream outputStream = fileSystem.create(hdfsPath, new Progressable() {
            public void progress() {
                // 進度條的輸出
                System.out.print(".");
            }
        });
        IOUtils.copyBytes(input, outputStream, 4096);
        input.close();
        outputStream.close();
    }
    
    /**
     * 從HDFS目錄下載文件
     * @param localFile 本地文件位置
     * @param hdfsPath HDFS文件位置
     * @throws IOException
     * @throws InterruptedException
     * @throws URISyntaxException
     */
    public void downloadFileFormHdfs(String localFile, String hdfsFile) throws IOException {
        Path localPath = new Path(localFile);
        Path hdfsPath = buildHdfsPath(hdfsFile);
        fileSystem.copyToLocalFile(hdfsPath, localPath);
    }
}

以下代碼段是測試代碼:

package linose.hdfs;

import java.io.IOException;
import java.net.URISyntaxException;

import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
//import org.apache.log4j.BasicConfigurator;

/**
 * Hello HDFS!
 * 本示例演示如何在分布式文件系統(tǒng)中進行:
 * 創(chuàng)建目錄、創(chuàng)建文件、寫入數據、打開文件、讀取數據、追加數據、重命名文件、刪除文件、重命名目錄、刪除目錄
 * 查詢指定路徑中的文件與目錄信息,查詢指定文件在那個集群節(jié)點
 * 文件上傳、下載、進度展示功能。
 */
public class App 
{
    public static void main( String[] args ) throws IOException, InterruptedException, URISyntaxException 
    {
        /**
         * 為了清楚的看到輸出結果,暫將集群調試信息缺省。
         * 如果想查閱集群調試信息,取消注釋即可。
         */
        //BasicConfigurator.configure();
        
        /**
         * 操作HDFS基礎信息賦值
         */
        String user = "hdfs";
        String defaultFS = "hdfs://master2.linose.cloud.beijing.com:8020";
        
        /**
         * 獲取HDFS服務對象
         */
        HdfsService service = new HdfsService(user, defaultFS);
        
        /**
         * 創(chuàng)建目錄示例
         */
        String path = "index.dirs";
        if (service.createDirection(path)) {
            System.out.println("創(chuàng)建目錄成功");
        } else {
            System.out.println("創(chuàng)建目錄失敗");
        }
        
        /**
         * 創(chuàng)建文件示例1,創(chuàng)建文件后返回流對象并寫入新信息。
         */
        String file1 = "index.dirs/test1";
        FSDataOutputStream outputStream = service.createFile(file1);
        for (int i = 0; i < 5; ++i) {
            outputStream.writeUTF("index: "+ i + " hello HDFS! \n");
        }
        outputStream.close();
        System.out.println("創(chuàng)建文件,寫入Hello HDFS后,關閉流對象");
        
        /**
         * 創(chuàng)建文件示例2,創(chuàng)建新文件。
         */
        String file2 = "index.dirs/test8";
        if (service.createNewFile(file2)) {
            System.out.println("創(chuàng)建新文件成功");
        } else {
            System.out.println("創(chuàng)建新文件失敗");
        }
        
        /**
         * 追加數據
         */
        outputStream = service.createFile(file1);
        outputStream.writeUTF("這是一條追加數據 \n");
        outputStream.close();
        
        /**
         * 打開文件讀取數據后關閉流對象
         */
        FSDataInputStream inputStream = service.open(file1);
        IOUtils.copyBytes(inputStream, System.out, 256);
        inputStream.close();
 
        /**
         * 重命名文件
         */
        String newFile = "index.dirs/test6.txt";
        String srcFile = "index.dirs/test6";
        if (service.rename(srcFile, newFile)) {
            System.out.println("重命名成功");
        } else {
            System.out.println("重命名失敗");
        }
        
        /**
         * 創(chuàng)建目錄、重命名目錄
         */
        String path1 = "test.test";
        if (service.createDirection(path1)) {
            System.out.println("創(chuàng)建目錄成功");
        } else {
            System.out.println("創(chuàng)建目錄失敗");
        }
        
        String path2 = "test.dirs";
        if (service.rename(path1, path2)) {
            System.out.println("重命名目錄成功");
        } else {
            System.out.println("重命名目錄失敗");
        }
        
        /**
         * 刪除目錄
         */
        if (service.delete(path2)) {
            System.out.println("刪除目錄成功");
        } else {
            System.out.println("刪除目錄失敗");
        }
        
        /**
         * 上傳文件
         */
        service.uploadFileToHdfs("/Users/liupengchun/Downloads/hadoop-logo.jpg", "/index.dirs");
        
        /**
         * 查詢指定路徑中的文件與目錄信息。
         */
        RemoteIterator<LocatedFileStatus> fsIterator = service.showFiles("/index.dirs");
        LocatedFileStatus lfs;
        while (fsIterator.hasNext()) {
            lfs = fsIterator.next();
            System.out.println(
                    (lfs.isDirectory() ? "文件夾" : "文件") 
                    + "文件大?。? + lfs.getLen()
                    + "文件路徑:" + lfs.getPath()
                    );
        }
        
        /**
         * 查詢指定文件在那個集群節(jié)點
         */
        BlockLocation[] locations = service.getFileBlockLocation(file1);
        if(locations != null && locations.length > 0){
            for(BlockLocation location : locations){
                System.out.println(location.getHosts()[0]);
            }
        }
        
        /**
         * 刪除文件
         */
        if (service.delete(file1)) {
            System.out.println("刪除文件成功");
        } else {
            System.out.println("刪除文件失敗");
        }
        
        /**
         * 下載文件
         */
        service.downloadFileFormHdfs("/Users/liupengchun/Downloads/hadoop-logo2.jpg", "/index.dirs/hadoop-logo.jpg");
    }
}

下圖為測試結果:


image.png

至此,HDFS-3.1.1 實驗示例演示完畢。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容