多進(jìn)程、進(jìn)程池

多進(jìn)程

  1. 什么是進(jìn)程

一個(gè)程序運(yùn)行起來(lái)后,代碼+用到的資源 稱之為進(jìn)程,它是操作系統(tǒng)分配資源的基本單元。
不僅可以通過(guò)線程完成多任務(wù),進(jìn)程也是可以的

  1. 進(jìn)程的狀態(tài)

工作中,任務(wù)數(shù)往往大于cpu的核數(shù),即一定有一些任務(wù)正在執(zhí)行,而另外一些任務(wù)在等待cpu進(jìn)行執(zhí)行,因此導(dǎo)致了有了不同的狀態(tài)

  • 就緒態(tài):運(yùn)行的條件都已經(jīng)慢去,正在等在cpu執(zhí)行
  • 執(zhí)行態(tài):cpu正在執(zhí)行其功能
  • 等待態(tài):等待某些條件滿足,例如一個(gè)程序sleep了,此時(shí)就處于等待態(tài)
image.png

進(jìn)程的創(chuàng)建-multiprocessing

multiprocessing模塊就是跨平臺(tái)版本的多進(jìn)程模塊,提供了一個(gè)Process類來(lái)代表一個(gè)進(jìn)程對(duì)象,這個(gè)對(duì)象可以理解為是一個(gè)獨(dú)立的進(jìn)程,可以執(zhí)行另外的事情

例1

# -*- coding:utf-8 -*-
from multiprocessing import Process
import time


def run_proc():
    """子進(jìn)程要執(zhí)行的代碼"""
    while True:
        print("----2----")
        time.sleep(1)


if __name__=='__main__':
    p = Process(target=run_proc)
    p.start()
    while True:
        print("----1----")
        time.sleep(1)

注意:創(chuàng)建子進(jìn)程時(shí),只需要傳入一個(gè)執(zhí)行函數(shù)和函數(shù)的參數(shù),創(chuàng)建一個(gè)Process實(shí)例,用start()方法啟動(dòng)

例2

# -*- coding:utf-8 -*-
from multiprocessing import Process
import os
import time

def run_proc():
    """子進(jìn)程要執(zhí)行的代碼"""
    print('子進(jìn)程運(yùn)行中,pid=%d...' % os.getpid())  # os.getpid獲取當(dāng)前進(jìn)程的進(jìn)程號(hào)
    print('子進(jìn)程將要結(jié)束...')

if __name__ == '__main__':
    print('父進(jìn)程pid: %d' % os.getpid())  # os.getpid獲取當(dāng)前進(jìn)程的進(jìn)程號(hào)
    p = Process(target=run_proc)
    print(p.name)
    p.start()
  1. Process參數(shù)如下:
    Process(group, target, name, args , kwargs)
  • target:如果傳遞了函數(shù)的引用,可以任務(wù)這個(gè)子進(jìn)程就執(zhí)行這里的代碼
  • args:給target指定的函數(shù)傳遞的參數(shù),以元組的方式傳遞
  • kwargs:給target指定的函數(shù)傳遞命名參數(shù)
  • name:給進(jìn)程設(shè)定一個(gè)名字,可以不設(shè)定
  • group:指定進(jìn)程組,大多數(shù)情況下用不到
    Process創(chuàng)建的實(shí)例對(duì)象的常用方法:
  • start():?jiǎn)?dòng)子進(jìn)程實(shí)例(創(chuàng)建子進(jìn)程)
  • is_alive():判斷進(jìn)程子進(jìn)程是否還在活著
  • join([timeout]):是否等待子進(jìn)程執(zhí)行結(jié)束,或等待多少秒
  • terminate():不管任務(wù)是否完成,立即終止子進(jìn)程
    Process創(chuàng)建的實(shí)例對(duì)象的常用屬性:
  • name:當(dāng)前進(jìn)程的別名,默認(rèn)為Process-N,N為從1開(kāi)始遞增的整數(shù)
  • pid:當(dāng)前進(jìn)程的pid(進(jìn)程號(hào))
  1. 給子進(jìn)程指定的函數(shù)傳遞參數(shù)
 # -*- coding:utf-8 -*-
