07-多線程和協(xié)程

在介紹Python中的線程之前,先明確一個問題,Python中的多線程是假的多線程!
為什么這么說,我們先明確一個概念,全局解釋器鎖(GIL)

一.GIL

  • 什么是GIL
Python代碼的執(zhí)行由Python虛擬機(jī)(解釋器)來控制,同時只有一個線程在執(zhí)行。對Python虛擬機(jī)的訪問由全局解釋器鎖(GIL)來控制,正是這個鎖能保證同時只有一個線程在運行。
  • 為什么要GIL
為了線程間數(shù)據(jù)的一致性和狀態(tài)同步的完整性
  • GIL的影響
只有一個線程在運行,無法使用多核。

在多線程環(huán)境中,Python虛擬機(jī)按照以下方式執(zhí)行。
    1.設(shè)置GIL。
    2.切換到一個線程去執(zhí)行。
    3.運行。
    4.把線程設(shè)置為睡眠狀態(tài)。
    5.解鎖GIL。
    6.再次重復(fù)以上步驟。

比方我有一個4核的CPU,那么這樣一來,在單位時間內(nèi)每個核只能跑一個線程,然后時間片輪轉(zhuǎn)切換。
但是Python不一樣,它不管你有幾個核,單位時間多個核只能跑一個線程,然后時間片輪轉(zhuǎn)。
執(zhí)行一段時間后讓出,多線程在Python中只能交替執(zhí)性,10核也只能用到1個核

# 使用線程
from threading import Thread
def loop():
    while True:
        print("親愛的,我錯了,我能吃飯了嗎?")

if __name__ == '__main__':

    for i in range(3):
        t = Thread(target=loop)
        t.start()

    while True:
        pass
        

# 而如果我們變成進(jìn)程呢?cpu --100%
from multiprocessing import Process
def loop():
    while True:
        print("親愛的,我錯了,我能吃飯了嗎?")

if __name__ == '__main__':
    for i in range(3):
        t = Process(target=loop)
        t.start()

    while True:
        pass

二.線程

  • 多線程怎么使用多核
1、重寫python編譯器(官方cpython)如使用:PyPy解釋器
2、調(diào)用C語言的鏈接庫
  • cpu密集型(計算密集型)、I/O密集型
計算密集型任務(wù)由于主要消耗CPU資源,代碼運行效率至關(guān)重要,C語言編寫

IO密集型,涉及到網(wǎng)絡(luò)、磁盤IO的任務(wù)都是IO密集型任務(wù),這類任務(wù)的特點是CPU消耗很少,任務(wù)的大部分時間都在等待IO操作完成,99%的時間花費在IO上,腳本語言是首選,C語言最差。
  • 創(chuàng)建多線程
def doSth(arg):
    # 拿到當(dāng)前線程的名稱和線程號id
    threadName = threading.current_thread().getName()
    tid = threading.current_thread().ident
    for i in range(5):
        print("%s *%d @%s,tid=%d" % (arg, i, threadName, tid))
        time.sleep(2)

1、使用_thread.start_new_thread開辟子線程

def simpleThread():
    # 創(chuàng)建子線程,執(zhí)行doSth
    # 用這種方式創(chuàng)建的線程為【守護(hù)線程】(主線程死去“護(hù)衛(wèi)”也隨“主公”而去)
    _thread.start_new_thread(doSth, ("拍森",))

    mainThreadName = threading.current_thread().getName()
    print(threading.current_thread())
    
    # 5秒的時間以內(nèi),能看到主線程和子線程在并發(fā)打印
    for i in range(5):
        print("勞資是主線程@%s" % (mainThreadName))
        time.sleep(1)

    # 阻塞主線程,以使【守護(hù)線程】能夠執(zhí)行完畢
    while True:
        pass

2、 通過創(chuàng)建threading.Thread對象實現(xiàn)子線程

