Python43_多任務(wù)之線程

python的thread模塊是比較底層的模塊(可能不同的操作系統(tǒng)不一樣),python的threading模塊是對thread做了一些包裝的,可以更加方便的被使用(跨平臺)

ps:看程序代碼的時候,切忌從上往下看,而是主要看程序的框架(比如C語言中,主要看main函數(shù))

Thread對象基礎(chǔ)

threading模塊

threading模塊中的對象列表

  1. Thread:表示一執(zhí)行線程的對象
  2. Lock:鎖原語對象(和thread模塊中的鎖一樣)
  3. RLock:可重入鎖對象(可讀不可寫)
  4. Condition:條件變量對象,使得一個線程等待另一個線程滿足特等的“條件”,比如改變狀態(tài)或某個數(shù)據(jù)值
  5. Event:條件變量的通用版本,任意數(shù)量的線程等待某個事件的發(fā)生,在該事件發(fā)生后所有線程將被激活
  6. Semaphore:為線程間共享的有限資源提供一個“計數(shù)器”,如果沒有可用資源將會被阻塞
  7. BoundeSemaphore:與Semaphore相似,不過它不允許超過初始值
  8. Timer:與Thread相似,不過它要在運行前等待一段時間
  9. Barrier:創(chuàng)建一個“障礙”,必須達(dá)到指定數(shù)量的線程后才可以繼續(xù)

ps:我們通過Python實現(xiàn)過線程編程,豬油用到的是threading.Thread對象

Thread對象常用的方法和屬性

數(shù)據(jù)屬性:

  1. name:線程名

  2. ident:線程的標(biāo)識符

  3. daemon:布爾標(biāo)志,表示這個線程是否是守護(hù)線程

    • 用法:線程對象.setDaemon(True)
    • 如果把子線程設(shè)置為守護(hù)線程,表示該線程不重要,主線程結(jié)束,子線程結(jié)束

對象方法:

  1. __init__(group=None, target=None, name=None, args=(), kwargs={}, verbose=None,daemon=None):實例化一個線程對象,需要有一個可調(diào)用的target(函數(shù)),以及其參數(shù)args或kwargs。還可以傳遞name或group參數(shù),不過后者還未實現(xiàn)。此外,verbose標(biāo)志也是可接受的,而daemon的值將會設(shè)定thread.daemon標(biāo)志/屬性
  2. start():開始執(zhí)行該線程
  3. run():定義線程功能的方法,通常在子類中被開發(fā)者重寫
  4. join(timeout=None):直至啟動的線程終止之前一直:除非給出了timeout(秒),否則會一直阻塞,相當(dāng)于在此處等待調(diào)用者線程完成

線程詳解

多線程執(zhí)行

法一:函數(shù)方式

import threading    #threading模塊與Process很類似
import time

def saySorry():
    print("親愛的,我錯了,我能吃飯了嗎?")
    time.sleep(1)   #睡一秒

if __name__ == "__main__":
    for _ in range(10):
        t = threading.Thread(target=saySorry)   #創(chuàng)建一個線程對象t
        t.start()
 #會發(fā)現(xiàn)其執(zhí)行時間一共也就在1s左右(如果不用線程,應(yīng)該在10s作用)
 #主線程要等待所有子線程結(jié)束后才能結(jié)束

進(jìn)程、程序與線程:

  • 程序是死的,是代碼的集合;

  • 程序的運行被稱之為進(jìn)程——是擁有資源的最小單位;

  • 線程是程序調(diào)度的最小單位

ps:如果多個線程執(zhí)行的都是同一個函數(shù)的話,各自之間不會有影響,各是個的

方式二:類的方式

import threading,time

class MyThread(threading.Thread):
    #與Process相似,重寫run方法
    def run(self):
        for i in range(10):
            time.sleep(1)
            msg = "I'm {name} @ {count}".format(name = self.name, count = i)
            print(msg)

if __name__ == "__main__":
    t = MyThread(name = "test_thread")  #創(chuàng)建線程并為線程指定名稱。當(dāng)函數(shù)結(jié)束的時候,t這個線程就結(jié)束了
    t.start()   #線程開始執(zhí)行
    print("當(dāng)前線程:" + str(threading.enumerate())) #enumerate()能夠獲得當(dāng)前時刻 程序中的所有線程(包括自己)
    # ps:enumerate回憶:
    # names = ["aa","bb","cc"]
    # for temp in enumerate(names):
        # print(temp)  # 會輸出序號幾列表元素所組成的元組
    #對于線程,主線程一般也要等待子線程(為了收回子線程占有的一點點資源)

