一、app.run() 在做什么?
執(zhí)行 app.run() 便啟動(dòng)了 Flask 服務(wù),這個(gè)服務(wù)為什么能夠監(jiān)聽 http 請(qǐng)求并做出響應(yīng)?讓我們進(jìn)入 run 函數(shù)內(nèi)部一探究竟。
def run(self, host='localhost', port=5000, **options):
from werkzeug import run_simple
if 'debug' in options:
self.debug = options.pop('debug')
options.setdefault('use_reloader', self.debug)
options.setdefault('use_debugger', self.debug)
return run_simple(host, port, self, **options)
可以看到,run 函數(shù)3-6行做了些參數(shù)默認(rèn)值設(shè)置,最后將參數(shù)傳入 run_simple 并調(diào)用返回,注意,第3個(gè)參數(shù)是 Flask 對(duì)象(留意 Flask 對(duì)象的傳遞)。run_simple 是從 werkzeug 導(dǎo)入的。
def run_simple(hostname, port, application, use_reloader=False,
use_debugger=False, use_evalex=True,
extra_files=None, reloader_interval=1, threaded=False,
processes=1, request_handler=None, static_files=None,
passthrough_errors=False, ssl_context=None):
...
def inner():
make_server(hostname, port, application, threaded,
processes, request_handler,
passthrough_errors, ssl_context).serve_forever()
...
if use_reloader:
test_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
test_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
test_socket.bind((hostname, port))
test_socket.close()
run_with_reloader(inner, extra_files, reloader_interval)
else:
inner()
run_simple() 最終調(diào)用 inner(),這個(gè)函數(shù)做了兩件事:
- 構(gòu)造一個(gè)服務(wù):
make_server() - 啟動(dòng)服務(wù):
.serve_forever()
def make_server(host, port, app=None, threaded=False, processes=1,
request_handler=None, passthrough_errors=False,
ssl_context=None):
if threaded and processes > 1:
raise ValueError("cannot have a multithreaded and "
"multi process server.")
elif threaded:
return ThreadedWSGIServer(host, port, app, request_handler,
passthrough_errors, ssl_context)
elif processes > 1:
return ForkingWSGIServer(host, port, app, processes, request_handler,
passthrough_errors, ssl_context)
else:
return BaseWSGIServer(host, port, app, request_handler,
passthrough_errors, ssl_context)
make_server() 返回一個(gè) BaseWSGIServer 對(duì)象。
class BaseWSGIServer(HTTPServer, object):
...
def __init__(self, host, port, app, handler=None,
passthrough_errors=False, ssl_context=None):
...
HTTPServer.__init__(self, (host, int(port)), handler)
self.app = app
...
BaseWSGIServer 繼承 HTTPServer,HTTPServer 繼承來自 Python 標(biāo)準(zhǔn)庫 SocketServer.TCPServer 類,其最終繼承自 SocketServer.BaseServer。注意 Flask 對(duì)象被賦值給 self.app。
BaseServer 類有一個(gè) serve_forever 方法:
def serve_forever(self, poll_interval=0.5):
self.__is_shut_down.clear()
try:
while not self.__shutdown_request:
r, w, e = _eintr_retry(select.select, [self], [], [],
poll_interval)
if self.__shutdown_request:
break
if self in r:
self._handle_request_noblock()
finally:
self.__shutdown_request = False
self.__is_shut_down.set()
其中有一個(gè) while 循環(huán),在不斷執(zhí)行:
if ready:
self._handle_request_noblock()
def _handle_request_noblock(self):
...
self.process_request(request, client_address)
...
_handle_request_noblock() 調(diào)用 process_request(),這是處理請(qǐng)求的函數(shù),這個(gè)函數(shù)實(shí)例化 self.RequestHandlerClass 類來處理請(qǐng)求,這個(gè)類是 BaseServer 初始化時(shí)傳入的參數(shù)。
到此我們知道,app.run() 實(shí)際上實(shí)例化了一個(gè) socketserver.BaseServer 類,并調(diào)用該類的 server_forever 方法,這個(gè)方法主體是一個(gè) while 循環(huán),最終在不斷實(shí)例化 self.RequestHandlerClass 來處理請(qǐng)求,在不中斷 while 的情況下,程序會(huì)一直運(yùn)行,不斷接收請(qǐng)求,處理請(qǐng)求。
二、Flask 如何收到請(qǐng)求?
while 程序在不斷監(jiān)聽請(qǐng)求,當(dāng)接收到請(qǐng)求時(shí),實(shí)例化 self.RequestHandlerClass 來處理請(qǐng)求。這個(gè)變量在 werkzeug.BaseWSGIServer 的 __init__ 方法中被賦值(以下第10行):
class BaseWSGIServer(HTTPServer, object):
multithread = False
multiprocess = False
def __init__(self, host, port, app, handler=None,
passthrough_errors=False, ssl_context=None):
if handler is None:
handler = WSGIRequestHandler
self.address_family = select_ip_version(host, port)
HTTPServer.__init__(self, (host, int(port)), handler)
默認(rèn)值為 werkzeug.WSGIRequestHandler,這個(gè)類最終繼承自 SocketServer.BaseRequestHandler,也就是說isinstance(self.RequestHandlerClass, SocketServer.BaseRequestHandler)。
class BaseRequestHandler:
def __init__(self, request, client_address, server):
self.request = request
self.client_address = client_address
self.server = server
self.setup()
try:
self.handle()
finally:
self.finish()
def setup(self):
pass
def handle(self):
pass
def finish(self):
pass
SocketServer.BaseRequestHandler 實(shí)例化時(shí)調(diào)用 self.handle 方法。注意,此時(shí) Flask 對(duì)象存在于 self.server.app 中。werkzeug.WSGIRequestHandler 將這個(gè)方法覆寫:
class WSGIRequestHandler(BaseHTTPRequestHandler, object):
def handle(self):
try:
return BaseHTTPRequestHandler.handle(self)
except (socket.error, socket.timeout), e:
self.connection_dropped(e)
except:
if self.server.ssl_context is None or not is_ssl_error():
raise
def handle_one_request(self):
self.raw_requestline = self.rfile.readline()
if not self.raw_requestline:
self.close_connection = 1
elif self.parse_request():
return self.run_wsgi()
class BaseHTTPRequestHandler(object):
def handle(self):
self.close_connection = 1
self.handle_one_request()
while not self.close_connection:
self.handle_one_request()
覆寫的方法調(diào)用 BaseHTTPRequestHandler.handle(self),內(nèi)部調(diào)用了 self.handle_one_request 方法,最終調(diào)用了 WSGIRequestHandler.run_wsgi 方法:
class WSGIRequestHandler(BaseHTTPRequestHandler, object):
def run_wsgi(self):
app = self.server.app
environ = self.make_environ()
...
def execute(app):
application_iter = app(environ, start_response)
try:
for data in application_iter:
write(data)
# make sure the headers are sent
if not headers_sent:
write('')
finally:
if hasattr(application_iter, 'close'):
application_iter.close()
application_iter = None
...
try:
execute(app)
except (socket.error, socket.timeout), e:
...
run_wsgi 方法第一行即取出 Flask 對(duì)象,然后將其傳入 execute 函數(shù)并調(diào)用,execute 第一行 application_iter = app(environ, start_response),這是在調(diào)用 Flask.__call__ 方法。
class Flask(object):
def wsgi_app(self, environ, start_response):
with self.request_context(environ):
rv = self.preprocess_request()
if rv is None:
rv = self.dispatch_request()
response = self.make_response(rv)
response = self.process_response(response)
return response(environ, start_response)
def __call__(self, environ, start_response):
return self.wsgi_app(environ, start_response)
至此,請(qǐng)求被傳到了 Flask 對(duì)象中處理,或者說 Flask 收到了請(qǐng)求。
三、Flask 如何處理請(qǐng)求?
Web 服務(wù)器把 environ, start_response 兩個(gè)參數(shù)傳入 Flask.__call__ 處理,正常處理完后將 Flask.__call__ 返回的數(shù)據(jù)寫入響應(yīng)體中。Flask 處理請(qǐng)求其實(shí)是接收這兩個(gè)參數(shù)并返回?cái)?shù)據(jù)。
class Flask(object):
def __call__(self, environ, start_response):
return self.wsgi_app(environ, start_response)
def wsgi_app(self, environ, start_response):
with self.request_context(environ):
rv = self.preprocess_request()
if rv is None:
rv = self.dispatch_request()
response = self.make_response(rv)
response = self.process_response(response)
return response(environ, start_response)
Flask.__call__ 調(diào)用 Flask.wsgi_app 并返回,不將處理邏輯直接實(shí)現(xiàn)在 Flask.__call__ 里而封裝在 Flask.wsgi_app里,是為了能應(yīng)用中間件,例如:app.wsgi_app = MyMiddleware(app.wsgi_app)。
Flask.wsgi_app 詳解:
第8行(
self.preprocess_request()):執(zhí)行處理請(qǐng)求前的某些處理。即執(zhí)行所有被before_request裝飾的函數(shù),如果返回值非空,則不進(jìn)行真正的請(qǐng)求處理。第10行(
self.dispatch_request()):分發(fā)處理請(qǐng)求。將 URL 與對(duì)應(yīng)的處理函數(shù)匹配,執(zhí)行處理。第11行(
self.make_response()):將返回值轉(zhuǎn)換為響應(yīng)對(duì)象。第12行(
self.process_response()):執(zhí)行處理請(qǐng)求后的某些處理。即執(zhí)行所有被after_request裝飾的函數(shù)。第13行(
response(environ, start_response)):返回一個(gè)可迭代對(duì)象。
FLask 先調(diào)用 Flask.preprocess_request 處理請(qǐng)求,再調(diào)用與 URL 對(duì)應(yīng)的函數(shù)處理(注意,如果 Flask.preprocess_request 返回值非空,則跳過與 URL 對(duì)應(yīng)的處理函數(shù)),然后把返回值轉(zhuǎn)換為響應(yīng)對(duì)象,最后調(diào)用 Flask.process_response 處理。
3.1 請(qǐng)求前置處理
class Flask(object):
def before_request(self, f):
self.before_request_funcs.append(f)
return f
def preprocess_request(self):
for func in self.before_request_funcs:
rv = func()
if rv is not None:
return rv
Flask.before_request 是一個(gè)裝飾器,會(huì)把被裝飾的函數(shù)添加到列表 self.before_request_funcs 中, self.preprocess_request 遍歷 self.before_request_funcs,執(zhí)行所有函數(shù),如果執(zhí)行的函數(shù)返回值非空,則返回。
3.2 請(qǐng)求處理
class Flask(object):
def match_request(self):
rv = _request_ctx_stack.top.url_adapter.match()
request.endpoint, request.view_args = rv
return rv
def dispatch_request(self):
try:
endpoint, values = self.match_request()
return self.view_functions[endpoint](**values)
except HTTPException, e:
handler = self.error_handlers.get(e.code)
if handler is None:
return e
return handler(e)
except Exception, e:
handler = self.error_handlers.get(500)
if self.debug or handler is None:
raise
return handler(e)
match_request 中調(diào)用的 _request_ctx_stack.top.url_adapter.match(),是 _RequestContext.url_adapter.match()。
class _RequestContext(object):
def __init__(self, app, environ):
self.app = app
self.url_adapter = app.url_map.bind_to_environ(environ)
_RequestContext.url_adapter 是 Flask 對(duì)象中 url_map.bind_to_environ(environ) 返回的值,Map.bin_to_environ 返回 MapAdapter 對(duì)象。
MapAdapter.match() 會(huì)根據(jù) URL 與 請(qǐng)求方法(GET、POST 等)(URL、請(qǐng)求方法等信息會(huì)從 environ 中獲?。┓祷禺?dāng)前請(qǐng)求的 endpoint 與 參數(shù)。
# eg
>>> urls.match("/downloads/42")
('downloads/show', {'id': 42})
在執(zhí)行 URL 與 endpoint 解析前,需要先添加匹配規(guī)則。Flask 如何做的呢?
from werkzeug.routing import Map, Rule
class Flask(object):
def __init__(self, package_name):
self.url_map = Map()
Map 存儲(chǔ) URL 規(guī)則和配置參數(shù),可以通過 Map.add 添加 URL 匹配規(guī)則。
class Flask(object):
def add_url_rule(self, rule, endpoint, **options):
options['endpoint'] = endpoint
options.setdefault('methods', ('GET',))
self.url_map.add(Rule(rule, **options))
def route(self, rule, **options):
def decorator(f):
self.add_url_rule(rule, f.__name__, **options)
self.view_functions[f.__name__] = f
return f
return decorator
方法 Flask.add_url_rule 與裝飾器 Flask.route 都能添加匹配規(guī)則。裝飾器 route 會(huì)將被裝飾的函數(shù)名作為 endpoint,并將函數(shù)名作為 key,函數(shù)作為 value,存在 Flask 對(duì)象的 self.view_functions 屬性中。
通過 endpoint, values = self.match_request() 解析得到 endpoint 與 請(qǐng)求參數(shù),再從 self.view_functions 中取出對(duì)應(yīng)的函數(shù)處理(return self.view_functions[endpoint](**values))。
endpoint 作用與意義可參考:https://stackoverflow.com/questions/19261833/what-is-an-endpoint-in-flask/19262349#19262349
如果處理過程中拋出異常,會(huì)根據(jù)異常碼(e.code)取出對(duì)應(yīng)的函數(shù),執(zhí)行異常處理函數(shù)。
異常處理函數(shù)通過裝飾器 Flask.errorhandler 添加,將錯(cuò)誤碼作為 key,被裝飾的函數(shù)作為 value,存入 Flask 的屬性 self.error_handlers 中:
class Flask(object):
def errorhandler(self, code):
def decorator(f):
self.error_handlers[code] = f
return f
return decorator
3.3 生成響應(yīng)對(duì)象
為什么需要這一步,直接返回 Python 內(nèi)置數(shù)據(jù)類型不行嗎?
不行,因?yàn)?WSGI 規(guī)定 application 端必須返回一個(gè)可迭代對(duì)象[1]。
class Flask(object):
def make_response(self, rv):
if isinstance(rv, self.response_class):
return rv
if isinstance(rv, basestring):
return self.response_class(rv)
if isinstance(rv, tuple):
return self.response_class(*rv)
return self.response_class.force_type(rv, request.environ)
Flask.make_response(rv) 將參數(shù) rv 轉(zhuǎn)換為 self.response_class 對(duì)象。
3.4 請(qǐng)求后置處理
迭代 self.after_request_funcs 中保存的函數(shù),逐個(gè)執(zhí)行,返回最后執(zhí)行完的數(shù)據(jù)。
添加函數(shù)的方式為 Flask.after_request 裝飾器。
3.5 返回可迭代對(duì)象
Flask 默認(rèn)的 self.response_class 類繼承自 werkzeug.Response,這個(gè)類的 __call__ 方法接收 environ, start_response 兩個(gè)參數(shù),并返回一個(gè)迭代器。
class BaseResponse(object):
def __call__(self, environ, start_response):
app_iter, status, headers = self.get_wsgi_response(environ)
start_response(status, headers)
return app_iter
當(dāng)調(diào)用 response(environ, start_response) 時(shí)返回一個(gè)迭代器。
四、Flask 如何處理并發(fā)請(qǐng)求?
Flask.wsgi_app 中的 with self.request_context(environ) 有什么用?
self.request_context 返回 _RequestContext 的實(shí)例化對(duì)象。
class _RequestContext(object):
def __init__(self, app, environ):
self.app = app
self.url_adapter = app.url_map.bind_to_environ(environ)
self.request = app.request_class(environ)
self.session = app.open_session(self.request)
self.g = _RequestGlobals()
self.flashes = None
def __enter__(self):
_request_ctx_stack.push(self)
def __exit__(self, exc_type, exc_value, tb):
if tb is None or not self.app.debug:
_request_ctx_stack.pop()
進(jìn)入 with 語句體時(shí),會(huì)執(zhí)行 __enter__ 方法,離開 with 語句體時(shí)會(huì)執(zhí)行 __exit__ 方法。也就是說,在 Flask 開始處理請(qǐng)求前,會(huì)執(zhí)行 _request_ctx_stack.puth(self),處理完請(qǐng)求后會(huì)執(zhí)行 _request_ctx_stack.pop(self)。
_request_ctx_stack 是一個(gè)全局變量,是 LocalStack 類的對(duì)象。
class LocalStack(object):
def __init__(self):
self._local = Local()
self._lock = allocate_lock()
def __release_local__(self):
self._local.__release_local__()
def push(self, obj):
self._lock.acquire()
try:
rv = getattr(self._local, 'stack', None)
if rv is None:
self._local.stack = rv = []
rv.append(obj)
return rv
finally:
self._lock.release()
def pop(self):
self._lock.acquire()
try:
stack = getattr(self._local, 'stack', None)
if stack is None:
return None
elif len(stack) == 1:
release_local(self._local)
return stack[-1]
else:
return stack.pop()
finally:
self._lock.release()
@property
def top(self):
try:
return self._local.stack[-1]
except (AttributeError, IndexError):
return None
LocalStack 在初始化時(shí)會(huì)創(chuàng)建一個(gè)線程鎖(self._lock = allocate_lock()),LocalStack.push(obj) 首先請(qǐng)求加鎖,獲取到鎖后將 obj 添加到列表 self._local.stack,如果 self._local 沒有屬性 stack 則將 stack 初始化為空列表,最后釋放鎖(self._lock.release())。self._local.stack 是什么?
class Local(object):
__slots__ = ('__storage__', '__lock__')
def __init__(self):
object.__setattr__(self, '__storage__', {})
object.__setattr__(self, '__lock__', allocate_lock())
def __getattr__(self, name):
self.__lock__.acquire()
try:
try:
return self.__storage__[get_ident()][name]
except KeyError:
raise AttributeError(name)
finally:
self.__lock__.release()
def __setattr__(self, name, value):
self.__lock__.acquire()
try:
ident = get_ident()
storage = self.__storage__
if ident in storage:
storage[ident][name] = value
else:
storage[ident] = {name: value}
finally:
self.__lock__.release()
在初始化 self._local.stack 屬性時(shí),self._local.stack = rv = [] 等同于 Local.__setattr__(self._local, 'stack', []),這里首先加鎖,然后獲取線程id(ident = get_ident()),將線程id作為字典 self.__storage__ 的鍵,將 {'stack': []} 作為值,即:
self.__storage__ = {
"thread1": {
"stack": []
},
"thread2": {
"stack": []
},
...
}
LocalStack.push(obj) 是將 obj 加入了對(duì)應(yīng)線程的 stack 列表。LocalStack.pop() 是將 obj 從對(duì)應(yīng)的線程的 stack 列表移除。另外還有 LocalStack.top,返回對(duì)應(yīng)線程存在 stack 中的對(duì)象。
為什么要如此大費(fèi)周章的做這件事呢?來看一個(gè)例子:
假設(shè) Web 服務(wù)器單進(jìn)程啟動(dòng),啟動(dòng)2個(gè)線程,同一時(shí)刻有2個(gè)請(qǐng)求進(jìn)來,每個(gè)請(qǐng)求都有對(duì)應(yīng)的 environ 數(shù)據(jù),如果直接賦值給同一個(gè)變量,后一個(gè)請(qǐng)求會(huì)覆蓋前一個(gè)請(qǐng)求的數(shù)據(jù),因?yàn)榫€程數(shù)據(jù)是共享的。要如何保存這兩個(gè)請(qǐng)求的數(shù)據(jù)?并在需要的時(shí)候能正確取出?Flask 的做法是設(shè)置一個(gè)全局變量 _request_ctx_stack,存儲(chǔ)數(shù)據(jù)最終用一個(gè)字典 self.__storage__,將不同線程的請(qǐng)求數(shù)據(jù)用線程id作為 key,請(qǐng)求數(shù)據(jù)存在線程id對(duì)應(yīng)的 stack 列表中。
self.__storage__ = {
"thread1": {
"stack": [obj1]
},
"thread2": {
"stack": [obj2]
},
...
}
Flask 提供了非常便捷的方式來獲取當(dāng)前請(qǐng)求中的 Flask 對(duì)象、請(qǐng)求、session、全局變量等數(shù)據(jù):current_app, request, session, g。
_request_ctx_stack = LocalStack()
current_app = LocalProxy(lambda: _request_ctx_stack.top.app)
request = LocalProxy(lambda: _request_ctx_stack.top.request)
session = LocalProxy(lambda: _request_ctx_stack.top.session)
g = LocalProxy(lambda: _request_ctx_stack.top.g)
其本質(zhì)是返回存在于 _request_ctx_stack.top 中對(duì)應(yīng)的屬性,返回的數(shù)據(jù)是與當(dāng)前線程id相對(duì)應(yīng)的,實(shí)現(xiàn)了線程數(shù)據(jù)隔離。
LocalProxy 是封裝 local 的代理對(duì)象,能夠保護(hù) local 對(duì)象,防止其被意外修改,對(duì)應(yīng)設(shè)計(jì)模式中的代理模式。
五、總結(jié)
回過頭看看 Flask 的常見用法:
from flask import Flask, request, session
app = Flask(__name__)
@app.route('/login', methods=['GET', 'POST'])
def login():
error = None
if request.method == 'POST':
if request.form['username'] != USERNAME:
error = 'Invalid username'
elif request.form['password'] != PASSWORD:
error = 'Invalid password'
else:
session['logged_in'] = True
flash('You were logged in')
return redirect(url_for('show_entries'))
return render_template('login.html', error=error)
if __name__ == '__main__':
app.run()
首先實(shí)例化一個(gè) Flask 對(duì)象 app,使用裝飾器 route 添加 URL 與處理函數(shù),執(zhí)行 app.run(),啟動(dòng)服務(wù)。
訪問 localhost:5000/login,服務(wù)接收到請(qǐng)求后會(huì)將 request, current_app 等數(shù)據(jù)保存至線程id對(duì)應(yīng)的 stack 中,然后進(jìn)行前置處理(self.preprocess_request),解析 URL,獲取 endpoint 與參數(shù),通過 endpoint 從 self.view_functions 中獲取對(duì)應(yīng)的處理函數(shù),處理完后將返回值轉(zhuǎn)換為響應(yīng)對(duì)象,進(jìn)行后置處理(self.process_response),最后返回可迭代對(duì)象(response(environ, start_response)),之后便是服務(wù)器處理的部分了。
版本:
python 2.7, werkzeug==0.6.1, Flask==0.1參考文獻(xiàn):
[1] Eby, P.J. (2010). PEP 3333 – Python Web Server Gateway Interface v1.0.1. Retrieved February 18, 2023, from https://peps.python.org/pep-3333/.
[2] What is an 'endpoint' in Flask?.(n.d). Retrieved February 18, 2023, from https://stackoverflow.com/questions/19261833/what-is-an-endpoint-in-flask/19262349#19262349.