背景
由于信息安全的管理需要,很多生產(chǎn)環(huán)境的服務器都采用了非常嚴格的遠程訪問方式,一些例行的服務器維護工作就變的比較繁瑣,需要多層登陸,并且執(zhí)行一大堆的命令才能夠完成,那么,作為一個畫家和攝影師,寫那么復雜的shell腳本怕是會要了我半條命,因此,我想到了一個曲線救國的方案——Jupyter。
思路
具體思路是這樣的:

為了安全起見,我沒有直接暴露堡壘機到互聯(lián)網(wǎng),而是在堡壘機上安裝了Docker環(huán)境,這樣我的Jupyter環(huán)境就可以只在我需要的時候打開,萬一Jupyter被別有用心的人發(fā)現(xiàn),也只是針對docker環(huán)境,多一層緩沖。由于我的堡壘機本身是一個計算集群中的一臺,所以我使用Rancher作為容器的管理工具,順便給大家安利一下,非常好用!比某帽的openshfit商業(yè)版還好用一百倍!
我需要做的主要工作是每個月把生產(chǎn)服務器上的物聯(lián)網(wǎng)數(shù)據(jù)壓縮打包傳輸?shù)奖緳C的計算集群并且倒入到InfluxDB時序數(shù)據(jù)庫中。由于物聯(lián)網(wǎng)數(shù)據(jù)量很大,并且壓縮比也非常大,所以如果直接傳輸非常浪費帶寬資源和時間,因此,我的大致步驟如下:
#%% ——————————Step0. 建立遠程服務器連接——————————
import os, time
# 我自己封裝的工具包,里面有python通過ssh協(xié)議遠程登錄服務器以及數(shù)據(jù)清洗、寫入influxDB等功能
from utilities import koalaTools as k
# 因為數(shù)據(jù)量很大,所以需要關注一下每次任務的執(zhí)行時間
s_time = time.process_time()
k.logger.info("start.........")
# 一些全局參數(shù)
local_work_dir ='/data/'
# 年月標簽,用于后面的文件命名
ym ='202003'
# zz是我的遠程服務器登陸信息,這些信息實際上存在我的一個數(shù)據(jù)庫中,通過Rest API調用獲得,這里為了方便,寫了一個本地的dict代替。
zz = {
"host_name":"",
"ssh_port":22,
"username":"root",
"password":"",
"remote_root":"/shared/",
"work_dir":"data",
}
# ssh client
sc = k.get_ssh_connection(hostname=zz['host_name'], username=zz['username'], port=zz['ssh_port'], password=zz['password'])
# sftp client
sftp = sc.open_sftp()
sftp.chdir(zz['remote_root'] + zz['work_dir'])
sftp.listdir()
網(wǎng)上很多文章用paramiko的sftpClient 去創(chuàng)建sftp連接,我用了一下感覺很多問題,并且我還要用bash命令去執(zhí)行tar打包等操作,因此,sftp可以從ssh connection來創(chuàng)建。另外,查看目標目錄的文件列表我更喜歡用sftp的listdir方法,返回一個list,比執(zhí)行command返回stdout,然后readline讀出來感覺省事兒很多。
這里說一下jupyter的好處,就是我可以把我的執(zhí)行步驟拆解成幾部分,執(zhí)行完一個jupyther的cell可以看到一些結果,確認無誤后再執(zhí)行下一個環(huán)節(jié),個人感覺也比寫shell方便很多,不然你的shell需要設計一大堆用于反饋的東西。
那么,上一步最后的sftp.listdir()輸出結果如果是我預期的,那么我就可以執(zhí)行下一步操作了。
#%% ——————————Step1. 遠程服務器文件打包——————————
# 遠程文件打包 tar -zcvf 2002020326.tar.gz 202002*,我生產(chǎn)服務器上的數(shù)據(jù)文件都是每天一個csv這樣dump出來的,所以可以利用這個規(guī)律進行按月的打包。
# 壓縮包文件名
gz_name = "".join([ym, 'tar.gz'])
tar_cmd = "".join(['tar -zcvf ', gz_name, ym + '*'])
k.exec_cmd(sc, tar_cmd)
k.exec_cmd(sc, 'ls -l')
同樣,執(zhí)行完后看一下打包結果,確認無誤后再進行下一步驟。
接下來是把文件通過sftp get到本地服務器,操作非常簡單,只是文件太大,傳輸萬一出現(xiàn)網(wǎng)絡不穩(wěn)定情況終端就比較麻煩,我畢竟是個藝術家,寫代碼不擅長,這里一直想加入一個進度監(jiān)控,但是還沒有想到合適的方式,請高人看到后指點一下,謝謝!
#%% ——————————Step2. 獲取文件到本地——————————
remote_file_path = zz['remote_root']+zz['work_dir']+'/' + gz_name
local_file_path = local_work_dir + gz_name
sftp.get(remote_file_path, local_file_path)
os.listdir(local_work_dir)
這里os.listdir看一下文件是否get過來了,沒問題的話執(zhí)行下一步。
這里注意,不要忘了關閉ssh連接哦!
#%% ——————————Step3. 本地服務解壓——————————
sc.close()
tmp_dir = '_tmp'
file_list = k.un_tar(gz_name, tmp_dir)
# 歸檔壓縮包
os.rename(local_work_dir + gz_name, "./tarbak/" + gz_name)
os.listdir(local_work_dir)
這里有一個感覺很麻煩的事情,就是老要用絕對路徑,也不知道確實是需要這樣還是因為我是菜鳥。
whatever,能用就行了吧,絕對路徑也有好處,程序拷到任何目錄執(zhí)行不受影響。
最后依然是老規(guī)矩,檢查一下這一步的操作成果是否正確,然后才執(zhí)行下一步。
最后一步就是將數(shù)據(jù)導入時序數(shù)據(jù)庫InfluxDB
#%% ——————————Step3. 導入至InfluxDB——————————
client = k.connect_to_influxdb()
k.logger.info('File list : %s' % os.listdir(tmp_dir))
for file_name in file_list:
if file_name.endswith('.csv') and os.path.getsize(file_name) > 0:
k.logger.info('Start saving File: %s' % file_name)
cf = k.pd.DataFrame()
cf = k.clear_me2(file_name)
k.save_to_influxdb(file_name, cf, client)
k.logger.info('Delete file: %s' % file_name)
os.remove(local_work_dir+file_name)
client.close()
e_time = time.process_time()
k.logger.info("The total running time is: %s" % (e_time - s_time))
下面是我的koalaTools.py的部分代碼,至于里面的數(shù)據(jù)清洗的內(nèi)容就不便公開了,涉及商業(yè)秘密啦,哈哈
import paramiko
import tarfile
# 加入日志
# 獲取logger實例
logger = logging.getLogger("baseSpider")
# 指定輸出格式
formatter = logging.Formatter('%(asctime)s\
%(levelname)-8s:%(message)s')
# 文件日志
file_handler = logging.FileHandler("operation_theServer.log")
file_handler.setFormatter(formatter)
# 控制臺日志
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
# 為logge添加具體的日志處理器
logger.addHandler(file_handler)
logger.addHandler(console_handler)
logger.setLevel(logging.INFO)
def get_ssh_connection(hostname, username, password=None, key_dir=None, port=22):
try:
# 創(chuàng)建SSH對象
sf = paramiko.SSHClient()
# 允許連接不在know_hosts文件中的主機
sf.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 連接服務器
# 是否證書登陸
if key_dir:
sf.connect(hostname=hostname, username=username, key_filename=key_dir,
password=password, port=port)
else:
sf.connect(hostname=hostname, port=port, username=username,
password=password)
logger.info("SSHConnection-->" + hostname + "<--succes!")
return sf
except Exception as e:
logger.error("SSHConnection-->" + hostname + "<--failed!")
logger.error(e)
return False
def exec_cmd(ssh_client, command):
stdin, stdout, stderr = ssh_client.exec_command(command)
for i in stdout.readlines():
logger.info(i)
def un_tar(tgz_name, tmp_dir="_tmp"):
"""
untar zip file
:param tgz_name: Tar gz File name
:return:
"""
tar = tarfile.open(tgz_name)
names = tar.getnames()
file_list = []
if os.path.isdir(tmp_dir):
pass
else:
os.mkdir(tmp_dir)
# 因為解壓后是很多文件,預先建立同名目錄
for name in names:
tar.extract(name, tmp+"/")
logger.info('Extract', name, 'From', tgz_name)
file_list.append(tmp_dir + '/' + name)
tar.close()
return file_list
之前我還用django寫了一個小的管理工具,試圖實現(xiàn)這些功能,但是本人是個強迫癥,因為做這個小工具,什么技術都想用最新的,所以從django開始,又研究了rest_framwork、VUE、React、flask等等,兜了一大圈子,畢竟我不是靠寫代碼吃飯的,作為一個藝術家,這樣的投入產(chǎn)出太不劃算了,而且非常不靈活,所以只用來作為我管理一些數(shù)據(jù)的小工具作罷。
下一篇文章,我打算分享一下給初級攝影愛好者的照片管理方案,本人的特點就是務實,絕對不會因為要顯示自己的牛逼故意講很多很復雜很專業(yè)的工具(雖然我也很專業(yè),但是沒必要拿出來顯擺啦,真正強大的人不需要證明自己,哈哈哈哈)