07. 多線程之線程池

一、概述

  • 線程池在系統(tǒng)啟動時即創(chuàng)建大量空閑的線程,程序只要將一個函數(shù)提交給線程池,線程池就會啟動一個空閑的線程來執(zhí)行它。當(dāng)該函數(shù)執(zhí)行結(jié)束后,該線程并不會死亡,而是再次返回到線程池中變成空閑狀態(tài),等待執(zhí)行下一個函數(shù)。
  • 此外,使用線程池可以有效地控制系統(tǒng)中并發(fā)線程的數(shù)量。當(dāng)系統(tǒng)中包含有大量的并發(fā)線程時,會導(dǎo)致系統(tǒng)性能急劇下降,甚至導(dǎo)致 Python 解釋器崩潰,而線程池的最大線程數(shù)參數(shù)可以控制系統(tǒng)中并發(fā)線程的數(shù)量不超過此數(shù)。
  • 官網(wǎng):https://docs.python.org/dev/library/concurrent.futures.html
  • 從Python3.2開始,標(biāo)準(zhǔn)庫為我們提供了concurrent.futures模塊,它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類,實(shí)現(xiàn)了對threading和multiprocessing的進(jìn)一步抽象(這里主要關(guān)注線程池),不僅可以幫我們自動調(diào)度線程,還可以做到:
    1. 主線程可以獲取某一個線程(或者任務(wù)的)的狀態(tài),以及返回值。
    2. 當(dāng)一個線程完成的時候,主線程能夠立即知道。
    3. 讓多線程和多進(jìn)程的編碼接口一致。

concurrent.futures模塊提供了高度封裝的異步調(diào)用接口
ThreadPoolExecutor:線程池,提供異步調(diào)用

屬性:max_workers, 線程池容量

1、submit(fn, *args, **kwargs)
異步提交任務(wù)

2、map(func, *iterables, timeout=None, chunksize=1) 
取代for循環(huán)submit的操作

3、shutdown(wait=True) 
相當(dāng)于進(jìn)程池的pool.close()+pool.join()操作
wait=True,等待池內(nèi)所有任務(wù)執(zhí)行完畢回收完資源后才繼續(xù)
wait=False,立即返回,并不會等待池內(nèi)的任務(wù)執(zhí)行完畢
但不管wait參數(shù)為何值,整個程序都會等到所有任務(wù)執(zhí)行完畢
submit和map必須在shutdown之前

4、result(timeout=None)
取得結(jié)果
5、add_done_callback(fn)
回調(diào)函數(shù)
6、done()
方法用于判定某個任務(wù)是否完成
7、cancel()
cancel方法用于取消某個任務(wù),該任務(wù)沒有放入線程池中才能取消成功

二、創(chuàng)建線程池示例

#!/usr/bin/env python
# coding:utf-8

from concurrent.futures import ThreadPoolExecutor
import time


# 參數(shù)times用來模擬網(wǎng)絡(luò)請求的時間
def get_html(times):
    time.sleep(times)
    print("get page {}s finished".format(times))
    return times


