Python中線程的理解

Num01-->多線程threading

Python中建議使用threading模塊,而不要使用thread模塊。原因如下:

1,Python中threading模塊對(duì)thread進(jìn)行了一些包裝,可以更加方便的使用。
2,Python中threading模塊能確保重要的子線程在進(jìn)程退出前結(jié)束。
3,Python中thread模塊,當(dāng)主線程技術(shù),同一主線程下的其他所有子線程都被強(qiáng)制退出。
4,Python中thread模塊,不支持守護(hù)(daemon)線程。

Test01-->多線程執(zhí)行

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2017/4/28 14:05
# @Author  : xiaoke

import threading
import time


def say():
    print("我是子線程")
    time.sleep(1)


if __name__ == "__main__":
    print("我是主線程")
    # 創(chuàng)建三個(gè)子線程
    for i in range(3):
        t = threading.Thread(target=say)
        # 啟動(dòng)線程
        t.start()  
# 結(jié)果如下:
# 我是主線程
# 我是子線程
# 我是子線程
# 我是子線程

Test02-->主線程會(huì)等待所有子線程結(jié)束后,再結(jié)束

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2017/4/28 14:05
# @Author  : xiaoke

import threading
from time import sleep, ctime


def sing():
    for i in range(3):
        print("正在唱歌...%d" % i)
        sleep(1)

    print("唱歌子線程結(jié)束")


def dance():
    for i in range(3):
        print("正在跳舞...%d" % i)
        sleep(1)
    print("跳舞子線程結(jié)束")


if __name__ == '__main__':
    print('主線程---開(kāi)始時(shí)間---:%s' % ctime())

    t1 = threading.Thread(target=sing)
    t2 = threading.Thread(target=dance)

    t1.start()
    t2.start()

    # 屏蔽以下代碼,試試看,程序是否會(huì)立馬結(jié)束?
    # t1.join()
    # t2.join()

    print('主線程---結(jié)束時(shí)間---:%s' % ctime())

# 沒(méi)有屏蔽join()函數(shù)的結(jié)果如下:
# 主線程---開(kāi)始時(shí)間---:Fri Apr 28 14:19:29 2017
# 正在唱歌...0
# 正在跳舞...0
# 正在唱歌...1
# 正在跳舞...1
# 正在跳舞...2
# 正在唱歌...2
# 跳舞子線程結(jié)束
# 唱歌子線程結(jié)束
# 主線程---結(jié)束時(shí)間---:Fri Apr 28 14:19:32 2017


# 屏蔽join()函數(shù),的結(jié)果如下:
# 主線程---開(kāi)始時(shí)間---:Fri Apr 28 14:14:00 2017
# 正在唱歌...0
# 正在跳舞...0
# 主線程---結(jié)束時(shí)間---:Fri Apr 28 14:14:00 2017
# 正在跳舞...1
# 正在唱歌...1
# 正在跳舞...2
# 正在唱歌...2
# 唱歌子線程結(jié)束
# 跳舞子線程結(jié)束

以上代碼說(shuō)明,子線程在啟動(dòng)后,調(diào)用join()函數(shù),是等待所有的子線程結(jié)束后,主線程才結(jié)束。否則主線程,很快就結(jié)束了。

Test03-->查看線程數(shù)量

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : xiaoke

import threading
from time import sleep, ctime
import gc


def sing():
    for i in range(3):
        print("正在唱歌...%d" % i)
        sleep(1)
    print("唱歌子線程結(jié)束")


def dance():
    for i in range(3):
        print("正在跳舞...%d" % i)
        sleep(1)
    print("跳舞子線程結(jié)束")


if __name__ == '__main__':
    print('---開(kāi)始時(shí)間---:%s' % ctime())

    t1 = threading.Thread(target=sing)
    t2 = threading.Thread(target=dance)

    t1.start()
    t2.start()

    while True:
        # enumerate()函數(shù)的意思:返回一個(gè)列表,存放當(dāng)前活著的線程
        length = len(threading.enumerate())
        print('當(dāng)前運(yùn)行的線程數(shù)為:%d' % length)
        if length <= 1:
            break

        sleep(1)

    # t1.join()
    # t2.join()
    print('---結(jié)束時(shí)間---:%s' % ctime())
    # 手動(dòng)垃圾回收
    #gc.collect()
    print('當(dāng)前還活著的線程是:%s' % threading.current_thread().name)
# 結(jié)果如下:
# ---開(kāi)始時(shí)間---:Fri Apr 28 14:44:50 2017
# 正在唱歌...0
# 正在跳舞...0
# 當(dāng)前運(yùn)行的線程數(shù)為:3
# 正在唱歌...1
# 正在跳舞...1
# 當(dāng)前運(yùn)行的線程數(shù)為:3
# 正在唱歌...2
# 當(dāng)前運(yùn)行的線程數(shù)為:3
# 正在跳舞...2
# 唱歌子線程結(jié)束
# 當(dāng)前運(yùn)行的線程數(shù)為:2
# 跳舞子線程結(jié)束
# 當(dāng)前運(yùn)行的線程數(shù)為:1
# ---結(jié)束時(shí)間---:Fri Apr 28 14:44:54 2017
# 當(dāng)前還活著的線程是:MainThread

