Hadoop是一個分布式的文件系統(tǒng)(HDFS),由很多服務器聯合起來實現其功能,集群中的服務器有各自的角色,用于存儲文件通過目錄樹來定位文件。
HDFS集群包括,NameNode、DataNode、Secondary Namenode:
(1)NameNode:負責管理整個文件系統(tǒng)的元數據,以及每一個路徑(文件)所對應的數據塊信息。
(2)DataNode:負責管理用戶的文件數據塊,每一個數據塊都可以在多個datanode上存儲多個副本。
(3)Secondary NameNode用來監(jiān)控HDFS狀態(tài)的輔助后臺程序,每隔一段時間獲取HDFS元數據的快照。
HDFS可以通過Java Api來實現對HDFS內的文件進行讀寫操作。
1、Hadoop安裝
由于本篇重點講HDFS的開發(fā),Hadoop的安裝配置就不重點講,我們通過最快速的方式來實現Hadoop安裝,即通過下載別人已經配置好的Docker鏡像進行Hadoop安裝。
我的鏡像:registry.cn-hangzhou.aliyuncs.com/xvjialing/hadoop
這個鏡像是已經完全配置好的Hadoop,通過偽分布式方式(管理節(jié)點和數據節(jié)點在一臺機)部署,直接運行即可使用。
(1)創(chuàng)建容器:
[root@iZbp13sno1lc2yxlhjc4b3Z ~]# docker run --name hadoop -d -p 8091:50070 -p 8092:9000 registry.cn-hangzhou.aliyuncs.com/xvjialing/hadoop
說明:
50070端口:提供HDFS的管理控制臺,可在瀏覽器里面查看HDFS各節(jié)點信息和文件目錄
9000端口:提供訪問HDFS文件系統(tǒng)地址,通過該端口對文件進行讀寫操作
該鏡像hadoop安裝目錄:/usr/local/hadoop
(2)配置文件:
2個重要的核心配置文件,hdfs-site.xml、core-site.xml。配置文件目錄:/usr/local/hadoop/etc/hadoop
hdfs-site.xml,配置hdfs各節(jié)點目錄地址:
<?xml version="1.0"?>
<configuration>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:///root/hdfs/namenode</value>
<description>NameNode directory for namespace and transaction logs storage.</description>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:///root/hdfs/datanode</value>
<description>DataNode directory</description>
</property>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
core-site.xml,配置hdfs的訪問地址端口:
<?xml version="1.0"?>
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000/</value>
</property>
</configuration>
(3)訪問控制臺
瀏覽器輸入http://192.168.2.104:8091

