(1) 向HDFS中上傳任意文本文件,如果指定的文件在HDFS中已經(jīng)存在,由用戶指定是追加到原有文件末尾還是覆蓋原有的文件;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;
public class HW1_1_ExistAppendCopy {
/**
* 判斷路徑是否存在
*/
public static boolean test(Configuration conf, String path) throws IOException {
FileSystem fs = FileSystem.get(conf);
return fs.exists(new Path(path));
}
/**
* 復(fù)制文件到指定路徑 若路徑已存在,則進(jìn)行覆蓋
*/
public static void copyFromLocalFile(Configuration conf, String localFilePath, String
remoteFilePath)
throws IOException {
FileSystem fs = FileSystem.get(conf);
Path localPath = new Path(localFilePath);
Path remotePath = new Path(remoteFilePath);
// fs.copyFromLocalFile 第一個(gè)參數(shù)表示是否刪除源文件,第二個(gè)參數(shù)表示是否覆蓋
fs.copyFromLocalFile(false, true, localPath, remotePath);
fs.close();
}
/**
* 追加文件內(nèi)容
*/
public static void appendToFile(Configuration conf, String localFilePath, String
remoteFilePath)
throws IOException {
FileSystem fs = FileSystem.get(conf);
Path remotePath = new Path(remoteFilePath);
/* 創(chuàng)建一個(gè)文件讀入流 */
FileInputStream in = new FileInputStream(localFilePath);
/* 創(chuàng)建一個(gè)文件輸出流,輸出的內(nèi)容將追加到文件末尾 */
FSDataOutputStream out = fs.append(remotePath);
/* 讀寫文件內(nèi)容 */
byte[] data = new byte[1024];
int read = -1;
while ((read = in.read(data)) > 0) {
out.write(data, 0, read);
}
out.close();
in.close();
fs.close();
}
/**
* 主函數(shù)
*/
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://localhost:9000");
String localFilePath = "/usr/local/hadoop/input/append.txt"; // 本地路徑
String remoteFilePath = "/user/hadoop/data3merge.txt"; // HDFS 路徑
String choice = "append"; // 若文件存在則追加到文件末尾
// String choice = "overwrite"; // 若文件存在則覆蓋
try {
/* 判斷文件是否存在 */
Boolean fileExists = false;
if (HW1_1_ExistAppendCopy.test(conf, remoteFilePath)) {
fileExists = true;
System.out.println(remoteFilePath + " 已存在.");
} else {
System.out.println(remoteFilePath + " 不存在.");
}
/* 進(jìn)行處理 */
if (!fileExists) { // 文件不存在,則上傳
HW1_1_ExistAppendCopy.copyFromLocalFile(conf, localFilePath,
remoteFilePath);
System.out.println(localFilePath + " 已上傳至 " + remoteFilePath);
} else if (choice.equals("overwrite")) { // 選擇覆蓋
HW1_1_ExistAppendCopy.copyFromLocalFile(conf, localFilePath,
remoteFilePath);
System.out.println(localFilePath + " 已覆蓋 " + remoteFilePath);
} else if (choice.equals("append")) { // 選擇追加
HW1_1_ExistAppendCopy.appendToFile(conf, localFilePath, remoteFilePath);
System.out.println(localFilePath + " 已追加至 " + remoteFilePath);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
(2) 從HDFS中下載指定文件,如果本地文件與要下載的文件名稱相同,則自動(dòng)對(duì)下載的文件重命名;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;
public class HW1_2_ExistRenameCopy {
/**
* 下載文件到本地 判斷本地路徑是否已存在,若已存在,則自動(dòng)進(jìn)行重命名
*/
public static void copyToLocal(Configuration conf, String remoteFilePath, String localFilePath)
throws IOException {
FileSystem fs = FileSystem.get(conf);
Path remotePath = new Path(remoteFilePath);
File f = new File(localFilePath);
/* 如果文件名存在,自動(dòng)重命名(在文件名后面加上 _0, _1 ...) */
if (f.exists()) {
System.out.println(localFilePath + " 已存在.");
Integer i = 0;
while (true) {
f = new File(localFilePath + "_" + i.toString());
if (!f.exists()) {
localFilePath = localFilePath + "_" + i.toString();
break;
}
}
System.out.println("將重新命名為: " + localFilePath);
}
// 下載文件到本地
Path localPath = new Path(localFilePath);
fs.copyToLocalFile(remotePath, localPath);
fs.close();
}
/**
* 主函數(shù)
*/
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://localhost:9000");
String localFilePath = "/usr/local/hadoop/output/data3merge.txt"; // 本地路徑
String remoteFilePath = "/user/hadoop/data3merge.txt"; // HDFS 路徑
try {
HW1_2_ExistRenameCopy.copyToLocal(conf, remoteFilePath, localFilePath);
System.out.println("下載完成");
} catch (Exception e) {
e.printStackTrace();
}
}
}
(3) 將HDFS中指定文件的內(nèi)容輸出到終端中;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;
public class HW1_3_CatFile {
/**
* 讀取文件內(nèi)容
*/
public static void cat(Configuration conf, String remoteFilePath) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path remotePath = new Path(remoteFilePath);
FSDataInputStream in = fs.open(remotePath);
BufferedReader d = new BufferedReader(new InputStreamReader(in));
String line = null;
while ((line = d.readLine()) != null)
System.out.println(line);
d.close();
in.close();
fs.close();
}
/**
* 主函數(shù)
*/
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://localhost:9000");
String remoteFilePath = "/user/hadoop/data3merge.txt"; // HDFS 路徑
try {
System.out.println("讀取文件: " + remoteFilePath);
HW1_3_CatFile.cat(conf, remoteFilePath);
System.out.println("\n 讀取完成");
} catch (Exception e) {
e.printStackTrace();
}
}
}
(4) 顯示HDFS中指定的文件的讀寫權(quán)限、大小、創(chuàng)建時(shí)間、路徑等信息;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;
import java.text.SimpleDateFormat;
public class HW1_4_ShowMsg {
/**
* 顯示指定文件的信息
*/
public static void ls(Configuration conf, String remoteFilePath) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path remotePath = new Path(remoteFilePath);
FileStatus[] fileStatuses = fs.listStatus(remotePath);
for (FileStatus s : fileStatuses) {
System.out.println("路徑: " + s.getPath().toString());
System.out.println("權(quán)限: " + s.getPermission().toString());
System.out.println("大小: " + s.getLen());
/* 返回的是時(shí)間戳,轉(zhuǎn)化為時(shí)間日期格式 */
Long timeStamp = s.getModificationTime();
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String date = format.format(timeStamp);
System.out.println("時(shí)間: " + date);
}
fs.close();
}
/**
* 主函數(shù)
*/
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://localhost:9000");
String remoteFilePath = "/user/hadoop/data3merge.txt"; // HDFS 路徑
try {
System.out.println("讀取文件信息: " + remoteFilePath);
HW1_4_ShowMsg.ls(conf, remoteFilePath);
System.out.println("\n 讀取完成");
} catch (Exception e) {
e.printStackTrace();
}
}
}
(5) 給定HDFS中某一個(gè)目錄,輸出該目錄下的所有文件的讀寫權(quán)限、大小、創(chuàng)建時(shí)間、路徑等信息,如果該文件是目錄,則遞歸輸出該目錄下所有文件相關(guān)信息;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;
import java.text.SimpleDateFormat;
public class HW1_5_ShowAllMsg {
/**
* 顯示指定文件夾下所有文件的信息(遞歸)
*/
public static void lsDir(Configuration conf, String remoteDir) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path dirPath = new Path(remoteDir);
/* 遞歸獲取目錄下的所有文件 */
RemoteIterator<LocatedFileStatus> remoteIterator = fs.listFiles(dirPath, true);
/* 輸出每個(gè)文件的信息 */
while (remoteIterator.hasNext()) {
FileStatus s = remoteIterator.next();
System.out.println("路徑: " + s.getPath().toString());
System.out.println("權(quán)限: " + s.getPermission().toString());
System.out.println("大小: " + s.getLen());
/* 返回的是時(shí)間戳,轉(zhuǎn)化為時(shí)間日期格式 */
Long timeStamp = s.getModificationTime();
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String date = format.format(timeStamp);
System.out.println("時(shí)間: " + date);
System.out.println();
}
fs.close();
}
/**
* 主函數(shù)
*/
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://localhost:9000");
String remoteDir = "/user/hadoop"; // HDFS 路徑
try {
System.out.println("(遞歸)讀取目錄下所有文件的信息: " + remoteDir);
HW1_5_ShowAllMsg.lsDir(conf, remoteDir);
System.out.println("讀取完成");
} catch (Exception e) {
e.printStackTrace();
}
}
}
(6) 提供一個(gè)HDFS內(nèi)的文件的路徑,對(duì)該文件進(jìn)行創(chuàng)建和刪除操作。如果文件所在目錄不存在,則自動(dòng)創(chuàng)建目錄;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;
public class HW1_6_CreateDelete {
/**
* 判斷路徑是否存在
*/
public static boolean test(Configuration conf, String path) throws IOException {
FileSystem fs = FileSystem.get(conf);
return fs.exists(new Path(path));
}
/**
* 創(chuàng)建目錄
*/
public static boolean mkdir(Configuration conf, String remoteDir) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path dirPath = new Path(remoteDir);
boolean result = fs.mkdirs(dirPath);
fs.close();
return result;
}
/**
* 創(chuàng)建文件
*/
public static void touchz(Configuration conf, String remoteFilePath) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path remotePath = new Path(remoteFilePath);
FSDataOutputStream outputStream = fs.create(remotePath);
outputStream.close();
fs.close();
}
/**
* 刪除文件
*/
public static boolean rm(Configuration conf, String remoteFilePath) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path remotePath = new Path(remoteFilePath);
boolean result = fs.delete(remotePath, false);
fs.close();
return result;
}
/**
* 主函數(shù)
*/
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://localhost:9000");
String remoteFilePath = "/user/hadoop/input_new/text.txt"; // HDFS 路徑
String remoteDir = "/user/hadoop/input_new"; // HDFS 路徑對(duì)應(yīng)的目錄
try {
/* 判斷路徑是否存在,存在則刪除,否則進(jìn)行創(chuàng)建 */
if (HW1_6_CreateDelete.test(conf, remoteFilePath)) {
HW1_6_CreateDelete.rm(conf, remoteFilePath); // 刪除
System.out.println("刪除路徑: " + remoteFilePath);
} else {
if (!HW1_6_CreateDelete.test(conf, remoteDir)) { // 若目錄不存在,則進(jìn)行創(chuàng)建
HW1_6_CreateDelete.mkdir(conf, remoteDir);
System.out.println("創(chuàng)建文件夾: " + remoteDir);
}
HW1_6_CreateDelete.touchz(conf, remoteFilePath);
System.out.println("創(chuàng)建路徑: " + remoteFilePath);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
(7) 提供一個(gè)HDFS的目錄的路徑,對(duì)該目錄進(jìn)行創(chuàng)建和刪除操作。創(chuàng)建目錄時(shí),如果目錄文件所在目錄不存在則自動(dòng)創(chuàng)建相應(yīng)目錄;刪除目錄時(shí),由用戶指定當(dāng)該目錄不為空時(shí)是否還刪除該目錄;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;
public class HW1_7_CreateDeleteDir {
/**
* 判斷路徑是否存在 s
*/
public static boolean test(Configuration conf, String path) throws IOException {
FileSystem fs = FileSystem.get(conf);
return fs.exists(new Path(path));
}
/**
* 判斷目錄是否為空 true: 空,false: 非空
*/
public static boolean isDirEmpty(Configuration conf, String remoteDir) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path dirPath = new Path(remoteDir);
RemoteIterator<LocatedFileStatus> remoteIterator = fs.listFiles(dirPath, true);
return !remoteIterator.hasNext();
}
/**
* 創(chuàng)建目錄
*/
public static boolean mkdir(Configuration conf, String remoteDir) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path dirPath = new Path(remoteDir);
boolean result = fs.mkdirs(dirPath);
fs.close();
return result;
}
/**
* 刪除目錄
*/
public static boolean rmDir(Configuration conf, String remoteDir) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path dirPath = new Path(remoteDir);
/* 第二個(gè)參數(shù)表示是否遞歸刪除所有文件 */
boolean result = fs.delete(dirPath, true);
fs.close();
return result;
}
/**
* 主函數(shù)
*/
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://localhost:9000");
String remoteDir = "/user/hadoop/input_new/input_bk"; // HDFS 目錄
Boolean forceDelete = false; // 是否強(qiáng)制刪除
try {
/* 判斷目錄是否存在,不存在則創(chuàng)建,存在則刪除 */
if (!HW1_7_CreateDeleteDir.test(conf, remoteDir)) {
HW1_7_CreateDeleteDir.mkdir(conf, remoteDir); // 創(chuàng)建目錄
System.out.println("創(chuàng)建目錄: " + remoteDir);
} else {
if (HW1_7_CreateDeleteDir.isDirEmpty(conf, remoteDir) || forceDelete) { // 目錄為空或強(qiáng)制刪除
HW1_7_CreateDeleteDir.rmDir(conf, remoteDir);
System.out.println("刪除目錄: " + remoteDir);
} else { // 目錄不為空
System.out.println("目錄不為空,不刪除: " + remoteDir);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
(8) 向HDFS中指定的文件追加內(nèi)容,由用戶指定內(nèi)容追加到原有文件的開(kāi)頭或結(jié)尾;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;
public class HW1_8_AppendInsert {
/**
* 判斷路徑是否存在
*/
public static boolean test(Configuration conf, String path) throws IOException {
FileSystem fs = FileSystem.get(conf);
return fs.exists(new Path(path));
}
/**
* 追加文本內(nèi)容
*/
public static void appendContentToFile(Configuration conf, String content, String
remoteFilePath) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path remotePath = new Path(remoteFilePath);
/* 創(chuàng)建一個(gè)文件輸出流,輸出的內(nèi)容將追加到文件末尾 */
FSDataOutputStream out = fs.append(remotePath);
out.write(content.getBytes());
out.close();
fs.close();
}
/**
* 追加文件內(nèi)容
*/
public static void appendToFile(Configuration conf, String localFilePath, String
remoteFilePath) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path remotePath = new Path(remoteFilePath);
/* 創(chuàng)建一個(gè)文件讀入流 */
FileInputStream in = new FileInputStream(localFilePath);
/* 創(chuàng)建一個(gè)文件輸出流,輸出的內(nèi)容將追加到文件末尾 */
FSDataOutputStream out = fs.append(remotePath);
/* 讀寫文件內(nèi)容 */
byte[] data = new byte[1024];
int read = -1;
while ( (read = in.read(data)) > 0 ) {
out.write(data, 0, read);
}
out.close();
in.close();
fs.close();
}
/**
* 移動(dòng)文件到本地
* 移動(dòng)后,刪除源文件
*/
public static void moveToLocalFile(Configuration conf, String remoteFilePath, String
localFilePath) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path remotePath = new Path(remoteFilePath);
Path localPath = new Path(localFilePath);
fs.moveToLocalFile(remotePath, localPath);
}
/**
* 創(chuàng)建文件
*/
public static void touchz(Configuration conf, String remoteFilePath) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path remotePath = new Path(remoteFilePath);
FSDataOutputStream outputStream = fs.create(remotePath);
outputStream.close();
fs.close();
}
/**
* 主函數(shù)
*/
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.set("fs.default.name","hdfs://localhost:9000");
String remoteFilePath = "/user/hadoop/append_ch.txt"; // HDFS 文件
String content = "新追加的內(nèi)容\n";
String choice = "after"; //追加到文件末尾
// String choice = "before"; // 追加到文件開(kāi)頭
try {
/* 判斷文件是否存在 */
if ( !HW1_8_AppendInsert.test(conf, remoteFilePath) ) {
System.out.println("文件不存在: " + remoteFilePath);
} else {
if ( choice.equals("after") ) { // 追加在文件末尾
HW1_8_AppendInsert.appendContentToFile(conf, content,
remoteFilePath);
System.out.println("已追加內(nèi)容到文件末尾" + remoteFilePath);
} else if ( choice.equals("before") ) { // 追加到文件開(kāi)頭
/* 沒(méi)有相應(yīng)的 api 可以直接操作,因此先把文件移動(dòng)到本地,創(chuàng)建一個(gè)
新的 HDFS,再按順序追加內(nèi)容 */
String localTmpPath = "/usr/local/hadoop/input/append.txt";
HW1_8_AppendInsert.moveToLocalFile(conf, remoteFilePath,
localTmpPath); // 移動(dòng)到本地
HW1_8_AppendInsert.touchz(conf, remoteFilePath); // 創(chuàng)建一個(gè)新文件
HW1_8_AppendInsert.appendContentToFile(conf, content,
remoteFilePath); // 先寫入新內(nèi)容
HW1_8_AppendInsert.appendToFile(conf, localTmpPath, remoteFilePath);
// 再寫入原來(lái)內(nèi)容
System.out.println("已追加內(nèi)容到文件開(kāi)頭: " + remoteFilePath);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
(9) 刪除HDFS中指定的文件;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;
public class HW1_9_DeleteFile {
/**
* 刪除文件
*/
public static boolean rm(Configuration conf, String remoteFilePath) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path remotePath = new Path(remoteFilePath);
boolean result = fs.delete(remotePath, false);
fs.close();
return result;
}
/**
* 主函數(shù)
*/
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.set("fs.default.name","hdfs://localhost:9000");
String remoteFilePath = "/user/hadoop/data3merge.txt"; // HDFS 文件
try {
if ( HW1_9_DeleteFile.rm(conf, remoteFilePath) ) {
System.out.println("文件刪除: " + remoteFilePath);
} else {
System.out.println("操作失?。ㄎ募淮嬖诨騽h除失?。?);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
(10) 刪除HDFS中指定的目錄,由用戶指定目錄中如果存在文件時(shí)是否刪除目錄;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;
public class HW1_10_DeleteDir {
/**
* 判斷目錄是否為空
* true: 空,false: 非空
*/
public static boolean isDirEmpty(Configuration conf, String remoteDir) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path dirPath = new Path(remoteDir);
RemoteIterator<LocatedFileStatus> remoteIterator = fs.listFiles(dirPath, true);
return !remoteIterator.hasNext();
}
/**
* 刪除目錄
*/
public static boolean rmDir(Configuration conf, String remoteDir, boolean recursive) throws
IOException {
FileSystem fs = FileSystem.get(conf);
Path dirPath = new Path(remoteDir);
/* 第二個(gè)參數(shù)表示是否遞歸刪除所有文件 */
boolean result = fs.delete(dirPath, recursive);
fs.close();
return result;
}
/**
* 主函數(shù)
*/
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.set("fs.default.name","hdfs://localhost:9000");
String remoteDir = "/user/hadoop/input_new/input_bk"; // HDFS 目錄
Boolean forceDelete = false; // 是否強(qiáng)制刪除
try {
if ( !HW1_10_DeleteDir.isDirEmpty(conf, remoteDir) && !forceDelete ) {
System.out.println("目錄不為空,不刪除");
} else {
if ( HW1_10_DeleteDir.rmDir(conf, remoteDir, forceDelete) ) {
System.out.println("目錄已刪除: " + remoteDir);
} else {
System.out.println("操作失敗");
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
(11) 在HDFS中,將文件從源路徑移動(dòng)到目的路徑。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;
public class HW1_11_MVFile {
/**
* 移動(dòng)文件
*/
public static boolean mv(Configuration conf, String remoteFilePath, String remoteToFilePath)
throws IOException {
FileSystem fs = FileSystem.get(conf);
Path srcPath = new Path(remoteFilePath);
Path dstPath = new Path(remoteToFilePath);
boolean result = fs.rename(srcPath, dstPath);
fs.close();
return result;
}
/**
* 主函數(shù)
*/
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://localhost:9000");
String remoteFilePath = "hdfs:///user/hadoop/append_ch.txt"; // 源文件 HDFS 路徑
String remoteToFilePath = "hdfs:///user/hadoop/output.txt"; // 目的 HDFS 路徑
try {
if (HW1_11_MVFile.mv(conf, remoteFilePath, remoteToFilePath)) {
System.out.println("將文件 " + remoteFilePath + " 移動(dòng)到 " +
remoteToFilePath);
} else {
System.out.println("操作失敗(源文件不存在或移動(dòng)失敗)");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}