個(gè)人獨(dú)立博客:www.limiao.tech
微信公眾號(hào):TechBoard
進(jìn)程
進(jìn)程:通俗理解就是一個(gè)運(yùn)行的程序或者軟件,進(jìn)程是操作系統(tǒng)資源分配的基本單位
一個(gè)程序至少有一個(gè)進(jìn)程,一個(gè)進(jìn)程至少有一個(gè)線程,多進(jìn)程可以完成多任務(wù)
進(jìn)程的狀態(tài)
工作中,任務(wù)數(shù)往往大于cpu的核數(shù),即一定有一些任務(wù)正在執(zhí)行,而另外一些任務(wù)在等待cpu進(jìn)行執(zhí)行,因此導(dǎo)致了有了不同的狀態(tài)
進(jìn)程的使用
導(dǎo)入進(jìn)程模塊:
import multiprocessing
用進(jìn)程完成多任務(wù)
import multiprocessing
import time
def sing():
for i in range(10):
print("唱歌中...")
time.sleep(0.2)
def dance():
for i in range(10):
print("跳舞中...")
time.sleep(0.2)
if __name__ == "__main__":
# 創(chuàng)建對(duì)應(yīng)的子進(jìn)程執(zhí)行對(duì)應(yīng)的任務(wù)
sing_process = multiprocessing.Process(target=sing)
dance_process = multiprocessing.Process(target=dance)
# 啟動(dòng)進(jìn)程執(zhí)行對(duì)應(yīng)的任務(wù)
sing_process.start()
dance_process.start()
Process類參數(shù)介紹
import multiprocessing
import os
def show_info(name,age):
print("show_info:", multiprocessing.current_process())
# 獲取進(jìn)程的編號(hào)
pritn("show_info pid:", multiprocessing.current_process().pid, os.getpid)
print(name, age)
if __name__ == "__main__":
# 創(chuàng)建子進(jìn)程
# group: 進(jìn)程組,目前只能使用None
# target: 執(zhí)行的目標(biāo)任務(wù)
# args: 以元組方式傳參
# kwargs: 以字典方式傳參
sub_prcess = multiprocessing.Process(group=None, target=show_info, arg=("楊冪", 18))
sub_prcess.start()
進(jìn)程之間不共享全局變量
import multiprocessing
import time
# 全局變量
g_list = []
# 添加數(shù)據(jù)
def add_data():
for i in range(15):
g_list.append(i)
time.sleep(0.1)
print("add_data:", g_list)
# 讀取數(shù)據(jù)
def read_data():
print("read_data:", g_list)
if __name__ == "__main__":
# 創(chuàng)建添加數(shù)據(jù)的子進(jìn)程
add_process = multiprocessing.Process(target=add_data)
# 創(chuàng)建讀取數(shù)據(jù)的子進(jìn)程
read_process = multiprocessing.Process(target=read_data)
# 啟動(dòng)進(jìn)程
add_process.start()
# 主進(jìn)程等待添加數(shù)據(jù)的子進(jìn)程執(zhí)行完成以后再執(zhí)行讀取進(jìn)程的操作
add_process.join()
# 代碼執(zhí)行到此說明添加數(shù)據(jù)的子進(jìn)程把任務(wù)執(zhí)行完成了
read_process.start()
創(chuàng)建子進(jìn)程其實(shí)就是對(duì)主進(jìn)程資源的拷貝
主進(jìn)程會(huì)等待所有的子進(jìn)程執(zhí)行完成程序再退出
import multiprocessing
import time
# 工作任務(wù)
def work():
for i in range(10):
print("工作中...")
time.sleep(0.3)
if __name__ == "__main__":
# 創(chuàng)建子進(jìn)程
sub_prcess = multiprocessing.Process(target=work)
# 查看進(jìn)程的守護(hù)狀態(tài)
# print(sub_prcess.daemon)
# 守護(hù)主進(jìn)程,主進(jìn)程退出子進(jìn)程直接銷毀,不再執(zhí)行子進(jìn)程里面的代碼
# sub_prcess.daemon = True
# 啟動(dòng)進(jìn)程執(zhí)行對(duì)應(yīng)的任務(wù)
sub_process.start()
# 主進(jìn)程延時(shí)1s
time.sleep(1)
print("主進(jìn)程執(zhí)行完了")
# 主進(jìn)程退出之前把所有的子進(jìn)程銷毀
sub_prcess.terminate()
exit()
總結(jié): 主進(jìn)程會(huì)等待所有的子進(jìn)程執(zhí)行完成程序再退出
獲取進(jìn)程pid
# 獲取進(jìn)程pid
import multiprocessing
import time
import os
def work():
# 獲取當(dāng)前進(jìn)程編號(hào)
print("work進(jìn)程編號(hào):", os.getpid())
# 獲取父進(jìn)程編號(hào)
print("work父進(jìn)程編號(hào):", os.getppid())
for i in range(10):
print("工作中...")
time.sleep(1)
# 擴(kuò)展:根據(jù)進(jìn)程編號(hào)殺死對(duì)應(yīng)的進(jìn)程
# os.kill(os.getpid(), 9)
if __name__ == '__main__':
# 獲取當(dāng)前進(jìn)程的編號(hào):
print("當(dāng)前進(jìn)程編號(hào):", multiprocessing.current_process().pid)
# 創(chuàng)建子進(jìn)程
sub_process = multiprocessing.Process(target=work)
# 啟動(dòng)進(jìn)程
sub_process.start()
# 主進(jìn)程執(zhí)行打印信息操作
for i in range(20):
print("我在主進(jìn)程中執(zhí)行...")
time.sleep(1)
運(yùn)行結(jié)果:
當(dāng)前進(jìn)程編號(hào): 624
我在主進(jìn)程中執(zhí)行...
我在主進(jìn)程中執(zhí)行...
我在主進(jìn)程中執(zhí)行...
我在主進(jìn)程中執(zhí)行...
我在主進(jìn)程中執(zhí)行...
我在主進(jìn)程中執(zhí)行...
我在主進(jìn)程中執(zhí)行...
我在主進(jìn)程中執(zhí)行...
我在主進(jìn)程中執(zhí)行...
我在主進(jìn)程中執(zhí)行...
我在主進(jìn)程中執(zhí)行...
work進(jìn)程編號(hào): 1312
work父進(jìn)程編號(hào): 624
工作中...
工作中...
工作中...
工作中...
工作中...
工作中...
工作中...
工作中...
工作中...
工作中...
我在主進(jìn)程中執(zhí)行...
我在主進(jìn)程中執(zhí)行...
我在主進(jìn)程中執(zhí)行...
我在主進(jìn)程中執(zhí)行...
我在主進(jìn)程中執(zhí)行...
我在主進(jìn)程中執(zhí)行...
我在主進(jìn)程中執(zhí)行...
我在主進(jìn)程中執(zhí)行...
我在主進(jìn)程中執(zhí)行...
***Repl Closed***
進(jìn)程間通信——Queue
可以使用multiprocessing模塊Queue實(shí)現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞,Queue本身是一個(gè)消息隊(duì)列程序
import multiprocessing
if __name__ == "__main__":
# 創(chuàng)建消息隊(duì)列
# 3:表示消息隊(duì)列的最大個(gè)數(shù)
queue = multiprocessing.Queue(3)
# 存放數(shù)據(jù)
queue.put(1)
queue.put("hello")
queue.put([1, 5, 8])
# 總結(jié):隊(duì)列可以放入任意類型的數(shù)據(jù)
# queue.put("xxx": "yyy")
# 放入消息的時(shí)候不會(huì)進(jìn)行等待,如果發(fā)現(xiàn)隊(duì)列滿了不能放入數(shù)據(jù),那么會(huì)直接崩潰
# 建議: 放入數(shù)據(jù)統(tǒng)一使用 put 方法
# queue.put_nowait(("xxx": "yyy"))
# 判斷隊(duì)列是否滿了
result = queue.full()
print(result)
# 判斷隊(duì)列是否為空,不靠譜(加延時(shí)可解決)
result = queue.empty()
print("隊(duì)列是否為空:", result)
# 獲取隊(duì)列消息個(gè)數(shù)
size = queue.qsize()
print("消息個(gè)數(shù):", size)
# 獲取隊(duì)列中的數(shù)據(jù)
res = queue.get()
print(res)
# 如果隊(duì)列空了,那么使用get方法會(huì)等待隊(duì)列有消息以后再取值
消息隊(duì)列Queue完成進(jìn)程間通信的演練
import multiprocessing
import time
# 添加數(shù)據(jù)
def add_data(queue):
for i in range(5):
# 判斷隊(duì)列是否滿了
if queue.full():
# 如果滿了跳出循環(huán),不再添加數(shù)據(jù)
print("隊(duì)列滿了")
break
queue.put(i)
print("add:", i)
time.sleep(0.1)
def read_data(queue):
while True:
if queue.qsize == 0:
print("隊(duì)列空了")
break
result = queue.get()
print("read:", result)
if __name__ == "__main__":
# 創(chuàng)建消息隊(duì)列
queue = multiprocessing.Queue(3)
# 創(chuàng)建添加數(shù)據(jù)的子進(jìn)程
add_process = multiprocessing.Process(target=add_data, args=(queue,))
# 創(chuàng)建讀取數(shù)據(jù)的子進(jìn)程
read_process = multiprocessing.Process(target=read_data, args=(queue,))
# 啟動(dòng)進(jìn)程
add_process.start()
# 主進(jìn)程等待寫入進(jìn)程執(zhí)行完成以后代碼再繼續(xù)往下執(zhí)行
add_process.join()
read_process.start()
進(jìn)程池Pool
進(jìn)程池的概念
池子里面放的是進(jìn)程,進(jìn)程池會(huì)根據(jù)任務(wù)執(zhí)行情況自動(dòng)創(chuàng)建進(jìn)程,而且盡量少創(chuàng)建進(jìn)程,合理利用進(jìn)程池中的進(jìn)程完成多任務(wù)
當(dāng)需要?jiǎng)?chuàng)建的子進(jìn)程數(shù)量不多時(shí),可以直接利用multiprocess中的Process動(dòng)態(tài)生成多個(gè)進(jìn)程,但如果是上百甚至上千個(gè)目標(biāo),手動(dòng)的去創(chuàng)建進(jìn)程的工作量巨大,此時(shí)就可以用到multiprocess模塊提供的Pool方法。
初始化Pool時(shí),可以指定一個(gè)最大進(jìn)程數(shù),當(dāng)有新的請(qǐng)求提到Pool中時(shí),如果池還沒有滿,那么就會(huì)創(chuàng)建一個(gè)新的進(jìn)程用來執(zhí)行該請(qǐng)求,但如果池中的進(jìn)程數(shù)已經(jīng)達(dá)到指定的最大值,那么該請(qǐng)求就會(huì)等待,直到池中有進(jìn)程結(jié)束,才會(huì)用之前的進(jìn)程來執(zhí)行新的任務(wù)。
進(jìn)程池同步執(zhí)行任務(wù)
進(jìn)程池同步執(zhí)行任務(wù)表示進(jìn)程池中的進(jìn)程在執(zhí)行任務(wù)的時(shí)候一個(gè)執(zhí)行完成另外一個(gè)才能執(zhí)行,如果沒有執(zhí)行完會(huì)等待上一個(gè)進(jìn)程執(zhí)行
進(jìn)程池同步實(shí)例代碼
import multiprocessing
import time
# 拷貝任務(wù)
def work():
print("復(fù)制中...", multiprocessing.current_process().pid)
time.sleep(1)
if __name__ == '__main__':
# 創(chuàng)建進(jìn)程池
#3:進(jìn)程池中進(jìn)程的最大個(gè)數(shù)
pool = multiprocessing.Pool(3)
# 模擬大批量的任務(wù),讓進(jìn)程池去執(zhí)行
for i in range(5):
# 循環(huán)讓進(jìn)程池執(zhí)行對(duì)應(yīng)的work任務(wù)
# 同步執(zhí)行任務(wù),一個(gè)任務(wù)執(zhí)行完成以后另外一個(gè)任務(wù)才能執(zhí)行
pool.apply(work)
運(yùn)行結(jié)果:
復(fù)制中... 6172
復(fù)制中... 972
復(fù)制中... 972
復(fù)制中... 1624
復(fù)制中... 1624
***Repl Closed***
進(jìn)程池異步執(zhí)行任務(wù)
進(jìn)程池異步執(zhí)行任務(wù)表示進(jìn)程池中的進(jìn)程同時(shí)執(zhí)行任務(wù),進(jìn)程之間不會(huì)等待
進(jìn)程池異步實(shí)例代碼
import multiprocessing
import time
# 拷貝任務(wù)
def work():
print("復(fù)制中...", multiprocessing.current_process().pid)
# 獲取當(dāng)前進(jìn)程的守護(hù)狀態(tài)
# 提示:使用進(jìn)程池創(chuàng)建的進(jìn)程時(shí)守護(hù)主進(jìn)程的狀態(tài),默認(rèn)自己通過Process創(chuàng)建的進(jìn)程是不守護(hù)主進(jìn)程的狀態(tài)
# print(multiprocessing.current_process().daemon)
time.sleep(1)
if __name__ == '__main__':
# 創(chuàng)建進(jìn)程池
# 3:進(jìn)程池中進(jìn)程的最大個(gè)數(shù)
pool = multiprocessing.Pool(3)
# 模擬大批量的任務(wù),讓進(jìn)程池去執(zhí)行
for i in range(5):
# 循環(huán)讓進(jìn)程池執(zhí)行對(duì)應(yīng)的work任務(wù)
# 同步執(zhí)行任務(wù),一個(gè)任務(wù)執(zhí)行完成以后另外一個(gè)任務(wù)才能執(zhí)行
# pool.apply(work)
# 異步執(zhí)行,任務(wù)執(zhí)行不會(huì)等待,多個(gè)任務(wù)一起執(zhí)行
pool.apply_async(work)
# 關(guān)閉進(jìn)程池,意思告訴主進(jìn)程以后不會(huì)有新的任務(wù)添加進(jìn)來
pool.close()
# 主進(jìn)程等待進(jìn)程池執(zhí)行完成以后程序再退出
pool.join()
運(yùn)行結(jié)果:
復(fù)制中... 1848
復(fù)制中... 12684
復(fù)制中... 12684
復(fù)制中... 6836
復(fù)制中... 6836
***Repl Closed***
個(gè)人獨(dú)立博客:www.limiao.tech
微信公眾號(hào):TechBoard