線程的執(zhí)行順序

線程的執(zhí)行順序不確定,與進(jìn)程一樣,取決于操作系統(tǒng)的調(diào)度算法

多線程對全局變量的共享

from threading import Thread
import time

g_num = 100

def work1():
    global  g_num   #注意:全局變量只要沒有改指向,則在函數(shù)里面就不需要加global,如果可能改指向,就需要加global
    for _ in range(3):
        g_num += 1
    print("In work1, g_num is {}".format(g_num))

def work2():
    global g_num
    print("In work2, g_num is {}".format(g_num))

print("線程創(chuàng)建之前,g_num is {}".format(g_num))

t1 = Thread(target=work1)
t1.start()

time.sleep(1)   #睡眠1s,保證t1執(zhí)行完畢

t2 = Thread(target=work2)
t2.start()

'''執(zhí)行結(jié)果如下:
線程創(chuàng)建之前,g_num is 100
In work1, g_num is 103
In work2, g_num is 103
'''

對于進(jìn)程,全局變量不共享,而對于多線程,全局變量是可以共享的,因為進(jìn)程是擁有資源的最小單位,而線程是共享其所屬進(jìn)程的資源,線程自己字擁有執(zhí)行所必不可少的一點點資源。也因此,線程之間的通信比進(jìn)程之間的通信方便

但是線程對全局變量的共享也會出現(xiàn)問題,如下:

from threading import Thread
import time

g_num = 0

def work1():
    global  g_num
    for _ in range(1000000):
        g_num += 1
    print("g_num is {}".format(g_num))

print("線程創(chuàng)建之前,g_num is {}".format(g_num))

t1 = Thread(target=work1)
t1.start()

# time.sleep(1)   #睡眠1s,保證t1執(zhí)行完畢

t2 = Thread(target=work1)
t2.start()
'''本次運行結(jié)果如下:(進(jìn)行了2000000次加法,但是結(jié)果并不是2000000)
線程創(chuàng)建之前,g_num is 0
g_num is 1153259
g_num is 1247721
'''

