進(jìn)程基本概念
進(jìn)程由程序,數(shù)據(jù)和進(jìn)程控制塊組成,是正在執(zhí)行的程序,程序的一次執(zhí)行過程,是資源調(diào)度的基本單位。
進(jìn)程調(diào)度算法:先來先服務(wù)調(diào)度算法(FCFS),短作業(yè)(進(jìn)程)優(yōu)先調(diào)度算法,時間片輪轉(zhuǎn)等
并行和并發(fā):
并行:微觀上,一個精確時間片刻,有不同的程序在執(zhí)行,要求必須有多個處理器。
并發(fā):宏觀上,在一個時間段上看是同時執(zhí)行的,實際是進(jìn)程的快速切換。
進(jìn)程三種狀態(tài):
就緒:已獲得運行所需資源,除了cpu資源。
阻塞:等待cpu以外的其他資源
執(zhí)行→阻塞:執(zhí)行的進(jìn)程發(fā)生等待事件而無法執(zhí)行變?yōu)樽枞麪顟B(tài)。例如IO請求,申請資源得不到滿足。
阻塞→就緒:處于阻塞狀態(tài)在其等待的事件已經(jīng)發(fā)生,并不馬上轉(zhuǎn)入執(zhí)行狀態(tài),先轉(zhuǎn)入就緒狀態(tài)
執(zhí)行→就緒:時間片用完而被暫停執(zhí)行。
臨界區(qū):一次只允許一個進(jìn)程進(jìn)入訪問的一段代碼。臨界區(qū)保護(hù)原則是有空讓進(jìn),有限等待。
多進(jìn)程編程
- 一個子進(jìn)程處理任務(wù)
from multiprocessing import Process
def fun1(s):
while 1:
s=s+1
s=0
p=Process(target=fun1,args=(s,)) #創(chuàng)建進(jìn)程對象
p.start() #開啟進(jìn)程
p.join() #主線程等待子線程p終止
cpu檢測結(jié)果如下:

-兩個子進(jìn)程處理任務(wù)
from multiprocessing import Process
def fun1(s):
while 1:
s=0
s=s+1
s=0
p1=Process(target=fun1,args=(s,))
p2=Process(target=fun1,args=(s,))
p1.start()
p2.start()
p1.join()
p2.join()
多進(jìn)程情況下,雙核cpu都處于100%狀態(tài)