以上代碼加以說(shuō)明:在程序的最后,還剩下主線程還在存活,是因?yàn)橐粋€(gè)程序至少要有一個(gè)主線程,一直存活著。當(dāng)然Python垃圾回收器也會(huì)不定時(shí)的回收垃圾(引用計(jì)數(shù)和分代清除兩種機(jī)制),也可以手動(dòng)回收垃圾。

Num02-->第二種方式創(chuàng)建多線程

定義一個(gè)新的子類,繼承threading.Thread就可以,然后重寫run()方法。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : xiaoke

import threading
import time


class MyThread(threading.Thread):
    def run(self):
        for i in range(3):
            time.sleep(1)
            msg = "I'm " + self.name + ' @ ' + str(i)  # name屬性中保存的是當(dāng)前線程的名字
            print(msg)


def main():
    for i in range(3):
        t = MyThread()
        t.start()


if __name__ == '__main__':
    main()

# 結(jié)果如下:
# I'm Thread-2 @ 0
# I'm Thread-1 @ 0
# I'm Thread-3 @ 0
# I'm Thread-1 @ 1
# I'm Thread-3 @ 1
# I'm Thread-2 @ 1
# I'm Thread-2 @ 2
# I'm Thread-1 @ 2
# I'm Thread-3 @ 2

以上代碼加以說(shuō)明:

1,Python中threading.Thread類中有一個(gè)run()方法,用于定義線程的功能函數(shù),可以在自己定義的線程類中重寫該方法。而創(chuàng)建自己的線程實(shí)例后,通過(guò)Thread類的start()方法,可以啟動(dòng)該線程,交個(gè)Python虛擬機(jī)進(jìn)行調(diào)度,當(dāng)該線程獲得執(zhí)行的機(jī)會(huì)時(shí),就會(huì)調(diào)用run()方法執(zhí)行線程。

2,多線程程序的執(zhí)行順序是不確定的。當(dāng)執(zhí)行到sleep()語(yǔ)句時(shí),線程將被阻塞(Blocked),到sleep()結(jié)束后,線程進(jìn)入就緒狀態(tài)(Runnable),等待調(diào)度。而線程的調(diào)度將會(huì)隨機(jī)選擇一個(gè)線程執(zhí)行。上面的代碼只能保證每個(gè)線程都運(yùn)行完,整個(gè)run()函數(shù)。但是線程的啟動(dòng)順序,run()函數(shù)中每次循環(huán)的執(zhí)行順序不能確定。

Num03-->線程的幾種狀態(tài)

這里寫圖片描述

Num04-->多線程共享全局變量

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : xiaoke

from threading import Thread
import time

g_num = 100


def work1():
    global g_num
    for i in range(3):
        g_num += 1

    print("----in work1, g_num is %d---" % g_num)


def work2():
    global g_num
    print("----in work2, g_num is %d---" % g_num)


print("---線程創(chuàng)建之前g_num is %d---" % g_num)

t1 = Thread(target=work1)
t1.start()
t1.join()

t2 = Thread(target=work2)
t2.start()
t2.join()

以上代碼加以說(shuō)明:
1,優(yōu)點(diǎn):不管是可變類型或者不可變類型的數(shù)據(jù),在一個(gè)進(jìn)程內(nèi)的所有線程共享全局變量,能夠在不使用其他方式的前提下完成多線程之間的數(shù)據(jù)共享。
2,缺點(diǎn):多線程對(duì)全局變量的隨意改變,會(huì)造成對(duì)全局變量值的混亂。也就是多線程非安全的原因。

Num05-->多線程中的互斥鎖

當(dāng)多個(gè)線程幾乎同時(shí)修改某一個(gè)共享數(shù)據(jù)的時(shí)候,需要進(jìn)行同步控制

線程同步能夠保證多個(gè)線程安全訪問(wèn)競(jìng)爭(zhēng)資源,最簡(jiǎn)單的同步機(jī)制是引入互斥鎖。

互斥鎖為資源引入一個(gè)狀態(tài):鎖定/非鎖定。

某個(gè)線程要更改共享數(shù)據(jù)時(shí),先將其鎖定,此時(shí)資源的狀態(tài)為“鎖定”,其他線程不能更改;直到該線程釋放資源,將資源的狀態(tài)變成“非鎖定”,其他的線程才能再次鎖定該資源。互斥鎖保證了每次只有一個(gè)線程進(jìn)行寫入操作,從而保證了多線程情況下數(shù)據(jù)的正確性。

Test01-->互斥鎖的創(chuàng)建

#創(chuàng)建鎖
mutex = threading.Lock()
#鎖定
mutex.acquire([blocking])
#釋放
mutex.release()

加以說(shuō)明:
其中,鎖定方法acquire可以有一個(gè)blocking參數(shù)。

如果設(shè)定blocking為True,則當(dāng)前線程會(huì)堵塞,直到獲取到這個(gè)鎖為止。(默認(rèn)為True)

