問題1--如何將Elasticsearch中千萬數(shù)據(jù)保存到本地文件

問題: 工作中,需要將Elasticsearch中幾千萬的數(shù)據(jù)下載到本地磁盤中,

1. 先借助游標(biāo),將所有結(jié)果數(shù)據(jù)存儲(chǔ)到內(nèi)存中
2. 然后將內(nèi)存中結(jié)果數(shù)據(jù)寫入到磁盤,也就是文件中
3. 因?yàn)閹浊f條數(shù)據(jù)過大,所以我們?cè)谔幚淼臅r(shí)候就需要先將緩存在內(nèi)存中的數(shù)據(jù)寫入到磁盤中,然后將緩存清空,繼續(xù)讀取下一段數(shù)據(jù)(我們?cè)趯懭氲酱疟P中需要注意不能覆蓋之前寫的文件,選擇模式應(yīng)該是追加讀寫)

解決:

import csv

from elasticsearch import Elasticsearch

es = Elasticsearch(hosts="218.22.29.213", port=59200, timeout=200)
# 1. 先借助游標(biāo),將所有結(jié)果數(shù)據(jù)存儲(chǔ)到內(nèi)存中
# 2. 然后將內(nèi)存中的結(jié)果數(shù)據(jù)寫入到磁盤,也就是文件中
query1 = {
    "size": 10000
}
query = es.search(index="passdata_20190318", doc_type="epolice", scroll='5m', body=query1)
value = query["hits"]["hits"]

# es查詢出的結(jié)果第一頁
results = query['hits']['hits']
# es查詢出的結(jié)果總量
total = query['hits']['total']
# 游標(biāo)用于輸出es查詢出的所有結(jié)果
scroll_id = query['_scroll_id']
# 在發(fā)送查詢請(qǐng)求的時(shí)候,就告訴ES需要使用游標(biāo),并定義每次返回?cái)?shù)據(jù)量的大小
# 定義一個(gè)list變量results用來存儲(chǔ)數(shù)據(jù)結(jié)果,在代碼中,可以另其為空list,即results=[],也可以先將返回結(jié)果
# 的第一頁存盡進(jìn)來, 即results = query['hits']['hits']
# 對(duì)于所有二級(jí)結(jié)果數(shù)據(jù)寫個(gè)分頁加載到內(nèi)存變量的循環(huán)
task = 1
for i in range(0, int(total / 100) + 1):
    # scroll參數(shù)必須制定否則會(huì)報(bào)錯(cuò)
    query_scroll = es.scroll(scroll_id=scroll_id, scroll="5m")['hits']['hits']
    results += query_scroll
    # 導(dǎo)出到Excel文件中
    # 下述函數(shù)模式有(1. 文件名 ,mode 模式)
    # -- w write,r ride ,a append
    # 1. w 只能操作寫入    r 只能讀取  a向文件追加
    # 2. w+ 只讀可寫       r+ 可讀可寫   a+可讀可追加
    # 3. wb+ 寫入進(jìn)制數(shù)據(jù)
    # 4. w模式打開文件,如果文件中有數(shù)據(jù),再次寫入內(nèi)容,會(huì)把原來的覆蓋掉
    # 我們這里需要保留之前寫入的數(shù)據(jù),所以模式選擇的是'a+'可讀可追加
    with open("D://ml/passdata_20190318.csv", 'a+', newline='', encoding="utf-8") as flow:
        # 獲取_source 下的所有字段名
        names = results[0]['_source'].keys()
        csv_writer = csv.writer(flow)
        csv_writer.writerow(names)
        for res in results:
            csv_writer.writerow(res['_source'].values())
        # 將內(nèi)存中的數(shù)據(jù)沖刷到文件中
        flow.flush()
    results = []
    print("done!" + "---" + str(task))
    task += 1
# # 導(dǎo)出到txt文本中
# file_handle = open("D://ml/data.txt", 'w', newline='', encoding="gbk")
# # 上述函數(shù)模式有(1. 文件名 ,mode 模式)
# # 1. w 只能操作寫入    r 只能讀取  a向文件追加
# # 2. w+ 只讀可寫       r+ 可讀可寫   a+可讀可追加
# # 3. wb+ 寫入進(jìn)制數(shù)據(jù)
# # 4. w模式打開文件,如果而文件中有數(shù)據(jù),再次寫入內(nèi)容,會(huì)把原來的覆蓋掉
#
# for res in results:
#     file_handle.write(str(res['_source'].values()))
#     file_handle.write("\r\n")

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容