無標(biāo)題文章

1. Stackless Python簡介?

??

Stackless Python是CPython的增強版本,它實現(xiàn)了一個合作式多任務(wù)系統(tǒng)。和搶占式多任?

務(wù)系統(tǒng)不同,Stackless中的每個任務(wù)(tasklet)必須主動放棄對處理器的控制,才能實現(xiàn)?

多任務(wù)。?

??

嚴(yán)格地說,合作式多任務(wù)系統(tǒng)都可以通過簡單的循環(huán)實現(xiàn)。但是使用Stackless可以讓代碼?

更清楚,同時也讓程序員不必考慮復(fù)雜的同步控制,因為實際上實現(xiàn)合作式多任務(wù)系統(tǒng)的循?

環(huán)并不簡單。另一方面,這種合作式多任務(wù)系統(tǒng)的資源消耗比傳統(tǒng)線程小得多,加上?

Python有偉大的GIL,所以Stackless的效率還是很不錯的。?

??

Stackless Python的主頁是

http://www.stackless.com

,上面可以下載源代碼和預(yù)編譯的版?

本。同時也有不少文檔——多是介紹性質(zhì)的,缺少手冊。幸好,使用Stackless Python不需?

要厚厚的手冊,看了主頁上的入門加上help()就足夠了。Python的self-documenting就是好?

!下面有關(guān)Stackless的介紹可能尚不敷用,所以如果對Stackless Python感興趣最好還是?

啃啃主頁上的英文。?

??

1.1. tasklet & run?

??

Stackless中一個任務(wù)被稱作一個tasklet,可以這樣創(chuàng)建一個tasklet:?

stackless.tasklet(a_callable)(args)。之后處理這個任務(wù)就是運行a_callable(args)?

??

創(chuàng)建了一個或多個tasklet之后,可以用stackless.run()來運行這些任務(wù)。實際上可以認(rèn)為?

run()是進(jìn)入了多任務(wù)系統(tǒng),如果還有可運行的任務(wù)(沒有運行完或者阻塞在某個channel上?

),那么run()就會持續(xù)運行。?

??

1.2. schedule?

??

當(dāng)在一個tasklet中運行stackless.schedule(),當(dāng)前的任務(wù)就被掛起,并放在運行循環(huán)的?

最后。下面一個程序是一個死循環(huán):?

??

import stackless?

def idle():?

???? while True:?

???????? print 'idle'?

???????? stackless.schedule()?

stackless.tasklet(idle)()?

stackless.run()?

??

1.3. channel: send & receive?

??

channel是tasklet之間的同步方式,使用stackless.channel()就能建立一個通信用的?

channel。channel可以傳送任何Python對象。下面假設(shè)ch = stackless.channel()。?

??

ch.receive()返回從ch中收到的對象,如果ch中沒有對象,則該tasklet被阻塞。?

??

ch.send(obj)把obj送入ch,如果已經(jīng)有其他tasklet執(zhí)行了ch.receive(),則當(dāng)前tasklet?

停止運行并成為最后一個tasklet,而阻塞在ch.receive()的tasklet繼續(xù)運行。如果沒有,?

則當(dāng)前tasklet阻塞在ch.send(obj)。?

??

這又是一個死循環(huán):?

??

import stackless?

ch = stackless.channel()?

def ping():?

???? while True:?

???????? print ch.receive()?

???????? ch.send('ping')?

def pong():?

???? while True:?

???????? print ch.receive()?

???????? ch.send('pong')?

stackless.tasklet(ping)()?

stackless.tasklet(pong)()?

ch.send('start')?

stackless.run()?

??

2. 基于Stackless的多任務(wù):三種行為模式?

??

概念基本上來自Introduction to Concurrent Programming with Stackless Python,分別?

是:?

??

Daemon——每個周期都要運行。?

class Daemon(object):?

???? def __init__(self):?

???????? stackless.tasklet(self)()?

??

???? def __call__(self):?

???????? while True:?

???????????? self.run()?

???????????? stackless.schedule()?

??

Task——只運行一次。?

class Task(object):?

???? def __init__(self):?

???????? stackless.tasklet(self)()?

??

???? def __call__(self):?

???????? self.run()?

??

Handler——通過channel處理請求,沒有請求就阻塞。如果收到一個nil請求,則結(jié)束運行。?

class Handler(object):?

???? def __init__(self):?

???????? self.channel = stackless.channel()?

???????? stackless.tasklet(self)()?

??

???? def __call__(self):?

???????? is_alive = True?

???????? while is_alive:?

???????????? message = self.channel.receive()?

???????????? if message:?

???????????????? is_alive = self.run(message)?

???????????? else:?

???????????????? is_alive = False?

??

在游戲中,Daemon實現(xiàn)一些管理功能,每個Object可能都要有一個Handler與之對應(yīng)(反正?

大多數(shù)Handler都是阻塞的),但因為Handler模型需要分析收到的message,所以有的時候?

也可以用Task。?

??

3. 網(wǎng)絡(luò)連接?

??

網(wǎng)絡(luò)連接當(dāng)然要使用異步socket,我從Python In A Nutshell抄了一個asyncore+asynchat?

