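In "Hadoop 3: A First Look at the Hadoop Java API (Final)" I explained why I moved from Eclipse on Windows to Eclipse + Maven on Linux for writing Java API code, and walked through a demo program for setting up the development environment on Linux. (See: http://www.itdecent.cn/p/dd13c1dba52d)
In this part I go through the HDFS client API in more detail.
While writing the code, keep the official API documentation open: https://hadoop.apache.org/docs/r2.9.1/api/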
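1. Initialization code
conf.set("fs.defaultFS", "hdfs://10.10.77.194:9000"); //This line matters: a freshly created Configuration only holds the HDFS default values (plus any *-site.xml found on the classpath), so the NameNode address has to be set by hand on the client
conf.set("dfs.replication", "2"); //Also set the number of replicas per block to suit your cluster; I used 2 for my experiment (the server config file is also set to 2, just keep the two in sync)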
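package com.gamebear.test1;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.net.URI;
import java.util.Iterator;
import java.util.Map.Entry;
import org.apache.commons.io.IOUtils; // IOUtils.copy / IOUtils.copyLarge used in sections 8 and 9
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;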
public class HadoopHdfs {
  static FileSystem fsObj = null;
  static Configuration conf = null;
  private static void init() throws Exception {
    conf = new Configuration();
    /*
     * API reference: http://hadoop.apache.org/docs/stable/api/index.html
     * Client-side equivalent of core-site.xml: "10.10.77.194" is the NameNode,
     * "9000" is the NameNode client port
     */
    conf.set("fs.defaultFS", "hdfs://10.10.77.194:9000");
    conf.set("dfs.replication", "2");
    //conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
    System.out.println(conf.get("fs.hdfs.impl"));
    // get a Hadoop FileSystem client object
    fsObj = FileSystem.get(conf);
    // alternative: pass the URI and the user name explicitly
    //fsObj = FileSystem.get(new URI("hdfs://10.10.77.194:9000"), conf, "root");
  }
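To make the "defaults first, client settings on top" idea concrete, here is a small plain-JDK analogy. It is only a sketch: it uses java.util.Properties rather than Hadoop's Configuration, the class name LayeredConfSketch is made up for illustration, and the two default values are the ones documented in Hadoop's core-default.xml and hdfs-default.xml.

```java
import java.util.Properties;

// Plain-JDK analogy for layered configuration; this is NOT Hadoop's Configuration class.
class LayeredConfSketch {
    // Builds a config whose fallback layer mimics the shipped *-default.xml values.
    static Properties newConf() {
        Properties defaults = new Properties();
        defaults.setProperty("fs.defaultFS", "file:///"); // documented Hadoop default
        defaults.setProperty("dfs.replication", "3");     // documented Hadoop default
        return new Properties(defaults);                  // lookups fall back to the defaults
    }

    public static void main(String[] args) {
        Properties conf = newConf();
        // Nothing set yet, so the default layer answers.
        System.out.println(conf.getProperty("fs.defaultFS"));
        // Values set by the client shadow the defaults, like conf.set(...) in init().
        conf.setProperty("fs.defaultFS", "hdfs://10.10.77.194:9000");
        conf.setProperty("dfs.replication", "2");
        System.out.println(conf.getProperty("fs.defaultFS"));
        System.out.println(conf.getProperty("dfs.replication"));
    }
}
```

Without the client-side override the lookup returns the local file system URI, which is why the fs.defaultFS line in init() cannot be skipped when no core-site.xml is on the classpath.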
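2. View the default configuration parameters
Note that some parameters are Hadoop server-side parameters and others are client-side parameters. For example, dfs.replication and dfs.blocksize are applied by the client when it writes a file, while dfs.datanode.data.dir is only read by the DataNode.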
private static void testConf() throws Exception{
    // needs java.util.Iterator and java.util.Map.Entry
    Iterator<Entry<String, String>> it = conf.iterator();
    while(it.hasNext()){
        Entry<String, String> ent = it.next();
        System.out.println(ent.getKey() + ":" + ent.getValue());
    }
}
Pay attention to the following defaults:
dfs.blocksize:134217728 //default is 128 MB; the minimum is 1 MB
dfs.replication:3 //default is 3; my test cluster has only 2 DataNodes, so I set it to 2
dfs.datanode.data.dir:file://${hadoop.tmp.dir}/dfs/data
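To get a feel for what these two numbers mean in practice, here is a small arithmetic sketch. The class BlockMath and the 300 MB example file are my own illustration, not part of the Hadoop API; the only inputs taken from above are the 128 MB block size and the replication factor of 2.

```java
// Back-of-the-envelope block arithmetic; BlockMath is a hypothetical helper, not a Hadoop class.
class BlockMath {
    static final long DEFAULT_BLOCK_SIZE = 134217728L; // dfs.blocksize default (128 MB)

    // Number of blocks a file is split into: ceil(fileLen / blockSize).
    static long blockCount(long fileLen, long blockSize) {
        return (fileLen + blockSize - 1) / blockSize;
    }

    // Length of the last block; it only holds the remaining bytes, it is not padded.
    static long lastBlockLen(long fileLen, long blockSize) {
        if (fileLen == 0) {
            return 0;
        }
        long rem = fileLen % blockSize;
        return rem == 0 ? blockSize : rem;
    }

    // Raw bytes stored across the cluster once every block is replicated.
    static long rawBytes(long fileLen, int replication) {
        return fileLen * replication;
    }

    public static void main(String[] args) {
        long fileLen = 300L * 1024 * 1024; // assumed 300 MB example file
        System.out.println(blockCount(fileLen, DEFAULT_BLOCK_SIZE));   // 3
        System.out.println(lastBlockLen(fileLen, DEFAULT_BLOCK_SIZE)); // 46137344 (44 MB)
        System.out.println(rawBytes(fileLen, 2));                      // 629145600 (600 MB)
    }
}
```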
3. Upload and download files
Running Eclipse on Linux avoids the cross-platform problems here.
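private static void testUpload() throws Exception
{
    // copy the local file /kluter/hdpTestFile to /json201710.tgz.copy on HDFS
    fsObj.copyFromLocalFile(new Path("/kluter/hdpTestFile"), new Path("/json201710.tgz.copy"));
    System.out.println("upload finished!");
    // this closes the shared client; call init() again before reusing fsObj
    fsObj.close();
}
private static void testDownload() throws Exception
{
    // copy /json201710.tgz.copy from HDFS to the local file /kluter/hdpTestFileDl
    fsObj.copyToLocalFile(new Path("/json201710.tgz.copy"), new Path("/kluter/hdpTestFileDl"));
    System.out.println("download finished!");
    // this closes the shared client; call init() again before reusing fsObj
    fsObj.close();
}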
4. Create nested directories
private static void testMkDir() throws Exception
{
    // creates /aaa, /aaa/bbb and /aaa/bbb/ccc in one call, like mkdir -p
    boolean mkdirs = fsObj.mkdirs(new Path("/aaa/bbb/ccc"));
    System.out.println("mkdirs finished: " + mkdirs);
}
5. Recursively delete a directory
private static void testDel() throws Exception
{
    // the second argument true means delete recursively
    boolean delBl = fsObj.delete(new Path("/aaa/bbb"), true);
    System.out.println(delBl);
}
6. Recursively list file information under a directory
private static void testLs() throws Exception{
    // the second argument true means recurse into subdirectories
    RemoteIterator<LocatedFileStatus> fileLst = fsObj.listFiles(new Path("/"), true);
    while(fileLst.hasNext()){
        LocatedFileStatus fileStatus = fileLst.next();
        System.out.println("blocksize: " + fileStatus.getBlockSize());
        System.out.println("owner: " + fileStatus.getOwner());
        System.out.println("Replication: " + fileStatus.getReplication());
        System.out.println("permission: " + fileStatus.getPermission());
        System.out.println("name: " + fileStatus.getPath().getName());
        // needs org.apache.hadoop.fs.BlockLocation
        BlockLocation[] blockLocations = fileStatus.getBlockLocations();
        for(BlockLocation bl:blockLocations){
            System.out.println("block-len: " + bl.getLength() + "---" + "block-offset:" + bl.getOffset());
            // hosts that hold a replica of this block
            String[] hosts = bl.getHosts();
            for(String host:hosts){
                System.out.println(host);
            }
        }
    }
}
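An iterator is used here instead of a List because, if HDFS holds hundreds of millions of files, building a List on the client can easily exhaust memory, and transferring it over the network would also take a long time.
An iterator is only a way of fetching data: the small amount of memory needed for one file entry is allocated when next() is called, so it never occupies a large chunk of memory.
The code prints the information for each block of a file, including the hosts that hold the block, which is what makes distributed computation close to the data possible in MapReduce.
If you do not want to recurse, pass false as the second argument.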
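The difference between "fetch everything into a List" and "fetch as you iterate" can be shown with a plain-JDK sketch. This is an analogy under my own assumptions, not how the HDFS client is implemented: fetchBatch is a hypothetical stand-in for a call to the server, and the batch size is arbitrary.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Illustration only: a listing that is pulled in small batches instead of all at once.
class BatchedListingSketch {
    // Hypothetical "server call": returns up to `limit` names starting at index `start`.
    static List<String> fetchBatch(int total, int start, int limit) {
        List<String> batch = new ArrayList<>();
        for (int i = start; i < Math.min(total, start + limit); i++) {
            batch.add("/file-" + i);
        }
        return batch;
    }

    // The client only ever holds one batch in memory, however large `total` is.
    static Iterator<String> lazyListing(final int total, final int batchSize) {
        return new Iterator<String>() {
            private List<String> batch = new ArrayList<>();
            private int posInBatch = 0;
            private int fetched = 0;

            @Override
            public boolean hasNext() {
                if (posInBatch < batch.size()) {
                    return true;
                }
                if (fetched >= total) {
                    return false;
                }
                // The current batch is used up: replace it with the next one.
                batch = fetchBatch(total, fetched, batchSize);
                fetched += batch.size();
                posInBatch = 0;
                return !batch.isEmpty();
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return batch.get(posInBatch++);
            }
        };
    }

    public static void main(String[] args) {
        Iterator<String> it = lazyListing(10, 3);
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}
```

With a List, all entries would have to be built and shipped before the first one could be printed; here the loop starts printing after the first batch arrives.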
7. List file information in a single directory
private static void testLs2() throws Exception {
    // listStatus returns the whole directory as one array and does not recurse
    FileStatus[] listStatus = fsObj.listStatus(new Path("/"));
    for(FileStatus file:listStatus) {
        System.out.println("name: " + file.getPath().getName());
        System.out.println("path: " + file.getPath());
        System.out.println(file.isDirectory()?"directory":"file");
        System.out.println("----------------------------------------------");
    }
}
Because the whole listing is returned at once, this approach suits directories that contain relatively few files.
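8. Upload and download files using streams
Operating on HDFS files through streams makes it possible to read only the data in a given offset range.
8.1 Upload a file using a stream
Upload the local file /kluter/streamTestFile to /streamTestFile on HDFS as a stream: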
private static void testStreamUpload() throws Exception{
    // create(path, true) overwrites an existing file; try-with-resources closes both streams
    try (FSDataOutputStream dfsOutStream = fsObj.create(new Path("/streamTestFile"), true);
         FileInputStream inputStream = new FileInputStream("/kluter/streamTestFile")) {
        IOUtils.copy(inputStream, dfsOutStream); // org.apache.commons.io.IOUtils
    }
}
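8.2 Download a file using a stream
private static void testStreamDownload() throws Exception{
    // open the HDFS file for reading and copy it to a local file; both streams are closed afterwards
    try (FSDataInputStream inputStream = fsObj.open(new Path("/streamTestFile"));
         FileOutputStream localOutStream = new FileOutputStream("/kluter/downloadStreamFile")) {
        IOUtils.copy(inputStream, localOutStream); // org.apache.commons.io.IOUtils
    }
}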
9. Read on demand using a stream
private static void testRandomAccess() throws Exception{
    try (FSDataInputStream inputStream = fsObj.open(new Path("/streamTestFile"));
         FileOutputStream outStream = new FileOutputStream("/kluter/downloadStreamFile")) {
        /**
        * option 1: seek to offset 100 to skip the first 100 bytes, then copy the rest
        */
        // inputStream.seek(100);
        // IOUtils.copy(inputStream, outStream);
        /**
        * option 2: the client controls the range; you can also do this in a while loop
        */
        // skip the first 100 bytes, then copy at most 300 bytes
        IOUtils.copyLarge(inputStream, outStream, 100, 300);
    }
}
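For readers who want to see what "skip an offset, then copy a fixed length" amounts to, here is a plain-JDK sketch of that kind of range copy, written out as the while loop mentioned in the comment. The class RangeCopySketch and the method copyRange are my own names, and the code runs against an in-memory stream rather than HDFS; it illustrates the idea and is not the commons-io source.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Illustration of a client-controlled range read on any InputStream.
class RangeCopySketch {
    // Skips `offset` bytes, then copies at most `length` bytes; returns the bytes copied.
    static long copyRange(InputStream in, OutputStream out, long offset, long length)
            throws IOException {
        long toSkip = offset;
        while (toSkip > 0) {
            long skipped = in.skip(toSkip);
            if (skipped <= 0) {
                // skip() made no progress: read one byte to tell "slow" from "end of stream".
                if (in.read() == -1) {
                    return 0; // the offset lies beyond the end of the data
                }
                skipped = 1;
            }
            toSkip -= skipped;
        }
        byte[] buf = new byte[4096];
        long copied = 0;
        while (copied < length) {
            int want = (int) Math.min(buf.length, length - copied);
            int n = in.read(buf, 0, want);
            if (n == -1) {
                break; // the stream ended before `length` bytes were available
            }
            out.write(buf, 0, n);
            copied += n;
        }
        return copied;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "0123456789".getBytes("US-ASCII");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long n = copyRange(new ByteArrayInputStream(data), out, 2, 3);
        System.out.println(n + " bytes: " + out.toString("US-ASCII")); // 3 bytes: 234
    }
}
```

On an FSDataInputStream, calling seek(offset) first is the more direct way to jump to the start of the range, since it repositions the stream instead of reading through the skipped bytes.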
private static void testPrintScr() throws Exception{
    // print the content of the HDFS file to the console
    try (FSDataInputStream inputStream = fsObj.open(new Path("/streamTestFile"))) {
        IOUtils.copy(inputStream, System.out);
    }
}