如果設(shè)定blocking為False,則當(dāng)前線程不會(huì)堵塞

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : xiaoke

from threading import Thread, Lock
import time

g_num = 0


def test1():
    global g_num
    for i in range(1000000):
        # True表示堵塞 即如果這個(gè)鎖在上鎖之前已經(jīng)被上鎖了,那么這個(gè)線程會(huì)在這里一直等待到解鎖為止
        # False表示非堵塞,即不管本次調(diào)用能夠成功上鎖,都不會(huì)卡在這,而是繼續(xù)執(zhí)行下面的代碼
        mutexFlag = mutex.acquire(True)
        if mutexFlag:
            g_num += 1
            mutex.release()

    print("---test1---g_num=%d" % g_num)


def test2():
    global g_num
    for i in range(1000000):
        mutexFlag = mutex.acquire(True)  # True表示堵塞
        if mutexFlag:
            g_num += 1
            mutex.release()

    print("---test2---g_num=%d" % g_num)


def main():
    p1 = Thread(target=test1)
    p1.start()

    p2 = Thread(target=test2)
    p2.start()

    print("---g_num=%d---" % g_num)

    p1.join()
    p2.join()


if __name__ == '__main__':
    # 創(chuàng)建一個(gè)互斥鎖
    # 這個(gè)所默認(rèn)是未上鎖的狀態(tài)
    mutex = Lock()
    main()
    
# 運(yùn)行結(jié)果如下:
# ---g_num=44416---
# ---test1---g_num=1990567
# ---test2---g_num=2000000

Test02-->上鎖解鎖過(guò)程

當(dāng)一個(gè)線程調(diào)用鎖的acquire()方法獲得鎖時(shí),鎖就進(jìn)入“l(fā)ocked”鎖住狀態(tài)。

每次只有一個(gè)線程可以獲得鎖。

如果此時(shí)另一個(gè)線程試圖獲得鎖,該線程就會(huì)變成“blocked”狀態(tài),稱為阻塞。直到擁有鎖的線程調(diào)用release()函數(shù)釋放鎖之后,這時(shí)這個(gè)鎖就進(jìn)入“unlocked”解鎖的狀態(tài)。

線程調(diào)度程序,從處于同步阻塞狀態(tài)的線程中選擇一個(gè)來(lái)獲得鎖,并使得該線程進(jìn)入(running)運(yùn)行狀態(tài)。

Test03-->互斥鎖的優(yōu)缺點(diǎn)

優(yōu)點(diǎn):確保了某段關(guān)鍵代碼只能有一個(gè)線程從頭到尾的執(zhí)行。
缺點(diǎn):
1,阻止了多線程并發(fā)執(zhí)行,包含鎖的代碼實(shí)際上是以單線程的模式執(zhí)行,效率大大降低了。
2,由于可以存在多個(gè)鎖,不同的線程持有不同的鎖,并試圖獲取對(duì)方持有的鎖時(shí),可能會(huì)造成死鎖。

Num06-->線程死鎖和遞歸鎖

Test01--> 死鎖的情況

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : xiaoke

import threading
import time


class MyThread1(threading.Thread):
    def run(self):
        # 對(duì)mutexA上鎖
        if mutexA.acquire():
            print(self.name + '----mutexA上鎖----')
            time.sleep(1)

            if mutexB.acquire():
                print(self.name + '----mutexA中mutexB上鎖----')
                mutexB.release()
            mutexA.release()


class MyThread2(threading.Thread):
    def run(self):
        # 對(duì)mutexB上鎖
        if mutexB.acquire():
            print(self.name + '----mutexB上鎖----')
            time.sleep(1)
            if mutexA.acquire():
                print(self.name + '----mutexB中mutexA上鎖----')
                mutexA.release()
            mutexB.release()


mutexA = threading.Lock()
mutexB = threading.Lock()

if __name__ == '__main__':
    t1 = MyThread1()
    t2 = MyThread2()
    t1.start()
    t2.start()
    t1.join()
    t2.join()

# 死鎖的結(jié)果如下:
# Thread-1----mutexA上鎖----
# Thread-2----mutexB上鎖----
# 我手寫的:一直停留在這里不動(dòng)


Test02--> 遞歸鎖,用于解決死鎖的問(wèn)題,對(duì)以上死鎖代碼加以修改

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : xiaoke

import threading
import time


class MyThread1(threading.Thread):
    def run(self):
        # 對(duì)mutexA上鎖
        if mutex.acquire():
            print(self.name + '----mutexA上鎖----')
            time.sleep(1)

            if mutex.acquire():
                print(self.name + '----mutexA中mutexB上鎖----')
                mutex.release()
            mutex.release()


class MyThread2(threading.Thread):
    def run(self):
        # 對(duì)mutexB上鎖
        if mutex.acquire():
            print(self.name + '----mutexB上鎖----')
            time.sleep(1)
            if mutex.acquire():
                print(self.name + '----mutexB中mutexA上鎖----')
                mutex.release()
            mutex.release()


mutex = threading.RLock()

if __name__ == '__main__':
    t1 = MyThread1()
    t2 = MyThread2()
    t1.start()
    t2.start()
    t1.join()
    t2.join()