的例子,放在子線程里運行asyncore.loop()。我沒有用www.stackless.com提供的一個?

stacklesssocket,因為大概看了一下stacklesssocket.py,覺得好像有些問題。?

??

4. PyMud01?

??

代碼:實現(xiàn)了一個echo server,利用了Daemon-Handler-Task三級模型。?

network_thread通過conn_manager.queue和主線程通信,?

EchoDaemon給每個不同的連接建立一個EchoHandler,?

收到信息時發(fā)給相應(yīng)的EchoHandler,EchoHandler建立一個EchoTask。?

最后由EchoTask完成回顯。?

??

import stackless?

import asyncore, asynchat, socket?

import threading, Queue?

??

class Monitor(asyncore.dispatcher):?

???? def __init__(self, conn_manager, addr='', port=9000):?

???????? asyncore.dispatcher.__init__(self)?

???????? self.create_socket(socket.AF_INET, socket.SOCK_STREAM)?

???????? self.bind((addr, port))?

???????? self.listen(5)?

???????? self.conn_manager = conn_manager?

??????????

???? def handle_accept(self):?

???????? conn, addr_port = self.accept()?

???????? self.conn_manager.Create(conn, addr_port)?

??

class Connection(asynchat.async_chat):?

???? def __init__(self, conn, addr_port, conn_manager):?

???????? asynchat.async_chat.__init__(self, conn)?

???????? self.set_terminator('\r\n')?

???????? self.addr_port = addr_port?

???????? self.data_pieces = []?

???????? self.conn_manager = conn_manager?

??????????

???? def collect_incoming_data(self, data):?

???????? self.data_pieces.append(data)?

??

???? def found_terminator(self):?

???????? self.conn_manager.Receive(self.addr_port, ''.join(self.data_pieces))?

???????? self.data_pieces = []?

??

???? def handle_close(self):?

???????? self.conn_manager.Close(self.addr_port)?

???????? self.close()?

??????????

class ConnectionManager(object):?

???? def __init__(self):?

???????? self.queue = Queue.Queue()?

???????? self.conn_table = {}?

??????????

???? def Create(self, conn, addr_port):?

???????? self.conn_table[addr_port] = Connection(conn, addr_port, self)?

??

???? def Receive(self, addr_port, data):?

???????? self.queue.put_nowait((addr_port, data))?

??

???? def Send(self, addr_port, data):?

???????? self.conn_table[addr_port].push(data)?

??

???? def Close(self, addr_port):?

???????? self.conn_table[addr_port] = None?

??

???? def IsConnect(self, addr_port):?

???????? if self.conn_table.has_key(addr_port):?

???????????? return bool(self.conn_table[addr_port])?

???????? else:?

???????????? return False?

??????????

conn_manager = ConnectionManager()?

??

class NetworkThread(threading.Thread):?

???? def run(self):?

???????? Monitor(conn_manager)?

???????? asyncore.loop()?

??????

network_thread = NetworkThread()?

network_thread.start()?

??

class Daemon(object):?

???? def __init__(self):?

???????? stackless.tasklet(self)()?

??

???? def __call__(self):?

???????? while True:?

???????????? self.run()?

???????????? stackless.schedule()?

??

class Task(object):?

???? def __init__(self):?

???????? stackless.tasklet(self)()?

??

???? def __call__(self):?

???????? self.run()?

??

class Handler(object):?

???? def __init__(self):?

???????? self.channel = stackless.channel()?

???????? stackless.tasklet(self)()?

??

???? def __call__(self):?

???????? is_alive = True?

???????? while is_alive:?

???????????? message = self.channel.receive()?

???????????? if message:?

???????????????? is_alive = self.run(message)?

???????????? else:?

???????????????? is_alive = False?

??

class EchoTask(Task):?

???? def __init__(self, addr_port, data):?

???????? Task.__init__(self)?

???????? self.addr_port = addr_port?

???????? self.data = data?

??

???? def run(self):?

???????? print 'task:', stackless.getruncount(), self.addr_port, self.data?

???????? conn_manager.Send(self.addr_port, self.data+'\r\n')?

??

class EchoHandler(Handler):?

???? def run(self, message):?

???????? print 'handler:', stackless.getruncount(), message?

???????? EchoTask(message[0], message[1])?

???????? return True?

??????

class EchoDaemon(Daemon):?

???? def __init__(self):?

???????? Daemon.__init__(self)?

???????? self.handler_table = {}?

??????????

???? def run(self):?

???????? for key in self.handler_table.keys():?

???????????? if not conn_manager.IsConnect(key):?

???????????????? self.handler_table[key].channel.send(None)?

???????????????? del self.handler_table[key]?

???????? try:?

???????????? message = conn_manager.queue.get_nowait()?

???????? except Queue.Empty:?

???????????? return?

???????? print 'daemon:', message?

??????????

???????? if not self.handler_table.has_key(message[0]):?

???????????? self.handler_table[message[0]] = EchoHandler()?

???????????? print 'daemon:', len(self.handler_table)?

??

???????? self.handler_table[message[0]].channel.send(message)?

??

EchoDaemon()?

??

stackless.run()?

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容