Python多進程運行——Multiprocessing基礎(chǔ)教程2

上篇文章簡單介紹了multiprocessing模塊,本文將要介紹進程之間的數(shù)據(jù)共享和信息傳遞的概念。

1 數(shù)據(jù)共享

在多進程處理中,所有新創(chuàng)建的進程都會有這兩個特點:獨立運行,有自己的內(nèi)存空間。

我們來舉個例子展示一下:

import multiprocessing 

# empty list with global scope 
result = [] 

def square_list(mylist): 
    global result 
    # append squares of mylist to global list result 
    for num in mylist: 
        result.append(num * num) 
    # print global list result 
    print("Result(in process p1): {}".format(result)) 

if __name__ == "__main__": 
    # input list 
    mylist = [1,2,3,4] 

    # creating new process 
    p1 = multiprocessing.Process(target=square_list, args=(mylist,)) 
    # starting process 
    p1.start() 
    # wait until process is finished 
    p1.join() 

    # print global result list 
    print("Result(in main program): {}".format(result)) 

這個程序的輸出結(jié)果是:

Result(in process p1): [1, 4, 9, 16]
Result(in main program): []

在上面的程序中我們嘗試在兩個地方打印全局列表result的內(nèi)容:

  • square_list()函數(shù)中,由于這個函數(shù)是由進程p1調(diào)用的,所以result列表只在進程p1的內(nèi)存空間中更改。
  • 在主程序中的p1進程完成后。由于主程序由不同的進程運行,它的內(nèi)存空間中的result列表仍然是空的。

我們再用一張圖來幫助理解記憶不同進程間的數(shù)據(jù)關(guān)系:

圖1 進程間數(shù)據(jù)關(guān)系.png

1.1 內(nèi)存共享

如果程序需要在不同的進程之間共享一些數(shù)據(jù)的話,該怎么做呢?不用擔心,multiprocessing模塊提供了Array對象和Value對象,用來在進程之間共享數(shù)據(jù)。

所謂Array對象和Value對象分別是指從共享內(nèi)存中分配的ctypes數(shù)組和對象。我們直接來看一個例子,展示如何用Array對象和Value對象在進程之間共享數(shù)據(jù):

import multiprocessing 
  
def square_list(mylist, result, square_sum): 
    # append squares of mylist to result array 
    for idx, num in enumerate(mylist): 
        result[idx] = num * num 
        
    # square_sum value 
    square_sum.value = sum(result) 
    
    # print result Array 
    print("Result(in process p1): {}".format(result[:])) 
    
    # print square_sum Value 
    print("Sum of squares(in process p1): {}".format(square_sum.value)) 
    
if __name__ == "__main__": 
    # input list 
    mylist = [1,2,3,4] 
    
    # creating Array of int data type with space for 4 integers 
    result = multiprocessing.Array('i', 4) 
    
    # creating Value of int data type 
    square_sum = multiprocessing.Value('i') 
    
    # creating new process 
    p1 = multiprocessing.Process(target=square_list, args=(mylist, result, square_sum)) 
    
    # starting process 
    p1.start() 
    
    # wait until process is finished 
    p1.join() 
    
    # print result array 
    print("Result(in main program): {}".format(result[:])) 
    
    # print square_sum Value 
    print("Sum of squares(in main program): {}".format(square_sum.value)) 

程序輸出的結(jié)果如下:

Result(in process p1): [1, 4, 9, 16]
Sum of squares(in process p1): 30
Result(in main program): [1, 4, 9, 16]
Sum of squares(in main program): 30

成功了!主程序和p1進程輸出了同樣的結(jié)果,說明程序中確實完成了不同進程間的數(shù)據(jù)共享。那么我們來詳細看一下上面的程序做了什么:

在主程序中我們首先創(chuàng)建了一個Array對象:

result = multiprocessing.Array('i', 4)

向這個對象輸入的第一個參數(shù)是數(shù)據(jù)類型:i表示整數(shù),d代表浮點數(shù)。第二個參數(shù)是數(shù)組的大小,在這個例子中我們創(chuàng)建了包含4個元素的數(shù)組。