# RLock遞歸鎖的結(jié)果如下:
# Thread-1----mutexA上鎖----
# Thread-1----mutexA中mutexB上鎖----
# Thread-2----mutexB上鎖----
# Thread-2----mutexB中mutexA上鎖----

Num07-->利用互斥鎖實(shí)現(xiàn)同步

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : xiaoke

# 利用互斥鎖實(shí)現(xiàn)同步

from threading import Thread, Lock
import time
# 先創(chuàng)建A、B、C三個(gè)鎖
lockA = Lock()
lockB = Lock()
lockC = Lock()


def task_a():
    # 申請(qǐng)lockA,獲得鎖才開(kāi)始任務(wù)
    while True:
        lockA.acquire()
        print("-----A--------")
        time.sleep(1)
        lockB.release()


def task_b():
    # 申請(qǐng)lockB,獲得鎖才開(kāi)始任務(wù)
    while True:
        lockB.acquire()
        print("-----B--------")
        time.sleep(1)
        lockC.release()


def task_c():
    # 申請(qǐng)lockC,獲得鎖才開(kāi)始任務(wù)
    while True:
        lockC.acquire()
        print("-----C--------")
        time.sleep(1)
        lockA.release()


def main():
    # lockB和lockC在主線程獲得鎖
    lockB.acquire()
    lockC.acquire()
    thread_a = Thread(target=task_a)
    thread_b = Thread(target=task_b)
    thread_c = Thread(target=task_c)

    thread_a.start()
    thread_b.start()
    thread_c.start()

    thread_a.join()
    thread_b.join()
    thread_c.join()


if __name__ == '__main__':
    main()
# 結(jié)果如下:
# -----A--------
# -----B--------
# -----C--------
# -----A--------
# -----B--------
# -----C--------
# -----A--------
# -----B--------
# -----C--------
# 此處省略......一直在同步的打印A、B、C

Num08-->生產(chǎn)者和消費(fèi)者模式--Queue

Python的Queue模塊中提供了同步的、線程安全的隊(duì)列類,包括FIFO(先入先出)隊(duì)列Queue,LIFO(后入先出)隊(duì)列LifoQueue,和優(yōu)先級(jí)隊(duì)列PriorityQueue。這些隊(duì)列都實(shí)現(xiàn)了鎖原語(yǔ)(可以理解為原子操作,即要么不做,要么就做完),能夠在多線程中直接使用??梢允褂藐?duì)列來(lái)實(shí)現(xiàn)線程間的同步。

為什么要使用生產(chǎn)者和消費(fèi)者模式???
在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費(fèi)者就是消費(fèi)數(shù)據(jù)的線程。在多線程開(kāi)發(fā)當(dāng)中,如果生產(chǎn)者處理速度很快,而消費(fèi)者處理速度很慢,那么生產(chǎn)者就必須等待消費(fèi)者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費(fèi)者的處理能力大于生產(chǎn)者,那么消費(fèi)者就必須等待生產(chǎn)者。為了解決這個(gè)問(wèn)題于是引入了生產(chǎn)者和消費(fèi)者模式。

什么是生產(chǎn)者消費(fèi)者模式???
生產(chǎn)者消費(fèi)者模式是通過(guò)一個(gè)容器來(lái)解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問(wèn)題。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而通過(guò)阻塞隊(duì)列來(lái)進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理,直接扔給阻塞隊(duì)列,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊(duì)列里取,阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力。

這個(gè)阻塞隊(duì)列就是用來(lái)給生產(chǎn)者和消費(fèi)者解耦的。

Test01-->FIFO先進(jìn)先出隊(duì)列Queue

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : xiaoke

import queue  # 隊(duì)列,解決多線程問(wèn)題

q = queue.Queue()
q.put("我的第一個(gè)進(jìn)來(lái)的")
q.put("我是第二個(gè)進(jìn)來(lái)的")
q.put({"name": "我是第三個(gè)進(jìn)來(lái)的"})

while True:
    if not q.empty():
        data = q.get(block=False)
        print(data)
# 結(jié)果如下:
# 我的第一個(gè)進(jìn)來(lái)的
# 我是第二個(gè)進(jìn)來(lái)的
# {'name': '我是第三個(gè)進(jìn)來(lái)的'}

Test02-->LIFO后入先出隊(duì)列LifoQueue

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : xiaoke

import queue
# 后進(jìn)先出
q = queue.LifoQueue()
q.put("我的第一個(gè)進(jìn)來(lái)的")
q.put("我是第二個(gè)進(jìn)來(lái)的")
q.put({"name": "我是第三個(gè)進(jìn)來(lái)的"})

while True:
    if not q.empty():
        data = q.get(block=False)
        print(data)
# 結(jié)果如下:
# {'name': '我是第三個(gè)進(jìn)來(lái)的'}
# 我是第二個(gè)進(jìn)來(lái)的
# 我的第一個(gè)進(jìn)來(lái)的

Test03-->優(yōu)先級(jí)隊(duì)列PriorityQueue

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : xiaoke

import queue

