python 多進(jìn)程(一)multiprocessing.Process

該文章基于 python3.7,部分功能只有python3.7能實(shí)現(xiàn)

一、 進(jìn)程模塊multiprocessing

多進(jìn)程可以實(shí)現(xiàn)多個(gè)程序的并行,充分利用計(jì)算機(jī)的資源,在不同的平臺(tái)/操作系統(tǒng)上,python實(shí)現(xiàn)多進(jìn)程的方式不同

在Unix/Linux 中,通過fork()調(diào)用,它非常特殊。普通的函數(shù)調(diào)用,調(diào)用一次,返回一次,但是fork()調(diào)用一次,返回兩次,因?yàn)椴僮飨到y(tǒng)自動(dòng)把當(dāng)前進(jìn)程(稱為父進(jìn)程)復(fù)制了一份(稱為子進(jìn)程),然后,分別在父進(jìn)程和子進(jìn)程內(nèi)返回。

Python的os模塊封裝了常見的系統(tǒng)調(diào)用,其中就包括fork,可以在Python程序中輕松創(chuàng)建子進(jìn)程:

import os

print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

運(yùn)行結(jié)果如下:

Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.

由于Windows沒有fork調(diào)用,上面的代碼在Windows上無法運(yùn)行,我們可以使用multiprocessing模塊,其封裝了底層復(fù)制進(jìn)程的過程,Unix 和 Windows 上都可以運(yùn)行。

根據(jù)不同的平臺(tái), multiprocessing 支持三種啟動(dòng)進(jìn)程的方法。

  • spawn
    父進(jìn)程啟動(dòng)一個(gè)新的Python解釋器進(jìn)程。子進(jìn)程只會(huì)繼承那些運(yùn)行進(jìn)程對(duì)象的 run() 方法所需的資源。特別是父進(jìn)程中非必須的文件描述符和句柄不會(huì)被繼承。相對(duì)于使用 fork 或者 forkserver,使用這個(gè)方法啟動(dòng)進(jìn)程相當(dāng)慢。
    可在Unix和Windows上使用。 Windows上的默認(rèn)設(shè)置。

  • fork
    父進(jìn)程使用 os.fork() 來產(chǎn)生 Python 解釋器分叉。子進(jìn)程在開始時(shí)實(shí)際上與父進(jìn)程相同。父進(jìn)程的所有資源都由子進(jìn)程繼承。請(qǐng)注意,安全分叉多線程進(jìn)程是棘手的。
    只存在于Unix。Unix中的默認(rèn)值。

  • forkserver
    程序啟動(dòng)并選擇* forkserver * 啟動(dòng)方法時(shí),將啟動(dòng)服務(wù)器進(jìn)程。從那時(shí)起,每當(dāng)需要一個(gè)新進(jìn)程時(shí),父進(jìn)程就會(huì)連接到服務(wù)器并請(qǐng)求它分叉一個(gè)新進(jìn)程。分叉服務(wù)器進(jìn)程是單線程的,因此使用 os.fork() 是安全的。沒有不必要的資源被繼承。
    可在Unix平臺(tái)上使用,支持通過Unix管道傳遞文件描述符。

二、進(jìn)程對(duì)象Process

進(jìn)程模塊multiprocessing中包含與進(jìn)程相關(guān)的異常、同步、通信等等相關(guān),其中Process封裝了進(jìn)程對(duì)象的相關(guān)API,是一個(gè)子進(jìn)程的物化實(shí)現(xiàn),封裝了子進(jìn)程狀態(tài)與管理相關(guān)功能。

  1. 如何創(chuàng)建一個(gè)子進(jìn)程對(duì)象
Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

應(yīng)始終使用關(guān)鍵字參數(shù)調(diào)用構(gòu)造函數(shù),而不是 None, None, p1……這樣來傳入?yún)?shù),否則可能造成不可知錯(cuò)誤。

  • group 應(yīng)該始終是 None ;它僅用于兼容性考慮
  • target 傳入一個(gè)可調(diào)用對(duì)象。它默認(rèn)為 None,這里傳入的是子進(jìn)程的運(yùn)行函數(shù)。
  • name 是進(jìn)程名稱,僅僅具有標(biāo)識(shí)作用,并不會(huì)改變操作系統(tǒng)中的進(jìn)程名稱。
  • args 是目標(biāo)調(diào)用的參數(shù)元組,也就是target調(diào)用函數(shù)的參數(shù)
  • kwargs 是目標(biāo)調(diào)用的關(guān)鍵字參數(shù)字典,也是target調(diào)用函數(shù)的參數(shù)
  • daemon 將進(jìn)程 daemon 標(biāo)志設(shè)置為 TrueFalse 。如果是 None (默認(rèn)值),則該標(biāo)志將從創(chuàng)建的進(jìn)程繼承

