Tornado是異步非阻塞Web框架,能抗住每秒可以處理數(shù)以千計的連接。背后使用了Epoll,是一個高性能Web框架。
但是在使用Tornado作為框架來做業(yè)務(wù)邏輯編寫,就會發(fā)現(xiàn),雖然能抗住數(shù)以千計的連接,Tornado只有一個主線程。而編寫業(yè)務(wù)肯定需要有耗時的過程邏輯,比如數(shù)據(jù)庫的操作。這個時候,單線程的Tornado就特別容易阻塞在耗時邏輯上。
要想很好的適用業(yè)務(wù)邏輯,我的設(shè)計是基于Tornado引入線程池,并且能方便操作數(shù)據(jù)庫,對Sqlalchemy包裝一下。目前這種設(shè)計經(jīng)歷了1年多的項目運(yùn)行,穩(wěn)定運(yùn)行。特別適合高并發(fā)不是特別要求高的業(yè)務(wù)情況。
多線程裝飾器
建立線程池很簡單,編寫裝飾器 @thread_executor,這樣就可以很輕易的掛載裝飾器來把業(yè)務(wù)邏輯壓入線程池中運(yùn)行。
executor = ThreadPoolExecutor(8)
def thread_executor(fn):
@functools.wraps(fn)
def wrapper(self, *args, **kwargs):
future = executor.submit(fn, self, *args, **kwargs)
return future
return wrapper
多線程帶Session數(shù)據(jù)庫操作裝飾器
sqlalchemy_session = sessionmaker(bind=engine)
def thread_db_session_executor(fn):
"""
fn 是一個方法, @thread_db_session_executor 裝飾后,將自動轉(zhuǎn)入線程池里面運(yùn)行。
fn 的參數(shù)會自動追加一個session,為sqlalchemy的DB數(shù)據(jù)操作。 例如 原為fn(a,b) 會變成fn(a,b,session)
fn 運(yùn)行完成后 session自動關(guān)閉,請不要再fn內(nèi)關(guān)閉。
如果 fn 運(yùn)行異常,數(shù)據(jù)庫會自動回滾,fn返回值變成異常對象。
"""
@thread_executor
@functools.wraps(fn)
def wrapper(self, *args, **kwargs):
start_time = time.time()
try:
session = sqlalchemy_session()
new_fn = functools.partial(fn, session=session)
result = new_fn(self, *args, **kwargs)
except Exception as e:
try:
logging.exception(
"\n%s \n--function-->>\n%s -args: %s \n%s -kwargs: %s\n<<--function--\n"
% (str(e), fn.__name__, str(args), fn.__name__, str(kwargs))
)
session.rollback()
except Exception as e2:
pass
result = e
finally:
session.close()
return result
return wrapper
封裝Tornado的RequestHandler(json版本)
class SimpleThreadDBSessionHandler(tornado.web.RequestHandler):
"""
SimpleThreadDBSessionHandler 實例化后,會在線程池里面獲得一個線程,并且獲得DB的一個session(操作數(shù)據(jù)庫)。
重載 on_post, on_get 來實現(xiàn)業(yè)務(wù),方法執(zhí)行線程安全。執(zhí)行完成后,線程自動回收,session自動關(guān)閉。
post request body 是json格式
post get response body 是json
返回結(jié)果 也是 json
"""
@tornado.web.asynchronous
@tornado.gen.coroutine
def post(self):
try:
post_body_data = self.measure_post_body()
result = yield self._on_post(post_body_data)
except Exception as e:
result = e
print e
if isinstance(result, Exception) or result is None:
exception_response_result = self.exception_response(result)
if exception_response_result is None:
raise result
else:
result = exception_response_result
self.write(self.measure_response_body(result))
self.finish()
@thread_db_session_executor
def _on_post(self, post_body_data, session):
return self.on_post(post_body_data, session)
def on_post(self, post_body_data, session):
"""
請重載此函數(shù)處理
"""
raise tornado.web.HTTPError(405)
def measure_post_body(self):
"""
處理 request body的數(shù)據(jù) 轉(zhuǎn)成json格式字典
:return:
"""
return utils_json_string_to_dict(self.request.body.decode('utf-8'))
通過以上的編寫,在編寫業(yè)務(wù)邏輯的時候,使用就特別方便。
class TestHandler(SimpleThreadDBSessionHandler):
def on_post(self, post_body_data, session):
user = session.query(User).filter(User.id == 1).first()
return {
'user_name': user.nickname
}
業(yè)務(wù)邏輯(POST) 只要在def on_post(self, post_body_data, session)里面實現(xiàn)就可以了,
on_post 帶有一個操作數(shù)據(jù)庫的session,并且通過使用@thread_db_session_executor的,on_post的邏輯就放入線程池里面運(yùn)行,完成以后再返回到主線程里面返回。
就算中間發(fā)生異常,異常也會被捕捉,數(shù)據(jù)庫回滾。