if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=3)
    # 通過submit函數(shù)提交執(zhí)行的函數(shù)到線程池中,submit函數(shù)立即返回,不阻塞
    task1 = executor.submit(get_html, times=3)
    task2 = executor.submit(get_html, 2)
    # done方法用于判定某個任務(wù)是否完成
    print(task1.done())
    # cancel方法用于取消某個任務(wù),該任務(wù)沒有放入線程池中才能取消成功
    print(task2.cancel())
    time.sleep(4)
    print(task1.done())
    # result方法可以獲取task的執(zhí)行結(jié)果
    print(task1.result())
  1. ThreadPoolExecutor構(gòu)造實(shí)例的時候,傳入max_workers參數(shù)來設(shè)置線程池中最多能同時運(yùn)行的線程數(shù)目。
  2. 使用submit函數(shù)來提交線程需要執(zhí)行的任務(wù)(函數(shù)名和參數(shù))到線程池中,并返回該任務(wù)的句柄(類似于文件、畫圖),注意submit()不是阻塞的,而是立即返回。
  3. 通過submit函數(shù)返回的任務(wù)句柄,能夠使用done()方法判斷該任務(wù)是否結(jié)束。上面的例子可以看出,由于任務(wù)有2s的延時,在task1提交后立刻判斷,task1還未完成,而在延時4s之后判斷,task1就完成了。
  4. 使用cancel()方法可以取消提交的任務(wù),如果任務(wù)已經(jīng)在線程池中運(yùn)行了,就取消不了。這個例子中,線程池的大小設(shè)置為3,任務(wù)已經(jīng)在運(yùn)行了,所以取消失敗。如果改變線程池的大小為1,那么先提交的是task1,task2還在排隊(duì)等候,這是時候就可以成功取消。
  5. 使用result()方法可以獲取任務(wù)的返回值。查看內(nèi)部代碼,發(fā)現(xiàn)這個方法是阻塞的。

三、as_completed方法獲線程執(zhí)行狀態(tài)

as_completed()方法是一個生成器,在沒有任務(wù)完成的時候,會阻塞,在有某個任務(wù)完成的時候,會yield這個任務(wù),就能執(zhí)行for循環(huán)下面的語句,然后繼續(xù)阻塞住,循環(huán)到所有的任務(wù)結(jié)束。從結(jié)果也可以看出,先完成的任務(wù)會先通知主線程。

#!/usr/bin/env python
# coding:utf-8

from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def get_html(times):
    time.sleep(times)
    print(f'get page {times}s finished.')
    return times


if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=5)
    urls = [3, 1, 2, 4]
    all_task = (executor.submit(get_html, url) for url in urls)
    for future in as_completed(all_task):
        data = future.result()
        print(f'main: get page {data}s success.')

四、map方法

使用map方法,無需提前使用submit方法,map方法與python標(biāo)準(zhǔn)庫中的map含義相同,都是將序列中的每個元素都執(zhí)行同一個函數(shù)。上面的代碼就是對urls的每個元素都執(zhí)行g(shù)et_html函數(shù),并分配各線程池??梢钥吹綀?zhí)行結(jié)果與上面的as_completed方法的結(jié)果不同,輸出順序和urls列表的順序相同,就算2s的任務(wù)先執(zhí)行完成,也會先打印出3s的任務(wù)先完成,再打印2s的任務(wù)完成。

#!/usr/bin/env python
# coding:utf-8

from concurrent.futures import ThreadPoolExecutor
import time


def get_html(times):
    time.sleep(times)
    print(f'get page {times}s finished.')
    return times


if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=5)
    urls = [3, 2, 4, 1]
    for data in executor.map(get_html, urls):
        print(f'main: get page {data}s success.')

五、wait方法

  • wait方法可以讓主線程阻塞,直到滿足設(shè)定的要求。
    wait方法接收3個參數(shù),等待的任務(wù)序列、超時時間以及等待條件。等待條件return_when默認(rèn)為ALL_COMPLETED,表明要等待所有的任務(wù)都結(jié)束??梢钥吹竭\(yùn)行結(jié)果中,確實(shí)是所有任務(wù)都完成了,主線程才打印出main。等待條件還可以設(shè)置為FIRST_COMPLETED,表示第一個任務(wù)完成就停止等待。
#!/usr/bin/env python
# coding:utf-8

from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
import time


def get_html(times):
    time.sleep(times)
    print(f'get page {times}s finished.')
    return times


if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=5)
    urls = [3, 4, 2, 1]
    all_task = (executor.submit(get_html, url) for url in urls)
    # wait(all_task, return_when=ALL_COMPLETED)
    wait(all_task, return_when=FIRST_COMPLETED)
    print('main')

參考:[python] ThreadPoolExecutor線程池

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容