注本文摘自https://docs.python.org/3/library/concurrent.futures.html
1.概論
concurrent.futures 模塊提供了一個高水平的接口用于異步執(zhí)行調用。
異步執(zhí)行可以使用線程實現(xiàn),使用ThreadPoolExecutor,或者獨立的進程,使用ProcessPoolExecutor 實現(xiàn)。兩者都實現(xiàn)了相同的接口, 都是由抽象 Executor 類定義的.
2. Executor對象
class concurrent.futures.Executor
一個提供執(zhí)行異步調用方法的抽象類。它不應該直接使用,而是通過它的具體子類。
2.1 submit(fn, *args, **kwargs)
調度可調用的fn,作為fn(args kwargs)執(zhí)行,并返回一個表示可調用的執(zhí)行的Future對象。
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(pow, 323, 1235)
print(future.result())
2.2 map(func, *iterables, timeout=None, chunksize=1)
與map(func,*iterables)相對應的異步執(zhí)行的版本,并且對func的幾個調用可以并發(fā)地執(zhí)行。調用Executor.map()指定的timeout時間后,如果_next_()調用后結果不可用,迭代器將會拋出concurrent.futures.TimeoutError異常。timeout可以是一個整形或者浮點型數(shù)值,如果timeout不指定或者為None,等待時間無限。如果調用引發(fā)異常,那么當從迭代器中檢索值時將會拋出異常。當使用ProcessPoolExecutor時,該方法將iterables分成若干塊,并將其作為單獨的任務提交到池中。通過將塊大小設置為一個正整數(shù),可以指定這些塊的(近似)大小。對于非常長的迭代器,使用一個較大的值來進行塊大小可以顯著地提高性能,而不是默認的1。對于ThreadPoolExecutor,chunksize參數(shù)沒有影響。
2.3 shutdown(wait=True)
告訴執(zhí)行程序,當當前正在執(zhí)行的Futures結束執(zhí)行時,它應該釋放它正在使用的任何資源。在shutdown之后調用 Executor.submit() 和Executor.map()將會報 RuntimeError.
如果wait等于True,那么這個方法將不會返回,直到所有的待完成的Futures執(zhí)行完,并且與執(zhí)行程序關聯(lián)的資源已經被釋放。如果wait等于False,那么該方法將立即返回,與執(zhí)行器相關聯(lián)的資源將在所有等待的Futures執(zhí)行時被釋放。不管wait的值是什么,整個Python程序都不會退出,直到所有的待完成的Futures都被執(zhí)行完成。
你可以避免顯示的調用該方法通過使用with 語句。當然了,后臺使用的wait參數(shù)是True.
import shutil
with ThreadPoolExecutor(max_workers=4) as e:
e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
3.ThreadPoolExecutor
ThreadPoolExecutor是一個Executor的子類,它使用線程池來異步執(zhí)行調用。
當一個Future關聯(lián)的調用等待其他Future的結果時,就可能出現(xiàn)死鎖。例如:
import time
def wait_on_b():
time.sleep(5)
print(b.result()) # b will never complete because it is waiting on a.
return 5
def wait_on_a():
time.sleep(5)
print(a.result()) # a will never complete because it is waiting on b.
return 6
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
def wait_on_future():
f = executor.submit(pow, 5, 2)
# This will never complete because there is only one worker thread and
# it is executing this function.
print(f.result())
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
Executor子類,使用max_workers規(guī)格的線程池來執(zhí)行異步調用。
例子:
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
4.ProcessPoolExecutor
ProcessPoolExecutor 類是Executor 的一個子類,使用進程池執(zhí)行異步調用。
ProcessPoolExecutor使用 multiprocessing 模塊,該模塊允許它避開全局解釋器鎖,但也意味著只能執(zhí)行和返回可執(zhí)行的對象。
import concurrent.futures
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
5. Future Objects
Future類封裝了可調用的異步執(zhí)行.Future 實例通過 Executor.submit()方法創(chuàng)建。
5.1 cancel()
試圖取消調用。如果調用當前正在執(zhí)行,并且不能被取消,那么該方法將返回False,否則調用將被取消,方法將返回True。
5.2 cancelled()
如果成功取消調用,返回True。
5.3 running()
如果調用當前正在執(zhí)行并且不能被取消,返回True。
5.4 done()
如果調用成功地取消或結束了,返回True。
5.5 result(timeout=None)
返回調用返回的值。如果調用還沒有完成,那么這個方法將等待超時秒。如果調用在超時秒內沒有完成,那么就會有一個concurrent.Futures.TimeoutError將報出。timeout可以是一個整形或者浮點型數(shù)值,如果timeout不指定或者為None,等待時間無限。如果futures在完成之前被取消了,那么 CancelledError 將會報出。
5.6 exception(timeout=None)
返回調用拋出的異常,如果調用還未完成,該方法會等待timeout指定的時長,如果該時長后調用還未完成,就會報出超時錯誤
concurrent.futures.TimeoutError。timeout可以是一個整形或者浮點型數(shù)值,如果timeout不指定或者為None,等待時間無限。如果futures在完成之前被取消了,那么 CancelledError 將會報出。
如果調用完成并且無異常報出,返回None.
5.7 add_done_callback(fn)
將可調用fn捆綁到future上,當Future被取消或者結束運行,fn作為future的唯一參數(shù)將會被調用。如果future已經運行完成或者取消,fn將會被立即調用。
6.Module Functions
6.1 concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
等待fs提供的 Future 實例(possibly created by different Executor instances) 運行結束。返回一個命名的2元集合,分表代表已完成的和未完成的futures.
timeout 可用于控制等待的最大時長. timeout 可以是一個整形或者浮點型數(shù)值,如果timeout不指定或者為None,等待時間無限。
return_when 表明什么時候函數(shù)應該返回。它的值必須是一下值之一:
FIRST_COMPLETED :函數(shù)在任何future結束或者取消的時候返回。
FIRST_EXCEPTION :函數(shù)在任何future因為異常結束的時候返回,如果沒有future報錯,效果等于ALL_COMPLETED.
ALL_COMPLETED :函數(shù)在所有future結束后才會返回。
6.2 concurrent.futures.as_completed(fs, timeout=None)
def as_completed(fs, timeout=None):
"""An iterator over the given futures that yields each as it completes.
Args:
fs: The sequence of Futures (possibly created by different Executors) to
iterate over.
timeout: The maximum number of seconds to wait. If None, then there
is no limit on the wait time.
Returns:
An iterator that yields the given Futures as they complete (finished or
cancelled). If any given Futures are duplicated, they will be returned
once.
Raises:
TimeoutError: If the entire result iterator could not be generated
before the given timeout.
"""
if timeout is not None:
end_time = timeout + time.time()
fs = set(fs)
with _AcquireFutures(fs):
finished = set(
f for f in fs
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
pending = fs - finished
waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
try:
yield from finished
while pending:
if timeout is None:
wait_timeout = None
else:
wait_timeout = end_time - time.time()
if wait_timeout < 0:
raise TimeoutError(
'%d (of %d) futures unfinished' % (
len(pending), len(fs)))
waiter.event.wait(wait_timeout)
with waiter.lock:
finished = waiter.finished_futures
waiter.finished_futures = []
waiter.event.clear()
for future in finished:
yield future
pending.remove(future)
finally:
for f in fs:
with f._condition:
f._waiters.remove(waiter)
配合 for 使用可以循環(huán)得到已經完成的 Future.
import concurrent.futures
# from concurrent.futures import ThreadPoolExecutor
def wait_on_future():
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
f = executor.submit(pow, 5, 2)
# This will never complete because there is only one worker thread and
# it is executing this function.、
for future in concurrent.futures.as_completed((f,)):
print(future.result())
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
def main_thread(rlt):
with concurrent.futures.ThreadPoolExecutor(max_workers=12) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(get_rlt_from_allfile, rlt[i]): rlt[i] for i in range(0,len(rlt))}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
pass