人生苦短,我用Python!
情景描述:
當(dāng)我們在處理數(shù)據(jù)的時候,尤其對大量的文本數(shù)據(jù)或是大量的獨立的數(shù)據(jù)處理的時候,如果是用單線程的方式處理的話可能會耗費大量的時間,大量到你會哭的那種。比如我現(xiàn)在有2000萬條文本數(shù)據(jù),我現(xiàn)在想將其按照正則抽取不同的規(guī)則模式,并使用分詞軟件分詞,然后調(diào)用模型處理。在python編程的條件下,當(dāng)你readlines以后一條一條處理的話,我按照之前的實驗估算的話可能需要一天多,這個時候等不了果斷kill掉進程,然后使用多線程的方式來進行處理。比如我們單位的服務(wù)器有48個核心。所以理論上能將速度提升48倍,也就是在半個小時就能完成處理,吃個飯就能得出結(jié)果。這部分的解決辦法和示例代碼已經(jīng)在并行處理給出了。在這個實例中我們可以控制使用的CPU核數(shù),不會對服務(wù)器造成過大的壓力,歸根結(jié)底還是要控制后臺同時運行的線程數(shù)目。我之前遇到過一個項目,需要后臺運行差不多1000個線程,這樣服務(wù)器肯定受不了,所以我會將1000個線程分開寫到5-10個bash文件中,分別調(diào)用。每次還得自己去手動的開始,所以在這里利用兩個命令來對每一個后臺線程分配到一個具體的cpu,這樣就能保證資源的可控。
思路和用到的語言:
方案一:每一個shell命令,使用linux的fgrep的命令和taskset分別為每一個后臺運行的進程指定運行的CPU,由于Linux內(nèi)核在任務(wù)調(diào)度的時候需要考慮到進程的優(yōu)先級和空閑的CPU,實際在執(zhí)行的時候是在不同的核上分時進行的。而taskset則是可以直接指定運行的CPU。
遇到的問題,當(dāng)后臺需要運行的命令少的時候可以,但是當(dāng)你一個非管理員用戶在同時大量fork線程的時候,系統(tǒng)會限制用戶最大能開啟的線程,并且這個時候好多的線程擠在同一個CPU上,造成了線程的掛起。所以這個方案適合使用在需要同時執(zhí)行的線程數(shù)不是特別多的時候。我實際的測試在線程數(shù)超過2000的時候,會出現(xiàn)資源限制。
python script_1.py param1 param2 .... &
taskset -cp 0-max_cpus $!
python script_2.py param1 param2 ... &
taskset -cp 0-max_cpus $!
... 中間是同樣的調(diào)用shell腳本 ...
python script_n.py param1 param2 ... &
taskset -cp 0-max_cpus $!
上面的腳本文件展示了用方案一的時候,在這里每一個python調(diào)用的shell命令可以是相同的,也可以是不同的。后面的跟的參數(shù)可以看作是一組實驗的參數(shù)設(shè)置,最末尾的& 符號表示將這個命令掛到后臺執(zhí)行。第二行的taskset是linux將特定的程序放到某個/某些CPU上的命令。當(dāng)Linux運行程序的時候,會為當(dāng)前的程序分配一個pid,就跟給線程一個身份證號一樣。可以使用taskset -p pid 來看pid對應(yīng)的程序分配到了哪個CPU上,一般是分配到全部的CPU核上然后操作系統(tǒng)在后臺調(diào)度。使用taskset -cp 0 $! 就能將線程分配到CPU的第一個核上。在這個命令中可以指定特定的CPU,也可以使用逗號分隔的CPU列表,或是中橫線表示的一個范圍。后面的$!表示的是上一個線程的pid。
taskset -pc 0,1,2,5-10 $!
這個命令表示的是將上一個程序分配到{0,1,2,5,6,7,8,9,10}這些CPU上進行自由調(diào)度,這樣就能限制CPU的使用。
方案二:使用上一個方法的時候,當(dāng)線程比較多,系統(tǒng)的資源會出現(xiàn)暫時無法使用的情況并且后臺一下掛起很多的線程,對系統(tǒng)的調(diào)度可能也不是很好,這里就使用通過監(jiān)測當(dāng)前運行腳本的數(shù)目來特定分配,實現(xiàn)同一個時刻只有一個后臺命令運行在特定的CPU核上。
import sys
import os, subprocess
import time
from collections import deque
import constant
def getThreadsNum(thread_name):
thread = subprocess.Popen("ps -aux | grep {thread_name}".format(
thread_name=thread_name
), shell=True, stdout=subprocess.PIPE)
out = thread.stdout.readlines()
return len(out) - 1
def runShell(cmd, cpu_index):
"""將shell命令掛到后臺執(zhí)行,并指定其CPU"""
# 這里一定要注意,
cmd_query = cmd.split(">")[0]
cmd_query = " ".join([item for item in cmd_query.split(" ") if item])
# 運行shell
subprocess.run(cmd, shell=True)
# 查找其pid
p = subprocess.Popen('pgrep -f "{name}"'.format(
name=cmd_query,
), shell=True, stdout=subprocess.PIPE)
out = p.stdout.readlines()
# 這里面需要選擇pid小的那個
pid = min([int(item) for item in out])
# 這里stdout讓程序自己來輸出,可以監(jiān)測進度
subprocess.run("taskset -cp {cpu_index} {pid}".format(
pid=pid,
cpu_index=cpu_index
), shell=True)
def runAll(sh_file):
"""運行全部"""
with open(sh_file) as f:
lines = [line.strip() for line in f.readlines() if line[0] != '#']
quene = deque(lines)
cpu_index = 0
while len(quene) != 0:
thread_num = getThreadsNum("predict_trend")
# 將剩余的cpu填滿線程
for i in range(0, constant.MAX_CPU - thread_num):
# 到最后可能就只剩幾個沒有執(zhí)行的了,所以這里需要實時的判斷隊列是不是為空
if len(quene) == 0:
break
cmd = quene.popleft()
runShell(cmd, cpu_index)
# cpu_index 的選擇
cpu_index += 1
if cpu_index == constant.MAX_CPU:
cpu_index = 0
time.sleep(1)
print("全部執(zhí)行完畢!")
if __name__ == "__main__":
params = sys.argv
shell_path = params[1]
runAll(shell_path)
上面的pyhton代碼就是最終的解決方案,在runAll函數(shù)中,通過將所有的shell腳本中命令放到一個隊列中(當(dāng)然也可以不用隊列),然后通過監(jiān)測當(dāng)前的系統(tǒng)中運行的腳本的數(shù)目,得出空閑的CPU核數(shù),然后將這些核填滿,相信大家自己看代碼就會明白了。最后,最重要的一點就是,使用pgrep -f 這個命令的時候,后面的name需要將多個空格合并成一個空格,一般會返回兩個pid,一個是我們后臺運行的程序,一個就是這個命令本身也包含了,我在操作的時候,由于沒有注意到多個空格的問題,得到的總是pgrep 的pid,造成將它分配到了CPU核上,而沒有真正的將cmd分配,其他的地方就沒有什么難處了。

當(dāng)然這只是一個簡單的原型,可以從我運行的圖中看出來,服務(wù)器有48個核,我在這里設(shè)定的是使用其中的前35個核干活,這樣剩余的核可以用來干別的事情,并且當(dāng)taskset成功執(zhí)行的時候,都會進行輸出。
總結(jié)
在處理數(shù)據(jù)的時候,要么你去學(xué)mapreduce技術(shù),要么就一定要掌握linux的一些命令,比如awk,join, grep, split等命令和管道技術(shù),然后配合python的靈活性,就會有你想不到的特殊效果。