# 按優(yōu)先級(jí),按照2,3,4,5這樣從小到大的優(yōu)先級(jí)
q = queue.PriorityQueue()
q.put([5, "我的第一個(gè)進(jìn)來(lái)的"])
q.put([2, "我是第二個(gè)進(jìn)來(lái)的"])
q.put([4, {"name": "我是第三個(gè)進(jìn)來(lái)的"}])
q.put([3, (2, 3, 4, 5)])

while True:
    if not q.empty():
        data = q.get(block=False)
        print(data)

# 結(jié)果如下:
# [2, '我是第二個(gè)進(jìn)來(lái)的']
# [3, (2, 3, 4, 5)]
# [4, {'name': '我是第三個(gè)進(jìn)來(lái)的'}]
# [5, '我的第一個(gè)進(jìn)來(lái)的']

Test04-->Queue隊(duì)列的方法:

創(chuàng)建一個(gè)“隊(duì)列”對(duì)象
from queue import Queue
q = queue.Queue(maxsize = 100)
Queue類即是一個(gè)隊(duì)列的同步實(shí)現(xiàn)。隊(duì)列長(zhǎng)度可為無(wú)限或者有限??赏ㄟ^(guò)Queue的構(gòu)造函數(shù)的可選參數(shù)maxsize來(lái)設(shè)定隊(duì)列長(zhǎng)度。如果maxsize小于1就表示隊(duì)列長(zhǎng)度無(wú)限。

將一個(gè)值放入隊(duì)列中
q.put(100)
調(diào)用隊(duì)列對(duì)象的put()方法在隊(duì)尾插入一個(gè)項(xiàng)目。put()有兩個(gè)參數(shù),第一個(gè)item為必需的,為插入項(xiàng)目的值;第二個(gè)block為可選參數(shù),默認(rèn)為
1。如果隊(duì)列當(dāng)前為空且block為1,put()方法就使調(diào)用線程暫停,直到空出一個(gè)數(shù)據(jù)單元。如果block為0,put方法將引發(fā)Full異常。

將一個(gè)值從隊(duì)列中取出
q.get()
調(diào)用隊(duì)列對(duì)象的get()方法從隊(duì)頭刪除并返回一個(gè)項(xiàng)目。可選參數(shù)為block,默認(rèn)為True。如果隊(duì)列為空且block為True,
get()就使調(diào)用線程暫停,直至有項(xiàng)目可用。如果隊(duì)列為空且block為False,隊(duì)列將引發(fā)Empty異常。


此包中的常用方法(q = queue.Queue()):

q.qsize()  返回隊(duì)列的大小
q.empty()  如果隊(duì)列為空,返回True,反之False
q.full()   如果隊(duì)列滿了,返回True,反之False
q.full 與 maxsize 大小對(duì)應(yīng)
q.get([block[, timeout]])  獲取隊(duì)列,timeout等待時(shí)間
q.get_nowait()      相當(dāng)q.get(False)
非阻塞 q.put(item)   寫入隊(duì)列,timeout等待時(shí)間
q.put_nowait(item)  相當(dāng)q.put(item, False)
q.task_done()  在完成一項(xiàng)工作之后,q.task_done() 函數(shù)向任務(wù)已經(jīng)完成的隊(duì)列發(fā)送一個(gè)信號(hào)
q.join()       實(shí)際上意味著等到隊(duì)列為空,再執(zhí)行別的操作

Test05-->生產(chǎn)者消費(fèi)者案例如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : xiaoke
from  threading import Thread
import time
from queue import Queue

# 3個(gè)生產(chǎn)者,當(dāng)產(chǎn)品數(shù)量小于100就生產(chǎn)8個(gè)
# 5個(gè)消費(fèi)者,當(dāng)產(chǎn)品數(shù)量大于10就消費(fèi)3
storageQueue = Queue()  # 創(chuàng)建用于保存產(chǎn)品的隊(duì)列
# 保存產(chǎn)品編號(hào)
serial_num = 0


# 生產(chǎn)者
class Producer(Thread):
    def __init__(self, name):
        Thread.__init__(self)
        self.name = name

    def run(self):
        global serial_num
        while True:
            if storageQueue.qsize() < 100:
                # 向隊(duì)列中添加消息模擬生產(chǎn)
                for i in range(8):
                    msg = self.name + ":" + str(serial_num)
                    storageQueue.put(msg)
                    print("%s 生產(chǎn)了:%s" % (self.name, msg))
                    serial_num += 1
            time.sleep(1)


# 消費(fèi)者
class Consumer(Thread):
    def __init__(self, name):
        Thread.__init__(self)
        self.name = name

    def run(self):
        while True:

            if storageQueue.qsize() > 10:
                # 從消息隊(duì)列中取消息,代表消費(fèi)產(chǎn)品
                for i in range(3):
                    msg = storageQueue.get()
                    print("%s消費(fèi)了:%s" % (self.name, msg))

            time.sleep(1)


def main():
    # 3個(gè)生產(chǎn)者
    for i in range(3):
        t = Producer("Producter--" + str(i))
        t.start()
    # 5個(gè)消費(fèi)者
    for i in range(5):
        t = Consumer("Consumer--" + str(i))
        t.start()


