python 多進程和多線程

多進程

要讓python程序?qū)崿F(xiàn)多進程,我們先了解操作系統(tǒng)的相關(guān)知識。

Unix、Linux操作系統(tǒng)提供了一個fork()系統(tǒng)調(diào)用,它非常特殊。普通的函數(shù)調(diào)用,調(diào)用一次,返回一次,但是fork()調(diào)用一次,返回兩次,因為操作系統(tǒng)自動把當前進程(稱為父進程)復制了一份(稱為子進程),然后,分別在父進程和子進程內(nèi)返回。

子進程永遠返回0,而父進程返回子進程的ID。這樣做的理由是,一個父進程可以fork出很多子進程,所以,父進程要記下每個子進程的ID,而子進程只需要調(diào)用getppid() 就可以拿到父進程的ID

python的os模塊封裝了常見的系統(tǒng)調(diào)用,其中就包括fork,可以再python程序中輕松創(chuàng)建子進程:

import os

print("Process (%s)? start..." % os.getpid())

pid = os.fork()

if pid == 0:

? ? ? ? print("i am child process (%s) and my parent is %s." % (os.getpid(),os.getppid()))

else:

? ? ? ? print("I (%s) just created a child process (%s)" % (os.getpid(),pid))

運行結(jié)果如下:

Process (876) start...

I (876) just created a child process (877).

I am child process (877) and my parent is 876.

由于Windows沒有fork調(diào)用,上面的代碼在windows上無法運行。由于Mac系統(tǒng)是基于BSD(unix的一種)內(nèi)核,所以在Mac下運行是沒有問題的,推薦大家用Mac學python

有了fork調(diào)用,一個進程在接到新任務時就可以復制出一個子進程來處理新任務,常見的Apache服務器就是由父進程監(jiān)聽端口,每當有新的http請求時,就fork出子進程來處理新的http請求。

multiprocessing

如果你打算編寫多進程的服務程序,Unix/Linux無疑是正確的選擇。由于windows沒有fork調(diào)用,難道在windows上無法用python編寫多進程的程序?

由于python是跨平臺的,自然也應該提供一個跨平臺的多進程支持。multiprocessing模塊就是跨平臺版本的多進程模塊。

multiprocessing模塊提供了一個Process類來代表一個進程對象,下面的例子演示了一個子進程并等待其結(jié)束:

from multiprocessing import Process

import os

# 子進程要執(zhí)行的代碼

def run_proc(name):

? ? ? ? print('Run child process %s (%s)...' % (name,os.getpid()))

if __name__ == "__main__":

? ? ? ? print("Parent process %s." % os.getpid())

? ? ? ? p = Process(target = run_proc,args = ("test",))

? ? ? ? print("Child process will start.")

? ? ? ? p.start()

? ? ? ? p.join()

? ? ? ? print("Child process end")

執(zhí)行結(jié)果如下:

Parent process 928.

Process will start

Run child process test (929)...

Process end.

創(chuàng)建子進程時,只需要傳入一個執(zhí)行函數(shù)和函數(shù)的參數(shù),創(chuàng)建一個Process實例,用start()方法啟動,這樣創(chuàng)建進程比fork()還要簡單。

join()方法可以等待子進程結(jié)束后再繼續(xù)往下運行,通常用于進程間的同步。

Pool

如果要啟動大量的子進程,可以用進程池的方式批量創(chuàng)建子進程:

from multiprocessing import Pool

import os,time,random

def long_time_task(name):

? ? ? ? print("Run task %s (%s)..." % (name,os.getpid()))

? ? ? ? start = time.time()

? ? ? ? time.sleep(random.random() * 3)

? ? ? ? end = time.time()

? ? ? ? print("Task %s runs %0.2f seconds." % (name,(end - start)))

if __name__ == "__main__":

? ? ? ? print('Parent process %s.'% os.getpid())?

? ? ? ? p = Pool(4)

