Python的多進程模塊multiprocessing

眾所周知,Python中不存在真正的多線程,Python中的多線程是一個并發(fā)過程。如果想要并行的執(zhí)行程序,充分的利用cpu資源(cpu核心),還是需要使用多進程解決的。其中multiprocessing模塊應(yīng)該是Python中最常用的多進程模塊了。

創(chuàng)建進程

基本上multiprocessing這個模塊和threading這個模塊用法是相同的,也是可以通過函數(shù)和類創(chuàng)建進程。

""" 案例1:函數(shù)式創(chuàng)建進程 """

import multiprocessing
import time


# 進程執(zhí)行函數(shù)
def run(num):
    time.sleep(1)
    print(f'i am process{num}')


if __name__ == '__main__':
    # 獲取開始的時間戳
    start = time.time()
    # 存放進程的列表,用于阻塞進程
    process_list = list()
    # 創(chuàng)建4個進程,并將每個創(chuàng)建好的進程對象放到process_list中
    for i in range(1, 5):
        process = multiprocessing.Process(target=run, args=(i,))
        # 啟動該進程
        process.start()
        process_list.append(process)

    # 只有進程全部結(jié)束,再向下執(zhí)行
    for j in process_list:
        j.join()

    # 結(jié)束的時間戳
    end = time.time()
    # 打印該程序運行了幾秒
    print(end-start)

# i am process1
# i am process2
# i am process3
# i am process4
# 1.122110366821289

上述案例基本上就是筆者搬用了上篇文章多線程的案例,可見其使用的相似之處。導(dǎo)入multiprocessing后實例化Process就可以創(chuàng)建一個進程,參數(shù)的話也是和多線程一樣,target放置進程執(zhí)行函數(shù),args存放該函數(shù)的參數(shù)。

""" 案例2:類繼承創(chuàng)建進程 """

import multiprocessing
import time


# 繼承Process,轉(zhuǎn)變?yōu)檫M程類
class MyProcess(multiprocessing.Process):
    def __init__(self, process_id):
        # 必須實現(xiàn)Process類的init方法
        super().__init__()
        self.process_id = process_id

    # 重寫執(zhí)行函數(shù)
    def run(self):
        time.sleep(1)
        print(f'i am process{self.process_id}')


if __name__ == '__main__':
    # 獲取開始的時間戳
    start = time.time()
    # 存放進程的列表,用于阻塞進程
    process_list = list()
    # 創(chuàng)建4個進程,并將每個創(chuàng)建好的進程對象放到process_list中
    for i in range(1, 5):
        process = MyProcess(i)
        # 啟動該進程
        process.start()
        process_list.append(process)

    # 只有進程全部結(jié)束,再向下執(zhí)行
    for j in process_list:
        j.join()

    # 結(jié)束的時間戳
    end = time.time()
    # 打印該程序運行了幾秒
    print(end - start)

# i am process1
# i am process2
# i am process3
# i am process4
# 1.1184189319610596

使用類來創(chuàng)建進程也是需要先繼承multiprocessing.Process并且實現(xiàn)其init方法。

進程池

Pool可以提供指定數(shù)量的進程,供用戶調(diào)用,當(dāng)有新的請求提交到pool中時,如果池還沒有滿,那么就會創(chuàng)建一個新的進程用來執(zhí)行該請求。

但如果池中的進程數(shù)已經(jīng)達到規(guī)定最大值,那么該請求就會等待,直到池中有進程結(jié)束,才會創(chuàng)建新的進程。

""" 案例3:進程池Pool """

import multiprocessing
import time


# 進程執(zhí)行函數(shù)
def func(msg):
    print('process start...', msg)
    time.sleep(3)
    print('process end...')


if __name__ == '__main__':
    print('ready~~~~~~~~~~~~~~~~~go~~~~~')

    # 創(chuàng)建進程池,最大進程數(shù)量為3
    pool = multiprocessing.Pool(processes=3)
    for i in range(5):
        msg = f'hello {i}'
        # 以非阻塞的方式,維持執(zhí)行的進程總數(shù)為processes并在進程結(jié)束后自動添加新的進程
        pool.apply_async(func, (msg,))

    # 阻止后續(xù)任務(wù)提交到進程池
    pool.close()
    # 等待工作進程結(jié)束
    pool.join()

    print('game~~~~~~~~~~~~~~~~~over~~~~~')

# ready~~~~~~~~~~~~~~~~~go~~~~~
# process start... hello 0
# process start... hello 1
# process start... hello 2
# process end...
# process start... hello 3
# process end...
# process end...
# process end...
# game~~~~~~~~~~~~~~~~~over~~~~~

需要注意的是,在調(diào)用join方法阻塞進程前,需要先調(diào)用close方法,,否則程序會出錯。

在上述案例中,提到了非阻塞,當(dāng)把創(chuàng)建進程的方法換為pool.apply(func, (msg,))時,就會阻塞進程,出現(xiàn)下面的狀況。

