大家好,我是Iggi。
今天我給大家分享的是HDFS-3.1.1版本的實驗示例。
首先用一段文字簡介HDFS:
HDFS是英文Hadoop Distributed File System的縮寫。作為Hadoop的核心組件之一,它的設計思路參考于Google的GFS論文,是GFS的開源實現。由于HDFS自身的成熟穩(wěn)定,加之擁有眾多用戶,現在已經成為當前分布式存儲的事實標準。HDFS為Hadoop生態(tài)圈中的其它組件提供最基本的存儲功能。它具有高容錯性、高可靠性、高擴展性、高獲得性、高吞吐率等特征為大數據存儲和處理提供了強大的底層存儲架構,可以說它是一切大數據平臺的基礎。HDFS采用Master/Slave架構,主服務器運行Master進程NameNode,從服務器Slave進程DataNode。它將集群中所有服務器的存儲空間連接到一起,構成了一個統(tǒng)一的、海量的存儲空間。下圖是HDFS的架構圖。

image
如果大家還想了解更多的有關HDFS的知識請訪問:http://hadoop.apache.org/docs/r3.1.1/hadoop-project-dist/hadoop-hdfs/HdfsUserGuide.html
好,下面進入正題。介紹Java操作HDFS組件完成對文件系統(tǒng)的操作。
第一步:使用IDE建立Maven工程,建立工程時沒有特殊說明,按照向導提示點擊完成即可。重要的是在pom.xml文件中添加依賴包,內容如下圖。