????????foriinrange(5):?

? ? ? ? ????????p.apply_async(long_time_task, args=(i,))?

? ? ? ? ? ? ? ? print('Waiting for all subprocesses done...')

? ? ? ? ? ? ? ? p.close()?

? ? ? ? ? ? ? ? p.join()?

? ? ? ? ? ? ? ? print('All subprocesses done.')

執(zhí)行結(jié)果如下:

Parentprocess669.

Waitingforall subprocesses done...

Run task0(671)...

Run task1(672)...

Run task2(673)...

Run task3(674)...

Task2runs0.14seconds.

Runtask4(673)...

Task1runs0.27seconds.

Task3runs0.86seconds.

Task0runs1.41seconds.

Task4runs1.91seconds.

All subprocesses done.

代碼解讀:

對Pool對象調(diào)用join()方法會等待所有子進程執(zhí)行完畢,調(diào)用join()之前必須先調(diào)用close(),調(diào)用close()之后就不能繼續(xù)添加新的Process了。

請注意輸出的結(jié)果,task0,1,2,3是立刻執(zhí)行的,而task4要等待前面某個task完成后才執(zhí)行,這是因為Pool的默認大小在我的電腦上是4,因此,最多同時執(zhí)行4個進程。這是Pool有意設計的限制,并不是操作系統(tǒng)的限制。如果改成:

p = Pool(5)

就可以同時跑5個進程。

由于Pool的默認大小是CPU的核數(shù),如果你不幸擁有8核CPU,你要提交至少9個子進程才能看到上面的等待效果

子進程

很多時候,子進程并不是自身,而是一個外部進程。我們創(chuàng)建了子進程后,還需要控制子進程的輸入和輸出。

subprocess模塊可以讓我們非常方便地啟動一個子進程,然后控制其輸入和輸出。

下面的例子演示了如何在Python代碼中運行命令nslookup www.python.org,這和命令行直接運行的效果是一樣的:

import subprocess

print('$ nslookup www.python.org')

r = subprocess.call(['nslookup','www.python.org'])

print('Exit code:', r)

運行結(jié)果:

$ nslookup www.python.org

Server:192.168.19.4

Address:192.168.19.4#53

Non-authoritativeanswer:

www.python.org \? ? canonical name = python.map.fastly.net.?

Name:python.map.fastly.net

Address:199.27.79.223

Exitcode:0

如果子進程還需要輸入,則可以通過communicate()方法輸入:

importsubprocess

print('$ nslookup')

p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=sub

process.PIPE, stderr=subprocess.PIPE)

output, err = p.communicate(b'set q=mx\npython.org\nexit\n')

print(output.decode('utf-8'))

print('Exit code:', p.returncode)

上面的代碼相當于命令行執(zhí)行命令nslookup,然后手動輸入:

set q=mx

python.org

exit

運行結(jié)果如下:

$ nslookup

Server:192.168.19.4

Address:192.168.19.4#53

Non-authoritativeanswer:

python.org?

mail exchanger =50mail.python.org.

Authoritativeanswers can be foundfrom:

mail.python.org internet address =82.94.164.166

mail.python.org hasAAAAaddress2001:888:2000:d::a6

Exitcode:0

進程間通信

Process之間肯定是需要通信的,操作系統(tǒng)提供了很多機制來實現(xiàn)進程間的通信。Python的multiprocessing模塊包裝了底層的機制,提供了Queue、Pipes等多種方式來交換數(shù)據(jù)。

我們以Queue為例,在父進程中創(chuàng)建兩個子進程,一個往Queue里寫數(shù)據(jù),一個從Queue里讀數(shù)據(jù):

frommultiprocessingimportProcess, Queue

importos, time, random

# 寫數(shù)據(jù)進程執(zhí)行的代碼:

defwrite(q):

????????print('Process to write: %s'% os.getpid())

????????forvaluein['A','B','C']:?

