Num01-->多線程threading
Python中建議使用threading模塊,而不要使用thread模塊。原因如下:
1,Python中threading模塊對(duì)thread進(jìn)行了一些包裝,可以更加方便的使用。
2,Python中threading模塊能確保重要的子線程在進(jìn)程退出前結(jié)束。
3,Python中thread模塊,當(dāng)主線程技術(shù),同一主線程下的其他所有子線程都被強(qiáng)制退出。
4,Python中thread模塊,不支持守護(hù)(daemon)線程。
Test01-->多線程執(zhí)行
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2017/4/28 14:05
# @Author : xiaoke
import threading
import time
def say():
print("我是子線程")
time.sleep(1)
if __name__ == "__main__":
print("我是主線程")
# 創(chuàng)建三個(gè)子線程
for i in range(3):
t = threading.Thread(target=say)
# 啟動(dòng)線程
t.start()
# 結(jié)果如下:
# 我是主線程
# 我是子線程
# 我是子線程
# 我是子線程
Test02-->主線程會(huì)等待所有子線程結(jié)束后,再結(jié)束
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2017/4/28 14:05
# @Author : xiaoke
import threading
from time import sleep, ctime
def sing():
for i in range(3):
print("正在唱歌...%d" % i)
sleep(1)
print("唱歌子線程結(jié)束")
def dance():
for i in range(3):
print("正在跳舞...%d" % i)
sleep(1)
print("跳舞子線程結(jié)束")
if __name__ == '__main__':
print('主線程---開(kāi)始時(shí)間---:%s' % ctime())
t1 = threading.Thread(target=sing)
t2 = threading.Thread(target=dance)
t1.start()
t2.start()
# 屏蔽以下代碼,試試看,程序是否會(huì)立馬結(jié)束?
# t1.join()
# t2.join()
print('主線程---結(jié)束時(shí)間---:%s' % ctime())
# 沒(méi)有屏蔽join()函數(shù)的結(jié)果如下:
# 主線程---開(kāi)始時(shí)間---:Fri Apr 28 14:19:29 2017
# 正在唱歌...0
# 正在跳舞...0
# 正在唱歌...1
# 正在跳舞...1
# 正在跳舞...2
# 正在唱歌...2
# 跳舞子線程結(jié)束
# 唱歌子線程結(jié)束
# 主線程---結(jié)束時(shí)間---:Fri Apr 28 14:19:32 2017
# 屏蔽join()函數(shù),的結(jié)果如下:
# 主線程---開(kāi)始時(shí)間---:Fri Apr 28 14:14:00 2017
# 正在唱歌...0
# 正在跳舞...0
# 主線程---結(jié)束時(shí)間---:Fri Apr 28 14:14:00 2017
# 正在跳舞...1
# 正在唱歌...1
# 正在跳舞...2
# 正在唱歌...2
# 唱歌子線程結(jié)束
# 跳舞子線程結(jié)束
以上代碼說(shuō)明,子線程在啟動(dòng)后,調(diào)用join()函數(shù),是等待所有的子線程結(jié)束后,主線程才結(jié)束。否則主線程,很快就結(jié)束了。
Test03-->查看線程數(shù)量
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import threading
from time import sleep, ctime
import gc
def sing():
for i in range(3):
print("正在唱歌...%d" % i)
sleep(1)
print("唱歌子線程結(jié)束")
def dance():
for i in range(3):
print("正在跳舞...%d" % i)
sleep(1)
print("跳舞子線程結(jié)束")
if __name__ == '__main__':
print('---開(kāi)始時(shí)間---:%s' % ctime())
t1 = threading.Thread(target=sing)
t2 = threading.Thread(target=dance)
t1.start()
t2.start()
while True:
# enumerate()函數(shù)的意思:返回一個(gè)列表,存放當(dāng)前活著的線程
length = len(threading.enumerate())
print('當(dāng)前運(yùn)行的線程數(shù)為:%d' % length)
if length <= 1:
break
sleep(1)
# t1.join()
# t2.join()
print('---結(jié)束時(shí)間---:%s' % ctime())
# 手動(dòng)垃圾回收
#gc.collect()
print('當(dāng)前還活著的線程是:%s' % threading.current_thread().name)
# 結(jié)果如下:
# ---開(kāi)始時(shí)間---:Fri Apr 28 14:44:50 2017
# 正在唱歌...0
# 正在跳舞...0
# 當(dāng)前運(yùn)行的線程數(shù)為:3
# 正在唱歌...1
# 正在跳舞...1
# 當(dāng)前運(yùn)行的線程數(shù)為:3
# 正在唱歌...2
# 當(dāng)前運(yùn)行的線程數(shù)為:3
# 正在跳舞...2
# 唱歌子線程結(jié)束
# 當(dāng)前運(yùn)行的線程數(shù)為:2
# 跳舞子線程結(jié)束
# 當(dāng)前運(yùn)行的線程數(shù)為:1
# ---結(jié)束時(shí)間---:Fri Apr 28 14:44:54 2017
# 當(dāng)前還活著的線程是:MainThread
以上代碼加以說(shuō)明:在程序的最后,還剩下主線程還在存活,是因?yàn)橐粋€(gè)程序至少要有一個(gè)主線程,一直存活著。當(dāng)然Python垃圾回收器也會(huì)不定時(shí)的回收垃圾(引用計(jì)數(shù)和分代清除兩種機(jī)制),也可以手動(dòng)回收垃圾。
Num02-->第二種方式創(chuàng)建多線程
定義一個(gè)新的子類,繼承threading.Thread就可以,然后重寫run()方法。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import threading
import time
class MyThread(threading.Thread):
def run(self):
for i in range(3):
time.sleep(1)
msg = "I'm " + self.name + ' @ ' + str(i) # name屬性中保存的是當(dāng)前線程的名字
print(msg)
def main():
for i in range(3):
t = MyThread()
t.start()
if __name__ == '__main__':
main()
# 結(jié)果如下:
# I'm Thread-2 @ 0
# I'm Thread-1 @ 0
# I'm Thread-3 @ 0
# I'm Thread-1 @ 1
# I'm Thread-3 @ 1
# I'm Thread-2 @ 1
# I'm Thread-2 @ 2
# I'm Thread-1 @ 2
# I'm Thread-3 @ 2
以上代碼加以說(shuō)明:
1,Python中threading.Thread類中有一個(gè)run()方法,用于定義線程的功能函數(shù),可以在自己定義的線程類中重寫該方法。而創(chuàng)建自己的線程實(shí)例后,通過(guò)Thread類的start()方法,可以啟動(dòng)該線程,交個(gè)Python虛擬機(jī)進(jìn)行調(diào)度,當(dāng)該線程獲得執(zhí)行的機(jī)會(huì)時(shí),就會(huì)調(diào)用run()方法執(zhí)行線程。
2,多線程程序的執(zhí)行順序是不確定的。當(dāng)執(zhí)行到sleep()語(yǔ)句時(shí),線程將被阻塞(Blocked),到sleep()結(jié)束后,線程進(jìn)入就緒狀態(tài)(Runnable),等待調(diào)度。而線程的調(diào)度將會(huì)隨機(jī)選擇一個(gè)線程執(zhí)行。上面的代碼只能保證每個(gè)線程都運(yùn)行完,整個(gè)run()函數(shù)。但是線程的啟動(dòng)順序,run()函數(shù)中每次循環(huán)的執(zhí)行順序不能確定。
Num03-->線程的幾種狀態(tài)
Num04-->多線程共享全局變量
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
from threading import Thread
import time
g_num = 100
def work1():
global g_num
for i in range(3):
g_num += 1
print("----in work1, g_num is %d---" % g_num)
def work2():
global g_num
print("----in work2, g_num is %d---" % g_num)
print("---線程創(chuàng)建之前g_num is %d---" % g_num)
t1 = Thread(target=work1)
t1.start()
t1.join()
t2 = Thread(target=work2)
t2.start()
t2.join()
以上代碼加以說(shuō)明:
1,優(yōu)點(diǎn):不管是可變類型或者不可變類型的數(shù)據(jù),在一個(gè)進(jìn)程內(nèi)的所有線程共享全局變量,能夠在不使用其他方式的前提下完成多線程之間的數(shù)據(jù)共享。
2,缺點(diǎn):多線程對(duì)全局變量的隨意改變,會(huì)造成對(duì)全局變量值的混亂。也就是多線程非安全的原因。
Num05-->多線程中的互斥鎖
當(dāng)多個(gè)線程幾乎同時(shí)修改某一個(gè)共享數(shù)據(jù)的時(shí)候,需要進(jìn)行同步控制
線程同步能夠保證多個(gè)線程安全訪問(wèn)競(jìng)爭(zhēng)資源,最簡(jiǎn)單的同步機(jī)制是引入互斥鎖。
互斥鎖為資源引入一個(gè)狀態(tài):鎖定/非鎖定。
某個(gè)線程要更改共享數(shù)據(jù)時(shí),先將其鎖定,此時(shí)資源的狀態(tài)為“鎖定”,其他線程不能更改;直到該線程釋放資源,將資源的狀態(tài)變成“非鎖定”,其他的線程才能再次鎖定該資源。互斥鎖保證了每次只有一個(gè)線程進(jìn)行寫入操作,從而保證了多線程情況下數(shù)據(jù)的正確性。
Test01-->互斥鎖的創(chuàng)建
#創(chuàng)建鎖
mutex = threading.Lock()
#鎖定
mutex.acquire([blocking])
#釋放
mutex.release()
加以說(shuō)明:
其中,鎖定方法acquire可以有一個(gè)blocking參數(shù)。
如果設(shè)定blocking為True,則當(dāng)前線程會(huì)堵塞,直到獲取到這個(gè)鎖為止。(默認(rèn)為True)
如果設(shè)定blocking為False,則當(dāng)前線程不會(huì)堵塞
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
from threading import Thread, Lock
import time
g_num = 0
def test1():
global g_num
for i in range(1000000):
# True表示堵塞 即如果這個(gè)鎖在上鎖之前已經(jīng)被上鎖了,那么這個(gè)線程會(huì)在這里一直等待到解鎖為止
# False表示非堵塞,即不管本次調(diào)用能夠成功上鎖,都不會(huì)卡在這,而是繼續(xù)執(zhí)行下面的代碼
mutexFlag = mutex.acquire(True)
if mutexFlag:
g_num += 1
mutex.release()
print("---test1---g_num=%d" % g_num)
def test2():
global g_num
for i in range(1000000):
mutexFlag = mutex.acquire(True) # True表示堵塞
if mutexFlag:
g_num += 1
mutex.release()
print("---test2---g_num=%d" % g_num)
def main():
p1 = Thread(target=test1)
p1.start()
p2 = Thread(target=test2)
p2.start()
print("---g_num=%d---" % g_num)
p1.join()
p2.join()
if __name__ == '__main__':
# 創(chuàng)建一個(gè)互斥鎖
# 這個(gè)所默認(rèn)是未上鎖的狀態(tài)
mutex = Lock()
main()
# 運(yùn)行結(jié)果如下:
# ---g_num=44416---
# ---test1---g_num=1990567
# ---test2---g_num=2000000
Test02-->上鎖解鎖過(guò)程
當(dāng)一個(gè)線程調(diào)用鎖的acquire()方法獲得鎖時(shí),鎖就進(jìn)入“l(fā)ocked”鎖住狀態(tài)。
每次只有一個(gè)線程可以獲得鎖。
如果此時(shí)另一個(gè)線程試圖獲得鎖,該線程就會(huì)變成“blocked”狀態(tài),稱為阻塞。直到擁有鎖的線程調(diào)用release()函數(shù)釋放鎖之后,這時(shí)這個(gè)鎖就進(jìn)入“unlocked”解鎖的狀態(tài)。
線程調(diào)度程序,從處于同步阻塞狀態(tài)的線程中選擇一個(gè)來(lái)獲得鎖,并使得該線程進(jìn)入(running)運(yùn)行狀態(tài)。
Test03-->互斥鎖的優(yōu)缺點(diǎn)
優(yōu)點(diǎn):確保了某段關(guān)鍵代碼只能有一個(gè)線程從頭到尾的執(zhí)行。
缺點(diǎn):
1,阻止了多線程并發(fā)執(zhí)行,包含鎖的代碼實(shí)際上是以單線程的模式執(zhí)行,效率大大降低了。
2,由于可以存在多個(gè)鎖,不同的線程持有不同的鎖,并試圖獲取對(duì)方持有的鎖時(shí),可能會(huì)造成死鎖。
Num06-->線程死鎖和遞歸鎖
Test01--> 死鎖的情況
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import threading
import time
class MyThread1(threading.Thread):
def run(self):
# 對(duì)mutexA上鎖
if mutexA.acquire():
print(self.name + '----mutexA上鎖----')
time.sleep(1)
if mutexB.acquire():
print(self.name + '----mutexA中mutexB上鎖----')
mutexB.release()
mutexA.release()
class MyThread2(threading.Thread):
def run(self):
# 對(duì)mutexB上鎖
if mutexB.acquire():
print(self.name + '----mutexB上鎖----')
time.sleep(1)
if mutexA.acquire():
print(self.name + '----mutexB中mutexA上鎖----')
mutexA.release()
mutexB.release()
mutexA = threading.Lock()
mutexB = threading.Lock()
if __name__ == '__main__':
t1 = MyThread1()
t2 = MyThread2()
t1.start()
t2.start()
t1.join()
t2.join()
# 死鎖的結(jié)果如下:
# Thread-1----mutexA上鎖----
# Thread-2----mutexB上鎖----
# 我手寫的:一直停留在這里不動(dòng)
Test02--> 遞歸鎖,用于解決死鎖的問(wèn)題,對(duì)以上死鎖代碼加以修改
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import threading
import time
class MyThread1(threading.Thread):
def run(self):
# 對(duì)mutexA上鎖
if mutex.acquire():
print(self.name + '----mutexA上鎖----')
time.sleep(1)
if mutex.acquire():
print(self.name + '----mutexA中mutexB上鎖----')
mutex.release()
mutex.release()
class MyThread2(threading.Thread):
def run(self):
# 對(duì)mutexB上鎖
if mutex.acquire():
print(self.name + '----mutexB上鎖----')
time.sleep(1)
if mutex.acquire():
print(self.name + '----mutexB中mutexA上鎖----')
mutex.release()
mutex.release()
mutex = threading.RLock()
if __name__ == '__main__':
t1 = MyThread1()
t2 = MyThread2()
t1.start()
t2.start()
t1.join()
t2.join()
# RLock遞歸鎖的結(jié)果如下:
# Thread-1----mutexA上鎖----
# Thread-1----mutexA中mutexB上鎖----
# Thread-2----mutexB上鎖----
# Thread-2----mutexB中mutexA上鎖----
Num07-->利用互斥鎖實(shí)現(xiàn)同步
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
# 利用互斥鎖實(shí)現(xiàn)同步
from threading import Thread, Lock
import time
# 先創(chuàng)建A、B、C三個(gè)鎖
lockA = Lock()
lockB = Lock()
lockC = Lock()
def task_a():
# 申請(qǐng)lockA,獲得鎖才開(kāi)始任務(wù)
while True:
lockA.acquire()
print("-----A--------")
time.sleep(1)
lockB.release()
def task_b():
# 申請(qǐng)lockB,獲得鎖才開(kāi)始任務(wù)
while True:
lockB.acquire()
print("-----B--------")
time.sleep(1)
lockC.release()
def task_c():
# 申請(qǐng)lockC,獲得鎖才開(kāi)始任務(wù)
while True:
lockC.acquire()
print("-----C--------")
time.sleep(1)
lockA.release()
def main():
# lockB和lockC在主線程獲得鎖
lockB.acquire()
lockC.acquire()
thread_a = Thread(target=task_a)
thread_b = Thread(target=task_b)
thread_c = Thread(target=task_c)
thread_a.start()
thread_b.start()
thread_c.start()
thread_a.join()
thread_b.join()
thread_c.join()
if __name__ == '__main__':
main()
# 結(jié)果如下:
# -----A--------
# -----B--------
# -----C--------
# -----A--------
# -----B--------
# -----C--------
# -----A--------
# -----B--------
# -----C--------
# 此處省略......一直在同步的打印A、B、C
Num08-->生產(chǎn)者和消費(fèi)者模式--Queue
Python的Queue模塊中提供了同步的、線程安全的隊(duì)列類,包括FIFO(先入先出)隊(duì)列Queue,LIFO(后入先出)隊(duì)列LifoQueue,和優(yōu)先級(jí)隊(duì)列PriorityQueue。這些隊(duì)列都實(shí)現(xiàn)了鎖原語(yǔ)(可以理解為原子操作,即要么不做,要么就做完),能夠在多線程中直接使用??梢允褂藐?duì)列來(lái)實(shí)現(xiàn)線程間的同步。
為什么要使用生產(chǎn)者和消費(fèi)者模式???
在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費(fèi)者就是消費(fèi)數(shù)據(jù)的線程。在多線程開(kāi)發(fā)當(dāng)中,如果生產(chǎn)者處理速度很快,而消費(fèi)者處理速度很慢,那么生產(chǎn)者就必須等待消費(fèi)者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費(fèi)者的處理能力大于生產(chǎn)者,那么消費(fèi)者就必須等待生產(chǎn)者。為了解決這個(gè)問(wèn)題于是引入了生產(chǎn)者和消費(fèi)者模式。
什么是生產(chǎn)者消費(fèi)者模式???
生產(chǎn)者消費(fèi)者模式是通過(guò)一個(gè)容器來(lái)解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問(wèn)題。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而通過(guò)阻塞隊(duì)列來(lái)進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理,直接扔給阻塞隊(duì)列,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊(duì)列里取,阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力。
這個(gè)阻塞隊(duì)列就是用來(lái)給生產(chǎn)者和消費(fèi)者解耦的。
Test01-->FIFO先進(jìn)先出隊(duì)列Queue
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import queue # 隊(duì)列,解決多線程問(wèn)題
q = queue.Queue()
q.put("我的第一個(gè)進(jìn)來(lái)的")
q.put("我是第二個(gè)進(jìn)來(lái)的")
q.put({"name": "我是第三個(gè)進(jìn)來(lái)的"})
while True:
if not q.empty():
data = q.get(block=False)
print(data)
# 結(jié)果如下:
# 我的第一個(gè)進(jìn)來(lái)的
# 我是第二個(gè)進(jìn)來(lái)的
# {'name': '我是第三個(gè)進(jìn)來(lái)的'}
Test02-->LIFO后入先出隊(duì)列LifoQueue
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import queue
# 后進(jìn)先出
q = queue.LifoQueue()
q.put("我的第一個(gè)進(jìn)來(lái)的")
q.put("我是第二個(gè)進(jìn)來(lái)的")
q.put({"name": "我是第三個(gè)進(jìn)來(lái)的"})
while True:
if not q.empty():
data = q.get(block=False)
print(data)
# 結(jié)果如下:
# {'name': '我是第三個(gè)進(jìn)來(lái)的'}
# 我是第二個(gè)進(jìn)來(lái)的
# 我的第一個(gè)進(jìn)來(lái)的
Test03-->優(yōu)先級(jí)隊(duì)列PriorityQueue
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import queue
# 按優(yōu)先級(jí),按照2,3,4,5這樣從小到大的優(yōu)先級(jí)
q = queue.PriorityQueue()
q.put([5, "我的第一個(gè)進(jìn)來(lái)的"])
q.put([2, "我是第二個(gè)進(jìn)來(lái)的"])
q.put([4, {"name": "我是第三個(gè)進(jìn)來(lái)的"}])
q.put([3, (2, 3, 4, 5)])
while True:
if not q.empty():
data = q.get(block=False)
print(data)
# 結(jié)果如下:
# [2, '我是第二個(gè)進(jìn)來(lái)的']
# [3, (2, 3, 4, 5)]
# [4, {'name': '我是第三個(gè)進(jìn)來(lái)的'}]
# [5, '我的第一個(gè)進(jìn)來(lái)的']
Test04-->Queue隊(duì)列的方法:
創(chuàng)建一個(gè)“隊(duì)列”對(duì)象
from queue import Queue
q = queue.Queue(maxsize = 100)
Queue類即是一個(gè)隊(duì)列的同步實(shí)現(xiàn)。隊(duì)列長(zhǎng)度可為無(wú)限或者有限??赏ㄟ^(guò)Queue的構(gòu)造函數(shù)的可選參數(shù)maxsize來(lái)設(shè)定隊(duì)列長(zhǎng)度。如果maxsize小于1就表示隊(duì)列長(zhǎng)度無(wú)限。
將一個(gè)值放入隊(duì)列中
q.put(100)
調(diào)用隊(duì)列對(duì)象的put()方法在隊(duì)尾插入一個(gè)項(xiàng)目。put()有兩個(gè)參數(shù),第一個(gè)item為必需的,為插入項(xiàng)目的值;第二個(gè)block為可選參數(shù),默認(rèn)為
1。如果隊(duì)列當(dāng)前為空且block為1,put()方法就使調(diào)用線程暫停,直到空出一個(gè)數(shù)據(jù)單元。如果block為0,put方法將引發(fā)Full異常。
將一個(gè)值從隊(duì)列中取出
q.get()
調(diào)用隊(duì)列對(duì)象的get()方法從隊(duì)頭刪除并返回一個(gè)項(xiàng)目。可選參數(shù)為block,默認(rèn)為True。如果隊(duì)列為空且block為True,
get()就使調(diào)用線程暫停,直至有項(xiàng)目可用。如果隊(duì)列為空且block為False,隊(duì)列將引發(fā)Empty異常。
此包中的常用方法(q = queue.Queue()):
q.qsize() 返回隊(duì)列的大小
q.empty() 如果隊(duì)列為空,返回True,反之False
q.full() 如果隊(duì)列滿了,返回True,反之False
q.full 與 maxsize 大小對(duì)應(yīng)
q.get([block[, timeout]]) 獲取隊(duì)列,timeout等待時(shí)間
q.get_nowait() 相當(dāng)q.get(False)
非阻塞 q.put(item) 寫入隊(duì)列,timeout等待時(shí)間
q.put_nowait(item) 相當(dāng)q.put(item, False)
q.task_done() 在完成一項(xiàng)工作之后,q.task_done() 函數(shù)向任務(wù)已經(jīng)完成的隊(duì)列發(fā)送一個(gè)信號(hào)
q.join() 實(shí)際上意味著等到隊(duì)列為空,再執(zhí)行別的操作
Test05-->生產(chǎn)者消費(fèi)者案例如下:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
from threading import Thread
import time
from queue import Queue
# 3個(gè)生產(chǎn)者,當(dāng)產(chǎn)品數(shù)量小于100就生產(chǎn)8個(gè)
# 5個(gè)消費(fèi)者,當(dāng)產(chǎn)品數(shù)量大于10就消費(fèi)3
storageQueue = Queue() # 創(chuàng)建用于保存產(chǎn)品的隊(duì)列
# 保存產(chǎn)品編號(hào)
serial_num = 0
# 生產(chǎn)者
class Producer(Thread):
def __init__(self, name):
Thread.__init__(self)
self.name = name
def run(self):
global serial_num
while True:
if storageQueue.qsize() < 100:
# 向隊(duì)列中添加消息模擬生產(chǎn)
for i in range(8):
msg = self.name + ":" + str(serial_num)
storageQueue.put(msg)
print("%s 生產(chǎn)了:%s" % (self.name, msg))
serial_num += 1
time.sleep(1)
# 消費(fèi)者
class Consumer(Thread):
def __init__(self, name):
Thread.__init__(self)
self.name = name
def run(self):
while True:
if storageQueue.qsize() > 10:
# 從消息隊(duì)列中取消息,代表消費(fèi)產(chǎn)品
for i in range(3):
msg = storageQueue.get()
print("%s消費(fèi)了:%s" % (self.name, msg))
time.sleep(1)
def main():
# 3個(gè)生產(chǎn)者
for i in range(3):
t = Producer("Producter--" + str(i))
t.start()
# 5個(gè)消費(fèi)者
for i in range(5):
t = Consumer("Consumer--" + str(i))
t.start()
if __name__ == '__main__':
main()
# 結(jié)果如下:
# Producter--0 生產(chǎn)了:Producter--0:0
# Producter--0 生產(chǎn)了:Producter--0:1
# Producter--0 生產(chǎn)了:Producter--0:2
# Producter--0 生產(chǎn)了:Producter--0:3
# Producter--0 生產(chǎn)了:Producter--0:4
# Producter--0 生產(chǎn)了:Producter--0:5
# Producter--0 生產(chǎn)了:Producter--0:6
# Producter--0 生產(chǎn)了:Producter--0:7
# Producter--1 生產(chǎn)了:Producter--1:8
# Producter--1 生產(chǎn)了:Producter--1:9
# Producter--1 生產(chǎn)了:Producter--1:10
# Producter--1 生產(chǎn)了:Producter--1:11
# Producter--1 生產(chǎn)了:Producter--1:12
# Producter--1 生產(chǎn)了:Producter--1:13
# Producter--2 生產(chǎn)了:Producter--2:13
# Producter--2 生產(chǎn)了:Producter--2:14
# Producter--2 生產(chǎn)了:Producter--2:15
# Producter--2 生產(chǎn)了:Producter--2:16
# Producter--2 生產(chǎn)了:Producter--2:17
# Producter--2 生產(chǎn)了:Producter--2:18
# Producter--2 生產(chǎn)了:Producter--2:19
# Producter--2 生產(chǎn)了:Producter--2:20
# Producter--1 生產(chǎn)了:Producter--1:22
# Producter--1 生產(chǎn)了:Producter--1:23
# Consumer--0消費(fèi)了:Producter--0:0
# Consumer--0消費(fèi)了:Producter--0:1
# Consumer--0消費(fèi)了:Producter--0:2
# Consumer--1消費(fèi)了:Producter--0:3
# Consumer--1消費(fèi)了:Producter--0:4
# Consumer--1消費(fèi)了:Producter--0:5
# Consumer--2消費(fèi)了:Producter--0:6
# Consumer--2消費(fèi)了:Producter--0:7
# Consumer--2消費(fèi)了:Producter--1:8
# Consumer--3消費(fèi)了:Producter--1:9
# Consumer--3消費(fèi)了:Producter--1:10
# Consumer--3消費(fèi)了:Producter--1:11
# Consumer--4消費(fèi)了:Producter--1:12
# Consumer--4消費(fèi)了:Producter--1:13
# Consumer--4消費(fèi)了:Producter--2:13
# Producter--0 生產(chǎn)了:Producter--0:24
# Producter--0 生產(chǎn)了:Producter--0:25
# Producter--0 生產(chǎn)了:Producter--0:26
# Producter--0 生產(chǎn)了:Producter--0:27
# Producter--0 生產(chǎn)了:Producter--0:28
# Producter--0 生產(chǎn)了:Producter--0:29
# 我手動(dòng)寫的:此處省略很多......
Num09-->守護(hù)線程
只要非守護(hù)線程結(jié)束了,不管守護(hù)線程結(jié)束沒(méi)結(jié)束,程序都結(jié)束.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import threading
import time
def run(n):
print("我是守護(hù)線程--", n)
time.sleep(1)
start_time = time.time()
thread_list = []
for i in range(3):
t = threading.Thread(target=run, args=('thread_list-%s' % i,))
# 設(shè)置線程為守護(hù)狀態(tài),非守護(hù)狀態(tài)線程都退出,程序就退出,不等待守護(hù)狀態(tài)線程
t.daemon = True
t.start() # t.daemon=True 必須在 調(diào)用start()函數(shù) 前面
thread_list.append(t)
print("當(dāng)前還活著的線程數(shù)量是:", threading.active_count())
print("當(dāng)前還活著的線程有:", thread_list)
print("當(dāng)前還剩線程是:", threading.current_thread().name)
print("耗時(shí):", time.time() - start_time)
# 結(jié)果是:
# 我是守護(hù)線程-- thread_list-0
# 我是守護(hù)線程-- thread_list-1
# 我是守護(hù)線程-- thread_list-2
# 當(dāng)前還活著的線程數(shù)量是: 4
# 當(dāng)前還活著的線程有: [<Thread(Thread-1, started daemon 12852)>, <Thread(Thread-2, started daemon 6696)>, <Thread(Thread-3, started daemon 11940)>]
# 當(dāng)前還剩線程是: MainThread
# 耗時(shí): 0.0
Num10-->多線程--非共享數(shù)據(jù)
在多線程開(kāi)發(fā)中,全局變量是多個(gè)線程都共享的數(shù)據(jù),而局部變量則是各個(gè)線程的,是非共享的。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import threading
import time
class MyThread(threading.Thread):
def __init__(self, num, sleepTime):
threading.Thread.__init__(self)
self.num = num
self.sleepTime = sleepTime
# 重寫run()方法
def run(self):
self.num += 1
time.sleep(self.sleepTime)
print('線程(%s),num=%d' % (self.name, self.num))
if __name__ == '__main__':
t1 = MyThread(99, 5)
t1.start()
t2 = MyThread(199, 1)
t2.start()
t1.join()
t2.join()
# 結(jié)果如下:
# 線程(Thread-2),num=200
# 線程(Thread-1),num=100
Num11-->線程中ThreadLocal
Test01-->使用函數(shù)傳參的方法
def process_student(name):
std = Student(name)
# std是局部變量,但是每個(gè)函數(shù)都要用它,因此必須傳進(jìn)去:
do_task_1(std)
do_task_2(std)
def do_task_1(std):
do_subtask_1(std)
do_subtask_2(std)
def do_task_2(std):
do_subtask_2(std)
do_subtask_2(std)
每個(gè)函數(shù)一層一層調(diào)用都這么傳參數(shù)那還得了?用全局變量?也不行,因?yàn)槊總€(gè)線程處理不同的Student對(duì)象,不能共享。
Test02-->使用全局字典的方法
global_dict = {}
def std_thread(name):
std = Student(name)
# 把std放到全局變量global_dict中:
global_dict[threading.current_thread()] = std
do_task_1()
do_task_2()
def do_task_1():
# 不傳入std,而是根據(jù)當(dāng)前線程查找:
std = global_dict[threading.current_thread()]
...
def do_task_2():
# 任何函數(shù)都可以查找出當(dāng)前線程的std變量:
std = global_dict[threading.current_thread()]
...
這種方式理論上是可行的,它最大的優(yōu)點(diǎn)是消除了std對(duì)象在每層函數(shù)中的傳遞問(wèn)題,但是,每個(gè)函數(shù)獲取std的代碼有點(diǎn)low。
Test03-->使用ThreadLocal的方法
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
from threading import Thread
import threading
# 創(chuàng)建ThreadLocal對(duì)象
local_school = threading.local()
class Student(object):
def __init__(self, name):
self.name = name
# 線程中調(diào)用的函數(shù)
def do_task():
# 相當(dāng)于以當(dāng)前線程對(duì)象為key取出stu對(duì)象
stu = local_school.student
print("%s中學(xué)生名字為:%s" % (threading.current_thread().name, stu.name))
# 線程的功能函數(shù)
def task_thread(name):
# 創(chuàng)建類的實(shí)例對(duì)象
stu = Student(name)
# 相當(dāng)于以當(dāng)前的線程對(duì)象為key,放入全局的字典local_school
local_school.student = stu
do_task()
def main():
t1 = Thread(target=task_thread, args=("xiaoke",))
t2 = Thread(target=task_thread, args=("lili",))
t1.start()
t2.start()
t1.join()
t2.join()
if __name__ == '__main__':
main()
# 結(jié)果如下:
# Thread-1中學(xué)生名字為:xiaoke
# Thread-2中學(xué)生名字為:lili
全局變量local_school就是一個(gè)ThreadLocal對(duì)象,每個(gè)Thread對(duì)它都可以讀寫student屬性,但互不影響。你可以把local_school看成全局變量,但每個(gè)屬性如local_school.student都是線程的局部變量,可以任意讀寫而互不干擾,也不用管理鎖的問(wèn)題,ThreadLocal內(nèi)部會(huì)處理。
可以理解為全局變量local_school是一個(gè)dict。
ThreadLocal最常用的地方就是為每個(gè)線程綁定一個(gè)數(shù)據(jù)庫(kù)連接,HTTP請(qǐng)求,用戶身份信息等。這樣一個(gè)線程所有調(diào)用到的處理函數(shù),都可以非常方便地訪問(wèn)這些資源。
Test04-->小總結(jié)
一個(gè)ThreadLocal變量雖然是“全局變量”,但每個(gè)線程都只能讀寫自己線程的獨(dú)立副本,互不干擾。
ThreadLocal解決了參數(shù)在一個(gè)線程中各個(gè)函數(shù)之間互相傳遞的問(wèn)題。
Num12-->GIL全局解釋器鎖
作用:保證同一時(shí)刻,無(wú)論你有多少個(gè)線程,只有一個(gè)線程被CPU執(zhí)行。
因?yàn)镻ython的線程雖然是真正的線程,但解釋器執(zhí)行代碼時(shí),有一個(gè)GIL鎖:Global Interpreter Lock,任何Python線程執(zhí)行前,必須先獲得GIL鎖,然后,每執(zhí)行100條字節(jié)碼,解釋器就自動(dòng)釋放GIL鎖,讓別的線程有機(jī)會(huì)執(zhí)行。這個(gè)GIL全局鎖實(shí)際上把所有線程的執(zhí)行代碼都給上了鎖,所以,多線程在Python中只能交替執(zhí)行,即使100個(gè)線程跑在100核CPU上,也只能用到1個(gè)核。
所以,在Python中,可以使用多線程,但不要指望能有效利用多核。如果一定要通過(guò)多線程利用多核,那只能通過(guò)C擴(kuò)展來(lái)實(shí)現(xiàn),不過(guò)這樣就失去了Python簡(jiǎn)單易用的特點(diǎn)。
不過(guò),也不用過(guò)于擔(dān)心,Python雖然不能利用多線程實(shí)現(xiàn)多核任務(wù),但可以通過(guò)多進(jìn)程實(shí)現(xiàn)多核任務(wù)。多個(gè)Python進(jìn)程有各自獨(dú)立的GIL鎖,互不影響。
Num13-->信號(hào)量Semaphore
信號(hào)量:是指同時(shí)開(kāi)幾個(gè)線程并發(fā)。比如廁所有5個(gè)坑,那最多只允許5個(gè)人上廁所,后面的人只能等里面的5個(gè)人都出來(lái)了,才能再進(jìn)去。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import threading, time
class myThread(threading.Thread):
def run(self):
# 加把鎖,可以放進(jìn)去多個(gè)(相當(dāng)于5把鎖,同時(shí)有5個(gè)線程)
if semaphore.acquire():
print(self.name)
time.sleep(3)
semaphore.release()
if __name__ == "__main__":
# 同時(shí)能有幾個(gè)線程進(jìn)去(設(shè)置為5就是一次5個(gè)線程進(jìn)去)
semaphore = threading.Semaphore(5)
thread_list = [] # 空列表
for i in range(200): # 200個(gè)線程
thread_list.append(myThread()) # 加線程對(duì)象
for t in thread_list:
t.start() # 分別啟動(dòng)
# 結(jié)果如下:每隔3秒就會(huì)同時(shí)顯示5個(gè)進(jìn)程
# Thread-1
# Thread-2
# Thread-3
# Thread-4
# Thread-5
# Thread-6
# Thread-7
# Thread-8
# Thread-9
# Thread-10
# .......
Num14-->Event線程間通信
Python提供了Event對(duì)象用于線程間通信。它是由線程設(shè)置的信號(hào)標(biāo)志,如果信號(hào)標(biāo)志為真,則其他線程等待,直到信號(hào)拿到。
Event對(duì)象實(shí)現(xiàn)了簡(jiǎn)單的線程通信機(jī)制。它提供了設(shè)置信號(hào),清除信號(hào),等待信號(hào)等,用于實(shí)現(xiàn)線程間的通信。
Events的使用
event = threading.Event()
event.wait()
Event對(duì)象wait()方法只有在內(nèi)部信號(hào)為真的時(shí)候,才會(huì)很快的執(zhí)行并完成返回。當(dāng)Event對(duì)象的內(nèi)部信號(hào)標(biāo)志為假時(shí),則wait()方法一直等待到其為真時(shí)才返回。
event.set()
使用Event的set()方法可以設(shè)置Event對(duì)象內(nèi)部的信號(hào)標(biāo)志為真。Event對(duì)象提供了isSet()方法來(lái)判斷其內(nèi)部信號(hào)標(biāo)志的狀態(tài)。當(dāng)使用Event對(duì)象的set()方法后,isSet()方法返回真。
event.clear()
使用Event對(duì)象的clear()方法可以清除Event對(duì)象內(nèi)部的信號(hào)標(biāo)志。即將其設(shè)為假,當(dāng)使用Event的clear方法后,isSet()方法返回假。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xiaoke
import threading, time
class Boss(threading.Thread):
def run(self):
print("大老板說(shuō):今晚大家都要加班到24:00。")
print(event.isSet())
event.set()
print(event.isSet())
print("辛苦大家了......加油干吧!?。?)
time.sleep(10)
print("大老板說(shuō):24:00到了,可以下班了。")
print(event.isSet())
event.set()
print(event.isSet())
class Worker(threading.Thread):
def run(self):
# 等待信號(hào)為真,取數(shù)據(jù)
event.wait()
print("加油工作,多賺錢……!")
time.sleep(5)
event.clear()
# 等待信號(hào)為真,取數(shù)據(jù)
event.wait()
print("下班回家陪老婆,孩子,哈哈哈!")
if __name__ == "__main__":
event = threading.Event()
threads = []
# 三個(gè)員工,一個(gè)老板
for i in range(3):
threads.append(Worker())
threads.append(Boss())
# 開(kāi)啟老板和員工
for t in threads:
t.start()
# 等待老板和員工工作都結(jié)束
for t in threads:
t.join()
# 結(jié)果如下:
# 大老板說(shuō):今晚大家都要加班到24:00。
# False
# True
# 辛苦大家了......加油干吧?。?!
# 加油工作,多賺錢……!
# 加油工作,多賺錢……!
# 加油工作,多賺錢……!
# 大老板說(shuō):24:00到了,可以下班了。
# False
# True
# 下班回家陪老婆,孩子,哈哈哈!
# 下班回家陪老婆,孩子,哈哈哈!
# 下班回家陪老婆,孩子,哈哈哈!