進(jìn)程池和線程池

先看個(gè)例子:

from threading import Thread, current_thread
from queue import Queue
import time


def task_1():
    time.sleep(1)
    print('任務(wù)一')

def task_2():
    time.sleep(1)
    print('任務(wù)二')

thread = Thread(target=task_1)
thread.start()
thread.join()  #加入join讓子進(jìn)程執(zhí)行完,主進(jìn)程再繼續(xù)往下執(zhí)行
thread.start()

image.png

線程結(jié)束之后就不能再重復(fù)使用了。那要執(zhí)行2個(gè)任務(wù),我們可以創(chuàng)建2個(gè)線程各執(zhí)行一個(gè)任務(wù),但是我們需要節(jié)約資源,那如何只讓一個(gè)線程執(zhí)行幾個(gè)任務(wù)呢,即如何重復(fù)使用呢?

我們可以利用生產(chǎn)者消費(fèi)者模型來(lái)


def task_1():
    print('任務(wù)一開(kāi)始')
    time.sleep(1)
    print('任務(wù)一完成')

def task_2():
    print('任務(wù)二開(kāi)始')
    time.sleep(1)
    print('任務(wù)二完成')

class MyThread(Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        while True:
            task = self.queue.get()   #任務(wù),子線程作為一個(gè)消費(fèi)者
            print('拿到了:',task)    #獲取任務(wù)
            task()    #執(zhí)行任務(wù)

q = queue.Queue(5)  #創(chuàng)建隊(duì)列
thread = MyThread(q)
thread.start()
q.put(task_1)   #主線程充當(dāng)生產(chǎn)者
q.put(task_2)

運(yùn)行結(jié)果:
image.png

上述代碼中,因?yàn)殛?duì)列只是給子進(jìn)程用,所以可以放到類的初始化方法中,put也可以給子進(jìn)程中去生產(chǎn),所以可改寫成


def task_1():
    print('任務(wù)一開(kāi)始')
    time.sleep(1)
    print('任務(wù)一完成')

def task_2():
    print('任務(wù)二開(kāi)始')
    time.sleep(1)
    print('任務(wù)二完成')

class MyThread(Thread):
    def __init__(self):
        super().__init__()  
        self.queue = queue.Queue(5)   #創(chuàng)建隊(duì)列

    def run(self):
        while True:
            task = self.queue.get()   #任務(wù),子線程作為一個(gè)消費(fèi)者
            print('拿到了:',task)    #獲取任務(wù)
            task()    #執(zhí)行任務(wù)

    def apply_async(self,task):
         self.queue.put(task)  #生產(chǎn)者


thread = MyThread()
thread.start()
thread.apply_async(task_1)   #start調(diào)用的只是線程的run方法,所以生產(chǎn)還需要再去調(diào)用方法
thread.apply_async(task_2)

運(yùn)行結(jié)果同上。

但是執(zhí)行完還沒(méi)結(jié)束,一直停留在那,需要開(kāi)啟守護(hù)模式。
但是注意設(shè)置了守護(hù)模式super().__init__(daemon=True) #設(shè)置守護(hù)模式,主進(jìn)程結(jié)束,會(huì)把守護(hù)進(jìn)程殺死運(yùn)行是沒(méi)數(shù)據(jù)的

image.png

這是因?yàn)橹鬟M(jìn)程put完之后,沒(méi)有其他需要執(zhí)行的了。
所以需要進(jìn)行阻塞,利用join進(jìn)程阻塞,使得子進(jìn)程執(zhí)行完,主進(jìn)程再繼續(xù)執(zhí)行,在類中重寫join方法


def task_1():
    print('任務(wù)一開(kāi)始')
    time.sleep(1)
    print('任務(wù)一完成')

def task_2():
    print('任務(wù)二開(kāi)始')
    time.sleep(1)
    print('任務(wù)二完成')

