在介紹Python中的線程之前,先明確一個問題,Python中的多線程是假的多線程!
為什么這么說,我們先明確一個概念,全局解釋器鎖(GIL)
一.GIL
- 什么是GIL
Python代碼的執(zhí)行由Python虛擬機(jī)(解釋器)來控制,同時只有一個線程在執(zhí)行。對Python虛擬機(jī)的訪問由全局解釋器鎖(GIL)來控制,正是這個鎖能保證同時只有一個線程在運行。
- 為什么要GIL
為了線程間數(shù)據(jù)的一致性和狀態(tài)同步的完整性
- GIL的影響
只有一個線程在運行,無法使用多核。
在多線程環(huán)境中,Python虛擬機(jī)按照以下方式執(zhí)行。
1.設(shè)置GIL。
2.切換到一個線程去執(zhí)行。
3.運行。
4.把線程設(shè)置為睡眠狀態(tài)。
5.解鎖GIL。
6.再次重復(fù)以上步驟。
比方我有一個4核的CPU,那么這樣一來,在單位時間內(nèi)每個核只能跑一個線程,然后時間片輪轉(zhuǎn)切換。
但是Python不一樣,它不管你有幾個核,單位時間多個核只能跑一個線程,然后時間片輪轉(zhuǎn)。
執(zhí)行一段時間后讓出,多線程在Python中只能交替執(zhí)性,10核也只能用到1個核
# 使用線程
from threading import Thread
def loop():
while True:
print("親愛的,我錯了,我能吃飯了嗎?")
if __name__ == '__main__':
for i in range(3):
t = Thread(target=loop)
t.start()
while True:
pass
# 而如果我們變成進(jìn)程呢?cpu --100%
from multiprocessing import Process
def loop():
while True:
print("親愛的,我錯了,我能吃飯了嗎?")
if __name__ == '__main__':
for i in range(3):
t = Process(target=loop)
t.start()
while True:
pass
二.線程
- 多線程怎么使用多核
1、重寫python編譯器(官方cpython)如使用:PyPy解釋器
2、調(diào)用C語言的鏈接庫
- cpu密集型(計算密集型)、I/O密集型
計算密集型任務(wù)由于主要消耗CPU資源,代碼運行效率至關(guān)重要,C語言編寫
IO密集型,涉及到網(wǎng)絡(luò)、磁盤IO的任務(wù)都是IO密集型任務(wù),這類任務(wù)的特點是CPU消耗很少,任務(wù)的大部分時間都在等待IO操作完成,99%的時間花費在IO上,腳本語言是首選,C語言最差。
- 創(chuàng)建多線程
def doSth(arg):
# 拿到當(dāng)前線程的名稱和線程號id
threadName = threading.current_thread().getName()
tid = threading.current_thread().ident
for i in range(5):
print("%s *%d @%s,tid=%d" % (arg, i, threadName, tid))
time.sleep(2)
1、使用_thread.start_new_thread開辟子線程
def simpleThread():
# 創(chuàng)建子線程,執(zhí)行doSth
# 用這種方式創(chuàng)建的線程為【守護(hù)線程】(主線程死去“護(hù)衛(wèi)”也隨“主公”而去)
_thread.start_new_thread(doSth, ("拍森",))
mainThreadName = threading.current_thread().getName()
print(threading.current_thread())
# 5秒的時間以內(nèi),能看到主線程和子線程在并發(fā)打印
for i in range(5):
print("勞資是主線程@%s" % (mainThreadName))
time.sleep(1)
# 阻塞主線程,以使【守護(hù)線程】能夠執(zhí)行完畢
while True:
pass
2、 通過創(chuàng)建threading.Thread對象實現(xiàn)子線程
def threadingThread():
# 默認(rèn)不是【守護(hù)線程】
t = threading.Thread(target=doSth, args=("大王派我來巡山",)) # args=(,) 必須是元組
# t.setDaemon(True) # 設(shè)置為守護(hù)線程
t.start() # 啟動線程,調(diào)用run()方法
3、通過繼承threading.Thread類,進(jìn)而創(chuàng)建對象實現(xiàn)子線程
class MyThread(threading.Thread):
def __init__(self, name, task, subtask):
super().__init__()
self.name = name # 覆蓋了父類的name
self.task = task # MyThread自己的屬性
self.subtask = subtask # MyThread自己的屬性
# 覆寫父類的run方法,
# run方法以內(nèi)為【要跑在子線程內(nèi)的業(yè)務(wù)邏輯】(thread.start()會觸發(fā)的業(yè)務(wù)邏輯)
def run(self):
for i in range(5):
print("【%s】并【%s】 *%d @%s" % (self.task, self.subtask, i, threading.current_thread().getName()))
time.sleep(2)
def classThread():
mt = MyThread("小分隊I", "巡山", "掃黃")
mt.start() # 啟動線程
4、幾個重要的API
def importantAPI():
print(threading.currentThread()) # 返回當(dāng)前的線程變量
# 創(chuàng)建五條子線程
t1 = threading.Thread(target=doSth, args=("巡山",))
t2 = threading.Thread(target=doSth, args=("巡水",))
t3 = threading.Thread(target=doSth, args=("巡鳥",))
t1.start() # 開啟線程
t2.start()
t3.start()
print(t1.isAlive()) # 返回線程是否活動的
print(t2.isDaemon()) # 是否是守護(hù)線程
print(t3.getName()) # 返回線程名
t3.setName("巡鳥") # 設(shè)置線程名
print(t3.getName())
print(t3.ident) # 返回線程號
# 返回一個包含正在運行的線程的list
tlist = threading.enumerate()
print("當(dāng)前活動線程:", tlist)
# 返回正在運行的線程數(shù)量(在數(shù)值上等于len(tlist))
count = threading.active_count()
print("當(dāng)前活動線程有%d條" % (count))
- 線程沖突
'''
【線程沖突】示例:
多個線程并發(fā)訪問同一個變量而互相干擾
'''
import threading
import time
money = 0
# CPU分配的時間片不足以完成一百萬次加法運算,
# 因此結(jié)果還沒有被保存到內(nèi)存中就被其它線程所打斷
def addMoney():
global money
for i in range(1000000):
money += 1
print(money)
# 創(chuàng)建線程鎖
lock = threading.Lock()
def addMoneyWithLock():
# print("addMoneyWithLock")
time.sleep(1)
global money
# print(lock.acquire())
# if lock.acquire():
# for i in range(1000000):
# money += 1
# lock.release()
# 獨占線程鎖
with lock: # 阻塞直到拿到線程鎖
# -----下面的代碼只有拿到lock對象才能執(zhí)行-----
for i in range(1000000):
money += 1
# 釋放線程鎖,以使其它線程能夠拿到并執(zhí)行邏輯
# ----------------鎖已被釋放-----------------
print(money)
# 5條線程同時訪問money變量,導(dǎo)致結(jié)果不正確
def conflictDemo():
for i in range(5):
t = threading.Thread(target=addMoney)
t.start()
# 通過線程同步(依次執(zhí)行)解決線程沖突
def handleConflictBySync():
for i in range(5):
t = threading.Thread(target=addMoney)
t.start()
t.join() # 一直阻塞到t運行完畢
# 通過依次獨占線程鎖解決線程沖突
def handleConflictByLock():
# 并發(fā)5條線程
for i in range(5):
t = threading.Thread(target=addMoneyWithLock)
t.start()
if __name__ == '__main__':
# conflictDemo()
# handleConflictBySync()
handleConflictByLock()
pass
- 死鎖
死鎖:是指一個資源被多次調(diào)用,而多次調(diào)用方都未能釋放該資源就會造成一種互相等待的現(xiàn)象,若無外力作用,它們都將無法推進(jìn)下去。此時稱系統(tǒng)處于死鎖狀態(tài)或系統(tǒng)產(chǎn)生了死鎖。
互相鎖住對方線程需要的資源,造成死鎖局面
- 線程安全
1.互斥鎖
互斥鎖
狀態(tài):鎖定/非鎖定
# 創(chuàng)建鎖
lock = threading.Lock()
# 鎖定
lock.acquire()
# 釋放
lock.release()
2.遞歸鎖
遞歸鎖,重用鎖,用于解決死鎖的問題,可重復(fù)鎖
# 遞歸鎖
rlock = threading.RLOCK()
- 信號量Semaphore調(diào)度線程:控制最大并發(fā)量
'''
使用Semaphore調(diào)度線程:控制最大并發(fā)量
'''
import threading
import time
# 允許最大并發(fā)量3
sem = threading.Semaphore(3)
def doSth(arg):
with sem:
tname = threading.current_thread().getName()
print("%s正在執(zhí)行【%s】" % (tname, arg))
time.sleep(1)
print("-----%s執(zhí)行完畢!-----\n" % (tname))
time.sleep(0.1)
if __name__ == '__main__':
# 開啟10條線程
for i in range(10):
threading.Thread(target=doSth, args=("巡山",), name="小分隊%d" % (i)).start()
三. 協(xié)程
協(xié)程,又稱微線程,纖程。英文名Coroutine。
首先我們得知道協(xié)程是啥?協(xié)程其實可以認(rèn)為是比線程更小的執(zhí)行單元。為啥說他是一個執(zhí)行單元,因為他自帶CPU上下文。這樣只要在合適的時機(jī),我們可以把一個協(xié)程切換到另一個協(xié)程,只要這個過程中保存或恢復(fù)CPU上下文那么程序還是可以運行的。
通俗的理解:在一個線程中的某個函數(shù),可以在任何地方保存當(dāng)前函數(shù)的一些臨時變量等信息,然后切換到另外一個函數(shù)中執(zhí)行,注意不是通過調(diào)用函數(shù)的方式做到的,并且切換的次數(shù)以及什么時候再切換到原來的函數(shù)都由開發(fā)者自己確定。
- 協(xié)程和線程差異
協(xié)程的特點在于是一個線程執(zhí)行, 那和多線程比,協(xié)程有何優(yōu)勢?
1. 最大的優(yōu)勢就是協(xié)程極高的執(zhí)行效率。因為子程序切換不是線程切換,而是由程序自身控制,因此,沒有線程切換的開銷,和多線程比,線程數(shù)量越多,協(xié)程的性能優(yōu)勢就越明顯。
2.第二大優(yōu)勢就是不需要多線程的鎖機(jī)制,因為只有一個線程,也不存在同時寫變量沖突,在協(xié)程中控制共享資源不加鎖,只需要判斷狀態(tài)就好了,所以執(zhí)行效率比多線程高很多。
因為協(xié)程是一個線程執(zhí)行,那怎么利用多核CPU呢?
最簡單的方法是多進(jìn)程+協(xié)程,既充分利用多核,又充分發(fā)揮協(xié)程的高效率,可獲得極高的性能。
協(xié)程的缺點: 它不能同時將CPU的多個核用上,只能使用一個核
Python對協(xié)程的支持是通過generator實現(xiàn)的。
在generator中,我們不但可以通過for循環(huán)來迭代,還可以不斷調(diào)用next()函數(shù)獲取由yield語句返回的下一個值。
# 協(xié)程一個簡單實現(xiàn)
def C():
while True:
print("=====C=====")
yield
time.sleep(0.5)
def D(c):
while True:
print("=====D=====")
next(c)
time.sleep(0.5)
if __name__ == "__main__":
c = C()
D(c)
- 使用協(xié)程
1.使用greenlet + switch實現(xiàn)協(xié)程調(diào)度
'''
使用greenlet + switch實現(xiàn)協(xié)程調(diào)度
'''
from greenlet import greenlet
import time
def func1():
print("開門走進(jìn)衛(wèi)生間")
time.sleep(3)
gr2.switch() # 把CPU執(zhí)行權(quán)交給gr2
print("飛流直下三千尺")
time.sleep(3)
gr2.switch()
pass
def func2():
print("一看拖把放旁邊")
time.sleep(3)
gr1.switch()
print("疑是銀河落九天")
pass
if __name__ == '__main__':
gr1 = greenlet(func1)
gr2 = greenlet(func2)
gr1.switch() # 把CPU執(zhí)行權(quán)先給gr1
pass
2.使用gevent +sleep自動將CPU執(zhí)行權(quán)分配給當(dāng)前未睡眠的協(xié)程
'''
使用gevent + sleep自動將CPU執(zhí)行權(quán)分配給當(dāng)前未睡眠的協(xié)程
'''
import gevent
def func1():
gevent.sleep(1)
print("大夢誰先覺")
gevent.sleep(13)
print("1:over")
pass
def func2():
gevent.sleep(3)
print("平生我自知")
gevent.sleep(9)
print("2:over")
pass
def func3():
gevent.sleep(5)
print("草堂春睡足")
gevent.sleep(5)
print("3:over")
pass
def func4():
gevent.sleep(7)
print("窗外日遲遲")
gevent.sleep(1)
print("4:over")
pass
def simpleGevent():
gr1 = gevent.spawn(func1)
gr2 = gevent.spawn(func2)
gr3 = gevent.spawn(func3)
gr4 = gevent.spawn(func4)
gevent.joinall([
gr1, gr2, gr3, gr4
])
if __name__ == '__main__':
simpleGevent()
3.通過monkey調(diào)度
'''
使用gevent + monkey.patch_all()自動調(diào)度網(wǎng)絡(luò)IO協(xié)程
'''
import gevent
from gevent import monkey
monkey.patch_all() # 將【標(biāo)準(zhǔn)庫-阻塞IO實現(xiàn)】替換為【gevent-非阻塞IO實現(xiàn)】
import requests
import time
def getPageText(url, order=0):
print("No%d:%s請求開始..." % (order, url))
resp = requests.get(url) # 發(fā)起網(wǎng)絡(luò)請求,返回需要時間——阻塞IO
html = resp.text
print("No%d:%s成功返回:長度為%d" % (order, url, len(html)))
pass
if __name__ == '__main__':
start = time.time()
time.clock()
gevent.joinall([
gevent.spawn(getPageText, "http://www.sina.com", order=1),
gevent.spawn(getPageText, "http://www.qq.com", order=2),
gevent.spawn(getPageText, "http://www.baidu.com", order=3),
gevent.spawn(getPageText, "http://www.163.com", order=4),
gevent.spawn(getPageText, "http://www.4399.com", order=5),
gevent.spawn(getPageText, "http://www.sohu.com", order=6),
gevent.spawn(getPageText, "http://www.youku.com", order=7),
gevent.spawn(getPageText, "http://www.iqiyi.com", order=8),
])
end = time.time()
print("over,耗時%d秒" % (end - start))
print(time.clock())