Python核心基礎(多線程、多進程編程)(Queue,Lock/Rlock,Condition,Semaphore)

前言

一個人活在這個世界上為了什么呢?我覺得是去經(jīng)歷和享受。對于沒做過的事情要做一做。
每個人在年輕的時候,所做出的的選擇是沒有對錯之分的,所有的選擇都是對的,只能說對于所做選擇的結果,只是好與更好的差別。每個人都有自己衡量事物的價值觀,我們有什么樣的認知就會投影出什么樣的圖像,所以一定要不斷超越有限的認知,不斷地提升內(nèi)外的自由度,不要嘗試讓自己假裝看起來很努力,因為結果不會陪你演戲!
學習如逆水行舟 不進則退

實戰(zhàn)

什么是GIL ( global interpreter lock ): 全局解釋鎖

Python中的一個線程對應于c語言當中的一個線程;因為python語言在前期為了簡單,在進行編程的時候,會在解釋器上面加一個非常大的鎖;它允許我們一次只有一個線程運行在我們的CPU上。

學習多線程,希望大家能夠了解2點:
1、python在多線程中為什么有人會覺得它慢? ---> 字節(jié)碼 - 使得同一時刻只能有一個線程在一個CPU上面執(zhí)行字節(jié)碼
2、是不是多線程真的就很慢?--->(這個要分情況,后面會進行驗證)
  • dis庫是python(默認的CPython)自帶的一個庫,可以用來分析字節(jié)碼
import dis
# 反編譯 將函數(shù)變成字節(jié)碼的流程
def tony(t):
    t = t+1
    return t
print(dis.dis(tony))
image

gil在某些情況下是可以被釋放掉的--> 這個是python內(nèi)部的策略問題;所以GIL鎖會釋放。

  • 第一種情況釋放:gil會根據(jù)執(zhí)行的字節(jié)碼行數(shù)以及時間片進行釋放gil
  • 第二種情況釋放:程序會在遇到io操作的時候 主動釋放
什么是時間片? 時間片的概念是什么?
  時間片即CPU分配給各個程序的時間,每個線程被分配一個時間段,稱作它的時間片,即該進程允許運行的時間,使各個程序從表面上看是同時進行的。如果在時間片結束時進程還在運行,則CPU將被剝奪并分配給另一個進程。如果進程在時間片結束前阻塞或結束,則CPU當即進行切換。而不會造成CPU資源浪費。
- 在宏觀上:我們可以同時打開多個應用程序,每個程序并行不悖,同時運行。
- 在微觀上:由于只有一個CPU,一次只能處理程序要求的一部分,如何處理公平,一種方法就是引入時間片,每個程序輪流執(zhí)行。--->{時間片輪回算法}
a = 0
def time():
    '''
    1. thread_001
    2. io操作  沒想象中那么嚴,會主動釋放的
    3. thread_002
    :return:
    '''
    global a  # LEGB
    for item in range(1000000):
        a += 1

def test():
   global a
   for item in range(1000000):
       a -= 1
import threading

thread_1 = threading.Thread(target=time)
thread_2 = threading.Thread(target=test)

thread_1.start()
thread_2.start()

thread_1.join()
thread_2.join()
print(a)

threading多線程-守護線程

image
class GetDataHtml(threading.Thread):
    def __init__(self,name):
        super().__init__(name=name)
    def run(self):
        print('開始獲取數(shù)據(jù)html的時間')
        time.sleep(2)
        print('獲取數(shù)據(jù)html結束的時間')

class GetDataUrl(threading.Thread):
    def __init__(self,name):
        super().__init__(name=name)
    def run(self):
        print('開始獲取數(shù)據(jù)url的時間')
        time.sleep(2)
        print('獲取數(shù)據(jù)url結束的時間')
if __name__ == '__main__':
    thread_one=GetDataHtml('tony')
    thread_two=GetDataUrl('老師')
    start_time = time.time()
    thread_one.start()
    thread_two.start()
    # print("中間運行時間:{}".format(time.time()-start_time))
    # 阻塞
    thread_one.join()
    thread_two.join()
    print("中間運行時間:{}".format(time.time()-start_time))

線程間的通訊-共享變量

queue

線程間的通訊

共享變量

import time
import threading

data_list = []
def get_data_html():
    global data_list
    for url in data_list:
        print('開始獲取數(shù)據(jù)html的時間')
        time.sleep(2)
        print('獲取數(shù)據(jù)html結束的時間')
def get_data_url():
    global data_list
    print('開始獲取數(shù)據(jù)url的時間')
    time.sleep(3)
    for item in range(30):
        data_list.append('http://logic.org/{}id'.format(item))
    print('獲取數(shù)據(jù)url結束的時間')
