進(jìn)程同步-鎖
互斥鎖
模擬搶票程序:
from multiprocessing import Process,Lock # 導(dǎo)入鎖模塊
import json
import time
import random
import os
def search():
time.sleep(random.randint(1,3))
dic=json.load(open('db.txt','r',encoding='utf-8'))
print('%s 查看到余票數(shù)%s' %(os.getpid(),dic['count']))
def get():
dic=json.load(open('db.txt','r',encoding='utf-8'))
if dic['count'] > 0:
dic['count'] -= 1
time.sleep(random.randint(1,3))
json.dump(dic,open('db.txt','w',encoding='utf-8'))
print("%s 購票成功!" %os.getpid())
def task(mutex):
search()
mutex.acquire() # 對寫操作加鎖
get()
mutex.release() # 寫完成后解鎖
if __name__=='__main__':
mutex=Lock()
for i in range(10):
p=Process(target=task,args=(mutex,))
p.start()
在多個(gè)進(jìn)程要對同一個(gè)文件進(jìn)行修改時(shí),要避免出現(xiàn)多個(gè)進(jìn)程同時(shí)修改的情況,這種情況下我們需要對修改這一步操作加鎖,使他們串行運(yùn)行(只在修改處串行運(yùn)行),修改完成后解鎖,只有解鎖后下一個(gè)進(jìn)程才能修改。
將并發(fā)變成串行,犧牲了效率,但是保證了數(shù)據(jù)安全。
信號量
信號量指是多個(gè)線程可以同時(shí)執(zhí)行,可以理解為大家排隊(duì)上廁所,而信號量就是廁所里面的坑位,一時(shí)搶占不到坑位的線程就排隊(duì)等待,直到坑位被空出:
from threading import Thread, Semaphore, current_thread
import time, random
sm = Semaphore(5) # 信號量,允許同時(shí)開幾個(gè)線程,這里指5個(gè)
def task():
with sm:
print('%s is laing' % current_thread().getName())
time.sleep(random.randint(1, 3))
if __name__ == '__main__':
for i in range(20):
t = Thread(target=task)
t.start()
死鎖與遞歸鎖
死鎖現(xiàn)象: 當(dāng)進(jìn)程/線程A 那到A鎖后接著想要獲取B鎖,而此時(shí)B鎖被進(jìn)程/線程B擁有,B進(jìn)程/線程想要獲取A鎖,這樣A,B兩個(gè)進(jìn)程/線程相互等待的卡死的現(xiàn)象叫做死鎖現(xiàn)象。
解決辦法,使用遞歸鎖:
from threading import Thread, RLock
import time
mutexA = mutexB = RLock()
class MyThread(Thread):
def run(self):
self.f1()
self.f2()
def f1(self):
mutexA.acquire()
print('%s 拿到了A鎖' % self.name)
mutexB.acquire()
print('%s 拿到了B鎖' % self.name)
mutexA.release()
mutexB.release()
def f2(self):
mutexB.acquire()
print('%s 拿到了B鎖' % self.name)
time.sleep(0.1)
mutexA.acquire()
print('%s 拿到了A鎖' % self.name)
mutexA.release()
mutexB.release()
if __name__ == '__main__':
for i in range(10):
t = MyThread()
t.start()
輸出結(jié)果:
Thread-1 拿到了A鎖
Thread-1 拿到了B鎖
Thread-1 拿到了B鎖
Thread-1 拿到了A鎖
Thread-2 拿到了A鎖
Thread-2 拿到了B鎖
Thread-2 拿到了B鎖
Thread-2 拿到了A鎖
Thread-4 拿到了A鎖
Thread-4 拿到了B鎖
Thread-4 拿到了B鎖
Thread-4 拿到了A鎖
Thread-6 拿到了A鎖
Thread-6 拿到了B鎖
Thread-6 拿到了B鎖
Thread-6 拿到了A鎖
Thread-8 拿到了A鎖
Thread-8 拿到了B鎖
Thread-8 拿到了B鎖
Thread-8 拿到了A鎖
Thread-10 拿到了A鎖
Thread-10 拿到了B鎖
Thread-10 拿到了B鎖
Thread-10 拿到了A鎖
Thread-5 拿到了A鎖
Thread-5 拿到了B鎖
Thread-5 拿到了B鎖
Thread-5 拿到了A鎖
Thread-9 拿到了A鎖
Thread-9 拿到了B鎖
Thread-9 拿到了B鎖
Thread-9 拿到了A鎖
Thread-7 拿到了A鎖
Thread-7 拿到了B鎖
Thread-7 拿到了B鎖
Thread-7 拿到了A鎖
Thread-3 拿到了A鎖
Thread-3 拿到了B鎖
Thread-3 拿到了B鎖
Thread-3 拿到了A鎖
使用Rlock,當(dāng)鎖被加到線程后,會對鎖進(jìn)行一次計(jì)數(shù),再次加鎖計(jì)數(shù)會增加1次,只有當(dāng)鎖全部釋放后計(jì)數(shù)歸零,此時(shí)才能允許其他線程加鎖。
Event 事件
當(dāng)一個(gè)線程運(yùn)行前必須有其他初始化的工作,等待其他線程初始化完成之后才能啟動此進(jìn)程開始工作,或者在有些場景下喚起某個(gè)正在等待的線程繼續(xù)工作,在這種場景下就需要使用event事件進(jìn)行處理。
初始情況下Event對象中的信號標(biāo)志被設(shè)置為假,如果有線程等待一個(gè)Event對象,而這個(gè)Event對象的標(biāo)志位假,難么這個(gè)線程將會被一直阻塞直至該標(biāo)志為真。一個(gè)線程如果將一個(gè)Event對象的信號設(shè)置為真,它將喚醒所有等待這個(gè)Event對象的線程。如果一個(gè)線程等待一個(gè)已經(jīng)被設(shè)置為真的Event對象,那么它將忽略這個(gè)事件,繼續(xù)執(zhí)行。
常用方法如下:
event.isSet(): 返回event的狀態(tài)值
event.wait(): 如果event.isSet() == False 將阻塞線程
event.set(): 設(shè)置event的狀態(tài)值為True,所有阻塞池的線程激活進(jìn)入就緒狀態(tài),等待操作系統(tǒng)調(diào)度
event.clear(): 恢復(fù)event的狀態(tài)值為False
示例代碼:
from threading import Thread, Event, current_thread
import time
event = Event() # 初始狀態(tài)下event回阻塞其他線程
def check():
print('checking MySQL ...')
time.sleep(5)
event.set() # 激活其他線程,修改event狀態(tài)為True
print(event.is_set())
def conn():
count = 1
while not event.is_set(): # 判斷當(dāng)前的狀態(tài)是否為阻塞狀態(tài),阻塞狀態(tài)為False
if count > 3: # 設(shè)置嘗試次數(shù),超過3次拋出超時(shí)異常
raise TimeoutError('time out...')
print('%s trying to connect MySQL %s times...' % (current_thread().getName(), count))
event.wait(3)
count += 1
print('%s connect MySQL' % current_thread().getName())
if __name__ == '__main__':
t1 = Thread(target=check)
t2 = Thread(target=conn)
t3 = Thread(target=conn)
t1.start()
t2.start()
t3.start()
輸出結(jié)果:
checking MySQL ...
Thread-2 trying to connect MySQL 1 times...
Thread-3 trying to connect MySQL 1 times...
Thread-3 trying to connect MySQL 2 times...
Thread-2 trying to connect MySQL 2 times...
TrueThread-3 connect MySQL
Thread-2 connect MySQL
定時(shí)器
在一段時(shí)間以后運(yùn)行一個(gè)任務(wù),一般可以用于驗(yàn)證碼的場景。
from threading import Timer
def hello(name):
print('hello world! %s' % name)
t = Timer(3, hello, args=('come on!',)) # 3秒之后自動觸發(fā)運(yùn)行 hello
t.start()