Python concurrent.futures: Launching Parallel Tasks

Note: this article is excerpted from https://docs.python.org/3/library/concurrent.futures.html

1. Overview

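The concurrent.futures module provides a high-level interface for asynchronously executing callables.
The asynchronous execution can be performed with threads, using ThreadPoolExecutor, or with separate processes, using ProcessPoolExecutor. Both implement the same interface, which is defined by the abstract Executor class.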
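Because both classes implement the Executor interface, code written against that interface can usually switch between threads and processes by changing only the class it instantiates. The sketch below uses a hypothetical helper, run_squares, and the built-in pow, which is picklable and would therefore also work in a process pool:

```python
from concurrent.futures import ThreadPoolExecutor


def run_squares(executor_cls, numbers):
    # executor_cls can be any Executor subclass (ThreadPoolExecutor or
    # ProcessPoolExecutor); only the Executor interface is used here.
    with executor_cls(max_workers=2) as executor:
        # pow(n, 2) for every n, results returned in input order
        return list(executor.map(pow, numbers, [2] * len(numbers)))


if __name__ == "__main__":
    print(run_squares(ThreadPoolExecutor, [1, 2, 3]))  # [1, 4, 9]
```

Passing ProcessPoolExecutor instead of ThreadPoolExecutor should give the same list; the `if __name__ == "__main__":` guard is what lets worker processes import the main module safely.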

2. Executor Objects

class concurrent.futures.Executor
An abstract class that provides methods to execute calls asynchronously. It should not be used directly, but through its concrete subclasses.

2.1 submit(fn, *args, **kwargs)

Schedules the callable fn to be executed as fn(*args, **kwargs) and returns a Future object representing the execution of the callable.

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
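Everything after fn in submit() is forwarded unchanged, both positional and keyword arguments. A small sketch, where greet is a hypothetical function defined only for illustration:

```python
from concurrent.futures import ThreadPoolExecutor


def greet(name, punctuation="!"):
    return "Hello, " + name + punctuation


with ThreadPoolExecutor(max_workers=2) as executor:
    # Runs greet("futures", punctuation="?") in a worker thread
    f1 = executor.submit(greet, "futures", punctuation="?")
    # Runs int("2a", base=16) in a worker thread
    f2 = executor.submit(int, "2a", base=16)
    print(f1.result())  # Hello, futures?
    print(f2.result())  # 42
```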

2.2 map(func, *iterables, timeout=None, chunksize=1)

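The asynchronous counterpart of map(func, *iterables): several calls to func may run concurrently. Unlike the built-in map, the iterables are collected immediately rather than lazily.

The returned iterator raises concurrent.futures.TimeoutError if __next__() is called and the result isn't available after timeout seconds from the original call to Executor.map(). timeout can be an int or a float. If timeout is not specified or is None, there is no limit to the wait time.

If a func call raises an exception, that exception is raised when its value is retrieved from the iterator.

When using ProcessPoolExecutor, this method chops the iterables into a number of chunks, which it submits to the pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer. For very long iterables, a large chunksize can significantly improve performance compared with the default of 1. With ThreadPoolExecutor, chunksize has no effect.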
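The ordering and error behavior described above can be seen in a short sketch; reciprocal is a hypothetical helper chosen so that one input fails:

```python
from concurrent.futures import ThreadPoolExecutor


def reciprocal(x):
    return 1 / x


error = None
collected = []
with ThreadPoolExecutor(max_workers=4) as executor:
    # Several iterables: computes pow(2, 5) and pow(3, 2), in input order
    powers = list(executor.map(pow, [2, 3], [5, 2]))

    try:
        for value in executor.map(reciprocal, [1, 2, 4, 0]):
            collected.append(value)
    except ZeroDivisionError as exc:
        # Raised only when the failing result (1 / 0) is retrieved
        error = exc

print(powers)     # [32, 9]
print(collected)  # [1.0, 0.5, 0.25]
```

The three successful results are still delivered before the exception surfaces, because results are yielded in input order.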

2.3 shutdown(wait=True)

Signals the executor that it should free any resources it is using when the currently pending futures are done executing. Calls to Executor.submit() and Executor.map() made after shutdown raise RuntimeError.

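If wait is True, this method does not return until all pending futures are done executing and the resources associated with the executor have been freed. If wait is False, this method returns immediately, and the resources associated with the executor are freed when all pending futures are done executing. Regardless of the value of wait, the entire Python program will not exit until all pending futures are done executing.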
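A minimal sketch of calling shutdown() explicitly and of the RuntimeError raised by later submissions:

```python
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)
future = executor.submit(sum, [1, 2, 3])

# wait=True blocks until the pending future has run and the workers have exited
executor.shutdown(wait=True)
print(future.done(), future.result())  # True 6

rejected = None
try:
    executor.submit(sum, [4, 5])
except RuntimeError as exc:
    rejected = exc  # new work is refused once the executor is shut down
print(type(rejected).__name__)  # RuntimeError
```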

You can avoid calling this method explicitly by using the with statement, which shuts the executor down as if shutdown() were called with wait=True:

import shutil
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

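3. ThreadPoolExecutor

ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.
Deadlocks can occur when the callable associated with a Future waits on the results of another Future. For example: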

import time
from concurrent.futures import ThreadPoolExecutor
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
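b = executor.submit(wait_on_a)

Likewise, a callable that waits on a future submitted to its own pool deadlocks when that pool has no other free worker:

def wait_on_future():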
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
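Both deadlocks come from a worker blocking on a future that needs a worker from the same, already busy pool. One way to avoid this, assuming the dependent work can be driven from the calling thread, is to submit and wait from outside the pool. square and sum_of_squares are hypothetical names used only for this sketch:

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    return x * x


def sum_of_squares(values):
    # The caller's thread does all the waiting, so the single worker is
    # never blocked on a future that needs that same worker to run.
    with ThreadPoolExecutor(max_workers=1) as executor:
        futures = [executor.submit(square, v) for v in values]
        return sum(f.result() for f in futures)


print(sum_of_squares([1, 2, 3]))  # 14
```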

class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')

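An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously. If max_workers is None or not given, a default based on the number of processors on the machine is used. thread_name_prefix sets a prefix for the names of the pool's worker threads, which makes debugging easier.
Example: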

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
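To see thread_name_prefix in action, each task can report the name of the thread it runs on. The prefix "downloader" and the helper current_thread_name are arbitrary choices for this sketch, and the exact suffix appended to the prefix is an implementation detail, so only the prefix is relied on:

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def current_thread_name():
    return threading.current_thread().name


with ThreadPoolExecutor(max_workers=2, thread_name_prefix="downloader") as executor:
    names = [executor.submit(current_thread_name).result() for _ in range(4)]

# Every task ran on a worker whose name starts with "downloader"
print(names)
```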

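4. ProcessPoolExecutor

The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously.
ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock, but also means that only picklable objects can be executed and returned. The __main__ module must also be importable by the worker processes, which is why the example below guards its entry point with if __name__ == '__main__', and why ProcessPoolExecutor does not work in the interactive interpreter.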

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

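def is_prime(n):
    # Handle the small cases first so that 2 is reported as prime
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False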

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()
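Because a process pool must pickle the callable, its arguments and its result, it can help to check picklability up front. The sketch below uses plain pickle.dumps as an approximation (multiprocessing uses its own Pickler subclass, which can handle a few extra types); is_picklable is a hypothetical helper:

```python
import math
import pickle
import threading


def is_picklable(obj):
    """Return True if pickle.dumps(obj) succeeds."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, TypeError, AttributeError):
        return False
    return True


print(is_picklable(math.sqrt))         # True: pickled by its qualified name
print(is_picklable(lambda x: x * x))   # False: a lambda cannot be looked up by name
print(is_picklable(threading.Lock()))  # False: locks cannot be pickled
```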

5. Future Objects

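The Future class encapsulates the asynchronous execution of a callable. Future instances are created by Executor.submit() and should not be created directly except for testing.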

5.1 cancel()

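Attempts to cancel the call. If the call is currently being executed or has finished running and cannot be cancelled, the method returns False; otherwise the call is cancelled and the method returns True.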

5.2 cancelled()

Returns True if the call was successfully cancelled.

5.3 running()

Returns True if the call is currently being executed and cannot be cancelled.

5.4 done()

Returns True if the call was successfully cancelled or finished running.
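The four state methods above can be observed with a single-worker pool whose first task is held busy, so that a second task stays queued. The two threading.Event objects and blocking_task are scaffolding invented for this sketch:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()


def blocking_task():
    started.set()   # tell the main thread this task is now running
    release.wait()  # stay busy until the main thread releases it
    return "first done"


with ThreadPoolExecutor(max_workers=1) as executor:
    first = executor.submit(blocking_task)
    started.wait()                        # `first` now occupies the only worker
    second = executor.submit(pow, 2, 10)  # so this call is still pending

    first_running = first.running()       # True: already executing
    first_cancelled = first.cancel()      # False: a running call cannot be cancelled
    second_cancelled = second.cancel()    # True: it had not started yet
    print(first_running, first_cancelled, second_cancelled)  # True False True
    print(second.cancelled(), second.done())                 # True True
    release.set()                         # let the first task finish

print(first.result(), first.done())  # first done True
```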

5.5 result(timeout=None)

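Returns the value returned by the call. If the call hasn't yet completed, this method waits up to timeout seconds; if the call hasn't completed within timeout seconds, a concurrent.futures.TimeoutError is raised. timeout can be an int or a float. If timeout is not specified or is None, there is no limit to the wait time. If the future is cancelled before completing, CancelledError is raised. If the call raised an exception, this method raises the same exception.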
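A sketch of both failure modes of result(): a timeout while the call is still running, and an exception re-raised from the call. slow_value and failing are hypothetical functions, and the sleep durations are arbitrary:

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def slow_value():
    time.sleep(0.5)
    return 42


def failing():
    raise ValueError("bad input")


timed_out = False
with ThreadPoolExecutor(max_workers=2) as executor:
    slow = executor.submit(slow_value)
    bad = executor.submit(failing)

    try:
        slow.result(timeout=0.1)  # the call needs about 0.5 s
    except TimeoutError:
        timed_out = True
    print(slow.result())  # 42 (no timeout: waits as long as needed)

    try:
        bad.result()
    except ValueError as exc:
        print("re-raised:", exc)  # re-raised: bad input
```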

5.6 exception(timeout=None)

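Returns the exception raised by the call. If the call hasn't yet completed, this method waits up to timeout seconds; if the call hasn't completed within timeout seconds, a concurrent.futures.TimeoutError is raised. timeout can be an int or a float. If timeout is not specified or is None, there is no limit to the wait time. If the future is cancelled before completing, CancelledError is raised.
If the call completed without raising, None is returned.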
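Unlike result(), exception() hands back the exception object instead of raising it, which is convenient for collecting failures. A short sketch using int() on a valid and an invalid string:

```python
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=2) as executor:
    ok = executor.submit(int, "17")
    bad = executor.submit(int, "seventeen")
    ok_error = ok.exception()    # waits for completion, then returns None
    bad_error = bad.exception()  # returns the ValueError instead of raising it

print(ok_error)                  # None
print(type(bad_error).__name__)  # ValueError
print(ok.result())               # 17
```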

5.7 add_done_callback(fn)

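Attaches the callable fn to the future. fn is called, with the future as its only argument, when the future is cancelled or finishes running. If the future has already completed or been cancelled, fn is called immediately.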
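A minimal sketch of collecting results through callbacks; record is a hypothetical callback. In CPython's thread pool the callback typically runs in the worker thread that finished the call (or immediately in the caller if the future is already done), so the shared list is guarded with a lock:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

results = []
lock = threading.Lock()


def record(future):
    # Receives the completed future as its only argument
    with lock:
        results.append(future.result())


with ThreadPoolExecutor(max_workers=2) as executor:
    for n in (1, 2, 3):
        executor.submit(pow, n, 2).add_done_callback(record)

# Completion order may vary, so sort before printing
print(sorted(results))  # [1, 4, 9]
```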

6. Module Functions

6.1 concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

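Waits for the Future instances given by fs (possibly created by different Executor instances) to complete. Returns a named 2-tuple of sets: the first set, named done, contains the futures that completed (finished or were cancelled) before the wait returned; the second, named not_done, contains the futures that did not complete.
timeout can be used to control the maximum number of seconds to wait before returning. timeout can be an int or a float. If timeout is not specified or is None, there is no limit to the wait time.
return_when indicates when this function should return. It must be one of the following constants:

FIRST_COMPLETED: the function returns when any future finishes or is cancelled.
FIRST_EXCEPTION: the function returns when any future finishes by raising an exception. If no future raises an exception, it is equivalent to ALL_COMPLETED.
ALL_COMPLETED: the function returns when all futures finish or are cancelled.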
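A sketch contrasting FIRST_COMPLETED and ALL_COMPLETED; sleep_and_return is a hypothetical helper and the sleep times are arbitrary, chosen far enough apart that only the shortest task finishes first:

```python
import time
from concurrent.futures import ALL_COMPLETED, FIRST_COMPLETED, ThreadPoolExecutor, wait


def sleep_and_return(seconds):
    time.sleep(seconds)
    return seconds


with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(sleep_and_return, s) for s in (0.1, 0.8, 1.0)]

    # Returns as soon as the 0.1 s task is done
    first_done, first_pending = wait(futures, return_when=FIRST_COMPLETED)
    print([f.result() for f in first_done], len(first_pending))  # [0.1] 2

    # Returns once every future has finished
    all_done, still_pending = wait(futures, return_when=ALL_COMPLETED)
    print(len(all_done), len(still_pending))  # 3 0
```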

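6.2 concurrent.futures.as_completed(fs, timeout=None)

Returns an iterator over the Future instances given by fs (possibly created by different Executor instances) that yields futures as they complete (finished or cancelled). Futures that are duplicated in fs are returned once, and futures that completed before as_completed() was called are yielded first. The iterator raises concurrent.futures.TimeoutError if __next__() is called and the result isn't available after timeout seconds from the original call to as_completed(); if timeout is not specified or is None, there is no limit to the wait time. For reference, CPython's implementation (Lib/concurrent/futures/_base.py) as it appeared in earlier Python 3 releases is reproduced below: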

def as_completed(fs, timeout=None):
    """An iterator over the given futures that yields each as it completes.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            iterate over.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.

    Returns:
        An iterator that yields the given Futures as they complete (finished or
        cancelled). If any given Futures are duplicated, they will be returned
        once.

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
    """
    if timeout is not None:
        end_time = timeout + time.time()

    fs = set(fs)
    with _AcquireFutures(fs):
        finished = set(
                f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        pending = fs - finished
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)

    try:
        yield from finished

        while pending:
            if timeout is None:
                wait_timeout = None
            else:
                wait_timeout = end_time - time.time()
                if wait_timeout < 0:
                    raise TimeoutError(
                            '%d (of %d) futures unfinished' % (
                            len(pending), len(fs)))

            waiter.event.wait(wait_timeout)

            with waiter.lock:
                finished = waiter.finished_futures
                waiter.finished_futures = []
                waiter.event.clear()

            for future in finished:
                yield future
                pending.remove(future)

    finally:
        for f in fs:
            with f._condition:
                f._waiters.remove(waiter)

Used in a for loop, as_completed() hands back each Future as soon as it completes:

import concurrent.futures

def wait_on_future():
    # Unlike the deadlock example in section 3, this function creates its own
    # single-worker pool, so f runs on that inner pool's worker rather than on
    # the outer worker that is executing this function, and it completes.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as inner:
        f = inner.submit(pow, 5, 2)
        for future in concurrent.futures.as_completed((f,)):
            print(future.result())  # 25

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    executor.submit(wait_on_future)

Another example, fanning a list of work items out over 12 threads:

def main_thread(rlt):
    # get_rlt_from_allfile is the author's own helper and is not shown here;
    # it is called once for every item in rlt.
    with concurrent.futures.ThreadPoolExecutor(max_workers=12) as executor:
        # Submit one task per item and remember which item each future belongs to
        future_to_item = {executor.submit(get_rlt_from_allfile, item): item for item in rlt}
        for future in concurrent.futures.as_completed(future_to_item):
            item = future_to_item[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (item, exc))
            else:
                pass  # handle `data` for this item here
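To see that as_completed() follows completion order rather than submission order, a sketch with tasks of different (arbitrary) durations; sleep_and_return is a hypothetical helper:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def sleep_and_return(seconds):
    time.sleep(seconds)
    return seconds


with ThreadPoolExecutor(max_workers=3) as executor:
    # Submitted as 0.6, 0.1, 0.3 ...
    futures = [executor.submit(sleep_and_return, s) for s in (0.6, 0.1, 0.3)]
    # ... but yielded in the order they finish
    order = [f.result() for f in as_completed(futures)]

print(order)  # [0.1, 0.3, 0.6]
```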