寫在前面
這個(gè)系列是筆者學(xué)習(xí)python一些進(jìn)階功能的筆記和思考,水平有限,錯(cuò)漏在所難免,還請(qǐng)方家不吝指教。
什么是線程?
首先應(yīng)該了解操作系統(tǒng)如何支持多任務(wù)運(yùn)行的,這里有“并行”和“并發(fā)”兩個(gè)概念。
- 并行: 在同一時(shí)刻有多條指令在多個(gè)處理器上同時(shí)執(zhí)行,通俗點(diǎn)就是CPU數(shù)>=任務(wù)數(shù)的情況;
- 并發(fā):在同一時(shí)刻只能有一條指令執(zhí)行,但是多個(gè)任務(wù)被快速輪換執(zhí)行,使得宏觀上讓人感覺到有多個(gè)任務(wù)同時(shí)執(zhí)行的效果。通俗點(diǎn)來說就是CPU數(shù)<任務(wù)數(shù)的情況。
在操作系統(tǒng)中運(yùn)行的一個(gè)程序就是一個(gè)進(jìn)程(Process),它是代碼+資源的組合。而在一個(gè)進(jìn)程中,有一個(gè)或者多個(gè)線程(Thread)。線程是進(jìn)程的組成部分,一個(gè)進(jìn)程至少有一個(gè)主線程來完成進(jìn)程從開始到結(jié)束的全部操作。
單核CPU如何運(yùn)行多個(gè)線程?
- 時(shí)間片輪轉(zhuǎn):操作系統(tǒng)讓每個(gè)程序依次運(yùn)行極短的時(shí)間
- 優(yōu)先級(jí)調(diào)度:讓高優(yōu)先級(jí)的任務(wù)優(yōu)先占用CPU
python中多線程的實(shí)現(xiàn)
threading模塊
t1 = threading.Thread(target = someFunction, args = ()),此時(shí)會(huì)創(chuàng)建一個(gè)線程對(duì)象,但是不會(huì)直接創(chuàng)建線程。用args關(guān)鍵詞可以為函數(shù)傳入?yún)?shù),但是注意這里傳入的參數(shù)因?yàn)閿?shù)量不定,因此一定要是一個(gè)元組
調(diào)用線程對(duì)象的start方法才會(huì)創(chuàng)建線程,并且讓線程開始運(yùn)行。
如下面的例子:
import threading
def my_func(cnt):
for i in range(cnt):
print(i)
if __name__ == '__main__':
t1 = threading.Thread(target=my_func, args=(10,)) # 創(chuàng)建一個(gè)線程
t1.start() # 啟動(dòng)子線程
主線程需要負(fù)責(zé)回收分配給子線程的資源,因此主線程一定會(huì)晚于子線程結(jié)束。
繼承thread類
用繼承了Thread等類也可以實(shí)現(xiàn)線程創(chuàng)建。但是這個(gè)類中一定需要定義run方法,這樣在start啟動(dòng)線程后會(huì)自動(dòng)調(diào)用run方法,例如以下程序:
from time import sleep
import threading
class myThread(threading.Thread):
def run(self):
for i in range(10):
sleep(1)
msg = "I'm " + self.name + " @ " + str(i)
print(msg)
def main():
testThread = myThread()
testThread.start()
if __name__ == "__main__":
main()
會(huì)得到以下結(jié)果:
I'm Thread-1 @ 0
I'm Thread-1 @ 1
I'm Thread-1 @ 2
I'm Thread-1 @ 3
I'm Thread-1 @ 4
I'm Thread-1 @ 5
I'm Thread-1 @ 6
I'm Thread-1 @ 7
I'm Thread-1 @ 8
I'm Thread-1 @ 9
如果需要調(diào)用一系列的函數(shù),那么可以將其封裝在這個(gè)類中,然后在run方法里調(diào)用。
不同線程中全局變量的共享
在不同的線程中,全局變量是共享的,這使得不同線程之間協(xié)作處理數(shù)據(jù)非常方便。但是這種共享也會(huì)有負(fù)面影響 -- 造成資源競(jìng)爭(zhēng),例如如下代碼:
from time import sleep
import threading
g_num = 0
def test1(num):
global g_num
for i in range(num):
g_num += 1
print("g_num in test1: %d" %g_num)
def test2(num):
global g_num
for i in range(num):
g_num += 1
print("g_num in test2: %d" %g_num)
def main():
t1 = threading.Thread(target=test1, args=(1000000,))
t2 = threading.Thread(target=test2, args=(1000000,))
t1.start()
t2.start()
sleep(5)
print("g_num in main thread: %d" % g_num)
if __name__ == "__main__":
main()
執(zhí)行后會(huì)得到如下結(jié)果:
g_num in test1: 1221968
g_num in test2: 1323741
g_num in main thread: 1323741
這里得到的g_num并不是我們想象的2000000,因?yàn)樵趫?zhí)行python代碼時(shí),我們使得數(shù)據(jù)自加的一句python代碼會(huì)被翻譯為好幾句機(jī)器碼:
- 讀取數(shù)據(jù)
- 數(shù)據(jù)+1
- 存儲(chǔ)數(shù)據(jù)
在執(zhí)行時(shí),由于只使用了一個(gè)CPU,CPU會(huì)采用時(shí)間片輪轉(zhuǎn)的方法來模擬多任務(wù)。因此實(shí)際上會(huì)在任意一步被cpu打斷,例如在完成數(shù)據(jù)+1后,在存儲(chǔ)數(shù)據(jù)時(shí)被打斷,這樣增加后的數(shù)據(jù)就沒有存入內(nèi)存,下一個(gè)線程從內(nèi)存讀取時(shí),讀到的就是沒有自增前的數(shù),這樣可能使得兩個(gè)線程中自增的數(shù)據(jù)相互覆蓋,導(dǎo)致加到最后得到的值要比想象中的小。這種問題,也叫做“數(shù)據(jù)不同步”。
線程鎖
當(dāng)多個(gè)線程幾乎同時(shí)修改一個(gè)數(shù)據(jù)時(shí),需要進(jìn)行同步控制。線程同步能夠保證多個(gè)線程安全訪問競(jìng)爭(zhēng)資源,最簡(jiǎn)單的同步機(jī)制是引入<u>互斥鎖</u>。
互斥鎖會(huì)給資源附加一個(gè)狀態(tài):鎖定/非鎖定。
當(dāng)某個(gè)線程需要修改資源時(shí),先將其鎖定,此時(shí)其他線程不可以修改該資源;到該線程修改結(jié)束后,釋放資源,使其變?yōu)榉擎i定,其他的線程才能再次鎖定該資源,進(jìn)行修改。這樣互斥鎖就保證了每次只有一個(gè)線程進(jìn)行寫入操作,保證了多線程下全局變量的正確性。
為了實(shí)現(xiàn)互斥鎖,threading模塊中提供了Lock和RLock兩個(gè)類:
-
threading.Lock是一個(gè)基本鎖對(duì)象,每次只能鎖定一次,其余的鎖請(qǐng)求需要等鎖釋放后才能獲?。?/li> -
threading.RLock是可重入鎖(Reentrant Lock),在同一個(gè)線程中可以進(jìn)行多次鎖定和釋放,但是鎖定和釋放的方法必須成堆出現(xiàn),如果調(diào)用了n次鎖定,那么只有n次釋放才能解鎖。
Lock和RLock兩個(gè)類都提供了以下方法來鎖定和釋放:
-
acquire(blocking = True, timeout = -1)進(jìn)行鎖定,blocking為True時(shí)會(huì)堵塞當(dāng)前線程,直到其他線程釋放該鎖,當(dāng)前線程獲取到這個(gè)鎖為止;而blocking為False的情況下則不會(huì)堵塞當(dāng)前線程,而是往下執(zhí)行 -
release()釋放鎖
對(duì)于上面的問題,我們可以用互斥鎖來解決多個(gè)線程之間的資源競(jìng)爭(zhēng)問題,如夏例:
from time import sleep
import threading
g_num = 0
mutex = threading.Lock() # 建立互斥鎖
def test1(num):
global g_num
mutex.acquire(True) # 在更改數(shù)據(jù)之前獲得互斥鎖
for i in range(num):
g_num += 1
mutex.release() # 更改完數(shù)據(jù)之后釋放鎖定
print("g_num in test1: %d" %g_num)
def test2(num):
global g_num
mutex.acquire(True) # 在更改數(shù)據(jù)之前獲得互斥鎖
for i in range(num):
g_num += 1
mutex.release() # 更改完數(shù)據(jù)之后釋放鎖定
print("g_num in test2: %d" %g_num)
def main():
t1 = threading.Thread(target=test1, args=(1000000,))
t2 = threading.Thread(target=test2, args=(1000000,))
t1.start()
t2.start()
sleep(5)
print("g_num in main thread: %d" % g_num)
if __name__ == "__main__":
main()
這樣,我們?cè)谧詈蟮玫降木褪俏覀冾A(yù)期的結(jié)果了:
g_num in test1: 1000000
g_num in test2: 2000000
g_num in main thread: 2000000
但是需要注意,用互斥鎖會(huì)帶來以下兩個(gè)問題:
- 阻止了多線程并發(fā)執(zhí)行,包含鎖的某段代碼實(shí)際上只能以單線程模式運(yùn)行,其他模式處于堵塞的狀態(tài),效率就大大下降了;
- 由于可以存在多個(gè)鎖,不同的線程持有不同鎖,并試圖獲取對(duì)方持有的鎖時(shí),可能會(huì)造成死鎖。
死鎖
線程間共享多個(gè)資源時(shí),如果兩個(gè)線程分別占有一部分資源并且同時(shí)等待對(duì)方的資源,就可能造成死鎖。當(dāng)出現(xiàn)死鎖時(shí),會(huì)造成應(yīng)用停止響應(yīng),如下例:
import threading
from time import sleep
lockA = threading.Lock()
lockB = threading.Lock()
def testA():
# 為進(jìn)程A上鎖
lockA.acquire()
print("----Lock A acquired in testA----")
sleep(1)
# 中間需要操作另一批數(shù)據(jù),上鎖B
lockB.acquire()
print("----Lock B acquired in testA----")
sleep(2)
# 數(shù)據(jù)操作完成,解開鎖B
lockB.release()
print("----Lock B released in testA----")
# 完成操作,釋放鎖A
lockA.release()
print("----Lock A released in testA----")
def testB():
# 開始操作時(shí)為進(jìn)程B上鎖
lockB.acquire()
print("----Lock B acquired in testB----")
sleep(1)
# 進(jìn)行下一步操作前需要獲取鎖A
lockA.acquire()
print("----Lock A acquired in testB----")
sleep(2)
# 進(jìn)行完數(shù)據(jù)操作釋放鎖A
lockA.release()
print("----Lock A released in testB----")
# 釋放鎖B
lockB.release()
print("----Lock B released in testB----")
def main():
t1 = threading.Thread(target=testA)
t2 = threading.Thread(target=testB)
t1.start()
t2.start()
if __name__ == "__main__":
main()
在這個(gè)例子中,線程t1開始時(shí),為lockA上鎖,并進(jìn)入睡眠(模擬一些耗時(shí)操作),而同時(shí)開始的線程t2為lockB上鎖,等t1向下執(zhí)行時(shí),需要lockB,lockB處于上鎖狀態(tài),因此線程t1堵塞,等待lockB被釋放;而t2在執(zhí)行時(shí),acquire lockA失敗,也進(jìn)入了堵塞狀態(tài),等待lockA被釋放。
這樣,兩個(gè)線程互相需要對(duì)方釋放互斥鎖,兩個(gè)線程都無法向下執(zhí)行,進(jìn)入了死鎖狀態(tài)。
死鎖的避免
- 程序設(shè)計(jì)時(shí)盡量避免(例如銀行家算法)
- 添加超時(shí)時(shí)間限制
ThreadLocal
除了使用互斥鎖以外,還有一種辦法可以實(shí)現(xiàn)各線程間的數(shù)據(jù)隔離,那就是使用threading.local()。它會(huì)為各個(gè)變量創(chuàng)建完全屬于他們自己的副本(也就是線程局部變量),這樣各個(gè)線程操作的就是屬于自己的私有資源,可以杜絕數(shù)據(jù)不同步的問題。
import threading
from time import sleep
from time import sleep
import threading
g_num = 0
local = threading.local()
def test1(num):
global g_num
local.g_num = g_num # 將g_num綁定一個(gè)線程局部變量
for i in range(num):
local.g_num += 1
print("g_num in test1: %d" % local.g_num)
def test2(num):
global g_num
local.g_num = g_num # 將g_num綁定一個(gè)線程局部變量
for i in range(num):
local.g_num += 1
print("g_num in test2: %d" % local.g_num)
def main():
t1 = threading.Thread(target=test1, args=(1000000,))
t2 = threading.Thread(target=test2, args=(1000000,))
t1.start()
t2.start()
t1.join() # 等待線程t1執(zhí)行完畢
t2.join() # 等待線程t2執(zhí)行完畢
print("g_num in main thread: %d" % g_num)
if __name__ == "__main__":
main()
結(jié)果為:
g_num in test1: 1000000
g_num in test2: 1000000
g_num in main thread: 0
可以看到,盡管我們將全局變量g_num綁定在線程局部變量中,但是每個(gè)線程操作的實(shí)際上是自己的線程局部變量,并不會(huì)作用于我們綁定上去的全局變量。
Python中多線程的問題
在Python中(當(dāng)前使用版本3.7,在3.8中據(jù)說會(huì)用局部解釋器繞開GIL的問題),多線程并不能真正有效利用多核,例如如下代碼:
import threading, time, multiprocessing
def my_counter(num):
i = 0
for _ in range(num):
i += 1
return True
def main():
# 單線程順序執(zhí)行
thread_array = {}
start_time = time.time()
count_num = int(1e8)
for tid in range(2):
t = threading.Thread(target=my_counter, args=(count_num,))
t.start()
t.join()
end_time = time.time()
print("Total time for two sequential threads: ", round(end_time - start_time, 2), " s")
# 雙線程并行
print("CPU num for current machine: ",multiprocessing.cpu_count())
thread_array = {}
start_time = time.time()
for tid in range(2):
t = threading.Thread(target=my_counter, args=(count_num,))
t.start()
thread_array[tid] = t
for tid in thread_array.keys():
thread_array[tid].join()
end_time = time.time()
print("Total time for multi-threads: ", round(end_time - start_time, 2), " s")
if __name__ == '__main__':
main()
運(yùn)行的結(jié)果如下:
Total time for two sequential threads: 11.32 s
CPU num for current machine: 4
Total time for multi-threads: 11.69 s
可以看到,在四核的電腦上,兩個(gè)線程用多線程并行和單線程串行,速度并沒有任何提高。
因?yàn)镻ython的線程雖然是真正的線程,但解釋器執(zhí)行代碼時(shí),有一個(gè)GIL鎖:Global Interpreter Lock,任何Python線程執(zhí)行前,必須先獲得GIL鎖,然后,每執(zhí)行100條字節(jié)碼,解釋器就自動(dòng)釋放GIL鎖,讓別的線程有機(jī)會(huì)執(zhí)行。這個(gè)GIL全局鎖實(shí)際上把所有線程的執(zhí)行代碼都給上了鎖,所以,多線程在Python中只能交替執(zhí)行,即使100個(gè)線程跑在100核CPU上,也只能用到1個(gè)核。
GIL是Python解釋器設(shè)計(jì)的歷史遺留問題,通常我們用的解釋器是官方實(shí)現(xiàn)的CPython,要真正利用多核,除非重寫一個(gè)不帶GIL的解釋器。
解決方法
- 用多進(jìn)程替代多線程
multiprocess庫的出現(xiàn)很大程度上是為了彌補(bǔ)thread庫因?yàn)镚IL而低效的缺陷。它完整的復(fù)制了一套thread所提供的接口方便遷移。唯一的不同就是它使用了多進(jìn)程而不是多線程。每個(gè)進(jìn)程有自己的獨(dú)立的GIL,因此也不會(huì)出現(xiàn)進(jìn)程之間的GIL爭(zhēng)搶。
當(dāng)然multiprocess也不是萬能良藥。它的引入會(huì)增加程序?qū)崿F(xiàn)時(shí)線程間數(shù)據(jù)通訊和同步的困難。就拿計(jì)數(shù)器來舉例子,如果我們要多個(gè)線程累加同一個(gè)變量,對(duì)于thread來說,申明一個(gè)global變量,用thread.Lock的context包裹住三行就搞定了。而multiprocess由于進(jìn)程之間無法看到對(duì)方的數(shù)據(jù),只能通過在主線程申明一個(gè)Queue,put再get或者用share memory的方法。這個(gè)額外的實(shí)現(xiàn)成本使得本來就非常痛苦的多線程程序編碼,變得更加痛苦了。
在將上面用threading實(shí)現(xiàn)的多任務(wù)改為multiprocessing:
import threading, time, multiprocessing
def my_counter(num):
i = 0
for _ in range(num):
i += 1
return True
def main():
# 單進(jìn)程順序執(zhí)行
process_array = {}
start_time = time.time()
count_num = int(1e8)
for pid in range(2):
p = multiprocessing.Process(target=my_counter, args=(count_num,))
p.start()
p.join()
end_time = time.time()
print("Total time for two sequential processes: ", round(end_time - start_time, 2), " s")
# 多進(jìn)程并行
print("CPU num for current machine: ", multiprocessing.cpu_count())
process_array = {}
start_time = time.time()
for tid in range(2):
t = multiprocessing.Process(target=my_counter, args=(count_num,))
t.start()
process_array[tid] = t
for tid in process_array.keys():
process_array[tid].join()
end_time = time.time()
print("Total time for multi-processings: ", round(end_time - start_time, 2), " s")
if __name__ == '__main__':
main()
結(jié)果如下,可以看到多進(jìn)程相比單進(jìn)程,速度有了明顯提升:
Total time for two sequential processes: 11.3 s
CPU num for current machine: 4
Total time for multi-processings: 7.91 s
- 用其他解析器
之前也提到了既然GIL只是CPython的產(chǎn)物,那么其他解析器是不是更好呢?沒錯(cuò),像JPython和IronPython這樣的解析器由于實(shí)現(xiàn)語言的特性,他們不需要GIL的幫助。然而由于用了Java/C#用于解析器實(shí)現(xiàn),他們也失去了利用社區(qū)眾多C語言模塊有用特性的機(jī)會(huì)。所以這些解析器也因此一直都比較小眾。畢竟功能和性能大家在初期都會(huì)選擇前者,Done is better than perfect。