多進(jìn)程鎖機制
- 鎖的目的是保證同一時間只有一個進(jìn)程訪問資源,通過鎖可以實現(xiàn)上文提到的臨界區(qū)。
存錢和取錢的例子
- 存錢和取錢不能同時發(fā)生,所以通過鎖實現(xiàn)存錢的過程中不會出現(xiàn)取錢的事件。
- 鎖 from multiprocessing import Lock
-- l=Lock(), l.acquire() 和 l.release() 代表了加鎖解鎖
from multiprocessing import Process,Lock
def get_money(num,l):
l.acquire() #加鎖
for i in range(100):
num=num-1
print (num)
l.release() #解鎖
def put_money(num,l):
for i in range(100):
num=num+1
print (num)
if __name__=='__main__':
num=100
l=Lock()
p=Process(target=get_money,args=(num,l))
p.start()
p1=Process(target=put_money,args=(num,l))
p1.start()
p.join()
p1.join()
#
0
200
由于進(jìn)程間不能共享資源,結(jié)果會出現(xiàn)0 和 200,以后會講到進(jìn)程間共享內(nèi)存時再把代碼完善一下。
多進(jìn)程信號量
上文中提到的鎖機制只能創(chuàng)建一把鎖,而信號量可以配置多把鎖。信號量內(nèi)部由value,queue構(gòu)成。value大于0進(jìn)程可以放入隊列,同時value=value-1,value小于0代表隊列阻塞。當(dāng)進(jìn)程執(zhí)行完從隊列出去value++.保證了進(jìn)程的有序推進(jìn)。
例子:理發(fā)店有三位理發(fā)師,同時只能服務(wù)三位客戶,但是同時進(jìn)來的客戶很多。為了保證三位理發(fā)師同時服務(wù)三位客戶,使用信號量配置三把鎖。只有當(dāng)其中有理發(fā)師空閑時下一位客戶才能進(jìn)入。
信號量 from multiprocessing import Semaphore
-- l=semaphore(3) 配三把鎖信號量代碼例子1
from multiprocessing import Semaphore
if __name__=='__main__':
l=Semaphore(2)
l.acquire()
print (1)
l.acquire()
print (2)
l.acquire()
print (3)
-
結(jié)果輸出如下,由于只配置了兩把鎖,只能輸出1,2,當(dāng)釋放其中一個鎖時,才能執(zhí)行3的輸出 image.png
信號量代碼例子2
初始化5把鎖,開啟10個進(jìn)程,同時只有5個進(jìn)程執(zhí)行函數(shù)fun,當(dāng)釋放鎖時其他進(jìn)程才能執(zhí)行。
from multiprocessing import Semaphore,Process
import time
def fun(i,l):
l.acquire()
print(' %s process start work'%i)
time.sleep(1)
print ('%s process end'%i)
l.release()
if __name__=='__main__':
l=Semaphore(5)
p_l=[]
for i in range(10):
p=Process(target=fun,args=(i,l,))
p.start()
p_l.append(p)
for j in p_l:
j.join()
#結(jié)果輸出
0 process start work
1 process start work
2 process start work
3 process start work
5 process start work
0 process end
1 process end
7 process start work
2 process end
6 process start work
8 process start work
3 process end
9 process start work
5 process end
4 process start work
7 process end
6 process end
8 process end
4 process end
9 process end
- 總結(jié)
信號量機制比鎖機制多了一個計數(shù)器,計數(shù)器用來統(tǒng)計當(dāng)前剩余的鑰匙,當(dāng)計數(shù)器為0表示沒有鑰匙,acquire()處于阻塞狀態(tài)。每acquire()一次,計數(shù)器內(nèi)部減一,release()一次計數(shù)器加一。鎖和信號量都可以看作進(jìn)程間的通信,雖然沒有數(shù)據(jù)交換,但實現(xiàn)了進(jìn)程的有序推進(jìn)。
IPC 進(jìn)程間通信
- 正常情況下,多進(jìn)程之間無法進(jìn)行通信,因為每個進(jìn)程都有自己獨立的內(nèi)存空間。
- 進(jìn)程間數(shù)據(jù)交換:消息隊列
q=Queue(3) 初始化隊列,q.put(): 如果可以繼續(xù)往隊列中放數(shù)據(jù)就直接放,不能放就阻塞等待。 q.get(): 隊列有數(shù)據(jù)直接獲取,沒有數(shù)據(jù)阻塞等待。 - 代碼小例子:使用消息隊列實現(xiàn)多進(jìn)程間的數(shù)據(jù)共享。下面例子中一個進(jìn)程負(fù)責(zé)往隊列中添加數(shù)據(jù),另一個進(jìn)程負(fù)責(zé)刪除數(shù)據(jù)
from multiprocessing import Process,Queue
import time
def add(q):
for i in range(5):
q.put(i)
print ("add %s"%i)
time.sleep(1)
def dele(q):
for i in range(5):
result=q.get()
print ("delete %s"%result)
time.sleep(1)
if __name__=='__main__':
q=Queue()
p=Process(target=add,args=(q,))
p1=Process(target=dele,args=(q,))
p.start()
p1.start()
p.join()
p1.join()
#結(jié)果
add 0
delete 0
add 1
delete 1
add 2
delete 2
add 3
delete 3
add 4
delete 4
多進(jìn)程間共享內(nèi)存數(shù)據(jù)
- from multiprocessing import Manager
-m=Manager() num=m.list([1,2,3]) num可以進(jìn)程間共享
-代碼小例子 共享內(nèi)存數(shù)據(jù)
from multiprocessing import Process,Lock,Manager
def get_money(num,l):
for i in range(100):
l.acquire()
num[0]=num[0]-1
l.release()
# print (num)
def put_money(num,l):
for i in range(100):
l.acquire()
num[0]=num[0]+1
l.release()
# print (num)
if __name__=='__main__':
m=Manager()
num=m.list([1,2,3])
l=Lock()
p=Process(target=get_money,args=(num,l,))
p.start()
p1=Process(target=put_money,args=(num,l))
p1.start()
p.join()
p1.join()
print (num)
#結(jié)果輸出
[1, 2, 3]
生產(chǎn)者和消費者模型(必會)
- 主要是為解耦合,借助上面學(xué)到的隊列實現(xiàn)生產(chǎn)者消費者模型
- 代碼小例子 一個進(jìn)程負(fù)責(zé)生產(chǎn) 另一個進(jìn)程負(fù)責(zé)消費
from multiprocessing import Process,Queue
import time
def producer(q):
for i in range(1,5):
q.put(i)
print ("produce %s"%i)
time.sleep(1)
q.put(None) #添加None是為了當(dāng)消費者消費了隊列中所有值以后可以正常退出
def consumer(q):
while 1:
result=q.get()
if result:
print ("consume %s"%result)
else:
break
if __name__=='__main__':
q=Queue()
p=Process(target=producer,args=(q,))
p1=Process(target=consumer,args=(q,))
p.start()
p1.start()
p.join()
p1.join()
#結(jié)果輸出
produce 1
consume 1
produce 2
consume 2
produce 3
consume 3
produce 4
consume 4
進(jìn)程池
- 一個池子,里面有固定數(shù)量的進(jìn)程,這些進(jìn)程一直處于待命狀態(tài),一旦有任務(wù)來,馬上調(diào)度進(jìn)程去處理。
- 優(yōu)點:開啟多進(jìn)程需要消耗大量時間讓操作系統(tǒng)來為你管理,其次需要消耗大量時間讓cpu調(diào)度。進(jìn)程池可以節(jié)省很多時間。
- 根據(jù)經(jīng)驗進(jìn)程池里的進(jìn)程數(shù)最好設(shè)置為核數(shù)+1
- 代碼小例子:比較了使用進(jìn)程池創(chuàng)建100個進(jìn)程和手動創(chuàng)建100個進(jìn)程的耗費時間
進(jìn)程池:from multiprocessing improt Pool
p.apply_async()函數(shù)代表了異步調(diào)用。池子中的進(jìn)程一次性都去執(zhí)行任務(wù)。
p.apply()函數(shù)代表了同步調(diào)用。進(jìn)程池中的進(jìn)程一個一個執(zhí)行任務(wù)。
from multiprocessing import Pool,Process
import os
import time
def worker():
for i in range(1000000):
i=i+1
# print (os.getpid())
if __name__=="__main__":
start=time.time()
p=Pool(10)
for i in range(100):
p.apply_async(worker)
p.close()
p.join()
end=time.time()
print ("進(jìn)程池時間:",start-end)
start1=time.time()
p_l=[]
for j in range(100):
p=Process(target=worker)
p.start()
p_l.append(p)
[i.join() for i in p_l]
end1=time.time()
print ("手動創(chuàng)建進(jìn)程時間:",start1-end1)
#結(jié)果輸出
進(jìn)程池時間: -3.6715526580810547
手動創(chuàng)建進(jìn)程時間: -9.386803388595581
