多進(jìn)程
- 什么是進(jìn)程
一個(gè)程序運(yùn)行起來(lái)后,代碼+用到的資源 稱之為進(jìn)程,它是操作系統(tǒng)分配資源的基本單元。
不僅可以通過(guò)線程完成多任務(wù),進(jìn)程也是可以的
- 進(jìn)程的狀態(tài)
工作中,任務(wù)數(shù)往往大于cpu的核數(shù),即一定有一些任務(wù)正在執(zhí)行,而另外一些任務(wù)在等待cpu進(jìn)行執(zhí)行,因此導(dǎo)致了有了不同的狀態(tài)
- 就緒態(tài):運(yùn)行的條件都已經(jīng)慢去,正在等在cpu執(zhí)行
- 執(zhí)行態(tài):cpu正在執(zhí)行其功能
- 等待態(tài):等待某些條件滿足,例如一個(gè)程序sleep了,此時(shí)就處于等待態(tài)

進(jìn)程的創(chuàng)建-multiprocessing
multiprocessing模塊就是跨平臺(tái)版本的多進(jìn)程模塊,提供了一個(gè)Process類來(lái)代表一個(gè)進(jìn)程對(duì)象,這個(gè)對(duì)象可以理解為是一個(gè)獨(dú)立的進(jìn)程,可以執(zhí)行另外的事情
例1
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time
def run_proc():
"""子進(jìn)程要執(zhí)行的代碼"""
while True:
print("----2----")
time.sleep(1)
if __name__=='__main__':
p = Process(target=run_proc)
p.start()
while True:
print("----1----")
time.sleep(1)
注意:創(chuàng)建子進(jìn)程時(shí),只需要傳入一個(gè)執(zhí)行函數(shù)和函數(shù)的參數(shù),創(chuàng)建一個(gè)Process實(shí)例,用start()方法啟動(dòng)
例2
# -*- coding:utf-8 -*-
from multiprocessing import Process
import os
import time
def run_proc():
"""子進(jìn)程要執(zhí)行的代碼"""
print('子進(jìn)程運(yùn)行中,pid=%d...' % os.getpid()) # os.getpid獲取當(dāng)前進(jìn)程的進(jìn)程號(hào)
print('子進(jìn)程將要結(jié)束...')
if __name__ == '__main__':
print('父進(jìn)程pid: %d' % os.getpid()) # os.getpid獲取當(dāng)前進(jìn)程的進(jìn)程號(hào)
p = Process(target=run_proc)
print(p.name)
p.start()
- Process參數(shù)如下:
Process(group, target, name, args , kwargs)
- target:如果傳遞了函數(shù)的引用,可以任務(wù)這個(gè)子進(jìn)程就執(zhí)行這里的代碼
- args:給target指定的函數(shù)傳遞的參數(shù),以元組的方式傳遞
- kwargs:給target指定的函數(shù)傳遞命名參數(shù)
- name:給進(jìn)程設(shè)定一個(gè)名字,可以不設(shè)定
- group:指定進(jìn)程組,大多數(shù)情況下用不到
Process創(chuàng)建的實(shí)例對(duì)象的常用方法: - start():?jiǎn)?dòng)子進(jìn)程實(shí)例(創(chuàng)建子進(jìn)程)
- is_alive():判斷進(jìn)程子進(jìn)程是否還在活著
- join([timeout]):是否等待子進(jìn)程執(zhí)行結(jié)束,或等待多少秒
- terminate():不管任務(wù)是否完成,立即終止子進(jìn)程
Process創(chuàng)建的實(shí)例對(duì)象的常用屬性: - name:當(dāng)前進(jìn)程的別名,默認(rèn)為Process-N,N為從1開(kāi)始遞增的整數(shù)
- pid:當(dāng)前進(jìn)程的pid(進(jìn)程號(hào))
- 給子進(jìn)程指定的函數(shù)傳遞參數(shù)
# -*- coding:utf-8 -*-
from multiprocessing import Process
import os
from time import sleep
def run_proc(name, age, **kwargs):
for i in range(10):
print('子進(jìn)程運(yùn)行中,name= %s,age=%d ,pid=%d...' % (name, age, os.getpid()))
print(kwargs)
sleep(0.2)
if __name__=='__main__':
p = Process(target=run_proc, args=('test',18), kwargs={"m":20})
p.start()
sleep(1) # 1秒中之后,立即結(jié)束子進(jìn)程
p.terminate()
p.join()
- 進(jìn)程間不同享全局變量
from multiprocessing import Process
import time,queue
queue = queue.Queue(200)
def write_data():
for i in range(0,200):
global queue
queue.put(i)
print(queue.full())
def get_data():
print('-----')
global queue
while queue.empty() is not True:
print(queue.get())
if __name__ == '__main__':
process1 = Process(target=write_data)
process1.start()
process1.join()
process2 = Process(target=get_data)
process2.start()
進(jìn)程間通信-Queue:
Process之間有時(shí)需要通信,操作系統(tǒng)提供了很多機(jī)制來(lái)實(shí)現(xiàn)進(jìn)程間的通信。
- Queue的使用 可以使用multiprocessing模塊的Queue實(shí)現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞,Queue本身是一個(gè)消息列隊(duì)程序
#coding=utf-8
from multiprocessing import Queue
q=Queue(3) #初始化一個(gè)Queue對(duì)象,最多可接收三條put消息
q.put("消息1")
q.put("消息2")
print(q.full()) #False
q.put("消息3")
print(q.full()) #True
#因?yàn)橄⒘嘘?duì)已滿下面的try都會(huì)拋出異常,第一個(gè)try會(huì)等待2秒后再拋出異常,第二個(gè)Try會(huì)立刻拋出異常
try:
q.put("消息4",True,2)
except:
print("消息列隊(duì)已滿,現(xiàn)有消息數(shù)量:%s"%q.qsize())
try:
q.put_nowait("消息4")
except:
print("消息列隊(duì)已滿,現(xiàn)有消息數(shù)量:%s"%q.qsize())
#推薦的方式,先判斷消息列隊(duì)是否已滿,再寫(xiě)入
if not q.full():
q.put_nowait("消息4")
#讀取消息時(shí),先判斷消息列隊(duì)是否為空,再讀取
if not q.empty():
for i in range(q.qsize()):
print(q.get_nowait())
說(shuō)明:
- Queue.qsize():返回當(dāng)前隊(duì)列包含的消息數(shù)量;
- Queue.empty():如果隊(duì)列為空,返回True,反之False ;
- Queue.full():如果隊(duì)列滿了,返回True,反之False;
- Queue.get(block, timeout):獲取隊(duì)列中的一條消息,然后將其從列隊(duì)中移除,block默認(rèn)值為True;
- Queue.get_nowait():相當(dāng)Queue.get(False);
- Queue.put(item,block,timeout):將item消息寫(xiě)入隊(duì)列,block默認(rèn)值為True;
- Queue.put_nowait(item):相當(dāng)Queue.put(item, False);
進(jìn)程池的概念
python中,進(jìn)程池內(nèi)部會(huì)維護(hù)一個(gè)進(jìn)程序列。當(dāng)需要時(shí),程序會(huì)去進(jìn)程池中獲取一個(gè)進(jìn)程。
如果進(jìn)程池序列中沒(méi)有可供使用的進(jìn)程,那么程序就會(huì)等待,直到進(jìn)程池中有可用進(jìn)程為止。
multiprocessing.Pool常用函數(shù)解析:
- apply_async(func[, args[, kwds]]) :使用非阻塞方式調(diào)用func(并行執(zhí)行,堵塞方式必須等待上一個(gè)進(jìn)程退出才能執(zhí)行下一個(gè)進(jìn)程),args為傳遞給func的參數(shù)列表,kwds為傳遞給func的關(guān)鍵字參數(shù)列表;
- close():關(guān)閉Pool,使其不再接受新的任務(wù);
- terminate():不管任務(wù)是否完成,立即終止;
- join():主進(jìn)程阻塞,等待子進(jìn)程的退出, 必須在close或terminate之后使用;
進(jìn)程池中中進(jìn)程通信Manager().Queue()
如果要使用Pool創(chuàng)建進(jìn)程,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),否則會(huì)得到一條如下的錯(cuò)誤信息:
RuntimeError: Queue objects should only be shared between processes through inheritance.
代碼
#如何使用進(jìn)程池
# from multiprocessing import Pool
# import requests
# def download_data_by_page(page,name):
# print(page,name)
# #http://blog.jobbole.com/all-posts/page/2/
# full_url = 'http://blog.jobbole.com/all-posts/page/%s/' % str(page)
# req_headers = {
# 'User-Agent':'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36',
# }
# response = requests.get(full_url,headers=req_headers)
# if response.status_code == 200:
# print(response.text)
# #可以在這里做數(shù)據(jù)解析
# if __name__ == '__main__':
# #創(chuàng)建一個(gè)進(jìn)程池
# process_pool = Pool(4)
# for page in range(0,1000):
# #往進(jìn)程池中添加任務(wù)
# process_pool.apply_async(download_data_by_page,(1,'進(jìn)程池'))
# #關(guān)閉進(jìn)程池,后面不能再添加任務(wù)了
# process_pool.close()
# #子進(jìn)程先執(zhí)行,執(zhí)行完畢后,再繼續(xù)執(zhí)行主進(jìn)程代碼
# process_pool.join()
#python自帶的進(jìn)程池模塊
from concurrent.futures import ProcessPoolExecutor
import requests
def download_data_by_page(page,name):
print(page,name)
full_url = 'http://blog.jobbole.com/all-posts/page/%s/' % str(page)
req_headers = {
'User-Agent':'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36',
}
response = requests.get(full_url,headers=req_headers)
if response.status_code == 200:
print(response.status_code)
#可以在這里做數(shù)據(jù)解析
return response.text
#可以在回調(diào)函數(shù)中做數(shù)據(jù)解析
def download_done(future):
print(future.result())
if __name__ == '__main__':
#創(chuàng)建一個(gè)進(jìn)程池
pool = ProcessPoolExecutor(4)
for page in range(0,100):
handler = pool.submit(download_data_by_page,page,'下載任務(wù)')
handler.add_done_callback(download_done)
pool.shutdown()
# python的多線程:
# 有一個(gè)全局解釋器鎖(GIL):意味著python中的多線程其實(shí)是并發(fā)的操作
# 對(duì)比線程和進(jìn)程
# 1.定義:
# 進(jìn)程:是操作系統(tǒng)分配資源和調(diào)度的基本單元
# 線程:是依賴于進(jìn)程執(zhí)行,線程是cpu執(zhí)行調(diào)度的最小單元
# 2.區(qū)別:
# 進(jìn)程是會(huì)分配資源空間,每一個(gè)進(jìn)程之間的資源不共享
# 線程不占用資源空間,線程之間的資源是共享的,為了防止資源錯(cuò)亂,我們一般加線程鎖
# 3.使用場(chǎng)景:
# 進(jìn)程一般情況下處理計(jì)算密集型任務(wù)
# 線程一般處理I/O密集型任務(wù)