父進(jìn)程如何捕獲子進(jìn)程的異常?

摘要

在使用Python開發(fā)某些高并發(fā)WEB應(yīng)用后臺時,為了提高處理能力,通常會采用多進(jìn)程或者多線程的方式,但這同時也給異常處理帶了一些麻煩。


背景

線程是一個輕型實體,只有由很少的支持其獨立運行的資源。 對于Python,線程擁有自己獨立的棧, 當(dāng)線程運行出錯,線程會直接結(jié)束運行,當(dāng)需要進(jìn)行錯誤處理時,一般都會在線程中進(jìn)行處理,但是如果只能由主進(jìn)程來處理異常,那么線程要怎么才能將異常通知給主進(jìn)程呢?
對于進(jìn)程,子進(jìn)程的產(chǎn)生的異常如何讓父進(jìn)程去處理?


Multiprocessing Package

Multiprocessing是Python的一個多進(jìn)程庫,其實現(xiàn)的API類似threading. 其子模塊dummy實現(xiàn)了與Multiprocessing.process相同的API,唯一的區(qū)別在于,dummy其實只是對threading做了一層封裝而已. 這給不知道是CPU密集型或者是IO密集型的項目提供了便利,你可以自由地在多進(jìn)程與多線程之間進(jìn)行切換而只需更改一行代碼。


調(diào)用error_callback函數(shù)

在python3中, Multiprocessing對與apply_async()進(jìn)行了改進(jìn),增加了一個默認(rèn)參數(shù)error_callback=None,在這里你可以指定相應(yīng)的錯誤處理函數(shù),它將在子進(jìn)程或者子線程運行出現(xiàn)異常是被調(diào)用. 與之對應(yīng)的是callback=None參數(shù)(Python2也支持),它將在子進(jìn)程或者子線程順利運行之后被調(diào)用.
其原理,在于子線程或者子進(jìn)程的一個_success屬性, 其指出了運行狀態(tài).

    def get(self, timeout=None):
        self.wait(timeout)
        if not self.ready():
            raise TimeoutError
        if self._success:
            return self._value
        else:
            raise self._value

    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        if self._error_callback and not self._success:
            self._error_callback(self._value)
        self._event.set()
        del self._cache[self._job]

需要注意的是,raise self._value語句拋出的是一個Exception.AttributeError, 可以看出只會拋出兩種錯誤,一種是超時錯誤,一種是AttributeError. 而我們能利用的就是AttributeError, 在線程或者進(jìn)程執(zhí)行過程種出現(xiàn)錯誤, 我們只需要將合適的錯誤信息使用格式化字符串通過AttributeError傳遞出去,在error_callback去獲取這個錯誤信息,在進(jìn)行進(jìn)一步的處理即可, 如果你只需要告知主進(jìn)程出錯了,那么只需要raise AttributeError即可.


進(jìn)程間數(shù)據(jù)共享

不推薦pipequeue,而它們都有可能堵塞進(jìn)程,所以最好采用其他方式,這里介紹兩種,共享內(nèi)存變量,以及服務(wù)進(jìn)程Manager. 你可以通過使用這兩種方式在主進(jìn)程中檢測子線程或者子進(jìn)程的狀態(tài). 至于訪問速度,前者與和直接訪問內(nèi)存速度相差無幾, 后者速度要低兩個數(shù)量級

共享內(nèi)存變量 Shared Ctypes Objects

multi提供了兩種類型,Multiprocessing.Value以及MultiProcessing.Array, 這兩種結(jié)果內(nèi)部實現(xiàn)了Lock,默認(rèn)是開啟的, 因此這時,它們是process-safe的. 共享內(nèi)存變量在父進(jìn)程中被創(chuàng)建,然后在創(chuàng)建子進(jìn)程的時候傳進(jìn)去即可.

官方文檔示例代碼:

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', b'hello world', lock=lock)
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print("Ounput:")
    print(n.value)
    print(x.value)
    print(s.value)
    print([(a.x, a.y) for a in A])

Output:
49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

服務(wù)進(jìn)程 Manager

如果你需要支持更多類型數(shù)據(jù)的共享呢? Sever Process Manager 可以完成, 它支持很多數(shù)據(jù)類型,包括list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, ValueArray. 作為一個獨立的服務(wù)進(jìn)程存在,意味著可以遠(yuǎn)程共享數(shù)據(jù),內(nèi)部實現(xiàn)類似與RPC服務(wù)器,每個需要進(jìn)行讀寫共享數(shù)據(jù)的進(jìn)程,都需要通過proxies來訪問服務(wù)進(jìn)程進(jìn)行操作. 一個Manager對象就控制一個用于管理共享數(shù)據(jù)的服務(wù)進(jìn)程。由于訪問是在網(wǎng)絡(luò)之上進(jìn)行的, 因此速度方面沒有共享內(nèi)存快.
Manager的具體使用方法


檢查線程退出狀態(tài)

由于線程出現(xiàn)異常退出和正常退出時的exitcode時不一樣的,而線程結(jié)束運行后,內(nèi)存又不會立即被回收,基于此,我們可以通過檢查exitcode的方式來捕獲線程中出現(xiàn)的錯誤,這種方法來自PengMeng's Blog, 一般地,Python中進(jìn)程的屬性, 或許可以達(dá)到和exitcode相同的效果. 這里不再深入討論.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容