示例:創(chuàng)建子進(jìn)程,并顯示子進(jìn)程和父進(jìn)程的的進(jìn)程ID

from multiprocessing import Process
import os


def child_main(name):
    print('I am', name, 'process id:', os.getpid())
    print('parent process:', os.getppid())


if __name__ == '__main__':
    print('main process id:', os.getpid())
    p = Process(target=child_main, args=('bob',))
    p.start()
    p.join()

打印結(jié)果

main process id: 2649
I am bob process id: 2650
parent process: 2649

從程序運(yùn)行來看

p = Process(target=child_main, args=('bob',))

創(chuàng)建了一個(gè)子進(jìn)程,該子進(jìn)程執(zhí)行child_main方法,方法的參數(shù)為'bob'p為該子進(jìn)程對(duì)象的物化實(shí)現(xiàn),封裝子進(jìn)程相關(guān)的功能。

  1. Process對(duì)象的常用方法
  • start()
    啟動(dòng)進(jìn)程,并調(diào)用run()方法。

  • run()
    表示進(jìn)程活動(dòng)的方法,在可以在子類中重載此方法。標(biāo)準(zhǔn) run() 方法調(diào)用傳遞給對(duì)象構(gòu)造函數(shù)的可調(diào)用對(duì)象作為目標(biāo)參數(shù)(如果有),分別從 argskwargs 參數(shù)中獲取順序和關(guān)鍵字參數(shù)。

  • join([timeout])
    如果可選參數(shù) timeoutNone (默認(rèn)值),則該方法將阻塞,直到調(diào)用 join() 方法的進(jìn)程終止。如果 timeout 是一個(gè)正數(shù),它最多會(huì)阻塞 timeout 秒。請(qǐng)注意,如果進(jìn)程終止或方法超時(shí),則該方法返回 None 。檢查進(jìn)程的 exitcode 以確定它是否終止。

  • name
    進(jìn)程的名稱。該名稱是一個(gè)字符串,僅用于識(shí)別目的。它沒有語義。

  • is_alive()
    返回進(jìn)程是否還活著。
    粗略地說,從 start() 方法返回到子進(jìn)程終止之前,進(jìn)程對(duì)象仍處于活動(dòng)狀態(tài)。

  • daemon
    進(jìn)程的守護(hù)標(biāo)志,一個(gè)布爾值。這必須在 start() 被調(diào)用之前設(shè)置。
    當(dāng)進(jìn)程退出時(shí),它會(huì)嘗試終止其所有守護(hù)進(jìn)程子進(jìn)程。
    請(qǐng)注意,不允許守護(hù)進(jìn)程創(chuàng)建子進(jìn)程。否則,守護(hù)進(jìn)程會(huì)在子進(jìn)程退出時(shí)終止其子進(jìn)程。 另外,這些 不是 Unix守護(hù)進(jìn)程或服務(wù),它們是正常進(jìn)程,如果非守護(hù)進(jìn)程已經(jīng)退出,它們將被終止(并且不被合并)。

  • pid
    返回進(jìn)程ID。在生成該進(jìn)程之前,這將是 None 。

  • exitcode
    子進(jìn)程退出代碼。如果進(jìn)程尚未終止,這將是 None 。負(fù)值 -N 表示孩子被信號(hào) N 終止。

  • terminate()
    終止進(jìn)程。 在Unix上,這是使用 SIGTERM 信號(hào)完成的;在Windows上使用 TerminateProcess() 。 請(qǐng)注意,不會(huì)執(zhí)行退出處理程序和finally子句等
    請(qǐng)注意,進(jìn)程的后代進(jìn)程將不會(huì)被終止 —— 它們將孤兒進(jìn)程。參考進(jìn)程基礎(chǔ)

  • kill()
    terminate() 相同,但在Unix上使用 SIGKILL 信號(hào)
    3.7 新版功能.

  • close()
    關(guān)閉 Process 對(duì)象,釋放與之關(guān)聯(lián)的所有資源。如果底層進(jìn)程仍在運(yùn)行,則會(huì)引發(fā) ValueError 。一旦 close() 成功返回, Process 對(duì)象的大多數(shù)其他方法和屬性將引發(fā) ValueError
    3.7 新版功能.

