前言
拖了好久,不過還是得堅持。喜歡本文的話可以加下公眾號【于你供讀】。
目錄
線程與進程
線程與進程是操作系統(tǒng)里面的術語,簡單來講,每一個應用程序都有一個自己的進程。
操作系統(tǒng)會為這些進程分配一些執(zhí)行資源,例如內存空間等。在進程中,又可以創(chuàng)建一些線程,他們共享這些內存空間,并由操作系統(tǒng)調用,以便并行計算。
我們都知道現(xiàn)代操作系統(tǒng)比如 Mac OS X,UNIX,Linux,Windows 等可以同時運行多個任務。打個比方,你一邊在用瀏覽器上網(wǎng),一邊在聽敲代碼,一邊用 Markdown 寫博客,這就是多任務,至少同時有 3 個任務正在運行。當然還有很多任務悄悄地在后臺同時運行著,只是桌面上沒有顯示而已。對于操作系統(tǒng)來說,一個任務就是一個進程(Process),比如打開一個瀏覽器就是啟動一個瀏覽器進程,打開 PyCharm 就是一個啟動了一個 PtCharm 進程,打開 Markdown 就是啟動了一個 Md 的進程。
雖然現(xiàn)在多核 CPU 已經非常普及了??墒怯捎?CPU 執(zhí)行代碼都是順序執(zhí)行的,這時候我們就會有疑問,單核 CPU 是怎么執(zhí)行多任務的呢?
其實就是操作系統(tǒng)輪流讓各個任務交替執(zhí)行,任務 1 執(zhí)行 0.01 秒,切換到任務 2 ,任務 2 執(zhí)行 0.01 秒,再切換到任務 3 ,執(zhí)行 0.01秒……這樣反復執(zhí)行下去。表面上看,每個任務都是交替執(zhí)行的,但是,由于 CPU的執(zhí)行速度實在是太快了,我們肉眼和感覺上沒法識別出來,就像所有任務都在同時執(zhí)行一樣。
真正的并行執(zhí)行多任務只能在多核 CPU 上實現(xiàn),但是,由于任務數(shù)量遠遠多于 CPU 的核心數(shù)量,所以,操作系統(tǒng)也會自動把很多任務輪流調度到每個核心上執(zhí)行。
有些進程不僅僅只是干一件事的啊,比如瀏覽器,我們可以播放時視頻,播放音頻,看文章,編輯文章等等,其實這些都是在瀏覽器進程中的子任務。在一個進程內部,要同時干多件事,就需要同時運行多個“子任務”,我們把進程內的這些“子任務”稱為線程(Thread)。
由于每個進程至少要干一件事,所以,一個進程至少有一個線程。當然,一個進程也可以有多個線程,多個線程可以同時執(zhí)行,多線程的執(zhí)行方式和多進程是一樣的,也是由操作系統(tǒng)在多個線程之間快速切換,讓每個線程都短暫地交替運行,看起來就像同時執(zhí)行一樣。
那么在 Python 中我們要同時執(zhí)行多個任務怎么辦?
有兩種解決方案:
一種是啟動多個進程,每個進程雖然只有一個線程,但多個進程可以一塊執(zhí)行多個任務。
還有一種方法是啟動一個進程,在一個進程內啟動多個線程,這樣,多個線程也可以一塊執(zhí)行多個任務。
當然還有第三種方法,就是啟動多個進程,每個進程再啟動多個線程,這樣同時執(zhí)行的任務就更多了,當然這種模型更復雜,實際很少采用。
總結一下就是,多任務的實現(xiàn)有3種方式:
- 多進程模式;
- 多線程模式;
- 多進程+多線程模式。
同時執(zhí)行多個任務通常各個任務之間并不是沒有關聯(lián)的,而是需要相互通信和協(xié)調,有時,任務 1 必須暫停等待任務 2 完成后才能繼續(xù)執(zhí)行,有時,任務 3 和任務 4 又不能同時執(zhí)行,所以,多進程和多線程的程序的復雜度要遠遠高于我們前面寫的單進程單線程的程序。
因為復雜度高,調試困難,所以,不是迫不得已,我們也不想編寫多任務。但是,有很多時候,沒有多任務還真不行。想想在電腦上看電影,就必須由一個線程播放視頻,另一個線程播放音頻,否則,單線程實現(xiàn)的話就只能先把視頻播放完再播放音頻,或者先把音頻播放完再播放視頻,這顯然是不行的。
多線程編程
其實創(chuàng)建線程之后,線程并不是始終保持一個狀態(tài)的,其狀態(tài)大概如下:
- New 創(chuàng)建
- Runnable 就緒。等待調度
- Running 運行
- Blocked 阻塞。阻塞可能在 Wait Locked Sleeping
- Dead 消亡
線程有著不同的狀態(tài),也有不同的類型。大致可分為:
- 主線程
- 子線程
- 守護線程(后臺線程)
- 前臺線程
簡單了解完這些之后,我們開始看看具體的代碼使用了。
1、線程的創(chuàng)建
Python 提供兩個模塊進行多線程的操作,分別是 thread 和 threading
前者是比較低級的模塊,用于更底層的操作,一般應用級別的開發(fā)不常用。
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
import time
import threading
class MyThread(threading.Thread):
def run(self):
for i in range(5):
print('thread {}, @number: {}'.format(self.name, i))
time.sleep(1)
def main():
print("Start main threading")
# 創(chuàng)建三個線程
threads = [MyThread() for i in range(3)]
# 啟動三個線程
for t in threads:
t.start()
print("End Main threading")
if __name__ == '__main__':
main()
運行結果:
Start main threading
thread Thread-1, @number: 0
thread Thread-2, @number: 0
thread Thread-3, @number: 0
End Main threading
thread Thread-2, @number: 1
thread Thread-1, @number: 1
thread Thread-3, @number: 1
thread Thread-1, @number: 2
thread Thread-3, @number: 2
thread Thread-2, @number: 2
thread Thread-2, @number: 3
thread Thread-3, @number: 3
thread Thread-1, @number: 3
thread Thread-3, @number: 4
thread Thread-2, @number: 4
thread Thread-1, @number: 4
注意喔,這里不同的環(huán)境輸出的結果肯定是不一樣的。
2、線程合并(join方法)
上面的示例打印出來的結果來看,主線程結束后,子線程還在運行。那么我們需要主線程要等待子線程運行完后,再退出,要怎么辦呢?
這時候,就需要用到 join 方法了。
在上面的例子,新增一段代碼,具體如下:
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
import time
import threading
class MyThread(threading.Thread):
def run(self):
for i in range(5):
print('thread {}, @number: {}'.format(self.name, i))
time.sleep(1)
def main():
print("Start main threading")
# 創(chuàng)建三個線程
threads = [MyThread() for i in range(3)]
# 啟動三個線程
for t in threads:
t.start()
# 一次讓新創(chuàng)建的線程執(zhí)行 join
for t in threads:
t.join()
print("End Main threading")
if __name__ == '__main__':
main()
從打印的結果,可以清楚看到,相比上面示例打印出來的結果,主線程是在等待子線程運行結束后才結束的。
Start main threading
thread Thread-1, @number: 0
thread Thread-2, @number: 0
thread Thread-3, @number: 0
thread Thread-1, @number: 1
thread Thread-3, @number: 1
thread Thread-2, @number: 1
thread Thread-2, @number: 2
thread Thread-1, @number: 2
thread Thread-3, @number: 2
thread Thread-2, @number: 3
thread Thread-1, @number: 3
thread Thread-3, @number: 3
thread Thread-3, @number: 4
thread Thread-2, @number: 4
thread Thread-1, @number: 4
End Main threading
3、線程同步與互斥鎖
使用線程加載獲取數(shù)據(jù),通常都會造成數(shù)據(jù)不同步的情況。當然,這時候我們可以給資源進行加鎖,也就是訪問資源的線程需要獲得鎖才能訪問。
其中 threading 模塊給我們提供了一個 Lock 功能。
lock = threading.Lock()
在線程中獲取鎖
lock.acquire()
使用完成后,我們肯定需要釋放鎖
lock.release()
當然為了支持在同一線程中多次請求同一資源,Python 提供了可重入鎖(RLock)。RLock 內部維護著一個 Lock 和一個 counter 變量,counter 記錄了 acquire 的次數(shù),從而使得資源可以被多次 require。直到一個線程所有的 acquire 都被 release,其他的線程才能獲得資源。
那么怎么創(chuàng)建重入鎖呢?也是一句代碼的事情:
r_lock = threading.RLock()
4、Condition 條件變量
實用鎖可以達到線程同步,但是在更復雜的環(huán)境,需要針對鎖進行一些條件判斷。Python 提供了 Condition 對象。使用 Condition 對象可以在某些事件觸發(fā)或者達到特定的條件后才處理數(shù)據(jù),Condition 除了具有 Lock 對象的 acquire 方法和 release 方法外,還提供了 wait 和 notify 方法。線程首先 acquire 一個條件變量鎖。如果條件不足,則該線程 wait,如果滿足就執(zhí)行線程,甚至可以 notify 其他線程。其他處于 wait 狀態(tài)的線程接到通知后會重新判斷條件。
其中條件變量可以看成不同的線程先后 acquire 獲得鎖,如果不滿足條件,可以理解為被扔到一個( Lock 或 RLock )的 waiting 池。直達其他線程 notify 之后再重新判斷條件。不斷的重復這一過程,從而解決復雜的同步問題。
該模式常用于生產者消費者模式,具體看看下面在線購物買家和賣家的示例:
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
import threading, time
class Consumer(threading.Thread):
def __init__(self, cond, name):
# 初始化
super(Consumer, self).__init__()
self.cond = cond
self.name = name
def run(self):
# 確保先運行Seeker中的方法
time.sleep(1)
self.cond.acquire()
print(self.name + ': 我這兩件商品一起買,可以便宜點嗎')
self.cond.notify()
self.cond.wait()
print(self.name + ': 我已經提交訂單了,你修改下價格')
self.cond.notify()
self.cond.wait()
print(self.name + ': 收到,我支付成功了')
self.cond.notify()
self.cond.release()
print(self.name + ': 等待收貨')
class Producer(threading.Thread):
def __init__(self, cond, name):
super(Producer, self).__init__()
self.cond = cond
self.name = name
def run(self):
self.cond.acquire()
# 釋放對瑣的占用,同時線程掛起在這里,直到被 notify 并重新占有瑣。
self.cond.wait()
print(self.name + ': 可以的,你提交訂單吧')
self.cond.notify()
self.cond.wait()
print(self.name + ': 好了,已經修改了')
self.cond.notify()
self.cond.wait()
print(self.name + ': 嗯,收款成功,馬上給你發(fā)貨')
self.cond.release()
print(self.name + ': 發(fā)貨商品')
cond = threading.Condition()
consumer = Consumer(cond, '買家(兩點水)')
producer = Producer(cond, '賣家(三點水)')
consumer.start()
producer.start()
輸出的結果如下:
買家(兩點水): 我這兩件商品一起買,可以便宜點嗎
賣家(三點水): 可以的,你提交訂單吧
買家(兩點水): 我已經提交訂單了,你修改下價格
賣家(三點水): 好了,已經修改了
買家(兩點水): 收到,我支付成功了
買家(兩點水): 等待收貨
賣家(三點水): 嗯,收款成功,馬上給你發(fā)貨
賣家(三點水): 發(fā)貨商品
5、線程間通信
如果程序中有多個線程,這些線程避免不了需要相互通信的。那么我們怎樣在這些線程之間安全地交換信息或數(shù)據(jù)呢?
從一個線程向另一個線程發(fā)送數(shù)據(jù)最安全的方式可能就是使用 queue 庫中的隊列了。創(chuàng)建一個被多個線程共享的 Queue 對象,這些線程通過使用 put() 和 get() 操作來向隊列中添加或者刪除元素。
# -*- coding: UTF-8 -*-
from queue import Queue
from threading import Thread
isRead = True
def write(q):
# 寫數(shù)據(jù)進程
for value in ['兩點水', '三點水', '四點水']:
print('寫進 Queue 的值為:{0}'.format(value))
q.put(value)
def read(q):
# 讀取數(shù)據(jù)進程
while isRead:
value = q.get(True)
print('從 Queue 讀取的值為:{0}'.format(value))
if __name__ == '__main__':
q = Queue()
t1 = Thread(target=write, args=(q,))
t2 = Thread(target=read, args=(q,))
t1.start()
t2.start()
輸出的結果如下:
寫進 Queue 的值為:兩點水
寫進 Queue 的值為:三點水
從 Queue 讀取的值為:兩點水
寫進 Queue 的值為:四點水
從 Queue 讀取的值為:三點水
從 Queue 讀取的值為:四點水
Python 還提供了 Event 對象用于線程間通信,它是由線程設置的信號標志,如果信號標志位真,則其他線程等待直到信號接觸。
Event 對象實現(xiàn)了簡單的線程通信機制,它提供了設置信號,清楚信號,等待等用于實現(xiàn)線程間的通信。
- 設置信號
使用 Event 的 set() 方法可以設置 Event 對象內部的信號標志為真。Event 對象提供了 isSe() 方法來判斷其內部信號標志的狀態(tài)。當使用 event 對象的 set() 方法后,isSet() 方法返回真
- 清除信號
使用 Event 對象的 clear() 方法可以清除 Event 對象內部的信號標志,即將其設為假,當使用 Event 的 clear 方法后,isSet() 方法返回假
- 等待
Event 對象 wait 的方法只有在內部信號為真的時候才會很快的執(zhí)行并完成返回。當 Event 對象的內部信號標志位假時,則 wait 方法一直等待到其為真時才返回。
示例:
# -*- coding: UTF-8 -*-
import threading
class mThread(threading.Thread):
def __init__(self, threadname):
threading.Thread.__init__(self, name=threadname)
def run(self):
# 使用全局Event對象
global event
# 判斷Event對象內部信號標志
if event.isSet():
event.clear()
event.wait()
print(self.getName())
else:
print(self.getName())
# 設置Event對象內部信號標志
event.set()
# 生成Event對象
event = threading.Event()
# 設置Event對象內部信號標志
event.set()
t1 = []
for i in range(10):
t = mThread(str(i))
# 生成線程列表
t1.append(t)
for i in t1:
# 運行線程
i.start()
輸出的結果如下:
1
0
3
2
5
4
7
6
9
8
6、后臺線程
默認情況下,主線程退出之后,即使子線程沒有 join。那么主線程結束后,子線程也依然會繼續(xù)執(zhí)行。如果希望主線程退出后,其子線程也退出而不再執(zhí)行,則需要設置子線程為后臺線程。Python 提供了 setDeamon 方法。
進程
Python 中的多線程其實并不是真正的多線程,如果想要充分地使用多核 CPU 的資源,在 Python 中大部分情況需要使用多進程。Python 提供了非常好用的多進程包 multiprocessing,只需要定義一個函數(shù),Python 會完成其他所有事情。借助這個包,可以輕松完成從單進程到并發(fā)執(zhí)行的轉換。multiprocessing 支持子進程、通信和共享數(shù)據(jù)、執(zhí)行不同形式的同步,提供了 Process、Queue、Pipe、Lock 等組件。
1、類 Process
創(chuàng)建進程的類:Process([group [, target [, name [, args [, kwargs]]]]])
- target 表示調用對象
- args 表示調用對象的位置參數(shù)元組
- kwargs表示調用對象的字典
- name為別名
- group實質上不使用
下面看一個創(chuàng)建函數(shù)并將其作為多個進程的例子:
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
import multiprocessing
import time
def worker(interval, name):
print(name + '【start】')
time.sleep(interval)
print(name + '【end】')
if __name__ == "__main__":
p1 = multiprocessing.Process(target=worker, args=(2, '兩點水1'))
p2 = multiprocessing.Process(target=worker, args=(3, '兩點水2'))
p3 = multiprocessing.Process(target=worker, args=(4, '兩點水3'))
p1.start()
p2.start()
p3.start()
print("The number of CPU is:" + str(multiprocessing.cpu_count()))
for p in multiprocessing.active_children():
print("child p.name:" + p.name + "\tp.id" + str(p.pid))
print("END!!!!!!!!!!!!!!!!!")
輸出的結果:
2、把進程創(chuàng)建成類
當然我們也可以把進程創(chuàng)建成一個類,如下面的例子,當進程 p 調用 start() 時,自動調用 run() 方法。
# -*- coding: UTF-8 -*-
import multiprocessing
import time
class ClockProcess(multiprocessing.Process):
def __init__(self, interval):
multiprocessing.Process.__init__(self)
self.interval = interval
def run(self):
n = 5
while n > 0:
print("當前時間: {0}".format(time.ctime()))
time.sleep(self.interval)
n -= 1
if __name__ == '__main__':
p = ClockProcess(3)
p.start()
輸出結果如下:
3、daemon 屬性
想知道 daemon 屬性有什么用,看下下面兩個例子吧,一個加了 daemon 屬性,一個沒有加,對比輸出的結果:
沒有加 deamon 屬性的例子:
# -*- coding: UTF-8 -*-
import multiprocessing
import time
def worker(interval):
print('工作開始時間:{0}'.format(time.ctime()))
time.sleep(interval)
print('工作結果時間:{0}'.format(time.ctime()))
if __name__ == '__main__':
p = multiprocessing.Process(target=worker, args=(3,))
p.start()
print('【EMD】')
輸出結果:
【EMD】
工作開始時間:Mon Oct 9 17:47:06 2017
工作結果時間:Mon Oct 9 17:47:09 2017
在上面示例中,進程 p 添加 daemon 屬性:
# -*- coding: UTF-8 -*-
import multiprocessing
import time
def worker(interval):
print('工作開始時間:{0}'.format(time.ctime()))
time.sleep(interval)
print('工作結果時間:{0}'.format(time.ctime()))
if __name__ == '__main__':
p = multiprocessing.Process(target=worker, args=(3,))
p.daemon = True
p.start()
print('【EMD】')
輸出結果:
【EMD】
根據(jù)輸出結果可見,如果在子進程中添加了 daemon 屬性,那么當主進程結束的時候,子進程也會跟著結束。所以沒有打印子進程的信息。
4、join 方法
結合上面的例子繼續(xù),如果我們想要讓子線程執(zhí)行完該怎么做呢?
那么我們可以用到 join 方法,join 方法的主要作用是:阻塞當前進程,直到調用 join 方法的那個進程執(zhí)行完,再繼續(xù)執(zhí)行當前進程。
因此看下加了 join 方法的例子:
import multiprocessing
import time
def worker(interval):
print('工作開始時間:{0}'.format(time.ctime()))
time.sleep(interval)
print('工作結果時間:{0}'.format(time.ctime()))
if __name__ == '__main__':
p = multiprocessing.Process(target=worker, args=(3,))
p.daemon = True
p.start()
p.join()
print('【EMD】')
輸出的結果:
工作開始時間:Tue Oct 10 11:30:08 2017
工作結果時間:Tue Oct 10 11:30:11 2017
【EMD】
5、Pool
如果需要很多的子進程,難道我們需要一個一個的去創(chuàng)建嗎?
當然不用,我們可以使用進程池的方法批量創(chuàng)建子進程。
例子如下:
# -*- coding: UTF-8 -*-
from multiprocessing import Pool
import os, time, random
def long_time_task(name):
print('進程的名稱:{0} ;進程的PID: {1} '.format(name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('進程 {0} 運行了 {1} 秒'.format(name, (end - start)))
if __name__ == '__main__':
print('主進程的 PID:{0}'.format(os.getpid()))
p = Pool(4)
for i in range(6):
p.apply_async(long_time_task, args=(i,))
p.close()
# 等待所有子進程結束后在關閉主進程
p.join()
print('【End】')
輸出的結果如下:
主進程的 PID:7256
進程的名稱:0 ;進程的PID: 1492
進程的名稱:1 ;進程的PID: 12232
進程的名稱:2 ;進程的PID: 4332
進程的名稱:3 ;進程的PID: 11604
進程 2 運行了 0.6500370502471924 秒
進程的名稱:4 ;進程的PID: 4332
進程 1 運行了 1.0830621719360352 秒
進程的名稱:5 ;進程的PID: 12232
進程 5 運行了 0.029001712799072266 秒
進程 4 運行了 0.9720554351806641 秒
進程 0 運行了 2.3181326389312744 秒
進程 3 運行了 2.5331451892852783 秒
【End】
這里有一點需要注意: Pool 對象調用 join() 方法會等待所有子進程執(zhí)行完畢,調用 join() 之前必須先調用 close() ,調用close() 之后就不能繼續(xù)添加新的 Process 了。
請注意輸出的結果,子進程 0,1,2,3是立刻執(zhí)行的,而子進程 4 要等待前面某個子進程完成后才執(zhí)行,這是因為 Pool 的默認大小在我的電腦上是 4,因此,最多同時執(zhí)行 4 個進程。這是 Pool 有意設計的限制,并不是操作系統(tǒng)的限制。如果改成:
p = Pool(5)
就可以同時跑 5 個進程。
6、進程間通信
Process 之間肯定是需要通信的,操作系統(tǒng)提供了很多機制來實現(xiàn)進程間的通信。Python 的 multiprocessing 模塊包裝了底層的機制,提供了Queue、Pipes 等多種方式來交換數(shù)據(jù)。
以 Queue 為例,在父進程中創(chuàng)建兩個子進程,一個往 Queue 里寫數(shù)據(jù),一個從 Queue 里讀數(shù)據(jù):
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
from multiprocessing import Process, Queue
import os, time, random
def write(q):
# 寫數(shù)據(jù)進程
print('寫進程的PID:{0}'.format(os.getpid()))
for value in ['兩點水', '三點水', '四點水']:
print('寫進 Queue 的值為:{0}'.format(value))
q.put(value)
time.sleep(random.random())
def read(q):
# 讀取數(shù)據(jù)進程
print('讀進程的PID:{0}'.format(os.getpid()))
while True:
value = q.get(True)
print('從 Queue 讀取的值為:{0}'.format(value))
if __name__ == '__main__':
# 父進程創(chuàng)建 Queue,并傳給各個子進程
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 啟動子進程 pw
pw.start()
# 啟動子進程pr
pr.start()
# 等待pw結束:
pw.join()
# pr 進程里是死循環(huán),無法等待其結束,只能強行終止
pr.terminate()
輸出的結果為:
讀進程的PID:13208
寫進程的PID:10864
寫進 Queue 的值為:兩點水
從 Queue 讀取的值為:兩點水
寫進 Queue 的值為:三點水
從 Queue 讀取的值為:三點水
寫進 Queue 的值為:四點水
從 Queue 讀取的值為:四點水