CPU密集型任務(wù)并發(fā)與資源控制

人生苦短,我用Python!

情景描述:

當(dāng)我們在處理數(shù)據(jù)的時候,尤其對大量的文本數(shù)據(jù)或是大量的獨立的數(shù)據(jù)處理的時候,如果是用單線程的方式處理的話可能會耗費大量的時間,大量到你會哭的那種。比如我現(xiàn)在有2000萬條文本數(shù)據(jù),我現(xiàn)在想將其按照正則抽取不同的規(guī)則模式,并使用分詞軟件分詞,然后調(diào)用模型處理。在python編程的條件下,當(dāng)你readlines以后一條一條處理的話,我按照之前的實驗估算的話可能需要一天多,這個時候等不了果斷kill掉進程,然后使用多線程的方式來進行處理。比如我們單位的服務(wù)器有48個核心。所以理論上能將速度提升48倍,也就是在半個小時就能完成處理,吃個飯就能得出結(jié)果。這部分的解決辦法和示例代碼已經(jīng)在并行處理給出了。在這個實例中我們可以控制使用的CPU核數(shù),不會對服務(wù)器造成過大的壓力,歸根結(jié)底還是要控制后臺同時運行的線程數(shù)目。我之前遇到過一個項目,需要后臺運行差不多1000個線程,這樣服務(wù)器肯定受不了,所以我會將1000個線程分開寫到5-10個bash文件中,分別調(diào)用。每次還得自己去手動的開始,所以在這里利用兩個命令來對每一個后臺線程分配到一個具體的cpu,這樣就能保證資源的可控。

思路和用到的語言:

方案一:每一個shell命令,使用linux的fgrep的命令和taskset分別為每一個后臺運行的進程指定運行的CPU,由于Linux內(nèi)核在任務(wù)調(diào)度的時候需要考慮到進程的優(yōu)先級和空閑的CPU,實際在執(zhí)行的時候是在不同的核上分時進行的。而taskset則是可以直接指定運行的CPU。

遇到的問題,當(dāng)后臺需要運行的命令少的時候可以,但是當(dāng)你一個非管理員用戶在同時大量fork線程的時候,系統(tǒng)會限制用戶最大能開啟的線程,并且這個時候好多的線程擠在同一個CPU上,造成了線程的掛起。所以這個方案適合使用在需要同時執(zhí)行的線程數(shù)不是特別多的時候。我實際的測試在線程數(shù)超過2000的時候,會出現(xiàn)資源限制。

python script_1.py param1 param2 .... &
taskset -cp 0-max_cpus $!

python script_2.py param1 param2 ... &
taskset -cp 0-max_cpus $!
... 中間是同樣的調(diào)用shell腳本 ...
python script_n.py param1 param2 ... &
taskset -cp 0-max_cpus $!

上面的腳本文件展示了用方案一的時候,在這里每一個python調(diào)用的shell命令可以是相同的,也可以是不同的。后面的跟的參數(shù)可以看作是一組實驗的參數(shù)設(shè)置,最末尾的& 符號表示將這個命令掛到后臺執(zhí)行。第二行的taskset是linux將特定的程序放到某個/某些CPU上的命令。當(dāng)Linux運行程序的時候,會為當(dāng)前的程序分配一個pid,就跟給線程一個身份證號一樣。可以使用taskset -p pid 來看pid對應(yīng)的程序分配到了哪個CPU上,一般是分配到全部的CPU核上然后操作系統(tǒng)在后臺調(diào)度。使用taskset -cp 0 $! 就能將線程分配到CPU的第一個核上。在這個命令中可以指定特定的CPU,也可以使用逗號分隔的CPU列表,或是中橫線表示的一個范圍。后面的$!表示的是上一個線程的pid。

taskset -pc 0,1,2,5-10 $!

這個命令表示的是將上一個程序分配到{0,1,2,5,6,7,8,9,10}這些CPU上進行自由調(diào)度,這樣就能限制CPU的使用。