ps:對于變量,線程除了可以以全局變量的形式共享,還可以以參數(shù)的形式共享(在Thread ()中以args = ()的形式傳遞

對于以上代碼運行結(jié)果不是2000000的解析:
在線程執(zhí)行g(shù)_num += 1的時候,實際上是g_num = g_num + 1,先取值加1,然后賦值,一共兩步。而在多線程的時候,由于cpu的調(diào)度,一個線程中的這兩步可能會被打斷,所以運行結(jié)果不為2000000

原子性:一段代碼,要么不執(zhí)行,要么就直接執(zhí)行完,不允許被打斷

如何保證代碼執(zhí)行的原子性——互斥鎖

threading模塊中定義了Lock鎖,可以方便的處理鎖定:

import threading
mutex = threading.Lock()    #創(chuàng)建鎖
mutex.acquire([blockign])   #獲得鎖:鎖定
mutext.release()    #釋放鎖

互斥鎖的應(yīng)用:acquire/release

from threading import Thread,Lock
import time,threading
g_num = 0

def work1():
#   print("%s"%threading.current_thread().name) #此句可以輸出當(dāng)前線程的名稱
    global  g_num
    mutex.acquire() #對g_num操作前上鎖,如果一方獲得了鎖,另一方如果還要獲得鎖,就必須阻塞(一直等待),直到另一方釋放這個鎖
    for _ in range(1000000):
        g_num += 1
    print("g_num is {}".format(g_num))
    mutex.release() #對g_num的操作完畢后,釋放鎖,以讓其他人可以獲得鎖從而對g_num進(jìn)行操作
    #ps:把鎖的釋放放在for外面,相當(dāng)于把多線程硬生生弄成了單線程,如果只是要最后的結(jié)果是2000000,則可以把鎖的釋放法在for里面,緊跟g_num += 1。其實通常是能不加的代碼就不加(即加鎖的地方盡可能小)



print("線程創(chuàng)建之前,g_num is {}".format(g_num))
mutex = Lock()
t1 = Thread(target=work1)
t1.start()

# time.sleep(1)   #睡眠1s,保證t1執(zhí)行完畢

t2 = Thread(target=work1)
t2.start()

'''運行結(jié)果如下:
線程創(chuàng)建之前,g_num is 0
g_num is 1000000
g_num is 2000000
'''

`ps:等待解鎖的方式:通知,而不是輪詢

互斥鎖的應(yīng)用:with

import threading
import time
g_num = 0

lock = threading.Lock()

def work1(num):
    global g_num
    with lock:
        for i in range(num):
            time.sleep(0.01)
            g_num += 1
    print("work1, g_num is %d"%g_num)

def work2(num):
    global g_num
    try:
        lock.acquire()  #獲得對數(shù)據(jù)的封鎖
        for i in range(num):
            time.sleep(0.01)
            g_num += 1
    finally:
        lock.release()  #釋放對數(shù)據(jù)的封鎖。acquire和release與with語句效果相同
    print("work2, g_num is %d"%g_num)

print("---線程創(chuàng)建之前,g_num is %d---"%g_num)
t1 = threading.Thread(target=work1,args=(100,))
t1.start()
t2 = threading.Thread(target=work2,args=(100,))
t2.start()

while len(threading.enumerate())  != 1:
    time.sleep(2)
print("---線程操作之后,g_num is %d---"%g_num)

#加鎖之后輸出結(jié)果為:
# ---線程創(chuàng)建之前,g_num is 0---
# work1, g_num is 100
# work2, g_num is 200
# ---線程操作之后,g_num is 200---

對于互斥鎖,通常是對值進(jìn)行修改時才加鎖,不修改的話不用加鎖

多線程使用非共享變量(函數(shù)里面的變量)

即當(dāng)多個線程所用的代碼相同時,其中變量的情況如何

from threading import Thread
import threading,time

def test():
    name = threading.current_thread().name
    print("Thread name is :{}".format(name))
    num = 100
    if name == "Thread-1":
        num += 1
    else:
        time.sleep(2)
        
    print("Thread is {}, num is {}".format(name, num))

t1 = Thread(target=test)
t1.start()

t2 = Thread(target=test)
t2.start()
'''運行結(jié)果如下:
Thread name is :Thread-1
Thread is Thread-1, num is 101
Thread name is :Thread-2
Thread is Thread-2, num is 100
'''
#說明雖然兩個線程都是到同一個函數(shù)里面執(zhí)行,但是他們函數(shù)里面的數(shù)據(jù)“各人是各人的”,互不影響,所以不需要加鎖。而全局變量是公用的

線程間使用Queue通信

from queue import Queue
import queue
import threading
import time

q = Queue(maxsize=10)   #隊列的最大容量為10

def producer():
    for i in range(10):
        q.put(i)

def customer():
    for i in range(10):
        data = q.get()
        print(data, end=" ")

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=customer)


t1.start()
t2.start()

#輸出結(jié)果如下:
# 0 1 2 3 4 5 6 7 8 9

# ps:stack = queue.LifoQueue()   #棧
# ps:隊列的其他屬性
# q.empty()
# q.full()
# q.maxsize
# q.qsize()

線程池

線程池中線程的創(chuàng)建、執(zhí)行、銷毀都由線程池自己執(zhí)行

線程池的基類是 concurrent.futures 模塊中d Executor,Executor 提供了兩個子類,即ThreadPoolExecutor 和ProcessPoolExecutor,其中ThreadPoolExecutor 用于創(chuàng)建線程池,而 ProcessPoolExecutor 用于創(chuàng)建進(jìn)程池。

Exectuor 提供了如下常用方法:

  • submit(fn, *args, kwargs):將 fn 函數(shù)提交給線程池。args 代表傳給 fn 函數(shù)的參數(shù),kwargs 代表以關(guān)鍵字參數(shù)的形式為 fn 函數(shù)傳入?yún)?shù)。
  • map(func, *iterables, timeout=None, chunksize=1):該函數(shù)類似于全局函數(shù) map(func, *iterables),只是該函數(shù)將會啟動多個線程,以異步方式立即對 iterables 執(zhí)行 map 處理。
  • shutdown(wait=True):關(guān)閉線程池。

在用完一個線程池后,應(yīng)該調(diào)用該線程池的 shutdown() 方法,該方法將啟動線程池的關(guān)閉序列。調(diào)用 shutdown() 方法后的線程池不再接收新任務(wù),但會將以前所有的已提交任務(wù)執(zhí)行完成。當(dāng)線程池中的所有任務(wù)都執(zhí)行完成后,該線程池中的所有線程都會死亡。

  • 使用線程池的步驟
  1. 調(diào)用 ThreadPoolExecutor 類的構(gòu)造器創(chuàng)建一個線程池。
  2. 定義一個普通函數(shù)作為線程任務(wù)。
  3. 調(diào)用 ThreadPoolExecutor 對象的 submit() 方法來提交線程任務(wù)。
  4. 當(dāng)不想提交任何任務(wù)時,調(diào)用 ThreadPoolExecutor 對象的 shutdown() 方法來關(guān)閉線程池。
import threading
import time

def add(n1, n2):
    v = n1 + n2
    time.sleep(n1)

    return v

import concurrent.futures as futures

ex = futures.ThreadPoolExecutor(max_workers = 3)    #創(chuàng)建線程池并設(shè)置線程池容量
#ps:futures的意義:結(jié)果要未來才能獲得
f1 = ex.submit(add, 2, 3)   #創(chuàng)建線程并提交到線程池(ps:提交后線程即開始執(zhí)行),返回值為一個Future對象
f2 = ex.submit(add, 2, 2)

print(f1.done())    #判斷線程是否執(zhí)行結(jié)束。輸出False,因為線程還沒有執(zhí)行完
print(f1.result())  #獲得線程的執(zhí)行結(jié)果。輸出5,(通常是程序執(zhí)行完才有返回值,故這里可以用于阻塞線程,但是也可以對result指定timeout參數(shù)。
ex.shutdown()

ps:關(guān)于Future的簡單理解:由于線程任務(wù)會在新線程中以異步方式執(zhí)行,因此,線程執(zhí)行的函數(shù)相當(dāng)于一個“將來完成”的任務(wù),所以 Python 使用 Future 來代表。

Future對象

Future 提供了如下方法:

  • cancel():取消該 Future 代表的線程任務(wù)。如果該任務(wù)正在執(zhí)行,不可取消,則該方法返回 False;否則,程序會取消該任務(wù),并返回 True。
  • cancelled():返回 Future 代表的線程任務(wù)是否被成功取消。
  • running():如果該 Future 代表的線程任務(wù)正在執(zhí)行、不可被取消,該方法返回 True。
  • done():如果該 Funture 代表的線程任務(wù)被成功取消或執(zhí)行完成,則該方法返回 True。
  • result(timeout=None):獲取該 Future 代表的線程任務(wù)最后返回的結(jié)果。如果 Future 代表的線程任務(wù)還未完成,該方法將會阻塞當(dāng)前線程,其中 timeout 參數(shù)指定最多阻塞多少秒。
  • exception(timeout=None):獲取該 Future 代表的線程任務(wù)所引發(fā)的異常。如果該任務(wù)成功完成,沒有異常,則該方法返回 None。
  • add_done_callback(fn):為該 Future 代表的線程任務(wù)注冊一個“回調(diào)函數(shù)”,當(dāng)該任務(wù)成功完成時,程序會自動觸發(fā)該 fn 函數(shù)。

獲取執(zhí)行結(jié)果

  1. 用Future的result()方法:但是該方法會阻塞當(dāng)前主線程,只有等到當(dāng)前任務(wù)完成后,result()方法的阻塞才會被解除
  2. 通過Future的add_done_callback()方法來添加回調(diào)函數(shù),該回調(diào)函數(shù)形如fn(future)。當(dāng)線程任務(wù)完成后,程序會自動觸發(fā)該回調(diào)函數(shù),并將對應(yīng)的Future對象作為參數(shù)傳給該回調(diào)函數(shù)
from concurrent.futures import ThreadPoolExecutor
import threading
import time

# 定義一個準(zhǔn)備作為線程任務(wù)的函數(shù)
def action(max):
    my_sum = 0
    for i in range(max):
        print(threading.current_thread().name + ' ' + str(i))
        my_sum += i
    return my_sum
# 創(chuàng)建一個包含2條線程的線程池
with ThreadPoolExecutor(max_workers=2) as pool:
    # 向線程池提交一個task, 50會作為action()函數(shù)的參數(shù)
    future1 = pool.submit(action, 50)
    # 向線程池再提交一個task, 100會作為action()函數(shù)的參數(shù)
    future2 = pool.submit(action, 100)
    def get_result(future):
        print(future.result())
    # 為future1添加線程完成的回調(diào)函數(shù)
    future1.add_done_callback(get_result)
    # 為future2添加線程完成的回調(diào)函數(shù)
    future2.add_done_callback(get_result)
    print('--------------')



多線程的應(yīng)用:下載圖片

import os
import random
import time
import requests
import concurrent.futures as futures


def download_img(url):
    resp = requests.get(url)    #獲得鏈接指向文件的內(nèi)容
    filename = os.path.split(url)[1]  # 把路徑分為目錄名和文件名,[1]為文件名
    with open(filename, "wb+") as f:
        f.write(resp.content)   #寫入文件
    num = random.randint(0,5)
    time.sleep(num)
    print(num)
    return filename


# 鏈接的獲得:網(wǎng)絡(luò)上隨便點擊一張圖片,右擊:查看圖片
urls = ["http://pic118.huitu.com/res/20190420/1480621_20190420132348580020_1.jpg",
        "https://img.ivsky.com/img/tupian/pre/201811/19/jiguang-008.jpg"]
ex = futures.ThreadPoolExecutor(max_workers=2)  #創(chuàng)建線程池
res_iter = ex.map(download_img, urls)   #以第二個參數(shù)為第一個參數(shù)(函數(shù))的參數(shù)

"""
print(type(res_iter))   #發(fā)現(xiàn)其是一個生成器
for res in res_iter:
    print(type(res))
    help(res)
"""
fu_tasks = [ex.submit(download_img, url) for url in urls]

"""
print(type(fu_tasks[0]))
for future in futures.as_completed(fu_tasks):   #as_completed沒有順序,誰先完成就先返回誰
    print(future.result())
"""

ThreadLocal對象在線程中的應(yīng)用

傳參問題

前奏-其一

  • 函數(shù)
    • 傳參
    • 全局變量
    • 返回值

前奏-其二

-使用全局字典
用一個全局字典dict存放所有的對象,然后以thread自身作為key獲得線程對應(yīng)的對象(以Studetn為例)

global_dict = {}
def std_thread(name):
    std = Student(name) #當(dāng)多個線程來執(zhí)行std_thread的時候,各自得到各自的Student對象
    #把std放到全局變量global_dict中;鍵值也可以使用
    global_dict[threading.current_thread()] = std   #當(dāng)多個線程來執(zhí)行這句話時,由于鍵值不同,能夠取出不同的值
    do_task1()
    do_task2()
def do_task1():
    #不傳入std,而是根據(jù)當(dāng)前線程查找,故也可以取出自己想要的值。在沒有傳參的情況下保證了多個線程使用同一個全局變量來時候并沒有出錯
    std = global_dict[threading.current_thread()]   #鍵值也可以使用threading.current_thread().name
    ...
def do_task2():
    #不傳入std,而是根據(jù)當(dāng)前線程查找:
    std = global_dict[threading.current_thread()]
    ...

這種方式理論上是可行的,它最大的有點是消除了std對象在每層函數(shù)中的傳遞問題,但是,每個函數(shù)獲得std的代碼有點low

其三:使用ThreadLocal

import threading

local_school = threading.local()    #創(chuàng)建一個ThreadLocal對象

def process_student():
    #獲取當(dāng)前進(jìn)程相關(guān)的student
    std = local_school.student  #取出在process_thread中賦予的屬性

    print("Hlello, {} in {}".format(std, threading.current_thread().name))

def process_thread(name):
    #綁定ThreadLocal的studetn
    local_school.student = name #給對象添加屬性student,對于ThreadLocal對象的同一個屬性,在一個線程中設(shè)置的是哪個值,到時候取出的就是哪個值,不會因為線程不一樣導(dǎo)致同一屬性的值不一樣(即:對于同一個屬性,在不同線程中的操作是互不影響的)
    process_student()

t1 = threading.Thread(target=process_thread, args=("biubiu~",), name = "Thread_A")
t2 = threading.Thread(target=process_thread, args=("老王",), name = "Thread-B")

t1.start()
t2.start()

t1.join()
t2.join()
'''運行結(jié)果如下:
Hlello, biubiu~ in Thread_A
Hlello, 老王 in Thread-B

'''
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 線程 操作系統(tǒng)線程理論 線程概念的引入背景 進(jìn)程 之前我們已經(jīng)了解了操作系統(tǒng)中進(jìn)程的概念,程序并不能單獨運行,只有...
    go以恒閱讀 1,795評論 0 6
  • Swift1> Swift和OC的區(qū)別1.1> Swift沒有地址/指針的概念1.2> 泛型1.3> 類型嚴(yán)謹(jǐn) 對...
    cosWriter閱讀 11,658評論 1 32
  • 線程池ThreadPoolExecutor corepoolsize:核心池的大小,默認(rèn)情況下,在創(chuàng)建了線程池之后...
    irckwk1閱讀 864評論 0 0
  • Java-Review-Note——4.多線程 標(biāo)簽: JavaStudy PS:本來是分開三篇的,后來想想還是整...
    coder_pig閱讀 1,772評論 2 17
  • 1.1 多線程介紹 學(xué)習(xí)多線程之前,我們先要了解幾個關(guān)于多線程有關(guān)的概念。 進(jìn)程:進(jìn)程指正在運行的程序。確切的來說...
    Pecksniff1994閱讀 1,644評論 0 2

友情鏈接更多精彩內(nèi)容