Python大型文件數據讀取及并行高效寫入MongoDB代碼分享

記錄最近一個大文件數據處理入庫任務,需求大致是這樣:

數據處理需求:

需要定期拉取一個千萬行級的大型文件,文件內容是按照指定分隔符進行分割的數據表內容.

  • 每行結束按照\n換行,類似下面這種格式:
1;female;Micky;19746;
2;male;Tom;749573;
3;male;Bob;465926;
...
  • 字段名分別為:id、gender、name、code

分析思路

在進行代碼編寫之前,先整理思路,需要考慮的問題。
整體思路倒是簡單:讀取文件-->將每行轉換key:value字典格式-->插入MongoDB;
但是這里不得不考慮兩個問題,內存問題、效率問題。

  • 大文件一般不能直接讀取到內存,也許你的開發(fā)機器足夠好能剛好承受,但你的代碼如果直接放到生產上,可能直接就讀取失敗或者讓生產機器的內存爆掉了。。
  • 寫入效率第一點考慮,在insert_one和insert_many中必然是優(yōu)先選用insert_many,即一次將一個列表的dict數據插入到MongoDB,這樣的效率遠高于逐條遍歷的insert_one。
  • 寫入效率第二點考慮,需考慮將數據并發(fā)寫入MongoDB,具體究竟該使用協(xié)程、多線程還是多進程的方式呢?我們后面再說。

我的整體代碼在最后部分,不想看中間啰嗦部分的可直接跳到最后~

本地數據庫測試效率:1千萬行數據,68秒寫入完成

1. 生成器方式讀取大文件

自定義一個生成器函數如下,使用next()方法每次可獲取一行文件數據。
每行數據需要轉換為字典格式返回,以便于可直接插如MongoDB。

