前言
我是偏后臺開發(fā)的coder,學(xué)到python的這里時尤其的關(guān)注。操作系統(tǒng)的相關(guān)接口在python是不是比linux C中要簡潔的多。OS的概念不說了,這次筆記集中關(guān)注python中多進(jìn)程、多線程、高并發(fā)、加鎖同步、進(jìn)程間通信等實現(xiàn)。
Definition
進(jìn)程(process),在我的理解中,就是一個任務(wù),是一段運行的程序。后臺的童鞋應(yīng)該知道其本質(zhì)就是一個task_struct結(jié)構(gòu)體,里面記載著程序運行需要的所有資源和他自身的信息。當(dāng)他獲得運行所需的內(nèi)存、CPU資源等,也就是成為了一個running狀態(tài)的進(jìn)程。
可以把進(jìn)程理解為一個任務(wù),那線程就是完成這個任務(wù)的執(zhí)行流。線程是CPU調(diào)度的最小粒度。通常來說,現(xiàn)在的項目中,至少我接觸的,一個進(jìn)程中都包括著不止一個的線程。畢竟現(xiàn)在的OS都是SMP的,充分利用多核心提高程序效率應(yīng)該是每個coder敲鍵盤時需要優(yōu)先考慮的。
多進(jìn)程
linux的內(nèi)核向外提供了 fork() 這個系統(tǒng)調(diào)用來創(chuàng)建一個本進(jìn)程的拷貝,當(dāng)然往往fork()后都跟著 exec() 族系統(tǒng)調(diào)用,我們創(chuàng)建一個進(jìn)程一般都是為了執(zhí)行其他的代碼程序。
python的 os 模塊封裝了很多常用的系統(tǒng)調(diào)用,可以說是python中最常用的一個庫了。舉個栗子:
import os
print('Process (%s) start...' % os.getpid())
pid = os.fork()
if pid == 0:
print('Child process (%s).' % os.getpid())
else:
print('Parent process (%s).' % pid)
fork() 會返回兩個結(jié)果,父進(jìn)程返回一個大于0的無符號數(shù),子進(jìn)程返回0。
我們都知道socket()是有好幾個步驟的,而對于web服務(wù)器,每天每時每分都有著成千上萬的訪問請求。如果是一個進(jìn)程向外提供服務(wù),那就是這個進(jìn)程為第一個用戶從創(chuàng)建socket到關(guān)閉,再為下一個用戶提供服務(wù)。用戶時排著隊接受服務(wù)的,顯然不符合邏輯。
拿Apache舉個栗子,它是多進(jìn)程架構(gòu)服務(wù)器的代表。
- 運行主程序,只負(fù)責(zé)server端socket的
listen()和accept(),當(dāng)然主進(jìn)程是一個守護(hù)進(jìn)程 - 每當(dāng)一個用戶請求服務(wù),就會調(diào)用
fork(),在子程序中接受數(shù)據(jù),read()或者write(),然后提供服務(wù)直至關(guān)閉 - 主進(jìn)程還是要負(fù)責(zé)回收結(jié)束的子進(jìn)程資源的
偽代碼如下:
import os
server_fd = socket()
bind(server_fd,ip,port)
listen(server_fd,MAX_PROCESS)
While Online:
connfd = accpet(server_fd)
for each connfd:
os.fork()
// TODO
close(server_fd)
上面這段程序只適用linux平臺,windows平臺創(chuàng)建進(jìn)程的方式并不是 fork() 調(diào)用。python中提供了multiprocesssing模塊來兼容windows,比起fork(),代碼的語義更好理解一些
from multiprocessing import Process
import os
def run_proc(name):
print('Child process %s (%s)...' % (name, os.getpid()))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
#創(chuàng)建Process實例
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')
這里的join語義和linux平臺的多線程中的join語義很像,但效果其實是linux平臺的wait
有時候需要進(jìn)程池,multiprocessing 也直接提供了pool用于創(chuàng)建。
pool.apply(func,params) 是單進(jìn)程阻塞模式
pool.apply_async(func,params,callback) 是多進(jìn)程異步模式
pool.map(func,iter) 用于可迭代結(jié)構(gòu),阻塞式調(diào)用
pool.map_async(func,iter,callback)
一般情況下,還是把進(jìn)程數(shù)控制成和CPU核數(shù)相同。pool結(jié)束調(diào)用pool.join()回收進(jìn)程資源時,需要先pool.close()
上面提到過,創(chuàng)建一個新進(jìn)程的原因往往是為了加載新的代碼,去執(zhí)行新的任務(wù)。所以python封裝了fork()和之后的exec族,提供subprocess模塊,直接操作新的子進(jìn)程。這個包,一般是用來執(zhí)行外部的命令或者程序如shell命令,和os.system()類似。
import subprocess
r = subprocess.call(['ls','-l']) #阻塞
r = subprocess.call('ls -l',shell = True)
r = subprocess.check_call(['ls','-l']) #returncode不為0則raise CalledProcessError異常
r = subprocess.check_output('ls -l',shell=True)
r = subprocess.Popen(['ls','-l']) #非阻塞,需主動wait
r = subprocess.Popen(['ls','-l'],stdin=child1.stdout,stdout=subprocess.PIPE, stderr=subprocess.PIPE) #設(shè)置標(biāo)準(zhǔn)輸入輸出出錯的句柄
out,err = r.communicate() #繼續(xù)輸入,或者用來獲得返回的元組(stdoutdata,stderrdata)
手動繼續(xù)輸入的例子:
import subprocess
print('$ python')
p = subprocess.Popen(['python'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b"print('Hello,world')")
print(output.decode('utf-8'))
print('Exit code:', p.returncode)
進(jìn)程間通信
用multiprocessing的Queue或者Pipe來幫助實現(xiàn),類似linux中的Pipe,打開一條管道,一個進(jìn)程往里面扔數(shù)據(jù),一個從另一頭撿數(shù)據(jù)。python中的Pipe是全雙工管道,既可以讀也可以寫??梢酝ㄟ^Pipe(duplex=False)創(chuàng)建半雙工管道。
from multiprocessing import Pipe,Queue
#實例
q = Queue()
p = Pipe()
#寫入數(shù)據(jù)
q.put(value)
p[0].send(value)
#讀數(shù)據(jù)
q.get()
p[1].recv()
分別舉個例子,用Queue
from multiprocessing import Process, Queue
import os, time, random
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A','B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
time.sleep(random.random())
print('Get %s from queue.' % value)
if __name__=='__main__':
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
pw.start()
pr.start()
pw.join()
pr.terminate()
用Pipe:
from multiprocessing import Process, Pipe
import os, time, random
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A','B', 'C']:
print('Put %s to pipe...' % value)
q.send(value)
time.sleep(random.random())
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.recv()
time.sleep(random.random())
print('Get %s from pipe.' % value)
if __name__=='__main__':
p = Pipe()
pw = Process(target=write, args=(p[0],))
pr = Process(target=read, args=(p[1],))
pw.start()
pr.start()
pw.join()
time.sleep(2)
pr.terminate()
多線程
有人會有疑問,問什么要在進(jìn)程中開多個線程,多創(chuàng)建幾個進(jìn)程一起干活不就行了。其實這樣是可以的,只不過進(jìn)程這個單位有點大,比較占用資源,創(chuàng)建的時候開銷比較大(尤其在windows系統(tǒng)下),進(jìn)程多了CPU調(diào)度起來,在進(jìn)程間切換也是非常耗時的。還有多任務(wù)協(xié)同合作時,需要數(shù)據(jù)交換,進(jìn)程間通信也是開銷,而一個進(jìn)程中的線程是共享進(jìn)程的內(nèi)存空間的,可以直接交互。所以現(xiàn)在多線程的程序更加常見。
不過多線程也是有弊端的,協(xié)同合作的多線程,有一個掛了,會影響到所有的其他線程,也就代表這個任務(wù)是做不下去了。進(jìn)程因為有著獨立的地址空間,所以一個進(jìn)程死了對其他進(jìn)程的影響可以說很小。
python中提供了threading模塊為多線程服務(wù),threading.current_thread()返回當(dāng)前線程,主線程名為MainThread
import threading
thread = threading.Thread(target=func,args=())
thread.start()
thread.join()
多線程編程,最重要的就是同步和互斥,也就是各種鎖的用法。為什么要用鎖,后臺的童鞋應(yīng)該都懂,現(xiàn)在的SMP操作系統(tǒng)都是搶占式內(nèi)核,也就是即使你不同的核共同工作時,很幸運的沒有改亂一個共享變量,當(dāng)然這就不可能了。當(dāng)你的CPU時間片到時間了,或者需要內(nèi)存或者IO資源,你被踢出了CPU的工作隊列,你必須得在走的時候給你的資源把鎖加上,下次再來接著做。線程同步的重點的是對共享資源的判斷,和選擇合適的鎖。也就是對什么資源加鎖和用什么鎖。
不過在python中很遺憾,多線程存在著天生的缺陷,因為有著GIL的存在,這是python解釋器的設(shè)計缺陷。導(dǎo)致python程序在被解釋時,只能有一個線程。不過,對于IO密集型的程序,多線程的設(shè)計還是很有幫助的。比如爬蟲
- 最常用的鎖,類似 mutex
- 條件變量,
threading.Condition()會包含一個Lock對象,因為這兩者一般都是配合使用的。 - 信號量,
threading.Semaphore()
import threading
lock = threading.Lock()
lock.acquire()
lock.realease() #配合try...finally保證最后釋放掉鎖,防止死鎖
cond = threading.Condition()
cond.wait()
cond.notify() cond.notify_all()
sem = threading.Semaphore(NUM)
sem.acquire()
sem.realease()
event = threading.Event() #相當(dāng)于沒有l(wèi)ock的cond
event.set(True)
event.clear()
假設(shè)以下的情況
thread_func(params):
web_res = params
def func1(web_res):
http = web_res.http
TODO
def func2(web_res):
data = web_res.data
TODO
def func3(web_res):
user = web_res.user
TODO
在一個線程中,又存在多個子線程或者函數(shù)時,需要把一個參數(shù)都傳給它們時。可以通過唯一的id來區(qū)分出從全局變量自己的局部變量時。可以用ThreadLocal實現(xiàn)
import threading
student = threading.local()
def func(name):
person = student.name #需要之前關(guān)聯(lián)過
p1 = threading.Thread(target=func,argc='A')
p1 = threading.Thread(target=func,argc='B')
通過ThredLocal免去了我們親自去字典中存取。通常用于web開發(fā)中的為每個線程綁定一個數(shù)據(jù)庫連接,HTTP請求,用戶身份信息等。
分布式進(jìn)程
分布式是為了在橫向上提升整個系統(tǒng)的負(fù)載能力。python中multiprocessing模塊中的manage子模塊支持把多進(jìn)程分布到不同的機(jī)器上。當(dāng)然肯定存在一個master進(jìn)程來負(fù)責(zé)任務(wù)的調(diào)度。依賴manage子模塊,可以很輕松的寫出分布式程序。
比如爬蟲,想要爬下豆瓣或者知乎這樣網(wǎng)站的全部數(shù)據(jù),用單機(jī)估計得花費好幾年??梢园研枰赖木W(wǎng)站的所有URL放在一個Queue中,master進(jìn)程負(fù)責(zé)Queue的管理,可以將很多設(shè)備與master進(jìn)程所在的設(shè)備建立聯(lián)系,爬蟲開始獲取URL時,都從主機(jī)器獲取。這樣就能保證協(xié)同不沖突的合作。