One Python Module per Week | multiprocessing

multiprocessing is a standard Python module. It can be used to write multi-process code, and it can also be used for multithreading: for threads, use multiprocessing.dummy, which exposes essentially the same API as multiprocessing.
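As a quick illustration of that API symmetry, here is a minimal sketch (the function name square is just an example) where only the import decides whether workers are threads or processes:

```python
# multiprocessing.dummy mirrors the multiprocessing API but is
# backed by threads; swapping this import for
# "from multiprocessing import Pool" switches to real processes.
from multiprocessing.dummy import Pool as ThreadPool


def square(x):
    return x * x


if __name__ == '__main__':
    with ThreadPool(4) as pool:  # 4 worker threads, not processes
        results = pool.map(square, range(5))
    print(results)  # [0, 1, 4, 9, 16]
```

Because map() preserves input order, the result list matches the inputs regardless of which worker handled each item.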

Basics

A multiprocessing.Process object represents one process. Process is used the same way as a Thread object, with the same start(), run(), join(), and related methods. The Process class is suitable for simple process creation; combine it with multiprocessing.Queue when you need to share data, and use the Pool class when you want to control the number of processes.

About Process:

Constructor:

  • Process([group [, target [, name [, args [, kwargs]]]]])
  • group: the process group; not implemented yet, and the library reference states it must be None;
  • target: the callable to execute;
  • name: the process name;
  • args/kwargs: the arguments to pass to the target.

Instance methods:

  • is_alive(): return whether the process is running.
  • join([timeout]): block the calling process until the process whose join() method was called terminates, or until the optional timeout expires.
  • start(): mark the process ready and wait for CPU scheduling.
  • run(): invoked by start(); if no target was specified when the Process was created, start() runs the default run() method.
  • terminate(): stop the worker process immediately, whether or not its task has finished.

Properties:

  • authkey
  • daemon: the same idea as a thread's setDaemon: a daemonic child process is terminated automatically when its parent process exits.
  • exitcode: None while the process is running; a value of -N means the process was terminated by signal N.
  • name: the process name.
  • pid: the process ID.
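A small sketch (the name 'inspector' is just an example) showing how these attributes change over a process's life cycle:

```python
import multiprocessing
import time


def task():
    time.sleep(0.1)


if __name__ == '__main__':
    p = multiprocessing.Process(target=task, name='inspector')
    # Before start(): no OS process exists yet.
    print(p.name, p.pid, p.exitcode)  # inspector None None

    p.start()
    print(p.pid is not None)  # True: a pid is assigned after start()

    p.join()
    print(p.exitcode)  # 0: the child exited normally
```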

Here is a simple example:

import multiprocessing


def worker():
    """worker function"""
    print('Worker')


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker)
        jobs.append(p)
        p.start()

# output
# Worker
# Worker
# Worker
# Worker
# Worker        

The output is 'Worker' printed five times. We cannot tell which 'Worker' came from which process; the exact interleaving depends on the order of execution, because every process is competing for access to the output stream.

So how can we tell the processes apart? One way is to pass arguments to each process. Unlike with threading, the arguments passed to a multiprocessing Process must be picklable. Here is the code:

import multiprocessing


def worker(num):
    """thread worker function"""
    print('Worker:', num)


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()
        
# output
# Worker: 1
# Worker: 0
# Worker: 2
# Worker: 3
# Worker: 4

Importable Target Functions

One difference between threading and multiprocessing is the extra protection needed when it is used inside __main__. Because of the way new processes are started, the child process needs to be able to import the script containing the target function. Wrapping the main part of the application in a check of __name__ == '__main__' ensures it is not run recursively in each child when the module is imported. Another approach is to import the target function from a separate script. For example, multiprocessing_import_main.py uses a worker function defined in a second module.

# multiprocessing_import_main.py 
import multiprocessing
import multiprocessing_import_worker

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(
            target=multiprocessing_import_worker.worker,
        )
        jobs.append(p)
        p.start()
        
# output
# Worker
# Worker
# Worker
# Worker
# Worker

The worker function is defined in multiprocessing_import_worker.py:

# multiprocessing_import_worker.py 
def worker():
    """worker function"""
    print('Worker')
    return

Determining the Current Process

Passing arguments just to identify or name processes is cumbersome and unnecessary. Each Process instance has a name, whose default value can be changed when the process is created. Naming processes is useful for keeping track of them, especially in applications with multiple types of processes running simultaneously.

import multiprocessing
import time


def worker():
    name = multiprocessing.current_process().name
    print(name, 'Starting')
    time.sleep(2)
    print(name, 'Exiting')


def my_service():
    name = multiprocessing.current_process().name
    print(name, 'Starting')
    time.sleep(3)
    print(name, 'Exiting')


if __name__ == '__main__':
    service = multiprocessing.Process(
        name='my_service',
        target=my_service,
    )
    worker_1 = multiprocessing.Process(
        name='worker 1',
        target=worker,
    )
    worker_2 = multiprocessing.Process(  # default name
        target=worker,
    )

    worker_1.start()
    worker_2.start()
    service.start()
    
# output
# worker 1 Starting
# worker 1 Exiting
# Process-3 Starting
# Process-3 Exiting
# my_service Starting
# my_service Exiting

Daemon Processes

By default, the main program does not exit until all of its child processes have exited. There are times when it is useful to start a background process that runs without blocking the main program from exiting, such as a task that generates "heartbeats" for a monitoring tool.

Marking a process as a daemon is simple: just set its daemon attribute to True.

import multiprocessing
import time
import sys


def daemon():
    p = multiprocessing.current_process()
    print('Starting:', p.name, p.pid)
    sys.stdout.flush()
    time.sleep(2)
    print('Exiting :', p.name, p.pid)
    sys.stdout.flush()


def non_daemon():
    p = multiprocessing.current_process()
    print('Starting:', p.name, p.pid)
    sys.stdout.flush()
    print('Exiting :', p.name, p.pid)
    sys.stdout.flush()


if __name__ == '__main__':
    d = multiprocessing.Process(
        name='daemon',
        target=daemon,
    )
    d.daemon = True

    n = multiprocessing.Process(
        name='non-daemon',
        target=non_daemon,
    )
    n.daemon = False

    d.start()
    time.sleep(1)
    n.start()
    
# output
# Starting: daemon 41838
# Starting: non-daemon 41841
# Exiting : non-daemon 41841

The output does not include the "Exiting" message from the daemon process, since all of the non-daemon processes (including the main program) exit before the daemon process wakes up from its two-second sleep.

The daemon process is terminated automatically before the main program exits, which avoids leaving orphaned processes running. This can be verified by looking for the process ID printed while the program runs, and then checking for that process with the ps command.

Waiting for Processes

To wait until a process has completed its work and exited, use the join() method.

import multiprocessing
import time
import sys


def daemon():
    name = multiprocessing.current_process().name
    print('Starting:', name)
    time.sleep(2)
    print('Exiting :', name)


def non_daemon():
    name = multiprocessing.current_process().name
    print('Starting:', name)
    print('Exiting :', name)


if __name__ == '__main__':
    d = multiprocessing.Process(
        name='daemon',
        target=daemon,
    )
    d.daemon = True

    n = multiprocessing.Process(
        name='non-daemon',
        target=non_daemon,
    )
    n.daemon = False

    d.start()
    time.sleep(1)
    n.start()

    d.join()
    n.join()
    
# output
# Starting: non-daemon
# Exiting : non-daemon
# Starting: daemon
# Exiting : daemon

Since the main process waits for the daemon to exit using join(), the "Exiting" message is printed this time.

By default, join() blocks indefinitely. It is also possible to pass a timeout argument (a float representing the number of seconds to wait for the process to become inactive). If the process does not complete within the timeout period, join() returns anyway.

import multiprocessing
import time
import sys


def daemon():
    name = multiprocessing.current_process().name
    print('Starting:', name)
    time.sleep(2)
    print('Exiting :', name)


def non_daemon():
    name = multiprocessing.current_process().name
    print('Starting:', name)
    print('Exiting :', name)


if __name__ == '__main__':
    d = multiprocessing.Process(
        name='daemon',
        target=daemon,
    )
    d.daemon = True

    n = multiprocessing.Process(
        name='non-daemon',
        target=non_daemon,
    )
    n.daemon = False

    d.start()
    n.start()

    d.join(1)
    print('d.is_alive()', d.is_alive())
    n.join()
    
# output
# Starting: non-daemon
# Exiting : non-daemon
# d.is_alive() True

Since the timeout passed is less than the amount of time the daemon sleeps, the process is still "alive" after join() returns.

Terminating Processes

Although it is better to signal a process that it should exit using the "poison pill" method, forcibly terminating a process is useful if it appears hung or deadlocked. Call terminate() to kill a child process.

import multiprocessing
import time


def slow_worker():
    print('Starting worker')
    time.sleep(0.1)
    print('Finished worker')


if __name__ == '__main__':
    p = multiprocessing.Process(target=slow_worker)
    print('BEFORE:', p, p.is_alive())

    p.start()
    print('DURING:', p, p.is_alive())

    p.terminate()
    print('TERMINATED:', p, p.is_alive())

    p.join()
    print('JOINED:', p, p.is_alive())
    
# output
# BEFORE: <Process(Process-1, initial)> False
# DURING: <Process(Process-1, started)> True
# TERMINATED: <Process(Process-1, started)> True
# JOINED: <Process(Process-1, stopped[SIGTERM])> False

It is important to join() the process after terminating it, in order to give the process-management code time to update the status of the object to reflect the termination.

Process Exit Status

The status code produced when a process exits can be accessed via the exitcode attribute. The allowed ranges are listed in the table below.

Exit Code   Meaning
== 0        no error was produced
> 0         the process had an error, and exited with that code
< 0         the process was killed with a signal of -1 * exitcode

import multiprocessing
import sys
import time


def exit_error():
    sys.exit(1)


def exit_ok():
    return


def return_value():
    return 1


def raises():
    raise RuntimeError('There was an error!')


def terminated():
    time.sleep(3)


if __name__ == '__main__':
    jobs = []
    funcs = [
        exit_error,
        exit_ok,
        return_value,
        raises,
        terminated,
    ]
    for f in funcs:
        print('Starting process for', f.__name__)
        j = multiprocessing.Process(target=f, name=f.__name__)
        jobs.append(j)
        j.start()

    jobs[-1].terminate()

    for j in jobs:
        j.join()
        print('{:>15}.exitcode = {}'.format(j.name, j.exitcode))
        
# output
# Starting process for exit_error
# Starting process for exit_ok
# Starting process for return_value
# Starting process for raises
# Starting process for terminated
# Process raises:
# Traceback (most recent call last):
#   File ".../lib/python3.6/multiprocessing/process.py", line 258,
# in _bootstrap
#     self.run()
#   File ".../lib/python3.6/multiprocessing/process.py", line 93,
# in run
#     self._target(*self._args, **self._kwargs)
#   File "multiprocessing_exitcode.py", line 28, in raises
#     raise RuntimeError('There was an error!')
# RuntimeError: There was an error!
#      exit_error.exitcode = 1
#         exit_ok.exitcode = 0
#    return_value.exitcode = 0
#          raises.exitcode = 1
#      terminated.exitcode = -15

Logging

When debugging concurrency issues, it is useful to have access to the internals of the multiprocessing objects. There is a convenient module-level function for enabling logging, called log_to_stderr(). It sets up a logger object using logging and adds a handler so that log messages are sent to the standard error channel.

import multiprocessing
import logging
import sys


def worker():
    print('Doing some work')
    sys.stdout.flush()


if __name__ == '__main__':
    multiprocessing.log_to_stderr(logging.DEBUG)
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()
    
# output
# [INFO/Process-1] child process calling self.run()
# Doing some work
# [INFO/Process-1] process shutting down
# [DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
# [DEBUG/Process-1] running the remaining "atexit" finalizers
# [INFO/Process-1] process exiting with exitcode 0
# [INFO/MainProcess] process shutting down
# [DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
# [DEBUG/MainProcess] running the remaining "atexit" finalizers

By default, the logging level is set to NOTSET, so no messages are produced. Pass a different level to initialize the logger to the desired level of detail.

To manipulate the logger directly (change its level setting or add handlers), use get_logger().

import multiprocessing
import logging
import sys


def worker():
    print('Doing some work')
    sys.stdout.flush()


if __name__ == '__main__':
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()
    
# output
# [INFO/Process-1] child process calling self.run()
# Doing some work
# [INFO/Process-1] process shutting down
# [INFO/Process-1] process exiting with exitcode 0
# [INFO/MainProcess] process shutting down    

Subclassing Process

Although the simplest way to start a job in a separate process is to use Process with a target function, it is also possible to use a custom subclass.

import multiprocessing


class Worker(multiprocessing.Process):

    def run(self):
        print('In {}'.format(self.name))
        return


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Worker()
        jobs.append(p)
        p.start()
    for j in jobs:
        j.join()
        
# output
# In Worker-1
# In Worker-3
# In Worker-2
# In Worker-4
# In Worker-5

The derived class should override run() to do its work.

Passing Messages to Processes

As with threads, a common use pattern for multiple processes is to divide a job up among several workers to run in parallel. Effective use of multiple processes usually requires some communication between them, so that work can be divided and results aggregated. A simple way to communicate between processes is to pass messages with a Queue. Any object that can be serialized with pickle can pass through a Queue.

import multiprocessing


class MyFancyClass:

    def __init__(self, name):
        self.name = name

    def do_something(self):
        proc_name = multiprocessing.current_process().name
        print('Doing something fancy in {} for {}!'.format(proc_name, self.name))


def worker(q):
    obj = q.get()
    obj.do_something()


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()

    queue.put(MyFancyClass('Fancy Dan'))

    # Wait for the worker to finish
    queue.close()
    queue.join_thread()
    p.join()
    
# output
# Doing something fancy in Process-1 for Fancy Dan!

This short example only passes a single message to a single worker, and then the main process waits for the worker to finish.

A more complex example below shows how to manage several workers consuming data from a JoinableQueue and passing results back to the parent process. The "poison pill" technique is used to stop the workers: after setting up the real tasks, the main program adds one "stop" value per worker to the job queue. When a worker encounters the special value, it breaks out of its processing loop. The main process uses the task queue's join() method to wait for all of the tasks to finish before processing the results.

import multiprocessing
import time


class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print('{}: Exiting'.format(proc_name))
                self.task_queue.task_done()
                break
            print('{}: {}'.format(proc_name, next_task))
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)


class Task:

    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        time.sleep(0.1)  # pretend to take time to do the work
        return '{self.a} * {self.b} = {product}'.format(
            self=self, product=self.a * self.b)

    def __str__(self):
        return '{self.a} * {self.b}'.format(self=self)


if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print('Creating {} consumers'.format(num_consumers))
    consumers = [
        Consumer(tasks, results)
        for i in range(num_consumers)
    ]
    for w in consumers:
        w.start()

    # Enqueue jobs
    num_jobs = 10
    for i in range(num_jobs):
        tasks.put(Task(i, i))

    # Add a poison pill for each consumer
    for i in range(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()

    # Start printing results
    while num_jobs:
        result = results.get()
        print('Result:', result)
        num_jobs -= 1
        
# output
# Creating 8 consumers
# Consumer-1: 0 * 0
# Consumer-2: 1 * 1
# Consumer-3: 2 * 2
# Consumer-4: 3 * 3
# Consumer-5: 4 * 4
# Consumer-6: 5 * 5
# Consumer-7: 6 * 6
# Consumer-8: 7 * 7
# Consumer-3: 8 * 8
# Consumer-7: 9 * 9
# Consumer-4: Exiting
# Consumer-1: Exiting
# Consumer-2: Exiting
# Consumer-5: Exiting
# Consumer-6: Exiting
# Consumer-8: Exiting
# Consumer-7: Exiting
# Consumer-3: Exiting
# Result: 6 * 6 = 36
# Result: 2 * 2 = 4
# Result: 3 * 3 = 9
# Result: 0 * 0 = 0
# Result: 1 * 1 = 1
# Result: 7 * 7 = 49
# Result: 4 * 4 = 16
# Result: 5 * 5 = 25
# Result: 8 * 8 = 64
# Result: 9 * 9 = 81

Although the jobs enter the queue in order, their execution is parallelized, so there is no guarantee about the order in which they will be completed.

Signaling Between Processes

The Event class provides a simple way to communicate state between processes. An event can be toggled between set and unset states. Users of the event object can wait for it to change from unset to set, using an optional timeout value.

import multiprocessing
import time


def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    print('wait_for_event: starting')
    e.wait()
    print('wait_for_event: e.is_set()->', e.is_set())


def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    print('wait_for_event_timeout: starting')
    e.wait(t)
    print('wait_for_event_timeout: e.is_set()->', e.is_set())


if __name__ == '__main__':
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(
        name='block',
        target=wait_for_event,
        args=(e,),
    )
    w1.start()

    w2 = multiprocessing.Process(
        name='nonblock',
        target=wait_for_event_timeout,
        args=(e, 2),
    )
    w2.start()

    print('main: waiting before calling Event.set()')
    time.sleep(3)
    e.set()
    print('main: event is set')
    
# output
# main: waiting before calling Event.set()
# wait_for_event: starting
# wait_for_event_timeout: starting
# wait_for_event_timeout: e.is_set()-> False
# main: event is set
# wait_for_event: e.is_set()-> True

If wait() times out, it returns without an error. The caller can check the state of the event with is_set().
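In fact, wait() also reports what happened through its return value: it returns True if the event is set and False if the timeout expired first, so the separate is_set() check can often be avoided. A minimal sketch:

```python
import multiprocessing

if __name__ == '__main__':
    e = multiprocessing.Event()

    # The event is unset, so this blocks 0.1s and then times out.
    print(e.wait(0.1))  # False

    e.set()
    # The event is already set, so this returns immediately.
    print(e.wait(0.1))  # True
```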

Controlling Access to Resources

In situations where a single resource needs to be shared between multiple processes, a Lock can be used to avoid conflicting accesses.

import multiprocessing
import sys


def worker_with(lock, stream):
    with lock:
        stream.write('Lock acquired via with\n')


def worker_no_with(lock, stream):
    lock.acquire()
    try:
        stream.write('Lock acquired directly\n')
    finally:
        lock.release()


if __name__ == '__main__':
    lock = multiprocessing.Lock()
    w = multiprocessing.Process(
        target=worker_with,
        args=(lock, sys.stdout),
    )
    nw = multiprocessing.Process(
        target=worker_no_with,
        args=(lock, sys.stdout),
    )

    w.start()
    nw.start()

    w.join()
    nw.join()

# output
# Lock acquired via with
# Lock acquired directly

In this example, if the two processes did not synchronize their access to standard output with the lock, the messages printed to the console could be jumbled together.

Synchronizing Operations

Condition objects can be used to synchronize parts of a workflow so that some run in parallel but others run sequentially, even if they are in separate processes.

import multiprocessing
import time


def stage_1(cond):
    """
    perform first stage of work,
    then notify stage_2 to continue
    """
    name = multiprocessing.current_process().name
    print('Starting', name)
    with cond:
        print('{} done and ready for stage 2'.format(name))
        cond.notify_all()


def stage_2(cond):
    """wait for the condition telling us stage_1 is done"""
    name = multiprocessing.current_process().name
    print('Starting', name)
    with cond:
        cond.wait()
        print('{} running'.format(name))


if __name__ == '__main__':
    condition = multiprocessing.Condition()
    s1 = multiprocessing.Process(name='s1',
                                 target=stage_1,
                                 args=(condition,))
    s2_clients = [
        multiprocessing.Process(
            name='stage_2[{}]'.format(i),
            target=stage_2,
            args=(condition,),
        )
        for i in range(1, 3)
    ]

    for c in s2_clients:
        c.start()
        time.sleep(1)
    s1.start()

    s1.join()
    for c in s2_clients:
        c.join()
        
# output
# Starting stage_2[1]
# Starting stage_2[2]
# Starting s1
# s1 done and ready for stage 2
# stage_2[1] running
# stage_2[2] running

In this example, two processes run stage_2 of a job in parallel, but only after stage_1 is done.

Controlling Concurrent Access to Resources

Sometimes it is useful to allow more than one worker access to a resource at a time, while still limiting the total number. For example, a connection pool might support a fixed number of simultaneous connections, or a network application might support a fixed number of concurrent downloads. A Semaphore is one way to manage those connections.

import random
import multiprocessing
import time


class ActivePool:

    def __init__(self):
        super(ActivePool, self).__init__()
        self.mgr = multiprocessing.Manager()
        self.active = self.mgr.list()
        self.lock = multiprocessing.Lock()

    def makeActive(self, name):
        with self.lock:
            self.active.append(name)

    def makeInactive(self, name):
        with self.lock:
            self.active.remove(name)

    def __str__(self):
        with self.lock:
            return str(self.active)


def worker(s, pool):
    name = multiprocessing.current_process().name
    with s:
        pool.makeActive(name)
        print('Activating {} now running {}'.format(name, pool))
        time.sleep(random.random())
        pool.makeInactive(name)


if __name__ == '__main__':
    pool = ActivePool()
    s = multiprocessing.Semaphore(3)
    jobs = [
        multiprocessing.Process(
            target=worker,
            name=str(i),
            args=(s, pool),
        )
        for i in range(10)
    ]

    for j in jobs:
        j.start()

    while True:
        alive = 0
        for j in jobs:
            if j.is_alive():
                alive += 1
                j.join(timeout=0.1)
                print('Now running {}'.format(pool))
        if alive == 0:
            # all done
            break

# output            
# Activating 0 now running ['0', '1', '2']
# Activating 1 now running ['0', '1', '2']
# Activating 2 now running ['0', '1', '2']
# Now running ['0', '1', '2']
# Now running ['0', '1', '2']
# Now running ['0', '1', '2']
# Now running ['0', '1', '2']
# Activating 3 now running ['0', '1', '3']
# Activating 4 now running ['1', '3', '4']
# Activating 6 now running ['1', '4', '6']
# Now running ['1', '4', '6']
# Now running ['1', '4', '6']
# Activating 5 now running ['1', '4', '5']
# Now running ['1', '4', '5']
# Now running ['1', '4', '5']
# Now running ['1', '4', '5']
# Activating 8 now running ['4', '5', '8']
# Now running ['4', '5', '8']
# Now running ['4', '5', '8']
# Now running ['4', '5', '8']
# Now running ['4', '5', '8']
# Now running ['4', '5', '8']
# Activating 7 now running ['5', '8', '7']
# Now running ['5', '8', '7']
# Activating 9 now running ['8', '7', '9']
# Now running ['8', '7', '9']
# Now running ['8', '9']
# Now running ['8', '9']
# Now running ['9']
# Now running ['9']
# Now running ['9']
# Now running ['9']
# Now running []            

In this example, the ActivePool class simply serves as a convenient way to track which processes are running at a given moment. A real resource pool would probably allocate a connection or some other value to the newly active process, and reclaim the value when the task is done. Here, the pool is just used to hold the names of the active processes to show that only three are running concurrently.

Managing Shared State

In the previous example, the list of active processes is maintained centrally in the ActivePool instance via a special type of list object created by a Manager. The Manager is responsible for coordinating the state of shared information between all of its users.

import multiprocessing
import pprint


def worker(d, key, value):
    d[key] = value


if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    d = mgr.dict()
    jobs = [
        multiprocessing.Process(
            target=worker,
            args=(d, i, i * 2),
        )
        for i in range(10)
    ]
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()
    print('Results:', d)
    
# output
# Results: {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18}

By creating the dict through the manager, it is shared, and updates are seen in all processes. Lists are also supported.
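For completeness, a minimal sketch of the list counterpart (the worker function here is illustrative, not from the original example): each process appends to a manager-backed list, and the parent sees every update.

```python
import multiprocessing


def worker(shared, value):
    # Appending through the proxy forwards the update to the
    # manager process, so it is visible everywhere.
    shared.append(value)


if __name__ == '__main__':
    with multiprocessing.Manager() as mgr:
        shared = mgr.list()
        jobs = [
            multiprocessing.Process(target=worker, args=(shared, i))
            for i in range(5)
        ]
        for j in jobs:
            j.start()
        for j in jobs:
            j.join()
        print(sorted(shared))  # [0, 1, 2, 3, 4]
```

The results are sorted before printing because, as with the dict example, the order in which the workers run is not deterministic.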

Shared Namespaces

In addition to dictionaries and lists, a Manager can create a shared Namespace.

import multiprocessing


def producer(ns, event):
    ns.value = 'This is the value'
    event.set()


def consumer(ns, event):
    try:
        print('Before event: {}'.format(ns.value))
    except Exception as err:
        print('Before event, error:', str(err))
    event.wait()
    print('After event:', ns.value)


if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    event = multiprocessing.Event()
    p = multiprocessing.Process(
        target=producer,
        args=(namespace, event),
    )
    c = multiprocessing.Process(
        target=consumer,
        args=(namespace, event),
    )

    c.start()
    p.start()

    c.join()
    p.join()
    
# output
# Before event, error: 'Namespace' object has no attribute 'value'
# After event: This is the value

Any named value added to the Namespace is visible to all of the clients that receive the Namespace instance.

It is important to know that updates to the contents of mutable values in the namespace are not propagated automatically.

import multiprocessing


def producer(ns, event):
    # DOES NOT UPDATE GLOBAL VALUE!
    ns.my_list.append('This is the value')
    event.set()


def consumer(ns, event):
    print('Before event:', ns.my_list)
    event.wait()
    print('After event :', ns.my_list)


if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    namespace.my_list = []

    event = multiprocessing.Event()
    p = multiprocessing.Process(
        target=producer,
        args=(namespace, event),
    )
    c = multiprocessing.Process(
        target=consumer,
        args=(namespace, event),
    )

    c.start()
    p.start()

    c.join()
    p.join()
    
# output
# Before event: []
# After event : []

To update the list, attach it to the namespace object again.
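A sketch of that fix, changing only the producer from the previous example: fetch the list, mutate the local copy, then rebind the attribute so the manager picks up the change.

```python
import multiprocessing


def producer(ns, event):
    # Fetch a local copy, mutate it, then REASSIGN the attribute.
    # The reassignment is what propagates the update through the
    # manager; an in-place append alone does not.
    updated = ns.my_list
    updated.append('This is the value')
    ns.my_list = updated
    event.set()


def consumer(ns, event):
    print('Before event:', ns.my_list)
    event.wait()
    print('After event :', ns.my_list)


if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    namespace.my_list = []

    event = multiprocessing.Event()
    p = multiprocessing.Process(target=producer, args=(namespace, event))
    c = multiprocessing.Process(target=consumer, args=(namespace, event))

    c.start()
    p.start()

    c.join()
    p.join()
```

With this change the consumer prints the updated list after the event is set.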

Process Pools

The Pool class can be used to manage a fixed number of workers for simple cases. Return values are collected and returned as a list. The Pool arguments include the number of processes and a function to run when starting each task process (invoked once per child process).

import multiprocessing


def do_calculation(data):
    return data * 2


def start_process():
    print('Starting', multiprocessing.current_process().name)


if __name__ == '__main__':
    inputs = list(range(10))
    print('Input   :', inputs)

    builtin_outputs = map(do_calculation, inputs)
    print('Built-in:', builtin_outputs)

    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(
        processes=pool_size,
        initializer=start_process,
    )
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close()  # no more tasks
    pool.join()  # wrap up current tasks

    print('Pool    :', pool_outputs)
    
# output
# Input   : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# Built-in: <map object at 0x1007b2be0>
# Starting ForkPoolWorker-3
# Starting ForkPoolWorker-4
# Starting ForkPoolWorker-5
# Starting ForkPoolWorker-6
# Starting ForkPoolWorker-1
# Starting ForkPoolWorker-7
# Starting ForkPoolWorker-2
# Starting ForkPoolWorker-8
# Pool    : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

The result of the map() method is functionally equivalent to that of the built-in map(), except that the individual tasks run in parallel. Since the pool processes its inputs in parallel, close() and join() can be used to synchronize the main process with the task processes, to ensure proper cleanup.

By default, Pool creates a fixed number of worker processes and passes jobs to them until there are no more jobs. Setting the maxtasksperchild parameter tells the pool to restart a worker process after it has finished a few tasks, preventing long-running workers from consuming ever more system resources.

import multiprocessing


def do_calculation(data):
    return data * 2


def start_process():
    print('Starting', multiprocessing.current_process().name)


if __name__ == '__main__':
    inputs = list(range(10))
    print('Input   :', inputs)

    builtin_outputs = map(do_calculation, inputs)
    print('Built-in:', builtin_outputs)

    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(
        processes=pool_size,
        initializer=start_process,
        maxtasksperchild=2,
    )
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close()  # no more tasks
    pool.join()  # wrap up current tasks

    print('Pool    :', pool_outputs)
    
# output
# Input   : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# Built-in: <map object at 0x1007b21d0>
# Starting ForkPoolWorker-1
# Starting ForkPoolWorker-2
# Starting ForkPoolWorker-4
# Starting ForkPoolWorker-5
# Starting ForkPoolWorker-6
# Starting ForkPoolWorker-3
# Starting ForkPoolWorker-7
# Starting ForkPoolWorker-8
# Pool    : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

The pool restarts the workers when they have completed their allotted tasks, even if there is no more work. In this output, eight workers are created even though there are only 10 tasks, and each worker can complete two of them at a time.

Implementing MapReduce

The Pool class can be used to create a simple single-server MapReduce implementation. While it does not give the full benefits of distributed processing, it does illustrate how easy it is to break some problems down into distributable units of work.

In a MapReduce-based system, input data is broken down into chunks for processing by different worker instances. Each chunk of input data is mapped to an intermediate state using a simple transformation. The intermediate data is then collected together and partitioned based on a key value, so that all of the related values are together. Finally, the partitioned data is reduced to a result.

# multiprocessing_mapreduce.py
import collections
import itertools
import multiprocessing


class SimpleMapReduce:

    def __init__(self, map_func, reduce_func, num_workers=None):
        """
        map_func

          Function to map inputs to intermediate data. Takes as
          argument one input value and returns a tuple with the
          key and a value to be reduced.

        reduce_func

          Function to reduce partitioned version of intermediate
          data to final output. Takes as argument a key as
          produced by map_func and a sequence of the values
          associated with that key.

        num_workers

          The number of workers to create in the pool. Defaults
          to the number of CPUs available on the current host.
        """
        self.map_func = map_func
        self.reduce_func = reduce_func
        self.pool = multiprocessing.Pool(num_workers)

    def partition(self, mapped_values):
        """Organize the mapped values by their key.
        Returns an unsorted sequence of tuples with a key
        and a sequence of values.
        """
        partitioned_data = collections.defaultdict(list)
        for key, value in mapped_values:
            partitioned_data[key].append(value)
        return partitioned_data.items()

    def __call__(self, inputs, chunksize=1):
        """Process the inputs through the map and reduce functions
        given.

        inputs
          An iterable containing the input data to be processed.

        chunksize=1
          The portion of the input data to hand to each worker.
          This can be used to tune performance during the mapping
          phase.
        """
        map_responses = self.pool.map(
            self.map_func,
            inputs,
            chunksize=chunksize,
        )
        partitioned_data = self.partition(
            itertools.chain(*map_responses)
        )
        reduced_values = self.pool.map(
            self.reduce_func,
            partitioned_data,
        )
        return reduced_values

The following example script uses SimpleMapReduce to count the "words" in the reStructuredText sources of this article, ignoring some of the markup.

# multiprocessing_wordcount.py 
import multiprocessing
import string

from multiprocessing_mapreduce import SimpleMapReduce


def file_to_words(filename):
    """Read a file and return a sequence of
    (word, occurences) values.
    """
    STOP_WORDS = set([
        'a', 'an', 'and', 'are', 'as', 'be', 'by', 'for', 'if',
        'in', 'is', 'it', 'of', 'or', 'py', 'rst', 'that', 'the',
        'to', 'with',
    ])
    TR = str.maketrans({
        p: ' '
        for p in string.punctuation
    })

    print('{} reading {}'.format(
        multiprocessing.current_process().name, filename))
    output = []

    with open(filename, 'rt') as f:
        for line in f:
            # Skip comment lines.
            if line.lstrip().startswith('..'):
                continue
            line = line.translate(TR)  # Strip punctuation
            for word in line.split():
                word = word.lower()
                if word.isalpha() and word not in STOP_WORDS:
                    output.append((word, 1))
    return output


def count_words(item):
    """Convert the partitioned data for a word to a
    tuple containing the word and the number of occurences.
    """
    word, occurences = item
    return (word, sum(occurences))


if __name__ == '__main__':
    import operator
    import glob

    input_files = glob.glob('*.rst')

    mapper = SimpleMapReduce(file_to_words, count_words)
    word_counts = mapper(input_files)
    word_counts.sort(key=operator.itemgetter(1))
    word_counts.reverse()

    print('\nTOP 20 WORDS BY FREQUENCY\n')
    top20 = word_counts[:20]
    longest = max(len(word) for word, count in top20)
    for word, count in top20:
        print('{word:<{len}}: {count:5}'.format(
            len=longest + 1,
            word=word,
            count=count)
        )

The file_to_words() function converts each input file to a sequence of tuples containing a word and the number 1 (representing a single occurrence). The data is divided up by partition() using the word as the key, so the resulting structure consists of a key and a sequence of 1 values representing each occurrence of the word. During the reduction phase, count_words() converts the partitioned data into a set of tuples containing a word and the count for that word.

$ python3 -u multiprocessing_wordcount.py

ForkPoolWorker-1 reading basics.rst
ForkPoolWorker-2 reading communication.rst
ForkPoolWorker-3 reading index.rst
ForkPoolWorker-4 reading mapreduce.rst

TOP 20 WORDS BY FREQUENCY

process         :    83
running         :    45
multiprocessing :    44
worker          :    40
starting        :    37
now             :    35
after           :    34
processes       :    31
start           :    29
header          :    27
pymotw          :    27
caption         :    27
end             :    27
daemon          :    22
can             :    22
exiting         :    21
forkpoolworker  :    21
consumer        :    20
main            :    18
event           :    16




References:

https://pymotw.com/3/multiprocessing/index.html

https://thief.one/2016/11/23/Python-multiprocessing/

http://www.dongwm.com/archives/%E4%BD%BF%E7%94%A8Python%E8%BF%9B%E8%A1%8C%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B-%E8%BF%9B%E7%A8%8B%E7%AF%87/
