I. Creating a thread with the threading module
Example 1
import threading
from time import ctime, sleep
def target():
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
    sleep(5)
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
if __name__ == "__main__":
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
    t = threading.Thread(target=target)
    t.start()
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
Output:
The current threading MainThread---Wed Oct 14 18:32:19 2020 is running
The current threading Thread-1---Wed Oct 14 18:32:19 2020 is running
The current threading MainThread---Wed Oct 14 18:32:19 2020 is running
The current threading Thread-1---Wed Oct 14 18:32:24 2020 is running
Process finished with exit code 0
import threading
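First import the threading module. You need it before you can use multiple threads.
t = threading.Thread(target=target)
Create thread t with threading.Thread(), passing the function it should run as target.
t.start()
Start the thread's activity. target() now runs in the new thread.
threading.current_thread() returns the Thread object of the thread that is currently executing, so its .name shows which thread printed each line.
The output shows that MainThread reached the end of its code before Thread-1 finished. Thread-1 was not killed, though: it printed its second line 5 seconds later. After the main thread is done, the interpreter still waits for every non-daemon thread before the process exits.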
Example 2
import threading
from time import ctime, sleep
def target():
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
    sleep(5)
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
if __name__ == "__main__":
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
    t = threading.Thread(target=target)
    t.start()
    t.join()  # the main thread blocks here until t has finished
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
t.join()
join() blocks the calling thread (here the main thread) until the thread it is called on has finished running. By default, if you leave out join(), the main thread does not wait for the child thread before finishing its own code. It does not kill the child thread immediately either, as the output of Example 1 shows.
Output:
The current threading MainThread---Wed Oct 14 18:40:42 2020 is running
The current threading Thread-1---Wed Oct 14 18:40:42 2020 is running
The current threading Thread-1---Wed Oct 14 18:40:47 2020 is running
The current threading MainThread---Wed Oct 14 18:40:47 2020 is running
Process finished with exit code 0
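join() also accepts an optional timeout in seconds. The sketch below uses a hypothetical slow_task that sleeps for about one second. join(timeout) returns after at most that long whether or not the thread has finished, and it always returns None. Call is_alive() afterwards to find out which case happened.

```python
import threading
import time

def slow_task():
    # Hypothetical work that takes about 1 second
    time.sleep(1)

t = threading.Thread(target=slow_task)
t.start()

t.join(timeout=0.1)           # wait at most 0.1 s
still_running = t.is_alive()  # True: the thread needs ~1 s, so the timeout expired first
print("after join(timeout=0.1), alive:", still_running)

t.join()                      # no timeout: block until the thread really finishes
finished = not t.is_alive()
print("after join(), alive:", t.is_alive())
```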
Example 3
import threading
from time import ctime, sleep
def target():
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
    sleep(5)
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
if __name__ == "__main__":
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
    t = threading.Thread(target=target)
    t.setDaemon(True)  # must be set before start()
    t.start()
    t.join()
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
Output:
The current threading MainThread---Thu Oct 15 13:07:04 2020 is running
The current threading Thread-1---Thu Oct 15 13:07:04 2020 is running
The current threading Thread-1---Thu Oct 15 13:07:09 2020 is running
The current threading MainThread---Thu Oct 15 13:07:09 2020 is running
Process finished with exit code 0
t.setDaemon(True)
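t.setDaemon(True) declares the thread a daemon thread, and it must be called before start(). Since Python 3.10, setDaemon() is deprecated in favor of assigning t.daemon = True or passing daemon=True to the Thread constructor. A daemon thread counts as "unimportant": if the main thread finishes while the daemon thread is still running, the daemon thread is terminated abruptly when the interpreter exits. A non-daemon thread keeps the process alive instead, because the interpreter waits for every non-daemon thread to finish before it exits. In this example the daemon flag makes no visible difference, because t.join() already waits for the thread.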
Example 4
import threading
from time import ctime, sleep
def target():
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
    sleep(5)
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
if __name__ == "__main__":
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
    t = threading.Thread(target=target)
    t.setDaemon(True)
    t.start()
    # t.join()
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
Output:
The current threading MainThread---Thu Oct 15 13:08:38 2020 is running
The current threading Thread-1---Thu Oct 15 13:08:38 2020 is running
The current threading MainThread---Thu Oct 15 13:08:38 2020 is running
Process finished with exit code 0
With t.setDaemon(True) set and no join() call, the daemon child thread is killed as soon as the main thread finishes. That is why Thread-1 never prints its second line.
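The daemon flag can also be passed to the constructor, which is the recommended spelling now that setDaemon() is deprecated. The sketch below uses a hypothetical placeholder task called worker and checks two rules:

- The flag must be set before start().
- A new thread inherits the daemon flag of the thread that creates it, so threads created from the (non-daemon) main thread default to daemon=False.

```python
import threading
import time

def worker():
    # Hypothetical placeholder task
    time.sleep(0.2)

# Modern spelling of t.setDaemon(True): pass daemon=True to the constructor
t = threading.Thread(target=worker, daemon=True)
print("daemon:", t.daemon)

t.start()
try:
    t.daemon = False  # not allowed once the thread has been started
    error = None
except RuntimeError as exc:
    error = exc
print("changing daemon after start():", type(error).__name__)

# Created from the main thread (non-daemon), so the flag defaults to False
default_flag = threading.Thread(target=worker).daemon
print("default daemon flag:", default_flag)
t.join()
```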
II. Creating multiple threads with the threading module
1. Creating threads from functions
import threading
from time import ctime, sleep
def code():
    print("I'm coding. {}---{}".format(ctime(), threading.current_thread().name))
    sleep(5)
def draw():
    print("I'm drawing. {}---{}".format(ctime(), threading.current_thread().name))
    sleep(5)
if __name__ == "__main__":
    threads = []
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
    t1 = threading.Thread(target=code)
    threads.append(t1)
    t2 = threading.Thread(target=draw)
    threads.append(t2)
    for t in threads:
        t.setDaemon(True)
        t.start()
    # Start every thread first, then wait for all of them;
    # calling join() inside the loop above would run the threads one after another
    for t in threads:
        t.join()
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
Output:
The current threading MainThread---Thu Oct 15 13:35:06 2020 is running
I'm coding. Thu Oct 15 13:35:06 2020---Thread-1
I'm drawing. Thu Oct 15 13:35:06 2020---Thread-2
The current threading MainThread---Thu Oct 15 13:35:11 2020 is running
Process finished with exit code 0
Passing arguments to a thread
The positional arguments for target go in args, which must be a tuple. That is why ('writing code',) needs its trailing comma.
import threading
from time import ctime, sleep
def code(arg):
    print("I'm coding.{}---{}---{}".format(arg, ctime(), threading.current_thread().name))
    sleep(5)
def draw(arg):
    print("I'm drawing.{}----{}---{}".format(arg, ctime(), threading.current_thread().name))
    sleep(5)
if __name__ == "__main__":
    threads = []
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
    t1 = threading.Thread(target=code, args=('writing code',))
    threads.append(t1)
    t2 = threading.Thread(target=draw, args=('painting',))
    threads.append(t2)
    for t in threads:
        t.setDaemon(True)
        t.start()
    # Start both threads first, then wait for both
    for t in threads:
        t.join()
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
Output:
The current threading MainThread---Thu Oct 15 13:39:49 2020 is running
I'm coding.writing code---Thu Oct 15 13:39:49 2020---Thread-1
I'm drawing.painting----Thu Oct 15 13:39:49 2020---Thread-2
The current threading MainThread---Thu Oct 15 13:39:54 2020 is running
Process finished with exit code 0
2. Creating threads with a class
Subclassing threading.Thread
To encapsulate thread code better, subclass the Thread class from the threading module and override its run() method. Once the thread is started with start(), it automatically runs the code in run().
import threading
from time import ctime, sleep
class CodingThread(threading.Thread):
    def run(self):
        print("I'm coding.{}---{}".format(ctime(), threading.current_thread().name))
        sleep(5)
class DrawingThread(threading.Thread):
    def run(self):
        print("I'm drawing.{}---{}".format(ctime(), threading.current_thread().name))
        sleep(5)
def multi_thread():
    t1 = CodingThread()
    t2 = DrawingThread()
    print(threading.enumerate())  # list of all threads that are currently alive
    t1.start()
    print(threading.enumerate())
    t2.start()
    print(threading.enumerate())
if __name__ == "__main__":
    multi_thread()
Output:
[<_MainThread(MainThread, started 4403457344)>]
I'm coding.Thu Oct 15 13:45:06 2020---Thread-1
[<_MainThread(MainThread, started 4403457344)>, <CodingThread(Thread-1, started 123145444630528)>]
I'm drawing.Thu Oct 15 13:45:06 2020---Thread-2
[<_MainThread(MainThread, started 4403457344)>, <CodingThread(Thread-1, started 123145444630528)>, <DrawingThread(Thread-2, started 123145461420032)>]
Process finished with exit code 0
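run() takes no arguments and its return value is discarded. A subclass that needs inputs or wants to report a result therefore usually overrides __init__ as well. In the sketch below, SquareThread is a hypothetical class. Its overriding __init__ must call super().__init__() before the thread is used. The result is stored on the instance and read only after join().

```python
import threading

class SquareThread(threading.Thread):
    """Hypothetical example: compute n * n in a separate thread."""

    def __init__(self, n, name=None):
        # Thread.__init__ must run before the thread object is used
        super().__init__(name=name)
        self.n = n
        self.result = None  # run()'s return value is discarded, so keep the result here

    def run(self):
        self.result = self.n * self.n

threads = [SquareThread(i, name=f"Square-{i}") for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # after join() the result attribute is safe to read
results = {t.name: t.result for t in threads}
print(results)  # {'Square-0': 0, 'Square-1': 1, 'Square-2': 4, 'Square-3': 9}
```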
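III. Sharing global variables between threads, and locks
1. The problem with shared variables
The defining feature of multithreading is that threads share data, while the order in which they run is unpredictable. When several threads modify the same variable or use the same resource at the same time, the data can end up corrupted (a race condition).
import threading
value = 0
class AddValueThread(threading.Thread):
    def run(self):
        global value
        for x in range(1000000):
            value += 1  # read, add, write: not atomic, so another thread's updates can be lost
        print("{} value is {}".format(threading.current_thread().name, value))
def multi_thread():
    for i in range(2):
        t = AddValueThread()
        t.start()
if __name__ == "__main__":
    multi_thread()
Output:
Thread-1 value is 1214452
Thread-2 value is 1393110
Process finished with exit code 0
This result is wrong. The intended result is:
Thread-1 value is 1000000
Thread-2 value is 2000000
Both threads operate on value at the same time. One thread can read value, get interrupted, and then write back a stale result, which overwrites the other thread's increments. How often this shows up depends on the interpreter. On newer CPython releases this particular loop may happen to print the intended totals, but the code is still not thread-safe.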
2. Thread locks and ThreadLocal
(1) Thread locks
To solve the shared-variable problem above, threading provides a Lock class. A thread locks it while it accesses a variable, and other threads cannot get in. Only after the current thread has finished and released the lock can another thread enter and do its work. Call Lock.acquire() to lock the resource before accessing it, and Lock.release() to release it afterwards.
import threading
value = 0
gLock = threading.Lock()
class AddValueThread(threading.Thread):
    def run(self):
        global value
        gLock.acquire()
        try:
            for x in range(1000000):
                value += 1
            # Print while still holding the lock, so no other thread can change value in between
            print("{} value is {}".format(threading.current_thread().name, value))
        finally:
            gLock.release()  # always release, even if the code above raises
def multi_thread():
    for i in range(2):
        t = AddValueThread()
        t.start()
if __name__ == "__main__":
    multi_thread()
Output:
Thread-1 value is 1000000
Thread-2 value is 2000000
Process finished with exit code 0
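Lock also works as a context manager. `with lock:` acquires the lock on entry and releases it on exit, even if the block raises. This is equivalent to pairing acquire() with release() in a try/finally, but harder to get wrong.

In this minimal sketch, add_many and counter_lock are hypothetical names. The lock is taken around each increment rather than around the whole loop. That lets the threads interleave, at the cost of many more lock operations.

```python
import threading

counter = 0
counter_lock = threading.Lock()

def add_many(times):
    global counter
    for _ in range(times):
        # Acquired on entry, released on exit, even if an exception is raised
        with counter_lock:
            counter += 1

threads = [threading.Thread(target=add_many, args=(100_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 400000
```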
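(2) ThreadLocal
After thread locks, the next tool is ThreadLocal. If you don't want to share a variable with other threads you can use a local variable, but local variables defined in a function are tedious to pass from function to function. ThreadLocal is a very handy tool that solves both problems at once: global variables need locking, and local variables are awkward to pass around. Create one like this:
local_school = threading.local()
local_school is now a single global object, but every thread sees its own independent set of attributes on it. Whatever one thread stores there is invisible to other threads, and they cannot change it.
def process_thread(name):  # bind student on the ThreadLocal, for the current thread only
    local_school.student = name
This student attribute belongs to the current thread only, and other threads cannot modify it. Code:
import threading
gLocal = threading.local()
class AddValueThread(threading.Thread):
    def run(self):
        gLocal.value = 0  # each thread starts its own counter at 0
        for x in range(1000000):
            gLocal.value += 1
        print("{} value is {}".format(threading.current_thread().name, gLocal.value))
def multi_thread():
    for i in range(2):
        t = AddValueThread()
        t.start()
if __name__ == "__main__":
    multi_thread()
Output:
Thread-1 value is 1000000
Thread-2 value is 1000000
Process finished with exit code 0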
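The process_thread snippet above can be turned into a small runnable check. In this sketch, process_student and the seen dictionary are hypothetical helpers. Each thread stores a different student on the same local_school object, and process_student reads it back without receiving it as an argument. The main thread never sets the attribute, so it does not see it at all.

```python
import threading

local_school = threading.local()
seen = {}

def process_student():
    # Reads the student attribute of the *current* thread's copy
    seen[threading.current_thread().name] = local_school.student

def process_thread(name):
    local_school.student = name  # bind student for the current thread only
    process_student()

threads = [threading.Thread(target=process_thread, args=(n,), name=f"Thread-{n}")
           for n in ("Alice", "Bob")]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(seen)
print(hasattr(local_school, "student"))  # False: the main thread never set it
```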
IV. The producer-consumer pattern
(1) Lock version
A producer thread produces data and stores it in an intermediate variable, and a consumer thread takes the data out of that variable and consumes it. The intermediate variable is usually a global, so a lock is needed to keep the data consistent. In the code below, the producers stop after 3 deposits in total (counted in gTimes). Each consumer stops once the balance is insufficient and no more deposits are coming.
import threading
import random
import time
gMoney = 1000
gTimes = 0
gLock = threading.Lock()
class Producer(threading.Thread):
    def run(self):
        global gMoney
        global gTimes
        while True:
            money = random.randint(100, 1000)
            gLock.acquire()
            if gTimes >= 3:  # 3 deposits in total, shared by all producers
                gLock.release()
                break
            gMoney += money
            print("{} deposited {} yuan, balance {} yuan".format(threading.current_thread(), money, gMoney))
            gTimes += 1
            time.sleep(0.5)
            gLock.release()
class Consumer(threading.Thread):
    def run(self):
        global gMoney
        global gTimes
        while True:
            money = random.randint(100, 500)
            gLock.acquire()
            if gMoney > money:
                gMoney -= money
                print("{} withdrew {} yuan, balance {} yuan".format(threading.current_thread(), money, gMoney))
                time.sleep(0.5)
            else:
                print("{} wants to withdraw {} yuan, balance {} yuan, insufficient!".format(threading.current_thread(), money, gMoney))
                if gTimes >= 3:  # the producers are done, so no more money will arrive
                    gLock.release()
                    break
            gLock.release()
def multi_thread():
    for i in range(2):
        Consumer(name="Consumer-{}".format(i)).start()
    for j in range(2):
        Producer(name="Producer-{}".format(j)).start()
if __name__ == "__main__":
    multi_thread()
Output:
<Consumer(Consumer-0, started 123145324752896)> withdrew 128 yuan, balance 872 yuan
<Consumer(Consumer-1, started 123145341542400)> withdrew 420 yuan, balance 452 yuan
<Producer(Producer-0, started 123145358331904)> deposited 997 yuan, balance 1449 yuan
<Producer(Producer-1, started 123145375121408)> deposited 700 yuan, balance 2149 yuan
<Producer(Producer-1, started 123145375121408)> deposited 984 yuan, balance 3133 yuan
<Consumer(Consumer-1, started 123145341542400)> withdrew 221 yuan, balance 2912 yuan
<Consumer(Consumer-0, started 123145324752896)> withdrew 313 yuan, balance 2599 yuan
<Consumer(Consumer-1, started 123145341542400)> withdrew 189 yuan, balance 2410 yuan
<Consumer(Consumer-0, started 123145324752896)> withdrew 356 yuan, balance 2054 yuan
<Consumer(Consumer-1, started 123145341542400)> withdrew 109 yuan, balance 1945 yuan
<Consumer(Consumer-0, started 123145324752896)> withdrew 418 yuan, balance 1527 yuan
<Consumer(Consumer-1, started 123145341542400)> withdrew 381 yuan, balance 1146 yuan
<Consumer(Consumer-0, started 123145324752896)> withdrew 416 yuan, balance 730 yuan
<Consumer(Consumer-0, started 123145324752896)> withdrew 166 yuan, balance 564 yuan
<Consumer(Consumer-0, started 123145324752896)> withdrew 111 yuan, balance 453 yuan
<Consumer(Consumer-1, started 123145341542400)> withdrew 384 yuan, balance 69 yuan
<Consumer(Consumer-0, started 123145324752896)> wants to withdraw 415 yuan, balance 69 yuan, insufficient!
<Consumer(Consumer-1, started 123145341542400)> wants to withdraw 100 yuan, balance 69 yuan, insufficient!
Process finished with exit code 0
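(2) Condition version
The Lock version has a drawback. The consumer checks whether there is enough money in a while True loop that keeps acquiring the lock. This busy polling burns CPU time even when there is nothing to do, so it is not the best approach.

A better way is threading.Condition. With a Condition, a consumer can block while there is no data. Once a producer has suitable data, it calls one of the notify functions to wake the waiting threads. This avoids many pointless lock/unlock operations.

threading.Condition works like threading.Lock: you acquire it before modifying global data and release it afterwards. Its methods:
acquire: acquire the lock.
release: release the lock.
wait: put the current thread into the waiting state and release the lock. Another thread can wake it by calling notify or notify_all. Once woken, the thread waits to re-acquire the lock, and after acquiring it continues with the code after wait().
notify: wake one waiting thread (notify(n) wakes up to n threads; n defaults to 1).
notify_all: wake all waiting threads.
Note: notify and notify_all do not release the lock. You must hold the lock when calling them, i.e. call them before release(); calling them without holding the lock raises RuntimeError. A woken thread must check its condition again, so wait() is normally called inside a while loop, as the consumer below does.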
import threading
import random
import time
gMoney = 1000
gCondition = threading.Condition()  # a lock plus wait/notify support
gTimes = 0
class Producer(threading.Thread):
    def run(self):
        global gMoney
        global gTimes
        while True:
            money = random.randint(100, 1000)
            gCondition.acquire()
            if gTimes >= 3:
                gCondition.release()
                break
            gMoney += money
            print("{} deposited {} yuan, balance {} yuan".format(threading.current_thread(), money, gMoney))
            gTimes += 1
            gCondition.notify_all()  # wake the waiting consumers; the lock is still held here
            gCondition.release()
            time.sleep(0.5)
class Consumer(threading.Thread):
    def run(self):
        global gMoney
        global gTimes
        while True:
            money = random.randint(100, 500)
            gCondition.acquire()
            while gMoney < money:  # re-check the condition after every wake-up
                print("{} wants to spend {} yuan, only {} yuan left, insufficient!".format(threading.current_thread(), money, gMoney))
                if gTimes >= 3:  # the producers are done, so no more money will arrive
                    gCondition.release()
                    return
                gCondition.wait()  # releases the lock while waiting, re-acquires it when woken
            gMoney -= money
            print("{} spent {} yuan, balance {} yuan".format(threading.current_thread(), money, gMoney))
            gCondition.release()
            time.sleep(0.5)
def multi_thread():
    for i in range(2):
        Consumer(name="Consumer-{}".format(i)).start()
    for j in range(2):
        Producer(name="Producer-{}".format(j)).start()
if __name__ == "__main__":
    multi_thread()
Output:
<Consumer(Consumer-0, started 123145357996032)> spent 273 yuan, balance 727 yuan
<Consumer(Consumer-1, started 123145374785536)> spent 470 yuan, balance 257 yuan
<Producer(Producer-0, started 123145391575040)> deposited 181 yuan, balance 438 yuan
<Producer(Producer-1, started 123145408364544)> deposited 464 yuan, balance 902 yuan
<Consumer(Consumer-0, started 123145357996032)> spent 455 yuan, balance 447 yuan
<Producer(Producer-0, started 123145391575040)> deposited 677 yuan, balance 1124 yuan
<Consumer(Consumer-1, started 123145374785536)> spent 400 yuan, balance 724 yuan
<Consumer(Consumer-0, started 123145357996032)> spent 485 yuan, balance 239 yuan
<Consumer(Consumer-1, started 123145374785536)> spent 159 yuan, balance 80 yuan
<Consumer(Consumer-0, started 123145357996032)> wants to spend 325 yuan, only 80 yuan left, insufficient!
<Consumer(Consumer-1, started 123145374785536)> wants to spend 229 yuan, only 80 yuan left, insufficient!
Process finished with exit code 0
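Condition also has wait_for(predicate, timeout=None), available since Python 3.2. It wraps the `while ...: wait()` loop used by the consumer and re-checks the predicate every time the thread wakes up. The minimal sketch below has a hypothetical single consumer that needs 300 yuan and a producer function called twice from the main thread. The consumer cannot spend until both deposits have arrived, so the log order is fixed.

```python
import threading

balance = 0
cond = threading.Condition()
log = []

def consumer(amount):
    global balance
    with cond:  # acquire the underlying lock
        # Same as "while not (balance >= amount): cond.wait()"
        cond.wait_for(lambda: balance >= amount)
        balance -= amount
        log.append(("spent", amount, balance))

def producer(amount):
    global balance
    with cond:
        balance += amount
        log.append(("deposited", amount, balance))
        cond.notify_all()  # must be called while holding the lock

c = threading.Thread(target=consumer, args=(300,))
c.start()
producer(100)  # not enough yet: the consumer goes back to waiting
producer(250)  # balance is now 350 >= 300
c.join()
print(log)      # [('deposited', 100, 100), ('deposited', 250, 350), ('spent', 300, 50)]
print(balance)  # 50
```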
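V. Thread pools
1. What is a thread pool
Introduction: many server applications, such as web servers, database servers, file servers and mail servers, handle large numbers of short tasks that come from remote sources. An overly simple model for a server application creates a new service object whenever a request arrives and serves the request in that object. When many requests arrive concurrently, though, constantly creating and destroying objects is very expensive for the server.
One way to make a server more efficient is therefore to create and destroy objects as rarely as possible, especially objects that are expensive to create and destroy. This leads to the idea of a "pool": you set up a fixed amount of resources and reuse them repeatedly, instead of frequently creating and destroying them.
Definition: a thread pool is a technique in which threads are created in advance. These threads sit idle, blocked while they wait for work, so they use no CPU and only take up a small amount of memory. When a request arrives, the pool assigns it an idle thread, which runs and handles the request. If all the pre-created threads are busy and there are not enough of them, the pool can create a certain number of new threads to handle more requests. When the system is relatively idle, the pool can also remove some of the threads that have stayed unused.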
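To make the definition concrete, here is a minimal fixed-size pool built only from threading and queue.Queue. It is a hypothetical teaching sketch: SimpleThreadPool is not a library class. Unlike the description above, it never grows or shrinks. N workers are started up front, block on the queue while idle, and exit when they receive a None sentinel.

```python
import queue
import threading

class SimpleThreadPool:
    """Hypothetical minimal pool: N pre-started workers pull tasks from a shared queue."""

    def __init__(self, num_workers):
        self.tasks = queue.Queue()
        # daemon=True only so a forgotten shutdown() cannot keep the process alive
        self.workers = [threading.Thread(target=self._worker, daemon=True)
                        for _ in range(num_workers)]
        for w in self.workers:
            w.start()  # workers start now and block on the empty queue (no CPU used)

    def _worker(self):
        while True:
            func, args = self.tasks.get()  # blocks while there is no work
            if func is None:               # sentinel: time to exit
                self.tasks.task_done()
                break
            try:
                func(*args)
            finally:
                self.tasks.task_done()

    def submit(self, func, *args):
        self.tasks.put((func, args))

    def shutdown(self):
        for _ in self.workers:
            self.tasks.put((None, ()))     # one sentinel per worker, queued after all tasks
        for w in self.workers:
            w.join()

results = []
results_lock = threading.Lock()

def square(n):
    with results_lock:
        results.append(n * n)

pool = SimpleThreadPool(3)
for i in range(10):
    pool.submit(square, i)
pool.shutdown()  # the queue is FIFO, so every task is taken before any sentinel
print(sorted(results))
```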
2. Python's concurrent.futures module: thread pools and process pools
Python 3.2 added the concurrent.futures module, which implements thread pools and process pools. It is built around two kinds of object: executors and futures. An executor manages a pool of worker threads or processes. A Future (literally "future object") wraps a task, i.e. a function call. Think of a Future as a container for the task: besides letting you cancel the task, it holds the task's execution state and, eventually, its result.
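2.1 Creating a Future object
Let's first create a Future object by hand and take a look at it:
from concurrent.futures import Future
# Create a Future object directly (executors normally create Futures; doing it by hand is meant for tests and demos like this)
future = Future()
# Define the callback function
def callback(future):
    print(f"Callback running, the result is: {future.result()}")
def test_future():
    # add_done_callback binds a callback to the future. Pass just the function name:
    # when the future completes, the future itself is passed as the callback's only argument.
    print('Adding the callback')
    future.add_done_callback(callback)
    # The callback runs when set_result() completes the future
    print("Triggering the callback")
    future.set_result("Ha, didn't see that coming: I'm the result")
if __name__ == '__main__':
    test_future()
Note: since Python 3.8, calling set_result() again on a future that is already finished raises concurrent.futures.InvalidStateError. In Python 3.7 and earlier, a later call silently overwrote the earlier result. result() can be called any number of times and returns the same value each time.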
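Here is a short check of the behavior described in the note, assuming Python 3.8 or later (where InvalidStateError exists). It shows three things:

- A second set_result() call fails.
- result() keeps returning the first value.
- A callback added after the future has finished is called immediately.

```python
from concurrent.futures import Future, InvalidStateError

future = Future()
future.set_result("first")
try:
    future.set_result("second")  # a finished Future cannot be completed again
    second_call_error = None
except InvalidStateError as exc:
    second_call_error = exc

print(type(second_call_error).__name__)  # InvalidStateError
print(future.result(), future.result())  # first first

# The future is already done, so this callback runs right away
calls = []
future.add_done_callback(lambda f: calls.append(f.result()))
print(calls)  # ['first']
```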
2.2 Creating Futures by submitting tasks
2.2.1 Submitting tasks with submit
submit: submits a task and returns a Future object that represents the execution of the callable.
from concurrent.futures import ThreadPoolExecutor
import threading
import os
import time
# A single task
def threadTask(taskNum):
    threadId = threading.current_thread().name
    period = f"Task {taskNum}: thread id----{threadId}, process id----{os.getpid()}"
    print(period)
    time.sleep(3)  # the worker thread sleeps
    return period
# Function that wraps the thread pool
def localThreadPool():
    # max_workers: the maximum number of threads the pool will create
    # If omitted, Python 3.8+ uses min(32, os.cpu_count() + 4)
    executor = ThreadPoolExecutor(max_workers=3)
    # submit() hands the task to the pool and returns a Future right away;
    # a free worker thread starts running it while the main thread carries on
    future = executor.submit(threadTask, 1)
    # {future} is formatted first (state=running); result() then blocks until the task returns
    print(f'Future state: {future}, result: {future.result()}')
    time.sleep(5)  # the main thread sleeps
    print(f'Future state: {future}, result: {future.result()}')
if __name__ == "__main__":
    localThreadPool()
Note: os.getpid() returns the process ID; os.getgid() would return the group ID instead. In the output below, <pid> stands for the process ID printed on your machine. It is the same for every task, because all the threads run inside one process.
Output:
Task 1: thread id----ThreadPoolExecutor-0_0, process id----<pid>
Future state: <Future at 0x10b346850 state=running>, result: Task 1: thread id----ThreadPoolExecutor-0_0, process id----<pid>
Future state: <Future at 0x10b346850 state=finished returned str>, result: Task 1: thread id----ThreadPoolExecutor-0_0, process id----<pid>
Process finished with exit code 0
We can also submit several tasks at once:
from concurrent.futures import ThreadPoolExecutor
import threading
import os
import time
# A single task
def threadTask(taskNum):
    threadId = threading.current_thread().name
    period = f"Task {taskNum}: thread id----{threadId}, process id----{os.getpid()}"
    print(period)
    time.sleep(3)  # the worker thread sleeps
    return period
# Function that wraps the thread pool
def localThreadPool():
    # max_workers: the maximum number of threads the pool will create
    # If omitted, Python 3.8+ uses min(32, os.cpu_count() + 4)
    executor = ThreadPoolExecutor(max_workers=3)
    # Submit several tasks; each submit() returns a Future immediately,
    # so the main thread carries on while the pool works through the queue
    futures = [executor.submit(threadTask, i) for i in range(5)]
    print(f'Future states: {futures}')
    time.sleep(5)  # the main thread sleeps
    print(f'Future states: {futures}')
    time.sleep(5)  # the main thread sleeps
    print(f'Future states: {futures}')
if __name__ == "__main__":
    localThreadPool()
Output:
Task 0: thread id----ThreadPoolExecutor-0_0, process id----<pid>
Task 1: thread id----ThreadPoolExecutor-0_1, process id----<pid>
Task 2: thread id----ThreadPoolExecutor-0_2, process id----<pid>
# Because max_workers=3, three threads are started first
Future states: [<Future at 0x10347fd00 state=running>, <Future at 0x1034a4be0 state=running>, <Future at 0x1034a4f70 state=running>, <Future at 0x1034ac370 state=pending>, <Future at 0x1034ac490 state=pending>]
# The first three tasks are running (running); the last two are waiting (pending)
Task 3: thread id----ThreadPoolExecutor-0_0, process id----<pid>
Task 4: thread id----ThreadPoolExecutor-0_1, process id----<pid>
# Two threads from the pool are reused to run tasks 3 and 4
Future states: [<Future at 0x10347fd00 state=finished returned str>, <Future at 0x1034a4be0 state=finished returned str>, <Future at 0x1034a4f70 state=finished returned str>, <Future at 0x1034ac370 state=running>, <Future at 0x1034ac490 state=running>]
# The first three tasks have finished (finished); the last two are running (running)
Future states: [<Future at 0x10347fd00 state=finished returned str>, <Future at 0x1034a4be0 state=finished returned str>, <Future at 0x1034a4f70 state=finished returned str>, <Future at 0x1034ac370 state=finished returned str>, <Future at 0x1034ac490 state=finished returned str>]
# All tasks have finished
We can also use future.running() and future.done() to check whether a task is currently running or has finished.
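Here is a sketch of running(), done() and cancel(). It uses threading.Event objects (the hypothetical names release and started) so the timing is deterministic instead of depending on sleep(). With a single worker, the first task is running while the second is still pending, and a pending future can still be cancelled.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

release = threading.Event()  # decides exactly when the task may finish
started = threading.Event()

def blocked_task():
    started.set()
    release.wait()           # stay "running" until the main thread says go
    return "done"

with ThreadPoolExecutor(max_workers=1) as executor:
    running_future = executor.submit(blocked_task)
    queued_future = executor.submit(blocked_task)  # only 1 worker, so this one stays pending
    started.wait()           # make sure the first task is really executing
    states = (running_future.running(), running_future.done(),
              queued_future.running(), queued_future.done())
    print(states)            # (True, False, False, False)
    cancelled = queued_future.cancel()  # a pending future can still be cancelled
    release.set()            # let the first task finish
    result = running_future.result()
print(result, cancelled, queued_future.cancelled())  # done True True
```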
2.2.2 Submitting tasks with map
map: works like Python's built-in map, except that the pool's threads apply the function to the elements of the iterable concurrently. It returns an iterator over the return values (not Futures), which yields them in the same order as the inputs.
from concurrent.futures import ThreadPoolExecutor
import threading
import os
import time
# A single task
def threadTask(taskNum):
    threadId = threading.current_thread().name
    period = f"Task {taskNum}: thread id----{threadId}, process id----{os.getpid()}"
    print(period)
    time.sleep(3)  # the worker thread sleeps
    return period
# Function that wraps the thread pool
def localThreadPool():
    # max_workers: the maximum number of threads the pool will create
    # If omitted, Python 3.8+ uses min(32, os.cpu_count() + 4)
    executor = ThreadPoolExecutor(max_workers=3)
    # Submit several tasks at once; note the difference from submit:
    # map() returns a generator of results rather than a list of Futures
    results = executor.map(threadTask, range(5))
    print(f'map returned: {results}')  # prints a generator object, not task states
    for result in results:  # blocks until each result is ready, in input order
        print(result)
if __name__ == "__main__":
    localThreadPool()
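Like the built-in map, executor.map also accepts several iterables. It always yields results in input order, regardless of which task finishes first. In this minimal sketch, slow_power is a hypothetical function whose later calls sleep less:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_power(base, exp):
    # Later inputs sleep less, so they tend to finish first
    time.sleep(0.05 * (4 - base))
    return base ** exp

with ThreadPoolExecutor(max_workers=4) as executor:
    # Two iterables: slow_power(1, 2), slow_power(2, 3), slow_power(3, 2)
    results = list(executor.map(slow_power, [1, 2, 3], [2, 3, 2]))
print(results)  # [1, 8, 9]: input order, regardless of finishing order
```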
2.3 Important functions
The module provides two important functions for waiting on tasks: wait and as_completed. wait blocks until the given futures are done, while as_completed hands back each future as soon as it finishes.
2.3.1 wait
wait waits for the given Future instances to complete, and its purpose is much like that of asyncio.wait. It returns a named tuple with two items. The first, done, is the set of completed futures. The second, not_done, is the set of futures that have not completed. By default it waits until all of them are done (return_when=ALL_COMPLETED), and you can also pass a timeout.
from concurrent.futures import ThreadPoolExecutor, wait
import threading
import os
import time
# A single task
def threadTask(taskNum):
    threadId = threading.current_thread().name
    period = f"Task {taskNum}: thread id----{threadId}, process id----{os.getpid()}"
    print(period)
    time.sleep(3)  # the worker thread sleeps
    return period
# Function that wraps the thread pool
def localThreadPool():
    # max_workers: the maximum number of threads the pool will create
    # If omitted, Python 3.8+ uses min(32, os.cpu_count() + 4)
    executor = ThreadPoolExecutor(max_workers=3)
    futures = [executor.submit(threadTask, i) for i in range(5)]  # submit several tasks
    # submit() does not block, so the next line prints right away
    print("Not blocked, I can still print")
    fs = wait(futures)  # blocks until every future in the list is done
    print(fs)
    print("The tasks have finished, only now am I released")
if __name__ == "__main__":
    localThreadPool()
Output:
Task 0: thread id----ThreadPoolExecutor-0_0, process id----<pid>
Task 1: thread id----ThreadPoolExecutor-0_1, process id----<pid>
Task 2: thread id----ThreadPoolExecutor-0_2, process id----<pid>
Not blocked, I can still print
Task 3: thread id----ThreadPoolExecutor-0_2, process id----<pid>
Task 4: thread id----ThreadPoolExecutor-0_1, process id----<pid>
DoneAndNotDoneFutures(done={<Future at 0x1104e2c40 state=finished returned str>, <Future at 0x11050f2b0 state=finished returned str>, <Future at 0x110507eb0 state=finished returned str>, <Future at 0x110507b20 state=finished returned str>, <Future at 0x11050f3d0 state=finished returned str>}, not_done=set())
The tasks have finished, only now am I released
Process finished with exit code 0
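wait() can also return early. The sketch below passes return_when=FIRST_COMPLETED and uses a hypothetical slow task that is held back by a threading.Event, so the outcome does not depend on timing. done holds the fast future and not_done holds the slow one.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

gate = threading.Event()

def fast():
    return "fast"

def slow():
    gate.wait()  # blocks until the main thread opens the gate
    return "slow"

with ThreadPoolExecutor(max_workers=2) as executor:
    f_slow = executor.submit(slow)
    f_fast = executor.submit(fast)
    done, not_done = wait([f_slow, f_fast], return_when=FIRST_COMPLETED)
    first_done = {f.result() for f in done}
    still_pending = f_slow in not_done
    gate.set()   # let the slow task finish so the pool can shut down
print(first_done, still_pending)  # {'fast'} True
```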
2.3.2 as_completed
as_completed returns an iterator over the given Future instances that yields each one as it completes:
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import os
import time
# A single task
def threadTask(taskNum):
    threadId = threading.current_thread().name
    period = f"Task {taskNum}: thread id----{threadId}, process id----{os.getpid()}"
    print(period)
    time.sleep(3)  # the worker thread sleeps
    return period
# Function that wraps the thread pool
def localThreadPool():
    # max_workers: the maximum number of threads the pool will create
    # If omitted, Python 3.8+ uses min(32, os.cpu_count() + 4)
    executor = ThreadPoolExecutor(max_workers=3)
    futures = [executor.submit(threadTask, i) for i in range(5)]  # submit several tasks
    # as_completed yields each future as soon as it finishes, in completion order
    for future in as_completed(futures):
        print(future)
if __name__ == "__main__":
    localThreadPool()
Output:
Task 0: thread id----ThreadPoolExecutor-0_0, process id----<pid>
Task 1: thread id----ThreadPoolExecutor-0_1, process id----<pid>
Task 2: thread id----ThreadPoolExecutor-0_2, process id----<pid>
# The lines above are printed first
Task 3: thread id----ThreadPoolExecutor-0_0, process id----<pid>
Task 4: thread id----ThreadPoolExecutor-0_2, process id----<pid>
<Future at 0x110540c40 state=finished returned str>
<Future at 0x110565eb0 state=finished returned str>
<Future at 0x110565b20 state=finished returned str>
# These three lines appear as soon as the first three tasks finish; the two below come last
<Future at 0x11056d3d0 state=finished returned str>
<Future at 0x11056d2b0 state=finished returned str>
Note: as_completed takes an iterable of Future objects, such as the list built from several submit() calls. It cannot be used with map, because map returns the results themselves rather than Futures.
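as_completed yields futures in completion order, so you often need to know which input each future belongs to. A common pattern is a dictionary that maps each Future to its argument. Here is a minimal sketch with a hypothetical square function:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def square(n):
    return n * n

with ThreadPoolExecutor(max_workers=3) as executor:
    # Map each Future back to the input that produced it
    future_to_n = {executor.submit(square, n): n for n in range(5)}
    squares = {}
    for future in as_completed(future_to_n):  # completion order, not submission order
        n = future_to_n[future]
        squares[n] = future.result()
print(dict(sorted(squares.items())))  # {0: 0, 1: 1, 2: 4, 3: 9, 4: 16}
```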
2.3.3 Two other ways to wait for tasks to finish
Method 1: call the executor's shutdown()
With the default wait=True, shutdown() blocks until all submitted tasks have finished and then frees the pool's resources. No new tasks can be submitted afterwards.
from concurrent.futures import ThreadPoolExecutor
import threading
import os
import time
# A single task
def threadTask(taskNum):
    threadId = threading.current_thread().name
    period = f"Task {taskNum}: thread id----{threadId}, process id----{os.getpid()}"
    print(period)
    time.sleep(3)  # the worker thread sleeps
    return period
# Function that wraps the thread pool
def localThreadPool():
    # max_workers: the maximum number of threads the pool will create
    # If omitted, Python 3.8+ uses min(32, os.cpu_count() + 4)
    executor = ThreadPoolExecutor(max_workers=3)
    futures = [executor.submit(threadTask, i) for i in range(5)]  # submit several tasks
    # submit() does not block, so the next line prints right away
    print("Not blocked, I can still print")
    executor.shutdown()  # wait=True by default: blocks until all tasks are done
    print("The tasks have finished, only now am I released")
if __name__ == "__main__":
    localThreadPool()
Output:
Task 0: thread id----ThreadPoolExecutor-0_0, process id----<pid>Task 1: thread id----ThreadPoolExecutor-0_1, process id----<pid>
Task 2: thread id----ThreadPoolExecutor-0_2, process id----<pid>
Not blocked, I can still print
Task 3: thread id----ThreadPoolExecutor-0_1, process id----<pid>
Task 4: thread id----ThreadPoolExecutor-0_2, process id----<pid>
The tasks have finished, only now am I released
Process finished with exit code 0
Method 2: use a context manager
Leaving the with block calls executor.shutdown(wait=True), so the code after the block runs only after every task has finished.
from concurrent.futures import ThreadPoolExecutor
import threading
import os
import time
# A single task
def threadTask(taskNum):
    threadId = threading.current_thread().name
    period = f"Task {taskNum}: thread id----{threadId}, process id----{os.getpid()}"
    print(period)
    time.sleep(3)  # the worker thread sleeps
    return period
# Function that wraps the thread pool
def localThreadPool():
    # max_workers: the maximum number of threads the pool will create
    # If omitted, Python 3.8+ uses min(32, os.cpu_count() + 4)
    with ThreadPoolExecutor(max_workers=3) as executor:
        print("Not blocked, I can still print")
        futures = [executor.submit(threadTask, i) for i in range(5)]  # submit several tasks
    # Leaving the with block waits for all tasks, so this line runs only at the end
    print("The tasks have finished, only now am I released")
    for future in futures:
        print(future)
if __name__ == "__main__":
    localThreadPool()
Output:
Not blocked, I can still print
Task 0: thread id----ThreadPoolExecutor-0_0, process id----<pid>
Task 1: thread id----ThreadPoolExecutor-0_1, process id----<pid>
Task 2: thread id----ThreadPoolExecutor-0_2, process id----<pid>
Task 3: thread id----ThreadPoolExecutor-0_2, process id----<pid>
Task 4: thread id----ThreadPoolExecutor-0_1, process id----<pid>
The tasks have finished, only now am I released
<Future at 0x10ff82d00 state=finished returned str>
<Future at 0x10ffa7be0 state=finished returned str>
<Future at 0x10ffa7f70 state=finished returned str>
<Future at 0x10ffb0340 state=finished returned str>
<Future at 0x10ffb0460 state=finished returned str>
2.4 Adding exception handling
2.4.1 Handling exceptions for tasks submitted with submit
from concurrent.futures import ThreadPoolExecutor
import threading
import os
import time
import random
# A single task
def threadTask(taskNum):
    # Deliberate error: for task 0 this divides by zero and raises ZeroDivisionError
    num = random.randint(0, 2) / taskNum
    threadId = threading.current_thread().name
    period = f"Task {taskNum}: thread id----{threadId}, process id----{os.getpid()}"
    print(period)
    time.sleep(3)  # the worker thread sleeps
    return period
# Function that wraps the thread pool
def localThreadPool():
    # max_workers: the maximum number of threads the pool will create
    # If omitted, Python 3.8+ uses min(32, os.cpu_count() + 4)
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(threadTask, i) for i in range(5)]  # submit several tasks
    for future in futures:
        try:
            future.result()  # re-raises the task's exception here, in the main thread
        except Exception as exc:
            print(f'{future},Generated an exception: {exc}')
if __name__ == "__main__":
    localThreadPool()
Output:
Task 1: thread id----ThreadPoolExecutor-0_1, process id----<pid>
Task 2: thread id----ThreadPoolExecutor-0_0, process id----<pid>
Task 3: thread id----ThreadPoolExecutor-0_2, process id----<pid>
Task 4: thread id----ThreadPoolExecutor-0_0, process id----<pid>
<Future at 0x10ba96af0 state=finished raised ZeroDivisionError>,Generated an exception: division by zero
Process finished with exit code 0
Task 0 fails before it prints anything. Calling future.result() on its future re-raises the ZeroDivisionError in the main thread, where the except clause catches it. The other tasks are unaffected.
2.4.2 Handling exceptions for tasks submitted with map
from concurrent.futures import ThreadPoolExecutor
import threading
import os
import time
import random
# A single task
def threadTask(taskNum):
    # Deliberate error: for task 0 this divides by zero and raises ZeroDivisionError
    num = random.randint(0, 2) / taskNum
    threadId = threading.current_thread().name
    period = f"Task {taskNum}: thread id----{threadId}, process id----{os.getpid()}"
    print(period)
    time.sleep(3)  # the worker thread sleeps
    return period
# Function that wraps the thread pool
def localThreadPool():
    # max_workers: the maximum number of threads the pool will create
    # If omitted, Python 3.8+ uses min(32, os.cpu_count() + 4)
    with ThreadPoolExecutor(max_workers=3) as executor:
        # Submit several tasks; futures is really a generator of results, not of Futures
        futures = executor.map(threadTask, range(5))
    while True:
        try:
            print(next(futures))
        except StopIteration:
            break
        except Exception as exc:
            print(f'Generated an exception: {exc}')
if __name__ == "__main__":
    localThreadPool()
Output:
Task 1: thread id----ThreadPoolExecutor-0_0, process id----<pid>
Task 2: thread id----ThreadPoolExecutor-0_1, process id----<pid>
Task 3: thread id----ThreadPoolExecutor-0_2, process id----<pid>
Task 4: thread id----ThreadPoolExecutor-0_1, process id----<pid>
Generated an exception: division by zero
Process finished with exit code 0
Suppose we use the submit-style handling from 2.4.1 instead: for future in futures, with try/except around future.result(). With map, the exception is raised by the for statement itself while it iterates over the results. That happens outside the try block, so the exception is not caught:
Output:
Task 1: thread id----ThreadPoolExecutor-0_0, process id----<pid>Task 2: thread id----ThreadPoolExecutor-0_1, process id----<pid>
Task 3: thread id----ThreadPoolExecutor-0_2, process id----<pid>
Task 4: thread id----ThreadPoolExecutor-0_2, process id----<pid>
Traceback (most recent call last):
File "/Users/xianchengchi.py", line 40, in <module>
localThreadPool()
File "/Users/xianchengchi.py", line 31, in localThreadPool
for future in futures:
File "/usr/local/Cellar/python@3.9/3.9.1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 600, in result_iterator
yield fs.pop().result()
File "/usr/local/Cellar/python@3.9/3.9.1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 433, in result
return self.__get_result()
File "/usr/local/Cellar/python@3.9/3.9.1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 389, in __get_result
raise self._exception
File "/usr/local/Cellar/python@3.9/3.9.1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/thread.py", line 52, in run
result = self.fn(*self.args, **self.kwargs)
File "/Users/xianchengchi.py", line 10, in threadTask
num = random.randint(0, 2)/taskNum
ZeroDivisionError: division by zero
Process finished with exit code 1
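As you can see, the generator is finished as soon as the first error occurs. That makes map a poor fit for a batch of tasks in which some may raise. Calling list() on the results, or looping over them, fails at the first task that raises, and you cannot get the results of the tasks after it. The best approach is still submit + as_completed.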
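Here is a sketch of the recommended submit + as_completed combination. It uses a hypothetical task that, like threadTask above, divides by zero for task 0. Every future is visited, and successes and failures are collected separately, so one failing task does not prevent reading the other results.

```python
import random
from concurrent.futures import ThreadPoolExecutor, as_completed

def task(n):
    # Like threadTask: task 0 always divides by zero
    return random.randint(0, 2) / n

results, errors = {}, {}
with ThreadPoolExecutor(max_workers=3) as executor:
    future_to_n = {executor.submit(task, n): n for n in range(5)}
    for future in as_completed(future_to_n):
        n = future_to_n[future]
        try:
            results[n] = future.result()
        except Exception as exc:  # one failing task does not hide the others
            errors[n] = exc
print(sorted(results))                                   # [1, 2, 3, 4]
print({n: type(e).__name__ for n, e in errors.items()})  # {0: 'ZeroDivisionError'}
```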
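Finally: make good use of max_workers
Both ProcessPoolExecutor and ThreadPoolExecutor accept a max_workers parameter: the number of processes or threads used to run tasks. ProcessPoolExecutor defaults to the number of CPUs, as reported by os.cpu_count(). ThreadPoolExecutor's default has changed over time. In Python 3.5 to 3.7 it was 5 times the number of CPUs; since Python 3.8 it is min(32, os.cpu_count() + 4).
Beginners, and most code in general, don't need to set max_workers by hand, because the default works well enough. However:
- Depending on the workload, raising max_workers can make tasks finish faster. Higher is not always better, though: past a certain threshold it becomes counterproductive. This matters most for I/O-bound tasks run with ThreadPoolExecutor, where different max_workers values can make a big difference. Network performance depends on too many factors, so I won't give an example here.
- Sometimes a server runs many important services and you don't want one job to affect everything else. In that case, set max_workers below the default.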