Python的并發(fā)編程(五)- 線程池

之前的文章學(xué)習(xí)了一些多線程的用法,在I/O密集型的程序中,多線程帶來了顯著的性能提升,那我們可以無限制的,大量的創(chuàng)建多線程任務(wù)嗎?

一個(gè)線程創(chuàng)建、銷毀都是需要消耗系統(tǒng)資源的,如果線程數(shù)大于一定的數(shù)量,線程的創(chuàng)建銷毀就會(huì)占用大量的系統(tǒng)性能,就不能充分利用到系統(tǒng)資源了。這時(shí)候,可以選擇使用線程池了,

線程池

創(chuàng)建一定數(shù)量的線程來執(zhí)行任務(wù),任務(wù)結(jié)束之后,該線程不進(jìn)行銷毀,而是繼續(xù)從任務(wù)隊(duì)列中獲取新任務(wù)來執(zhí)行,直到所有任務(wù)都執(zhí)行結(jié)束之后,關(guān)閉所有線程。

concurrent.futures庫中包含有一個(gè)ThreadPoolExecutor類可用來實(shí)現(xiàn)這個(gè)目的。下面來實(shí)現(xiàn)一個(gè)簡單的TCP服務(wù)器,使用線程池來服務(wù)客戶端:

from socket import AF_INET, socket, SOCK_STREAM
from concurrent.futures import ThreadPoolExecutor


def echo_client(sock, client_addr):
    print("{addr}已連接".format(addr=client_addr))
    while True:
        msg = sock.recv(40960)
        if not msg:
            break
        sock.sendall(msg)
    sock.close()
    print("關(guān)閉連接")


def echo_server(addr):
    pool = ThreadPoolExecutor(128)
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(addr)
    sock.listen(5)
    while True:
        client_sock, client_addr = sock.accept()
        pool.submit(echo_client, client_sock, client_addr)

if __name__ == '__main__':
    echo_server(("", 5000))

當(dāng)然,我們也可以手動(dòng)創(chuàng)建線程池,使用Queue來作為任務(wù)隊(duì)列。

from socket import AF_INET, socket, SOCK_STREAM
from threading import Thread
from queue import Queue

def echo_client(q):
    sock, client_addr = q.get()
    print("{addr}已連接".format(addr=client_addr))
    while True:
        msg = sock.recv(40960)
        if not msg:
            break
        sock.sendall(msg)
    sock.close()
    print("關(guān)閉連接")

def echo_server(addr, nworkers):
    q = Queue()
    for n in range(nworkers):
        # 啟動(dòng)nworkers個(gè)線程
        t = Thread(target=echo_client, args=(q, ))
        t.daemon = True
        t.start()
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(addr)
    sock.listen(5)
    while True:
        client_sock, client_addr = sock.accept()
        q.put(client_sock, client_addr)

if __name__ == '__main__':
    echo_server(("", 5000), 10)

盡量使用ThreadPoolExecutor而不是手動(dòng)實(shí)現(xiàn)線程池。這么做的優(yōu)勢(shì)在于使得任務(wù)的提交者能夠更容易從調(diào)用函數(shù)中取得結(jié)果。

import urllib.request
from concurrent.futures import ThreadPoolExecutor


def fetch_url(url):
    u = urllib.request.urlopen(url)
    data = u.read()
    return data

pool = ThreadPoolExecutor(10)
a = pool.submit(fetch_url, "http://www.python.org")
b = pool.submit(fetch_url, "http://www.pypy.org")

x = a.result()
y = b.result()

示例中的結(jié)果對(duì)象(即a和b)負(fù)責(zé)處理所有需要完成的阻塞和同步任務(wù),從工作者線程中取回?cái)?shù)據(jù)。特別是,a.result()操作會(huì)阻塞,直到對(duì)應(yīng)的函數(shù)已經(jīng)由線程執(zhí)行完幷返回了結(jié)果為止。

注意:避免寫允許線程無限增長的程序。如果在web服務(wù)中這么做了,無法阻止惡意用戶對(duì)服務(wù)器發(fā)起拒絕服務(wù)攻擊,從而導(dǎo)致服務(wù)器上創(chuàng)建了大量的線程,耗盡了系統(tǒng)資源而崩潰。通過預(yù)先初始化好的線程池,就可以小心的為所有能支持的并發(fā)總數(shù)設(shè)定一個(gè)上限。

本文最先發(fā)布于:SavingUnhappy

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容