# ready~~~~~~~~~~~~~~~~~go~~~~~
# process start... hello 0
# process end...
# process start... hello 1
# process end...
# process start... hello 2
# process end...
# process start... hello 3
# process end...
# process start... hello 4
# process end...
# game~~~~~~~~~~~~~~~~~over~~~~~

進程隊列

在multiprocessing模塊中還存在Queue對象,這是一個進程的安全隊列,近似queue.Queue。隊列一般也是需要配合多線程或者多進程使用。

下列案例是一個使用進程隊列實現(xiàn)的生產(chǎn)者消費者模式。

""" 案例4:基于進程隊列的生產(chǎn)者消費者模式 """

import random
import time
import multiprocessing


# 生產(chǎn)者
def producer(queue):
    for i in range(10):
        time.sleep(random.randint(1, 3))
        res = f'物品{i}'
        # 入隊
        queue.put(res)
        print(f'{multiprocessing.current_process().name}生產(chǎn){res}')


# 消費者
def consumer(queue):
    while True:
        # 出隊
        res = queue.get()
        time.sleep(random.randint(1, 3))
        print(f'{multiprocessing.current_process().name}消費{res}')


if __name__ == '__main__':
    # 生產(chǎn)消費隊列
    queue = multiprocessing.Queue()
    # 創(chuàng)建生產(chǎn)進程
    process1 = multiprocessing.Process(target=producer, args=(queue,))
    # 進程名
    process1.name = 'process_1'

    # 創(chuàng)建消費進程
    process2 = multiprocessing.Process(target=consumer, args=(queue,))
    # 進程名
    process2.name = 'process_2'

    print('程序開始?。?!')
    # 啟動進程
    process1.start()
    process2.start()

# 程序開始?。?!
# process_1生產(chǎn)物品0
# process_1生產(chǎn)物品1
# process_2消費物品0
# process_1生產(chǎn)物品2
# process_2消費物品1
# process_1生產(chǎn)物品3
# ......

管道

multiprocessing支持兩種進程間的通信,其中一種便是上述案例的隊列,另一種則稱作管道。在官方文檔的描述中,multiprocessing中的隊列是基于管道實現(xiàn)的,并且擁有更高的讀寫效率。

管道可以理解為進程間的通道,使用Pipe([duplex])創(chuàng)建,并返回一個元組(conn1,conn2)。如果duplex被置為True(默認值),那么該管道是雙向的,如果duplex被置為False,那么該管道是單向的,即conn1只能用于接收消息,而conn2僅能用于發(fā)送消息。

其中conn1、conn2表示管道兩端的連接對象,每個連接對象都有send()和recv()方法。send和recv方法分別是發(fā)送和接受消息的方法。例如,可以調(diào)用conn1.send發(fā)送消息,conn1.recv接收消息。如果沒有消息可接收,recv方法會一直阻塞。如果管道已經(jīng)被關(guān)閉,那么recv方法會拋出EOFError。

""" 案例5:管道 """

import multiprocessing
import random
import time


# 發(fā)送數(shù)據(jù)
def proc_send(pipe, data):
    for d in data:
        pipe.send(d)
        print(f'{multiprocessing.current_process().name}發(fā)送了數(shù)據(jù)u0z1t8os')
        time.sleep(random.random())


# 接收數(shù)據(jù)
def proc_recv(pipe):
    while True:
        print(f'{multiprocessing.current_process().name}接收了數(shù)據(jù){pipe.recv()}')
        time.sleep(random.random())


if __name__ == '__main__':
    # 創(chuàng)建管道
    pipe1, pipe2 = multiprocessing.Pipe()
    # 數(shù)據(jù)
    data = [i for i in range(10)]
    # 創(chuàng)建進程process1
    process1 = multiprocessing.Process(target=proc_send, args=(pipe1, data))
    process1.name = 'process_1'
    # 創(chuàng)建進程process1
    process2 = multiprocessing.Process(target=proc_recv, args=(pipe2,))
    process2.name = 'process_2'

    # 啟動進程
    process1.start()
    process2.start()

    # 阻塞進程
    process1.join()
    process2.join()

# process_1發(fā)送了數(shù)據(jù)0
# process_2接收了數(shù)據(jù)0
# process_1發(fā)送了數(shù)據(jù)1
# process_2接收了數(shù)據(jù)1
# process_1發(fā)送了數(shù)據(jù)2
# process_2接收了數(shù)據(jù)2
# process_1發(fā)送了數(shù)據(jù)3
# process_2接收了數(shù)據(jù)3
# ......

關(guān)于multiprocessing模塊其實還有很多實用的類和方法,由于篇幅有限(懶),筆者就先寫到這里。該模塊其實用起來很像threading模塊,像鎖對象和守護線程(進程)等multiprocessing模塊也是有的,使用方法也近乎相同。

如果想要更加詳細的了解multiprocessing模塊,請參考官方文檔。

# multiprocessing --- 基于進程的并行
https://docs.python.org/zh-cn/3/library/multiprocessing.html
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容