類似的,我們創(chuàng)建了一個Value對象:

square_sum = multiprocessing.Value('i')

我們只對Value對象輸入了一個參數(shù),那就是數(shù)據(jù)類型,與上述的方法一致。當然,我們還可以對其指定一個初始值(比如10),就像這樣:

square_sum = multiprocessing.Value('i', 10)

隨后,我們在創(chuàng)建進程對象時,將剛創(chuàng)建好的兩個對象:result和square_sum作為參數(shù)輸入給進程:

p1 = multiprocessing.Process(target=square_list, args=(mylist, result, square_sum))

在函數(shù)中result元素通過索引進行數(shù)組賦值,square_sum通過value屬性進行賦值。

注意:為了完整打印result數(shù)組的結(jié)果,需要使用result[:]進行打印,而square_sum也需要使用value屬性進行打印:

print("Result(in process p1): {}".format(result[:])) 
print("Sum of squares(in process p1): {}".format(square_sum.value)) 

1.2 服務(wù)器進程

每當python程序啟動時,同時也會啟動一個服務(wù)器進程。隨后,只要我們需要生成一個新進程,父進程就會連接到服務(wù)器并請求它派生一個新進程。這個服務(wù)器進程可以保存Python對象,并允許其他進程使用代理來操作它們。

multiprocessing模塊提供了能夠控制服務(wù)器進程的Manager類。所以,Manager類也提供了一種創(chuàng)建可以在不同流程之間共享的數(shù)據(jù)的方法。

服務(wù)器進程管理器比使用共享內(nèi)存對象更靈活,因為它們可以支持任意對象類型,如列表、字典、隊列、值、數(shù)組等。此外,單個管理器可以由網(wǎng)絡(luò)上不同計算機上的進程共享。

但是,服務(wù)器進程管理器的速度比使用共享內(nèi)存要慢。

讓我們來看一個例子:

import multiprocessing 
  
def print_records(records): 
    for record in records: 
        print("Name: {0}\nScore: {1}\n".format(record[0], record[1])) 
        
def insert_record(record, records): 
    records.append(record) 
    print("New record added!\n") 
    
if __name__ == '__main__': 
    with multiprocessing.Manager() as manager: 
        # creating a list in server process memory 
        records = manager.list([('Sam', 10), ('Adam', 9), ('Kevin',9)]) 
        # new record to be inserted in records 
        new_record = ('Jeff', 8) 
        
        # creating new processes 
        p1 = multiprocessing.Process(target=insert_record, args=(new_record, records)) 
        p2 = multiprocessing.Process(target=print_records, args=(records,)) 
        
        # running process p1 to insert new record 
        p1.start() 
        p1.join() 
        
        # running process p2 to print records 
        p2.start() 
        p2.join() 

這個程序的輸出結(jié)果是:

New record added!

Name: Sam
Score: 10

Name: Adam
Score: 9

Name: Kevin
Score: 9

Name: Jeff
Score: 8

我們來理解一下這個程序做了什么:首先我們創(chuàng)建了一個manager對象

with multiprocessing.Manager() as manager:

在with語句下的所有行,都是在manager對象的范圍內(nèi)的。接下來我們使用這個manager對象創(chuàng)建了列表(類似的,我們還可以用manager.dict()創(chuàng)建字典)。

最后我們創(chuàng)建了進程p1(用于在records列表中插入一條新的record)和p2(將records打印出來),并將records作為參數(shù)進行傳遞。

服務(wù)器進程的概念再次用下圖總結(jié)一下:

圖2 進程間數(shù)據(jù)共享.png

2 數(shù)據(jù)傳遞

為了能使多個流程能夠正常工作,常常需要在它們之間進行一些通信,以便能夠劃分工作并匯總最后的結(jié)果。multiprocessing模塊支持進程之間的兩種通信通道:Queue和Pipe。

2.1 Queue

使用隊列來回處理多進程之間的通信是一種比較簡單的方法。任何Python對象都可以使用隊列進行傳遞。我們來看一個例子:

import multiprocessing 
  
