本系列主要學習Python的基本使用和語法知識,后續(xù)可能會圍繞著AI學習展開。
Python3 (1) Python語言的簡介
Python3 (2) Python語法基礎
Python3 (3) Python函數(shù)
Python3 (4) Python高級特性
Python3 (5) Python 函數(shù)式編程
Python3 (6) Python 模塊
Python3 (7) Python 面向?qū)ο缶幊?/a>
Python3 (8) Python 面向?qū)ο蟾呒壘幊?/a>
Python3 (9) Python 錯誤、調(diào)試和測試
Python3 (10) Python IO編程
Python3 (11) Python 進程和線程
進程和線程是多并發(fā)開發(fā)中非常重要的兩個概念,也是衡量一個開發(fā)人員技術水平的一個很重要依據(jù),可想而知,應用好進程和線程的難度有多大,不是一天或者一篇文章可以學到的,而是一個開發(fā)人員慢慢成長,在項目中積累,根據(jù)各種應用場景,選擇最佳的技術方案。所以我們今天只聊一些進程、線程的概念,和Python中封裝的一些使用方法。千里之行,始于足下,我們開始吧。
進程和線程
進程和線程是多任務操作系統(tǒng)中的概念 ,如Mac OS X,UNIX,Linux,Windows等操作系統(tǒng),對于操作系統(tǒng)來說,一個任務就是一個進程(Process),如在一臺Android設備(android 采用Linux做內(nèi)核)上打開一個網(wǎng)易云客戶端聽歌、打開一個微信客戶端聊天、打開一個今日頭條看新聞等每一個應用就是一個進程,操作系統(tǒng)會輪流的將多任務調(diào)度到核心的CPU上執(zhí)行?,F(xiàn)在的硬件CPU基本上都是多核,處理能力成倍的提升。 線程就更好理解了,因為線程是最小的執(zhí)行單元,所以每個進程至少擁有一個線程,比如android的某個應用打開時就創(chuàng)建了一個主線程,如果要進行IO操作、網(wǎng)絡請求等耗時操作就需要開啟多個工作線程,這就是在一個進程中同時創(chuàng)建多個子任務(Thread) 的典型例子。Python既支持多進程,又支持多線程,我們會討論如何編寫這兩種多任務程序。
多進程
Python是跨平臺的,提供了一個跨平臺的多進程支持。
multiprocessing模塊就是跨平臺版本的多進程模塊。但是針對Unix/Linux操作系統(tǒng)提供了一個fork(),所以這兩種操作系統(tǒng)或延伸的系統(tǒng)如mac(基于BSD(Unix的一種)內(nèi)核)等在Python的os模塊封裝的各個系統(tǒng)的方法調(diào)用包括fork()方法,所以在Python中部分系統(tǒng)也可以通過fork()來創(chuàng)建進程。
from multiprocessing import Pool
import os
print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
# works on All:
# 子進程要執(zhí)行的代碼
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
#開始進程
p.start()
#等待子進程結束后再繼續(xù)往下運行
p.join()
print('Child process end.')
運行結果就不寫了,應為我是window系統(tǒng),第一中通過fork()復制子進程的方法不能運行,fork()與Process()兩種方法都可以創(chuàng)建子進程,這樣我們就可以通過多個進程來執(zhí)行多個任務。當然進程模塊還有很多方法join()可以實現(xiàn)進程間的同步、還有守護進程等概念。
進程池(Pool)
Python 中提供了進程池來批量創(chuàng)建、管理進程
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from multiprocessing import Pool
import os, time, random
def long_time_task(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
#核心進程數(shù)
p = Pool(4)
for i in range(5):
# 創(chuàng)建子進程
p.apply_async(long_time_task, args=(i,))
print('Waiting for all subprocesses done...')
#關閉進程池,關閉后不能添加新的子進程
p.close()
#進程間同步,子進程運行完成后,代碼繼續(xù)執(zhí)行
p.join()
print('All subprocesses done.')
輸出結果:
Parent process 10624.
Waiting for all subprocesses done...
Run task 0 (6784)...
Run task 1 (11812)...
Run task 2 (740)...
Run task 3 (11048)...
Task 2 runs 0.07 seconds.
Run task 4 (740)...
Task 3 runs 0.30 seconds.
Task 1 runs 0.87 seconds.
Task 4 runs 0.98 seconds.
Task 0 runs 2.50 seconds.
All subprocesses done.
這就是進程池的使用通過apply_async添加子線程,還提供控制線程池的各種方法。
子進程
上面我們介紹了父進程可以
fork()出多個子進程,multiprocessing模塊中通過Process()
生成子進程,還有Pool中apply_async()批量創(chuàng)建子進程,這幾種模式都是子進程對自身的操作,但是很多時候子進程需要執(zhí)行其他程序或命令,還需要控制子進程的輸入輸出。這樣的子進程我們可以通過subprocess來創(chuàng)建并進程輸入輸出操作。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import subprocess
import os
print('Run current process (%s)...' % ( os.getpid()))
print('nslookup www.python.ory')
# 轉(zhuǎn) utf-8 編碼
os.system('chcp 65001')
r = subprocess.call(['nslookup','www.python.org'])
print('Exit code',r)
print('----------------------------------------------')
print('$ nslookup')
print('Run current process (%s)...' % ( os.getpid()))
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('subprocess.Popen is',p.pid)
print('Exit code:', p.returncode)
輸出結果:
Run current process (8172)...
nslookup www.python.ory
Active code page: 65001
Non-authoritative answer:
Server: public1.114dns.com
Address: 114.114.114.114
Name: python.map.fastly.net
Addresses: 2a04:4e42:36::223
151.101.228.223
Aliases: www.python.org
Exit code 0
----------------------------------------------
$ nslookup
Run current process (8172)...
Default Server: public1.114dns.com
Address: 114.114.114.114
> > Server: public1.114dns.com
Address: 114.114.114.114
python.org MX preference = 50, mail exchanger = mail.python.org
python.org nameserver = ns3.p11.dynect.net
python.org nameserver = ns2.p11.dynect.net
python.org nameserver = ns4.p11.dynect.net
python.org nameserver = ns1.p11.dynect.net
mail.python.org internet address = 188.166.95.178
mail.python.org AAAA IPv6 address = 2a03:b0c0:2:d0::71:1
ns1.p11.dynect.net internet address = 208.78.70.11
ns2.p11.dynect.net internet address = 204.13.250.11
ns3.p11.dynect.net internet address = 208.78.71.11
ns4.p11.dynect.net internet address = 204.13.251.11
>
subprocess.Popen is 15896
Exit code: 0
-
subprocess.call()創(chuàng)建子進程執(zhí)行程序,然后等待子進程完成。call()返回子進程的 退出狀態(tài) 即child.returncode屬性; -
subprocess.Popen創(chuàng)建并返回一個子進程,并在這個進程中執(zhí)行指定的程序。并且Popen 對象提供了很多與子進程交互的方法,如:p.communicate(input=None)和子進程 p 交流,將參數(shù) input (字符串)中的數(shù)據(jù)發(fā)送到子進程的 stdin,同時從子進程的 stdout 和 stderr 讀取數(shù)據(jù),直到EOF。返回值為二元組 (stdoutdata, stderrdata) 分別表示從標準出和標準錯誤中讀出的數(shù)據(jù)。注意,該方法一旦調(diào)用立即阻塞父進程,直到子進程結束! - 關于
subprocess.Popen更多的使用方法,可以自己去了解。
進程間通信
Python的multiprocessing模塊包裝了底層的機制,提供了Queue、Pipes等多種方式來交換數(shù)據(jù)。
做一個Queue通信的示例:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from multiprocessing import Process, Queue
import os, time, random
# 寫數(shù)據(jù)進程執(zhí)行的代碼:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
# 讀數(shù)據(jù)進程執(zhí)行的代碼:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)
if __name__=='__main__':
# 父進程創(chuàng)建Queue,并傳給各個子進程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 啟動子進程pw,寫入:
pw.start()
# 啟動子進程pr,讀取:
pr.start()
# 等待pw結束:
pw.join()
# pr進程里是死循環(huán),無法等待其結束,只能強行終止:
pr.terminate()
輸出結果:
Process to read: 13760
Process to write: 9296
Put A to queue...
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
上面完成了一個讀/寫 操作,將數(shù)據(jù)存儲在父進程創(chuàng)建的Queue隊列中,兩個子進程進行寫入/讀取操作。
進程間的通信方法很多,這這里不深入學習。
多線程
線程是操作系統(tǒng)直接支持的執(zhí)行單元,因此,高級語言通常都內(nèi)置多線程的支持,Python也不例外,并且,Python的線程是真正的Posix Thread,而不是模擬出來的線程。Python的標準庫提供了兩個模塊:_thread和threading,_thread是低級模塊,threading是高級模塊,對_thread進行了封裝。絕大多數(shù)情況下,我們只需要使用threading這個高級模塊。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time, threading
# 新線程執(zhí)行的代碼:
def loop():
print('thread %s is running...' % threading.current_thread().name)
n = 0
while n < 5:
n = n + 1
print('thread %s >>> %s' % (threading.current_thread().name, n))
time.sleep(1)
print('thread %s ended.' % threading.current_thread().name)
print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)
輸出結果:
thread MainThread is running...
thread LoopThread is running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread >>> 4
thread LoopThread >>> 5
thread LoopThread ended.
thread MainThread ended.
通過threading.Thread()就可以創(chuàng)建一個新的線程,執(zhí)行對應的方法。
Lock
在多進程中,同一個變量,各自有一份拷貝存在于每個進程中,互不影響。而在多線程中,所有變量都由所有線程共享,所以,任何一個變量都可以被任何一個線程修改,這就可能導致執(zhí)行的結果與預期不符,所以在處理多線程的問題中,出現(xiàn)了一個線程鎖。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time, threading
# 假定這是你的銀行存款:
balance = 0
def change_it(n):
# 先存后取,結果應該為0:
global balance
balance = balance + n
balance = balance - n
def run_thread(n):
for i in range(100000):
change_it(n)
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
以上是兩個線程,同時操作同一個函數(shù),邏輯上輸出的結果應該是0,但是多次運行會有不同的結果。因為高級語言的一條語句在CPU執(zhí)行時是若干條語句,所以多個線程同時使用某個變量時,會發(fā)生錯位的現(xiàn)象。Python中通過threading.Lock()來實現(xiàn)。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time, threading
# 假定這是你的銀行存款:
balance = 0
lock = threading.Lock()
def change_it(n):
# 先存后取,結果應該為0:
global balance
balance = balance + n
balance = balance - n
def run_thread(n):
for i in range(100000):
# 先要獲取鎖:
lock.acquire()
try:
# 放心地改吧:
change_it(n)
finally:
# 改完了一定要釋放鎖:
lock.release()
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
輸出結果:
0
這次無論執(zhí)行多少次,結果都是0。多個線程同時執(zhí)行l(wèi)ock.acquire()時,只有一個線程能成功地獲取鎖,然后繼續(xù)執(zhí)行代碼,其他線程就繼續(xù)等待直到獲得鎖為止。在執(zhí)行完成一次后一定要釋放鎖lock.release(),我們用```try...finally...`確保鎖被釋放,不然會造成死鎖
- 好處:確保了某段關鍵代碼只能由一個線程從頭到尾完整地執(zhí)行。
- 壞處:1. 先是阻止了多線程并發(fā)執(zhí)行,包含鎖的某段代碼實際上只能以單線程模式執(zhí)行,效率就大大地下降了。 2. 由于可以存在多個鎖,不同的線程持有不同的鎖,并試圖獲取對方持有的鎖時,可能會造成死鎖,導致多個線程全部掛起,既不能執(zhí)行,也無法結束,只能靠操作系統(tǒng)強制終止。
多核CPU
Python解釋器由于設計時有GIL全局鎖,導致了多線程無法利用多核。Python的線程雖然是真正的線程,但解釋器執(zhí)行代碼時,有一個GIL鎖:Global Interpreter Lock,任何Python線程執(zhí)行前,必須先獲得GIL鎖,然后,每執(zhí)行100條字節(jié)碼,解釋器就自動釋放GIL鎖,讓別的線程有機會執(zhí)行。這個GIL全局鎖實際上把所有線程的執(zhí)行代碼都給上了鎖,所以,多線程在Python中只能交替執(zhí)行,即使100個線程跑在100核CPU上,也只能用到1個核。Python雖然不能利用多線程實現(xiàn)多核任務,但可以通過多進程實現(xiàn)多核任務。多個Python進程有各自獨立的GIL鎖,互不影響。
ThreadLocal
ThreadLoacal 可以是一個全局變量,但是每個線程都只能讀寫自己線程的獨立副本,ThreadLocal解決了參數(shù)在一個線程中各個函數(shù)之間互相傳遞的問題,而不用考慮管理鎖的問題。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
# 創(chuàng)建全局ThreadLocal對象:
local_school = threading.local()
def process_student():
# 獲取當前線程關聯(lián)的student:
std = local_school.student
print('Hello, %s (in %s)' % (std, threading.current_thread().name))
def process_thread(name):
# 綁定ThreadLocal的student:
local_school.student = name
process_student()
t1 = threading.Thread(target= process_thread, args=('張三',), name='Thread-1')
t2 = threading.Thread(target= process_thread, args=('李四',), name='Thread-2')
t1.start()
t2.start()
t1.join()
t2.join()
輸出結果:
Hello, 張三 (in Thread-1)
Hello, 李四 (in Thread-2)
上面實現(xiàn)了student變成一個 local_school對象的屬性,每個Thread都可以讀取student屬性,每個線程讀取的都是該線程的局部變量,不會造成錯亂,也無需管理鎖的問題。ThreadLocal最常用的地方就是為每個線程綁定一個數(shù)據(jù)庫連接,HTTP請求,用戶身份信息等配置信息,這樣一個線程的所有調(diào)用到的處理函數(shù)都可以非常方便地訪問這些資源。
進程 VS 線程
首先要實現(xiàn)多任務的執(zhí)行,應該采用Master-Worker模式,Master負責分配任務,Worker負責執(zhí)行任務:
- 多進程實現(xiàn)Master-Worker,主進程就是Master,其他進程就是Worker。
- 多進程模式最大的優(yōu)點就是穩(wěn)定性高,因為一個子進程崩潰了,不會影響主進程和其他子進程。(當然主進程掛了所有進程就全掛了,但是Master進程只負責分配任務,掛掉的概率低)著名的Apache最早就是采用多進程模式。
- 多進程模式的缺點是創(chuàng)建進程的代價大,在Unix/Linux系統(tǒng)下,用fork調(diào)用還行,在Windows下創(chuàng)建進程開銷巨大。另外,操作系統(tǒng)能同時運行的進程數(shù)也是有限的,在內(nèi)存和CPU的限制下,如果有幾千個進程同時運行,操作系統(tǒng)連調(diào)度都會成問題。
- 多線程實現(xiàn)Master-Worker,主線程就是Master,其他線程就是Worker。
- 多線程模式通常比多進程快一點,但是也快不到哪去,而且,多線程模式致命的缺點就是任何一個線程掛掉都可能直接造成整個進程崩潰,因為所有線程共享進程的內(nèi)存。
所以為了緩解這個問題,IIS和Apache現(xiàn)在又有多進程+多線程的混合模式,但這種模式的復雜度更大。
進程/線程切換
無論是多進程還是多線程,只要數(shù)量一多,效率肯定上不去 ,因為在進程/線程切換過程中,要進行保護現(xiàn)場、準備新的環(huán)境會耗費很多資源、時間。在任務達到一定的限度,就會消耗掉系統(tǒng)所有的資源,結果效率急劇下降,所有任務都做不好。
計算密集型 vs IO密集型
在考慮多任務時,要考慮任務的類型:
- 計算密集型任務的特點是要進行大量的計算,消耗CPU資源,比如計算圓周率、對視頻進行高清解碼等等,全靠CPU的運算能力。這樣的任務應該開啟與CPU核心數(shù)相同的任務數(shù)量,來保證最大效率的執(zhí)行計算,另外Python這樣的腳本語言運行效率很低,完全不適合計算密集型任務。應該使用 C 語言等接近匯編語言來編寫。
- IO密集型 ,主要涉及到網(wǎng)絡、磁盤IO的任務都是IO密集型任務,這類任務的特點是CPU消耗很少,任務的大部分時間都在等待IO操作完成(因為IO的速度遠遠低于CPU和內(nèi)存的速度),對于IO密集型任務,最合適的語言就是開發(fā)效率最高(代碼量最少)的語言。
異步IO
現(xiàn)代操作系統(tǒng)支持異步IO,單進程單線程模型來執(zhí)行多任務,這種模型稱為事件驅(qū)動模型。Nginx就是支持異步IO的Web服務器。對于Python語言,單線程的異步編程模型稱為協(xié)程,有了協(xié)程的支持,就可以基于事件驅(qū)動編寫高效的多任務程序。
分布式進程
分布式進程只做了解,因為進程是支持分布到多臺機器上,而線程是不能的。在Python的multiprocessing模塊不但支持多進程,其中managers子模塊還支持把多進程分布到多臺機器上。一個服務進程可以作為調(diào)度者,將任務分布到其他多個進程中,依靠網(wǎng)絡通信。Python的分布式進程接口簡單,封裝良好,適合需要把繁重任務分布到多臺機器的環(huán)境下。
參考
https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/0014319272686365ec7ceaeca33428c914edf8f70cca383000
http://www.cnblogs.com/Security-Darren/p/4733368.html