Python進(jìn)階 - 高性能計(jì)算之多線程

寫在前面

這個(gè)系列是筆者學(xué)習(xí)python一些進(jìn)階功能的筆記和思考,水平有限,錯(cuò)漏在所難免,還請(qǐng)方家不吝指教。

什么是線程?

首先應(yīng)該了解操作系統(tǒng)如何支持多任務(wù)運(yùn)行的,這里有“并行”和“并發(fā)”兩個(gè)概念。

  • 并行: 在同一時(shí)刻有多條指令在多個(gè)處理器上同時(shí)執(zhí)行,通俗點(diǎn)就是CPU數(shù)>=任務(wù)數(shù)的情況;
  • 并發(fā):在同一時(shí)刻只能有一條指令執(zhí)行,但是多個(gè)任務(wù)被快速輪換執(zhí)行,使得宏觀上讓人感覺到有多個(gè)任務(wù)同時(shí)執(zhí)行的效果。通俗點(diǎn)來說就是CPU數(shù)<任務(wù)數(shù)的情況。

在操作系統(tǒng)中運(yùn)行的一個(gè)程序就是一個(gè)進(jìn)程(Process),它是代碼+資源的組合。而在一個(gè)進(jìn)程中,有一個(gè)或者多個(gè)線程(Thread)。線程是進(jìn)程的組成部分,一個(gè)進(jìn)程至少有一個(gè)主線程來完成進(jìn)程從開始到結(jié)束的全部操作。

單核CPU如何運(yùn)行多個(gè)線程?

  • 時(shí)間片輪轉(zhuǎn):操作系統(tǒng)讓每個(gè)程序依次運(yùn)行極短的時(shí)間
  • 優(yōu)先級(jí)調(diào)度:讓高優(yōu)先級(jí)的任務(wù)優(yōu)先占用CPU

python中多線程的實(shí)現(xiàn)

threading模塊

t1 = threading.Thread(target = someFunction, args = ()),此時(shí)會(huì)創(chuàng)建一個(gè)線程對(duì)象,但是不會(huì)直接創(chuàng)建線程。用args關(guān)鍵詞可以為函數(shù)傳入?yún)?shù),但是注意這里傳入的參數(shù)因?yàn)閿?shù)量不定,因此一定要是一個(gè)元組

調(diào)用線程對(duì)象的start方法才會(huì)創(chuàng)建線程,并且讓線程開始運(yùn)行。

如下面的例子:

import threading


def my_func(cnt):
    for i in range(cnt):
        print(i)


if __name__ == '__main__':
    t1 = threading.Thread(target=my_func, args=(10,))  # 創(chuàng)建一個(gè)線程
    t1.start() # 啟動(dòng)子線程

主線程需要負(fù)責(zé)回收分配給子線程的資源,因此主線程一定會(huì)晚于子線程結(jié)束。

繼承thread類

用繼承了Thread等類也可以實(shí)現(xiàn)線程創(chuàng)建。但是這個(gè)類中一定需要定義run方法,這樣在start啟動(dòng)線程后會(huì)自動(dòng)調(diào)用run方法,例如以下程序:

from time import sleep
import threading

class myThread(threading.Thread):
    def run(self):
        for i in range(10):
            sleep(1)
            msg = "I'm " + self.name + " @ " + str(i)
            print(msg)

def main():
    testThread = myThread()
    testThread.start()

if __name__ == "__main__":
    main()

會(huì)得到以下結(jié)果:

I'm Thread-1 @ 0
I'm Thread-1 @ 1
I'm Thread-1 @ 2
I'm Thread-1 @ 3
I'm Thread-1 @ 4
I'm Thread-1 @ 5
I'm Thread-1 @ 6
I'm Thread-1 @ 7
I'm Thread-1 @ 8
I'm Thread-1 @ 9

如果需要調(diào)用一系列的函數(shù),那么可以將其封裝在這個(gè)類中,然后在run方法里調(diào)用。

不同線程中全局變量的共享

在不同的線程中,全局變量是共享的,這使得不同線程之間協(xié)作處理數(shù)據(jù)非常方便。但是這種共享也會(huì)有負(fù)面影響 -- 造成資源競(jìng)爭(zhēng),例如如下代碼:

from time import sleep
import threading

