摘要
在使用Python開發(fā)某些高并發(fā)WEB應(yīng)用后臺時,為了提高處理能力,通常會采用多進(jìn)程或者多線程的方式,但這同時也給異常處理帶了一些麻煩。
背景
線程是一個輕型實體,只有由很少的支持其獨立運行的資源。 對于Python,線程擁有自己獨立的棧, 當(dāng)線程運行出錯,線程會直接結(jié)束運行,當(dāng)需要進(jìn)行錯誤處理時,一般都會在線程中進(jìn)行處理,但是如果只能由主進(jìn)程來處理異常,那么線程要怎么才能將異常通知給主進(jìn)程呢?
對于進(jìn)程,子進(jìn)程的產(chǎn)生的異常如何讓父進(jìn)程去處理?
Multiprocessing Package
Multiprocessing是Python的一個多進(jìn)程庫,其實現(xiàn)的API類似threading. 其子模塊dummy實現(xiàn)了與Multiprocessing.process相同的API,唯一的區(qū)別在于,dummy其實只是對threading做了一層封裝而已. 這給不知道是CPU密集型或者是IO密集型的項目提供了便利,你可以自由地在多進(jìn)程與多線程之間進(jìn)行切換而只需更改一行代碼。
調(diào)用error_callback函數(shù)
在python3中, Multiprocessing對與apply_async()進(jìn)行了改進(jìn),增加了一個默認(rèn)參數(shù)error_callback=None,在這里你可以指定相應(yīng)的錯誤處理函數(shù),它將在子進(jìn)程或者子線程運行出現(xiàn)異常是被調(diào)用. 與之對應(yīng)的是callback=None參數(shù)(Python2也支持),它將在子進(jìn)程或者子線程順利運行之后被調(diào)用.
其原理,在于子線程或者子進(jìn)程的一個_success屬性, 其指出了運行狀態(tài).
def get(self, timeout=None):
self.wait(timeout)
if not self.ready():
raise TimeoutError
if self._success:
return self._value
else:
raise self._value
def _set(self, i, obj):
self._success, self._value = obj
if self._callback and self._success:
self._callback(self._value)
if self._error_callback and not self._success:
self._error_callback(self._value)
self._event.set()
del self._cache[self._job]
需要注意的是,raise self._value語句拋出的是一個Exception.AttributeError, 可以看出只會拋出兩種錯誤,一種是超時錯誤,一種是AttributeError. 而我們能利用的就是AttributeError, 在線程或者進(jìn)程執(zhí)行過程種出現(xiàn)錯誤, 我們只需要將合適的錯誤信息使用格式化字符串通過AttributeError傳遞出去,在error_callback去獲取這個錯誤信息,在進(jìn)行進(jìn)一步的處理即可, 如果你只需要告知主進(jìn)程出錯了,那么只需要raise AttributeError即可.
進(jìn)程間數(shù)據(jù)共享
不推薦pipe和queue,而它們都有可能堵塞進(jìn)程,所以最好采用其他方式,這里介紹兩種,共享內(nèi)存變量,以及服務(wù)進(jìn)程Manager. 你可以通過使用這兩種方式在主進(jìn)程中檢測子線程或者子進(jìn)程的狀態(tài). 至于訪問速度,前者與和直接訪問內(nèi)存速度相差無幾, 后者速度要低兩個數(shù)量級
共享內(nèi)存變量 Shared Ctypes Objects
multi提供了兩種類型,Multiprocessing.Value以及MultiProcessing.Array, 這兩種結(jié)果內(nèi)部實現(xiàn)了Lock,默認(rèn)是開啟的, 因此這時,它們是process-safe的. 共享內(nèi)存變量在父進(jìn)程中被創(chuàng)建,然后在創(chuàng)建子進(jìn)程的時候傳進(jìn)去即可.
官方文檔示例代碼:
from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double
class Point(Structure):
_fields_ = [('x', c_double), ('y', c_double)]
def modify(n, x, s, A):
n.value **= 2
x.value **= 2
s.value = s.value.upper()
for a in A:
a.x **= 2
a.y **= 2
if __name__ == '__main__':
lock = Lock()
n = Value('i', 7)
x = Value(c_double, 1.0/3.0, lock=False)
s = Array('c', b'hello world', lock=lock)
A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)
p = Process(target=modify, args=(n, x, s, A))
p.start()
p.join()
print("Ounput:")
print(n.value)
print(x.value)
print(s.value)
print([(a.x, a.y) for a in A])
Output:
49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]
服務(wù)進(jìn)程 Manager
如果你需要支持更多類型數(shù)據(jù)的共享呢? Sever Process Manager 可以完成, 它支持很多數(shù)據(jù)類型,包括list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value和Array. 作為一個獨立的服務(wù)進(jìn)程存在,意味著可以遠(yuǎn)程共享數(shù)據(jù),內(nèi)部實現(xiàn)類似與RPC服務(wù)器,每個需要進(jìn)行讀寫共享數(shù)據(jù)的進(jìn)程,都需要通過proxies來訪問服務(wù)進(jìn)程進(jìn)行操作. 一個Manager對象就控制一個用于管理共享數(shù)據(jù)的服務(wù)進(jìn)程。由于訪問是在網(wǎng)絡(luò)之上進(jìn)行的, 因此速度方面沒有共享內(nèi)存快.
Manager的具體使用方法
檢查線程退出狀態(tài)
由于線程出現(xiàn)異常退出和正常退出時的exitcode時不一樣的,而線程結(jié)束運行后,內(nèi)存又不會立即被回收,基于此,我們可以通過檢查exitcode的方式來捕獲線程中出現(xiàn)的錯誤,這種方法來自PengMeng's Blog, 一般地,Python中進(jìn)程的屬性, 或許可以達(dá)到和exitcode相同的效果. 這里不再深入討論.