image.png
等待系統(tǒng)下載好依賴的jar包后便可以編寫程序了。
以下代碼段是操作HDFS的測試類:
package linose.hdfs;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
/**
* HDFS 服務類
* @author Iggi
*
*/
public class HdfsService {
protected String user;
protected String defaultFS;
protected Configuration conf;
protected FileSystem fileSystem;
/**
* 構造函數
* @param defaultFS hdfs://ip:port
* @throws IOException
*/
public HdfsService(String defaultFS) throws IOException {
this.defaultFS = defaultFS;
conf = new Configuration();
conf.set("fs.defaultFS", defaultFS);
fileSystem = FileSystem.get(conf);
}
/**
* 構造函數
* @param user hdfs用戶名
* @param defaultFS hdfs://ip:port
* @throws URISyntaxException
* @throws InterruptedException
* @throws IOException
*/
public HdfsService(String user, String defaultFS) throws IOException, InterruptedException, URISyntaxException {
this.user = user;
this.defaultFS = defaultFS;
conf = new Configuration();
conf.set("fs.defaultFS", defaultFS);
fileSystem = FileSystem.get(new URI(defaultFS), conf, user);
}
/**
* 獲取HDFS文件系統(tǒng)對象,拿到該對象后可以跳過封裝,直接調用HDFS提供的方法。
* @return HDFS文件系統(tǒng)對象
*/
public FileSystem getFileSystem() {
return fileSystem;
}
/**
* 析構資源
*/
public void clear() {
fileSystem = null;
conf = null;
}
/**
* 將輸入路徑構建成hdfs路徑
* @param path 輸入路徑
* @return hdfs路徑
*/
protected Path buildHdfsPath(String path) {
String hdfsPath = defaultFS;
if (path.startsWith("/")) {
hdfsPath += path;
} else {
hdfsPath += ("/" + path);
}
return new Path(hdfsPath);
}
/**
* 輸入路徑是否存在
* @param path 輸入路徑
* @return 是否存在
* @throws IOException
* @throws URISyntaxException
* @throws InterruptedException
*/
public boolean IsExists(String path) throws IOException, InterruptedException, URISyntaxException {
Path hdfsPath = buildHdfsPath(path);
return fileSystem.exists(hdfsPath);
}
/**
* 創(chuàng)建路徑,如果該路徑存在返回成功,否則創(chuàng)建新目錄
* @param path 路徑
* @return 成功或失敗
* @throws IOException
* @throws URISyntaxException
* @throws InterruptedException
*/
public boolean createDirection(String path) throws IOException, InterruptedException, URISyntaxException {
if (IsExists(path)) {
return true;
}
Path hdfsPath = buildHdfsPath(path);
return fileSystem.mkdirs(hdfsPath);
}
/**
* 創(chuàng)建文件返回流對象,如果該文件已存在就已追加形式打開并返回流對象
* @param path 文件路徑
* @return 成功或失敗
* @throws IOException
* @throws InterruptedException
* @throws URISyntaxException
*/
public FSDataOutputStream createFile(String path) throws IOException, InterruptedException, URISyntaxException {
if (IsExists(path)) {
Path hdfsPath = buildHdfsPath(path);
return fileSystem.append(hdfsPath);
}
Path hdfsPath = buildHdfsPath(path);
return fileSystem.create(hdfsPath);
}
/**
* 創(chuàng)建新文件,沒有寫文件操作。
* @param path 文件路徑
* @return 成功或失敗
* @throws IOException
* @throws InterruptedException
* @throws URISyntaxException
*/
public boolean createNewFile(String path) throws IOException, InterruptedException, URISyntaxException {
if (IsExists(path)) {
return true;
}
Path hdfsPath = buildHdfsPath(path);
return fileSystem.createNewFile(hdfsPath);
}
/**
* 顯示制定路徑下的文件與目錄名稱
* @param path 制定路徑
* @return 制定目錄內的文件與文件夾名稱鏈表
* @throws FileNotFoundException
* @throws IOException
* @throws InterruptedException
* @throws URISyntaxException
*/
public RemoteIterator<LocatedFileStatus> showFiles(String path) throws FileNotFoundException, IOException {
Path hdfsPath = buildHdfsPath(path);
RemoteIterator<LocatedFileStatus> fsIterator = fileSystem.listFiles(hdfsPath, true);
return fsIterator;
}
/**
* 重命名文件
* @param srcName 源文件名稱 /test/t1
* @param dstName 目標文件名稱 /test/t2
* @return 成功或失敗
* @throws IOException
* @throws InterruptedException
* @throws URISyntaxException
*/
public boolean rename(String fileName, String newFileName) throws IOException {
Path hdfsPath = buildHdfsPath(fileName);
Path newHdfsPath = buildHdfsPath(newFileName);
return fileSystem.rename(hdfsPath, newHdfsPath);
}
/**
* 刪除文件
* @param path 文件路徑
* @return 成功或失敗
* @throws IOException
* @throws InterruptedException
* @throws URISyntaxException
*/
public boolean delete(String path) throws IOException {
Path hdfsPath = buildHdfsPath(path);
return fileSystem.delete(hdfsPath, true);
}
/**
* 打開文件,獲取流對象后自行轉換要使用的格式例如byte[]或者String,使用后需要關閉流對象
* @param path
* @return 流對象
* @throws IOException
* @throws InterruptedException
* @throws URISyntaxException
*/
public FSDataInputStream open(String path) throws IOException {
Path hdfsPath = buildHdfsPath(path);
return fileSystem.open(hdfsPath);
}
/**
* 獲取文件或文件夾路徑在集群中的位置
* @param path 文件或文件夾路徑
* @return 節(jié)點對象
* @throws IOException
* @throws InterruptedException
* @throws URISyntaxException
*/
public BlockLocation[] getFileBlockLocation(String path) throws IOException, InterruptedException, URISyntaxException {
if (!IsExists(path)) {
return null;
}
Path hdfsPath = buildHdfsPath(path);
FileStatus fileStatus = fileSystem.getFileStatus(hdfsPath);
return fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
}
/**
* 關閉文件系統(tǒng)
* @throws IOException
* @throws InterruptedException
* @throws URISyntaxException
*/
public void close() throws IOException {
fileSystem.close();
}
/**
* 上傳文件到HDFS
* @param localFile 本地文件
* @param hdfsDirection HDFS目錄
* @throws IOException
* @throws InterruptedException
* @throws URISyntaxException
*/
public void uploadFileToHdfs(String localFile, String hdfsDirection) throws IOException {
Path localPath = new Path(localFile);
Path hdfsPath = buildHdfsPath(hdfsDirection);
fileSystem.copyFromLocalFile(localPath, hdfsPath);
}
/**
* 拷貝文件輸出簡單的進度條,如果需要百分比,請自己實現
* @param localFile 本地文件
* @param hdfsDirection HDFS目錄
* @throws IOException
*/
public void uploadFileToHdfsWithProgress(String localFile, String hdfsDirection) throws IOException {
Path localPath = new Path(localFile);
Path hdfsPath = buildHdfsPath(hdfsDirection);
FileInputStream inputStream = new FileInputStream(localPath.toString());
InputStream input = new BufferedInputStream(inputStream);
FSDataOutputStream outputStream = fileSystem.create(hdfsPath, new Progressable() {
public void progress() {
// 進度條的輸出
System.out.print(".");
}
});
IOUtils.copyBytes(input, outputStream, 4096);
input.close();
outputStream.close();
}
/**
* 從HDFS目錄下載文件
* @param localFile 本地文件位置
* @param hdfsPath HDFS文件位置
* @throws IOException
* @throws InterruptedException
* @throws URISyntaxException
*/
public void downloadFileFormHdfs(String localFile, String hdfsFile) throws IOException {
Path localPath = new Path(localFile);
Path hdfsPath = buildHdfsPath(hdfsFile);
fileSystem.copyToLocalFile(hdfsPath, localPath);
}
}
以下代碼段是測試代碼:
package linose.hdfs;
import java.io.IOException;
import java.net.URISyntaxException;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
//import org.apache.log4j.BasicConfigurator;
/**
* Hello HDFS!
* 本示例演示如何在分布式文件系統(tǒng)中進行:
* 創(chuàng)建目錄、創(chuàng)建文件、寫入數據、打開文件、讀取數據、追加數據、重命名文件、刪除文件、重命名目錄、刪除目錄
* 查詢指定路徑中的文件與目錄信息,查詢指定文件在那個集群節(jié)點
* 文件上傳、下載、進度展示功能。
*/
public class App
{
public static void main( String[] args ) throws IOException, InterruptedException, URISyntaxException
{
/**
* 為了清楚的看到輸出結果,暫將集群調試信息缺省。
* 如果想查閱集群調試信息,取消注釋即可。
*/
//BasicConfigurator.configure();
/**
* 操作HDFS基礎信息賦值
*/
String user = "hdfs";
String defaultFS = "hdfs://master2.linose.cloud.beijing.com:8020";
/**
* 獲取HDFS服務對象
*/
HdfsService service = new HdfsService(user, defaultFS);
/**
* 創(chuàng)建目錄示例
*/
String path = "index.dirs";
if (service.createDirection(path)) {
System.out.println("創(chuàng)建目錄成功");
} else {
System.out.println("創(chuàng)建目錄失敗");
}
/**
* 創(chuàng)建文件示例1,創(chuàng)建文件后返回流對象并寫入新信息。
*/
String file1 = "index.dirs/test1";
FSDataOutputStream outputStream = service.createFile(file1);
for (int i = 0; i < 5; ++i) {
outputStream.writeUTF("index: "+ i + " hello HDFS! \n");
}
outputStream.close();
System.out.println("創(chuàng)建文件,寫入Hello HDFS后,關閉流對象");
/**
* 創(chuàng)建文件示例2,創(chuàng)建新文件。
*/
String file2 = "index.dirs/test8";
if (service.createNewFile(file2)) {
System.out.println("創(chuàng)建新文件成功");
} else {
System.out.println("創(chuàng)建新文件失敗");
}
/**
* 追加數據
*/
outputStream = service.createFile(file1);
outputStream.writeUTF("這是一條追加數據 \n");
outputStream.close();
/**
* 打開文件讀取數據后關閉流對象
*/
FSDataInputStream inputStream = service.open(file1);
IOUtils.copyBytes(inputStream, System.out, 256);
inputStream.close();
/**
* 重命名文件
*/
String newFile = "index.dirs/test6.txt";
String srcFile = "index.dirs/test6";
if (service.rename(srcFile, newFile)) {
System.out.println("重命名成功");
} else {
System.out.println("重命名失敗");
}
/**
* 創(chuàng)建目錄、重命名目錄
*/
String path1 = "test.test";
if (service.createDirection(path1)) {
System.out.println("創(chuàng)建目錄成功");
} else {
System.out.println("創(chuàng)建目錄失敗");
}
String path2 = "test.dirs";
if (service.rename(path1, path2)) {
System.out.println("重命名目錄成功");
} else {
System.out.println("重命名目錄失敗");
}
/**
* 刪除目錄
*/
if (service.delete(path2)) {
System.out.println("刪除目錄成功");
} else {
System.out.println("刪除目錄失敗");
}
/**
* 上傳文件
*/
service.uploadFileToHdfs("/Users/liupengchun/Downloads/hadoop-logo.jpg", "/index.dirs");
/**
* 查詢指定路徑中的文件與目錄信息。
*/
RemoteIterator<LocatedFileStatus> fsIterator = service.showFiles("/index.dirs");
LocatedFileStatus lfs;
while (fsIterator.hasNext()) {
lfs = fsIterator.next();
System.out.println(
(lfs.isDirectory() ? "文件夾" : "文件")
+ "文件大?。? + lfs.getLen()
+ "文件路徑:" + lfs.getPath()
);
}
/**
* 查詢指定文件在那個集群節(jié)點
*/
BlockLocation[] locations = service.getFileBlockLocation(file1);
if(locations != null && locations.length > 0){
for(BlockLocation location : locations){
System.out.println(location.getHosts()[0]);
}
}
/**
* 刪除文件
*/
if (service.delete(file1)) {
System.out.println("刪除文件成功");
} else {
System.out.println("刪除文件失敗");
}
/**
* 下載文件
*/
service.downloadFileFormHdfs("/Users/liupengchun/Downloads/hadoop-logo2.jpg", "/index.dirs/hadoop-logo.jpg");
}
}
下圖為測試結果:

image.png
至此,HDFS-3.1.1 實驗示例演示完畢。