多進程
要讓python程序?qū)崿F(xiàn)多進程,我們先了解操作系統(tǒng)的相關(guān)知識。
Unix、Linux操作系統(tǒng)提供了一個fork()系統(tǒng)調(diào)用,它非常特殊。普通的函數(shù)調(diào)用,調(diào)用一次,返回一次,但是fork()調(diào)用一次,返回兩次,因為操作系統(tǒng)自動把當前進程(稱為父進程)復制了一份(稱為子進程),然后,分別在父進程和子進程內(nèi)返回。
子進程永遠返回0,而父進程返回子進程的ID。這樣做的理由是,一個父進程可以fork出很多子進程,所以,父進程要記下每個子進程的ID,而子進程只需要調(diào)用getppid() 就可以拿到父進程的ID
python的os模塊封裝了常見的系統(tǒng)調(diào)用,其中就包括fork,可以再python程序中輕松創(chuàng)建子進程:
import os
print("Process (%s)? start..." % os.getpid())
pid = os.fork()
if pid == 0:
? ? ? ? print("i am child process (%s) and my parent is %s." % (os.getpid(),os.getppid()))
else:
? ? ? ? print("I (%s) just created a child process (%s)" % (os.getpid(),pid))
運行結(jié)果如下:
Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.
由于Windows沒有fork調(diào)用,上面的代碼在windows上無法運行。由于Mac系統(tǒng)是基于BSD(unix的一種)內(nèi)核,所以在Mac下運行是沒有問題的,推薦大家用Mac學python
有了fork調(diào)用,一個進程在接到新任務時就可以復制出一個子進程來處理新任務,常見的Apache服務器就是由父進程監(jiān)聽端口,每當有新的http請求時,就fork出子進程來處理新的http請求。
multiprocessing
如果你打算編寫多進程的服務程序,Unix/Linux無疑是正確的選擇。由于windows沒有fork調(diào)用,難道在windows上無法用python編寫多進程的程序?
由于python是跨平臺的,自然也應該提供一個跨平臺的多進程支持。multiprocessing模塊就是跨平臺版本的多進程模塊。
multiprocessing模塊提供了一個Process類來代表一個進程對象,下面的例子演示了一個子進程并等待其結(jié)束:
from multiprocessing import Process
import os
# 子進程要執(zhí)行的代碼
def run_proc(name):
? ? ? ? print('Run child process %s (%s)...' % (name,os.getpid()))
if __name__ == "__main__":
? ? ? ? print("Parent process %s." % os.getpid())
? ? ? ? p = Process(target = run_proc,args = ("test",))
? ? ? ? print("Child process will start.")
? ? ? ? p.start()
? ? ? ? p.join()
? ? ? ? print("Child process end")
執(zhí)行結(jié)果如下:
Parent process 928.
Process will start
Run child process test (929)...
Process end.
創(chuàng)建子進程時,只需要傳入一個執(zhí)行函數(shù)和函數(shù)的參數(shù),創(chuàng)建一個Process實例,用start()方法啟動,這樣創(chuàng)建進程比fork()還要簡單。
join()方法可以等待子進程結(jié)束后再繼續(xù)往下運行,通常用于進程間的同步。
Pool
如果要啟動大量的子進程,可以用進程池的方式批量創(chuàng)建子進程:
from multiprocessing import Pool
import os,time,random
def long_time_task(name):
? ? ? ? print("Run task %s (%s)..." % (name,os.getpid()))
? ? ? ? start = time.time()
? ? ? ? time.sleep(random.random() * 3)
? ? ? ? end = time.time()
? ? ? ? print("Task %s runs %0.2f seconds." % (name,(end - start)))
if __name__ == "__main__":
? ? ? ? print('Parent process %s.'% os.getpid())?
? ? ? ? p = Pool(4)
????????foriinrange(5):?
? ? ? ? ????????p.apply_async(long_time_task, args=(i,))?
? ? ? ? ? ? ? ? print('Waiting for all subprocesses done...')
? ? ? ? ? ? ? ? p.close()?
? ? ? ? ? ? ? ? p.join()?
? ? ? ? ? ? ? ? print('All subprocesses done.')
執(zhí)行結(jié)果如下:
Parentprocess669.
Waitingforall subprocesses done...
Run task0(671)...
Run task1(672)...
Run task2(673)...
Run task3(674)...
Task2runs0.14seconds.
Runtask4(673)...
Task1runs0.27seconds.
Task3runs0.86seconds.
Task0runs1.41seconds.
Task4runs1.91seconds.
All subprocesses done.
代碼解讀:
對Pool對象調(diào)用join()方法會等待所有子進程執(zhí)行完畢,調(diào)用join()之前必須先調(diào)用close(),調(diào)用close()之后就不能繼續(xù)添加新的Process了。
請注意輸出的結(jié)果,task0,1,2,3是立刻執(zhí)行的,而task4要等待前面某個task完成后才執(zhí)行,這是因為Pool的默認大小在我的電腦上是4,因此,最多同時執(zhí)行4個進程。這是Pool有意設計的限制,并不是操作系統(tǒng)的限制。如果改成:
p = Pool(5)
就可以同時跑5個進程。
由于Pool的默認大小是CPU的核數(shù),如果你不幸擁有8核CPU,你要提交至少9個子進程才能看到上面的等待效果
子進程
很多時候,子進程并不是自身,而是一個外部進程。我們創(chuàng)建了子進程后,還需要控制子進程的輸入和輸出。
subprocess模塊可以讓我們非常方便地啟動一個子進程,然后控制其輸入和輸出。
下面的例子演示了如何在Python代碼中運行命令nslookup www.python.org,這和命令行直接運行的效果是一樣的:
import subprocess
print('$ nslookup www.python.org')
r = subprocess.call(['nslookup','www.python.org'])
print('Exit code:', r)
運行結(jié)果:
$ nslookup www.python.org
Server:192.168.19.4
Address:192.168.19.4#53
Non-authoritativeanswer:
www.python.org \? ? canonical name = python.map.fastly.net.?
Name:python.map.fastly.net
Address:199.27.79.223
Exitcode:0
如果子進程還需要輸入,則可以通過communicate()方法輸入:
importsubprocess
print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=sub
process.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)
上面的代碼相當于命令行執(zhí)行命令nslookup,然后手動輸入:
set q=mx
python.org
exit
運行結(jié)果如下:
$ nslookup
Server:192.168.19.4
Address:192.168.19.4#53
Non-authoritativeanswer:
python.org?
mail exchanger =50mail.python.org.
Authoritativeanswers can be foundfrom:
mail.python.org internet address =82.94.164.166
mail.python.org hasAAAAaddress2001:888:2000:d::a6
Exitcode:0
進程間通信
Process之間肯定是需要通信的,操作系統(tǒng)提供了很多機制來實現(xiàn)進程間的通信。Python的multiprocessing模塊包裝了底層的機制,提供了Queue、Pipes等多種方式來交換數(shù)據(jù)。
我們以Queue為例,在父進程中創(chuàng)建兩個子進程,一個往Queue里寫數(shù)據(jù),一個從Queue里讀數(shù)據(jù):
frommultiprocessingimportProcess, Queue
importos, time, random
# 寫數(shù)據(jù)進程執(zhí)行的代碼:
defwrite(q):
????????print('Process to write: %s'% os.getpid())
????????forvaluein['A','B','C']:?
? ? ? ? print('Put %s to queue...'% value)
? ? ? ? q.put(value)
????????time.sleep(random.random())
# 讀數(shù)據(jù)進程執(zhí)行的代碼:
defread(q):
????????print('Process to read: %s'% os.getpid())
????????whileTrue:?
? ? ? ? ????????value = q.get(True)
????????????????print('Get %s from queue.'% value)
if__name__=='__main__'
????????:# 父進程創(chuàng)建Queue,并傳給各個子進程:
????????q = Queue()
????????pw = Process(target=write, args=(q,))
????????pr = Process(target=read, args=(q,))
????????# 啟動子進程pw,寫入:
????????pw.start()
????????# 啟動子進程pr,讀取:
????????pr.start()
????????# 等待pw結(jié)束:
????????pw.join()
????????# pr進程里是死循環(huán),無法等待其結(jié)束,只能強行終止:
????????pr.terminate()
運行結(jié)果如下:
Process to write:50563
Put A to queue...
Process to read:50564
Get Afromqueue.
Put B to queue...
Get Bfromqueue.
Put C to queue...
Get Cfromqueue.
在Unix/Linux下,multiprocessing模塊封裝了fork()調(diào)用,使我們不需要關(guān)注fork()的細節(jié)。由于Windows沒有fork調(diào)用,因此,multiprocessing需要“模擬”出fork的效果,父進程所有Python對象都必須通過pickle序列化再傳到子進程去,所有,如果multiprocessing在Windows下調(diào)用失敗了,要先考慮是不是pickle失敗了。
小結(jié)
在Unix/Linux下,可以使用fork()調(diào)用實現(xiàn)多進程。
要實現(xiàn)跨平臺的多進程,可以使用multiprocessing模塊。
進程間通信是通過Queue、Pipes等實現(xiàn)的。
多線程
多任務可以由多進程完成,也可以由一個進程內(nèi)的多線程完成。
我們前面提到了進程是由若干線程組成的,一個進程至少有一個線程。
由于線程是操作系統(tǒng)直接支持的執(zhí)行單元,因此,高級語言通常都內(nèi)置多線程的支持,Python也不例外,并且,Python的線程是真正的Posix Thread,而不是模擬出來的線程。
Python的標準庫提供了兩個模塊:_thread和threading,_thread是低級模塊,threading是高級模塊,對_thread進行了封裝。絕大多數(shù)情況下,我們只需要使用threading這個高級模塊。
啟動一個線程就是把一個函數(shù)傳入并創(chuàng)建Thread實例,然后調(diào)用start()開始執(zhí)行:
importtime, threading
# 新線程執(zhí)行的代碼:
defloop():
????????print('thread %s is running...'% threading.current_thread().name)?
? ? ? ? n =0
????????while n <5:?
? ? ? ? ????????n = n +1
????????????????print('thread %s >>> %s'% (threading.current_thread().name, n))?
????????????????time.sleep(1)
????????print('thread %s ended.'% threading.current_thread().name)
print('thread %s is running...'% threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('thread %s ended.'% threading.current_thread().name)
執(zhí)行結(jié)果如下:
thread MainThreadisrunning...
thread LoopThreadisrunning...
thread LoopThread >>>1
thread LoopThread >>>2
thread LoopThread >>>3
thread LoopThread >>>4
thread LoopThread >>>5
thread LoopThread ended.
thread MainThread ended.
由于任何進程默認就會啟動一個線程,我們把該線程稱為主線程,主線程又可以啟動新的線程,Python的threading模塊有個current_thread()函數(shù),它永遠返回當前線程的實例。主線程實例的名字叫MainThread,子線程的名字在創(chuàng)建時指定,我們用LoopThread命名子線程。名字僅僅在打印時用來顯示,完全沒有其他意義,如果不起名字Python就自動給線程命名為Thread-1,Thread-2……
Lock
多線程和多進程最大的不同在于,多進程中,同一個變量,各自有一份拷貝存在于每個進程中,互不影響,而多線程中,所有變量都由所有線程共享,所以,任何一個變量都可以被任何一個線程修改,因此,線程之間共享數(shù)據(jù)最大的危險在于多個線程同時改一個變量,把內(nèi)容給改亂了。
來看看多個線程同時操作一個變量怎么把內(nèi)容給改亂了:
importtime, threading
# 假定這是你的銀行存款:
balance =0
defchange_it(n):
????????# 先存后取,結(jié)果應該為0:
????????globalbalance?
? ? ? ? balance = balance + n
????????balance = balance - n
defrun_thread(n):
????????for i in range(100000):
????????change_it(n)
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
我們定義了一個共享變量balance,初始值為0,并且啟動兩個線程,先存后取,理論上結(jié)果應該為0,但是,由于線程的調(diào)度是由操作系統(tǒng)決定的,當t1、t2交替執(zhí)行時,只要循環(huán)次數(shù)足夠多,balance的結(jié)果就不一定是0了。
原因是因為高級語言的一條語句在CPU執(zhí)行時是若干條語句,即使一個簡單的計算:
balance = balance + n
也分兩步:
? ? 1. 計算 balance + n ,存入臨時變量中;
? ? 2. 將臨時變量的值賦值給balance.
也就是可以看成:
x = balance + n
balance = x
由于x是局部變量,兩個線程各自都有自己的x,當代碼正常執(zhí)行時:
初始值 balance = 0
t1:x1 = balance +5# x1 = 0 + 5 = 5
t1:balance = x1# balance = 5
t1:x1 = balance -5# x1 = 5 - 5 = 0
t1:balance = x1# balance = 0
t2:x2 = balance +8# x2 = 0 + 8 = 8
t2:balance = x2# balance = 8
t2:x2 = balance -8# x2 = 8 - 8 = 0
t2:balance = x2# balance = 0
結(jié)果 balance =0
但是t1和t2是交替運行的,如果操作系統(tǒng)以下面的順序執(zhí)行t1、t2:
初始值 balance =0
t1:x1 = balance +5# x1 = 0 + 5 = 5
t2:x2 = balance +8# x2 = 0 + 8 = 8
t2:balance = x2# balance = 8
t1:balance = x1# balance = 5
t1:x1 = balance -5# x1 = 5 - 5 = 0
t1:balance = x1# balance = 0
t2:x2 = balance -8# x2 = 0 - 8 = -8
t2:balance = x2# balance = -8
結(jié)果 balance = -8
究其原因,是因為修改balance需要多條語句,而執(zhí)行這幾條語句時,線程可能中斷,從而導致多個線程把同一個對象的內(nèi)容改亂了。
兩個線程同時一存一取,就可能導致余額不對,你肯定不希望你的銀行存款莫名其妙地變成了負數(shù),所以,我們必須確保一個線程在修改balance的時候,別的線程一定不能改。
如果我們要確保balance計算正確,就要給change_it()上一把鎖,當某個線程開始執(zhí)行change_it()時,我們說,該線程因為獲得了鎖,因此其他線程不能同時執(zhí)行change_it(),只能等待,直到鎖被釋放后,獲得該鎖以后才能改。由于鎖只有一個,無論多少線程,同一時刻最多只有一個線程持有該鎖,所以,不會造成修改的沖突。創(chuàng)建一個鎖就是通過threading.Lock()來實現(xiàn):
balance =0
lock = threading.Lock()
defrun_thread(n):
????????foriinrange(100000):
????????# 先要獲取鎖:lock.acquire()
????????try:
????????????????# 放心地改吧:
????????????????change_it(n)
????????finally:
????????????????# 改完了一定要釋放鎖:
????????????????lock.release()
當多個線程同時執(zhí)行l(wèi)ock.acquire()時,只有一個線程能成功地獲取鎖,然后繼續(xù)執(zhí)行代碼,其他線程就繼續(xù)等待直到獲得鎖為止。
獲得鎖的線程用完后一定要釋放鎖,否則那些苦苦等待鎖的線程將永遠等待下去,成為死線程。所以我們用try...finally來確保鎖一定會被釋放。
鎖的好處就是確保了某段關(guān)鍵代碼只能由一個線程從頭到尾完整地執(zhí)行,壞處當然也很多,首先是阻止了多線程并發(fā)執(zhí)行,包含鎖的某段代碼實際上只能以單線程模式執(zhí)行,效率就大大地下降了。其次,由于可以存在多個鎖,不同的線程持有不同的鎖,并試圖獲取對方持有的鎖時,可能會造成死鎖,導致多個線程全部掛起,既不能執(zhí)行,也無法結(jié)束,只能靠操作系統(tǒng)強制終止。
多核CPU
如果你不幸擁有一個多核CPU,你肯定在想,多核應該可以同時執(zhí)行多個線程。
如果寫一個死循環(huán)的話,會出現(xiàn)什么情況呢?
打開Mac OS X的Activity Monitor,或者Windows的Task Manager,都可以監(jiān)控某個進程的CPU使用率。
我們可以監(jiān)控到一個死循環(huán)線程會100%占用一個CPU。
如果有兩個死循環(huán)線程,在多核CPU中,可以監(jiān)控到會占用200%的CPU,也就是占用兩個CPU核心。
要想把N核CPU的核心全部跑滿,就必須啟動N個死循環(huán)線程。
試試用Python寫個死循環(huán):
import threading, multiprocessing
defloop():
????????x =0
????????whileTrue:
????????????????x = x ^1
for i in range(multiprocessing.cpu_count()):
????????t = threading.Thread(target=loop)
????????t.start()
啟動與CPU核心數(shù)量相同的N個線程,在4核CPU上可以監(jiān)控到CPU占用率僅有102%,也就是僅使用了一核。
但是用C、C++或Java來改寫相同的死循環(huán),直接可以把全部核心跑滿,4核就跑到400%,8核就跑到800%,為什么Python不行呢?
因為Python的線程雖然是真正的線程,但解釋器執(zhí)行代碼時,有一個GIL鎖:Global Interpreter Lock,任何Python線程執(zhí)行前,必須先獲得GIL鎖,然后,每執(zhí)行100條字節(jié)碼,解釋器就自動釋放GIL鎖,讓別的線程有機會執(zhí)行。這個GIL全局鎖實際上把所有線程的執(zhí)行代碼都給上了鎖,所以,多線程在Python中只能交替執(zhí)行,即使100個線程跑在100核CPU上,也只能用到1個核。
GIL是Python解釋器設計的歷史遺留問題,通常我們用的解釋器是官方實現(xiàn)的CPython,要真正利用多核,除非重寫一個不帶GIL的解釋器。
所以,在Python中,可以使用多線程,但不要指望能有效利用多核。如果一定要通過多線程利用多核,那只能通過C擴展來實現(xiàn),不過這樣就失去了Python簡單易用的特點。
不過,也不用過于擔心,Python雖然不能利用多線程實現(xiàn)多核任務,但可以通過多進程實現(xiàn)多核任務。多個Python進程有各自獨立的GIL鎖,互不影響。
小結(jié)
多線程編程,模型復雜,容易發(fā)生沖突,必須用鎖加以隔離,同時,又要小心死鎖的發(fā)生。
Python解釋器由于設計時有GIL全局鎖,導致了多線程無法利用多核。多線程的并發(fā)在Python中就是一個美麗的夢。