方案二:使用上一個方法的時候,當(dāng)線程比較多,系統(tǒng)的資源會出現(xiàn)暫時無法使用的情況并且后臺一下掛起很多的線程,對系統(tǒng)的調(diào)度可能也不是很好,這里就使用通過監(jiān)測當(dāng)前運行腳本的數(shù)目來特定分配,實現(xiàn)同一個時刻只有一個后臺命令運行在特定的CPU核上。

import sys
import os, subprocess
import time
from collections import deque
import constant


def getThreadsNum(thread_name):

    thread = subprocess.Popen("ps -aux | grep {thread_name}".format(
        thread_name=thread_name
    ), shell=True, stdout=subprocess.PIPE)
    out = thread.stdout.readlines()
    return len(out) - 1


def runShell(cmd, cpu_index):
    """將shell命令掛到后臺執(zhí)行,并指定其CPU"""
    # 這里一定要注意,
    cmd_query = cmd.split(">")[0]
    cmd_query = " ".join([item for item in cmd_query.split(" ") if item])
    # 運行shell
    subprocess.run(cmd, shell=True)

    # 查找其pid
    p = subprocess.Popen('pgrep -f "{name}"'.format(
        name=cmd_query,
    ), shell=True, stdout=subprocess.PIPE)

    out = p.stdout.readlines()

    # 這里面需要選擇pid小的那個
    pid = min([int(item) for item in out])
    # 這里stdout讓程序自己來輸出,可以監(jiān)測進度
    subprocess.run("taskset -cp {cpu_index} {pid}".format(
        pid=pid,
        cpu_index=cpu_index
    ), shell=True)



def runAll(sh_file):
    """運行全部"""

    with open(sh_file) as f:
        lines = [line.strip() for line in f.readlines() if line[0] != '#']
        quene = deque(lines)

    cpu_index = 0
    while len(quene) != 0:
        thread_num = getThreadsNum("predict_trend")
        # 將剩余的cpu填滿線程
        for i in range(0, constant.MAX_CPU - thread_num):
            # 到最后可能就只剩幾個沒有執(zhí)行的了,所以這里需要實時的判斷隊列是不是為空
            if len(quene) == 0:
                break
            cmd = quene.popleft()
            runShell(cmd, cpu_index)

            # cpu_index 的選擇
            cpu_index += 1
            if cpu_index == constant.MAX_CPU:
                cpu_index = 0

        time.sleep(1)

    print("全部執(zhí)行完畢!")

if __name__ == "__main__":
    params = sys.argv
    shell_path = params[1]
    runAll(shell_path)

上面的pyhton代碼就是最終的解決方案,在runAll函數(shù)中,通過將所有的shell腳本中命令放到一個隊列中(當(dāng)然也可以不用隊列),然后通過監(jiān)測當(dāng)前的系統(tǒng)中運行的腳本的數(shù)目,得出空閑的CPU核數(shù),然后將這些核填滿,相信大家自己看代碼就會明白了。最后,最重要的一點就是,使用pgrep -f 這個命令的時候,后面的name需要將多個空格合并成一個空格,一般會返回兩個pid,一個是我們后臺運行的程序,一個就是這個命令本身也包含了,我在操作的時候,由于沒有注意到多個空格的問題,得到的總是pgrep 的pid,造成將它分配到了CPU核上,而沒有真正的將cmd分配,其他的地方就沒有什么難處了。

最后的結(jié)果

當(dāng)然這只是一個簡單的原型,可以從我運行的圖中看出來,服務(wù)器有48個核,我在這里設(shè)定的是使用其中的前35個核干活,這樣剩余的核可以用來干別的事情,并且當(dāng)taskset成功執(zhí)行的時候,都會進行輸出。

總結(jié)

在處理數(shù)據(jù)的時候,要么你去學(xué)mapreduce技術(shù),要么就一定要掌握linux的一些命令,比如awk,join, grep, split等命令和管道技術(shù),然后配合python的靈活性,就會有你想不到的特殊效果。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容