之前的文章學(xué)習(xí)了一些多線程的用法,在I/O密集型的程序中,多線程帶來了顯著的性能提升,那我們可以無限制的,大量的創(chuàng)建多線程任務(wù)嗎?
一個(gè)線程創(chuàng)建、銷毀都是需要消耗系統(tǒng)資源的,如果線程數(shù)大于一定的數(shù)量,線程的創(chuàng)建銷毀就會(huì)占用大量的系統(tǒng)性能,就不能充分利用到系統(tǒng)資源了。這時(shí)候,可以選擇使用線程池了,
線程池
創(chuàng)建一定數(shù)量的線程來執(zhí)行任務(wù),任務(wù)結(jié)束之后,該線程不進(jìn)行銷毀,而是繼續(xù)從任務(wù)隊(duì)列中獲取新任務(wù)來執(zhí)行,直到所有任務(wù)都執(zhí)行結(jié)束之后,關(guān)閉所有線程。
concurrent.futures庫中包含有一個(gè)ThreadPoolExecutor類可用來實(shí)現(xiàn)這個(gè)目的。下面來實(shí)現(xiàn)一個(gè)簡單的TCP服務(wù)器,使用線程池來服務(wù)客戶端:
from socket import AF_INET, socket, SOCK_STREAM
from concurrent.futures import ThreadPoolExecutor
def echo_client(sock, client_addr):
print("{addr}已連接".format(addr=client_addr))
while True:
msg = sock.recv(40960)
if not msg:
break
sock.sendall(msg)
sock.close()
print("關(guān)閉連接")
def echo_server(addr):
pool = ThreadPoolExecutor(128)
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(addr)
sock.listen(5)
while True:
client_sock, client_addr = sock.accept()
pool.submit(echo_client, client_sock, client_addr)
if __name__ == '__main__':
echo_server(("", 5000))
當(dāng)然,我們也可以手動(dòng)創(chuàng)建線程池,使用Queue來作為任務(wù)隊(duì)列。
from socket import AF_INET, socket, SOCK_STREAM
from threading import Thread
from queue import Queue
def echo_client(q):
sock, client_addr = q.get()
print("{addr}已連接".format(addr=client_addr))
while True:
msg = sock.recv(40960)
if not msg:
break
sock.sendall(msg)
sock.close()
print("關(guān)閉連接")
def echo_server(addr, nworkers):
q = Queue()
for n in range(nworkers):
# 啟動(dòng)nworkers個(gè)線程
t = Thread(target=echo_client, args=(q, ))
t.daemon = True
t.start()
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(addr)
sock.listen(5)
while True:
client_sock, client_addr = sock.accept()
q.put(client_sock, client_addr)
if __name__ == '__main__':
echo_server(("", 5000), 10)
盡量使用ThreadPoolExecutor而不是手動(dòng)實(shí)現(xiàn)線程池。這么做的優(yōu)勢(shì)在于使得任務(wù)的提交者能夠更容易從調(diào)用函數(shù)中取得結(jié)果。
import urllib.request
from concurrent.futures import ThreadPoolExecutor
def fetch_url(url):
u = urllib.request.urlopen(url)
data = u.read()
return data
pool = ThreadPoolExecutor(10)
a = pool.submit(fetch_url, "http://www.python.org")
b = pool.submit(fetch_url, "http://www.pypy.org")
x = a.result()
y = b.result()
示例中的結(jié)果對(duì)象(即a和b)負(fù)責(zé)處理所有需要完成的阻塞和同步任務(wù),從工作者線程中取回?cái)?shù)據(jù)。特別是,a.result()操作會(huì)阻塞,直到對(duì)應(yīng)的函數(shù)已經(jīng)由線程執(zhí)行完幷返回了結(jié)果為止。
注意:避免寫允許線程無限增長的程序。如果在web服務(wù)中這么做了,無法阻止惡意用戶對(duì)服務(wù)器發(fā)起拒絕服務(wù)攻擊,從而導(dǎo)致服務(wù)器上創(chuàng)建了大量的線程,耗盡了系統(tǒng)資源而崩潰。通過預(yù)先初始化好的線程池,就可以小心的為所有能支持的并發(fā)總數(shù)設(shè)定一個(gè)上限。
本文最先發(fā)布于:SavingUnhappy