if __name__ == '__main__':
    thread_url = threading.Thread(target=get_data_url)
    thread_1 = threading.Thread(target=get_data_html)
    thread_2 = threading.Thread(target=get_data_html)
    thread_3 = threading.Thread(target=get_data_html)
    thread_4 = threading.Thread(target=get_data_html)
    thread_5 = threading.Thread(target=get_data_html)
    start_time = time.time()
    thread_url.start()
    thread_1.start()
    thread_2.start()
    thread_3.start()
    thread_4.start()
    thread_5.start()

import time
import threading
# from '第七講-Tony老師' import test_tool

data_list = []
def get_data_html(data_list):
    # global data_list
    while True:
        if len(data_list):
            url=data_list.pop()
            # for url in data_list:
            print('開始獲取數(shù)據(jù)html的時間')
            time.sleep(2)
            print('獲取數(shù)據(jù)html結束的時間')
def get_data_url(data_list):
    # global data_list
    while True:
        print('開始獲取數(shù)據(jù)url的時間')
        time.sleep(3)
        for item in range(30):
            data_list.append('http://logic.org/{id}'.format(id=item))
        print('獲取數(shù)據(jù)url結束的時間')
if __name__ == '__main__':
    thread_url = threading.Thread(target=get_data_url,args=(data_list,))
    for item in range(10):
        thread_html = threading.Thread(target=get_data_html,args=(data_list,))
        thread_html.start()
    start_time = time.time()
    thread_url.start()
    print("中間運行時間:{}".format(time.time()-start_time))

Queue

1.共享變量

2.queue隊列 - 它本身是安全的 - 引用了 deque 雙端隊列

import time
import threading
from queue import Queue

def get_data_html(queue):
    # global data_list
    while True:
        url = queue.get()
        print('開始獲取數(shù)據(jù)html的時間')
        time.sleep(2)
        print('獲取數(shù)據(jù)html結束的時間')


def get_data_url(queue):
    # global data_list
    while True:
        print('開始獲取數(shù)據(jù)url的時間')
        time.sleep(3)
        for item in range(30):
            # 添加
            queue.put('http://logic.org/{id}'.format(id=item))
        print('獲取數(shù)據(jù)url結束的時間')


if __name__ == '__main__':
    # 設置隊列
    data_url_queue = Queue(maxsize=1000)
    thread_url = threading.Thread(target=get_data_url,args=(data_url_queue,))
    for item in range(10):
        thread_html = threading.Thread(target=get_data_html,args=(data_url_queue,))
        thread_html.start()
    start_time = time.time()
    # 成對出現(xiàn)的
    data_url_queue.join()
    data_url_queue.task_done()
    # thread_url.start()
    print("中間運行時間:{}".format(time.time()-start_time))

Lock,Rlock

1.線程間的通訊問題

2.線程同步問題 - 為了解決結果不一致

'''

'''
import dis


def time(a):
    a += 1


def test(a):
    a -= 1
'''
0 LOAD_FAST                0 (a)
2 LOAD_CONST               1 (1)
4 INPLACE_ADD   +          1
6 STORE_FAST               0 (a)
'''

print(dis.dis(time))
print(dis.dis(test))

Module_thread_lock.py

'''
A(a,b)
acquire(a)
acquire(b)
                    資源競爭 - 互相等待
B(a,b)
acquire(b)
acquire(a)
'''

from threading import Lock
a = 0
lock = Lock()
def time():
    global a
    global lock
    for item in range(1000000):
        # 上鎖
        lock.acquire()
        a += 1
        # 釋放鎖
        lock.release()


def test():
    global a
    global lock
    for item in range(1000000):
        # 上鎖
        lock.acquire()
        a -= 1
        # 釋放鎖
        lock.release()


import threading

thread_1 = threading.Thread(target=time)
thread_2 = threading.Thread(target=test)

thread_1.start()
thread_2.start()

thread_1.join()
thread_1.join()
print(a)

結論:

1.用鎖會影響性能

2.鎖會引起死鎖 資源競爭 - 互相等待

RLock - 可重入鎖

'''

# '''
# import dis
#
#
# def time(a):
#     a += 1
#
#
# def test(a):
#     a -= 1
# '''
# 0 LOAD_FAST                0 (a)
# 2 LOAD_CONST               1 (1)
# 4 INPLACE_ADD   +          1
# 6 STORE_FAST               0 (a)
# '''
#
# print(dis.dis(time))
# print(dis.dis(test))
'''
A(a,b)
acquire(a)
acquire(b)
                    資源競爭 - 互相等待
B(a,b)
acquire(b)
acquire(a)
'''
from threading import Lock, RLock
a = 0
lock = RLock()
def time():
    global a
    global lock
    for item in range(1000000):
        # 上鎖
        lock.acquire()
        # do_time(lock)
        lock.acquire()
        a += 1
        # 釋放鎖
        lock.release()
        lock.release()


def do_time(lock):
    # 上鎖
    lock.acquire()
    # 0---------------------
    # 釋放鎖
    lock.release()