? ? ? ? print('Put %s to queue...'% value)

? ? ? ? q.put(value)

????????time.sleep(random.random())

# 讀數(shù)據(jù)進程執(zhí)行的代碼:

defread(q):

????????print('Process to read: %s'% os.getpid())

????????whileTrue:?

? ? ? ? ????????value = q.get(True)

????????????????print('Get %s from queue.'% value)

if__name__=='__main__'

????????:# 父進程創(chuàng)建Queue,并傳給各個子進程:

????????q = Queue()

????????pw = Process(target=write, args=(q,))

????????pr = Process(target=read, args=(q,))

????????# 啟動子進程pw,寫入:

????????pw.start()

????????# 啟動子進程pr,讀取:

????????pr.start()

????????# 等待pw結(jié)束:

????????pw.join()

????????# pr進程里是死循環(huán),無法等待其結(jié)束,只能強行終止:

????????pr.terminate()

運行結(jié)果如下:

Process to write:50563

Put A to queue...

Process to read:50564

Get Afromqueue.

Put B to queue...

Get Bfromqueue.

Put C to queue...

Get Cfromqueue.

在Unix/Linux下,multiprocessing模塊封裝了fork()調(diào)用,使我們不需要關(guān)注fork()的細節(jié)。由于Windows沒有fork調(diào)用,因此,multiprocessing需要“模擬”出fork的效果,父進程所有Python對象都必須通過pickle序列化再傳到子進程去,所有,如果multiprocessing在Windows下調(diào)用失敗了,要先考慮是不是pickle失敗了。

小結(jié)

在Unix/Linux下,可以使用fork()調(diào)用實現(xiàn)多進程。

要實現(xiàn)跨平臺的多進程,可以使用multiprocessing模塊。

進程間通信是通過Queue、Pipes等實現(xiàn)的。

多線程

多任務可以由多進程完成,也可以由一個進程內(nèi)的多線程完成。

我們前面提到了進程是由若干線程組成的,一個進程至少有一個線程。

由于線程是操作系統(tǒng)直接支持的執(zhí)行單元,因此,高級語言通常都內(nèi)置多線程的支持,Python也不例外,并且,Python的線程是真正的Posix Thread,而不是模擬出來的線程。

Python的標準庫提供了兩個模塊:_thread和threading,_thread是低級模塊,threading是高級模塊,對_thread進行了封裝。絕大多數(shù)情況下,我們只需要使用threading這個高級模塊。

啟動一個線程就是把一個函數(shù)傳入并創(chuàng)建Thread實例,然后調(diào)用start()開始執(zhí)行:

importtime, threading

# 新線程執(zhí)行的代碼:

defloop():

????????print('thread %s is running...'% threading.current_thread().name)?

? ? ? ? n =0

????????while n <5:?

? ? ? ? ????????n = n +1

????????????????print('thread %s >>> %s'% (threading.current_thread().name, n))?

????????????????time.sleep(1)

????????print('thread %s ended.'% threading.current_thread().name)

print('thread %s is running...'% threading.current_thread().name)

t = threading.Thread(target=loop, name='LoopThread')

t.start()

t.join()

print('thread %s ended.'% threading.current_thread().name)

執(zhí)行結(jié)果如下:

thread MainThreadisrunning...

thread LoopThreadisrunning...

thread LoopThread >>>1

thread LoopThread >>>2

thread LoopThread >>>3

thread LoopThread >>>4

thread LoopThread >>>5

thread LoopThread ended.

thread MainThread ended.

由于任何進程默認就會啟動一個線程,我們把該線程稱為主線程,主線程又可以啟動新的線程,Python的threading模塊有個current_thread()函數(shù),它永遠返回當前線程的實例。主線程實例的名字叫MainThread,子線程的名字在創(chuàng)建時指定,我們用LoopThread命名子線程。名字僅僅在打印時用來顯示,完全沒有其他意義,如果不起名字Python就自動給線程命名為Thread-1,Thread-2……

