python之ThreadPoolExecutor

在前面的博客中介紹了線程的用法,每次使用都要?jiǎng)?chuàng)建線程,啟動(dòng)線程,有沒有什么辦法簡(jiǎn)單操作呢。

python3.2引入的concurrent.future模塊中有ThreadPoolExecutor和ProcessPoolExecutor兩個(gè)類,這兩個(gè)類內(nèi)部維護(hù)著線程/進(jìn)程池,以及要執(zhí)行的任務(wù)隊(duì)列,使得操作變得非常簡(jiǎn)單,不需要關(guān)心任何實(shí)現(xiàn)細(xì)節(jié)

來看一個(gè)簡(jiǎn)單的例子

#!/usr/bin/env python3.6
from concurrent.futures import ThreadPoolExecutor
import requests
import os

DEST_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "download")
BASE_URL = "http://flupy.org/data/flags"
CC_LIST = ("CN", "US", "JP", "EG")

if not os.path.exists(DEST_DIR):
    os.mkdir(DEST_DIR)


def get_img(cc):
    url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
    response = requests.get(url)
    return response.content

def save_img(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as f:
        f.write(img)

def download_one(cc):
    img = get_img(cc)
    save_img(img, cc.lower() + ".gif")
    return cc

def download_many(cc_list):
    works = len(cc_list)
    with ThreadPoolExecutor(works) as exector: # 使用with來管理ThreadPoolExecutor
  # map方法和內(nèi)置的map方法類似,不過exector的map方法會(huì)并發(fā)調(diào)用,返回一個(gè)由返回的值構(gòu)成的生成器
        response = exector.map(download_one, cc_list)
    return len(list(response))

if __name__ == "__main__":
    download_many(CC_LIST)

Future

concurrent.futures和asyncio中的Future類的作用相同,****都表示可能己經(jīng)完成或尚未完成的延遲計(jì)算****

Future封裝待完成的操作,可以放入隊(duì)列,完成的狀態(tài)可以查詢,得到結(jié)果后可以獲取結(jié)果

使用exector.submit()方法提交執(zhí)行的函數(shù)并獲取一個(gè)Future,而不是直接創(chuàng)建,傳入的參數(shù)是一個(gè)可調(diào)用的對(duì)象;獲取的Future對(duì)象有一個(gè)done()方法,判斷該Future是否己完成, add_one_callback()設(shè)置回調(diào)函數(shù), result()來獲取Future的結(jié)果。as_completed()傳一個(gè)Future列表,在Future都完成之后返回一個(gè)迭代器

使用submit()方法試試看

def download_many(cc_list):
    with ThreadPoolExecutor(max_workers=5) as exector:
        future_list = []
        for cc in cc_list:
    # 使用submit提交執(zhí)行的函數(shù)到線程池中,并返回futer對(duì)象(非阻塞)
            future = exector.submit(download_one, cc)
            future_list.append(future)
            print(cc, future)

        result = []
    # as_completed方法傳入一個(gè)Future迭代器,然后在Future對(duì)象運(yùn)行結(jié)束之后yield Future
        for future in futures.as_completed(future_list):
    # 通過result()方法獲取結(jié)果
            res = future.result()
            print(res, future)
            result.append(res)
    return len(result)
  
>>>
CN <Future at 0x7f80d32f5400 state=running>
US <Future at 0x7f80d330c320 state=running>
JP <Future at 0x7f80d330c8d0 state=running>
EG <Future at 0x7f80d330ce10 state=running>
JP <Future at 0x7f80d330c8d0 state=finished returned str>
CN <Future at 0x7f80d32f5400 state=finished returned str>
EG <Future at 0x7f80d330ce10 state=finished returned str>
US <Future at 0x7f80d330c320 state=finished returned str>

ProcessPoolExecutor的使用方法是一樣的,唯一需要注意的區(qū)別是傳入的max_workers這個(gè)參數(shù)對(duì)于ProcessPoolExecutor是可選的,在不使用的情況下默認(rèn)值是os.cpu_count()的返回值(cpu的數(shù)量)

exector.submit()和futures.as_completed()這個(gè)組合比exector.map()更靈活,submit()可以處理不同的調(diào)用函數(shù)和參數(shù),而map只能處理同一個(gè)可調(diào)用對(duì)象。

wait()阻塞主線程,直到所有task都完成。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容