g_num = 0

def test1(num):
    global g_num
    for i in range(num):
        g_num += 1
    print("g_num in test1: %d" %g_num)


def test2(num):
    global g_num
    for i in range(num):
        g_num += 1
    print("g_num in test2: %d" %g_num)


def main():
    t1 = threading.Thread(target=test1, args=(1000000,))
    t2 = threading.Thread(target=test2, args=(1000000,))

    t1.start()
    t2.start()
    sleep(5)

    print("g_num in main thread: %d" % g_num)


if __name__ == "__main__":
    main()

執(zhí)行后會(huì)得到如下結(jié)果:

g_num in test1: 1221968
g_num in test2: 1323741
g_num in main thread: 1323741

這里得到的g_num并不是我們想象的2000000,因?yàn)樵趫?zhí)行python代碼時(shí),我們使得數(shù)據(jù)自加的一句python代碼會(huì)被翻譯為好幾句機(jī)器碼:

  • 讀取數(shù)據(jù)
  • 數(shù)據(jù)+1
  • 存儲(chǔ)數(shù)據(jù)

在執(zhí)行時(shí),由于只使用了一個(gè)CPU,CPU會(huì)采用時(shí)間片輪轉(zhuǎn)的方法來模擬多任務(wù)。因此實(shí)際上會(huì)在任意一步被cpu打斷,例如在完成數(shù)據(jù)+1后,在存儲(chǔ)數(shù)據(jù)時(shí)被打斷,這樣增加后的數(shù)據(jù)就沒有存入內(nèi)存,下一個(gè)線程從內(nèi)存讀取時(shí),讀到的就是沒有自增前的數(shù),這樣可能使得兩個(gè)線程中自增的數(shù)據(jù)相互覆蓋,導(dǎo)致加到最后得到的值要比想象中的小。這種問題,也叫做“數(shù)據(jù)不同步”。

線程鎖

當(dāng)多個(gè)線程幾乎同時(shí)修改一個(gè)數(shù)據(jù)時(shí),需要進(jìn)行同步控制。線程同步能夠保證多個(gè)線程安全訪問競(jìng)爭(zhēng)資源,最簡(jiǎn)單的同步機(jī)制是引入<u>互斥鎖</u>。

互斥鎖會(huì)給資源附加一個(gè)狀態(tài):鎖定/非鎖定。

當(dāng)某個(gè)線程需要修改資源時(shí),先將其鎖定,此時(shí)其他線程不可以修改該資源;到該線程修改結(jié)束后,釋放資源,使其變?yōu)榉擎i定,其他的線程才能再次鎖定該資源,進(jìn)行修改。這樣互斥鎖就保證了每次只有一個(gè)線程進(jìn)行寫入操作,保證了多線程下全局變量的正確性。

為了實(shí)現(xiàn)互斥鎖,threading模塊中提供了Lock和RLock兩個(gè)類:

  • threading.Lock是一個(gè)基本鎖對(duì)象,每次只能鎖定一次,其余的鎖請(qǐng)求需要等鎖釋放后才能獲?。?/li>
  • threading.RLock是可重入鎖(Reentrant Lock),在同一個(gè)線程中可以進(jìn)行多次鎖定和釋放,但是鎖定和釋放的方法必須成堆出現(xiàn),如果調(diào)用了n次鎖定,那么只有n次釋放才能解鎖。

Lock和RLock兩個(gè)類都提供了以下方法來鎖定和釋放:

  • acquire(blocking = True, timeout = -1)進(jìn)行鎖定,blocking為True時(shí)會(huì)堵塞當(dāng)前線程,直到其他線程釋放該鎖,當(dāng)前線程獲取到這個(gè)鎖為止;而blocking為False的情況下則不會(huì)堵塞當(dāng)前線程,而是往下執(zhí)行
  • release()釋放鎖

對(duì)于上面的問題,我們可以用互斥鎖來解決多個(gè)線程之間的資源競(jìng)爭(zhēng)問題,如夏例:

from time import sleep
import threading

g_num = 0
mutex = threading.Lock() # 建立互斥鎖

def test1(num):
    global g_num
    mutex.acquire(True) # 在更改數(shù)據(jù)之前獲得互斥鎖
    for i in range(num):
        g_num += 1
    mutex.release() # 更改完數(shù)據(jù)之后釋放鎖定
    print("g_num in test1: %d" %g_num)