Lock

多線程和多進程最大的不同在于,多進程中,同一個變量,各自有一份拷貝存在于每個進程中,互不影響,而多線程中,所有變量都由所有線程共享,所以,任何一個變量都可以被任何一個線程修改,因此,線程之間共享數(shù)據(jù)最大的危險在于多個線程同時改一個變量,把內(nèi)容給改亂了。

來看看多個線程同時操作一個變量怎么把內(nèi)容給改亂了:

importtime, threading

# 假定這是你的銀行存款:

balance =0

defchange_it(n):

????????# 先存后取,結(jié)果應該為0:

????????globalbalance?

? ? ? ? balance = balance + n

????????balance = balance - n

defrun_thread(n):

????????for i in range(100000):

????????change_it(n)

t1 = threading.Thread(target=run_thread, args=(5,))

t2 = threading.Thread(target=run_thread, args=(8,))

t1.start()

t2.start()

t1.join()

t2.join()

print(balance)

我們定義了一個共享變量balance,初始值為0,并且啟動兩個線程,先存后取,理論上結(jié)果應該為0,但是,由于線程的調(diào)度是由操作系統(tǒng)決定的,當t1、t2交替執(zhí)行時,只要循環(huán)次數(shù)足夠多,balance的結(jié)果就不一定是0了。

原因是因為高級語言的一條語句在CPU執(zhí)行時是若干條語句,即使一個簡單的計算:

balance = balance + n

也分兩步:

? ? 1. 計算 balance + n ,存入臨時變量中;

? ? 2. 將臨時變量的值賦值給balance.

也就是可以看成:

x = balance + n

balance = x

由于x是局部變量,兩個線程各自都有自己的x,當代碼正常執(zhí)行時:

初始值 balance = 0

t1:x1 = balance +5# x1 = 0 + 5 = 5

t1:balance = x1# balance = 5

t1:x1 = balance -5# x1 = 5 - 5 = 0

t1:balance = x1# balance = 0

t2:x2 = balance +8# x2 = 0 + 8 = 8

t2:balance = x2# balance = 8

t2:x2 = balance -8# x2 = 8 - 8 = 0

t2:balance = x2# balance = 0

結(jié)果 balance =0

但是t1和t2是交替運行的,如果操作系統(tǒng)以下面的順序執(zhí)行t1、t2:

初始值 balance =0

t1:x1 = balance +5# x1 = 0 + 5 = 5

t2:x2 = balance +8# x2 = 0 + 8 = 8

t2:balance = x2# balance = 8

t1:balance = x1# balance = 5

t1:x1 = balance -5# x1 = 5 - 5 = 0

t1:balance = x1# balance = 0

t2:x2 = balance -8# x2 = 0 - 8 = -8

t2:balance = x2# balance = -8

結(jié)果 balance = -8

究其原因,是因為修改balance需要多條語句,而執(zhí)行這幾條語句時,線程可能中斷,從而導致多個線程把同一個對象的內(nèi)容改亂了。

兩個線程同時一存一取,就可能導致余額不對,你肯定不希望你的銀行存款莫名其妙地變成了負數(shù),所以,我們必須確保一個線程在修改balance的時候,別的線程一定不能改。

如果我們要確保balance計算正確,就要給change_it()上一把鎖,當某個線程開始執(zhí)行change_it()時,我們說,該線程因為獲得了鎖,因此其他線程不能同時執(zhí)行change_it(),只能等待,直到鎖被釋放后,獲得該鎖以后才能改。由于鎖只有一個,無論多少線程,同一時刻最多只有一個線程持有該鎖,所以,不會造成修改的沖突。創(chuàng)建一個鎖就是通過threading.Lock()來實現(xiàn):

balance =0

lock = threading.Lock()

defrun_thread(n):

????????foriinrange(100000):

????????# 先要獲取鎖:lock.acquire()

????????try:

????????????????# 放心地改吧:

????????????????change_it(n)

????????finally:

????????????????# 改完了一定要釋放鎖:

????????????????lock.release()

當多個線程同時執(zhí)行l(wèi)ock.acquire()時,只有一個線程能成功地獲取鎖,然后繼續(xù)執(zhí)行代碼,其他線程就繼續(xù)等待直到獲得鎖為止。

獲得鎖的線程用完后一定要釋放鎖,否則那些苦苦等待鎖的線程將永遠等待下去,成為死線程。所以我們用try...finally來確保鎖一定會被釋放。

鎖的好處就是確保了某段關(guān)鍵代碼只能由一個線程從頭到尾完整地執(zhí)行,壞處當然也很多,首先是阻止了多線程并發(fā)執(zhí)行,包含鎖的某段代碼實際上只能以單線程模式執(zhí)行,效率就大大地下降了。其次,由于可以存在多個鎖,不同的線程持有不同的鎖,并試圖獲取對方持有的鎖時,可能會造成死鎖,導致多個線程全部掛起,既不能執(zhí)行,也無法結(jié)束,只能靠操作系統(tǒng)強制終止。

多核CPU

如果你不幸擁有一個多核CPU,你肯定在想,多核應該可以同時執(zhí)行多個線程。

如果寫一個死循環(huán)的話,會出現(xiàn)什么情況呢?

打開Mac OS X的Activity Monitor,或者Windows的Task Manager,都可以監(jiān)控某個進程的CPU使用率。

我們可以監(jiān)控到一個死循環(huán)線程會100%占用一個CPU。

如果有兩個死循環(huán)線程,在多核CPU中,可以監(jiān)控到會占用200%的CPU,也就是占用兩個CPU核心。

要想把N核CPU的核心全部跑滿,就必須啟動N個死循環(huán)線程。

試試用Python寫個死循環(huán):

import threading, multiprocessing

defloop():

????????x =0

????????whileTrue:

????????????????x = x ^1

for i in range(multiprocessing.cpu_count()):

????????t = threading.Thread(target=loop)

????????t.start()

啟動與CPU核心數(shù)量相同的N個線程,在4核CPU上可以監(jiān)控到CPU占用率僅有102%,也就是僅使用了一核。

但是用C、C++或Java來改寫相同的死循環(huán),直接可以把全部核心跑滿,4核就跑到400%,8核就跑到800%,為什么Python不行呢?

因為Python的線程雖然是真正的線程,但解釋器執(zhí)行代碼時,有一個GIL鎖:Global Interpreter Lock,任何Python線程執(zhí)行前,必須先獲得GIL鎖,然后,每執(zhí)行100條字節(jié)碼,解釋器就自動釋放GIL鎖,讓別的線程有機會執(zhí)行。這個GIL全局鎖實際上把所有線程的執(zhí)行代碼都給上了鎖,所以,多線程在Python中只能交替執(zhí)行,即使100個線程跑在100核CPU上,也只能用到1個核。

GIL是Python解釋器設計的歷史遺留問題,通常我們用的解釋器是官方實現(xiàn)的CPython,要真正利用多核,除非重寫一個不帶GIL的解釋器。

所以,在Python中,可以使用多線程,但不要指望能有效利用多核。如果一定要通過多線程利用多核,那只能通過C擴展來實現(xiàn),不過這樣就失去了Python簡單易用的特點。

不過,也不用過于擔心,Python雖然不能利用多線程實現(xiàn)多核任務,但可以通過多進程實現(xiàn)多核任務。多個Python進程有各自獨立的GIL鎖,互不影響。

小結(jié)

多線程編程,模型復雜,容易發(fā)生沖突,必須用鎖加以隔離,同時,又要小心死鎖的發(fā)生。

Python解釋器由于設計時有GIL全局鎖,導致了多線程無法利用多核。多線程的并發(fā)在Python中就是一個美麗的夢。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容