python操作hdfs模塊上傳文件到HDFS

因為公司需要,需要寫一個腳本將Windows server上的部分日志文件同步到HDFS上,每天定時啟動腳本上傳。
大體思路是,首先對比Windows server和HDFS上的是否一樣,不一樣就證明產(chǎn)生了新的日志文件,然后上傳。折騰了一天才弄好。。。
系統(tǒng): Mac(確切的說是黑蘋果,電腦老掉牙了,用起來還是挺卡的,木辦法,窮使我堅持住了,哈哈),如果是Windows,就是settings.py的路徑區(qū)別。
存在問題:目前,如果是新增目錄,暫時無法同步,但是固定的那幾個文件夾,新生成日志后是可以同步的,目前需求就這樣,所以先用了,有空了修改。

1. 用到hdfs模塊

 pip install hdfs

2. 首先,修改hosts文件,這步不做是連不上的。

vi /etc/hosts
# 將hdfs映射添加進去
10.211.XXX.XX hdfs01.XXXX.XXX
10.211.XXX.XX hdfs02.XXXX.XXX
......

3. 代碼可以在Windowsserver和linux server跑。
transferfile.py

# -*- coding: utf-8 -*-
import settings
import os
import platform
import logging as lg
from hdfs.client import Client
from hdfs.client import _logger


# 記錄日志(用了hdfs中源碼的logger)
logdir = settings.LOG_DIR
_logger.setLevel(lg.DEBUG)
myhandler = lg.FileHandler(logdir, "w")  # 覆蓋模式輸出日志
myhandler.setLevel(lg.DEBUG)
formatter = lg.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
myhandler.setFormatter(formatter)
_logger.addHandler(myhandler)


class Transfer(object):

    def __init__(self):
        self.host = settings.HOST
        self.remotepath = settings.REMOTE_PATH
        self.localpath = settings.LOCAL_PATH
        self.rootpath = settings.ROOT_PATH
        self.client = Client(self.host, root=self.rootpath)

    def upload_file_windows(self):
        """windowsserver"""
        try:
            base_dir = self.localpath.split('\\').pop()  # 要上傳的路徑的最后一個文件夾

            for root, dirs, files in os.walk(self.localpath):

                new_dir = base_dir + root.split(base_dir).pop().replace('\\','/')  # 去除本地路徑前綴
                

                for file in files:
                    old_path = root + '\\' + file  # 原始本地路徑文件
                    lpath = new_dir + '/' + file  # 去除本地路徑前綴后的文件
    
                    if not self.client.status(self.remotepath + '/' + lpath, strict=False):

                        # 第一個參數(shù)遠程路徑,第二個參數(shù)本地路徑,第三個參數(shù)是否覆蓋,第四個參數(shù)工作線程數(shù)
                        self.client.upload(self.remotepath + '/' + lpath, old_path, overwrite=False)

        except Exception as e:
            with open("err.log", "a") as f:
                f.write(str(e))
                
    def upload_file_linux(self, sep):
        """linuxserver"""
        try:
            base_dir = self.localpath.split(sep).pop()  # 要上傳的路徑的最后一個文件夾

            for root, dirs, files in os.walk(self.localpath):

                new_dir = base_dir + root.split(base_dir).pop()  # 去除本地路徑前綴

                for file in files:
                    old_path = root + sep + file  # 原始本地路徑文件
                    lpath = new_dir + sep + file  # 去除本地路徑前綴后的文件
                    
                    if not self.client.status(self.remotepath + sep + lpath, strict=False):

                        # len(old_path.split("/"))
                        # 第一個參數(shù)遠程路徑,第二個參數(shù)本地路徑,第三個參數(shù)是否覆蓋,第四個參數(shù)工作線程數(shù)
                        self.client.upload(self.remotepath + sep + lpath, old_path, overwrite=False)
        except Exception as e:
            with open("err.log", "a") as f:
                f.write(str(e))


if __name__ == "__main__":
    transfer = Transfer()
    # windows server
    if platform.platform().startswith("Windows"):
        transfer.upload_file_windows()
    else:
        # linux server
        transfer.upload_file_linux('/')

settings.py

# -*- coding: utf-8 -*-

HOST = "http://IP:PORT"  # 地址
ROOT_PATH = "/dsmp/johnny"  # 客戶端連接的目錄
REMOTE_PATH = "/dsmp/johnny"  # HDFS上的路徑
LOCAL_PATH = "/Users/smallcaff/Desktop/test"  # 本地路徑
LOG_DIR = "/Users/smallcaff/Desktop/test/mylog.log"  # 日志路徑
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容