問題: 工作中,需要將Elasticsearch中幾千萬的數(shù)據(jù)下載到本地磁盤中,
1. 先借助游標(biāo),將所有結(jié)果數(shù)據(jù)存儲(chǔ)到內(nèi)存中
2. 然后將內(nèi)存中結(jié)果數(shù)據(jù)寫入到磁盤,也就是文件中
3. 因?yàn)閹浊f條數(shù)據(jù)過大,所以我們?cè)谔幚淼臅r(shí)候就需要先將緩存在內(nèi)存中的數(shù)據(jù)寫入到磁盤中,然后將緩存清空,繼續(xù)讀取下一段數(shù)據(jù)(我們?cè)趯懭氲酱疟P中需要注意不能覆蓋之前寫的文件,選擇模式應(yīng)該是追加讀寫)
解決:
import csv
from elasticsearch import Elasticsearch
es = Elasticsearch(hosts="218.22.29.213", port=59200, timeout=200)
# 1. 先借助游標(biāo),將所有結(jié)果數(shù)據(jù)存儲(chǔ)到內(nèi)存中
# 2. 然后將內(nèi)存中的結(jié)果數(shù)據(jù)寫入到磁盤,也就是文件中
query1 = {
"size": 10000
}
query = es.search(index="passdata_20190318", doc_type="epolice", scroll='5m', body=query1)
value = query["hits"]["hits"]
# es查詢出的結(jié)果第一頁
results = query['hits']['hits']
# es查詢出的結(jié)果總量
total = query['hits']['total']
# 游標(biāo)用于輸出es查詢出的所有結(jié)果
scroll_id = query['_scroll_id']
# 在發(fā)送查詢請(qǐng)求的時(shí)候,就告訴ES需要使用游標(biāo),并定義每次返回?cái)?shù)據(jù)量的大小
# 定義一個(gè)list變量results用來存儲(chǔ)數(shù)據(jù)結(jié)果,在代碼中,可以另其為空list,即results=[],也可以先將返回結(jié)果
# 的第一頁存盡進(jìn)來, 即results = query['hits']['hits']
# 對(duì)于所有二級(jí)結(jié)果數(shù)據(jù)寫個(gè)分頁加載到內(nèi)存變量的循環(huán)
task = 1
for i in range(0, int(total / 100) + 1):
# scroll參數(shù)必須制定否則會(huì)報(bào)錯(cuò)
query_scroll = es.scroll(scroll_id=scroll_id, scroll="5m")['hits']['hits']
results += query_scroll
# 導(dǎo)出到Excel文件中
# 下述函數(shù)模式有(1. 文件名 ,mode 模式)
# -- w write,r ride ,a append
# 1. w 只能操作寫入 r 只能讀取 a向文件追加
# 2. w+ 只讀可寫 r+ 可讀可寫 a+可讀可追加
# 3. wb+ 寫入進(jìn)制數(shù)據(jù)
# 4. w模式打開文件,如果文件中有數(shù)據(jù),再次寫入內(nèi)容,會(huì)把原來的覆蓋掉
# 我們這里需要保留之前寫入的數(shù)據(jù),所以模式選擇的是'a+'可讀可追加
with open("D://ml/passdata_20190318.csv", 'a+', newline='', encoding="utf-8") as flow:
# 獲取_source 下的所有字段名
names = results[0]['_source'].keys()
csv_writer = csv.writer(flow)
csv_writer.writerow(names)
for res in results:
csv_writer.writerow(res['_source'].values())
# 將內(nèi)存中的數(shù)據(jù)沖刷到文件中
flow.flush()
results = []
print("done!" + "---" + str(task))
task += 1
# # 導(dǎo)出到txt文本中
# file_handle = open("D://ml/data.txt", 'w', newline='', encoding="gbk")
# # 上述函數(shù)模式有(1. 文件名 ,mode 模式)
# # 1. w 只能操作寫入 r 只能讀取 a向文件追加
# # 2. w+ 只讀可寫 r+ 可讀可寫 a+可讀可追加
# # 3. wb+ 寫入進(jìn)制數(shù)據(jù)
# # 4. w模式打開文件,如果而文件中有數(shù)據(jù),再次寫入內(nèi)容,會(huì)把原來的覆蓋掉
#
# for res in results:
# file_handle.write(str(res['_source'].values()))
# file_handle.write("\r\n")
最后編輯于 :
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。