FILEDS = ["id", "gender", "name", "code"]
def gen_file(filepath):
    """將大型文件轉化為生成器對象,每次返回一行對應的字典格式數據"""
   with open(filepath, "r", encoding="utf-8") as f:
        while True:
            line = f.readline()
            if not line:
                break
            filed_list = line.split(";")[:-1]
            doc_dict = {k: v for k, v in zip(FILEDS, filed_list)
            yield doc_dict

可使用如下方式獲取指定一行數據:

# example
gen = gen_file("./example_file.dat")
# 獲取一行字典數據
doc = next(gen)

2.將數據批量寫入MongoDB

每次讀取1000行數據,使用insert_many批量插入MongoDB,你也可以自定義一次插入多少數據量。最好不要使用insert_one一次插入一條數據,數據量大時效率非常低下。
代碼示例:

def insert_mongo():
    # 連接到指定MongoDB集合
    client = MongoClient(host=MONGO_HOST, 
    port=MONGO_PORT)
    coll = client[MONGO_DB][MONGO_COLLECTION]
     # 獲取大文件生成器對象
    gen =gen_file("example_file.dat")
    # 循環(huán)讀取生成器,一次寫入1000條文檔到MongoDB,直到生成器讀取完全
    while True:
        docs = []
        try:
            for i in range(1000):
                doc = next(sp_gen)
                docs.append(doc)
        except StopIteration:
            break
        finally:
            coll.insert_many(docs)

以上就做到了讀取任意大小文件并相對高效的將每行數據寫入到MongoDB中。不過這只是一個單線程的任務,若還想要更高效的提高寫入效率,則需要設計并行讀取寫入的程序邏輯。

3. 如何并行將數據批量寫入MongoDB?

在這一步我借助生成器做了多線程和協(xié)程寫入的測試,也不再詳述測試代碼了,得出的結果是,使用多線程或協(xié)程在批量寫入MongoDB時,并未有太多效率的提升,我猜想原因有二:一是Python的多線程本來由于GIL鎖的存在是無法做到真正的并行的,二是MongoDB是NoSQL數據庫本身寫入的效率就很高了,在非并發(fā)的情況下并不能真正明顯提升效率。

因此,看來想要在此基礎上成倍提升寫入效率只能采用并行,即多進程的方式才行。但是使用多進程有一點必須要注意的是,多進程通常是無法共享資源的,一個生成器對象無法互相讀取。
解決思路

  • 按行拆分原始大文件,將原本的大文件拆分為多個小文件
  • 再將每個小文件轉換為生成器對象
  • 使用進程池讀取小文件生成器對象,并行批量寫入MonoDB

4. 拆分小文件

以下我借助我的部分代碼提供一個大致邏輯,但不再每個函數進行詳細代碼說明:

# 將大文件按指定行數切分為多個小文件,默認按照100w行進行拆分
filepath = "./test_dat.DAT"
splitpath = "./split_directory"
def split_file(self, sp_nm=1000000):
    # 以原文件名為基礎名,拆分小文件以為_1、_2模式命名,
    base_path = filepath.replace(".", "_{}.")
    # 讀取大文件為逐行返回的生成器對象
    gen = self.__gen_file()
    # 創(chuàng)建拆分小文件存放的目錄
    self.__mk_sp_dir()
    flag = 1
    # 循環(huán)內每次讀取生成器對象最高100w行寫入小文件,直到完全讀取觸發(fā)StopIteration異常退出循環(huán)
    while True:
        split_name = base_path.format(str(flag))
        try:
            with open("%s/%s" % (splitpath, split_name), "w", encoding="utf8") as f:
                for i in range(sp_nm):
                    line = next(gen)
                    f.write(line)
            flag += 1
        except StopIteration:
            break
    print("Finished! Split into %s files in total." % flag)


# 獲取拆分文件路徑列表
def sp_file_lists(self):
    """獲取生成器列表"""
    abspath = os.path.abspath(splitpath) + "/"
    sp_filepath_list = list(map(lambda x: abspath+x, os.listdir(splitpath)))
    return sp_filepath_list

5. 多進程讀取拆分文件寫入MongoDB

代碼示例:

from multiprocessing import Pool

sp_filepath_list = handler.sp_file_lists()
p = Pool()
for file in sp_filepath_list:
    p.apply_async(insert_mongo, (file,))
print("----start----")
p.close()
p.join()
print("----finished----")

最后:完整代碼分享

主邏輯部分完成代碼分享在下面,希望對需要的人有幫助,代碼包含生成器函數、文件拆分、批量插入MongoDB、文件清理等方法。使用多進程方式,寫入效率按進程多少幾乎成倍增加。

import os
import time
from multiprocessing import Pool
# 可配置的進程池大小、數據字段列表、小文件拆分行數標準
from config import PROCESS_POOL, INSERT_MANY_COUNT, COLUMNS, SPLIT_LINES
# 自定義的MongoDB連接類
from mongo_client import DBManager


class BigFileToMongoDB(object):
    # 初始化指定大文件路徑及拆分結果目錄,默認拆分至當前路徑下的split_directory文件夾下
    def __init__(self, filepath, splitpath="./split_directory"):
        self.filepath = filepath
        self.splitpath = splitpath

    # 將大型文件轉換為生成器,每次返回一行數據
    def __gen_file(self):
        """將大型文件轉化為生成器對象,每次返回一行"""
        with open(self.filepath, "r", encoding="utf-8") as f:
            while True:
                line = f.readline()
                if not line:
                    break
                yield line

    def __mk_sp_dir(self):
        """創(chuàng)建拆分文件存放目錄,split_directory"""
        if not os.path.exists(self.splitpath):
            os.mkdir(self.splitpath)

    # 將大文件按指定行數切分為多個小文件
    def split_file(self, sp_nm=1000000):
        """讀取大文件生成器,默認每100w行拆分一個新的文件寫入"""
        if SPLIT_LINES is not None:
            sp_nm = SPLIT_LINES
        base_path = self.filepath.replace(".", "_{}.")
        gen = self.__gen_file()
        self.__mk_sp_dir()
        flag = 1
        while True:
            split_name = base_path.format(str(flag))
            try:
                with open("%s/%s" % (self.splitpath, split_name), "w", encoding="utf8") as f:
                    for i in range(sp_nm):
                        line = next(gen)
                        f.write(line)
                flag += 1
            except StopIteration:
                break
        print("Finished! Split into %s files in total." % flag)

    @staticmethod
    def spfile_generator(sp_filepath):
        """將拆分后的文件轉換為字典生成器
        針對不同格式文件需要不同的處理函數
        """
        with open(sp_filepath, "r", encoding="utf-8") as f:
            while True:
                line = f.readline()
                line_list = line.split(";")[:-1]
                dic = {i: j for i, j in zip(COLUMNS, line_list)}
                if not line:
                    break
                yield dic

    @property
    def sp_file_lists(self):
        """獲取所有拆分文件絕對路徑列表"""
        abspath = os.path.abspath(self.splitpath) + "/"
        sp_filepath_list = list(map(lambda x: abspath+x, os.listdir(self.splitpath)))
        return sp_filepath_list

    @staticmethod
    def insert_mongo(filepath):
        sp_gen = BigFileToMongoDB.spfile_generator(filepath)
        coll = DBManager()
        coll.connect()
        while True:
            docs = []
            try:
                for i in range(INSERT_MANY_COUNT):
                    doc = next(sp_gen)
                    docs.append(doc)
                coll.insert_many(docs)
            except StopIteration:
                break
        print("生成器元素已寫入MongoDB")

    def clean_split_dir(self):
        for split_file in self.sp_file_lists:
            os.remove(split_file)
        print("清理完成")


def run_insert_pool(file_path):
    start = time.time()
    handler = BigFileToMongoDB(file_path)
    print("開始切分源文件")
    handler.split_file()
    sp_filepath_list = handler.sp_file_lists
    p = Pool(PROCESS_POOL)
    for file in sp_filepath_list:
        p.apply_async(handler.insert_mongo, (file,))

    print("----start----")
    p.close()
    p.join()
    end = time.time()
    print("Finish to MongoDB spend:{}s".format(end - start))
    handler.clean_split_dir()


if __name__ == '__main__':
    file_path = "test_dat.DAT"
    run_insert_pool(file_path)

以上。
所有文件的讀取都使用生成器的方式,可以避免內存不足的問題。
本地數據庫測試效率:1千萬行數據,第二次76秒寫入完成
當前代碼只是demo,有很多不規(guī)范中英文混合打印請見諒,??。

屏幕快照 2020-02-03

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容