if __name__ == '__main__':
    main()


# 結(jié)果如下:
# Producter--0 生產(chǎn)了:Producter--0:0
# Producter--0 生產(chǎn)了:Producter--0:1
# Producter--0 生產(chǎn)了:Producter--0:2
# Producter--0 生產(chǎn)了:Producter--0:3
# Producter--0 生產(chǎn)了:Producter--0:4
# Producter--0 生產(chǎn)了:Producter--0:5
# Producter--0 生產(chǎn)了:Producter--0:6
# Producter--0 生產(chǎn)了:Producter--0:7
# Producter--1 生產(chǎn)了:Producter--1:8
# Producter--1 生產(chǎn)了:Producter--1:9
# Producter--1 生產(chǎn)了:Producter--1:10
# Producter--1 生產(chǎn)了:Producter--1:11
# Producter--1 生產(chǎn)了:Producter--1:12
# Producter--1 生產(chǎn)了:Producter--1:13
# Producter--2 生產(chǎn)了:Producter--2:13
# Producter--2 生產(chǎn)了:Producter--2:14
# Producter--2 生產(chǎn)了:Producter--2:15
# Producter--2 生產(chǎn)了:Producter--2:16
# Producter--2 生產(chǎn)了:Producter--2:17
# Producter--2 生產(chǎn)了:Producter--2:18
# Producter--2 生產(chǎn)了:Producter--2:19
# Producter--2 生產(chǎn)了:Producter--2:20
# Producter--1 生產(chǎn)了:Producter--1:22
# Producter--1 生產(chǎn)了:Producter--1:23
# Consumer--0消費(fèi)了:Producter--0:0
# Consumer--0消費(fèi)了:Producter--0:1
# Consumer--0消費(fèi)了:Producter--0:2
# Consumer--1消費(fèi)了:Producter--0:3
# Consumer--1消費(fèi)了:Producter--0:4
# Consumer--1消費(fèi)了:Producter--0:5
# Consumer--2消費(fèi)了:Producter--0:6
# Consumer--2消費(fèi)了:Producter--0:7
# Consumer--2消費(fèi)了:Producter--1:8
# Consumer--3消費(fèi)了:Producter--1:9
# Consumer--3消費(fèi)了:Producter--1:10
# Consumer--3消費(fèi)了:Producter--1:11
# Consumer--4消費(fèi)了:Producter--1:12
# Consumer--4消費(fèi)了:Producter--1:13
# Consumer--4消費(fèi)了:Producter--2:13
# Producter--0 生產(chǎn)了:Producter--0:24
# Producter--0 生產(chǎn)了:Producter--0:25
# Producter--0 生產(chǎn)了:Producter--0:26
# Producter--0 生產(chǎn)了:Producter--0:27
# Producter--0 生產(chǎn)了:Producter--0:28
# Producter--0 生產(chǎn)了:Producter--0:29
# 我手動(dòng)寫的:此處省略很多......

Num09-->守護(hù)線程

只要非守護(hù)線程結(jié)束了,不管守護(hù)線程結(jié)束沒(méi)結(jié)束,程序都結(jié)束.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : xiaoke
import threading
import time


def run(n):
    print("我是守護(hù)線程--", n)
    time.sleep(1)


start_time = time.time()
thread_list = []
for i in range(3):
    t = threading.Thread(target=run, args=('thread_list-%s' % i,))
    # 設(shè)置線程為守護(hù)狀態(tài),非守護(hù)狀態(tài)線程都退出,程序就退出,不等待守護(hù)狀態(tài)線程
    t.daemon = True
    t.start()  # t.daemon=True 必須在 調(diào)用start()函數(shù) 前面
    thread_list.append(t)

print("當(dāng)前還活著的線程數(shù)量是:", threading.active_count())
print("當(dāng)前還活著的線程有:", thread_list)
print("當(dāng)前還剩線程是:", threading.current_thread().name)
print("耗時(shí):", time.time() - start_time)
# 結(jié)果是:
# 我是守護(hù)線程-- thread_list-0
# 我是守護(hù)線程-- thread_list-1
# 我是守護(hù)線程-- thread_list-2
# 當(dāng)前還活著的線程數(shù)量是: 4
# 當(dāng)前還活著的線程有: [<Thread(Thread-1, started daemon 12852)>, <Thread(Thread-2, started daemon 6696)>, <Thread(Thread-3, started daemon 11940)>]
# 當(dāng)前還剩線程是: MainThread
# 耗時(shí): 0.0

Num10-->多線程--非共享數(shù)據(jù)

在多線程開(kāi)發(fā)中,全局變量是多個(gè)線程都共享的數(shù)據(jù),而局部變量則是各個(gè)線程的,是非共享的。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : xiaoke

import threading
import time


class MyThread(threading.Thread):
    def __init__(self, num, sleepTime):
        threading.Thread.__init__(self)
        self.num = num
        self.sleepTime = sleepTime

    # 重寫run()方法
    def run(self):
        self.num += 1
        time.sleep(self.sleepTime)
        print('線程(%s),num=%d' % (self.name, self.num))


