摘要:Python,多線程,線程同步,線程池,GIL
線程概述
當(dāng)一個進(jìn)程里面只有一個線程時,叫做單線程,超過一個線程就叫做多線程,在多線程中會有一個主線程來完成整個進(jìn)程從開始到結(jié)束的全部操作,而其他的線程會在主線程的運(yùn)行過程中被創(chuàng)建或退出。
線程的創(chuàng)建和原理
(1)線程的模塊
Python的線程模塊主要是threading模塊
(2)主線程的產(chǎn)生
一個Python程序就是一個進(jìn)程,每個進(jìn)程會默認(rèn)啟動一個線程,即主線程,可以通過threading模塊中的current_thread函數(shù)查看
import threading
print(threading.current_thread())
<_MainThread(MainThread, started 140486979041088)>
current_thread返回的是當(dāng)前線程的信息,默認(rèn)是進(jìn)程下的主線程,結(jié)果尖括號的第一個顯示他是主線程_MainThread,圓括號中的MainThread是線程名稱,started后面的是線程號,在操作系統(tǒng)中每一個線程都會有一個ID號,用為唯一標(biāo)識。
threading模塊中還有兩個常用的函數(shù),分別是
- threading.enumerate:返回
正在運(yùn)行的線程list,正在運(yùn)行指的是線程處于啟動后結(jié)束前的狀態(tài) - threading.activeCount:返回
正在運(yùn)行的線程數(shù)量,相當(dāng)于len(threading.enumerate())
print(threading.enumerate())
print(threading.active_count())
在Python console中存在4個線程,分別打印出線程列表和線程數(shù)如下
[<_MainThread(MainThread, started 140486979041088)>, <Thread(Thread-2, started daemon 140486915045120)>, <HistorySavingThread(IPythonHistorySavingThread, started 140486836745984)>, <Thread(Thread-3, started daemon 140486828353280)>]
4
Python中所有進(jìn)程的主線程,名稱都是一樣的叫做MainThread,而子線程的名字需要在創(chuàng)建時指定,如果不指定Python會默認(rèn)起名字。
(3)創(chuàng)建子線程
創(chuàng)建子線程有兩種方法,都是通過threading.Thread類來實(shí)現(xiàn)
- 直接對類threading.Thread進(jìn)行實(shí)例化,實(shí)例化的時候指定執(zhí)行的函數(shù)和入?yún)ⅲ缓笳{(diào)用實(shí)例化的線程對象的start方法創(chuàng)建線程
- 用threading.Thread派生出一個新的子類,并且實(shí)例化該子類,重載run方法,調(diào)用start方法創(chuàng)建線程
先來看第一種方法直接實(shí)例化
import threading
def handle(sid):
print("Thread {} run, info: {}".format(sid, threading.current_thread()))
for i in range(10):
t = threading.Thread(target=handle, args=(i, ))
t.start() # 這個地方加t.join()是一樣的,默認(rèn)不守護(hù)進(jìn)程,則主線程會等待子線程執(zhí)行完畢再關(guān)閉
print(threading.current_thread())
執(zhí)行效果如下,此時線程類型變?yōu)?code>Thread,并且分別以數(shù)字ID命名,和主線程MainThread不一樣,在一個新的線程是執(zhí)行handle函數(shù),此時函數(shù)內(nèi)部的threading.current_thread()返回的就是當(dāng)前線程的信息
Thread 0 run, info: <Thread(Thread-1, started 140591162775296)>
Thread 1 run, info: <Thread(Thread-2, started 140591154382592)>
Thread 2 run, info: <Thread(Thread-3, started 140591145989888)>
Thread 3 run, info: <Thread(Thread-4, started 140591145989888)>
Thread 4 run, info: <Thread(Thread-5, started 140591145989888)>
Thread 5 run, info: <Thread(Thread-6, started 140591145989888)>
Thread 6 run, info: <Thread(Thread-7, started 140591145989888)>
Thread 7 run, info: <Thread(Thread-8, started 140591145989888)>
Thread 8 run, info: <Thread(Thread-9, started 140591145989888)>
Thread 9 run, info: <Thread(Thread-10, started 140591145989888)>
<_MainThread(MainThread, started 140591187232576)>
Process finished with exit code 0
可以改變線程的名稱,比如修改這一行代碼
t = threading.Thread(target=handle, name="a" + str(i), args=(i, ))
輸出如下
Thread 0 run, info: <Thread(a0, started 139623398315776)>
Thread 1 run, info: <Thread(a1, started 139623389923072)>
...
再看一下使用線程類,新建一個類對象繼承threading.Thread,然后重寫run方法,在調(diào)用start的時候線程對象會調(diào)用run方法
import threading
def handle(sid):
print("Thread {} run, info: {}".format(sid, threading.current_thread()))
class MyClass(threading.Thread):
def __init__(self, sid):
threading.Thread.__init__(self) # 重寫__init__方法,等同于super().__init__()
self.sid = sid
def run(self): # 在子類中如果方法與父類相同,父類的方法被覆蓋失效
handle(self.sid)
for i in range(10):
t = MyClass(i)
t.start()
查看原類的run方法,可見如果不重寫run函數(shù),默認(rèn)會執(zhí)行傳入的target參數(shù)
def run(self):
try:
if self._target:
self._target(*self._args, **self._kwargs)
finally:
del self._target, self._args, self._kwargs
返回輸出如下
Thread 0 run, info: <MyClass(Thread-1, started 139963900937984)>
Thread 1 run, info: <MyClass(Thread-2, started 139963892545280)>
Thread 2 run, info: <MyClass(Thread-3, started 139963892545280)>
Thread 3 run, info: <MyClass(Thread-4, started 139963892545280)>
Thread 4 run, info: <MyClass(Thread-5, started 139963892545280)>
Thread 5 run, info: <MyClass(Thread-6, started 139963892545280)>
Thread 6 run, info: <MyClass(Thread-7, started 139963892545280)>
Thread 7 run, info: <MyClass(Thread-8, started 139963892545280)>
Thread 8 run, info: <MyClass(Thread-9, started 139963892545280)>
Thread 9 run, info: <MyClass(Thread-10, started 139963892545280)>
Process finished with exit code 0
除此之外在實(shí)例化類對象的時候還有一個參數(shù)daemon,默認(rèn)是False,每個線程的守護(hù)進(jìn)程參數(shù)和主線程一致,默認(rèn)是False就是說進(jìn)程退出時必須等待這個線程也退出,看一下源碼
if daemon is not None:
self._daemonic = daemon
else:
self._daemonic = current_thread().daemon # False
總結(jié)一下線程的創(chuàng)建,目的就是創(chuàng)建線程并且將執(zhí)行函數(shù)綁定到線程上,有兩種方法
- 在實(shí)例化時為target賦值參數(shù)
- 繼承threading.Thread類,重寫run方法,并在run中指定執(zhí)行函數(shù)
(4)threadingThread類的方法
- run:線程活動的方法
- start:啟動線程活動
- join:該方法有一個可選參數(shù)timeout,主線程一直處于阻塞狀態(tài),除非當(dāng)前調(diào)用join的線程執(zhí)行完畢,或者達(dá)到超時時間,主線程執(zhí)行完自己的任務(wù)以后,就退出了,主線程一旦執(zhí)行結(jié)束,則全部線程全部被終止執(zhí)行
(5)線程內(nèi)部狀態(tài)及原理
線程狀態(tài)分為5種:創(chuàng)建,就緒,運(yùn)行,阻塞,退出,過程如下
- 創(chuàng)建:在完成threading.Thread實(shí)例化之后,就完成了線程的創(chuàng)建
- 就緒:調(diào)用start函數(shù),線程就進(jìn)入就緒狀態(tài),等待CPU分配時間片
- 運(yùn)行:當(dāng)線程被分配到時間片,線程就進(jìn)入運(yùn)行狀態(tài),執(zhí)行run函數(shù)
- 阻塞:在執(zhí)行run函數(shù)期間,線程可以被打斷,進(jìn)入阻塞狀態(tài),阻塞狀態(tài)結(jié)束又回到就緒狀態(tài),接著運(yùn)行
- 退出:線程運(yùn)行結(jié)束進(jìn)入退出狀態(tài)
互斥鎖
多線程的優(yōu)勢在于并發(fā),即可以同時運(yùn)行多個任務(wù),但是當(dāng)多線程需要共享數(shù)據(jù)時,也會帶來數(shù)據(jù)不同步的問題,互斥鎖就是解決數(shù)據(jù)不同步的問題。
(1)多線程的問題
以一個例子來看,所有線程共享一個全局變量,并且在執(zhí)行函數(shù)之后修改這個全局變量,但是執(zhí)行時間不同
import time
import threading
a = 1
def handle(sid):
global a
a = a * 2
time.sleep(sid % 2)
print(sid, a)
class MyClass(threading.Thread):
def __init__(self, sid):
super().__init__()
self.sid = sid
def run(self):
handle(self.sid)
threads = []
for i in range(10):
t = MyClass(i)
t.start()
for t in threads:
t.join() # 主線程等待所有其他子線程執(zhí)行完畢
輸出結(jié)果如下,可見單數(shù)的id由于需要等待1s導(dǎo)致在sleep的時候其他線程還在更改共享數(shù)據(jù),1,3,5,7四個線程輸出的值都市9號線程的結(jié)果
0 2
2 8
4 32
6 128
8 512
1 1024
3 1024
7 1024
5 1024
9 1024
上述代碼使用了threads列表join使得主線程必須等待子線程全部執(zhí)行完畢再退出,如果在每一個start后面直接執(zhí)行join,輸出結(jié)果完全不一樣
import time
import threading
a = 1
def handle(sid):
global a
a = a * 2
time.sleep(sid % 2)
print(sid, a)
class MyClass(threading.Thread):
def __init__(self, sid):
super().__init__()
self.sid = sid
def run(self):
handle(self.sid)
for i in range(10):
t = MyClass(i)
t.start()
t.join()
原因是如果在每個線程start后直接join,則主線程被每個子線程的start后阻塞,無法啟動循環(huán)列表后面的子線程,相當(dāng)于單線程
0 2
1 4
2 8
3 16
4 32
5 64
6 128
7 256
8 512
9 1024
(2)互斥所
鎖的出現(xiàn)就是解決多線程之間的同步問題,其核心在于將執(zhí)行程序中的某段代碼保護(hù)起來(相當(dāng)于鎖起來),被鎖起來的代碼一次只能允許一個線程執(zhí)行。在Python中使用threading.RLock類來創(chuàng)建鎖,他有兩個方法acquire,release
- acquire:獲得鎖,acquire之后的代碼只允許一個線程執(zhí)行
- release:釋放鎖,release之后的代碼又可以多線程交叉執(zhí)行
import time
import threading
lock = threading.RLock()
a = 1
def handle(sid):
lock.acquire()
global a
a = a * 2
time.sleep(sid % 2)
print(sid, a)
lock.release()
class MyClass(threading.Thread):
def __init__(self, sid):
super().__init__()
self.sid = sid
def run(self):
handle(self.sid)
threads = []
for i in range(10):
t = MyClass(i)
t.start()
for t in threads:
t.join()
輸出結(jié)果如下,此時線程序號和乘數(shù)的值順序?qū)?yīng)上了
0 2
1 4
2 8
3 16
4 32
5 64
6 128
7 256
8 512
9 1024
鎖的注意事項(xiàng):
- 鎖的作用是將多線程變回單線程,犧牲性能換取準(zhǔn)確性
- 在代碼設(shè)計(jì)中應(yīng)盡量避免使用鎖,即使使用了鎖也要讓被鎖住的代碼區(qū)域盡可能小
- 有加鎖的操作一定要有解鎖的操作,否則代碼就是去多線程的優(yōu)勢
信號量
信號量(semaphore)是一種帶計(jì)數(shù)的線程同步機(jī)制,當(dāng)調(diào)用release時,增加計(jì)數(shù),當(dāng)acquire時,減少計(jì)數(shù),當(dāng)計(jì)數(shù)為0時,自動阻塞,等待release被調(diào)用,可以實(shí)現(xiàn)并發(fā)限制,分為純粹的信號量(Semaphore)和帶有不按揭的信號量(BoundedSemaphore),區(qū)別如下
- Semaphore:在調(diào)用release函數(shù)時,單純將計(jì)數(shù)器+1,不會檢查+1之后計(jì)數(shù)器是否超過上限
- BoundedSemaphore:在調(diào)用release函數(shù)時,會檢查計(jì)數(shù)器+1后是否超過上限,對計(jì)數(shù)器的上限制進(jìn)行校驗(yàn),是更加安全的機(jī)制
在創(chuàng)建信號量時需要指定信號量的個數(shù),沒調(diào)用一個acquire時信號量減少一個,當(dāng)信號量為0時就是說同一個時間線程數(shù)量超過信號量個數(shù)時,主線程阻塞等待信號量被釋放恢復(fù),直接看代碼
import threading
import datetime
import time
semaphore = threading.Semaphore(3)
def foo():
semaphore.acquire()
time.sleep(5)
print("當(dāng)前時間:", datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S"))
semaphore.release()
class MyClass(threading.Thread):
def __init__(self):
super(MyClass, self).__init__()
def run(self):
foo()
threads = []
for i in range(10):
t = MyClass()
t.start()
threads.append(t)
for t in threads:
t.join()
上述代碼設(shè)置信號量為3,開啟10個線程,執(zhí)行打印時間的函數(shù),10個線程超出3個信號量限制,因此每當(dāng)線程數(shù)量達(dá)到3個主線程阻塞,在循環(huán)內(nèi)部卡住使得下面的子線程無法啟動,最后的記過是沒3個線程一批打印輸出時間,一批和一批時間之間間隔5秒
當(dāng)前時間: 2021-05-14 11:18:52
當(dāng)前時間: 2021-05-14 11:18:52
當(dāng)前時間: 2021-05-14 11:18:52
當(dāng)前時間: 2021-05-14 11:18:57
當(dāng)前時間: 2021-05-14 11:18:57
當(dāng)前時間: 2021-05-14 11:18:57
當(dāng)前時間: 2021-05-14 11:19:02
當(dāng)前時間: 2021-05-14 11:19:02
當(dāng)前時間: 2021-05-14 11:19:02
當(dāng)前時間: 2021-05-14 11:19:07
再看BoundedSemaphore,如果調(diào)用多次release超出信號量上限就會報(bào)錯,但是Semaphore不會報(bào)錯,修改代碼如下
semaphore = threading.BoundedSemaphore(3)
def foo():
semaphore.acquire()
time.sleep(5)
print("當(dāng)前時間:", datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S"))
semaphore.release()
semaphore.release()
輸出報(bào)錯如下,顯示信號釋放太多次
ValueError: Semaphore released too many times
使用線程池提升運(yùn)行效率
線程池是一種多線程處理方式,是在正常的多線程處理方式上的一種優(yōu)化
- 正常線程使用方式是:創(chuàng)建,啟動,結(jié)束,銷毀
- 線程池吃力方式是:在程序啟動時就創(chuàng)建好若干個線程,并保存到內(nèi)存中,當(dāng)線程啟動并執(zhí)行完成后并不做銷毀處理,而是等待下次再使用,需要用時過來取,用完了還回去
在需要頻繁創(chuàng)建線程的系統(tǒng)中,一般都會使用線程池技術(shù),原因是
- 每個線程的創(chuàng)建都需要占用系統(tǒng)資源,是一件相對耗時的事情,銷毀線程時還需要回收線程資源,線程池技術(shù)可以省去創(chuàng)建與回收過程中所浪費(fèi)的系統(tǒng)開銷
- 在某些系統(tǒng)中需要為每個子任務(wù)創(chuàng)建線程,容易導(dǎo)致線程數(shù)量失控,直到程序崩潰,線程池技術(shù)可以很好的固定線程的數(shù)量
(1)實(shí)現(xiàn)線程池
Python中使用concurrent.features模塊下的ThreadPoolExecutor來實(shí)現(xiàn)線程池,只需要傳入線程個數(shù)系統(tǒng)就能為該線程池初始化相應(yīng)個數(shù)的線程,線程的使用有兩種
- 搶占式:線程池中線程的執(zhí)行順序不固定,該方式使用ThreadPoolExecutor下的submit方法實(shí)現(xiàn)
- 非搶占式:線程將按照調(diào)用的順序執(zhí)行,此方式使用ThreadPoolExecutor的map方法來實(shí)現(xiàn)
從使用角度來看,搶占式更靈活,非搶占式更嚴(yán)格。
- 搶占式:允許線程池中線程執(zhí)行函數(shù)不一樣,并且某個線程異常不影響其他線程
- 非搶占式:要求線程池中的線程必須執(zhí)行同樣的處理函數(shù),并且一旦其中一個線程異常,其他線程也會停止
(2)單線程和多線程處理時間比較
先寫一個簡單的程序,使用單線程循環(huán)遍歷執(zhí)行一個函數(shù)
import time
person = ["Anna", "Gary", "All"]
def print_person(p):
print(p)
time.sleep(2)
t1 = time.time()
for p in person:
print_person(p)
t2 = time.time()
print("耗時:", t2 - t1)
輸出如下,耗時6s
Anna
Gary
All
耗時: 6.005347490310669
下一步實(shí)現(xiàn)搶占式線程池,使用with關(guān)鍵字創(chuàng)建線程池,將列表元素一個一個傳入執(zhí)行函數(shù),調(diào)用實(shí)例化對象的submit方法將線程啟動,代碼如下
import time
from concurrent.futures import ThreadPoolExecutor
person = ["Anna", "Gary", "All"]
def print_person(p):
print(p)
time.sleep(2)
t1 = time.time()
with ThreadPoolExecutor(3) as executor: # 使用with上下文
for p in person:
executor.submit(print_person, p)
t2 = time.time()
print("耗時:", t2 - t1)
輸出如下,可見多線程的并發(fā)縮短了程序的運(yùn)行時間
Anna
Gary
All
耗時: 2.002558708190918
進(jìn)一步實(shí)現(xiàn)非搶占式線程池,也是使用with關(guān)鍵字,他是使用實(shí)例化線程池的map方法啟動線程,并且傳入函數(shù)參數(shù)時直接傳入列表
t1 = time.time()
with ThreadPoolExecutor(3) as executor:
executor.map(print_person, person)
t2 = time.time()
print("耗時:", t2 - t1)
輸出如下,也是2s,和搶占式效率差不多
Anna
Gary
All
耗時: 2.0014290809631348
一個業(yè)務(wù)案例,使用多線程讀取es數(shù)據(jù),通過prov分組多任務(wù)并發(fā)進(jìn)行
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from elasticsearch import Elasticsearch
LOGGRR = logging.getLogger("thread_test")
LOGGRR.setLevel(logging.INFO)
streamhandler = logging.StreamHandler()
streamhandler.setLevel(logging.INFO)
streamhandler.setFormatter(logging.Formatter('%(asctime)s [%(module)s] %(levelname)s [%(lineno)d] %(message)s', '%Y-%m-%d %H:%M:%S'))
LOGGRR.addHandler(streamhandler)
es = Elasticsearch("xxx.xx.x.xxx:xxxx")
def get_prov_list():
body = {
"size": 0,
"aggs": {
"name": {
"terms": {
"field": "prov",
"size": 100
}
}
}
}
query = es.search(index="index_name", body=body)
return [x["key"] for x in query['aggregations']['name']['buckets']]
def get_prov_info(prov):
body = {
"query": {
"bool": {
"must": [
{"term": {
"prov": prov
}},
{"term": {
"data_status": "Y"
}},
]
}
},
"_source": ["user_name", "filed"]
}
res = []
cnt = 0
query = es.search(index="index_name", body=body, scroll="5m", size=3000)
total = query['hits']['total']
scroll_id = query["_scroll_id"]
scroll_res = query['hits']['hits']
res.extend(scroll_res)
cnt += len(scroll_res)
LOGGRR.info("-------------{}: {} / {}".format(prov, cnt, total))
for i in range(0, int(total / 1000) + 1):
scroll_res = es.scroll(scroll_id=scroll_id, scroll="5m")['hits']['hits']
res.extend(scroll_res)
cnt += len(scroll_res)
LOGGRR.info("-------------{}: {} / {}".format(prov, cnt, total))
return res
if __name__ == '__main__':
prov_list = get_prov_list()
res = []
with ThreadPoolExecutor() as pool:
futures = [pool.submit(get_prov_info, prov) for prov in prov_list]
pool.shutdown(wait=True)
for fut in futures:
print(len(fut.result()))
res.extend(fut.result())
print(len(res))
多線程讀全量es 44秒,單線程230秒。
多線程和GIL
(1)知識準(zhǔn)備
GIL 是Python的全局解釋器鎖,同一進(jìn)程中假如有多個線程運(yùn)行,一個線程在運(yùn)行Python程序的時候會霸占Python解釋器,使該進(jìn)程內(nèi)的其他線程無法運(yùn)行。在GIL中,全局鎖并不是一直鎖定的,比如當(dāng)線程遇到IO等待或ticks計(jì)數(shù)(Python3改為計(jì)時器,執(zhí)行時間達(dá)到閾值后,當(dāng)前線程釋放GIL)達(dá)到100,cpu會做切換,把cpu的時間片讓給其他線程執(zhí)行,此時GIL釋放,釋放時候所有線程繼續(xù)進(jìn)行鎖競爭,Python里一個進(jìn)程永遠(yuǎn)只能同時執(zhí)行一個線程

- 在計(jì)算密集型操作時,GIL的存在導(dǎo)致多線程無法很好的立即多核CPU的并發(fā)處理能力,原因是這種情況下ticks計(jì)數(shù)很快就會達(dá)到閾值,然后觸發(fā)GIL的釋放與再競爭(多個線程來回切換當(dāng)然是需要消耗資源的),所以Python下的多線程對CPU密集型代碼并不友好
- 在IO密集操作線程中,比如在網(wǎng)絡(luò)通信,網(wǎng)絡(luò)爬蟲,time.sleep()延時的時候,將釋放GIL,達(dá)到并發(fā)能力,原因是單線程下有IO操作會進(jìn)行IO等待,造成不必要的時間浪費(fèi),而開啟多線程能在線程A等待時,自動切換到線程B,可以不浪費(fèi)CPU的資源,從而能提升程序執(zhí)行效率
- 多進(jìn)程能夠充分利用CPU達(dá)到并行,原因是每個進(jìn)程有各自獨(dú)立的GIL,互不干擾,這樣就可以真正意義上的并行執(zhí)行,所以在Python中,多進(jìn)程的執(zhí)行效率優(yōu)于多線程
(2)IO密集型和CPU密集型多線程測試
以爬取網(wǎng)頁進(jìn)行解析進(jìn)行測試,線程池最大線程數(shù)8個,爬去50次,總共需要1.4秒
import re
import time
import requests
from concurrent.futures import ThreadPoolExecutor
headers = {
...
}
def handle(sid):
response = requests.get("https://movie.douban.com/top250", headers=headers)
res = re.findall(r'alt="(.*?)"', response.text, re.S)
print(str(sid) + ",".join(res))
sid_list = list(range(50))
t1 = time.time()
with ThreadPoolExecutor(8) as executor:
executor.map(handle, sid_list)
t2 = time.time()
print("耗時:", t2 - t1) # 1.496328353881836
單線程模式測試如下需要40s,可見多線程應(yīng)對IO密集型可以實(shí)現(xiàn)并發(fā)提高效率
t1 = time.time()
for i in range(50):
handle(i)
t2 = time.time()
print("耗時:", t2 - t1) # 40s
再測試一下CPU密集型,此處以geopy計(jì)算球面距離為例,這個計(jì)算包含多個三角函數(shù)的計(jì)算,耗時1.5s
import time
from concurrent.futures import ThreadPoolExecutor
from geopy.distance import great_circle
def handle(sid):
for i in range(1000):
res = great_circle((41.49008, -71.312796), (41.499498, -81.695391)).meters
print(str(sid) + str(res))
sid_list = list(range(100))
t1 = time.time()
with ThreadPoolExecutor(8) as executor:
executor.map(handle, sid_list)
t2 = time.time()
print("耗時:", t2 - t1) # 1.5076820850372314
再測一下單線程,竟然比多線程耗時低,可見在CPU密集型中線程頻繁切換反而多線程效率更低
t1 = time.time()
for i in range(100):
handle(i)
t2 = time.time()
print("耗時:", t2 - t1) # 1.1195