項(xiàng)目背景
之前項(xiàng)目中經(jīng)常使用paramiko模塊SFTP下載遠(yuǎn)端文件到本地,再讀取本地文件進(jìn)行相關(guān)操作后入庫,最終再刪除該本地文件。
思考優(yōu)化
但是最近開始思考到,其實(shí)很多情況下是不需要保留SFTP下載到本地的文件的,只需一次性使用其中的數(shù)據(jù)而已。
查看了一下paramiko模塊的API,paramiko模塊中下載數(shù)據(jù)的方法SFTPClient類有get和getfo兩種。前者則是我常用的下載文件到本地的方法,后者是將文件一次讀取為數(shù)據(jù)流再寫入到文件對(duì)象的方法。但都不符合我的預(yù)期要求。
我希望能安全的讀取SFTP遠(yuǎn)端文件的數(shù)據(jù)對(duì)象到內(nèi)存中使用,便有了這次嘗試。既然要安全的讀取遠(yuǎn)端文件,便不能把文件內(nèi)容一次全部加載到內(nèi)存,這里就需要考慮使用生成器。
代碼實(shí)現(xiàn)
我的整體測試代碼如下,借助sftp的open方法,實(shí)現(xiàn)一個(gè)按行讀取遠(yuǎn)端大文件的生成器方法。
更新:參考源碼的getfo方法,使用prefeach預(yù)加載全部數(shù)據(jù),否則實(shí)際使用過程中遇到大型文佳效率將會(huì)非常低。
import paramiko
class SftpTest(object):
def __init__(self):
self.host = 'Your sftp host'
self.port = 22
self.username = 'Your username'
self.password = 'Your password'
self.ssh = None
self.sftp = None
self.connect()
def __get_ssh(self):
try:
self.ssh = paramiko.Transport((self.host, self.port))
self.ssh.connect(username=self.username, password=self.password)
except Exception as e:
print(e)
def __get_sftp(self):
try:
self.sftp = paramiko.SFTPClient.from_transport(self.ssh)
except Exception as e:
print(e)
def connect(self):
self.__get_ssh()
self.__get_sftp()
def close(self):
if hasattr(self.sftp, "close") and callable(self.sftp.close):
self.sftp.close()
self.sftp = None
if hasattr(self.ssh, "close") and callable(self.ssh.close):
self.ssh.close()
self.ssh = None
# 借助sftp的open方法自定義實(shí)現(xiàn)一個(gè)生成器方法,返回一個(gè)按行讀取遠(yuǎn)端文件的生成器對(duì)象
def generator_readfile(self, remotepath):
file_size = self.sftp.stat(remotepath).st_size
with self.sftp.open(remotepath, "r") as f:
f.prefetch(file_size)
while True:
line = f.readline()
if not line:
break
yield line
# 使用示例
if __name__ == '__main__':
sftp = SftpTest()
gen_obj = sftp.generator_readfile(remotepath="/root/download/test_dat2.DAT")
for i in gen_obj:
print(i, end="")
# do something
print("insert to MongoDB")
sftp.close()
這樣就可以按行讀取到遠(yuǎn)端文件數(shù)據(jù),并對(duì)數(shù)據(jù)進(jìn)行自定義的操作。輸出示例如下:
2020-02-04;上海貿(mào)易有限公司0;2004;1423307;
insert to MongoDB
2020-02-04;上海貿(mào)易有限公司1;2004;4889789;
insert to MongoDB
2020-02-04;上海貿(mào)易有限公司2;2004;1577178;
insert to MongoDB
2020-02-04;上海貿(mào)易有限公司3;2001;7205864;
insert to MongoDB
2020-02-04;上海貿(mào)易有限公司4;2004;7202767;
insert to MongoDB
...
也可以利用生成器的特性,使用next()方法對(duì)生成器進(jìn)行讀取操作:
if __name__ == '__main__':
sftp = SftpTest()
gen_obj = sftp.generator_readfile(remotepath="/root/download/test_dat2.DAT")
res = []
for i in range(1000):
line = next(gen_obj)
res.append(line)
# do something
print(res)
sftp.close()
輸出如下:
['2020-02-04;上海貿(mào)易有限公司0;2004;1423307;\n', '2020-02-04;上海貿(mào)易有限公司1;2004;4889789;\n', '2020-02-04;上海貿(mào)易有限公司2;2004;1577178;\n', '2020-02-04;上海貿(mào)易有限公司3;2001;7205864;\n', '2020-02-04;上海貿(mào)易有限公司4;2004;7202767;\n', '2020-02-04;上海貿(mào)易有限公司5;2004;9012832;\n', '2020-02-04;上海貿(mào)易有限公司6;2001;2525190;\n', '2020-02-04;上海貿(mào)易有限公司7;2001;8152497;\n', '2020-02-04;上海貿(mào)易有限公司8;2004;2008408;\n', '2020-02-04;上海貿(mào)易有限公司9;2004;6432318;\n', '2020-02-04;上海貿(mào)易有限公司10;2004;8080234;\n', '2020-02-04;上海貿(mào)易有限公司11;2001;1511492;\n', '2020-02-04;上海貿(mào)易有限公司12;2001;5644331;\n', '2020-02-04;上海貿(mào)易有限公司13;2001;5638310;\n', '2020-02-04;上海貿(mào)易有限公司14;2001;2499542;\n', '2020-02-04;上海貿(mào)易有限公司15;2004;3782315;\n', '2020-02-04;上海貿(mào)易有限公司16;2001;5187995;\n', '2020-02-04;上海貿(mào)易有限公司17;2004;2797247;\n', '2020-02-04;上海貿(mào)易有限公司18;2004;7077140;\n', '2020-02-04;上海貿(mào)易有限公司19;2004;5229405;\n', '2020-02-04;上海貿(mào)易有限公司20;2001;3052820;\n', '2020-02-04;上海貿(mào)易有限公司21;2001;2084866;\n', '2020-02-04;上海貿(mào)易有限公司22;2001;6490240;\n', '2020-02-04;上海貿(mào)易有限公司23;2001;5979480;\n', '2020-02-04;上海貿(mào)易有限公司24;2004;9088521;\n',
...
]
總結(jié)一下
有了遠(yuǎn)端文件的生成器對(duì)象,不必?fù)?dān)心文件太大內(nèi)存不足的問題,也不再需要先下載文件到本地,對(duì)于大型文件可以節(jié)約很長的時(shí)間以及本地儲(chǔ)存空間。
還可以做一系列的其余操作,比如對(duì)每行數(shù)據(jù)進(jìn)行數(shù)據(jù)預(yù)處理后再yield,比如指定讀取行數(shù),比如按行將遠(yuǎn)端文件拆分到本地等等。
這些都可借助next()方法調(diào)用生成器對(duì)象進(jìn)行靈活的操作。