可查看hdfs的各節(jié)點健康信息以及文件目錄信息。
2、hadoop項目搭建
使用springboot快速搭建項目結構,在項目里面引入hadoop相關的依賴包即可。
(1)pom.xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!--hadoop依賴-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.1.1</version>
</dependency>
</dependencies>
(2)application.yml
主要配置hadoop的訪問地址,以及訪問用戶
#HDFS配置
hdfs:
path: hdfs://192.168.2.104:8092
user: root
注意:
path:我這里是docker主機的地址和端口,對應映射是docker里面hdfs配置的地址端口,即:hdfs://localhost:9000
user:這里是個坑點,這個用戶必須是運行hadoop的系統(tǒng)用戶,否則會報沒有權限操作的異常,由于docker是root用戶,所以填root。你可以試下隨便填,看報什么錯
(3)主要的JAVA類
操作HDFS涉及到的主要類就兩個,比較簡單:
Configuration:封裝客戶端和服務端的配置,就是訪問hdfs的地址path和用戶user。
FileSystem:封裝操作文件系統(tǒng)對象,提供了很多方法來對hdfs進行操作,就是對目錄/文件進行增/刪/改/查/上傳/下載/遍歷等。
3、通過API操作hdfs
(1)獲取FileSystem對象
public class HdfsService {
@Value("${hdfs.path}")
private String hdfsPath;
@Value("${hdfs.user}")
private String user;
/**
* 獲取hdfs配置信息
* @return
*/
private Configuration getConfiguration(){
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", hdfsPath);
return configuration;
}
/**
* 獲取文件系統(tǒng)對象
* @return
*/
public FileSystem getFileSystem() throws Exception {
FileSystem fileSystem = FileSystem.get(new URI(hdfsPath), getConfiguration(), user);
return fileSystem;
}
...
}
(2)通過FileSystem操作hdfs
在Service層添加如下API操作代碼:
/**
* 創(chuàng)建HDFS文件夾
* @param dir
* @return
* @throws Exception
*/
public boolean mkdir(String dir) throws Exception{
if(StringUtils.isBlank(dir)){
return false;
}
if(exist(dir)){
return true;
}
FileSystem fileSystem = getFileSystem();
boolean isOk = fileSystem.mkdirs(new Path(dir));
fileSystem.close();
return isOk;
}
/**
* 判斷HDFS的文件是否存在
* @param path
* @return
* @throws Exception
*/
public boolean exist(String path) throws Exception {
if(StringUtils.isBlank(path)){
return false;
}
FileSystem fileSystem = getFileSystem();
return fileSystem.exists(new Path(path));
}
/**
* 讀取路徑下的文件信息
* @param path
* @return
* @throws Exception
*/
public List<Map<String,Object>> readPathInfo(String path) throws Exception {
if(!exist(path)){
return null;
}
FileSystem fs = getFileSystem();
FileStatus[] statuses = fs.listStatus(new Path(path));
if(statuses == null || statuses.length < 1){
return null;
}
List<Map<String,Object>> list = new ArrayList<>();
for(FileStatus fileStatus : statuses){
Map<String,Object> map = new HashMap<>();
map.put("filePath", fileStatus.getPath());
map.put("fileStatus", fileStatus.toString());
list.add(map);
}
return list;
}
/**
* HDFS創(chuàng)建文件
* @param path
* @param file
* @throws Exception
*/
public void createFile(String path, MultipartFile file) throws Exception {
if(StringUtils.isBlank(path) || null == file){
return;
}
FileSystem fs = getFileSystem();
String fileName = file.getOriginalFilename();//文件名
Path filePath = new Path(path + "/" + fileName);
FSDataOutputStream outputStream = fs.create(filePath);
outputStream.write(file.getBytes());
outputStream.close();
fs.close();
}
/**
* 讀取HDFS文件內容
* @param path
* @return
* @throws Exception
*/
public String readFileToString(String path) throws Exception{
if(!exist(path)){
return null;
}
FileSystem fs = getFileSystem();
FSDataInputStream inputStream = null;
try {
inputStream = fs.open(new Path(path));
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
StringBuffer sb = new StringBuffer();
String line = "";
while ((line = reader.readLine()) != null){
sb.append(line);
}
return sb.toString();
}finally {
if(inputStream != null){
inputStream.close();
}
fs.close();
}
}
/**
* 獲取目錄下的文件列表
* @param path
* @return
* @throws Exception
*/
public List<Map<String,Object>> listFiles(String path) throws Exception {
if(!exist(path)){
return null;
}
FileSystem fs = getFileSystem();
RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(path), true);
List<Map<String,Object>> list = new ArrayList<>();
while (iterator.hasNext()){
LocatedFileStatus fileStatus = iterator.next();
Map<String,Object> map = new HashMap<>();
map.put("filePath", fileStatus.getPath().toString());
map.put("fileName", fileStatus.getPath().getName());
list.add(map);
}
return list;
}
/**
* 重命名HDFS文件
* @param oldName
* @param newName
* @return
* @throws Exception
*/
public boolean renameFile(String oldName, String newName)throws Exception{
if(!exist(oldName) || StringUtils.isBlank(newName)){
return false;
}
FileSystem fs = getFileSystem();
boolean isOk = fs.rename(new Path(oldName), new Path(newName));
fs.close();
return isOk;
}
/**
* 刪除HDFS文件
* @param path
* @return
* @throws Exception
*/
public boolean deleteFile(String path)throws Exception {
if(!exist(path)){
return false;
}
FileSystem fs = getFileSystem();
boolean isOk = fs.deleteOnExit(new Path(path));
fs.close();
return isOk;
}
/**
* 上傳文件到HDFS
* @param path
* @param uploadPath
* @throws Exception
*/
public void uploadFile(String path,String uploadPath) throws Exception{
if(StringUtils.isBlank(path) || StringUtils.isBlank(uploadPath)){
return;
}
FileSystem fs = getFileSystem();
fs.copyFromLocalFile(new Path(path), new Path(uploadPath));
fs.close();
}
/**
* 從HDFS下載文件
* @param path
* @param downloadPath
* @throws Exception
*/
public void downloadFile(String path, String downloadPath) throws Exception{
if(StringUtils.isBlank(path) || StringUtils.isBlank(downloadPath)){
return;
}
FileSystem fs = getFileSystem();
fs.copyToLocalFile(new Path(path), new Path(downloadPath) );
fs.close();
}
/**
* 拷貝HDFS文件
* @param sourcePath
* @param targetPath
* @throws Exception
*/
public void copyFile(String sourcePath, String targetPath) throws Exception{
if(StringUtils.isBlank(sourcePath) || StringUtils.isBlank(targetPath)){
return;
}
FileSystem fs = getFileSystem();
FSDataInputStream inputStream = null;
FSDataOutputStream outputStream = null;
try{
inputStream = fs.open(new Path(sourcePath));
outputStream = fs.create(new Path(targetPath));
//todo IOUtils.copyBytes(inputStream, outputStream, , false);
}finally {
if(inputStream != null){
inputStream.close();
}
if(outputStream != null){
outputStream.close();
}
fs.close();
}
}
/**
* 讀取HDFS文件并返回byte[]
* @param path
* @return
* @throws Exception
*/
public byte[] readFileToBytes(String path) throws Exception{
if(!exist(path)){
return null;
}
FileSystem fs = getFileSystem();
FSDataInputStream inputStream = null;
try {
inputStream = fs.open(new Path(path));
return IOUtils.readFullyToByteArray(inputStream);
}finally {
if(inputStream != null){
inputStream.close();
}
fs.close();
}
}
/**
* 獲取文件塊在集群的位置
* @param path
* @return
* @throws Exception
*/
public BlockLocation[] getFileBlockLocations(String path)throws Exception{
if(exist(path)){
return null;
}
FileSystem fs = getFileSystem();
FileStatus fileStatus = fs.getFileStatus(new Path(path));
return fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
}
(3)編寫Controller提供接口
@RestController
@RequestMapping("/api/hdfs")
public class HdfsController {
@Autowired
private HdfsService service;
@GetMapping("/mkdir")
public Object mkdir(String path){
try {
service.mkdir(path);
return RtnData.ok();
} catch (Exception e) {
return RtnData.fail(e);
}
}
@PostMapping("/createFile")
public Object createFile(String path, MultipartFile file){
try {
service.createFile(path, file);
return RtnData.ok();
} catch (Exception e) {
return RtnData.fail(e);
}
}
@GetMapping("/readFileToString")
public Object readFileToString(String path){
try {
return RtnData.ok(service.readFileToString(path));
} catch (Exception e) {
return RtnData.fail(e);
}
}
...
}
4、來個測試
我們通過調用/api/hdfs/mkdir接口來創(chuàng)建目錄,然后在控制臺看下是文件目錄是否存在。
通過postman調用接口,創(chuàng)建/test目錄:

登錄控制臺,菜單Utilities/Browse the file siystem,t可以看到已經成功添加:

5、填下坑,一定要看
下面的幾個坑點我搞了好幾天才解決。
(1)可能你會遇到Connect Refused之類的錯誤。
修改core-site.xml,將hdfs://localhost:9000/中的localhost修改為容器地址
(2)可能你會遇到Permission denied之類的錯誤
客戶端調用的時候,user不是運行hdfs的用戶/用戶組,沒有權限操作。
(3)可能你會遇到HADOOP_HOME and hadoop.home.dir are unset之類的報錯
客戶端沒有安裝設置hadoop的環(huán)境變量:HADOOP_HOME??梢圆挥枥頃?,不影響操作。
(4)可能你會遇到Could not locate Hadoop executable: xxx\bin\winutils.exe之類的報錯
客戶端hadoop的bin目錄沒有winutils.exe文件??梢圆挥枥頃挥绊懖僮?。