def threadingThread():
    # 默認(rèn)不是【守護(hù)線程】
    t = threading.Thread(target=doSth, args=("大王派我來巡山",)) # args=(,) 必須是元組
    # t.setDaemon(True)  # 設(shè)置為守護(hù)線程
    t.start()  # 啟動線程,調(diào)用run()方法
    

3、通過繼承threading.Thread類,進(jìn)而創(chuàng)建對象實現(xiàn)子線程

class MyThread(threading.Thread):
    def __init__(self, name, task, subtask):
        super().__init__()

        self.name = name  # 覆蓋了父類的name
        self.task = task  # MyThread自己的屬性
        self.subtask = subtask  # MyThread自己的屬性

    # 覆寫父類的run方法,
    # run方法以內(nèi)為【要跑在子線程內(nèi)的業(yè)務(wù)邏輯】(thread.start()會觸發(fā)的業(yè)務(wù)邏輯)
    def run(self):
        for i in range(5):
            print("【%s】并【%s】 *%d @%s" % (self.task, self.subtask, i, threading.current_thread().getName()))
            time.sleep(2)

def classThread():
    mt = MyThread("小分隊I", "巡山", "掃黃")
    mt.start()  #  啟動線程

4、幾個重要的API

def importantAPI():
    print(threading.currentThread())  # 返回當(dāng)前的線程變量
    # 創(chuàng)建五條子線程
    t1 = threading.Thread(target=doSth, args=("巡山",))
    t2 = threading.Thread(target=doSth, args=("巡水",))
    t3 = threading.Thread(target=doSth, args=("巡鳥",))

    t1.start()  # 開啟線程
    t2.start()
    t3.start()

    print(t1.isAlive())  # 返回線程是否活動的
    print(t2.isDaemon())  # 是否是守護(hù)線程
    print(t3.getName())  # 返回線程名
    t3.setName("巡鳥")  # 設(shè)置線程名
    print(t3.getName())
    print(t3.ident)  # 返回線程號

    # 返回一個包含正在運行的線程的list
    tlist = threading.enumerate()
    print("當(dāng)前活動線程:", tlist)

    # 返回正在運行的線程數(shù)量(在數(shù)值上等于len(tlist))
    count = threading.active_count()
    print("當(dāng)前活動線程有%d條" % (count))

  • 線程沖突
'''
【線程沖突】示例:
    多個線程并發(fā)訪問同一個變量而互相干擾
'''
import threading
import time
money = 0

# CPU分配的時間片不足以完成一百萬次加法運算,
# 因此結(jié)果還沒有被保存到內(nèi)存中就被其它線程所打斷
def addMoney():
    global money
    for i in range(1000000):
        money += 1
    print(money)


# 創(chuàng)建線程鎖
lock = threading.Lock()

def addMoneyWithLock():
    # print("addMoneyWithLock")
    time.sleep(1)
    global money
    # print(lock.acquire())
    # if lock.acquire():
    #     for i in range(1000000):
    #         money += 1
    # lock.release()
    # 獨占線程鎖
    with lock:  # 阻塞直到拿到線程鎖

        # -----下面的代碼只有拿到lock對象才能執(zhí)行-----
        for i in range(1000000):
            money += 1
        # 釋放線程鎖,以使其它線程能夠拿到并執(zhí)行邏輯
        # ----------------鎖已被釋放-----------------

    print(money)

# 5條線程同時訪問money變量,導(dǎo)致結(jié)果不正確
def conflictDemo():
    for i in range(5):
        t = threading.Thread(target=addMoney)
        t.start()

# 通過線程同步(依次執(zhí)行)解決線程沖突
def handleConflictBySync():
    for i in range(5):
        t = threading.Thread(target=addMoney)
        t.start()
        t.join()  # 一直阻塞到t運行完畢

# 通過依次獨占線程鎖解決線程沖突
def handleConflictByLock():
    # 并發(fā)5條線程
    for i in range(5):
        t = threading.Thread(target=addMoneyWithLock)
        t.start()

