Python并發(fā)編程之多進(jìn)程
-
什么是進(jìn)程?
進(jìn)程是一個(gè)抽象的概念,進(jìn)程的概念起源于操作系統(tǒng),正在進(jìn)行的一個(gè)過(guò)程或者一個(gè)過(guò)程的總和,而負(fù)責(zé)執(zhí)行過(guò)程的則是CPU。- 進(jìn)程與程序的區(qū)別
程序僅僅只是一堆文件、一堆代碼而已,而進(jìn)程指的是程序的運(yùn)行過(guò)程或過(guò)程的總和。 - 并發(fā)與并行
無(wú)論是并行還是并發(fā),在用戶看來(lái)都是'同時(shí)'運(yùn)行的,不管是進(jìn)程還是線程,都只是一個(gè)任務(wù)而已,真是干活的是CPU,CPU來(lái)做這些任務(wù),而一個(gè)CPU同一時(shí)刻只能執(zhí)行一個(gè)任務(wù)。- 并發(fā):是偽并行,即看起來(lái)是同時(shí)運(yùn)行。單個(gè)CPU+多道技術(shù)就可以實(shí)現(xiàn)并發(fā),(并行也屬于并發(fā))。
- 并行:同時(shí)運(yùn)行,只有具備多個(gè)CPU才能實(shí)現(xiàn)并行。
- 多道技術(shù)
多道的產(chǎn)生背景是想要在單核CPU的情況下實(shí)現(xiàn)多個(gè)進(jìn)程并發(fā)執(zhí)行,具有兩大核心特點(diǎn):- 空間上的復(fù)用(多道程序復(fù)用內(nèi)存的空間):多道程序同時(shí)讀入內(nèi)存,等待被CPU執(zhí)行,即產(chǎn)生了多個(gè)進(jìn)程;進(jìn)程之間的內(nèi)存地址空間是相互隔離的,而且這種隔離是物理級(jí)別實(shí)現(xiàn)的。
- 時(shí)間上的復(fù)用(多道程序復(fù)用CPU的時(shí)間):正在執(zhí)行的進(jìn)程遇到了I/O操作 或 正在執(zhí)行的進(jìn)程占用CPU時(shí)間過(guò)長(zhǎng) 或 來(lái)了一個(gè)優(yōu)先級(jí)更高的進(jìn)程,操作系統(tǒng)都會(huì)強(qiáng)制收回當(dāng)前進(jìn)程的CPU使用權(quán)限,并將其分配給其他進(jìn)程。
- 因此,若要提升程序的執(zhí)行效率,需要減少或降低I/O操作。
- 進(jìn)程與程序的區(qū)別
-
開(kāi)啟子進(jìn)程的兩種方式
-
第一種
from multiprocessing import Process import time def task(name): print(F'{name} is running') time.sleep(3) print(F'{name} is done') if __name__ == '__main__': # 在Windows系統(tǒng)之上,開(kāi)啟子進(jìn)程的操作一定要放到這下面 # if __name__ == '__main__'的意思是:當(dāng).py文件被直接運(yùn)行時(shí),if __name__ == '__main__'之下的代碼塊將被運(yùn)行; # 當(dāng).py文件以模塊形式被導(dǎo)入時(shí),if __name__ == '__main__'之下的代碼塊不被運(yùn)行。 P = Process(target=task, args=('Python',)) # args= 后面是一個(gè)元組,因此一個(gè)參數(shù)的時(shí)候必須加逗號(hào) # Process(target=task,kwargs={'name':'Python'}) # kwargs= 后面是一個(gè)字典,這兩種傳參方式任選一種 P.start() # 向操作系統(tǒng)發(fā)送申請(qǐng)內(nèi)存空間請(qǐng)求,注意,是發(fā)請(qǐng)求給操作系統(tǒng),至于操作系統(tǒng)怎么申請(qǐng)內(nèi)存空間不管,然后把父進(jìn)程的數(shù)據(jù)拷貝給子進(jìn)程,作為子進(jìn)程的初始狀態(tài) print('父進(jìn)程') -
第二種
from multiprocessing import Process import time class MyProcess(Process): # 自己定義一個(gè)類,并繼承Process def __init__(self, name): super(Process, self).__init__() self.name = name def run(self): # 規(guī)定必須要定義run方法 print(F'{self.name} is running') time.sleep(3) print(F'{self.name} is done') if __name__ == "__main__": p = MyProcess('Python') p.start() # 規(guī)定,start()調(diào)用的是MyProcess()中的run(self)方法 print('父進(jìn)程')
-
-
父進(jìn)程等待子進(jìn)程結(jié)束
-
驗(yàn)證進(jìn)程的內(nèi)存空間相互隔離
from multiprocessing import Process import time x = 1 # 子進(jìn)程要執(zhí)行的代碼 def task(): time.sleep(3) global x x = 0 print('子進(jìn)程結(jié)束',x) # 打印結(jié)果:子進(jìn)程結(jié)束 0 # 開(kāi)啟子進(jìn)程 if __name__ == '__main__': p = Process(target=task) p.start() p.join() # 讓父進(jìn)程等待子進(jìn)程結(jié)束,子進(jìn)程結(jié)束后才會(huì)執(zhí)行下一行代碼,也有回收僵尸進(jìn)程的效果 print(x) # 打印結(jié)果:1 -
驗(yàn)證父進(jìn)程等待子進(jìn)程結(jié)束后才會(huì)結(jié)束
from multiprocessing import Process import time import random # 子進(jìn)程要執(zhí)行的代碼 def task(n): print(F'{n} is running') time.sleep(random.randint(1,10)) # 開(kāi)啟子進(jìn)程 start_time = time.time() p_l= [] if __name__ == '__main__': # 申請(qǐng)開(kāi)啟5次子進(jìn)程 for i in range(5): p = Process(target=task, args=(i,)) p_l.append(p) p.start() # 由于p.start()只是向操作系統(tǒng)申請(qǐng)開(kāi)辟內(nèi)存空間,每次的操作系統(tǒng)開(kāi)辟內(nèi)存空間的時(shí)間程序無(wú)法掌控 # 等待所有子進(jìn)程結(jié)束 for p in p_l: p.join() end_time = time.time() print('父進(jìn)程',(end_time - start_time)) ''' 打印機(jī)結(jié)果: 主進(jìn)程 2 is running 1 is running 0 is running 4 is running 3 is running 父進(jìn)程 9.107510566711426 Process finished with exit code 0 '''
-
-
父進(jìn)程操作子進(jìn)程的其他屬性(方法)
from multiprocessing import Process import time import os # 子進(jìn)程要執(zhí)行的代碼 def task(): print('子進(jìn)程開(kāi)始,子進(jìn)程:%s,父進(jìn)程:%s' %(os.getpid,os.ppid)) time.sleep(3) print('子進(jìn)程結(jié)束',x) # 開(kāi)啟子進(jìn)程 if __name__ == '__main__': print(os.getpid) #查看父進(jìn)程的進(jìn)程號(hào) p = Process(target=task) p.start() print(p.pid) # 查看子進(jìn)程的進(jìn)程號(hào) p.terminate() # 向操作系統(tǒng)發(fā)送結(jié)束子進(jìn)程請(qǐng)求 time.sleep(1) print(p.is_alive()) # 判斷子進(jìn)程是否存活,返回的是布爾值
-
僵尸進(jìn)程與孤兒進(jìn)程
- 僵尸進(jìn)程:僵尸進(jìn)程是當(dāng)子進(jìn)程比父進(jìn)程先結(jié)束,而父進(jìn)程又沒(méi)有回收子進(jìn)程,釋放子進(jìn)程占用的資源,此時(shí)子進(jìn)程將成為一個(gè)僵尸進(jìn)程。
- 危害:僵尸進(jìn)程占用進(jìn)程號(hào),如果存在過(guò)多的僵尸進(jìn)程,很容易導(dǎo)致沒(méi)有新的進(jìn)程號(hào)提供給要產(chǎn)生的進(jìn)程。一般來(lái)說(shuō)父進(jìn)程結(jié)束后會(huì)調(diào)用
wait/waitpid來(lái)回收僵尸進(jìn)程,如果父進(jìn)程沒(méi)有回收僵尸進(jìn)程,則僵尸進(jìn)程變成孤兒進(jìn)程,然后由init進(jìn)程進(jìn)行回收。 - 孤兒進(jìn)程:在操作系統(tǒng)領(lǐng)域中,孤兒進(jìn)程指的是在其父進(jìn)程執(zhí)行完成或被終止后仍繼續(xù)運(yùn)行的一類進(jìn)程。這些孤兒進(jìn)程將被
init進(jìn)程(進(jìn)程號(hào)為1)所收養(yǎng),并由init進(jìn)程對(duì)它們完成狀態(tài)收集工作。
-
守護(hù)進(jìn)程
-
守護(hù)進(jìn)程(daemon)是一類在后臺(tái)運(yùn)行的特殊進(jìn)程,用于執(zhí)行特定的系統(tǒng)任務(wù)。如果父進(jìn)程將子進(jìn)程設(shè)置為守護(hù)進(jìn)程,那么在父進(jìn)程代碼運(yùn)行完畢后,注意,是父進(jìn)程'代碼運(yùn)行完畢',而不是父進(jìn)程結(jié)束,守護(hù)進(jìn)程就立即被回收。
For example: from multiprocessing import Process import time # 子進(jìn)程代碼 def task(name): print('%s is running' %(name)) # 父進(jìn)程代碼 if __name__ == '__main__': obj = Process(target=task,args=('process',)) obj.daemon = True # 將obj設(shè)置為守護(hù)進(jìn)程,守護(hù)進(jìn)程的特點(diǎn)是當(dāng)父進(jìn)程代碼運(yùn)行完畢后,無(wú)論自己的任務(wù)有沒(méi)有完成,都隨之結(jié)束。 obj.start() print('父進(jìn)程')
-
-
互斥鎖
在編程中,引入了對(duì)象互斥鎖的概念,來(lái)保證共享數(shù)據(jù)操作的完整性。每個(gè)對(duì)象都對(duì)應(yīng)于一個(gè)可稱為'互斥鎖' 的標(biāo)記,這個(gè)標(biāo)記用來(lái)保證在任一時(shí)刻,只能有一個(gè)線程訪問(wèn)該對(duì)象。
互斥鎖和join的區(qū)別:
-
二者原理一樣,都是將并發(fā)變成串行,從而保證有序;
- 區(qū)別一:join是按照人為指定的順序執(zhí)行,互斥鎖是所有進(jìn)程平等的競(jìng)爭(zhēng),誰(shuí)先搶到誰(shuí)執(zhí)行。
- 區(qū)別二:互斥鎖可以讓一部分代碼(修改共享數(shù)據(jù)的代碼)串行,而join只能將代碼整體串行。
-
互斥鎖用法(區(qū)別一舉例):
from multiprocessing import Process,Lock lock = Lock() def task1(lock): lock.acquire() # lock.acquire()只能獲取一次互斥鎖 print('task1 is running') lock.release() # lock.release()釋放互斥鎖,一定要在操作完畢后釋放鎖! def task2(lock): lock.acquire() print('task2 is running') lock.release() def task3(lock): lock.acquire() print('task3 is running') lock.release() if __name__ == '__main__': p1 = Process(target=task1,args=(lock,)) p2 = Process(target=task2,args=(lock,)) p3 = Process(target=task3,args=(lock,)) p1.start() p2.start() p3.start() -
互斥鎖應(yīng)用場(chǎng)景之搶票(區(qū)別二舉例):
import os import json import time import random from multiprocessing import Process,Lock mutex_lock = Lock() def search_ticket(): ''' 這是一個(gè)車票查詢的操作 :return: ''' time.sleep(random.randint(1,3)) with open('db.json', mode='r', encoding='utf-8') as f: dic = json.load(f) print(F'用戶:{os.getpid()} 查詢到剩余車票:{dic['count']}' return dic['count'] def get_ticket(): ''' 這是一個(gè)搶票的操作 :return: ''' with open('db.json', mode='r', encoding='utf-8') as f: time.sleep(random.randint(1, 3)) dic = json.load(f) if dic['count'] > 0: dic['count'] -= 1 with open('db.json', mode='w', encoding='utf-8') as f: json.dump(dic,f) print('用戶:%s 購(gòu)票成功!' %(os.getpid())) else: print('用戶:%s 購(gòu)票失敗!' %(os.getpid())) def buy_ticket(): ''' 這是一個(gè)購(gòu)買車票的函數(shù) :return: ''' search_ticket() with mutex_lock: get_ticket() if __name__ == '__main__': for i in range(10): p = Process(target = buy_ticket) p.start()
-
IPC機(jī)制(Inter Process Communication)
-
IPC:進(jìn)程間通信,進(jìn)程間為什么要通信?因?yàn)檫M(jìn)程需要協(xié)同完成工作,就涉及到一個(gè)進(jìn)程要把自己處理的結(jié)果交給另外一個(gè)進(jìn)程,而進(jìn)程之間內(nèi)存空間相互隔離,因此進(jìn)程之間通信必須找到一種介質(zhì),該介質(zhì)必須滿足:
- 是所有進(jìn)程共享的;(實(shí)現(xiàn)進(jìn)程共享的機(jī)制有管道和隊(duì)列,隊(duì)列本質(zhì)是由管道+鎖實(shí)現(xiàn))
- 必須是內(nèi)存空間;
- 幫我們處理好鎖的問(wèn)題。
-
但凡涉及到進(jìn)程之間通信,就使用Queue,隊(duì)列是IPC機(jī)制實(shí)現(xiàn)的一種方式
強(qiáng)調(diào):
- 隊(duì)列是用來(lái)存進(jìn)程之間通信消息的,數(shù)據(jù)量不應(yīng)該過(guò)大;
-
Queue(maxsize)中的maxsize的值超過(guò)內(nèi)存限制就變得毫無(wú)意義;
from multiprocessing import Queue # 從multiprocessing導(dǎo)入隊(duì)列,隊(duì)列特點(diǎn):先進(jìn)先出 q = Queue(3,block=True) # maxsize=3,若不指定大小就無(wú)限大,受限于內(nèi)存大??;block=True默認(rèn)是阻塞。 q.put('Hello') q.put({'name':'HC'}) q.put(100) # q.put(55) 由于maxsize=3,第四個(gè)put會(huì)引起阻塞 print(q.get()) print(q.get()) print(q.get())
-