def test():
    global a
    global lock
    for item in range(1000000):
        # 上鎖
        lock.acquire()
        a -= 1
        # 釋放鎖
        lock.release()

import threading

thread_1 = threading.Thread(target=time)
thread_2 = threading.Thread(target=test)

thread_1.start()
thread_2.start()

thread_1.join()
thread_1.join()
print(a)

Condition(條件變量):主要用于復雜線程間同步的一個鎖。 ---> 通過condition完成雙方協(xié)同聊天的功能

wait 和 notify 是condition的精髓。
在調(diào)用with cond 之后才能調(diào)用 wait 或者 notify方法。 PS: **必須要在with語句下面,才能夠成功;否則會報錯。 **

在condition中 有2層鎖, 一把底層鎖會在線程調(diào)用wait方法的時候釋放,上面的鎖會在每次調(diào)用wait的時候分配一把并放入cond中的等待隊列中;直到notify方法的喚醒。

import threading

from threading import Condition

'''
張三 : 你好,很高興認識你     A
馬六 : 我也是                B
張三 : 你是什么樣的人      
馬六 : 我是憨憨
張三 : 。。。
馬六 : 。。。。。。
'''


class ZhangSan(threading.Thread):
    def __init__(self, lock):
        super().__init__(name='張三')
        self.lock = lock

    def run(self):
        lock.acquire()
        print("{} : 你好,很高興認識你".format(self.name))
        lock.release()

        # lock.acquire()
        # print("{} : 你是什么樣的人".format(self.name))
        # lock.release()


class MaLiu(threading.Thread):
    def __init__(self, lock):
        self.lock = lock
        super().__init__(name='馬六')

    def run(self):
        lock.acquire()
        print("{} : 我也是".format(self.name))
        lock.release()

        # lock.acquire()
        # print("{} : 我是憨憨".format(self.name))
        # lock.release()

if __name__ == '__main__':
    lock = threading.Lock()
    zhang_san = ZhangSan(lock)
    ma_liu = MaLiu(lock)

    zhang_san.start()
    ma_liu.start()

Condition

import threading

from threading import Condition

'''
張三 : 你好,很高興認識你     A
馬六 : 我也是                B
張三 : 你是什么樣的人      
馬六 : 我是憨憨
張三 : 。。。
馬六 : 。。。。。。
'''


class ZhangSan(threading.Thread):
    def __init__(self, cond):
        super().__init__(name='張三')
        self.cond = cond

    def run(self):
        with self.cond:
            # self.cond.acquire()
            print("{} : 你好,很高興認識你".format(self.name))
            self.cond.notify()
            self.cond.wait()
    
            print("{} : 你是什么樣的人".format(self.name))
            self.cond.notify()
            self.cond.wait()
            # self.cond.release()


class MaLiu(threading.Thread):
    def __init__(self, cond):
        self.cond = cond
        super().__init__(name='馬六')

    def run(self):
        with self.cond:
            # self.cond.acquire()
            self.cond.wait()
            print("{} : 我也是".format(self.name))
            self.cond.notify()
    
            self.cond.wait()
            print("{} : 我是憨憨".format(self.name))
            self.cond.notify()
            # self.cond.release()

if __name__ == '__main__':
    cond = threading.Condition()
    zhang_san = ZhangSan(cond)
    ma_liu = MaLiu(cond)
    # 程序的執(zhí)行順序
    ma_liu.start()
    zhang_san.start()

Semaphore (用于控制進入數(shù)量的鎖)

我們的線程對于操作系統(tǒng)來說,如果線程越多,我們操作系統(tǒng)的切換就會越慢;所以在某些時候我們想要控制線程并發(fā)數(shù)量的時候,Semaphore的意義是非常重要的;

'''
讀和寫
Semaphroe():用于控制進入的數(shù)量的鎖, 信號量
'''
import threading
# threading.Semaphore()
import time


class GetUrl(threading.Thread):
    def __init__(self, sem):
        super().__init__()
        self.sem = sem

    def run(self):
        # list.append()
        for item in range(20):
            # 上鎖
            self.sem.acquire()
            html_thread = HtmlSpider('http://news.baidu.com/{}'.format(item), self.sem)
            html_thread.start()


class HtmlSpider(threading.Thread):
    def __init__(self, url, sem):
        super().__init__()
        self.url = url
        self.sem = sem

    def run(self):
        time.sleep(2)
        print("獲取了一頁html詳情信息!")
        # 釋放鎖
        self.sem.release()


if __name__ == '__main__':
    # 只允許并發(fā)3個
    sem = threading.Semaphore(3)
    url_threads = GetUrl(sem)
    url_threads.start()

在這個浮躁的時代;竟然還有人能堅持篇篇原創(chuàng);

如果本文對你學習有所幫助-可以點贊??+ 關注!將持續(xù)更新更多新的文章。

支持原創(chuàng)。感謝!

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容