if __name__ == '__main__':
    # conflictDemo()
    # handleConflictBySync()
    handleConflictByLock()

    pass
  • 死鎖
死鎖:是指一個資源被多次調(diào)用,而多次調(diào)用方都未能釋放該資源就會造成一種互相等待的現(xiàn)象,若無外力作用,它們都將無法推進(jìn)下去。此時稱系統(tǒng)處于死鎖狀態(tài)或系統(tǒng)產(chǎn)生了死鎖。

互相鎖住對方線程需要的資源,造成死鎖局面
  • 線程安全
1.互斥鎖
互斥鎖
    狀態(tài):鎖定/非鎖定
    # 創(chuàng)建鎖
        lock = threading.Lock()
    # 鎖定
        lock.acquire()
    # 釋放
        lock.release()
2.遞歸鎖
遞歸鎖,重用鎖,用于解決死鎖的問題,可重復(fù)鎖

# 遞歸鎖
rlock = threading.RLOCK()

  • 信號量Semaphore調(diào)度線程:控制最大并發(fā)量
'''
使用Semaphore調(diào)度線程:控制最大并發(fā)量
'''
import threading
import time

# 允許最大并發(fā)量3
sem = threading.Semaphore(3)

def doSth(arg):
    with sem:
        tname = threading.current_thread().getName()
        print("%s正在執(zhí)行【%s】" % (tname, arg))
        time.sleep(1)
        print("-----%s執(zhí)行完畢!-----\n" % (tname))
        time.sleep(0.1)

if __name__ == '__main__':

    # 開啟10條線程
    for i in range(10):
        threading.Thread(target=doSth, args=("巡山",), name="小分隊%d" % (i)).start()
    

三. 協(xié)程

協(xié)程,又稱微線程,纖程。英文名Coroutine。
首先我們得知道協(xié)程是啥?協(xié)程其實可以認(rèn)為是比線程更小的執(zhí)行單元。為啥說他是一個執(zhí)行單元,因為他自帶CPU上下文。這樣只要在合適的時機(jī),我們可以把一個協(xié)程切換到另一個協(xié)程,只要這個過程中保存或恢復(fù)CPU上下文那么程序還是可以運行的。

通俗的理解:在一個線程中的某個函數(shù),可以在任何地方保存當(dāng)前函數(shù)的一些臨時變量等信息,然后切換到另外一個函數(shù)中執(zhí)行,注意不是通過調(diào)用函數(shù)的方式做到的,并且切換的次數(shù)以及什么時候再切換到原來的函數(shù)都由開發(fā)者自己確定。
  • 協(xié)程和線程差異
協(xié)程的特點在于是一個線程執(zhí)行, 那和多線程比,協(xié)程有何優(yōu)勢?
    1. 最大的優(yōu)勢就是協(xié)程極高的執(zhí)行效率。因為子程序切換不是線程切換,而是由程序自身控制,因此,沒有線程切換的開銷,和多線程比,線程數(shù)量越多,協(xié)程的性能優(yōu)勢就越明顯。
    2.第二大優(yōu)勢就是不需要多線程的鎖機(jī)制,因為只有一個線程,也不存在同時寫變量沖突,在協(xié)程中控制共享資源不加鎖,只需要判斷狀態(tài)就好了,所以執(zhí)行效率比多線程高很多。

因為協(xié)程是一個線程執(zhí)行,那怎么利用多核CPU呢?
    最簡單的方法是多進(jìn)程+協(xié)程,既充分利用多核,又充分發(fā)揮協(xié)程的高效率,可獲得極高的性能。

協(xié)程的缺點: 它不能同時將CPU的多個核用上,只能使用一個核

Python對協(xié)程的支持是通過generator實現(xiàn)的。
在generator中,我們不但可以通過for循環(huán)來迭代,還可以不斷調(diào)用next()函數(shù)獲取由yield語句返回的下一個值。

