多個(gè)進(jìn)程讀寫一個(gè)文件,怎樣不出岔子?(進(jìn)程鎖、代理進(jìn)程、進(jìn)程通信)

Dec. 18th, 2018

題圖,多進(jìn)程

案例故事:

我是一個(gè)快樂的菜鳥程序員,每天寫著簡單的代碼。聽說過多進(jìn)程/多線程,但是一直覺得是很高深的技術(shù),遲遲不敢涉足。慢慢地,我開始不滿足于單進(jìn)程代碼的速度。很多時(shí)候,我的任務(wù)是檢索一個(gè)數(shù)據(jù)庫,對里面每一條數(shù)據(jù)進(jìn)行簡單處理并輸出一些結(jié)果。對每一條數(shù)據(jù)的處理都是相互獨(dú)立的,和其他數(shù)據(jù)完全沒有關(guān)系。這時(shí)我開始思考或許把不同數(shù)據(jù)交給不同線程操作,會(huì)大大加快程序運(yùn)行速度。

事實(shí)證明我是對的,而且利用Python的multiprocessing模塊,并發(fā)地執(zhí)行多個(gè)獨(dú)立任務(wù)是個(gè)非常簡單的事情。你要做的,只是將需要執(zhí)行的任務(wù)做成函數(shù),用一個(gè)multiprocessing.Pool就可以了。(當(dāng)然multiprocessing.Process也可以實(shí)現(xiàn)并發(fā)。具體請參照這個(gè)模塊的文檔。如果有需求,我也可以寫一點(diǎn)multiprocessing包的入門教程。嗯,如果有需求的話。。。)我非常開心,感覺自己已經(jīng)是一頭具備了多線程編程能力的程序猿了!并發(fā)編程簡直就是小菜一碟!直到我的每一個(gè)進(jìn)程都需要往同一個(gè)文件里寫入數(shù)據(jù)。。。

一個(gè)簡單的并發(fā)程序,各個(gè)進(jìn)程/線程之間相互獨(dú)立,無需信息交流,也無需同步。這樣的程序往往很簡單,因?yàn)槊恳粋€(gè)進(jìn)程只需要執(zhí)行自己的任務(wù),并不需要意識到其它進(jìn)程的存在。但好景不長,很快,我就開始面臨進(jìn)程通信的問題。比如,很多時(shí)候我們需要不同的進(jìn)程對同一個(gè)文件進(jìn)行寫入,或者對同一個(gè)文件進(jìn)行更新操作。這時(shí)候,如果每個(gè)進(jìn)程太無視其他進(jìn)程的操作,就有可能發(fā)生意想不到的事情。

本文以多個(gè)進(jìn)程讀寫同一個(gè)文件為例,探索兩種常見的并發(fā)編程的概念,一個(gè)是進(jìn)程鎖,一個(gè)是進(jìn)程間通信。

進(jìn)程鎖

進(jìn)程鎖的邏輯概念

進(jìn)程鎖的概念很簡單。有一些操作(比如寫入文檔),需要保證每次只能有一個(gè)進(jìn)程在運(yùn)行,直到其運(yùn)行結(jié)束。這樣以來,每個(gè)進(jìn)程要執(zhí)行這個(gè)操作的時(shí)候,先要查看是否有其他進(jìn)程在執(zhí)行改操作。如果有,就等待那個(gè)進(jìn)程完成它的操作。如何實(shí)現(xiàn)這個(gè)想法呢?不難想到,一個(gè)進(jìn)程間的全局變量就可以做到。為了簡化問題,我們考慮一個(gè)整數(shù)類型的全局變量(我們管它叫“current_writing_proc”),每個(gè)進(jìn)程都可以訪問。如何用一個(gè)整數(shù)來給多進(jìn)程寫入同一個(gè)文件的任務(wù)做一個(gè)進(jìn)程鎖呢?我們可以做以下規(guī)定:

  1. 初始狀態(tài)下,沒有任何進(jìn)程訪問文件。此時(shí)current_writing_proc = 0.
  2. 當(dāng)某個(gè)進(jìn)程試圖訪問文件時(shí),它先要檢查current_writing_prod變量的值。如果該值為0, 則表示目前沒有任何進(jìn)程訪問文件。如果其值不為0,則表示有程序正在占有文件,其必須等待該值重新變?yōu)?才可以訪問文件。
  3. 假設(shè)一個(gè)進(jìn)程等到了current_writing_proc == 0,即此時(shí)文件沒有被任何進(jìn)程訪問的時(shí)候。那此進(jìn)程可以開始它的操作:首先要將此變量值修改為自己的"process id" (在Python中,可以使用os.getpid函數(shù)獲取當(dāng)前進(jìn)程的進(jìn)程id),然后對文件進(jìn)行操作。
  4. 當(dāng)該進(jìn)程完成操作后,要將current_writing_proc重新設(shè)為0,以使得其他進(jìn)程可以獲得權(quán)限來操作文件。

