import threading
import sys
#import time
import logging
#logging.basicConfig(level=logging.DEBUG,
# format='[%(asctime)s %(msecs)d %(module)15s %(name)10s %(funcName)15s %(levelname)s] %(message)s',
# datefmt = '%F %T')
#
#logging.debug('start')
logger = logging.getLogger(__name__)
logger.setLevel(logging.ERROR)
class MyThreadPool:
def __init__(self, max_thread_num):
#thread array
self._max_thread_num = max_thread_num
self._total_thread_num = 0
#the flag for all threads to destroy
self._stop = False
#mutex
self._lock = threading.Lock()
#import contitions
self._cond_idle = threading.Condition(self._lock)
self._cond_full = threading.Condition(self._lock)
self._cond_empty = threading.Condition(self._lock)
#self._thread_list = []
def __del__(self):
logger.debug('MyThreadPool __del__')
def PushBack(self, athread):
self.Lock()
logger.debug('PushBack inner')
if not athread == None:
logger.debug('athread is not None')
self._thread_list.append(athread)
self.Notify('idle')
logger.debug('Notify idle')
#全部空閑,可以結(jié)束線程池
if len(self._thread_list) >= self._total_thread_num:
logger.debug('Notify full')
self.Notify('full')
self.UnLock()
def PopThread(self):
if not len(self._thread_list) == 0:
return self._thread_list.pop()
def join(self):
for athread in self._thread_list:
athread.join()
def Destroy(self):
logger.debug('call Destroy')
self.Lock()
#self._stop = True
#等待線程全部工作完畢
if len(self._thread_list) < self._total_thread_num:
self.Wait('full')
logger.debug('Wait full')
self._stop = True
logger.debug('self._stop = True')
#所有線程啟動(dòng)
logger.debug('len of _thread_list is: %d',len(self._thread_list))
for athread in self._thread_list:
athread.Lock()
athread.Notify()
athread.UnLock()
logger.debug('通知每一個(gè)線程結(jié)束完畢')
if self._total_thread_num > 0:
logger.debug('waiting to receive empty notify')
self.Wait('empty')
logger.debug('have received empty notify')
self.UnLock()
def DispatchTask(self, function = None, args_dict = None):
self.Lock()
#線程池線程個(gè)數(shù)達(dá)到最大值并且都在使用,此時(shí)等待
while(len(self._thread_list) <= 0 and self._total_thread_num >= self._max_thread_num):
logger.debug('waiting idle notify')
self.Wait('idle')
#有idle線程
if len(self._thread_list) > 0:
athread = self.PopThread()
athread.SetTask(function, args_dict)
athread.Lock()
athread.Notify()
athread.UnLock()
#create new thread
else:
athread = MyThread(self)
athread.SetTask(function, args_dict)
self._total_thread_num +=1
athread.start()
self.UnLock()
logger.debug('DispatchTask is over')
def Stop(self):
return self._stop
def Lock(self):
self._lock.acquire()
def UnLock(self):
self._lock.release()
def OneThreadFinish(self):
self.Lock()
self._total_thread_num -=1
if self._total_thread_num <= 0:
self.Notify('empty')
logger.debug('send emtpy Notify')
self.UnLock()
def Wait(self, who):
if who == 'full':
self._cond_full.wait()
elif who == 'idle':
self._cond_idle.wait()
elif who == 'empty':
self._cond_empty.wait()
else:
logging.critical('Faltal error!')
def Notify(self, who):
if who == 'full':
self._cond_full.notify()
elif who == 'idle':
self._cond_idle.notify()
elif who == 'empty':
self._cond_empty.notify()
else:
logging.critical('Fatal error!')
class MyThread(threading.Thread):
def __init__(self, thread_pool = None):
threading.Thread.__init__(self)
self.setDaemon(True)
#pool
self._thread_pool = thread_pool
#process function
self._task = None
self._lock = threading.Lock()
self._cond = threading.Condition(self._lock)
def run(self):
#沒(méi)有使用線程池
if self._thread_pool == None:
self._task(self._args)
logger.debug('thread pool is None')
return
#
while self._thread_pool.Stop() == False:
if not self._task == None:
self._task(self._args)
#線程池停止工作
if self._thread_pool.Stop() == True:
break
self.Lock()
#加入到線程池
self._thread_pool.PushBack(self)
#等待被喚醒
self.Wait()
self.UnLock()
#線程池停止,線程結(jié)束
self._thread_pool.OneThreadFinish()
logger.debug('thread %d is finishing', self.ident)
def Lock(self):
self._lock.acquire()
def UnLock(self):
self._lock.release()
def Wait(self):
self._cond.wait()
def Notify(self):
self._cond.notify()
def SetTask(self, task_func, args_dict):
self._task = task_func
self._args = args_dict
def process(args):
logger.debug('the value of key 0 is :%s', args[0])
import time
if __name__ == '__main__':
threadpool = MyThreadPool(2)
i = 0
while i < 10:
threadpool.DispatchTask(process, {0: 'lmy'})
i+=1
threadpool.Destroy()
logger.debug('over')
python線程池多任務(wù)分發(fā)
最后編輯于 :
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。
相關(guān)閱讀更多精彩內(nèi)容
- 前言:線程數(shù)量為1的FixedThreadPool,如果提交了多個(gè)任務(wù),那么這些任務(wù)將會(huì)排隊(duì),每個(gè)任務(wù)都會(huì)在下一個(gè)...
- 看了不少書(shū)和資料,自認(rèn)為對(duì)于 python 中的線程、進(jìn)程、協(xié)程等略知一二了。 想實(shí)現(xiàn)一個(gè)多線程池的模型,但是也不...
- 對(duì)多進(jìn)程的補(bǔ)充 對(duì)比下面兩段代碼 if name=='main':... print('Parent pr...
- threading是一個(gè)比較底層的api, 一般來(lái)說(shuō)不用這個(gè)包來(lái)創(chuàng)建多線程 1.直接創(chuàng)建多線程 執(zhí)行結(jié)果: 2.利...
- 本文將介紹如何通過(guò)添加擴(kuò)展的方式進(jìn)行Json代碼格式化操作: 1,為Sublime Text增加擴(kuò)展功能(安裝Su...