class MyThread(Thread):
    def __init__(self):
        super().__init__(daemon=True)  #設(shè)置守護(hù)模式,主進(jìn)程結(jié)束,會(huì)把守護(hù)進(jìn)程殺死
        self.queue = queue.Queue(5)   #創(chuàng)建隊(duì)列

    def run(self):
        while True:
            task = self.queue.get()   #任務(wù),子線程作為一個(gè)消費(fèi)者
            print('拿到了:',task)    #獲取任務(wù)
            task()    #執(zhí)行任務(wù)
            self.queue.task_done()      #讓join計(jì)數(shù)器-1,表明當(dāng)前的資源處理完了

    def apply_async(self, task):
         self.queue.put(task)  #生產(chǎn)者


    def join(self):
        self.queue.join()     #等待所有的隊(duì)列資源都用完,等到隊(duì)列為空,再執(zhí)行別的操作

'''
如果線程里每從隊(duì)列里取一次,但沒(méi)有執(zhí)行task_done(),則join無(wú)法判斷隊(duì)列到底有沒(méi)有結(jié)束,
在最后執(zhí)行個(gè)join()是等不到結(jié)果的,會(huì)一直掛起??梢岳斫鉃?,每task_done一次 就從隊(duì)列里刪掉一個(gè)元素,
這樣在最后join的時(shí)候根據(jù)隊(duì)列長(zhǎng)度是否為零來(lái)判斷隊(duì)列是否結(jié)束,從而執(zhí)行主線程。
'''

thread = MyThread()
thread.start()
thread.apply_async(task_1)   #start調(diào)用的只是線程的run方法,所以生產(chǎn)還需要再去調(diào)用方法
thread.apply_async(task_2)
thread.join()    #因?yàn)槿绻O(shè)置守護(hù)模式,主進(jìn)程結(jié)束,會(huì)把守護(hù)進(jìn)程殺死,所以這里調(diào)用join等子進(jìn)程結(jié)束之后,主進(jìn)程再繼續(xù)執(zhí)行

運(yùn)行結(jié)果:
image.png

現(xiàn)在可以正常結(jié)束了。
傳參數(shù)形式:


def task_1():
    print('任務(wù)一開(kāi)始')
    time.sleep(1)
    print('任務(wù)一完成')

def task_2(a,b):
    print('任務(wù)二開(kāi)始')
    print(a,b)
    time.sleep(1)
    print('任務(wù)二完成')

class MyThread(Thread):
    def __init__(self):
        super().__init__(daemon=True)  #設(shè)置守護(hù)模式,主進(jìn)程結(jié)束,會(huì)把守護(hù)進(jìn)程殺死
        self.queue = queue.Queue(5)   #創(chuàng)建隊(duì)列

    def run(self):
        while True:
            task, args, kwargs = self.queue.get()   #任務(wù),子線程作為一個(gè)消費(fèi)者
            print('拿到了:',task)    #獲取任務(wù)
            task(*args, **kwargs)    #執(zhí)行任務(wù)
            self.queue.task_done()      #讓join計(jì)數(shù)器-1,表明當(dāng)前的資源處理完了

    def apply_async(self, task, *args, **kwargs):
        self.queue.put((task, args, kwargs))  #生產(chǎn)者

    def join(self):
        self.queue.join()     #等待所有的隊(duì)列資源都用完,等到隊(duì)列為空,再執(zhí)行別的操作

'''
如果線程里每從隊(duì)列里取一次,但沒(méi)有執(zhí)行task_done(),則join無(wú)法判斷隊(duì)列到底有沒(méi)有結(jié)束,
在最后執(zhí)行個(gè)join()是等不到結(jié)果的,會(huì)一直掛起??梢岳斫鉃?,每task_done一次 就從隊(duì)列里刪掉一個(gè)元素,
這樣在最后join的時(shí)候根據(jù)隊(duì)列長(zhǎng)度是否為零來(lái)判斷隊(duì)列是否結(jié)束,從而執(zhí)行主線程。
'''

thread = MyThread()
thread.start()
thread.apply_async(task_1)   #start調(diào)用的只是線程的run方法,所以生產(chǎn)還需要再去調(diào)用方法
thread.apply_async(task_2,2,3)
thread.join()    #因?yàn)槿绻O(shè)置守護(hù)模式,主進(jìn)程結(jié)束,會(huì)把守護(hù)進(jìn)程殺死,所以這里調(diào)用join等子進(jìn)程結(jié)束之后,主進(jìn)程再繼續(xù)執(zhí)行

運(yùn)行結(jié)果:
image.png