總的來說,該變量存儲著當(dāng)前擁有文件操作權(quán)限的進(jìn)程ID號。此模型雖然簡單,但是已經(jīng)具備了進(jìn)程鎖的幾大要素:等待進(jìn)程鎖(第2步),獲取進(jìn)程鎖(第3步),釋放進(jìn)程鎖(第4步)。

為何一般的全局變量不能用來作進(jìn)程鎖?

有同學(xué)可能會(huì)問,既然進(jìn)程鎖這么簡單,為何multiprocessing包要專門實(shí)現(xiàn)Lock這個(gè)類?我自己定義一個(gè)整形全局變量不就好了?

可達(dá)鴨眉頭一皺,發(fā)現(xiàn)事情并不簡單

這里涉及到多進(jìn)程的一個(gè)核心問題:當(dāng)一個(gè)進(jìn)程產(chǎn)生(fork)出多個(gè)進(jìn)程時(shí),可以認(rèn)為,其所有變量都會(huì)被拷貝,每個(gè)進(jìn)程擁有一個(gè)這個(gè)變量的副本。假設(shè)我們在父進(jìn)程中定義一個(gè)全局變量current_writing_proc。緊接著,我們創(chuàng)建10個(gè)子進(jìn)程,那么基本上,變量current_writing_proc會(huì)被復(fù)制出10個(gè)副本,每個(gè)子進(jìn)程中擁有一個(gè)。既然是副本,那么進(jìn)程A中修改該變量的值,將不會(huì)影響到進(jìn)程B中的值。換句話說,進(jìn)程B將無法通過這個(gè)變量得知進(jìn)程A的任何信息。

這跟我們定義此變量的初衷是違背的。為了實(shí)現(xiàn)所有進(jìn)程間共享一個(gè)變量的值,我們就要用到進(jìn)程通信。當(dāng)這個(gè)變量值被修改的時(shí)候,進(jìn)程需要通過進(jìn)程間的通信渠道,告知其他進(jìn)程這個(gè)變量值被修改了。multiprocessing包里的Lock變量就是通過一個(gè)進(jìn)程通信機(jī)制保證了每一個(gè)鎖都是真正意義上的進(jìn)程級別的全局變量,從而實(shí)現(xiàn)上面提到的邏輯。(當(dāng)然,Lock里還有一些進(jìn)程鎖操作合法性的代碼。有興趣的同學(xué)不妨研究一下Python的multiprocessing的官方文檔。)

注意:這里的討論只適用于多進(jìn)程,不適用于多線程。同一個(gè)進(jìn)程的不同線程之間,可以方便的共享內(nèi)存。這并不需要通過其他通信手段來完成。這也是為什么多線程編程往往比多進(jìn)程編程更簡單。若不是Python中有一個(gè)全局線程鎖(Global Interpretator Lock), 多線程編程在Python世界里應(yīng)該遠(yuǎn)比現(xiàn)在更受歡迎。

簡單的進(jìn)程鎖文件寫入代碼

有了進(jìn)程鎖,我們展示一個(gè)簡單的文檔寫入的代碼。


from multiprocessing import Pool, Lock

def write_with_lock(lock, filename, s): # 在獲取lock后,將字符串s的內(nèi)容寫入到文件filename中。
    lock.acquire() # 等待獲取進(jìn)程鎖。
    # 執(zhí)行文件操作。比如:
    open(filename, 'a').write(s) # 為了保證程序流暢運(yùn)行,應(yīng)使獲取到釋放進(jìn)程鎖中間的代碼盡量簡潔。
    lock.release() #完成操作后,釋放進(jìn)程鎖
    # 執(zhí)行其他不需要進(jìn)程鎖的代碼
    
def main():
    filename = "hello.txt"
    s = "<你需要寫入的字符串>"
    lock = Lock()
    pool = Pool(processes=20) # 創(chuàng)建20個(gè)進(jìn)程
    pool.starmap(write_with_lock, [[lock, filename, s]]*20) # 每個(gè)進(jìn)程都執(zhí)行write_with_lock函數(shù)。
    pool.close()
    pool.join()

