多線程
多任務的概念
什么叫“多任務”呢?簡單地說,就是操作系統(tǒng)可以同時運行多個任務。
單核CPU如何執(zhí)行多任務? 多核CPU如何執(zhí)行多任務?
真正的并行執(zhí)行多任務只能在多核CPU上實現(xiàn),但是,由于任務數(shù)量遠遠多于CPU的核心數(shù)量,所以,操作系統(tǒng)也會自動把很多任務輪流調度到每個核心上執(zhí)行。
注意:
并發(fā):
指的是任務數(shù)多余cpu核數(shù),通過操作系統(tǒng)的各種任務調度算法,實現(xiàn)用多個任務“一起”執(zhí)行(實際上總有一些任務不在執(zhí)行,因為切換任務的速度相當快,看上去一起執(zhí)行而已)
并行:
指的是任務數(shù)小于等于cpu核數(shù),即任務真的是一起執(zhí)行的
threading.Thread參數(shù)介紹
- target:線程執(zhí)行的函數(shù)
- name:線程名稱
- args:執(zhí)行函數(shù)中需要傳遞的參數(shù),元組類型 另外:注意daemon參數(shù)
- 如果某個子線程的daemon屬性為False,主線程結束時會檢測該子線程是否結束,如果該子線程還在運行,則主線程會等待它完成后再退出;
- 如果某個子線程的daemon屬性為True,主線程運行結束時不對這個子線程進行檢查而直接退出,同時所有daemon值為True的子線程將隨主線程一起結束,而不論是否運行完成。
- 屬性daemon的值默認為False,如果需要修改,必須在調用start()方法啟動線程之前進行設置
代碼:
import threading,queue
def write_data(queue):
for i in range(0,100):
print(threading.current_thread().name+'正在寫入'+str(i))
queue.put(i)
def read_data(queue):
while queue.empty() is not True:
print(queue.get())
if __name__ == '__main__':
queue = queue.Queue()
w_td = threading.Thread(target=write_data,name='東方紅1號',args=(queue,))
w_td.start()
w_td.join()
r_td = threading.Thread(target=write_data,name='東方紅2號',args=(queue,))
r_td.start()
r_td.join()
互斥鎖
當多個線程幾乎同時修改某一個共享數(shù)據(jù)的時候,需要進行同步控制
線程同步能夠保證多個線程安全訪問競爭資源,最簡單的同步機制是引入互斥鎖。
互斥鎖為資源引入一個狀態(tài):鎖定/非鎖定
某個線程要更改共享數(shù)據(jù)時,先將其鎖定,此時資源的狀態(tài)為“鎖定”,其他線程不能更改;直到該線程釋放資源,將資源的狀態(tài)變成“非鎖定”,其他的線程才能再次鎖定該資源?;コ怄i保證了每次只有一個線程進行寫入操作,從而保證了多線程情況下數(shù)據(jù)的正確性。
threading模塊中定義了Lock類,可以方便的處理鎖定:
創(chuàng)建鎖
xc_lock = threading.Lock()
鎖定
xc_lock.acquire()
釋放
xc_lock.release()
使用互斥鎖完成2個線程對同一個全局變量各加100萬次的操作
import threading
import time
g_num = 0
def test1(num):
global g_num
for i in range(num):
mutex.acquire() # 上鎖
g_num += 1
mutex.release() # 解鎖
print("---test1---g_num=%d"%g_num)
def test2(num):
global g_num
for i in range(num):
mutex.acquire() # 上鎖
g_num += 1
mutex.release() # 解鎖
print("---test2---g_num=%d"%g_num)
# 創(chuàng)建一個互斥鎖
# 默認是未上鎖的狀態(tài)
mutex = threading.Lock()
# 創(chuàng)建2個線程,讓他們各自對g_num加1000000次
p1 = threading.Thread(target=test1, args=(1000000,))
p1.start()
p2 = threading.Thread(target=test2, args=(1000000,))
p2.start()
p1.join()
p2.join()
print("2個線程對同一個全局變量操作之后的最終結果是:%s" % g_num)
死鎖
在線程間共享多個資源的時候,如果兩個線程分別占有一部分資源并且同時等待對方的資源,就會造成死鎖。
盡管死鎖很少發(fā)生,但一旦發(fā)生就會造成應用的停止響應。下面看一個死鎖的例子
#coding=utf-8
import threading
import time
class MyThread1(threading.Thread):
def run(self):
# 對mutexA上鎖
mutexA.acquire()
# mutexA上鎖后,延時1秒,等待另外那個線程 把mutexB上鎖
print(self.name+'----do1---up----')
time.sleep(1)
# 此時會堵塞,因為這個mutexB已經被另外的線程搶先上鎖了
mutexB.acquire()
print(self.name+'----do1---down----')
mutexB.release()
# 對mutexA解鎖
mutexA.release()
class MyThread2(threading.Thread):
def run(self):
# 對mutexB上鎖
mutexB.acquire()
# mutexB上鎖后,延時1秒,等待另外那個線程 把mutexA上鎖
print(self.name+'----do2---up----')
time.sleep(1)
# 此時會堵塞,因為這個mutexA已經被另外的線程搶先上鎖了
mutexA.acquire()
print(self.name+'----do2---down----')
mutexA.release()
# 對mutexB解鎖
mutexB.release()
mutexA = threading.Lock()
mutexB = threading.Lock()
if __name__ == '__main__':
t1 = MyThread1()
t2 = MyThread2()
t1.start()
t2.start()
線程池
線程池是一種多線程處理形式,處理過程中將任務添加到隊列,然后在創(chuàng)建線程后自動啟動這些任務。線程池線程都是后臺線程。每個線程都使用默認的堆棧大小,以默認的優(yōu)先級運行,并處于多線程單元中。如果某個線程在托管代碼中空閑(如正在等待某個事件),則線程池將插入另一個輔助線程來使所有處理器保持繁忙。如果所有線程池線程都始終保持繁忙,但隊列中包含掛起的工作,則線程池將在一段時間后創(chuàng)建另一個輔助線程但線程的數(shù)目永遠不會超過最大值。超過最大值的線程可以排隊,但他們要等到其他線程完成后才啟動。
代碼:
from concurrent.futures import ThreadPoolExecutor
def down_page_data():
pass
def download_done():
pass
if __name__ == '__main__':
#實例化一個線程池
#max_workers:在線程池中要創(chuàng)建的線程的數(shù)量
thread_pool = ThreadPoolExecutor(max_workers=10)
for page in range(1,101):
handler = thread_pool.submit(down_page_data,args=(page,))
handler.add_done_callback(download_done)
thread_pool.shutdown()