import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
import java.io.*;
/**
* 文件系統(tǒng)工具類
*/
public class FSUtil {
//獲取FileSystem對(duì)象
public static FileSystem getFS() {
FileSystem fs = null;
try {
Configuration conf = new Configuration();
conf.setInt("io.file.buffer.size", 8192);
fs = FileSystem.newInstance(conf);
} catch (Exception e) {
//do noting
}
return fs;
}
//獲取LocalFileSystem對(duì)象
public static LocalFileSystem getLFS() {
LocalFileSystem lfs = null;
try {
Configuration conf = new Configuration();
lfs = FileSystem.newInstanceLocal(conf);
} catch (Exception e) {
//do noting
}
return lfs;
}
//關(guān)閉文件對(duì)象
public static void closeFS(FileSystem fs) {
if (fs != null) {
try {
fs.close();
} catch (Exception e) {
// do nothing
}
}
}
/**
* 讀取hdfs的文件寫到console
*
* @throws IOException
*/
public static void downToConsole(FileSystem fs, String toFilePath) throws IOException {
FSDataInputStream fis = fs.open(new Path(toFilePath));
IOUtils.copyBytes(fis, System.out, 4096, true);
}
/**
* 讀取hdfs的文件寫到local
*/
public static void downToLocal(FileSystem fs, String fromHdfsPath, String toLocalPath) throws IOException {
FSDataInputStream fis = fs.open(new Path(fromHdfsPath));
OutputStream os = new FileOutputStream(new File(toLocalPath));
IOUtils.copyBytes(fis, os, 4096, true);
}
/**
* 創(chuàng)建目錄
*/
public static Boolean mkdir(FileSystem fs, String dirName) throws IOException {
boolean isok = fs.mkdirs(new Path(dirName));
return isok;
}
/**
* 列出 hdfs或者local 中的文件列表
* 遞歸列出文件文件列表???
*/
public static void listFile(FileSystem fs, String dirName) throws IllegalArgumentException, IOException {
FileStatus[] fss = fs.listStatus(new Path(dirName));
if (fss.length == 0) {
System.out.println("內(nèi)容為空");
return;
}
for (FileStatus f : fss) {
System.out.print("文件路徑:" + f.getPath().toString() + " ");
System.out.print("文件名:" + f.getPath().getName() + " ");
System.out.print("文件大小:" + f.getLen() / 1024.0 + "kb" + " ");
System.out.print("文件用戶:" + f.getOwner() + " ");
System.out.println("文件的權(quán)限:" + f.getPermission());
}
}
/**
* 獲取集群磁盤情況
*
* @throws IOException
*/
public static void getDiskSource(FileSystem fs) throws IOException {
FsStatus fsstatus = fs.getStatus();
System.out.println("總?cè)萘浚? + fsstatus.getCapacity() / 1024 / 1024 / 1024.0 + "GB");
System.out.println("已經(jīng)使用的容量:" + fsstatus.getUsed() / 1024 / 1024.0 + "MB");
System.out.println("維持容量:" + fsstatus.getRemaining() / 1024 / 1024 / 1024.0 + "GB");
}
/**
* 獲取DataNode信息
*/
public static void getDataNodeInfo(FileSystem fs) throws IOException {
//強(qiáng)轉(zhuǎn)為分布式文件系統(tǒng)
DistributedFileSystem dis = (DistributedFileSystem) fs;
DatanodeInfo[] dinfo = dis.getDataNodeStats();
for (DatanodeInfo info : dinfo) {
System.out.print(info.getHostName() + " ");
System.out.print(info.getName() + " ");
System.out.println(info.getCapacity());
}
}
/**
* 查看塊位置信息
*
* @param fileName
*/
public static void getBlockLocation(FileSystem fs, String fileName) throws IllegalArgumentException, IOException {
//獲取文件狀態(tài)
FileStatus fss = fs.getFileStatus(new Path(fileName));
//獲取塊位置信息
BlockLocation[] bls = fs.getFileBlockLocations(fss, 0, fss.getLen());
for (BlockLocation bl : bls) {
for (int i = 0; i < bl.getHosts().length; i++) {
System.out.print(bl.getHosts()[i] + " ");
System.out.print(bl.getTopologyPaths()[i] + " ");
System.out.println(bl.getNames()[i]);
}
}
}
/**
* 待進(jìn)度的文件上傳
*
* @param toHdfsPath
*/
public static void uploadWithProcess(FileSystem fs, String fromLocalPath, String toHdfsPath) throws IllegalArgumentException, IOException {
FileInputStream fis = new FileInputStream(new File(fromLocalPath));
FSDataOutputStream fos = fs.create(new Path(toHdfsPath), new Progressable() {
public void progress() {
try {
System.out.print("* ");
Thread.sleep(20);
} catch (InterruptedException e) {
//
}
}
});
IOUtils.copyBytes(fis, fos, 4096, true);
System.out.println("finished...");
}
/**
* 將本地文件copy到hdfs中(上傳目錄和文件均可以)
*/
public static void copyFromLocal(FileSystem fs, String fromLocalPath, String toHdfsPath) throws IllegalArgumentException, IOException {
fs.copyFromLocalFile(new Path(fromLocalPath), new Path(toHdfsPath));
}
/**
* 將本地文件copy到hdfs中(上傳不同目錄下的多個(gè)文件)
*/
public static void copyFromLocalOfMulti(FileSystem fs, Path[] fromLocalPaths, String toHdfsPath) throws IllegalArgumentException, IOException {
fs.copyFromLocalFile(false, false, fromLocalPaths, new Path(toHdfsPath));
}
/**
* 下載到本地(下載多個(gè)不同目錄的文件??)
*/
public static void copyToLocal(FileSystem fs, String fromHdfsPath, String toLocalPath) throws IllegalArgumentException, IOException {
fs.copyToLocalFile(new Path(fromHdfsPath), new Path(toLocalPath));
}
/**
* 存在、刪除
*
* @param path
*/
public static void Delete(FileSystem fs, String path) throws IllegalArgumentException, IOException {
//判斷文件是否存在
//如果存在,在看是目錄還是是文件
Path fileOrDir = new Path(path);
if (fs.exists(fileOrDir)) {
if (fs.isDirectory(fileOrDir)) {
//遞歸刪除
fs.delete(fileOrDir, true);
} else {
fs.deleteOnExit(fileOrDir);
}
} else {
System.out.println("文件或者目錄不存在");
}
}
/**
* 和并本地小文件并上傳hdfs
*
* @param fileName
*/
public static void uploadWithMerge(FileSystem fs, LocalFileSystem lfs, String fileName, String toHdfsPath) throws IOException {
FileStatus[] fss = lfs.listStatus(new Path(fileName));
FSDataOutputStream fos = fs.create(new Path(toHdfsPath));
FileInputStream fis = null;
for (FileStatus f : fss) {
//獲取本地文件列表的路徑
String localFilePath = f.getPath().toString().replace("file:/", "");
//獲取本地文件的數(shù)據(jù)流
fis = new FileInputStream(new File(localFilePath));
byte[] bytes = new byte[1024];
int length = 0;
while ((length = fis.read(bytes)) != -1) {
fos.write(bytes, 0, length);
}
}
}
/**
* 重命名或者移動(dòng)
*/
public static void rename(FileSystem fs, String oldName, String newName) throws IllegalArgumentException, IOException {
fs.rename(new Path(oldName), new Path(newName));
}
/**
* 追加
*/
public static void append(FileSystem fs, String inputFilePath, String outFilePath) throws IllegalArgumentException, IOException {
FSDataOutputStream fos = fs.append(new Path(outFilePath), 4096);
InputStream is = new FileInputStream(new File(inputFilePath));
IOUtils.copyBytes(is, fos, 4096, true);
}
}
FSUtil_文件系統(tǒng)工具類
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。
相關(guān)閱讀更多精彩內(nèi)容
- 1、日歷和清單系統(tǒng)的工具推薦 (1)手機(jī)自帶日歷系統(tǒng) 蘋果手機(jī)的日歷系統(tǒng)可同步 (2)清單系統(tǒng)APP1:OmniF...
- c#中文件下載工具類,可以實(shí)現(xiàn)文件進(jìn)度和下載完成(成功、失?。┑幕卣{(diào),具體使用請(qǐng)往下翻 如何使用:
- -------------------------常用工具----------------------------...
- 今天的閱讀主要講的是意志力的生理特性,意志力其實(shí)是人們?cè)谶M(jìn)化的過(guò)程中所演變的一種生理本能,以保護(hù)我們不受到...
- 譯:cod9 注意:此圖集包含直面戰(zhàn)爭(zhēng)的畫面,請(qǐng)讀者自行斟酌。 第一次世界大戰(zhàn)特輯圖集系列:(點(diǎn)擊藍(lán)色標(biāo)題處可轉(zhuǎn)到...