注意 start() 、 join() 、 is_alive() 、 terminate()exitcode 方法只能由創(chuàng)建進(jìn)程對(duì)象的進(jìn)程調(diào)用。

使用實(shí)例 1,利用多進(jìn)程實(shí)現(xiàn)timeout 函數(shù)

在使用 爬蟲 的相關(guān)技術(shù)中,有很多方法都具有 timeout 參數(shù),也可以利用多進(jìn)程實(shí)現(xiàn)timeout 函數(shù),思路如下:
將運(yùn)行函數(shù)放到子進(jìn)程中運(yùn)行,在主進(jìn)程中等待子進(jìn)程執(zhí)行join([timeout]),然后判斷子進(jìn)程的狀態(tài) is_alive(),如果為真,說明子進(jìn)程還在運(yùn)行,已經(jīng)超過我們的限制時(shí)間,則打斷子進(jìn)程,并在主進(jìn)程中拋出異常。
實(shí)現(xiàn)如下:
這里在函數(shù) run_limit()中包含一個(gè)輔助函數(shù),我們也可以將輔助函數(shù)作為一個(gè)參數(shù),可以實(shí)現(xiàn)對(duì)所有函數(shù)轉(zhuǎn)成timeout 函數(shù)

from multiprocessing import Process
import time


def run_limit(timeout=5):
    def fun():
        i = 0
        while True:
            time.sleep(1)
            i += 1
            print(i)

    p = Process(target=fun, )
    p.start()
    p.join(timeout=timeout)
    if p.is_alive():
        p.terminate()
        raise TimeoutError(f'運(yùn)行超時(shí){timeout}!')


if __name__ == '__main__':
    run_limit()

注意,這里每執(zhí)行該函數(shù)都會(huì)啟動(dòng)一個(gè)子進(jìn)程,較普通函數(shù)有較大的消耗。

使用實(shí)例 2,利用守護(hù)進(jìn)程daemon實(shí)現(xiàn)心跳機(jī)制

背景:

心跳機(jī)制是定時(shí)發(fā)送一個(gè)自定義的結(jié)構(gòu)體(心跳包),讓對(duì)方知道自己還活著,以確保連接的有效性的機(jī)制。

在Linux系統(tǒng)中,計(jì)算機(jī)剛啟動(dòng)時(shí)只有一個(gè)進(jìn)程,PID為1,名字為init(centos6系統(tǒng))或者systemd(centos7系統(tǒng)),systemd進(jìn)程通過復(fù)制自身進(jìn)程啟動(dòng)了其它進(jìn)程,后面再復(fù)制出整個(gè)計(jì)算機(jī)進(jìn)程,所以進(jìn)程被設(shè)計(jì)成獨(dú)立的,也就是說父進(jìn)程關(guān)閉了,子進(jìn)程照樣能正常運(yùn)行,這是非常必要的,不至于其中一個(gè)進(jìn)程掛了,讓它的子孫后代進(jìn)程都掛掉。

但如果我們?cè)谥鬟M(jìn)程中開啟一個(gè)子進(jìn)程用于向遠(yuǎn)方服務(wù)器報(bào)告,這種進(jìn)程間的獨(dú)立就不符合我們的期望,而daemon參數(shù)可以實(shí)現(xiàn)主進(jìn)程與子進(jìn)程的綁定,主進(jìn)程結(jié)束,守護(hù)進(jìn)程的子進(jìn)程也結(jié)束,如下

from multiprocessing import Process
import time


def child_main():
    while True:
        print('i am live')
        time.sleep(3)


if __name__ == '__main__':
    p = Process(target=child_main, daemon=True)
    p.start()
    time.sleep(10)
    print('main end')

這里沒有使用join()方法等待子進(jìn)程,所以在運(yùn)行10秒后主進(jìn)程結(jié)束,
由于參數(shù)daemon=True ,所以子進(jìn)程在主進(jìn)程結(jié)束后也跟著結(jié)束了。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容