from multiprocessing import Process
import os
from time import sleep


def run_proc(name, age, **kwargs):
    for i in range(10):
        print('子進(jìn)程運(yùn)行中,name= %s,age=%d ,pid=%d...' % (name, age, os.getpid()))
        print(kwargs)
        sleep(0.2)

if __name__=='__main__':
    p = Process(target=run_proc, args=('test',18), kwargs={"m":20})
    p.start()
    sleep(1)  # 1秒中之后,立即結(jié)束子進(jìn)程
    p.terminate()
    p.join()
  1. 進(jìn)程間不同享全局變量
from multiprocessing import Process
import time,queue

queue = queue.Queue(200)

def write_data():
    for i in range(0,200):
        global queue
        queue.put(i)

    print(queue.full())

def get_data():
    print('-----')

    global queue
    while queue.empty() is not True:
        print(queue.get())

if __name__ == '__main__':

    process1 = Process(target=write_data)
    process1.start()
    process1.join()

    process2 = Process(target=get_data)
    process2.start()

進(jìn)程間通信-Queue:

Process之間有時(shí)需要通信,操作系統(tǒng)提供了很多機(jī)制來(lái)實(shí)現(xiàn)進(jìn)程間的通信。

  1. Queue的使用 可以使用multiprocessing模塊的Queue實(shí)現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞,Queue本身是一個(gè)消息列隊(duì)程序
#coding=utf-8
from multiprocessing import Queue
q=Queue(3) #初始化一個(gè)Queue對(duì)象,最多可接收三條put消息
q.put("消息1") 
q.put("消息2")
print(q.full())  #False
q.put("消息3")
print(q.full()) #True

#因?yàn)橄⒘嘘?duì)已滿下面的try都會(huì)拋出異常,第一個(gè)try會(huì)等待2秒后再拋出異常,第二個(gè)Try會(huì)立刻拋出異常
try:
    q.put("消息4",True,2)
except:
    print("消息列隊(duì)已滿,現(xiàn)有消息數(shù)量:%s"%q.qsize())

try:
    q.put_nowait("消息4")
except:
    print("消息列隊(duì)已滿,現(xiàn)有消息數(shù)量:%s"%q.qsize())

#推薦的方式,先判斷消息列隊(duì)是否已滿,再寫(xiě)入
if not q.full():
    q.put_nowait("消息4")

#讀取消息時(shí),先判斷消息列隊(duì)是否為空,再讀取
if not q.empty():
    for i in range(q.qsize()):
        print(q.get_nowait())

說(shuō)明:

  • Queue.qsize():返回當(dāng)前隊(duì)列包含的消息數(shù)量;
  • Queue.empty():如果隊(duì)列為空,返回True,反之False ;
  • Queue.full():如果隊(duì)列滿了,返回True,反之False;
  • Queue.get(block, timeout):獲取隊(duì)列中的一條消息,然后將其從列隊(duì)中移除,block默認(rèn)值為True;
  • Queue.get_nowait():相當(dāng)Queue.get(False);
  • Queue.put(item,block,timeout):將item消息寫(xiě)入隊(duì)列,block默認(rèn)值為True;
  • Queue.put_nowait(item):相當(dāng)Queue.put(item, False);

進(jìn)程池的概念

python中,進(jìn)程池內(nèi)部會(huì)維護(hù)一個(gè)進(jìn)程序列。當(dāng)需要時(shí),程序會(huì)去進(jìn)程池中獲取一個(gè)進(jìn)程。
如果進(jìn)程池序列中沒(méi)有可供使用的進(jìn)程,那么程序就會(huì)等待,直到進(jìn)程池中有可用進(jìn)程為止。

