先看個(gè)例子:
from threading import Thread, current_thread
from queue import Queue
import time
def task_1():
time.sleep(1)
print('任務(wù)一')
def task_2():
time.sleep(1)
print('任務(wù)二')
thread = Thread(target=task_1)
thread.start()
thread.join() #加入join讓子進(jìn)程執(zhí)行完,主進(jìn)程再繼續(xù)往下執(zhí)行
thread.start()

image.png
線程結(jié)束之后就不能再重復(fù)使用了。那要執(zhí)行2個(gè)任務(wù),我們可以創(chuàng)建2個(gè)線程各執(zhí)行一個(gè)任務(wù),但是我們需要節(jié)約資源,那如何只讓一個(gè)線程執(zhí)行幾個(gè)任務(wù)呢,即如何重復(fù)使用呢?
我們可以利用生產(chǎn)者消費(fèi)者模型來(lái)
def task_1():
print('任務(wù)一開(kāi)始')
time.sleep(1)
print('任務(wù)一完成')
def task_2():
print('任務(wù)二開(kāi)始')
time.sleep(1)
print('任務(wù)二完成')
class MyThread(Thread):
def __init__(self, queue):
super().__init__()
self.queue = queue
def run(self):
while True:
task = self.queue.get() #任務(wù),子線程作為一個(gè)消費(fèi)者
print('拿到了:',task) #獲取任務(wù)
task() #執(zhí)行任務(wù)
q = queue.Queue(5) #創(chuàng)建隊(duì)列
thread = MyThread(q)
thread.start()
q.put(task_1) #主線程充當(dāng)生產(chǎn)者
q.put(task_2)
運(yùn)行結(jié)果:
image.png
上述代碼中,因?yàn)殛?duì)列只是給子進(jìn)程用,所以可以放到類的初始化方法中,put也可以給子進(jìn)程中去生產(chǎn),所以可改寫成
def task_1():
print('任務(wù)一開(kāi)始')
time.sleep(1)
print('任務(wù)一完成')
def task_2():
print('任務(wù)二開(kāi)始')
time.sleep(1)
print('任務(wù)二完成')
class MyThread(Thread):
def __init__(self):
super().__init__()
self.queue = queue.Queue(5) #創(chuàng)建隊(duì)列
def run(self):
while True:
task = self.queue.get() #任務(wù),子線程作為一個(gè)消費(fèi)者
print('拿到了:',task) #獲取任務(wù)
task() #執(zhí)行任務(wù)
def apply_async(self,task):
self.queue.put(task) #生產(chǎn)者
thread = MyThread()
thread.start()
thread.apply_async(task_1) #start調(diào)用的只是線程的run方法,所以生產(chǎn)還需要再去調(diào)用方法
thread.apply_async(task_2)
運(yùn)行結(jié)果同上。
但是執(zhí)行完還沒(méi)結(jié)束,一直停留在那,需要開(kāi)啟守護(hù)模式。
但是注意設(shè)置了守護(hù)模式super().__init__(daemon=True) #設(shè)置守護(hù)模式,主進(jìn)程結(jié)束,會(huì)把守護(hù)進(jìn)程殺死運(yùn)行是沒(méi)數(shù)據(jù)的