文件操作的代理進(jìn)程

有些時(shí)候,我們會(huì)創(chuàng)建很多進(jìn)程來執(zhí)行一些計(jì)算任務(wù)。每個(gè)進(jìn)程都會(huì)偶爾對一個(gè)文件進(jìn)行一些寫入操作,跟花在計(jì)算上的時(shí)間相比,這些文件寫入發(fā)生的頻率并不大(比如每個(gè)進(jìn)程執(zhí)行自己的任務(wù),只有發(fā)生某些錯(cuò)誤時(shí)才會(huì)將該錯(cuò)誤記錄到某個(gè)日志文件中)。在這種情況下,可以考慮用一個(gè)進(jìn)程來完成所有的文件讀寫操作。其他的任務(wù)進(jìn)程,只需要將其需要寫入的內(nèi)容發(fā)送給這個(gè)負(fù)責(zé)文件讀寫的進(jìn)程,由它代為操作未見就可以了。這樣的進(jìn)程,我們稱它為文件操作代理進(jìn)程,簡稱“代理進(jìn)程”。其他的進(jìn)程,我們稱它們?yōu)椤叭蝿?wù)進(jìn)程”。(注意:該命名只是方便在本文中討論問題時(shí)容易區(qū)分,并不一定是標(biāo)準(zhǔn)的命名,請勿對名字當(dāng)真。)

由于代理進(jìn)程和任務(wù)進(jìn)程之間要進(jìn)行通信,這里我們先講一下進(jìn)程間通信隊(duì)列。

進(jìn)程間通信,消息隊(duì)列

如何從進(jìn)程A向進(jìn)程B發(fā)送消息?想象在兩個(gè)進(jìn)程之間架設(shè)一根單向管道。進(jìn)程A可以將消息通過管道發(fā)給進(jìn)程B。在這個(gè)過程中,A最先發(fā)出去的消息肯定最先到達(dá)B,后發(fā)出去的消息后到達(dá)B。程序員們管這種“先到先得”的數(shù)據(jù)結(jié)構(gòu)叫做隊(duì)列。

Python的multiprocessing.Queuemultiprocessing.SimpleQueue都是實(shí)現(xiàn)進(jìn)程間消息隊(duì)列的類。通過調(diào)用這兩個(gè)類的putget函數(shù),就可以向隊(duì)列中發(fā)送數(shù)據(jù),或者從隊(duì)列中獲取數(shù)據(jù)。具體例子見下一小節(jié)的演示。

利用代理進(jìn)程的思想建立多進(jìn)程文檔讀寫模型

接下來,我們就利用代理進(jìn)程的思想,來建立一個(gè)多進(jìn)程任務(wù)中文檔讀寫的邏輯模型。

  1. 建立代理進(jìn)程。該進(jìn)程函數(shù)有兩個(gè)參數(shù):文件寫入隊(duì)列writing_queue和目標(biāo)文件名filename。該進(jìn)程用一個(gè)死循環(huán)不斷從隊(duì)列writing_queue中獲取消息。獲取的數(shù)據(jù)存入變量s中。如果s是整數(shù)0,則表示收到進(jìn)程結(jié)束信號。代理進(jìn)程將跳出死循環(huán),結(jié)束運(yùn)行。如果s是個(gè)字符串,則代理進(jìn)程打開文件filename,將s寫入文件中。
  2. 建立任務(wù)進(jìn)程。任務(wù)進(jìn)程執(zhí)行某個(gè)任務(wù),在需要向文件寫入信息時(shí),將其需要寫入的信息封裝成字符串,投入到writing_queue隊(duì)列中。
  3. 當(dāng)所有任務(wù)進(jìn)程都結(jié)束運(yùn)行,并入住進(jìn)程之后,主進(jìn)程需要告知代理進(jìn)程結(jié)束工作。按照步驟1中的規(guī)定,主進(jìn)程只需往writing_queue隊(duì)列中投入一個(gè)整數(shù)0,并且等待代理進(jìn)程結(jié)束即可。

Python的代碼實(shí)現(xiàn)

最后這個(gè)小節(jié),介紹如何用Python實(shí)現(xiàn)上一節(jié)的模型。首先,我們來編寫代理進(jìn)程的任務(wù)代碼。