multiprocessing.Pool常用函數(shù)解析:

  • apply_async(func[, args[, kwds]]) :使用非阻塞方式調(diào)用func(并行執(zhí)行,堵塞方式必須等待上一個(gè)進(jìn)程退出才能執(zhí)行下一個(gè)進(jìn)程),args為傳遞給func的參數(shù)列表,kwds為傳遞給func的關(guān)鍵字參數(shù)列表;
  • close():關(guān)閉Pool,使其不再接受新的任務(wù);
  • terminate():不管任務(wù)是否完成,立即終止;
  • join():主進(jìn)程阻塞,等待子進(jìn)程的退出, 必須在close或terminate之后使用;

進(jìn)程池中中進(jìn)程通信Manager().Queue()

如果要使用Pool創(chuàng)建進(jìn)程,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),否則會(huì)得到一條如下的錯(cuò)誤信息:

RuntimeError: Queue objects should only be shared between processes through inheritance.

代碼

#如何使用進(jìn)程池
# from multiprocessing import Pool
# import requests

# def download_data_by_page(page,name):
#     print(page,name)
#     #http://blog.jobbole.com/all-posts/page/2/
#     full_url = 'http://blog.jobbole.com/all-posts/page/%s/' % str(page)
#     req_headers = {
#         'User-Agent':'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36',
#     }

#     response = requests.get(full_url,headers=req_headers)
#     if response.status_code == 200:
#         print(response.text)
#         #可以在這里做數(shù)據(jù)解析


# if __name__ == '__main__':


#     #創(chuàng)建一個(gè)進(jìn)程池
#     process_pool = Pool(4)


#     for page in range(0,1000):
#         #往進(jìn)程池中添加任務(wù)
#         process_pool.apply_async(download_data_by_page,(1,'進(jìn)程池'))
    

#     #關(guān)閉進(jìn)程池,后面不能再添加任務(wù)了
#     process_pool.close()
#     #子進(jìn)程先執(zhí)行,執(zhí)行完畢后,再繼續(xù)執(zhí)行主進(jìn)程代碼
#     process_pool.join()


#python自帶的進(jìn)程池模塊
from concurrent.futures import ProcessPoolExecutor
import requests

def download_data_by_page(page,name):
    print(page,name)
    
    full_url = 'http://blog.jobbole.com/all-posts/page/%s/' % str(page)
    req_headers = {
        'User-Agent':'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36',
    }

    
    response = requests.get(full_url,headers=req_headers)
    if response.status_code == 200:
        print(response.status_code)
        #可以在這里做數(shù)據(jù)解析

        return response.text

#可以在回調(diào)函數(shù)中做數(shù)據(jù)解析
def download_done(future):
    print(future.result())


if __name__ == '__main__':
    
    #創(chuàng)建一個(gè)進(jìn)程池
    pool = ProcessPoolExecutor(4)

    
    for page in range(0,100):
        handler = pool.submit(download_data_by_page,page,'下載任務(wù)')
        handler.add_done_callback(download_done)

    pool.shutdown()
    

# python的多線程:
# 有一個(gè)全局解釋器鎖(GIL):意味著python中的多線程其實(shí)是并發(fā)的操作

# 對(duì)比線程和進(jìn)程
# 1.定義:
# 進(jìn)程:是操作系統(tǒng)分配資源和調(diào)度的基本單元
# 線程:是依賴于進(jìn)程執(zhí)行,線程是cpu執(zhí)行調(diào)度的最小單元

# 2.區(qū)別:
# 進(jìn)程是會(huì)分配資源空間,每一個(gè)進(jìn)程之間的資源不共享
# 線程不占用資源空間,線程之間的資源是共享的,為了防止資源錯(cuò)亂,我們一般加線程鎖

# 3.使用場(chǎng)景:
# 進(jìn)程一般情況下處理計(jì)算密集型任務(wù)
# 線程一般處理I/O密集型任務(wù)


?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 釣盡斜陽(yáng),誰(shuí)把時(shí)光閑度, 懸起明月,辛人夜夜忙碌。 抹去繁星,笑淡破曉睏窘, 鋪展晨曦,再畫(huà)青海東吳。
    云逸1108閱讀 123評(píng)論 0 0

友情鏈接更多精彩內(nèi)容