def test2(num):
    global g_num
    mutex.acquire(True) # 在更改數(shù)據(jù)之前獲得互斥鎖
    for i in range(num):
        g_num += 1
    mutex.release() # 更改完數(shù)據(jù)之后釋放鎖定
    print("g_num in test2: %d" %g_num)


def main():
    t1 = threading.Thread(target=test1, args=(1000000,))
    t2 = threading.Thread(target=test2, args=(1000000,))

    t1.start()
    t2.start()
    sleep(5)

    print("g_num in main thread: %d" % g_num)


if __name__ == "__main__":
    main()

這樣,我們?cè)谧詈蟮玫降木褪俏覀冾A(yù)期的結(jié)果了:

g_num in test1: 1000000
g_num in test2: 2000000
g_num in main thread: 2000000

但是需要注意,用互斥鎖會(huì)帶來以下兩個(gè)問題:

  • 阻止了多線程并發(fā)執(zhí)行,包含鎖的某段代碼實(shí)際上只能以單線程模式運(yùn)行,其他模式處于堵塞的狀態(tài),效率就大大下降了;
  • 由于可以存在多個(gè)鎖,不同的線程持有不同鎖,并試圖獲取對(duì)方持有的鎖時(shí),可能會(huì)造成死鎖。

死鎖

線程間共享多個(gè)資源時(shí),如果兩個(gè)線程分別占有一部分資源并且同時(shí)等待對(duì)方的資源,就可能造成死鎖。當(dāng)出現(xiàn)死鎖時(shí),會(huì)造成應(yīng)用停止響應(yīng),如下例:

import threading
from time import sleep


lockA = threading.Lock()
lockB = threading.Lock()

def testA():
    # 為進(jìn)程A上鎖
    lockA.acquire()
    print("----Lock A acquired in testA----")
    sleep(1)
    # 中間需要操作另一批數(shù)據(jù),上鎖B
    lockB.acquire()
    print("----Lock B acquired in testA----")
    sleep(2)
    # 數(shù)據(jù)操作完成,解開鎖B
    lockB.release()
    print("----Lock B released in testA----")
    # 完成操作,釋放鎖A
    lockA.release()
    print("----Lock A released in testA----")


def testB():
    # 開始操作時(shí)為進(jìn)程B上鎖
    lockB.acquire()
    print("----Lock B acquired in testB----")
    sleep(1)
    # 進(jìn)行下一步操作前需要獲取鎖A
    lockA.acquire()
    print("----Lock A acquired in testB----")
    sleep(2)
    # 進(jìn)行完數(shù)據(jù)操作釋放鎖A
    lockA.release()
    print("----Lock A released in testB----")
    # 釋放鎖B
    lockB.release()
    print("----Lock B released in testB----")


def main():
    t1 = threading.Thread(target=testA)
    t2 = threading.Thread(target=testB)

    t1.start()
    t2.start()


if __name__ == "__main__":
    main()

在這個(gè)例子中,線程t1開始時(shí),為lockA上鎖,并進(jìn)入睡眠(模擬一些耗時(shí)操作),而同時(shí)開始的線程t2為lockB上鎖,等t1向下執(zhí)行時(shí),需要lockB,lockB處于上鎖狀態(tài),因此線程t1堵塞,等待lockB被釋放;而t2在執(zhí)行時(shí),acquire lockA失敗,也進(jìn)入了堵塞狀態(tài),等待lockA被釋放。

這樣,兩個(gè)線程互相需要對(duì)方釋放互斥鎖,兩個(gè)線程都無法向下執(zhí)行,進(jìn)入了死鎖狀態(tài)。

死鎖的避免

  • 程序設(shè)計(jì)時(shí)盡量避免(例如銀行家算法)
  • 添加超時(shí)時(shí)間限制

ThreadLocal

除了使用互斥鎖以外,還有一種辦法可以實(shí)現(xiàn)各線程間的數(shù)據(jù)隔離,那就是使用threading.local()。它會(huì)為各個(gè)變量創(chuàng)建完全屬于他們自己的副本(也就是線程局部變量),這樣各個(gè)線程操作的就是屬于自己的私有資源,可以杜絕數(shù)據(jù)不同步的問題。