# 協(xié)程一個簡單實現(xiàn)
def C():
    while True:
        print("=====C=====")
        yield
        time.sleep(0.5)

def D(c):
    while True:
        print("=====D=====")
        next(c)
        time.sleep(0.5)

if __name__ == "__main__":
    c = C()
    D(c)
  • 使用協(xié)程

1.使用greenlet + switch實現(xiàn)協(xié)程調(diào)度

'''
    使用greenlet + switch實現(xiàn)協(xié)程調(diào)度
'''
from greenlet import greenlet
import time


def func1():
    print("開門走進(jìn)衛(wèi)生間")
    time.sleep(3)
    gr2.switch()  # 把CPU執(zhí)行權(quán)交給gr2

    print("飛流直下三千尺")
    time.sleep(3)
    gr2.switch()
    pass


def func2():
    print("一看拖把放旁邊")
    time.sleep(3)
    gr1.switch()

    print("疑是銀河落九天")
    pass


if __name__ == '__main__':
    gr1 = greenlet(func1)
    gr2 = greenlet(func2)
    gr1.switch()  # 把CPU執(zhí)行權(quán)先給gr1
    pass

2.使用gevent +sleep自動將CPU執(zhí)行權(quán)分配給當(dāng)前未睡眠的協(xié)程

'''
    使用gevent + sleep自動將CPU執(zhí)行權(quán)分配給當(dāng)前未睡眠的協(xié)程
'''
import gevent

def func1():
    gevent.sleep(1)
    print("大夢誰先覺")

    gevent.sleep(13)
    print("1:over")
    pass

def func2():
    gevent.sleep(3)
    print("平生我自知")

    gevent.sleep(9)
    print("2:over")
    pass

def func3():
    gevent.sleep(5)
    print("草堂春睡足")

    gevent.sleep(5)
    print("3:over")
    pass

def func4():
    gevent.sleep(7)
    print("窗外日遲遲")

    gevent.sleep(1)
    print("4:over")
    pass

def simpleGevent():
    gr1 = gevent.spawn(func1)
    gr2 = gevent.spawn(func2)
    gr3 = gevent.spawn(func3)
    gr4 = gevent.spawn(func4)
    gevent.joinall([
        gr1, gr2, gr3, gr4
    ])

if __name__ == '__main__':
    simpleGevent()
    

3.通過monkey調(diào)度

'''
    使用gevent + monkey.patch_all()自動調(diào)度網(wǎng)絡(luò)IO協(xié)程
'''
import gevent
from gevent import monkey
monkey.patch_all()  # 將【標(biāo)準(zhǔn)庫-阻塞IO實現(xiàn)】替換為【gevent-非阻塞IO實現(xiàn)】

import requests
import time

def getPageText(url, order=0):
    print("No%d:%s請求開始..." % (order, url))
    resp = requests.get(url)  # 發(fā)起網(wǎng)絡(luò)請求,返回需要時間——阻塞IO

    html = resp.text
    print("No%d:%s成功返回:長度為%d" % (order, url, len(html)))
    pass

if __name__ == '__main__':
    start = time.time()
    time.clock()
    gevent.joinall([
        gevent.spawn(getPageText, "http://www.sina.com", order=1),
        gevent.spawn(getPageText, "http://www.qq.com", order=2),
        gevent.spawn(getPageText, "http://www.baidu.com", order=3),
        gevent.spawn(getPageText, "http://www.163.com", order=4),
        gevent.spawn(getPageText, "http://www.4399.com", order=5),
        gevent.spawn(getPageText, "http://www.sohu.com", order=6),
        gevent.spawn(getPageText, "http://www.youku.com", order=7),
        gevent.spawn(getPageText, "http://www.iqiyi.com", order=8),
    ])

    end = time.time()
    print("over,耗時%d秒" % (end - start))
    print(time.clock())
    
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容