if __name__ == '__main__':
    t1 = MyThread(99, 5)
    t1.start()
    t2 = MyThread(199, 1)
    t2.start()
    
    t1.join()
    t2.join()

# 結(jié)果如下:
# 線程(Thread-2),num=200
# 線程(Thread-1),num=100

Num11-->線程中ThreadLocal

Test01-->使用函數(shù)傳參的方法

def process_student(name):
    std = Student(name)
    # std是局部變量,但是每個(gè)函數(shù)都要用它,因此必須傳進(jìn)去:
    do_task_1(std)
    do_task_2(std)

def do_task_1(std):
    do_subtask_1(std)
    do_subtask_2(std)

def do_task_2(std):
    do_subtask_2(std)
    do_subtask_2(std)

每個(gè)函數(shù)一層一層調(diào)用都這么傳參數(shù)那還得了?用全局變量?也不行,因?yàn)槊總€(gè)線程處理不同的Student對(duì)象,不能共享。

Test02-->使用全局字典的方法

global_dict = {}

def std_thread(name):
    std = Student(name)
    # 把std放到全局變量global_dict中:
    global_dict[threading.current_thread()] = std
    do_task_1()
    do_task_2()

def do_task_1():
    # 不傳入std,而是根據(jù)當(dāng)前線程查找:
    std = global_dict[threading.current_thread()]
    ...

def do_task_2():
    # 任何函數(shù)都可以查找出當(dāng)前線程的std變量:
    std = global_dict[threading.current_thread()]
    ...

這種方式理論上是可行的,它最大的優(yōu)點(diǎn)是消除了std對(duì)象在每層函數(shù)中的傳遞問(wèn)題,但是,每個(gè)函數(shù)獲取std的代碼有點(diǎn)low。

Test03-->使用ThreadLocal的方法

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : xiaoke

from threading import Thread
import threading

# 創(chuàng)建ThreadLocal對(duì)象
local_school = threading.local()


class Student(object):
    def __init__(self, name):
        self.name = name


# 線程中調(diào)用的函數(shù)
def do_task():
    # 相當(dāng)于以當(dāng)前線程對(duì)象為key取出stu對(duì)象
    stu = local_school.student
    print("%s中學(xué)生名字為:%s" % (threading.current_thread().name, stu.name))


# 線程的功能函數(shù)
def task_thread(name):
    # 創(chuàng)建類的實(shí)例對(duì)象
    stu = Student(name)
    # 相當(dāng)于以當(dāng)前的線程對(duì)象為key,放入全局的字典local_school
    local_school.student = stu
    do_task()


def main():
    t1 = Thread(target=task_thread, args=("xiaoke",))
    t2 = Thread(target=task_thread, args=("lili",))

    t1.start()
    t2.start()

    t1.join()
    t2.join()


if __name__ == '__main__':
    main()
    
# 結(jié)果如下:
# Thread-1中學(xué)生名字為:xiaoke
# Thread-2中學(xué)生名字為:lili

全局變量local_school就是一個(gè)ThreadLocal對(duì)象,每個(gè)Thread對(duì)它都可以讀寫student屬性,但互不影響。你可以把local_school看成全局變量,但每個(gè)屬性如local_school.student都是線程的局部變量,可以任意讀寫而互不干擾,也不用管理鎖的問(wèn)題,ThreadLocal內(nèi)部會(huì)處理。

可以理解為全局變量local_school是一個(gè)dict。

ThreadLocal最常用的地方就是為每個(gè)線程綁定一個(gè)數(shù)據(jù)庫(kù)連接,HTTP請(qǐng)求,用戶身份信息等。這樣一個(gè)線程所有調(diào)用到的處理函數(shù),都可以非常方便地訪問(wèn)這些資源。

Test04-->小總結(jié)

一個(gè)ThreadLocal變量雖然是“全局變量”,但每個(gè)線程都只能讀寫自己線程的獨(dú)立副本,互不干擾。
ThreadLocal解決了參數(shù)在一個(gè)線程中各個(gè)函數(shù)之間互相傳遞的問(wèn)題。

Num12-->GIL全局解釋器鎖

作用:保證同一時(shí)刻,無(wú)論你有多少個(gè)線程,只有一個(gè)線程被CPU執(zhí)行。

因?yàn)镻ython的線程雖然是真正的線程,但解釋器執(zhí)行代碼時(shí),有一個(gè)GIL鎖:Global Interpreter Lock,任何Python線程執(zhí)行前,必須先獲得GIL鎖,然后,每執(zhí)行100條字節(jié)碼,解釋器就自動(dòng)釋放GIL鎖,讓別的線程有機(jī)會(huì)執(zhí)行。這個(gè)GIL全局鎖實(shí)際上把所有線程的執(zhí)行代碼都給上了鎖,所以,多線程在Python中只能交替執(zhí)行,即使100個(gè)線程跑在100核CPU上,也只能用到1個(gè)核。