def square_list(mylist, q): 
    # append squares of mylist to queue 
    for num in mylist: 
        q.put(num * num) 
        
def print_queue(q): 
    print("Queue elements:") 
    while not q.empty(): 
        print(q.get()) 
    print("Queue is now empty!") 
    
if __name__ == "__main__": 
    # input list 
    mylist = [1,2,3,4] 
    
    # creating multiprocessing Queue 
    q = multiprocessing.Queue() 
    
    # creating new processes 
    p1 = multiprocessing.Process(target=square_list, args=(mylist, q)) 
    p2 = multiprocessing.Process(target=print_queue, args=(q,)) 
    
    # running process p1 to square list 
    p1.start() 
    p1.join() 
    
    # running process p2 to get queue elements 
    p2.start() 
    p2.join() 

上面這個程序的輸出結(jié)果是:

Queue elements:
1
4
9
16
Queue is now empty!

我們來看一下上面這個程序到底做了什么。首先我們創(chuàng)建了一個Queue對象:

q = multiprocessing.Queue()

然后,將這個空的Queue對象輸入square_list函數(shù)。該函數(shù)會將列表中的數(shù)平方,再使用put()方法放入隊列中:

q.put(num * num)

隨后使用get()方法,將q打印出來,直至q重新稱為一個空的Queue對象:

while not q.empty():
    print(q.get())

我們還是用一張圖來幫助理解記憶:

圖3 用Queue完成進程間數(shù)據(jù)傳輸.png

2.2 Pipe

一個Pipe對象只能有兩個端點。因此,當進程只需要雙向通信時,它會比Queue對象更好用。

multiprocessing模塊提供了Pipe()函數(shù),該函數(shù)返回由管道連接的一對連接對象。Pipe()返回的兩個連接對象分別表示管道的兩端。每個連接對象都有send()recv()方法。

我們來看一個例子:

import multiprocessing 
  
def sender(conn, msgs): 
    for msg in msgs: 
        conn.send(msg) 
        print("Sent the message: {}".format(msg)) 
    conn.close() 
    
def receiver(conn): 
    while 1: 
        msg = conn.recv() 
        if msg == "END": 
            break
        print("Received the message: {}".format(msg)) 
        
if __name__ == "__main__": 
    # messages to be sent 
    msgs = ["hello", "hey", "hru?", "END"] 
    
    # creating a pipe 
    parent_conn, child_conn = multiprocessing.Pipe() 
    
    # creating new processes 
    p1 = multiprocessing.Process(target=sender, args=(parent_conn,msgs)) 
    p2 = multiprocessing.Process(target=receiver, args=(child_conn,)) 
    
    # running processes 
    p1.start() 
    p2.start() 
    
    # wait until processes finish 
    p1.join() 
    p2.join() 

上面這個程序的輸出結(jié)果是:

Sent the message: hello
Sent the message: hey
Sent the message: hru?
Received the message: hello
Sent the message: END
Received the message: hey
Received the message: hru?

我們還是來看一下這個程序到底做了什么。首先創(chuàng)建了一個Pipe對象:

parent_conn, child_conn = multiprocessing.Pipe()

與上文說的一樣,該對象返回了一對管道兩端的兩個連接對象。然后使用send()方法和recv()方法進行信息的傳遞。就這么簡單。在上面的程序中,我們從一端向另一端發(fā)送一串消息。在另一端,我們收到消息,并在收到END消息時退出。

要注意的是,如果兩個進程(或線程)同時嘗試從管道的同一端讀取或?qū)懭牍艿乐械臄?shù)據(jù),則管道中的數(shù)據(jù)可能會損壞。不過不同的進程同時使用管道的兩端是沒有問題的。還要注意,Queue對象在進程之間進行了適當?shù)耐?,但代價是增加了計算復(fù)雜度。因此,Queue對象對于線程和進程是相對安全的。

最后我們還是用一張圖來示意:

圖4 用Pipe完成進程間數(shù)據(jù)傳輸.png

Python的multiprocessing模塊還剩最后一篇文章:多進程的同步與池化

敬請期待啦!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容