進(jìn)程和線程
線程是最小的執(zhí)行單元,而進(jìn)程由至少一個(gè)線程組成。
多進(jìn)程
Unix下多進(jìn)程和系統(tǒng)原生調(diào)用很像,使用fork()
# coding=utf-8
import os
print 'Process (%s) start...' % os.getpid()
pid = os.fork() # win 下沒有該調(diào)用
if pid == 0: # 子進(jìn)程返回0
print 'I am child process (%s) and my parent is %s' % (os.getpid(), os.getppid())
else: # 父進(jìn)程返回子進(jìn)程的pid
'I (%s) just created a child process (%s).' % (os.getpid(), pid)
multiprocessing
提供了跨平臺(tái)的多進(jìn)程支持
# coding=utf-8
from multiprocessing import Process
import os
def run_proc(name):
print 'Run child process %s (%s)...' % (name, os.getpid())
if __name__ == '__main__':
print 'Parent process %s.' % os.getpid()
p = Process(target=run_proc, args=('test',)) # 傳入子進(jìn)程要執(zhí)行的函數(shù)和參數(shù),有點(diǎn)像Linux上創(chuàng)建線程的方式
print 'Process will start.'
p.start()
p.join() # 等待子進(jìn)程執(zhí)行結(jié)束
print 'Process end.'
Pool
# coding=utf-8
import os
import random
import time
from multiprocessing import Pool
def long_time_task(name):
print 'Run task %s (%s)...' % (name, os.getpid())
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print 'Task %s runs %0.2f seconds.' % (name, (end - start))
if __name__ == '__main__':
print 'Parent process %s.' % os.getpid()
p = Pool()
for i in range(5):
p.apply_async(long_time_task, args=(i,))
print 'Waiting for all subprocesses done...'
p.close() # 調(diào)用后不能再增加新的Process
p.join() # 必須先調(diào)用close().等待所有子進(jìn)程執(zhí)行完畢
print 'All subprocesses done.'
進(jìn)程間通信
提供了Queue、Pipes等多種方式來交換數(shù)據(jù)
# coding=utf-8
import random
from multiprocessing import Process, Queue
import time
def write(q):
for value in ['A', 'B', 'C']:
print 'Put %s to queue...' % value
q.put(value) # 寫入隊(duì)列
time.sleep(random.random())
def read(q):
while True:
value = q.get(True) # 從隊(duì)列讀取,無數(shù)據(jù)時(shí)阻塞
print 'Get %s from queue.' % value
if __name__ == '__main__':
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
pw.start()
pr.start()
pw.join()
pr.terminate()
tips:
在Unix/Linux下,
multiprocessing模塊封裝了fork()調(diào)用,使我們不需要關(guān)注fork()的細(xì)節(jié)。由于Windows沒有fork調(diào)用,因此,multiprocessing需要“模擬”出fork的效果,父進(jìn)程所有Python對(duì)象都必須通過pickle序列化再傳到子進(jìn)程去,所有,如果multiprocessing在Windows下調(diào)用失敗了,要先考慮是不是pickle失敗了。
多線程
- Python的線程是真正的Posix Thread,而不是模擬出來的線程
-
thread是低級(jí)模塊,threading是高級(jí)模塊,對(duì)thread進(jìn)行了封裝。絕大多數(shù)情況下,只需要使用threading - 啟動(dòng)一個(gè)線程就是把一個(gè)函數(shù)傳入并創(chuàng)建
Thread實(shí)例,然后調(diào)用start()開始執(zhí)行:
t = threading.Thread(target=loop, name='LoopThread')
t.start()
Lock
多線程中為了保證不同線程訪問同一個(gè)變量的安全性,需要在訪問變量的時(shí)候加鎖
lock = threading.Lock() # 獲取鎖實(shí)例
lock.acquire() # 獲取線程鎖
lock.release() # 釋放線程鎖
ThreadLocal
多線程下每個(gè)線程都有自己的數(shù)據(jù),為了時(shí)數(shù)據(jù)訪問方便,一般使用全局變量,但是全局變量會(huì)被別的線程訪問,還必須加鎖比較麻煩,ThreadLocal的實(shí)例可以定義為全局變量,每個(gè)線程對(duì)其屬性的訪問都是獨(dú)立的,相當(dāng)于于線程的全局變量,雖然不同線程訪問同一個(gè)ThreadLocal實(shí)例,但是互不沖突
ThreadLocal最常用的地方就是為每個(gè)線程綁定一個(gè)數(shù)據(jù)庫連接,HTTP請(qǐng)求,用戶身份信息等,這樣一個(gè)線程的所有調(diào)用到的處理函數(shù)都可以非常方便地訪問這些資源。
進(jìn)程 vs. 線程
IO密集型使用多線程能明顯提升性能,而計(jì)算密集型任務(wù)使用多線程可能反而降低性能。
線程過多操作系統(tǒng)會(huì)耗費(fèi)大量資源來進(jìn)行調(diào)度,可能反而降低效率
分布式進(jìn)程
在Thread和Process中,應(yīng)當(dāng)優(yōu)選Process,因?yàn)镻rocess更穩(wěn)定,而且,Process可以分布到多臺(tái)機(jī)器上,而Thread最多只能分布到同一臺(tái)機(jī)器的多個(gè)CPU上。
Python的multiprocessing模塊不但支持多進(jìn)程,其中managers子模塊還支持把多進(jìn)程分布到多臺(tái)機(jī)器上。一個(gè)服務(wù)進(jìn)程可以作為調(diào)度者,將任務(wù)分布到其他多個(gè)進(jìn)程中,依靠網(wǎng)絡(luò)通信。
原理是Queue的遠(yuǎn)程訪問,服務(wù)端將計(jì)算的參數(shù)寫入?yún)?shù)Queue,客戶端將參數(shù)從Queue中讀取出來然后將計(jì)算結(jié)果寫入結(jié)果Queue,最后服務(wù)端將結(jié)果從Queue中讀取出來完成分布式計(jì)算。