主線程: 相當(dāng)于生產(chǎn)者,只管向線程池提交任務(wù)。并不關(guān)心線程池是如何執(zhí)行任務(wù)的。因此,并不關(guān)心是哪一個(gè)線程執(zhí)行的這個(gè)任務(wù)。

線程池: 相當(dāng)于消費(fèi)者,負(fù)責(zé)接收任務(wù),并將任務(wù)分配到一個(gè)空閑的線程中去執(zhí)行。


image.png

池簡(jiǎn)單實(shí)現(xiàn)


def task_1():
    print('任務(wù)一開(kāi)始')
    time.sleep(1)
    print('任務(wù)一完成')

def task_2(a,b):
    print('任務(wù)二開(kāi)始')
    print(a,b)
    time.sleep(1)
    print('任務(wù)二完成')

class MyThread():
    def __init__(self, num):
        self.queue = queue.Queue()   #創(chuàng)建隊(duì)列
        for i in range(num):
            Thread(target=self.run,daemon=True).start()

    def run(self):
        while True:
            task, args, kwargs = self.queue.get()   #任務(wù),子線程作為一個(gè)消費(fèi)者
            print('拿到了:',task)    #獲取任務(wù)
            task(*args, **kwargs)    #執(zhí)行任務(wù)
            self.queue.task_done()      #讓join計(jì)數(shù)器-1,表明當(dāng)前的資源處理完了

    def apply_async(self, task, *args, **kwargs):
        self.queue.put((task, args, kwargs))  #生產(chǎn)者

    def join(self):
        self.queue.join()     #等待所有的隊(duì)列資源都用完,等到隊(duì)列為空,再執(zhí)行別的操作

'''
如果線程里每從隊(duì)列里取一次,但沒(méi)有執(zhí)行task_done(),則join無(wú)法判斷隊(duì)列到底有沒(méi)有結(jié)束,
在最后執(zhí)行個(gè)join()是等不到結(jié)果的,會(huì)一直掛起??梢岳斫鉃?,每task_done一次 就從隊(duì)列里刪掉一個(gè)元素,
這樣在最后join的時(shí)候根據(jù)隊(duì)列長(zhǎng)度是否為零來(lái)判斷隊(duì)列是否結(jié)束,從而執(zhí)行主線程。
'''
thread = MyThread(5)
thread.apply_async(task_1)
thread.apply_async(task_2,2,3)
print('任務(wù)提交完成')
thread.join()
print('任務(wù)完成')

運(yùn)行結(jié)果:


image.png

內(nèi)置線程池


from multiprocessing.pool import ThreadPool   #線程池

print('-----------內(nèi)置線程池-----------')

def task_1():
    print('任務(wù)一開(kāi)始')
    time.sleep(1)
    print('任務(wù)一完成')

def task_2(*args,**kwargs):
    print('任務(wù)二開(kāi)始')
    print(args, kwargs)
    time.sleep(1)
    print('任務(wù)二完成')

pool = ThreadPool(2)   #使用內(nèi)置的pool
pool.apply_async(task_1)   # 向池中提交任務(wù)
pool.apply_async(task_2, args=(2,3,4),kwds={'a':5,'b':6})
print('任務(wù)提交完成')
pool.close() #在join之前必須close,就不允許再提交任務(wù)了。
pool.join()
print('任務(wù)完成')
image.png

內(nèi)置進(jìn)程池


from multiprocessing import Pool   #進(jìn)程池

print('-----------內(nèi)置進(jìn)程池-----------')

def task_1():
    print('任務(wù)一開(kāi)始')
    time.sleep(1)
    print('任務(wù)一完成')

def task_2(*args,**kwargs):
    print('任務(wù)二開(kāi)始')
    print(args, kwargs)
    time.sleep(1)
    print('任務(wù)二完成')

pool = Pool(2)   #使用內(nèi)置的pool
pool.apply_async(task_1)   # 向池中提交任務(wù)
pool.apply_async(task_2, args=(2,3,4),kwds={'a':5,'b':6})
print('任務(wù)提交完成')
pool.close() #在join之前必須close,就不允許再提交任務(wù)了。
pool.join()
print('任務(wù)完成')

運(yùn)行結(jié)果:


image.png

使用線程池來(lái)實(shí)現(xiàn)并發(fā)服務(wù)器

from multiprocessing.pool import ThreadPool
import socket

server =socket.socket()

server.bind(('0.0.0.0',7001))
server.listen()

print('等待連接......')

def workon(conn):
    while True:
        data = conn.recv(1024)
        if data:
            print('接受數(shù)據(jù){}'.format(data.decode()))
            conn.send(data)
        else:
            conn.close()
            break

if __name__ == '__main__':
    pool = ThreadPool(5)
    while True:
        conn, addr = server.accept()
        print('來(lái)自{}的連接'.format(addr))
        pool.apply_async(workon,args=(conn,))

使用進(jìn)程池+線程池來(lái)實(shí)現(xiàn)并發(fā)服務(wù)器


from multiprocessing.pool import ThreadPool   #線程池
from multiprocessing import Pool, cpu_count        #進(jìn)程池
import socket

print('------------使用進(jìn)程池+線程池來(lái)實(shí)現(xiàn)并發(fā)服務(wù)器------------')


server =socket.socket()

server.bind(('0.0.0.0',7001))
server.listen()

print('等待連接......')

def workon_thread(conn):
    while True:
        data = conn.recv(1024)
        if data:
            print('接受數(shù)據(jù){}'.format(data.decode()))
            conn.send(data)
        else:
            conn.close()
            break

def workon_process(server):
    thread_pool = ThreadPool(cpu_count()*2)  #通常分配2倍個(gè)數(shù)的線程
    while True:
        conn, addr = server.accept()
        print('來(lái)自{}的連接'.format(addr))
        thread_pool.apply_async(workon_thread,args=(conn,))



if __name__ == '__main__':
    n = cpu_count()        #當(dāng)前計(jì)算機(jī)cpu核數(shù)
    process_pool = Pool(n)

    for i in range(n):   #充分利用cpu,為每一個(gè)cpu分配一個(gè)進(jìn)程
        process_pool.apply_async(workon_process,args=(server,))

process_pool.close()
process_pool.join()

客戶端:

import socket

client = socket.socket()

client.connect(('127.0.0.1', 7001))

while True:
    message = input('發(fā)送消息>>>')
    if message !='q':
        client.send(message.encode())
        data = client.recv(1024)
        print('接受的消息>>>{}'.format(data.decode()))

    else:
        print('close client socket')
        client.close()
        break

運(yùn)行結(jié)果:


image.png

image.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 一. 操作系統(tǒng)概念 操作系統(tǒng)位于底層硬件與應(yīng)用軟件之間的一層.工作方式: 向下管理硬件,向上提供接口.操作系統(tǒng)進(jìn)行...
    月亮是我踢彎得閱讀 6,144評(píng)論 3 28
  • 本文是我自己在秋招復(fù)習(xí)時(shí)的讀書(shū)筆記,整理的知識(shí)點(diǎn),也是為了防止忘記,尊重勞動(dòng)成果,轉(zhuǎn)載注明出處哦!如果你也喜歡,那...
    波波波先森閱讀 11,589評(píng)論 4 56
  • 1.內(nèi)存的頁(yè)面置換算法 (1)最佳置換算法(OPT)(理想置換算法):從主存中移出永遠(yuǎn)不再需要的頁(yè)面;如無(wú)這樣的...
    杰倫哎呦哎呦閱讀 3,577評(píng)論 1 9
  • 進(jìn)程和線程 進(jìn)程 所有運(yùn)行中的任務(wù)通常對(duì)應(yīng)一個(gè)進(jìn)程,當(dāng)一個(gè)程序進(jìn)入內(nèi)存運(yùn)行時(shí),即變成一個(gè)進(jìn)程.進(jìn)程是處于運(yùn)行過(guò)程中...
    勝浩_ae28閱讀 5,256評(píng)論 0 23
  • 第三步 盡管我們的拆機(jī)工程師希望能盡快著手拆機(jī)工作,我們還是花了一點(diǎn)時(shí)間完整測(cè)試了下S pen。 第一印象:S P...
    Justin_Zhai閱讀 1,019評(píng)論 0 0

友情鏈接更多精彩內(nèi)容