concurrent.futures: Manage Pools of Concurrent Tasks

Purpose: Easily manage tasks running concurrently and in parallel.

The concurrent.futures module provides interfaces for running tasks using pools of thread or process workers. The APIs are the same, so applications can switch between threads and processes with minimal changes.
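As a rough illustration of that claim, the sketch below runs the same map() call through either executor class, chosen by a parameter. The helper names `square` and `run_all` are hypothetical. `square` is defined at module level so that ProcessPoolExecutor can pickle it and send it to the worker processes.

```python
from concurrent import futures


def square(n):
    # Module-level function, so ProcessPoolExecutor can pickle it.
    return n * n


def run_all(executor_cls, values, max_workers=2):
    # Identical code for threads and processes; only the class differs.
    with executor_cls(max_workers=max_workers) as ex:
        return list(ex.map(square, values))


if __name__ == '__main__':
    for cls in (futures.ThreadPoolExecutor, futures.ProcessPoolExecutor):
        print(cls.__name__, run_all(cls, range(5)))
```

The only thing that changes between the two runs is the class object that is passed in.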

The module provides two types of classes for interacting with the pools. Executors are used for managing pools of workers, and futures are used for managing results computed by the workers. To use a pool of workers, an application creates an instance of the appropriate executor class and then submits tasks for it to run. Submitting a task returns a Future instance immediately, whether or not the task has started running yet. When the result of the task is needed, an application can use the Future to block until the result is available. Various APIs are provided to make it convenient to wait for tasks to complete, so that the Future objects do not need to be managed directly.

Using map() with a Basic Thread Pool

The ThreadPoolExecutor manages a set of worker threads, passing tasks to them as they become available for more work. This example uses map() to concurrently produce a set of results from an input iterable. The task uses time.sleep() to pause for a different amount of time on each call. This demonstrates that map() always returns the values in the order of the inputs, regardless of the order in which the concurrent tasks finish.

futures_thread_pool_map.py

from concurrent import futures
import threading
import time


def task(n):
    print('{}: sleeping {}'.format(
        threading.current_thread().name,
        n)
    )
    time.sleep(n / 10)
    print('{}: done with {}'.format(
        threading.current_thread().name,
        n)
    )
    return n / 10


ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
results = ex.map(task, range(5, 0, -1))
print('main: unprocessed results {}'.format(results))
print('main: waiting for real results')
real_results = list(results)
print('main: results: {}'.format(real_results))

The return value from map() is actually a special type of iterator. As the main program iterates over it, the iterator waits for each result in turn.

$ python3 futures_thread_pool_map.py

main: starting
Thread-1: sleeping 5
Thread-2: sleeping 4
main: unprocessed results <generator object Executor.map.<locals>.result_iterator at 0x1013c80a0>
main: waiting for real results
Thread-2: done with 4
Thread-2: sleeping 3
Thread-1: done with 5
Thread-1: sleeping 2
Thread-1: done with 2
Thread-1: sleeping 1
Thread-2: done with 3
Thread-1: done with 1
main: results: [0.5, 0.4, 0.3, 0.2, 0.1]
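
The ordering guarantee can also be checked directly. In this minimal sketch, each task records when it finishes (the `completed` list and the timings are illustrative assumptions). The completion order depends on scheduling, but the list built from map() always follows the input order.

```python
from concurrent import futures
import threading
import time

completed = []            # order in which tasks actually finish
lock = threading.Lock()


def task(n):
    time.sleep(n / 100)   # larger n sleeps longer
    with lock:
        completed.append(n)
    return n * 10


with futures.ThreadPoolExecutor(max_workers=3) as ex:
    results = list(ex.map(task, [3, 1, 2]))

print('completion order:', completed)  # usually [1, 2, 3], not guaranteed
print('map() results:', results)       # always in input order
```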

Scheduling Individual Tasks

In addition to using map(), an individual task can be scheduled with an executor using submit(). The Future instance that submit() returns can then be used to wait for that task's results.

futures_thread_pool_submit.py

from concurrent import futures
import threading
import time


def task(n):
    print('{}: sleeping {}'.format(
        threading.current_thread().name,
        n)
    )
    time.sleep(n / 10)
    print('{}: done with {}'.format(
        threading.current_thread().name,
        n)
    )
    return n / 10


ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
f = ex.submit(task, 5)
print('main: future: {}'.format(f))
print('main: waiting for results')
result = f.result()
print('main: result: {}'.format(result))
print('main: future after result: {}'.format(f))

The status of the future changes after the task is completed and the result is made available.

$ python3 futures_thread_pool_submit.py

main: starting
Thread-1: sleeping 5
main: future: <Future at 0x1010e6080 state=running>
main: waiting for results
Thread-1: done with 5
main: result: 0.5
main: future after result: <Future at 0x1010e6080 state=finished returned float>
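
The state change can be observed deterministically if the task is held back until the main thread releases it. The sketch below assumes a `threading.Event` named `release` for that purpose. It also shows that `result(timeout=...)` raises `futures.TimeoutError` without stopping the task.

```python
from concurrent import futures
import threading

release = threading.Event()


def task():
    # Block the worker until the main thread sets the event.
    release.wait()
    return 'finished'


with futures.ThreadPoolExecutor(max_workers=1) as ex:
    f = ex.submit(task)
    done_before = f.done()          # False: the task is still blocked
    try:
        f.result(timeout=0.1)       # stop waiting after 0.1 seconds
        timed_out = False
    except futures.TimeoutError:
        timed_out = True            # the task itself keeps running
    release.set()
    value = f.result()              # returns once the task finishes
    done_after = f.done()

print('done before release:', done_before)
print('result() timed out:', timed_out)
print('result:', value, '| done after result:', done_after)
```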

Waiting for Tasks in Any Order

Invoking the result() method of a Future blocks until the task completes (either by returning a value or raising an exception) or is canceled. When map() is used, the results of multiple tasks are returned in the order in which the tasks were scheduled. If the order in which the results are processed does not matter, use as_completed() to process them as each task finishes.

futures_as_completed.py

from concurrent import futures
import random
import time


def task(n):
    time.sleep(random.random())
    return (n, n / 10)


ex = futures.ThreadPoolExecutor(max_workers=5)
print('main: starting')

wait_for = [
    ex.submit(task, i)
    for i in range(5, 0, -1)
]

for f in futures.as_completed(wait_for):
    print('main: result: {}'.format(f.result()))

Because the pool has as many workers as tasks, all of the tasks can be started at once. They finish in a random order, so the values generated by as_completed() are different each time the example runs.

$ python3 futures_as_completed.py

main: starting
main: result: (3, 0.3)
main: result: (5, 0.5)
main: result: (4, 0.4)
main: result: (2, 0.2)
main: result: (1, 0.1)
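
The example above makes each task return its own input so the output can be matched to it. A common alternative is to keep a dictionary that maps each Future back to the argument it was submitted with. The sketch below uses that approach; the names `future_to_n` and `results` are illustrative.

```python
from concurrent import futures
import random
import time


def task(n):
    time.sleep(random.random() / 10)
    return n / 10


with futures.ThreadPoolExecutor(max_workers=5) as ex:
    # Remember which argument produced each future.
    future_to_n = {ex.submit(task, n): n for n in range(5, 0, -1)}
    results = {}
    for f in futures.as_completed(future_to_n):
        n = future_to_n[f]
        results[n] = f.result()
        print('main: task {} returned {}'.format(n, results[n]))
```

With this approach the task function can return a plain value. The bookkeeping stays in the caller.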

Future Callbacks

To take some action when a task completes, without explicitly waiting for the result, use add_done_callback() to specify a new function to call when the Future is done. The callback should be a callable that takes a single argument, the Future instance. The example stores the task's argument in an ad hoc attribute, f.arg, so that the callback can report it.

futures_future_callback.py

from concurrent import futures
import time


def task(n):
    print('{}: sleeping'.format(n))
    time.sleep(0.5)
    print('{}: done'.format(n))
    return n / 10


def done(fn):
    if fn.cancelled():
        print('{}: canceled'.format(fn.arg))
    elif fn.done():
        error = fn.exception()
        if error:
            print('{}: error returned: {}'.format(
                fn.arg, error))
        else:
            result = fn.result()
            print('{}: value returned: {}'.format(
                fn.arg, result))


if __name__ == '__main__':
    ex = futures.ThreadPoolExecutor(max_workers=2)
    print('main: starting')
    f = ex.submit(task, 5)
    f.arg = 5
    f.add_done_callback(done)
    result = f.result()

The callback is invoked regardless of the reason the Future is considered “done,” so it is necessary to check the status of the object passed in to the callback before using it in any way.

$ python3 futures_future_callback.py

main: starting
5: sleeping
5: done
5: value returned: 0.5
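
Instead of attaching an extra attribute such as f.arg, the label can be bound to the callback with functools.partial. The sketch below does that; the `done` function and the `log` list are illustrative. It also relies on documented behavior: if the Future has already finished when add_done_callback() is called, the callback runs immediately in the calling thread.

```python
from concurrent import futures
import functools

log = []


def done(label, fn):
    # add_done_callback() passes only the Future, so the label is
    # bound in advance with functools.partial.
    if fn.cancelled():
        log.append('{}: canceled'.format(label))
        return
    error = fn.exception()
    if error is not None:
        log.append('{}: error {}'.format(label, error))
    else:
        log.append('{}: value {}'.format(label, fn.result()))


with futures.ThreadPoolExecutor(max_workers=2) as ex:
    f = ex.submit(pow, 2, 10)
    f.result()  # make sure the future has already finished
    # Already done, so the callback runs before add_done_callback() returns.
    f.add_done_callback(functools.partial(done, 'pow'))
    print(log)
```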

Canceling Tasks

A Future that has been submitted but not yet started can be canceled by calling its cancel() method.

futures_future_callback_cancel.py

from concurrent import futures
import time


def task(n):
    print('{}: sleeping'.format(n))
    time.sleep(0.5)
    print('{}: done'.format(n))
    return n / 10


def done(fn):
    if fn.cancelled():
        print('{}: canceled'.format(fn.arg))
    elif fn.done():
        print('{}: not canceled'.format(fn.arg))


if __name__ == '__main__':
    ex = futures.ThreadPoolExecutor(max_workers=2)
    print('main: starting')
    tasks = []

    for i in range(10, 0, -1):
        print('main: submitting {}'.format(i))
        f = ex.submit(task, i)
        f.arg = i
        f.add_done_callback(done)
        tasks.append((i, f))

    for i, t in reversed(tasks):
        if not t.cancel():
            print('main: did not cancel {}'.format(i))

    ex.shutdown()

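cancel() returns a Boolean indicating whether or not the task was able to be canceled. In this run, tasks 10 and 9 were already running in the two worker threads, so only the eight tasks still waiting in the queue could be canceled.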

$ python3 futures_future_callback_cancel.py

main: starting
main: submitting 10
10: sleeping
main: submitting 9
9: sleeping
main: submitting 8
main: submitting 7
main: submitting 6
main: submitting 5
main: submitting 4
main: submitting 3
main: submitting 2
main: submitting 1
1: canceled
2: canceled
3: canceled
4: canceled
5: canceled
6: canceled
7: canceled
8: canceled
main: did not cancel 9
main: did not cancel 10
10: done
10: not canceled
9: done
9: not canceled
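
Which tasks can be canceled in the example above depends on timing. The sketch below removes that uncertainty by using a single worker held busy by a blocking task. The `started` and `release` events and the `blocker` function are illustrative assumptions. A second task that is still queued can be canceled, but the running one cannot.

```python
from concurrent import futures
import threading

started = threading.Event()
release = threading.Event()


def blocker():
    started.set()      # the future is already in the running state here
    release.wait()     # hold the only worker until released
    return 'blocker done'


with futures.ThreadPoolExecutor(max_workers=1) as ex:
    running = ex.submit(blocker)
    queued = ex.submit(pow, 2, 8)    # waits behind blocker
    started.wait()

    cancelled_running = running.cancel()   # cannot cancel a started task
    cancelled_queued = queued.cancel()     # still pending, so this works
    release.set()

print('running task canceled?', cancelled_running)
print('queued task canceled?', cancelled_queued)
print('running task result:', running.result())
```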

Exceptions in Tasks

If a task raises an unhandled exception, it is saved to the Future for the task and made available through the result() or exception() methods.

futures_future_exception.py

from concurrent import futures


def task(n):
    print('{}: starting'.format(n))
    raise ValueError('the value {} is no good'.format(n))


ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
f = ex.submit(task, 5)

error = f.exception()
print('main: error: {}'.format(error))

try:
    result = f.result()
except ValueError as e:
    print('main: saw error "{}" when accessing result'.format(e))

If result() is called after an unhandled exception is raised within a task function, the same exception is re-raised in the current context.

$ python3 futures_future_exception.py

main: starting
5: starting
main: error: the value 5 is no good
main: saw error "the value 5 is no good" when accessing result
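
The same rule applies to map(). The exception from a failing call is re-raised when the iterator reaches that call's position, after the earlier results have been yielded. In this sketch the failing input `3` and the `collected` list are illustrative.

```python
from concurrent import futures


def task(n):
    if n == 3:
        raise ValueError('the value {} is no good'.format(n))
    return n / 10


collected = []
error_message = None
with futures.ThreadPoolExecutor(max_workers=2) as ex:
    try:
        for value in ex.map(task, [1, 2, 3, 4]):
            collected.append(value)
    except ValueError as e:
        error_message = str(e)
        print('main: map() re-raised "{}"'.format(e))

print('main: values before the error: {}'.format(collected))
```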

Context Manager

Executors work as context managers, running tasks concurrently and waiting for them all to complete. When the context manager exits, the executor's shutdown() method is called with wait=True.

futures_context_manager.py

from concurrent import futures

def task(n):
    print(n)


with futures.ThreadPoolExecutor(max_workers=2) as ex:
    print('main: starting')
    ex.submit(task, 1)
    ex.submit(task, 2)
    ex.submit(task, 3)
    ex.submit(task, 4)

print('main: done')

This mode of using the executor is useful when the thread or process resources should be cleaned up when execution leaves the current scope.

$ python3 futures_context_manager.py

main: starting
1
2
3
4
main: done
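
The following is a rough, hand-written equivalent of the with statement, given as a sketch rather than the library's actual source. shutdown(wait=True) sits in a finally clause, so it runs even if the body raises. After shutdown, the executor refuses new work with RuntimeError.

```python
from concurrent import futures


def task(n):
    return n * 2


ex = futures.ThreadPoolExecutor(max_workers=2)
try:
    fs = [ex.submit(task, n) for n in range(1, 5)]
finally:
    ex.shutdown(wait=True)  # blocks until every submitted task has finished

print('all done:', all(f.done() for f in fs))
print('results:', [f.result() for f in fs])

# A shut-down executor does not accept new tasks.
refused = False
try:
    ex.submit(task, 5)
except RuntimeError as e:
    refused = True
    print('submit after shutdown failed: {}'.format(e))
```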

Process Pools

The ProcessPoolExecutor works in the same way as ThreadPoolExecutor, but uses processes instead of threads. This allows CPU-intensive operations to use a separate CPU and not be blocked by the CPython interpreter’s global interpreter lock. Because arguments and return values are passed between processes, the task function and its data must be picklable.

futures_process_pool_map.py

from concurrent import futures
import os


def task(n):
    return (n, os.getpid())


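if __name__ == '__main__':
    # The guard stops worker processes started with "spawn"
    # (the default on Windows and macOS) from re-running this block.
    ex = futures.ProcessPoolExecutor(max_workers=2)
    results = ex.map(task, range(5, 0, -1))
    for n, pid in results:
        print('ran task {} in process {}'.format(n, pid))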

As with the thread pool, individual worker processes are reused for multiple tasks.

$ python3 futures_process_pool_map.py

ran task 5 in process 60245
ran task 4 in process 60246
ran task 3 in process 60245
ran task 2 in process 60245
ran task 1 in process 60245
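
Because of the GIL, a pure-Python CPU-bound function benefits from processes rather than threads. The sketch below uses a deliberately slow, hypothetical `count_primes` function based on trial division, and leaves ProcessPoolExecutor() at its default worker count (the number of CPUs). The `__main__` guard is needed wherever workers are started with "spawn" or "forkserver".

```python
from concurrent import futures
import math


def count_primes(limit):
    # CPU-bound on purpose: trial division with no I/O.
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, math.isqrt(n) + 1)):
            count += 1
    return count


def main():
    limits = [10_000, 20_000, 30_000, 40_000]
    with futures.ProcessPoolExecutor() as ex:
        return list(zip(limits, ex.map(count_primes, limits)))


if __name__ == '__main__':
    for limit, count in main():
        print('{} primes below {}'.format(count, limit))
```

math.isqrt() requires Python 3.8 or later.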

If something happens to one of the worker processes to cause it to exit unexpectedly, the ProcessPoolExecutor is considered “broken” and will no longer schedule tasks.

futures_process_pool_broken.py

from concurrent import futures
import os
import signal


if __name__ == '__main__':
    with futures.ProcessPoolExecutor(max_workers=2) as ex:
        print('getting the pid for one worker')
        f1 = ex.submit(os.getpid)
        pid1 = f1.result()

        print('killing process {}'.format(pid1))
        # signal.SIGHUP is POSIX-only, so this example does not run on Windows.
        os.kill(pid1, signal.SIGHUP)

        print('submitting another task')
        try:
            # Depending on timing, submit() itself may already raise.
            f2 = ex.submit(os.getpid)
            pid2 = f2.result()
        except futures.process.BrokenProcessPool as e:
            print('could not start new tasks: {}'.format(e))

In this run, the BrokenProcessPool exception was raised when the result was requested, rather than when the new task was submitted. Once the executor has marked itself as broken, submit() also raises BrokenProcessPool.

$ python3 futures_process_pool_broken.py

getting the pid for one worker
killing process 62059
submitting another task
could not start new tasks: A process in the process pool was terminated abruptly while the future was running or pending.
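
Sending SIGHUP only works on POSIX systems. A more portable way to simulate a crashed worker, shown here as an assumption-laden sketch, is to have the task call os._exit(). That ends the worker at once without sending any exception back. The `crash` and `main` names are hypothetical. BrokenProcessPool is a subclass of futures.BrokenExecutor, so either class can be caught.

```python
from concurrent import futures
from concurrent.futures.process import BrokenProcessPool
import os


def crash():
    # Simulated crash: os._exit() ends the worker process immediately.
    os._exit(1)


def main():
    with futures.ProcessPoolExecutor(max_workers=2) as ex:
        f = ex.submit(crash)
        try:
            f.result()
        except BrokenProcessPool as e:
            print('task failed: {}'.format(e))
        # The pool is now marked broken, so new work is refused.
        try:
            ex.submit(os.getpid)
        except BrokenProcessPool as e:
            print('submit refused: {}'.format(e))


if __name__ == '__main__':
    main()
```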
