簡介
- 分布式文件存儲(chǔ),文件在物理上是分塊存儲(chǔ)(block),可以通過dfs.blocksizes設(shè)定,2.x默認(rèn)是128M,老版本是64M;
- 為客戶端提供統(tǒng)一的抽象目錄樹,客戶端通過路徑來訪問;
- 目錄結(jié)構(gòu)及分塊信息(元數(shù)據(jù))由namenode節(jié)點(diǎn)承擔(dān),namenode是HDFS集群的主節(jié)點(diǎn),維護(hù)整個(gè)目錄結(jié)構(gòu)block塊信息以及datanode信息;
- 文件的各個(gè)block的存儲(chǔ)管理由datanode負(fù)責(zé),DataNode是HDFS的從節(jié)點(diǎn),每一個(gè)block都可以在多個(gè)DataNode上存儲(chǔ)多個(gè)副本;
- 一次寫入,多次讀取,不支持修改;
注意:HDFS適合做數(shù)據(jù)分析,并不適合做網(wǎng)盤應(yīng)用,不便修改,延遲 大,網(wǎng)絡(luò)開銷大,成本太高。
常用Shell命令
1. 顯示目錄信息
hadoop fs -sl /
2. 創(chuàng)建目錄
hadoop fs -mkdir -p /dir1/dir2
3. 本地截切粘貼
hadoop fs -moveFromLocal /root/1.txt /dir1/dir2
4. 追加文件到已存在文件末尾
hadoop fs -appendToFile /root/2.txt /dir1/dir2/1.txt
5. 查看內(nèi)容
hadoop fs -cat /dir1/dir2/1.txt
hadoop fs -tail /dir1/dir2/1.txt
hadoop fs -text /dir1/dir2/1.txt
6. 拷貝到HDFS
hadoop fs -copyFromLocal ./1.txt /dir1/dir2
hadoop fs -put ./1.txt /dir1/dir2
7. HDFS內(nèi)部拷貝
hadoop fs -cp /dir1/dir2/1.txt /dir1/dir2/2.txt
8. HDFS內(nèi)部移動(dòng)
hadoop fs -mv ./2.txt /
9. 從HDFS拷貝到本地
hadoop fs -copyToLocal /2.txt
hadoop fs -get /2.txt
10. 合并下載多個(gè)文件
hadoop fs -getmerage /dir1/dir2/1.txt /2.txt
11. 刪除文件或文件夾
hadoop fs -rm -r /dir1/dir2
hadoop fs -rmdir /dir1/dir2(刪除空文件夾)
12. 統(tǒng)計(jì)文件系統(tǒng)可用空間
hadoop fs -df -h /
hadoop fs -du -h /dir1/dir2
13. 統(tǒng)計(jì)一個(gè)指定目錄下的文件節(jié)點(diǎn)數(shù)據(jù)量
hadoop fs -count /dir1/
14. 設(shè)置HDFS中文件的副本數(shù)量
hadoop fs -setrep 3 /dir1/dir2/1.txt
HDFS 工作機(jī)制
- 文件寫入過程
客戶端向HDFS寫數(shù)據(jù),首先要與NameNode通信,以確認(rèn)可以寫文件并獲取DataNode節(jié)點(diǎn)信息,然后客戶端根據(jù)DateNode信息按順序上傳數(shù)據(jù),DataNode將Block副本復(fù)制到其他DataNode;- 與NameNode 通信,請求上傳數(shù)據(jù),NameNode檢測是否存在、父目錄是否存在;
- 返回確認(rèn)信息;
- 客戶端請求NameNode,block1上傳到那些DataNode;
- NameNode返回DataNode信息
- 客戶端根據(jù)返回的DateNode信息,請求其中一臺(tái),上傳數(shù)據(jù)(本質(zhì)是RPC調(diào)用,建立pipeline),A到B,B到C根據(jù)副本數(shù)量依次類推,完成后逐級返回客戶端。
- 客戶端上傳block,以packet為單位,A到B,B到C,A每傳一個(gè)packet都會(huì)放入應(yīng)答隊(duì)列,等待應(yīng)答。
- 當(dāng)一個(gè)block傳輸完成,客戶端上傳第二個(gè)block。
- 文件讀取流程
客戶端將要獲取的文件路徑發(fā)送給NameNode,NameNode獲取文件的元數(shù)據(jù)信息(Block存放位置)返回給客戶端,客戶端通過相應(yīng)信息找到DataNode,逐個(gè)獲取文件的block并在客戶端本地進(jìn)行數(shù)據(jù)追加合并,從而得到完整文件。- 通過NameNode獲取元數(shù)據(jù)信息,找到文件塊所在DataNode。
- 挑選一臺(tái)DataNode(就近原則,然后隨機(jī)),建立Socket流。
- datanode開始發(fā)送數(shù)據(jù)(從磁盤讀取數(shù)據(jù)放入流,以packet為單位來做校驗(yàn));
- 客戶端以packet為單位接收,先在本地緩存,然后寫入目標(biāo)文件
NameNode機(jī)制
- 元數(shù)據(jù)存儲(chǔ)機(jī)制
- 內(nèi)存數(shù)據(jù)(完整的元數(shù)據(jù)信息)
- 磁盤元數(shù)據(jù)鏡像文件(“準(zhǔn)完整”,在namenode的工作目錄)
- 數(shù)據(jù)操作日志文件(可通過日志文件運(yùn)算出元數(shù)據(jù)),用于銜接內(nèi)存和持久化元數(shù)據(jù)鏡像的操作日志(edits文件),
注:當(dāng)客戶端新增,操作日志首先被記入edits文件,當(dāng)客戶端操作成功,相應(yīng)的元數(shù)據(jù)會(huì)更新到內(nèi)存中。
- 元數(shù)據(jù)手動(dòng)查看
可以通過hdfs的一個(gè)工具來查看edits中的信息
bin/hdfs oev -i edits -o edits.xml
bin/hdfs oiv -i fsimage_0000000000000000087 -p XML -o fsimage.xml
- 元數(shù)據(jù)的checkpoint
每隔一段時(shí)間,會(huì)由secondary namenode將namenode上積累的所有edits和一個(gè)最新的fsimage下載到本地,并加載到內(nèi)存進(jìn)行merage(這個(gè)過程稱作checkpoint) - checkpoint操作的觸發(fā)條件配置參數(shù)
dfs.namenode.checkpoint.check.period=60 #檢查觸發(fā)條件是否滿足的頻率,60秒
dfs.namenode.checkpoint.dir=file://${hadoop.tmp.dir}/dfs/namesecondary
#以上兩個(gè)參數(shù)做checkpoint操作時(shí),secondary namenode的本地工作目錄
dfs.namenode.checkpoint.edits.dir=${dfs.namenode.checkpoint.dir}
dfs.namenode.checkpoint.max-retries=3 #最大重試次數(shù)
dfs.namenode.checkpoint.period=3600 #兩次checkpoint之間的時(shí)間間隔3600秒
dfs.namenode.checkpoint.txns=1000000 #兩次checkpoint之間最大的操作記錄
- checkpoint的附帶作用
namenode和secondary namenode的工作目錄存儲(chǔ)結(jié)構(gòu)完全相同,所以,當(dāng)namenode故障退出需要重新恢復(fù)時(shí),可以從secondary namenode的工作目錄中將fsimage拷貝到namenode的工作目錄,以恢復(fù)namenode的元數(shù)據(jù)
DataNode工作機(jī)制
- 工作職責(zé)
存儲(chǔ)用戶的文件塊數(shù)據(jù)
定期向namenode匯報(bào)自身所持有的block信息,(通過心跳信息上報(bào)) - Datanode掉線判斷時(shí)限參數(shù)
datanode進(jìn)程死亡或者網(wǎng)絡(luò)故障造成datanode無法與namenode通信,namenode不會(huì)立即把該節(jié)點(diǎn)判定為死亡,要經(jīng)過一段時(shí)間,這段時(shí)間暫稱作超時(shí)時(shí)長。HDFS默認(rèn)的超時(shí)時(shí)長為10分鐘+30秒。如果定義超時(shí)時(shí)間為timeout,則超時(shí)時(shí)長的計(jì)算公式為:
timeout = 2 * heartbeat.recheck.interval + 10 * dfs.heartbeat.interval。
而默認(rèn)的heartbeat.recheck.interval 大小為5分鐘,dfs.heartbeat.interval默認(rèn)為3秒。
需要注意的是hdfs-site.xml 配置文件中的heartbeat.recheck.interval的單位為毫秒,dfs.heartbeat.interval的單位為秒。所以,舉個(gè)例子,如果heartbeat.recheck.interval設(shè)置為5000(毫秒),dfs.heartbeat.interval設(shè)置為3(秒,默認(rèn)),則總的超時(shí)時(shí)間為40秒。
<property>
<name>heartbeat.recheck.interval</name>
<value>2000</value>
</property>
<property>
<name>dfs.heartbeat.interval</name>
<value>1</value>
</property>
Java API
- HDFS客戶端依賴
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.1</version>
</dependency>
注意:org.apache.hadoop.security.AccessControlException: org.apache.hadoop.security .AccessControlException: Permission denied
沒有權(quán)限訪問:core-site.xml添加
```
<property>
<name>hadoop.security.authorization</name>
<value>false</value>
</property>
2. windows上開發(fā)注意
1. 解壓hadoop安裝包;
2. 將安裝包下的lib和bin目錄用對應(yīng)windows版本平臺(tái)編譯的本地庫替換
3. 在window系統(tǒng)中配置HADOOP_HOME指向你解壓的安裝包
4. 在windows系統(tǒng)的path變量中加入hadoop的bin目錄
3. 代碼
```
//創(chuàng)建連接
Configuration conf = new Configuration();
//配置連接服務(wù)器
conf.set("fs.defaultFS", "hdfs://hdp-node01:9000");
//副本數(shù)量 :參數(shù)優(yōu)先級: 1、客戶端代碼中設(shè)置的值 2、classpath下的用戶自定義配置文件 3、然后是服務(wù)器的默認(rèn)配置
conf.set("dfs.replication", "3");
//獲取Hdfs客戶端
FileSystem fs = FileSystem.get(conf);
// 如果這樣去獲取,那conf里面就可以不要配"fs.defaultFS"參數(shù),而且,這個(gè)客戶端的身份標(biāo)識已經(jīng)是hadoop用戶
FileSystem fs = FileSystem.get(new URI("hdfs://hdp-node01:9000"), conf, "hadoop");
@Test
public void testAddFileToHdfs() throws Exception {
// 要上傳的文件所在的本地路徑
Path src = new Path("g:/redis-recommend.zip");
// 要上傳到hdfs的目標(biāo)路徑
Path dst = new Path("/aaa");
fs.copyFromLocalFile(src, dst);
fs.close();
}
/**
* 從hdfs中復(fù)制文件到本地文件系統(tǒng)
*
* @throws IOException
* @throws IllegalArgumentException
*/
@Test
public void testDownloadFileToLocal() throws IllegalArgumentException, IOException {
fs.copyToLocalFile(new Path("/jdk-7u65-linux-i586.tar.gz"), new Path("d:/"));
fs.close();
}
@Test
public void testMkdirAndDeleteAndRename() throws IllegalArgumentException, IOException {
// 創(chuàng)建目錄
fs.mkdirs(new Path("/a1/b1/c1"));
// 刪除文件夾 ,如果是非空文件夾,參數(shù)2必須給值true
fs.delete(new Path("/aaa"), true);
// 重命名文件或文件夾
fs.rename(new Path("/a1"), new Path("/a2"));
}
/**
* 查看目錄信息,只顯示文件
*
* @throws IOException
* @throws IllegalArgumentException
* @throws FileNotFoundException
*/
@Test
public void testListFiles() throws FileNotFoundException, IllegalArgumentException, IOException {
// 思考:為什么返回迭代器,而不是List之類的容器
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
while (listFiles.hasNext()) {
LocatedFileStatus fileStatus = listFiles.next();
System.out.println(fileStatus.getPath().getName());
System.out.println(fileStatus.getBlockSize());
System.out.println(fileStatus.getPermission());
System.out.println(fileStatus.getLen());
BlockLocation[] blockLocations = fileStatus.getBlockLocations();
for (BlockLocation bl : blockLocations) {
System.out.println("block-length:" + bl.getLength() + "--" + "block-offset:" + bl.getOffset());
String[] hosts = bl.getHosts();
for (String host : hosts) {
System.out.println(host);
}
}
System.out.println("--------------為angelababy打印的分割線--------------");
}
}
/**
* 查看文件及文件夾信息
*
* @throws IOException
* @throws IllegalArgumentException
* @throws FileNotFoundException
*/
@Test
public void testListAll() throws FileNotFoundException, IllegalArgumentException, IOException {
FileStatus[] listStatus = fs.listStatus(new Path("/"));
String flag = "d-- ";
for (FileStatus fstatus : listStatus) {
if (fstatus.isFile()) flag = "f-- ";
System.out.println(flag + fstatus.getPath().getName());
}
通過流的方式訪問hdfs
@Test
public void testDownLoadFileToLocal() throws IllegalArgumentException, IOException{
//先獲取一個(gè)文件的輸入流----針對hdfs上的
FSDataInputStream in = fs.open(new Path("/jdk-7u65-linux-i586.tar.gz"));
//再構(gòu)造一個(gè)文件的輸出流----針對本地的
FileOutputStream out = new FileOutputStream(new File("c:/jdk.tar.gz"));
//再將輸入流中數(shù)據(jù)傳輸?shù)捷敵隽? IOUtils.copyBytes(in, out, 4096);
}
/**
* hdfs支持隨機(jī)定位進(jìn)行文件讀取,而且可以方便地讀取指定長度
* 用于上層分布式運(yùn)算框架并發(fā)處理數(shù)據(jù)
* @throws IllegalArgumentException
* @throws IOException
*/
@Test
public void testRandomAccess() throws IllegalArgumentException, IOException{
//先獲取一個(gè)文件的輸入流----針對hdfs上的
FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));
//可以將流的起始偏移量進(jìn)行自定義
in.seek(22);
//再構(gòu)造一個(gè)文件的輸出流----針對本地的
FileOutputStream out = new FileOutputStream(new File("c:/iloveyou.line.2.txt"));
IOUtils.copyBytes(in,out,19L,true);
}
/**
* 顯示hdfs上文件的內(nèi)容
* @throws IOException
* @throws IllegalArgumentException
*/
@Test
public void testCat() throws IllegalArgumentException, IOException{
FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));
IOUtils.copyBytes(in, System.out, 1024);
}
```