import threading
from time import sleep

from time import sleep
import threading

g_num = 0
local = threading.local()


def test1(num):
    global g_num
    local.g_num = g_num  # 將g_num綁定一個(gè)線程局部變量
    for i in range(num):
        local.g_num += 1
    print("g_num in test1: %d" % local.g_num)


def test2(num):
    global g_num
    local.g_num = g_num  # 將g_num綁定一個(gè)線程局部變量
    for i in range(num):
        local.g_num += 1
    print("g_num in test2: %d" % local.g_num)


def main():
    t1 = threading.Thread(target=test1, args=(1000000,))
    t2 = threading.Thread(target=test2, args=(1000000,))

    t1.start()
    t2.start()
    t1.join()  # 等待線程t1執(zhí)行完畢
    t2.join()  # 等待線程t2執(zhí)行完畢

    print("g_num in main thread: %d" % g_num)


if __name__ == "__main__":
    main()

結(jié)果為:

g_num in test1: 1000000
g_num in test2: 1000000
g_num in main thread: 0

可以看到,盡管我們將全局變量g_num綁定在線程局部變量中,但是每個(gè)線程操作的實(shí)際上是自己的線程局部變量,并不會(huì)作用于我們綁定上去的全局變量。

Python中多線程的問題

在Python中(當(dāng)前使用版本3.7,在3.8中據(jù)說會(huì)用局部解釋器繞開GIL的問題),多線程并不能真正有效利用多核,例如如下代碼:

import threading, time, multiprocessing


def my_counter(num):
    i = 0
    for _ in range(num):
        i += 1
    return True


def main():
    # 單線程順序執(zhí)行
    thread_array = {}
    start_time = time.time()
    count_num = int(1e8)
    for tid in range(2):
        t = threading.Thread(target=my_counter, args=(count_num,))
        t.start()
        t.join()
    end_time = time.time()
    print("Total time for two sequential threads: ", round(end_time - start_time, 2), " s")

    # 雙線程并行
    print("CPU num for current machine: ",multiprocessing.cpu_count())
    thread_array = {}
    start_time = time.time()
    for tid in range(2):
        t = threading.Thread(target=my_counter, args=(count_num,))
        t.start()
        thread_array[tid] = t
    for tid in thread_array.keys():
        thread_array[tid].join()
    end_time = time.time()
    print("Total time for multi-threads: ", round(end_time - start_time, 2), " s")

if __name__ == '__main__':
    main()

運(yùn)行的結(jié)果如下:

Total time for two sequential threads:  11.32  s
CPU num for current machine:  4
Total time for multi-threads:  11.69  s

可以看到,在四核的電腦上,兩個(gè)線程用多線程并行和單線程串行,速度并沒有任何提高。

因?yàn)镻ython的線程雖然是真正的線程,但解釋器執(zhí)行代碼時(shí),有一個(gè)GIL鎖:Global Interpreter Lock,任何Python線程執(zhí)行前,必須先獲得GIL鎖,然后,每執(zhí)行100條字節(jié)碼,解釋器就自動(dòng)釋放GIL鎖,讓別的線程有機(jī)會(huì)執(zhí)行。這個(gè)GIL全局鎖實(shí)際上把所有線程的執(zhí)行代碼都給上了鎖,所以,多線程在Python中只能交替執(zhí)行,即使100個(gè)線程跑在100核CPU上,也只能用到1個(gè)核。

GIL是Python解釋器設(shè)計(jì)的歷史遺留問題,通常我們用的解釋器是官方實(shí)現(xiàn)的CPython,要真正利用多核,除非重寫一個(gè)不帶GIL的解釋器。

解決方法

  • 用多進(jìn)程替代多線程

multiprocess庫的出現(xiàn)很大程度上是為了彌補(bǔ)thread庫因?yàn)镚IL而低效的缺陷。它完整的復(fù)制了一套thread所提供的接口方便遷移。唯一的不同就是它使用了多進(jìn)程而不是多線程。每個(gè)進(jìn)程有自己的獨(dú)立的GIL,因此也不會(huì)出現(xiàn)進(jìn)程之間的GIL爭(zhēng)搶。