所以,在Python中,可以使用多線程,但不要指望能有效利用多核。如果一定要通過(guò)多線程利用多核,那只能通過(guò)C擴(kuò)展來(lái)實(shí)現(xiàn),不過(guò)這樣就失去了Python簡(jiǎn)單易用的特點(diǎn)。

不過(guò),也不用過(guò)于擔(dān)心,Python雖然不能利用多線程實(shí)現(xiàn)多核任務(wù),但可以通過(guò)多進(jìn)程實(shí)現(xiàn)多核任務(wù)。多個(gè)Python進(jìn)程有各自獨(dú)立的GIL鎖,互不影響。

Num13-->信號(hào)量Semaphore

信號(hào)量:是指同時(shí)開(kāi)幾個(gè)線程并發(fā)。比如廁所有5個(gè)坑,那最多只允許5個(gè)人上廁所,后面的人只能等里面的5個(gè)人都出來(lái)了,才能再進(jìn)去。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : xiaoke

import threading, time


class myThread(threading.Thread):
    def run(self):
        # 加把鎖,可以放進(jìn)去多個(gè)(相當(dāng)于5把鎖,同時(shí)有5個(gè)線程)
        if semaphore.acquire():
            print(self.name)
            time.sleep(3)
            semaphore.release()


if __name__ == "__main__":
    # 同時(shí)能有幾個(gè)線程進(jìn)去(設(shè)置為5就是一次5個(gè)線程進(jìn)去)
    semaphore = threading.Semaphore(5)

    thread_list = []  # 空列表
    for i in range(200):  # 200個(gè)線程
        thread_list.append(myThread())  # 加線程對(duì)象

    for t in thread_list:
        t.start()  # 分別啟動(dòng)

# 結(jié)果如下:每隔3秒就會(huì)同時(shí)顯示5個(gè)進(jìn)程
# Thread-1
# Thread-2
# Thread-3
# Thread-4
# Thread-5
# Thread-6
# Thread-7
# Thread-8
# Thread-9
# Thread-10
# .......

Num14-->Event線程間通信

Python提供了Event對(duì)象用于線程間通信。它是由線程設(shè)置的信號(hào)標(biāo)志,如果信號(hào)標(biāo)志為真,則其他線程等待,直到信號(hào)拿到。

Event對(duì)象實(shí)現(xiàn)了簡(jiǎn)單的線程通信機(jī)制。它提供了設(shè)置信號(hào),清除信號(hào),等待信號(hào)等,用于實(shí)現(xiàn)線程間的通信。

Events的使用

event = threading.Event()
event.wait()
Event對(duì)象wait()方法只有在內(nèi)部信號(hào)為真的時(shí)候,才會(huì)很快的執(zhí)行并完成返回。當(dāng)Event對(duì)象的內(nèi)部信號(hào)標(biāo)志為假時(shí),則wait()方法一直等待到其為真時(shí)才返回。

event.set()
使用Event的set()方法可以設(shè)置Event對(duì)象內(nèi)部的信號(hào)標(biāo)志為真。Event對(duì)象提供了isSet()方法來(lái)判斷其內(nèi)部信號(hào)標(biāo)志的狀態(tài)。當(dāng)使用Event對(duì)象的set()方法后,isSet()方法返回真。

event.clear()
使用Event對(duì)象的clear()方法可以清除Event對(duì)象內(nèi)部的信號(hào)標(biāo)志。即將其設(shè)為假,當(dāng)使用Event的clear方法后,isSet()方法返回假。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : xiaoke

import threading, time


class Boss(threading.Thread):
    def run(self):
        print("大老板說(shuō):今晚大家都要加班到24:00。")
        print(event.isSet())
        event.set()
        print(event.isSet())
        print("辛苦大家了......加油干吧!?。?)
        time.sleep(10)
        print("大老板說(shuō):24:00到了,可以下班了。")
        print(event.isSet())
        event.set()
        print(event.isSet())


class Worker(threading.Thread):
    def run(self):
        # 等待信號(hào)為真,取數(shù)據(jù)
        event.wait()
        print("加油工作,多賺錢……!")
        time.sleep(5)
        event.clear()
        # 等待信號(hào)為真,取數(shù)據(jù)
        event.wait()
        print("下班回家陪老婆,孩子,哈哈哈!")


if __name__ == "__main__":
    event = threading.Event()
    threads = []
    # 三個(gè)員工,一個(gè)老板
    for i in range(3):
        threads.append(Worker())
    threads.append(Boss())

    # 開(kāi)啟老板和員工
    for t in threads:
        t.start()
    # 等待老板和員工工作都結(jié)束
    for t in threads:
        t.join()

# 結(jié)果如下:
# 大老板說(shuō):今晚大家都要加班到24:00。
# False
# True
# 辛苦大家了......加油干吧?。?!
# 加油工作,多賺錢……!
# 加油工作,多賺錢……!
# 加油工作,多賺錢……!
# 大老板說(shuō):24:00到了,可以下班了。
# False
# True
# 下班回家陪老婆,孩子,哈哈哈!
# 下班回家陪老婆,孩子,哈哈哈!
# 下班回家陪老婆,孩子,哈哈哈!
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容