為什么要學(xué)習(xí)多線程
同一時(shí)間做了很多事情。
使用場景
1,快速高效的爬蟲程序
一個(gè)爬蟲同時(shí)解析連接、爬取文字、爬取圖片、代理IP驗(yàn)證碼
2,多用戶同時(shí)訪問的web服務(wù)
3,電商秒殺、搶購活動(dòng)
4,物聯(lián)網(wǎng)傳感器監(jiān)控服務(wù)器
線程vs進(jìn)程vs協(xié)程
關(guān)系:操作系統(tǒng)--(包含)--進(jìn)程--(包含)--線程--(包含)--協(xié)程
重要性
1,跳槽、面試、決定薪資高度
2,解決效率問題
3,python的GIL導(dǎo)致的系列問題
4,通常會(huì)混合使用(多進(jìn)程+協(xié)程)
進(jìn)程
1,是一個(gè)執(zhí)行中的程序。任務(wù)管理器中跑的應(yīng)用。
2, 每個(gè)進(jìn)程都擁有自己的地址空間、內(nèi)存、數(shù)據(jù)棧以及其他用于跟蹤執(zhí)行的輔助數(shù)據(jù)。
3,操作系統(tǒng)管理其上所有進(jìn)程的執(zhí)行,并為這些進(jìn)程合理地分配時(shí)間。
4,進(jìn)程也可通過派生(fork或spawn)新的進(jìn)程來執(zhí)行其他任務(wù)。
線程
1,在同一個(gè)進(jìn)程下執(zhí)行,并共享相同的上下文。
2,一個(gè)進(jìn)程中的各個(gè)線程與主線程共享同一片數(shù)據(jù)空間。
3,線程包括開始、執(zhí)行順序和結(jié)束三部分。
4,它可以被搶占(中斷)和臨時(shí)掛起(也成為睡眠)——讓步
5,線程一般是以并發(fā)方式執(zhí)行。
并發(fā)
1,并發(fā)不能等同于并行處理。
2,它是一種屬性—程序、算法或問題的屬性。
3,并行只是并發(fā)問題的可能方法之一。
4,如果兩個(gè)事件互不影響,則兩個(gè)事件是并發(fā)的。
對(duì)多核的利用
1,單核CPU系統(tǒng)中,不存在真正的并發(fā)
2,GIL—全局解釋器鎖
3,GIL只是強(qiáng)制在任何時(shí)候只有一個(gè)線程可以執(zhí)行python代碼
4,I/O密集型(對(duì)磁盤讀寫密集)應(yīng)用與CPU密集型(計(jì)算密集)的應(yīng)用
GIL執(zhí)行順序
1,設(shè)置GIL
2,切換進(jìn)一個(gè)線程去運(yùn)行
3,執(zhí)行下面操作之一:指定數(shù)量的字節(jié)碼指令;線程主動(dòng)讓出控制權(quán)(可以調(diào)用time.sleep(0)來完成)
4,把線程設(shè)置回睡眠狀態(tài)(切換出線程)
5,解鎖GIL
6,重復(fù)上述步驟
實(shí)現(xiàn)一個(gè)線程(兩種方法)
1,用threading模塊代替thread模塊
2,用threding.Tread創(chuàng)建線程
3,start()啟動(dòng)線程
4,join()掛起線程
threading模塊的對(duì)象
Thread : 表示一個(gè)執(zhí)行線程的對(duì)象.
Lock: 鎖原語對(duì)象(和thread模塊中的鎖一樣)
Rlock: 可重入鎖對(duì)象,使單一線程可以(再次)獲得已持有的所(遞歸鎖)
Condition: 條件變量對(duì)象,使得一個(gè)線程等待另一個(gè)線程滿足特定的條件,比如改變狀態(tài)或某個(gè)數(shù)據(jù)值
Event:條件變量的通用版本,任意數(shù)量的線程等待某個(gè)事件的發(fā)生,在該事件發(fā)生后所有線程將被激活。
Semaphore:為線程間共享的有限資源提供了一個(gè)“計(jì)數(shù)器”,如果沒有可用資源時(shí)會(huì)被阻塞。
BoundedSemaphore:與Semaphore相似,不過它不允許超過初始值。
Timer:與Thread相似,不過它要在運(yùn)行前等待一段時(shí)間
Barrier:創(chuàng)建一個(gè)“障礙”,必須達(dá)到指定數(shù)量的線程后才可以繼續(xù)。
Thread對(duì)象(實(shí)例化)數(shù)據(jù)屬性
name:線程名
ident:線程的標(biāo)識(shí)符
daemon:布爾標(biāo)志,表示這個(gè)線程是否是守護(hù)線程(界面看不到,在后臺(tái)跑)
Thread對(duì)象方法
init():實(shí)例化一個(gè)線程對(duì)象,需要有一個(gè)可調(diào)用的target,以及其參數(shù)args或kwargs。
start():開始執(zhí)行該線程
run():定義線程功能的方法(通常在子類中被應(yīng)用開發(fā)者重寫)
join(timeout=None):直至啟動(dòng)的線程終止之前一直掛起;除非給出了timeout(秒),否則會(huì)一直阻塞。
getName():返回線程名
setName(name):設(shè)定線程名
isAlivel /is_alive():布爾標(biāo)志,表示這個(gè)線程是否還存活
isDaemon():如果是守護(hù)線程,則返回True;否則,返回False
setDaemon():把線程的守護(hù)標(biāo)志設(shè)定為布爾值daemonic(必須在線程start()之前調(diào)用)
實(shí)現(xiàn)一個(gè)線程
1,第一種方式
import threading
import time
def loop():
//心得線程執(zhí)行的代碼
n = 0
while n < 5:
print(n)
now_thread = threading.current_thread() //得到當(dāng)前正在執(zhí)行的線程
print('[loop]now thread name:{0}'.format(now_thread.name))
time.sleep(1)
n += 1
def use_thread():
//使用線程來實(shí)現(xiàn)
now_thread = threading.current_thread()//取得當(dāng)前在執(zhí)行的線程
print('now thread name :{0}'.format(now_thread.name))
//設(shè)置線程
t = threading.Thread(target=loop, name='loop_thread')
//啟動(dòng)線程
t.start()
//掛起線程
t.join()
if name == 'main':
use_thread()
2,第二種方式
//定義類(繼承)的方式
import threading
import time
class LoopThread(threading.Thread):
//自定義線程,繼承自threading.Thread
n = 0
def run(self): //重寫RUN()
while self.n < 5:
print(self.n)
now_thread = threading.current_thread()
print('[loop]now thread name:{0}'.format(now_thread.name))
time.sleep(1)
self.n += 1
if name == 'main':
//當(dāng)前正在執(zhí)行的線程名稱
now_thread =threading.current_thread()
print('now thread name :{0}'.format(now_thread.name))
//設(shè)置線程
t = threading.Thread(target=loop, name='loop_thread')
//啟動(dòng)線程
t.start()
//掛起線程
t.join()
實(shí)現(xiàn)多個(gè)線程
1,最后為什么不是0?
// 多線程會(huì)共享上下文,當(dāng)多線程操作一個(gè)全局變量時(shí)可能會(huì)出現(xiàn)問題。
import threading
//我的銀行賬戶
balance = 0
def change_it(n):
//改變我的余額
global balance
balance = balance + n
balance = balance - n
print('--------------->{0}: balance--->{1}'.format(n, balance))
class ChangeBalanceThread(threading.Thread):
def __init__(self, num, *args, **kwargs)
super().__init__(*args, **kwargs)
self.num = num
def run(self, num, *args, **kwargs):
for i in range(1000):
change_it()
if name == 'main':
t1 = ChangeBalanceThread(5)
t2 = ChangeBalanceThread(8)
t1.start()
t2.start()
t1.join()
t2.join()
print('the last: {0}'.format(balance))
多線程中的鎖實(shí)現(xiàn)
1,Lock()
2,Rlock()
3,Condition()
import threading
import time
//獲得一把鎖,Lock和RLock是兩種不同類型的鎖,RLock支持多次鎖定
my_lock = threading.Lock()
your_lock = threading.RLock()
//我的銀行賬戶
balance = 0
def change_it(n):
//改變我的余額
global balance
//第一種處理異常的方法
try:
//添加鎖
print('start lock')
//第一種鎖Lock,第二種鎖是RLock
1,my_lock.acquire()
//2,your_lock.acquire()
//print('locked on')
///1,my_lock.acquire() //資源已經(jīng)被鎖住了,不能重復(fù)鎖
定,產(chǎn)生四所
//2, your_lock.acquire() //RLock 支持多次鎖定
print('locked two')
balance = balance + n
time.sleep(2)
balance = balance -n
time.sleep(1)
print('-N---> {0}; balance: {1}'.format(n, balance))
finally:
//釋放掉鎖
1,my_lock.release()
2,your_lock.release() // RLock支持多次鎖定
//第二種處理異常的方法,with語法不用使用acquire()和release()了,更加簡潔。
with your_lock:
balance = balance + n
time.sleep(2)
balance = balance -n
time.sleep(1)
print('-N---> {0}; balance: {1}'.format(n, balance))
class ChangeBalanceThread(threading.Thread):
//改變銀行余額的線程
def init(self, num, *args, *kwargs):
super().init(args, **kwargs)
self.num = num
def run(self):
for i in range(100):
change_it(self.num)
if name == 'main':
t1 = ChangeBalanceThread(5)
t2 = ChangeBalanceThread(8)
t1.start()
t2.start()
t1.join()
t1.join()
print('the last: {0}'.format(balance))
線程的調(diào)度和優(yōu)化
import time
import threading
from multiprocessing.dummy import Pool
def run(n):
//線程要做的事情
time.sleep(2)
print(threading.current_thread().name, n)
def main():
//使用傳統(tǒng)的方法來做任務(wù)
t1= time.time()
for n in range(100):
run(n)
print(time.time() - t1) //事件很長需要優(yōu)化
def main_use_thread():
//使用線程優(yōu)化任務(wù)
//資源有限,最多只能跑10個(gè)線程
t1= time.time()
ls = [ ]
for count in range(10): //每次開10個(gè)線程,開10次是100個(gè)線程。
for i in range(10):
t = threading.Thread(target=run, args=(i,))
ls.append(t)
t.start()
for l in ls:
l.join()
print(time.time() - t1)
//上一個(gè)實(shí)現(xiàn)太麻煩,繼續(xù)優(yōu)化,使用線程池POOL
def main_use_pool():
//使用線程池來優(yōu)化
t1 = time.time()
n_list = range(100)
pool = Pool(10)
pool.map(run, n_list)
pool.close()
pool.join()
print(time.time() - t1)
//用另一種性能更好的線程池
def main_use_executor():
//使用ThreadPoolExecutor來優(yōu)化
t1 = time.time()
n_list = range(100)
with ThreadPoolExecutor(max_workers=10) as executor:
executor.map(run, n_list)
print(time.time() - t1)
if name == 'main':
//main()
// main_use_thread()
// main_use_pool()
main_use_executor()
進(jìn)程
1,是一個(gè)執(zhí)行中的程序
2,每個(gè)進(jìn)程都擁有自己的地址空間、內(nèi)存、數(shù)據(jù)棧以及其他用于跟蹤執(zhí)行的輔助數(shù)據(jù)(線程是共享上下文)
3,操作系統(tǒng)管理其上所有進(jìn)程的執(zhí)行,并為這些進(jìn)程合理地分配時(shí)間。由操作系統(tǒng)管理
4,進(jìn)程也可以通過派生(fork或spanwn)新的進(jìn)程來執(zhí)行其他任務(wù)
import time
進(jìn)程的實(shí)現(xiàn)
def do_sth():
//進(jìn)程要做的事情
print('進(jìn)程的名稱 {0}',pid: {1}.format(name, os.getpid))
time.sleep(5)
print('進(jìn)程要做的事情')
//通過面向?qū)ο蟮姆绞綄?shí)現(xiàn)
class MyProcess(Process):
def __init__(self, name, *args, **kwargs): //把所有參數(shù)都加入
self.name = name //類中的方法run需要使用name,所以要重新構(gòu)造方法
super().__init__(*args, **kwargs) //這里會(huì)覆蓋掉上一語句中的name屬性
def run(self):
print('進(jìn)程的名稱 {0}',pid: {1}.format(self.name, os.getpid))
time.sleep(5)
print('MyProcess進(jìn)程要做的事情')
if name == 'main':
p = Process(target=do_sth, args=('my process', ) //用方法//args是一個(gè)元組
p = MyProcess('my process class') //面向?qū)ο蟮姆绞?br>
//啟動(dòng)進(jìn)程
p.start()
//掛起進(jìn)程
p.join()
進(jìn)程之間的通信
1,通過Queue、Pipes等實(shí)現(xiàn)進(jìn)程之間的通信
from multiprocessing import Process, Queue, current_process
import random
import time
class WriteProcess(Proc ess):
//寫的進(jìn)程
def init(self, q, *args, *kwargs):
self.q = q
super().init(args, **kwargs)
def run(self):
//實(shí)現(xiàn)進(jìn)程的業(yè)務(wù)邏輯
//要寫的內(nèi)容
ls = [
"第1行內(nèi)容",
"第2行內(nèi)容",
"第3行內(nèi)容",
"第4行內(nèi)容",
]
for line in ls:
print('寫入內(nèi)容:{0} -{1}'.format(line, currnt_process().name))
self.q.put(line)
//每寫入一次,休息1-5秒
time.sleep(random.randint(1,5))
class ReadProcess(Process):
//讀取內(nèi)容進(jìn)程
def init(self, q, *args, *kwargs):
self.q = q
super().init(args, **kwargs)
def run(self):
while True:
content = self.q.get()
print('讀取到的內(nèi)容:{0}- {1}'.format(content), currnt_process().name)
if name == 'main':
//通過Queue共享數(shù)據(jù)
q =Queue()
//寫入內(nèi)容的進(jìn)程
t_write = WriteProcess(q)
t_write.start()
//讀取進(jìn)程啟動(dòng)
t_read = ReadProcess(q)
t_read.start()
t_write.join()
t_read.join()
//因?yàn)樽x的進(jìn)程是死循環(huán),無法等待其結(jié)束,只能強(qiáng)制終止
t_read.terminate()