記錄最近一個大文件數據處理入庫任務,需求大致是這樣:
數據處理需求:
需要定期拉取一個千萬行級的大型文件,文件內容是按照指定分隔符進行分割的數據表內容.
- 每行結束按照
\n換行,類似下面這種格式:
1;female;Micky;19746;
2;male;Tom;749573;
3;male;Bob;465926;
...
- 字段名分別為:
id、gender、name、code
分析思路
在進行代碼編寫之前,先整理思路,需要考慮的問題。
整體思路倒是簡單:讀取文件-->將每行轉換key:value字典格式-->插入MongoDB;
但是這里不得不考慮兩個問題,內存問題、效率問題。
- 大文件一般不能直接讀取到內存,也許你的開發(fā)機器足夠好能剛好承受,但你的代碼如果直接放到生產上,可能直接就讀取失敗或者讓生產機器的內存爆掉了。。
- 寫入效率第一點考慮,在insert_one和insert_many中必然是優(yōu)先選用insert_many,即一次將一個列表的dict數據插入到MongoDB,這樣的效率遠高于逐條遍歷的insert_one。
- 寫入效率第二點考慮,需考慮將數據并發(fā)寫入MongoDB,具體究竟該使用協(xié)程、多線程還是多進程的方式呢?我們后面再說。
我的整體代碼在最后部分,不想看中間啰嗦部分的可直接跳到最后~
本地數據庫測試效率:1千萬行數據,68秒寫入完成
1. 生成器方式讀取大文件
自定義一個生成器函數如下,使用next()方法每次可獲取一行文件數據。
每行數據需要轉換為字典格式返回,以便于可直接插如MongoDB。
FILEDS = ["id", "gender", "name", "code"]
def gen_file(filepath):
"""將大型文件轉化為生成器對象,每次返回一行對應的字典格式數據"""
with open(filepath, "r", encoding="utf-8") as f:
while True:
line = f.readline()
if not line:
break
filed_list = line.split(";")[:-1]
doc_dict = {k: v for k, v in zip(FILEDS, filed_list)
yield doc_dict
可使用如下方式獲取指定一行數據:
# example
gen = gen_file("./example_file.dat")
# 獲取一行字典數據
doc = next(gen)
2.將數據批量寫入MongoDB
每次讀取1000行數據,使用insert_many批量插入MongoDB,你也可以自定義一次插入多少數據量。最好不要使用insert_one一次插入一條數據,數據量大時效率非常低下。
代碼示例:
def insert_mongo():
# 連接到指定MongoDB集合
client = MongoClient(host=MONGO_HOST,
port=MONGO_PORT)
coll = client[MONGO_DB][MONGO_COLLECTION]
# 獲取大文件生成器對象
gen =gen_file("example_file.dat")
# 循環(huán)讀取生成器,一次寫入1000條文檔到MongoDB,直到生成器讀取完全
while True:
docs = []
try:
for i in range(1000):
doc = next(sp_gen)
docs.append(doc)
except StopIteration:
break
finally:
coll.insert_many(docs)
以上就做到了讀取任意大小文件并相對高效的將每行數據寫入到MongoDB中。不過這只是一個單線程的任務,若還想要更高效的提高寫入效率,則需要設計并行讀取寫入的程序邏輯。
3. 如何并行將數據批量寫入MongoDB?
在這一步我借助生成器做了多線程和協(xié)程寫入的測試,也不再詳述測試代碼了,得出的結果是,使用多線程或協(xié)程在批量寫入MongoDB時,并未有太多效率的提升,我猜想原因有二:一是Python的多線程本來由于GIL鎖的存在是無法做到真正的并行的,二是MongoDB是NoSQL數據庫本身寫入的效率就很高了,在非并發(fā)的情況下并不能真正明顯提升效率。
因此,看來想要在此基礎上成倍提升寫入效率只能采用并行,即多進程的方式才行。但是使用多進程有一點必須要注意的是,多進程通常是無法共享資源的,一個生成器對象無法互相讀取。
解決思路:
- 按行拆分原始大文件,將原本的大文件拆分為多個小文件
- 再將每個小文件轉換為生成器對象
- 使用進程池讀取小文件生成器對象,并行批量寫入MonoDB
4. 拆分小文件
以下我借助我的部分代碼提供一個大致邏輯,但不再每個函數進行詳細代碼說明:
# 將大文件按指定行數切分為多個小文件,默認按照100w行進行拆分
filepath = "./test_dat.DAT"
splitpath = "./split_directory"
def split_file(self, sp_nm=1000000):
# 以原文件名為基礎名,拆分小文件以為_1、_2模式命名,
base_path = filepath.replace(".", "_{}.")
# 讀取大文件為逐行返回的生成器對象
gen = self.__gen_file()
# 創(chuàng)建拆分小文件存放的目錄
self.__mk_sp_dir()
flag = 1
# 循環(huán)內每次讀取生成器對象最高100w行寫入小文件,直到完全讀取觸發(fā)StopIteration異常退出循環(huán)
while True:
split_name = base_path.format(str(flag))
try:
with open("%s/%s" % (splitpath, split_name), "w", encoding="utf8") as f:
for i in range(sp_nm):
line = next(gen)
f.write(line)
flag += 1
except StopIteration:
break
print("Finished! Split into %s files in total." % flag)
# 獲取拆分文件路徑列表
def sp_file_lists(self):
"""獲取生成器列表"""
abspath = os.path.abspath(splitpath) + "/"
sp_filepath_list = list(map(lambda x: abspath+x, os.listdir(splitpath)))
return sp_filepath_list
5. 多進程讀取拆分文件寫入MongoDB
代碼示例:
from multiprocessing import Pool
sp_filepath_list = handler.sp_file_lists()
p = Pool()
for file in sp_filepath_list:
p.apply_async(insert_mongo, (file,))
print("----start----")
p.close()
p.join()
print("----finished----")
最后:完整代碼分享
主邏輯部分完成代碼分享在下面,希望對需要的人有幫助,代碼包含生成器函數、文件拆分、批量插入MongoDB、文件清理等方法。使用多進程方式,寫入效率按進程多少幾乎成倍增加。
import os
import time
from multiprocessing import Pool
# 可配置的進程池大小、數據字段列表、小文件拆分行數標準
from config import PROCESS_POOL, INSERT_MANY_COUNT, COLUMNS, SPLIT_LINES
# 自定義的MongoDB連接類
from mongo_client import DBManager
class BigFileToMongoDB(object):
# 初始化指定大文件路徑及拆分結果目錄,默認拆分至當前路徑下的split_directory文件夾下
def __init__(self, filepath, splitpath="./split_directory"):
self.filepath = filepath
self.splitpath = splitpath
# 將大型文件轉換為生成器,每次返回一行數據
def __gen_file(self):
"""將大型文件轉化為生成器對象,每次返回一行"""
with open(self.filepath, "r", encoding="utf-8") as f:
while True:
line = f.readline()
if not line:
break
yield line
def __mk_sp_dir(self):
"""創(chuàng)建拆分文件存放目錄,split_directory"""
if not os.path.exists(self.splitpath):
os.mkdir(self.splitpath)
# 將大文件按指定行數切分為多個小文件
def split_file(self, sp_nm=1000000):
"""讀取大文件生成器,默認每100w行拆分一個新的文件寫入"""
if SPLIT_LINES is not None:
sp_nm = SPLIT_LINES
base_path = self.filepath.replace(".", "_{}.")
gen = self.__gen_file()
self.__mk_sp_dir()
flag = 1
while True:
split_name = base_path.format(str(flag))
try:
with open("%s/%s" % (self.splitpath, split_name), "w", encoding="utf8") as f:
for i in range(sp_nm):
line = next(gen)
f.write(line)
flag += 1
except StopIteration:
break
print("Finished! Split into %s files in total." % flag)
@staticmethod
def spfile_generator(sp_filepath):
"""將拆分后的文件轉換為字典生成器
針對不同格式文件需要不同的處理函數
"""
with open(sp_filepath, "r", encoding="utf-8") as f:
while True:
line = f.readline()
line_list = line.split(";")[:-1]
dic = {i: j for i, j in zip(COLUMNS, line_list)}
if not line:
break
yield dic
@property
def sp_file_lists(self):
"""獲取所有拆分文件絕對路徑列表"""
abspath = os.path.abspath(self.splitpath) + "/"
sp_filepath_list = list(map(lambda x: abspath+x, os.listdir(self.splitpath)))
return sp_filepath_list
@staticmethod
def insert_mongo(filepath):
sp_gen = BigFileToMongoDB.spfile_generator(filepath)
coll = DBManager()
coll.connect()
while True:
docs = []
try:
for i in range(INSERT_MANY_COUNT):
doc = next(sp_gen)
docs.append(doc)
coll.insert_many(docs)
except StopIteration:
break
print("生成器元素已寫入MongoDB")
def clean_split_dir(self):
for split_file in self.sp_file_lists:
os.remove(split_file)
print("清理完成")
def run_insert_pool(file_path):
start = time.time()
handler = BigFileToMongoDB(file_path)
print("開始切分源文件")
handler.split_file()
sp_filepath_list = handler.sp_file_lists
p = Pool(PROCESS_POOL)
for file in sp_filepath_list:
p.apply_async(handler.insert_mongo, (file,))
print("----start----")
p.close()
p.join()
end = time.time()
print("Finish to MongoDB spend:{}s".format(end - start))
handler.clean_split_dir()
if __name__ == '__main__':
file_path = "test_dat.DAT"
run_insert_pool(file_path)
以上。
所有文件的讀取都使用生成器的方式,可以避免內存不足的問題。
本地數據庫測試效率:1千萬行數據,第二次76秒寫入完成
當前代碼只是demo,有很多不規(guī)范中英文混合打印請見諒,??。