當(dāng)然multiprocess也不是萬能良藥。它的引入會(huì)增加程序?qū)崿F(xiàn)時(shí)線程間數(shù)據(jù)通訊和同步的困難。就拿計(jì)數(shù)器來舉例子,如果我們要多個(gè)線程累加同一個(gè)變量,對(duì)于thread來說,申明一個(gè)global變量,用thread.Lock的context包裹住三行就搞定了。而multiprocess由于進(jìn)程之間無法看到對(duì)方的數(shù)據(jù),只能通過在主線程申明一個(gè)Queue,put再get或者用share memory的方法。這個(gè)額外的實(shí)現(xiàn)成本使得本來就非常痛苦的多線程程序編碼,變得更加痛苦了。

在將上面用threading實(shí)現(xiàn)的多任務(wù)改為multiprocessing:

import threading, time, multiprocessing


def my_counter(num):
    i = 0
    for _ in range(num):
        i += 1
    return True


def main():
    # 單進(jìn)程順序執(zhí)行
    process_array = {}
    start_time = time.time()
    count_num = int(1e8)
    for pid in range(2):
        p = multiprocessing.Process(target=my_counter, args=(count_num,))
        p.start()
        p.join()
    end_time = time.time()
    print("Total time for two sequential processes: ", round(end_time - start_time, 2), " s")

    # 多進(jìn)程并行
    print("CPU num for current machine: ", multiprocessing.cpu_count())
    process_array = {}
    start_time = time.time()
    for tid in range(2):
        t = multiprocessing.Process(target=my_counter, args=(count_num,))
        t.start()
        process_array[tid] = t
    for tid in process_array.keys():
        process_array[tid].join()
    end_time = time.time()
    print("Total time for multi-processings: ", round(end_time - start_time, 2), " s")


if __name__ == '__main__':
    main()

結(jié)果如下,可以看到多進(jìn)程相比單進(jìn)程,速度有了明顯提升:

Total time for two sequential processes:  11.3  s
CPU num for current machine:  4
Total time for multi-processings:  7.91  s
  • 用其他解析器

之前也提到了既然GIL只是CPython的產(chǎn)物,那么其他解析器是不是更好呢?沒錯(cuò),像JPython和IronPython這樣的解析器由于實(shí)現(xiàn)語言的特性,他們不需要GIL的幫助。然而由于用了Java/C#用于解析器實(shí)現(xiàn),他們也失去了利用社區(qū)眾多C語言模塊有用特性的機(jī)會(huì)。所以這些解析器也因此一直都比較小眾。畢竟功能和性能大家在初期都會(huì)選擇前者,Done is better than perfect。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 線程 操作系統(tǒng)線程理論 線程概念的引入背景 進(jìn)程 之前我們已經(jīng)了解了操作系統(tǒng)中進(jìn)程的概念,程序并不能單獨(dú)運(yùn)行,只有...
    go以恒閱讀 1,800評(píng)論 0 6
  • 引言&動(dòng)機(jī) 考慮一下這個(gè)場(chǎng)景,我們有10000條數(shù)據(jù)需要處理,處理每條數(shù)據(jù)需要花費(fèi)1秒,但讀取數(shù)據(jù)只需要0.1秒,...
    妄心xyx閱讀 1,000評(píng)論 0 30
  • 必備的理論基礎(chǔ) 1.操作系統(tǒng)作用: 隱藏丑陋復(fù)雜的硬件接口,提供良好的抽象接口。 管理調(diào)度進(jìn)程,并將多個(gè)進(jìn)程對(duì)硬件...
    drfung閱讀 3,773評(píng)論 0 5
  • 引言&動(dòng)機(jī) 考慮一下這個(gè)場(chǎng)景,我們有10000條數(shù)據(jù)需要處理,處理每條數(shù)據(jù)需要花費(fèi)1秒,但讀取數(shù)據(jù)只需要0.1秒,...
    chen_000閱讀 589評(píng)論 0 0
  • 年前的一場(chǎng)百年難遇的寒潮侵入廣東,和朋友環(huán)海南島騎行的計(jì)劃無奈擱淺,只能做完課程設(shè)計(jì)后,收拾好行李早早回家,從而進(jìn)...
    木薯先生閱讀 2,173評(píng)論 7 7

友情鏈接更多精彩內(nèi)容