def writing_proc(writing_queue, filename):
    while True: # 開啟處理消息的死循環(huán)。直到接收到終止消息(數(shù)字0)方才跳出。
        s = get()
        if isinstance(s, str):
            open(filename, 'a').write(s)
        elif isinstance(s, int) and s == 0:
            break
        else:
            continue # 忽略掉錯(cuò)誤格式的消息。
        

接下來是任務(wù)進(jìn)程代碼。


def task_proc(writing_queue):
    # 執(zhí)行其自己的任務(wù)。當(dāng)需要寫入文件時(shí):
    s = "需要寫入的字符串"
    writing_queue.put(s)
    # 任務(wù)的其他代碼
    

可以看到,任務(wù)進(jìn)程中寫入文件的操作非常簡單,只需將要寫入的字符串put到寫入隊(duì)列中就可以了。真正的文件寫入是上面的代理進(jìn)程的工作。最后,給出主進(jìn)程的代碼:

 # don't forget to import Process, Manager (to use the Queue object) and Pool (if you like) from multiprocessing.
 
def main():
    m = Manager() # 從這個(gè)類中獲取Queue類。(見下面討論)
    writing_queue = m.Queue()
    filename = "log.txt"
    p_write = Process(writing_proc, args=(writing_queue, filename))
    p_write.start()
    p_task_list = [] # 這里演示用Process而非Pool建立任務(wù)進(jìn)程
    for i in range(20):
        p = Process(task_proc, args=(writing_queue,))
        p_task_list.append(p)
        p.start()
    
    # 主進(jìn)程開啟代理進(jìn)程和所有任務(wù)進(jìn)程后,執(zhí)行其自己的操作,然后等待任務(wù)進(jìn)程結(jié)束
    for p in p_task_list:
        p.join()
        
    # 現(xiàn)在任務(wù)進(jìn)程全部結(jié)束。但代理進(jìn)程還在死循環(huán)中。
    # 主進(jìn)程需要告知代理進(jìn)程結(jié)束運(yùn)行。
    writing_queue.put(0)
    p_write.join() # 等待代理進(jìn)程結(jié)束。
    
    # 現(xiàn)在所有子進(jìn)程均已完成工作。主進(jìn)程可以繼續(xù)執(zhí)行其他代碼,或者退出程序。
    
        

上面代碼中提到了要使用multiprocessing.Manager來創(chuàng)建Queue,而不是直接從multiprocess中引入Queue,即

from multiprocessing import Queue # 有可能引發(fā)錯(cuò)誤。

直接使用這一行代碼,會(huì)引發(fā)錯(cuò)誤:Queue can only be used through inheritance. 錯(cuò)誤提示表示,Python只允許不同的進(jìn)程通過集成同一個(gè)基類(Process)的方式使用Queue。也就是說,我們需要給每一個(gè)任務(wù)都創(chuàng)建一個(gè)進(jìn)程類,而非一個(gè)簡單的進(jìn)程函數(shù)。其實(shí),這確實(shí)是一個(gè)很好的編碼習(xí)慣。特別是編寫比較大的程序時(shí),給每一個(gè)任務(wù)定義一個(gè)進(jìn)程類會(huì)讓代碼的組織更清晰,更便于管理和重用。但是這里,我們只做簡單的演示,固沒有定義任務(wù)類,而是使用任務(wù)函數(shù)。為了避免出現(xiàn)錯(cuò)誤,我們就引入Manager類,然后從Manager對象中引入Queue類,見下面代碼。

from multiprocessing import Manager

m = Manager()
queue = m.Queue()

總結(jié)

本文通過”多進(jìn)程同時(shí)對同一個(gè)文件進(jìn)行讀寫“這個(gè)問題,樸素講解了進(jìn)程鎖、進(jìn)程通信和進(jìn)程間通信的概念,展示了用不同方式構(gòu)建出的不同的并發(fā)執(zhí)行模型。本人的經(jīng)驗(yàn)是,當(dāng)在進(jìn)行并發(fā)編程時(shí),在開始寫代碼之前,先設(shè)計(jì)一下程序的并發(fā)執(zhí)行模型是很重要的。這個(gè)模型可以類似于上面幾個(gè)小節(jié)中的列表,規(guī)定進(jìn)程間通信的格式,以及規(guī)定不同進(jìn)程對統(tǒng)一資源進(jìn)行操作的先后步驟。把這個(gè)模型搞清楚之后,再去考慮寫代碼實(shí)現(xiàn)這個(gè)模型。

感謝閱讀,歡迎在留言區(qū)討論,或者談一下你在設(shè)計(jì)并發(fā)程序時(shí)遇到的問題。Bye!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容