image.png
這是因?yàn)橹鬟M(jìn)程put完之后,沒(méi)有其他需要執(zhí)行的了。
所以需要進(jìn)行阻塞,利用join進(jìn)程阻塞,使得子進(jìn)程執(zhí)行完,主進(jìn)程再繼續(xù)執(zhí)行,在類中重寫join方法
def task_1():
print('任務(wù)一開(kāi)始')
time.sleep(1)
print('任務(wù)一完成')
def task_2():
print('任務(wù)二開(kāi)始')
time.sleep(1)
print('任務(wù)二完成')
class MyThread(Thread):
def __init__(self):
super().__init__(daemon=True) #設(shè)置守護(hù)模式,主進(jìn)程結(jié)束,會(huì)把守護(hù)進(jìn)程殺死
self.queue = queue.Queue(5) #創(chuàng)建隊(duì)列
def run(self):
while True:
task = self.queue.get() #任務(wù),子線程作為一個(gè)消費(fèi)者
print('拿到了:',task) #獲取任務(wù)
task() #執(zhí)行任務(wù)
self.queue.task_done() #讓join計(jì)數(shù)器-1,表明當(dāng)前的資源處理完了
def apply_async(self, task):
self.queue.put(task) #生產(chǎn)者
def join(self):
self.queue.join() #等待所有的隊(duì)列資源都用完,等到隊(duì)列為空,再執(zhí)行別的操作
'''
如果線程里每從隊(duì)列里取一次,但沒(méi)有執(zhí)行task_done(),則join無(wú)法判斷隊(duì)列到底有沒(méi)有結(jié)束,
在最后執(zhí)行個(gè)join()是等不到結(jié)果的,會(huì)一直掛起??梢岳斫鉃?,每task_done一次 就從隊(duì)列里刪掉一個(gè)元素,
這樣在最后join的時(shí)候根據(jù)隊(duì)列長(zhǎng)度是否為零來(lái)判斷隊(duì)列是否結(jié)束,從而執(zhí)行主線程。
'''
thread = MyThread()
thread.start()
thread.apply_async(task_1) #start調(diào)用的只是線程的run方法,所以生產(chǎn)還需要再去調(diào)用方法
thread.apply_async(task_2)
thread.join() #因?yàn)槿绻O(shè)置守護(hù)模式,主進(jìn)程結(jié)束,會(huì)把守護(hù)進(jìn)程殺死,所以這里調(diào)用join等子進(jìn)程結(jié)束之后,主進(jìn)程再繼續(xù)執(zhí)行
運(yùn)行結(jié)果:
image.png
現(xiàn)在可以正常結(jié)束了。
傳參數(shù)形式:
def task_1():
print('任務(wù)一開(kāi)始')
time.sleep(1)
print('任務(wù)一完成')
def task_2(a,b):
print('任務(wù)二開(kāi)始')
print(a,b)
time.sleep(1)
print('任務(wù)二完成')
class MyThread(Thread):
def __init__(self):
super().__init__(daemon=True) #設(shè)置守護(hù)模式,主進(jìn)程結(jié)束,會(huì)把守護(hù)進(jìn)程殺死
self.queue = queue.Queue(5) #創(chuàng)建隊(duì)列
def run(self):
while True:
task, args, kwargs = self.queue.get() #任務(wù),子線程作為一個(gè)消費(fèi)者
print('拿到了:',task) #獲取任務(wù)
task(*args, **kwargs) #執(zhí)行任務(wù)
self.queue.task_done() #讓join計(jì)數(shù)器-1,表明當(dāng)前的資源處理完了
def apply_async(self, task, *args, **kwargs):
self.queue.put((task, args, kwargs)) #生產(chǎn)者
def join(self):
self.queue.join() #等待所有的隊(duì)列資源都用完,等到隊(duì)列為空,再執(zhí)行別的操作
'''
如果線程里每從隊(duì)列里取一次,但沒(méi)有執(zhí)行task_done(),則join無(wú)法判斷隊(duì)列到底有沒(méi)有結(jié)束,
在最后執(zhí)行個(gè)join()是等不到結(jié)果的,會(huì)一直掛起??梢岳斫鉃?,每task_done一次 就從隊(duì)列里刪掉一個(gè)元素,
這樣在最后join的時(shí)候根據(jù)隊(duì)列長(zhǎng)度是否為零來(lái)判斷隊(duì)列是否結(jié)束,從而執(zhí)行主線程。
'''
thread = MyThread()
thread.start()
thread.apply_async(task_1) #start調(diào)用的只是線程的run方法,所以生產(chǎn)還需要再去調(diào)用方法
thread.apply_async(task_2,2,3)
thread.join() #因?yàn)槿绻O(shè)置守護(hù)模式,主進(jìn)程結(jié)束,會(huì)把守護(hù)進(jìn)程殺死,所以這里調(diào)用join等子進(jìn)程結(jié)束之后,主進(jìn)程再繼續(xù)執(zhí)行
運(yùn)行結(jié)果:
image.png
池
主線程: 相當(dāng)于生產(chǎn)者,只管向線程池提交任務(wù)。并不關(guān)心線程池是如何執(zhí)行任務(wù)的。因此,并不關(guān)心是哪一個(gè)線程執(zhí)行的這個(gè)任務(wù)。
線程池: 相當(dāng)于消費(fèi)者,負(fù)責(zé)接收任務(wù),并將任務(wù)分配到一個(gè)空閑的線程中去執(zhí)行。

image.png
池簡(jiǎn)單實(shí)現(xiàn)
def task_1():
print('任務(wù)一開(kāi)始')
time.sleep(1)
print('任務(wù)一完成')
def task_2(a,b):
print('任務(wù)二開(kāi)始')
print(a,b)
time.sleep(1)
print('任務(wù)二完成')
class MyThread():
def __init__(self, num):
self.queue = queue.Queue() #創(chuàng)建隊(duì)列
for i in range(num):
Thread(target=self.run,daemon=True).start()
def run(self):
while True:
task, args, kwargs = self.queue.get() #任務(wù),子線程作為一個(gè)消費(fèi)者
print('拿到了:',task) #獲取任務(wù)
task(*args, **kwargs) #執(zhí)行任務(wù)
self.queue.task_done() #讓join計(jì)數(shù)器-1,表明當(dāng)前的資源處理完了
def apply_async(self, task, *args, **kwargs):
self.queue.put((task, args, kwargs)) #生產(chǎn)者
def join(self):
self.queue.join() #等待所有的隊(duì)列資源都用完,等到隊(duì)列為空,再執(zhí)行別的操作
'''
如果線程里每從隊(duì)列里取一次,但沒(méi)有執(zhí)行task_done(),則join無(wú)法判斷隊(duì)列到底有沒(méi)有結(jié)束,
在最后執(zhí)行個(gè)join()是等不到結(jié)果的,會(huì)一直掛起??梢岳斫鉃?,每task_done一次 就從隊(duì)列里刪掉一個(gè)元素,
這樣在最后join的時(shí)候根據(jù)隊(duì)列長(zhǎng)度是否為零來(lái)判斷隊(duì)列是否結(jié)束,從而執(zhí)行主線程。
'''
thread = MyThread(5)
thread.apply_async(task_1)
thread.apply_async(task_2,2,3)
print('任務(wù)提交完成')
thread.join()
print('任務(wù)完成')
運(yùn)行結(jié)果:

image.png
內(nèi)置線程池
from multiprocessing.pool import ThreadPool #線程池
print('-----------內(nèi)置線程池-----------')
def task_1():
print('任務(wù)一開(kāi)始')
time.sleep(1)
print('任務(wù)一完成')
def task_2(*args,**kwargs):
print('任務(wù)二開(kāi)始')
print(args, kwargs)
time.sleep(1)
print('任務(wù)二完成')
pool = ThreadPool(2) #使用內(nèi)置的pool
pool.apply_async(task_1) # 向池中提交任務(wù)
pool.apply_async(task_2, args=(2,3,4),kwds={'a':5,'b':6})
print('任務(wù)提交完成')
pool.close() #在join之前必須close,就不允許再提交任務(wù)了。
pool.join()
print('任務(wù)完成')

image.png
內(nèi)置進(jìn)程池
from multiprocessing import Pool #進(jìn)程池
print('-----------內(nèi)置進(jìn)程池-----------')
def task_1():
print('任務(wù)一開(kāi)始')
time.sleep(1)
print('任務(wù)一完成')
def task_2(*args,**kwargs):
print('任務(wù)二開(kāi)始')
print(args, kwargs)
time.sleep(1)
print('任務(wù)二完成')
pool = Pool(2) #使用內(nèi)置的pool
pool.apply_async(task_1) # 向池中提交任務(wù)
pool.apply_async(task_2, args=(2,3,4),kwds={'a':5,'b':6})
print('任務(wù)提交完成')
pool.close() #在join之前必須close,就不允許再提交任務(wù)了。
pool.join()
print('任務(wù)完成')
運(yùn)行結(jié)果:

image.png
使用線程池來(lái)實(shí)現(xiàn)并發(fā)服務(wù)器
from multiprocessing.pool import ThreadPool
import socket
server =socket.socket()
server.bind(('0.0.0.0',7001))
server.listen()
print('等待連接......')
def workon(conn):
while True:
data = conn.recv(1024)
if data:
print('接受數(shù)據(jù){}'.format(data.decode()))
conn.send(data)
else:
conn.close()
break
if __name__ == '__main__':
pool = ThreadPool(5)
while True:
conn, addr = server.accept()
print('來(lái)自{}的連接'.format(addr))
pool.apply_async(workon,args=(conn,))
使用進(jìn)程池+線程池來(lái)實(shí)現(xiàn)并發(fā)服務(wù)器
from multiprocessing.pool import ThreadPool #線程池
from multiprocessing import Pool, cpu_count #進(jìn)程池
import socket
print('------------使用進(jìn)程池+線程池來(lái)實(shí)現(xiàn)并發(fā)服務(wù)器------------')
server =socket.socket()
server.bind(('0.0.0.0',7001))
server.listen()
print('等待連接......')
def workon_thread(conn):
while True:
data = conn.recv(1024)
if data:
print('接受數(shù)據(jù){}'.format(data.decode()))
conn.send(data)
else:
conn.close()
break
def workon_process(server):
thread_pool = ThreadPool(cpu_count()*2) #通常分配2倍個(gè)數(shù)的線程
while True:
conn, addr = server.accept()
print('來(lái)自{}的連接'.format(addr))
thread_pool.apply_async(workon_thread,args=(conn,))
if __name__ == '__main__':
n = cpu_count() #當(dāng)前計(jì)算機(jī)cpu核數(shù)
process_pool = Pool(n)
for i in range(n): #充分利用cpu,為每一個(gè)cpu分配一個(gè)進(jìn)程
process_pool.apply_async(workon_process,args=(server,))
process_pool.close()
process_pool.join()
客戶端:
import socket
client = socket.socket()
client.connect(('127.0.0.1', 7001))
while True:
message = input('發(fā)送消息>>>')
if message !='q':
client.send(message.encode())
data = client.recv(1024)
print('接受的消息>>>{}'.format(data.decode()))
else:
print('close client socket')
client.close()
break
運(yùn)行結(jié)果:

image.png

image.png