資料
起因
- 在學(xué)校做網(wǎng)絡(luò)安全或者軟件定義網(wǎng)絡(luò)等領(lǐng)域的研究時,并不太關(guān)心所使用的sdn控制器的水平擴展、對數(shù)據(jù)中心的支持等
- 更多的場景就是幾臺甚至一臺主機的ovs上對理論模型進行實驗仿真,得到仿真結(jié)果后就可以寫在論文里了
- 我這次遇到的場景是需要在軟件交換機ovs中做802.1x認(rèn)證者的邏輯,不涉及什么創(chuàng)新
- 但還是想把過程中對ryu的學(xué)習(xí)進行總結(jié),后面如果再用的話,爭取現(xiàn)在留下的文章能夠讓我快速重新?lián)炱饋?/li>
- 本篇初探主要內(nèi)容是ryu主線源碼解讀,ryu的設(shè)計很優(yōu)美,但如果不去讀源碼的話,很多約定會覺得像魔法
- 此外ryu并發(fā)模型使用的協(xié)程,導(dǎo)致不讀源碼的話,編寫的app如果使用多線程,調(diào)試的時候也可能會一頭霧水
功能
- 在讀ryu源碼前,可以想象一下sdn控制器要做的事情都有哪些
- 帶著ryu如何實現(xiàn)這些功能的疑問去讀源碼會更加清晰
- 我認(rèn)為的幾個主要的功能點:
- 通過openflow協(xié)議管理多個交換機
- 通過netconf協(xié)議管理多個交換機
- 可選支持ovsdb等協(xié)議完善控制能力
- 支持用戶編寫數(shù)據(jù)包處理邏輯,并提供合適的框架簡化代碼
- 可以同時管理運行用戶編寫的多個sdn應(yīng)用,并使其協(xié)作
- 最好能對http restful api提供框架支持
- 統(tǒng)一的網(wǎng)絡(luò)并發(fā)模型管理上述涉及到的網(wǎng)絡(luò)通信
- 對接交換機
- 對接restapi調(diào)用者
- 多應(yīng)用之間進行內(nèi)部通信協(xié)作
- 通信方式擴展,如unix socket、AMPQ協(xié)議等
- 可選實現(xiàn)一些數(shù)據(jù)中心常見的控制協(xié)議,比如bgp
閱讀
一級文件目錄
╭─root@testdevice ~/ryu ?master?
╰─# tree -L 1
.
├── CONTRIBUTING.rst
├── LICENSE
├── MANIFEST.in
├── README.rst
├── bin
├── debian
├── doc
├── etc
├── run_tests.sh
├── ryu
├── setup.cfg
├── setup.py
├── tools
└── tox.ini
- 先來看文件目錄,幾個rst我們不用關(guān)心
- bin文件夾里面是ryu用來啟動的可執(zhí)行腳本
from ryu.cmd.manager import main
main()
- 可以看下ryu-manager啟動腳本內(nèi)容,除了開源許可注釋,只有兩行,從ryu包的cmd.manager導(dǎo)入了main函數(shù)并執(zhí)行
- 因此我們也就知道了,我們通過
ryu-manager --verbose myapp.py命令啟動的控制器實例,是由cmd.manager.main開始的
- bin文件夾中還有一個ryu可執(zhí)行腳本,允許通過run的subcommand執(zhí)行ryu-manager的邏輯
ryu run myapp.py
- 此外ryu命令通過調(diào)用yu.cmd路徑下的其他函數(shù),實現(xiàn)了
[run|of-config-cli|rpc-cli]這三個子命令
ryu二級目錄
╭─root@testdevice ~/ryu/ryu ?master?
╰─# tree -L 1
.
├── __init__.py
├── app
├── base
├── cfg.py
├── cmd
├── contrib
├── controller
├── exception.py
├── flags.py
├── hooks.py
├── lib
├── log.py
├── ofproto
├── services
├── tests
├── topology
└── utils.py
10 directories, 7 files
- 可以看到文件內(nèi)容不少,但都是實現(xiàn)ryu控制器的核心邏輯代碼
- 接下來我們繼續(xù)跟啟動的主線
啟動主線
ryu/ryu/cmd/manager.py
包導(dǎo)入
19 import os
20 import sys
21
22 from ryu.lib import hub
23 hub.patch(thread=False)
24
25 from ryu import cfg
26
27 import logging
28 from ryu import log
29 log.early_init_log(logging.DEBUG)
30
31 from ryu import flags
32 from ryu import version
33 from ryu.app import wsgi
34 from ryu.base.app_manager import AppManager
35 from ryu.controller import controller
36 from ryu.topology import switches
- 注意在入口文件的第四行就已經(jīng)通過ryu.lib.hub引入了協(xié)程庫,并對除線程以外的庫進行了patch
- patch即為將目標(biāo)模塊,替換為協(xié)程形式的實現(xiàn),但實現(xiàn)的并不完美,有一些地方會導(dǎo)致bug
- 中間幾行導(dǎo)入了配置管理和日志管理模塊
- 最后幾行引入了
- wsgi:用于提供web服務(wù)
- AppManager:管理多app
- controller:openflow控制器,使其能夠控制交換機的核心功能
- switches:未知?。?!
配置管理
37
38
39 CONF = cfg.CONF
40 CONF.register_cli_opts([
41 cfg.ListOpt('app-lists', default=[],
42 help='application module name to run'),
43 cfg.MultiStrOpt('app', positional=True, default=[],
44 help='application module name to run'),
45 cfg.StrOpt('pid-file', default=None, help='pid file name'),
46 cfg.BoolOpt('enable-debugger', default=False,
47 help='don\'t overwrite Python standard threading library'
48 '(use only for debugging)'),
49 cfg.StrOpt('user-flags', default=None,
50 help='Additional flags file for user applications'),
51 ])
52
53
54 def _parse_user_flags():
55 """
56 Parses user-flags file and loads it to register user defined options.
57 """
58 try:
59 idx = list(sys.argv).index('--user-flags')
60 user_flags_file = sys.argv[idx + 1]
61 except (ValueError, IndexError):
62 user_flags_file = ''
63
64 if user_flags_file and os.path.isfile(user_flags_file):
65 from ryu.utils import _import_module_file
66 _import_module_file(user_flags_file)
67
68
69 def main(args=None, prog=None):
70 _parse_user_flags()
71 try:
72 CONF(args=args, prog=prog,
73 project='ryu', version='ryu-manager %s' % version,
74 default_config_files=['/usr/local/etc/ryu/ryu.conf'])
75 except cfg.ConfigFilesNotFoundError:
76 CONF(args=args, prog=prog,
77 project='ryu', version='ryu-manager %s' % version)
78
79 log.init_log()
80 logger = logging.getLogger(__name__)
81
- 配置管理和日志管理不是本篇重點,就不贅述
- 思考了一下還是把完整的內(nèi)容貼在這里,讓以后的我和讀者有個直觀的感受
- 因為我在讀其他人的分析源碼的的文章的時候,經(jīng)常出現(xiàn)的疑問就是,這段代碼在哪?
核心邏輯
82 if CONF.enable_debugger:
83 msg = 'debugging is available (--enable-debugger option is turned on)'
84 logger.info(msg)
85 else:
86 hub.patch(thread=True)
87
88 if CONF.pid_file:
89 with open(CONF.pid_file, 'w') as pid_file:
90 pid_file.write(str(os.getpid()))
91
92 app_lists = CONF.app_lists + CONF.app
93 # keep old behavior, run ofp if no application is specified.
94 if not app_lists:
95 app_lists = ['ryu.controller.ofp_handler']
96
97 app_mgr = AppManager.get_instance()
98 app_mgr.load_apps(app_lists)
99 contexts = app_mgr.create_contexts()
100 services = []
101 services.extend(app_mgr.instantiate_apps(**contexts))
102
103 webapp = wsgi.start_service(app_mgr)
104 if webapp:
105 thr = hub.spawn(webapp)
106 services.append(thr)
107
108 try:
109 hub.joinall(services)
110 except KeyboardInterrupt:
111 logger.debug("Keyboard Interrupt received. "
112 "Closing RYU application manager...")
113 finally:
114 app_mgr.close()
115
116
117 if __name__ == "__main__":
118 main()
- 首先一個比較有意思的是82行會對是否啟動debugger調(diào)試模式進行判斷,如果啟動了的話,就不會對線程進行協(xié)程patch
- 因此如果我們寫的app代碼調(diào)用的庫不得不調(diào)用多線程時,可以嘗試開啟debugger,看情況是否有所變化
- 接下來92到95行,讀取配置文件或命令行中指定的app列表,如果沒有指定,那么會運行ryu.controller.ofp_handler
- ryu.controller.ofp_handler就是控制器與交換機進行openflow協(xié)議交互的核心app
- 那么如果我們指定了自己的app,就不會運行ryu.controller.ofp_handler這樣核心的邏輯了嗎?
- 后面我們會發(fā)現(xiàn),就算app_lists不是空,我們依舊會在運行自己指定app的同時,運行起來ryu.controller.ofp_handler
- 這就涉及到了ryu對app的依賴管理和導(dǎo)入機制的設(shè)計,如果不看源碼可能會很膜法
- 下面97行,get_instance可以猜個大概app_manager使用的單例模式
- 98行,調(diào)用manager的load_apps函數(shù)把要運行的app的py代碼都動態(tài)的加載進內(nèi)存
- 99行創(chuàng)建并初始化了應(yīng)用上下文,后面會詳細(xì)說明
- 這里可以簡單理解為我們編寫的app以來的其他的庫
- ryu會為我們初始化好,并傳入供我們的app調(diào)用
- context是個字典,注意相同名字的依賴會只初始化一次,可以存在多app都調(diào)用同一個context的情況,因此也可作為多app共享內(nèi)容的方式之一
- 100-101行,services是一個列表,存儲了app_manager初始化所有app后返回的內(nèi)容
- 103-106行,wsgi啟動了他自己的服務(wù),
- 如果初始化成功,會通過hub.spawn注冊進協(xié)程管理
- 并將返回結(jié)果append到了services里
- 由此我們可以理解,services里其實都是hub.spawn返回的每個協(xié)程的句柄
- 最后的108行開始
- 嘗試hub.joinall等待所有的協(xié)程實例運行結(jié)束
- 如果遇到了ctrl+c,或者遇到異常,無論如何調(diào)用app_manager的close方法進行清理
- app_manager作為app們的管理者,同樣會調(diào)用各app的close方法,完成樹狀的資源清理
- 92行到114行,這短短的二十多行就是ryu運行控制器核心邏輯的樹根,后面的一切邏輯上的樹枝,都由此而來
- 其中的邏輯又可主要分為幾大塊:
- app讀取、拉起,涉及到app間的依賴管理
- app對其他類實例的依賴,涉及到app對其他類的依賴
- wsgi對app中涉及restful應(yīng)用的管理
- 協(xié)程貫穿在ryu中的使用
- 接下來我們就對其逐一進行分析
app依賴管理
- 啟動主線中的app_manager是從ryu/ryu/base/app_manager.py中實例化的
- 而base這個文件夾中,只有一個app_manager.py,可見其重要性
- 其中不止實現(xiàn)了app管理器,還規(guī)定了了我們編寫的app需要繼承的基類
ryu/ryu/base/app_manager.py
包導(dǎo)入
26 import inspect
27 import itertools
28 import logging
29 import sys
30 import os
31 import gc
32
33 from ryu import cfg
34 from ryu import utils
35 from ryu.app import wsgi
36 from ryu.controller.handler import register_instance, get_dependent_services
37 from ryu.controller.controller import Datapath
38 from ryu.controller import event
39 from ryu.controller.event import EventRequestBase, EventReplyBase
40 from ryu.lib import hub
41 from ryu.ofproto import ofproto_protocol
42
43 LOG = logging.getLogger('ryu.base.app_manager')
44
45 SERVICE_BRICKS = {}
- app_manager引入的內(nèi)容與剛剛cmd中的manager類似
- 需要注意37行導(dǎo)入了datapath,用于管理網(wǎng)橋的庫
- 以及38-39行引入的ryu的事件管理庫
- 42行進行了日志配置
- 45行文件全局變量SERVICE_BRICKS這個字典,是一個app名稱映射到app實例的字典
- 但為什么不叫app,突然改叫brick,是閱讀過程中比較困惑的地方
- 猜測是app在event管理路由這個場景就給他換了個名字
- 后面會多處出現(xiàn)對這個字典的操作
app管理器
- 注意45行后到351行間是app基類與工具方法,此處跳躍進行介紹
351 class AppManager(object):
352 # singleton
353 _instance = None
354
355 @staticmethod
356 def run_apps(app_lists):
357 """Run a set of Ryu applications
358
359 A convenient method to load and instantiate apps.
360 This blocks until all relevant apps stop.
361 """
362 app_mgr = AppManager.get_instance()
363 app_mgr.load_apps(app_lists)
364 contexts = app_mgr.create_contexts()
365 services = app_mgr.instantiate_apps(**contexts)
366 webapp = wsgi.start_service(app_mgr)
367 if webapp:
368 services.append(hub.spawn(webapp))
369 try:
370 hub.joinall(services)
371 finally:
372 app_mgr.close()
373 for t in services:
374 t.kill()
375 hub.joinall(services)
376 gc.collect()
377
378 @staticmethod
379 def get_instance():
380 if not AppManager._instance:
381 AppManager._instance = AppManager()
382 return AppManager._instance
383
384 def __init__(self):
385 self.applications_cls = {}
386 self.applications = {}
387 self.contexts_cls = {}
388 self.contexts = {}
389 self.close_sem = hub.Semaphore()
- 首先看379行即我們看到在上面的代碼中調(diào)用的實例獲取函數(shù),也確實是單實例模式
- 再看384行的初始化函數(shù),不需要傳入任何參數(shù),僅對app和context的加載所需的變量進行初始化
- 而這個run_apps函數(shù),與我們在manager.py中的run_apps函數(shù)名字都一樣,做的事情也一致
- 經(jīng)過搜索調(diào)用此函數(shù)的文件后,發(fā)現(xiàn)此函數(shù)僅在ryu/ryu/cmd/ofa_neutron_agent.py被使用
- 猜測當(dāng)openstack的neutron插件使用ryu控制器時,會與平時ryu-manager的邏輯有所不同
391 def load_app(self, name):
392 mod = utils.import_module(name)
393 clses = inspect.getmembers(mod,
394 lambda cls: (inspect.isclass(cls) and
395 issubclass(cls, RyuApp) and
396 mod.__name__ ==
397 cls.__module__))
398 if clses:
399 return clses[0][1]
400 return None
401
402 def load_apps(self, app_lists):
403 app_lists = [app for app
404 in itertools.chain.from_iterable(app.split(',')
405 for app in app_lists)]
406 while len(app_lists) > 0:
407 app_cls_name = app_lists.pop(0)
408
409 context_modules = [x.__module__ for x in self.contexts_cls.values()]
410 if app_cls_name in context_modules:
411 continue
412
413 LOG.info('loading app %s', app_cls_name)
414
415 cls = self.load_app(app_cls_name)
416 if cls is None:
417 continue
418
419 self.applications_cls[app_cls_name] = cls
420
421 services = []
422 for key, context_cls in cls.context_iteritems():
423 v = self.contexts_cls.setdefault(key, context_cls)
424 assert v == context_cls
425 context_modules.append(context_cls.__module__)
426
427 if issubclass(context_cls, RyuApp):
428 services.extend(get_dependent_services(context_cls))
429
430 # we can't load an app that will be initiataed for
431 # contexts.
432 for i in get_dependent_services(cls):
433 if i not in context_modules:
434 services.append(i)
435 if services:
436 app_lists.extend([s for s in set(services)
437 if s not in app_lists])
- 接下來就是關(guān)鍵的app加載相關(guān)的函數(shù)
- 上面的分析我們知道m(xù)anager.py調(diào)用app_manager的load_apps作為加載app的入口函數(shù)
- 首先403行對app_list中的內(nèi)容進行扁平化處理,將逗號分隔的app都拆分成單獨的app重新存入列表
- 接下的循環(huán)就是加載app類以及處理依賴的邏輯
- 406行,可以看到循環(huán)的結(jié)束條件是app_list為空
- 而在407行從app_list中取出一個app進行處理
- 一般來說這樣使用while 列表不為空來處理列表中的內(nèi)容,都是列表的內(nèi)容會隨著處理而動態(tài)改變
- 而動態(tài)變化的原因,正是因為只有加載了一個app類之后,才知道他的依賴是什么,才能將其動態(tài)的加入app_list中
- ryu沒有強硬的規(guī)定依賴關(guān)系必須顯式的寫在配置中或者app類中(除了context)
- 而是通過調(diào)用關(guān)系,隱含式的給我們實現(xiàn)的類中添加了對應(yīng)的依賴關(guān)系,存疑???
- 這也是ryu實現(xiàn)非常優(yōu)雅的地方之一
- 接下來409行確認(rèn)當(dāng)前處理的app是不是只是一個context而已
- 415行調(diào)用load_app加載單個app
- 419行可以看出,app_manager.applications_cls的鍵是類名,值是類,而不是實例
- 注意此階段并沒有對app類進行實例化,只是記錄類的類別
- 421行初始化了一個列表叫services
- 這里的services與上文manage.py中用來hub.joinall的services含義完全不同
- 這里的services僅用于存儲當(dāng)前正在處理的app所依賴的app們
- 422行迭代app類所聲明的context
- 如果是context只是一個其他的類,就加入到app_manager的contexts_cls字典中
- 注意同樣僅存了類的類別,沒有初始化
- 而425行的context_module列表既存儲了之前app聲明的context,也添加了當(dāng)前app聲明的context
- 427行,如果context中的類同樣實現(xiàn)了ryu的app_base,那么把他添加到services里,后續(xù)添加到app_list里
- 說明如果context中聲明的一個依賴同樣實現(xiàn)了ryu的app基類,那么同樣將其以app看待進行加載
- 432行的get_dependent_services函數(shù)并不在本文件中
- 但可以根據(jù)其名稱知其含義,即獲取本app所依賴的app們
- 后續(xù)會對此函數(shù)詳細(xì)解釋
- 433行的判斷邏輯避免了context中的app類被重復(fù)添加至app_list,而維護context_modules變量的意義即如此
- 435行將services變量中的app,即本app所依賴的app,添加到待加載的app_list中去
- 而前面391行的load_app函數(shù),也很簡單,利用python動態(tài)的特性,從對應(yīng)對的模塊中找繼承了app基類的那個用戶實現(xiàn)的app類
439 def create_contexts(self):
440 for key, cls in self.contexts_cls.items():
441 if issubclass(cls, RyuApp):
442 # hack for dpset
443 context = self._instantiate(None, cls)
444 else:
445 context = cls()
446 LOG.info('creating context %s', key)
447 assert key not in self.contexts
448 self.contexts[key] = context
449 return self.contexts
- 接下來create_contexts,也是前文manage.py中調(diào)用的函數(shù)
- 從內(nèi)容可以看出本函數(shù)對context中記錄的類進行是初始化
- 將各類的實例以kv的形式存儲在app_manager.context中,并最后返回這個字典
- 所以在load_apps中context進行了加載,而本函數(shù)進行了context的初始化,生成類的實例
451 def _update_bricks(self):
452 for i in SERVICE_BRICKS.values():
453 for _k, m in inspect.getmembers(i, inspect.ismethod):
454 if not hasattr(m, 'callers'):
455 continue
456 for ev_cls, c in m.callers.items():
457 if not c.ev_source:
458 continue
459
460 brick = _lookup_service_brick_by_mod_name(c.ev_source)
461 if brick:
462 brick.register_observer(ev_cls, i.name,
463 c.dispatchers)
464
465 # allow RyuApp and Event class are in different module
466 for brick in SERVICE_BRICKS.values():
467 if ev_cls in brick._EVENTS:
468 brick.register_observer(ev_cls, i.name,
469 c.dispatchers)
470
471 @staticmethod
472 def _report_brick(name, app):
473 LOG.debug("BRICK %s", name)
474 for ev_cls, list_ in app.observers.items():
475 LOG.debug(" PROVIDES %s TO %s", ev_cls.__name__, list_)
476 for ev_cls in app.event_handlers.keys():
477 LOG.debug(" CONSUMES %s", ev_cls.__name__)
478
479 @staticmethod
480 def report_bricks():
481 for brick, i in SERVICE_BRICKS.items():
482 AppManager._report_brick(brick, i)
483
- 上面的update_brickcs函數(shù)涉及到了ryu的一個魔法設(shè)計,即caller,后面還會詳細(xì)說
- 下面的兩個report_bricks是用來打印當(dāng)前控制器所管理的bricks,而這個磚頭就是從文件開頭的全局變量取出
- 而bricks磚頭,對應(yīng)著我們指定的app,以及我們的app所依賴的app
- 如下是我運行ryu時verbose模式下打印的BRICK內(nèi)容
- 其中openflow、hostapd_wif、hostapd_eth、pusher是我編寫的app
- 而ofp_event對應(yīng)著我們的app依賴的app:ryu.controller.ofp_handler
- 下面更重要的內(nèi)容是每個BRICK,即app,消費以及產(chǎn)生的事件類型,對于了解多app情況下的event路由情況非常有幫助
- 其中PROVIDES EventX To {'openflow': {'main'}}中的openflow指的是app名字,main指定是當(dāng)前控制器所處于的狀態(tài)disaptcher,如config配置狀態(tài),main運行狀態(tài)
BRICK openflow
CONSUMES EventOFPPacketIn
CONSUMES EventJoin
CONSUMES EventLeave
CONSUMES EventOFPSwitchFeatures
BRICK hostapd_wif
PROVIDES EventJoin TO {'openflow': {'main'}, 'pusher': {'main'}}
PROVIDES EventLeave TO {'openflow': {'main'}, 'pusher': {'main'}}
BRICK hostapd_eth
PROVIDES EventJoin TO {'openflow': {'main'}, 'pusher': {'main'}}
PROVIDES EventLeave TO {'openflow': {'main'}, 'pusher': {'main'}}
BRICK pusher
CONSUMES EventJoin
CONSUMES EventLeave
BRICK ofp_event
PROVIDES EventOFPPacketIn TO {'openflow': {'main'}}
PROVIDES EventOFPSwitchFeatures TO {'openflow': {'config'}}
CONSUMES EventOFPEchoReply
CONSUMES EventOFPEchoRequest
CONSUMES EventOFPErrorMsg
CONSUMES EventOFPHello
CONSUMES EventOFPPortDescStatsReply
CONSUMES EventOFPPortStatus
CONSUMES EventOFPSwitchFeatures
484 def _instantiate(self, app_name, cls, *args, **kwargs):
485 # for now, only single instance of a given module
486 # Do we need to support multiple instances?
487 # Yes, maybe for slicing.
488 LOG.info('instantiating app %s of %s', app_name, cls.__name__)
489
490 if hasattr(cls, 'OFP_VERSIONS') and cls.OFP_VERSIONS is not None:
491 ofproto_protocol.set_app_supported_versions(cls.OFP_VERSIONS)
492
493 if app_name is not None:
494 assert app_name not in self.applications
495 app = cls(*args, **kwargs)
496 register_app(app)
497 assert app.name not in self.applications
498 self.applications[app.name] = app
499 return app
500
501 def instantiate(self, cls, *args, **kwargs):
502 app = self._instantiate(None, cls, *args, **kwargs)
503 self._update_bricks()
504 self._report_brick(app.name, app)
505 return app
506
507 def instantiate_apps(self, *args, **kwargs):
508 for app_name, cls in self.applications_cls.items():
509 self._instantiate(app_name, cls, *args, **kwargs)
510
511 self._update_bricks()
512 self.report_bricks()
513
514 threads = []
515 for app in self.applications.values():
516 t = app.start()
517 if t is not None:
518 app.set_main_thread(t)
519 threads.append(t)
520 return threads
- 接下來是用來進行app初始化的函數(shù)
- 首先manager.py調(diào)用的app初始化入口函數(shù)是instantiate_apps,
- 在本函數(shù)里可以看到遍歷了app_manager的app class,并調(diào)用_instantiate實際初始化
- 后面調(diào)用_update_bricks更新app間的事件依賴關(guān)系,并通過report打印結(jié)果
- 最后如同我們上面分析manager.py中的猜測一樣,instantiate_apps函數(shù)返回了注冊的所有app的協(xié)程句柄,用于在main函數(shù)中統(tǒng)一join
- 在_instantiate函數(shù)中實現(xiàn)了app類的動態(tài)的實例化,并在實例化后調(diào)用register_app注冊,最后在app_manager自身的application字典中注冊新初始化的app
522 @staticmethod
523 def _close(app):
524 close_method = getattr(app, 'close', None)
525 if callable(close_method):
526 close_method()
527
528 def uninstantiate(self, name):
529 app = self.applications.pop(name)
530 unregister_app(app)
531 for app_ in SERVICE_BRICKS.values():
532 app_.unregister_observer_all_event(name)
533 app.stop()
534 self._close(app)
535 events = app.events
536 if not events.empty():
537 app.logger.debug('%s events remains %d', app.name, events.qsize())
538
539 def close(self):
540 def close_all(close_dict):
541 for app in close_dict.values():
542 self._close(app)
543 close_dict.clear()
544
545 # This semaphore prevents parallel execution of this function,
546 # as run_apps's finally clause starts another close() call.
547 with self.close_sem:
548 for app_name in list(self.applications.keys()):
549 self.uninstantiate(app_name)
550 assert not self.applications
551 close_all(self.contexts)
- 最后是一些關(guān)閉、清理、刪除當(dāng)前app的函數(shù)
- app_manager至此就分析結(jié)束了
工具方法
48 def lookup_service_brick(name):
49 return SERVICE_BRICKS.get(name)
50
51
52 def _lookup_service_brick_by_ev_cls(ev_cls):
53 return _lookup_service_brick_by_mod_name(ev_cls.__module__)
54
55
56 def _lookup_service_brick_by_mod_name(mod_name):
57 return lookup_service_brick(mod_name.split('.')[-1])
58
59
60 def register_app(app):
61 assert isinstance(app, RyuApp)
62 assert app.name not in SERVICE_BRICKS
63 SERVICE_BRICKS[app.name] = app
64 register_instance(app)
65
66
67 def unregister_app(app):
68 SERVICE_BRICKS.pop(app.name)
69
70
71 def require_app(app_name, api_style=False):
72 """
73 Request the application to be automatically loaded.
74
75 If this is used for "api" style modules, which is imported by a client
76 application, set api_style=True.
77
78 If this is used for client application module, set api_style=False.
79 """
80 iterable = (inspect.getmodule(frame[0]) for frame in inspect.stack())
81 modules = [module for module in iterable if module is not None]
82 if api_style:
83 m = modules[2] # skip a frame for "api" module
84 else:
85 m = modules[1]
86 m._REQUIRED_APP = getattr(m, '_REQUIRED_APP', [])
87 m._REQUIRED_APP.append(app_name)
88 LOG.debug('require_app: %s is required by %s', app_name, m.__name__)
89
90
- 這些方法在app_manager的函數(shù)中有所調(diào)用
- 可以看出,注冊方法對應(yīng)字典的賦值,刪除方法對應(yīng)字典的刪除,搜索方法對應(yīng)字典的取值
- 讀起來SERVICE_BRICKS與app_manager自身的appcalitions沒有什么區(qū)別
- 可能隨著理解的加深能會慢慢了解這樣的用意叭
- 最后require_app函數(shù)的邏輯是添加依賴app名字到某個module的_REQUIRED_APP列表
app基類
- 由于app在app_manager視角只是一個被管理者,在下文中的事件收發(fā)小節(jié)中介紹app基類更好,這里暫且跳過
ryu/ryu/controller/handler.py
120 def get_dependent_services(cls):
121 services = []
122 for _k, m in inspect.getmembers(cls, _is_method):
123 if _has_caller(m):
124 for ev_cls, c in m.callers.items():
125 service = getattr(sys.modules[ev_cls.__module__],
126 '_SERVICE_NAME', None)
127 if service:
128 # avoid cls that registers the own events (like
129 # ofp_handler)
130 if cls.__module__ != service:
131 services.append(service)
132
133 m = sys.modules[cls.__module__]
134 services.extend(getattr(m, '_REQUIRED_APP', []))
135 services = list(set(services))
136 return services
- 現(xiàn)在回來填坑上文所調(diào)用的獲取本app依賴關(guān)系的函數(shù)get_dependent_services
- 這個函數(shù)位于controller/handler.py,原因是因為涉及到了caller這個ryu用來管理事件依賴的魔法
- 簡單來說是我們通過set_ev_cls注冊監(jiān)聽事件的時候,ryu會通過caller機制計算哪些app會生產(chǎn)此事件,從而計算出當(dāng)前app所依賴的app有哪些
- 例如我們指定了一個自己編寫的app,監(jiān)聽了ofp_event.EventOFPPacketIn這個openflow的packet in事件
- 但這個事件是由ryu.controller.ofp_handler這個app產(chǎn)生的,所以我們的app就對這個app產(chǎn)生了依賴關(guān)系
- 很好理解,本app使用了其他app產(chǎn)生了事件,自然我們就對其產(chǎn)生了依賴
- 而ryu正是通過實現(xiàn)了他自己的caller機制,管理這種依賴
- 134行,可以看出通過設(shè)置_REQUIRED_APP可以顯示的指定我們app所依賴的app
- 前文提過,require_app函數(shù)是用于增加_REQUIRED_APP列表內(nèi)容的方法
- 通過grep搜索require_app函數(shù)的調(diào)用,可以看出是用于api調(diào)用方被調(diào)用方之間的依賴指定
- 所以也就好理解require_app的api_style參數(shù)了
╭─root@testdevice ~/ryu/ryu ?master?
╰─# grep -R require_app
app/gui_topology/gui_topology.py:app_manager.require_app('ryu.app.rest_topology')
app/gui_topology/gui_topology.py:app_manager.require_app('ryu.app.ws_topology')
app/gui_topology/gui_topology.py:app_manager.require_app('ryu.app.ofctl_rest')
app/ofctl/api.py:app_manager.require_app('ryu.app.ofctl.service', api_style=True)
base/app_manager.py:def require_app(app_name, api_style=False):
base/app_manager.py: LOG.debug('require_app: %s is required by %s', app_name, m.__name__)
services/protocols/vrrp/api.py:app_manager.require_app('ryu.services.protocols.vrrp.manager', api_style=True)
topology/api.py:app_manager.require_app('ryu.topology.switches', api_style=True)
- 除了get_dependent_services,ryu/ryu/controller/handler.py文件其他的函數(shù)或者類的定義,也都是圍繞著caller有關(guān)
app管理小總結(jié)
420
421 services = []
422 for key, context_cls in cls.context_iteritems():
423 v = self.contexts_cls.setdefault(key, context_cls)
424 assert v == context_cls
425 context_modules.append(context_cls.__module__)
426
427 if issubclass(context_cls, RyuApp):
428 services.extend(get_dependent_services(context_cls))
429
430 # we can't load an app that will be initiataed for
431 # contexts.
432 for i in get_dependent_services(cls):
433 if i not in context_modules:
434 services.append(i)
435 if services:
436 app_lists.extend([s for s in set(services)
437 if s not in app_lists])
- 經(jīng)過閱讀后可以看出,整個app依賴管理的核心就在十多行行代碼
- app的依賴來源也讀到了三種
- 通過context指定(如果context中是一個app的話)
- 通過事件生產(chǎn)消費關(guān)系隱式的指定
- 通過app_manager.require_app函數(shù)顯示指定
- 第一點是在427行進行的判斷
- 第二三點,是在432行進行的獲取
- 同時會在433行進行避免第一點與第二三點之間依賴重復(fù)添加的邏輯
事件管理
- 事件管理正是我們在前文不斷提到的caller機制所實現(xiàn)的
ryu/ryu/controller/handler.py
- 前文提到的get_dependent_services函數(shù)同樣位于本文件中,現(xiàn)在開始分析caller相關(guān)機制
包引入
17 import inspect
18 import logging
19 import sys
20
21 LOG = logging.getLogger('ryu.controller.handler')
22
23 # just represent OF datapath state. datapath specific so should be moved.
24 HANDSHAKE_DISPATCHER = "handshake"
25 CONFIG_DISPATCHER = "config"
26 MAIN_DISPATCHER = "main"
27 DEAD_DISPATCHER = "dead"
- 導(dǎo)入了三個官方庫后,ryu規(guī)定了四個當(dāng)前運行階段
caller
30 class _Caller(object):
31 """Describe how to handle an event class.
32 """
33
34 def __init__(self, dispatchers, ev_source):
35 """Initialize _Caller.
36
37 :param dispatchers: A list of states or a state, in which this
38 is in effect.
39 None and [] mean all states.
40 :param ev_source: The module which generates the event.
41 ev_cls.__module__ for set_ev_cls.
42 None for set_ev_handler.
43 """
44 self.dispatchers = dispatchers
45 self.ev_source = ev_source
46
47
48 # should be named something like 'observe_event'
49 def set_ev_cls(ev_cls, dispatchers=None):
.....省略文檔
77 def _set_ev_cls_dec(handler):
78 if 'callers' not in dir(handler):
79 handler.callers = {}
80 for e in _listify(ev_cls):
81 handler.callers[e] = _Caller(_listify(dispatchers), e.__module__)
82 return handler
83 return _set_ev_cls_dec
84
85
86 def set_ev_handler(ev_cls, dispatchers=None):
87 def _set_ev_cls_dec(handler):
88 if 'callers' not in dir(handler):
89 handler.callers = {}
90 for e in _listify(ev_cls):
91 handler.callers[e] = _Caller(_listify(dispatchers), None)
92 return handler
93 return _set_ev_cls_dec
- 30行見到了caller的真身,只有兩個參數(shù)階段和來源,也沒有任何方法
- caller可以這樣理解:
- 事件有生產(chǎn)者和消費者,當(dāng)生產(chǎn)者生產(chǎn)了對應(yīng)的事件后,消費者會觸發(fā)執(zhí)行相應(yīng)的處理函數(shù)
- 因此生產(chǎn)者就好像消費者的函數(shù)的調(diào)用者一樣,作為一個caller,去call監(jiān)聽這個事件的那些函數(shù)
- 49行見到了我們熟悉的set_ev_cls,他是一個帶參數(shù)的裝飾器
- 帶參數(shù)的裝飾器返回值是裝飾器
- 裝飾器返回值是一個新的函數(shù)
- set_ev_cls這個裝飾器的作用很簡單
- 78行判斷被裝飾的函數(shù)有沒有callers屬性
- 80行遍歷本需要監(jiān)聽的事件列表,并在函數(shù)的caller屬性設(shè)置對應(yīng)的kv
- k是事件類型,v是30行的Caller類的實例,其中包含了在哪個運行階段事件產(chǎn)生后會進行調(diào)用,和事件來源的信息
- 86行的set_ev_handler函數(shù)與set_ev_cls非常相似,只有一點不同:
- Caller類型的ev_source屬性是None
- 那么這個ev_source屬性有什么用呢?與自動加載依賴有關(guān),繼續(xù)看代碼會有更多解釋
- 于是我們明白了,set_ev_cls(handler)僅僅為我們的自定義函數(shù)添加了一個callers變量,記錄著有哪些事件產(chǎn)生時,要調(diào)用本函數(shù)
- 但我們這時候只知道有這樣的事件存在,我要監(jiān)聽他,那么誰會產(chǎn)生這樣的事件呢?我們回顧一下get_dependent_services函數(shù):
120 def get_dependent_services(cls):
121 services = []
122 for _k, m in inspect.getmembers(cls, _is_method):
123 if _has_caller(m):
124 for ev_cls, c in m.callers.items():
125 service = getattr(sys.modules[ev_cls.__module__],
126 '_SERVICE_NAME', None)
127 if service:
128 # avoid cls that registers the own events (like
129 # ofp_handler)
130 if cls.__module__ != service:
131 services.append(service)
132
133 m = sys.modules[cls.__module__]
134 services.extend(getattr(m, '_REQUIRED_APP', []))
135 services = list(set(services))
136 return services
137
138
139 def register_service(service):
140 """
141 Register the ryu application specified by 'service' as
142 a provider of events defined in the calling module.
143
144 If an application being loaded consumes events (in the sense of
145 set_ev_cls) provided by the 'service' application, the latter
146 application will be automatically loaded.
147
148 This mechanism is used to e.g. automatically start ofp_handler if
149 there are applications consuming OFP events.
150 """
151 frame = inspect.currentframe()
152 m_name = frame.f_back.f_globals['__name__']
153 m = sys.modules[m_name]
154 m._SERVICE_NAME = service
- 這次我們來詳細(xì)回顧get_dependent_services函數(shù):
- 121行初始化services列表用于記錄當(dāng)前cls所依賴的app
- 122行遍歷app的所有函數(shù),查看是否含有caller屬性,也就是是否被set_ev_xx修飾過
- 124行遍歷當(dāng)前函數(shù)監(jiān)聽的事件
- 125行再次出現(xiàn)service變量名,這里的service表示當(dāng)前app依賴的app
- 獲取依賴app是通過查找事件類所在模塊的_SERVICE_NAME變量得到的
- 恰好139行的register_service,本文件最后一個函數(shù),就是用來為模塊的_SERVICE_NAME變量賦值的
- 賦值的value是通過參數(shù)指定的
- 搜索調(diào)用register_service的函數(shù),會發(fā)現(xiàn)我們熟悉的ofp_event,而經(jīng)過查看發(fā)現(xiàn)ofp_event.py文件在文件的最末尾處直接調(diào)用了這句話
- 也就是說,只要我們import了ofp_event模塊,就會自動執(zhí)行這條register語句,在ofp_event模塊自身中植入_SERVICE_NAME變量,指示當(dāng)前文件中的事件,是由哪個app產(chǎn)生的
- 就解釋了我們沒有顯式指定ofp_handler這個app,他同樣會被拉起執(zhí)行
controller/dpset.py:handler.register_service('ryu.controller.dpset')
controller/handler.py:def register_service(service):
controller/ofp_event.py:handler.register_service('ryu.controller.ofp_handler')
services/protocols/ovsdb/event.py:handler.register_service('ryu.services.protocols.ovsdb.manager')
services/protocols/vrrp/event.py:handler.register_service('ryu.services.protocols.vrrp.manager')
topology/event.py:handler.register_service('ryu.topology.switches')
94
95
96 def _has_caller(meth):
97 return hasattr(meth, 'callers')
98
99
100 def _listify(may_list):
101 if may_list is None:
102 may_list = []
103 if not isinstance(may_list, list):
104 may_list = [may_list]
105 return may_list
106
107
108 def register_instance(i):
109 for _k, m in inspect.getmembers(i, inspect.ismethod):
110 # LOG.debug('instance %s k %s m %s', i, _k, m)
111 if _has_caller(m):
112 for ev_cls, c in m.callers.items():
113 i.register_handler(ev_cls, m)
114
115
116 def _is_method(f):
117 return inspect.isfunction(f) or inspect.ismethod(f)
118
119
- 最后是本文件剩余的部分,除了幾個工具函數(shù),還有一個register_instance函數(shù)
- 根據(jù)傳參具有caller屬性的方法知道,入?yún)是一個app,而在113行調(diào)用了app的register_handler方法
- 搜索后得知,app_manager會在_instantiate函數(shù),即對app進行初始化時,
- 調(diào)用如下的register_app函數(shù),其中調(diào)用了我們現(xiàn)在關(guān)注的register_instance函數(shù)
- 因此在本節(jié)事件管理分析完畢后,會接著分析事件路由
def register_app(app):
assert isinstance(app, RyuApp)
assert app.name not in SERVICE_BRICKS
SERVICE_BRICKS[app.name] = app
register_instance(app)
事件管理以及依賴計算小結(jié)
- 編寫特定協(xié)議的event模塊時,在文件中執(zhí)行register_service函數(shù),表明當(dāng)前模塊中的event們是由哪個app產(chǎn)生
- 當(dāng)我們的app導(dǎo)入了對應(yīng)的event模塊時,就已經(jīng)執(zhí)行了register_service
- 當(dāng)我們的函數(shù)使用set_ev_cls修飾,并指定監(jiān)聽特定的event時,修飾器會在這個函數(shù)的callers變量中添加相應(yīng)的監(jiān)聽事件信息
- 在run我們編寫的app時,app_manager會使用get_dependent_services函數(shù)尋找當(dāng)前app監(jiān)聽的事件
- 從而找到這個事件所屬的event模塊,再找到event模塊的_SERVICE_NAME變量
- 最終得知如果我要監(jiān)聽在這個事件的話,需要拉起這個app,從而讓其產(chǎn)生這樣的事件以供我消費
- 如果我們沒有在event模塊中對事件生產(chǎn)者進行注冊,或者使用的是set_ev_handler修飾器,那么ryu也就不會自動導(dǎo)入拉起對應(yīng)的app了
事件收發(fā)
- 事件的收發(fā)是多個app在運行階段,進行的事件接收發(fā)送
ryu/ryu/base/app_manager.py
91 class RyuApp(object):
...省略文檔
105
106 _CONTEXTS = {}
...省略文檔
124
125 _EVENTS = []
...省略文檔
132 OFP_VERSIONS = None
...省略文檔
146 @classmethod
147 def context_iteritems(cls):
148 """
149 Return iterator over the (key, contxt class) of application context
150 """
151 return iter(cls._CONTEXTS.items())
153 def __init__(self, *_args, **_kwargs):
154 super(RyuApp, self).__init__()
155 self.name = self.__class__.__name__
156 self.event_handlers = {} # ev_cls -> handlers:list
157 self.observers = {} # ev_cls -> observer-name -> states:set
158 self.threads = []
159 self.main_thread = None
160 self.events = hub.Queue(128)
161 self._events_sem = hub.BoundedSemaphore(self.events.maxsize)
162 if hasattr(self.__class__, 'LOGGER_NAME'):
163 self.logger = logging.getLogger(self.__class__.LOGGER_NAME)
164 else:
165 self.logger = logging.getLogger(self.name)
166 self.CONF = cfg.CONF
167
168 # prevent accidental creation of instances of this class outside RyuApp
169 class _EventThreadStop(event.EventBase):
170 pass
171 self._event_stop = _EventThreadStop()
172 self.is_active = True
- 涉及到了app之間的事件收發(fā),就不得不補一下前面剩下還沒有分析的RyuAPP的app基類
- app基類中定義了app與app間收、發(fā)事件的函數(shù),也定義了app在協(xié)程中運行的事件循環(huán)
- 在app基類的開頭首先定義了類屬性_CONTEXT,我們已經(jīng)很熟悉了,是用來定義本app所依賴的外部類的字典
- 還定義了_EVENTS類屬性用來定義本app能夠產(chǎn)生的事件,后面還會詳細(xì)分析
- OFP_VERSIONS是本app工作于的openflow版本
- app基類初始化函數(shù)中
- event_handlers變量存儲接事件到函數(shù)的映射
- observers是對本app產(chǎn)生的事件監(jiān)聽的app們,也就是本app的消費者
- threads是本app產(chǎn)生的多個協(xié)程
- main_thread用于main函數(shù)join,main_thread退出了才是真推出
- events變量是個隊列,用于存儲其他app發(fā)送來的事件
- 其他變量見名知意
194
195 def register_handler(self, ev_cls, handler):
196 assert callable(handler)
197 self.event_handlers.setdefault(ev_cls, [])
198 self.event_handlers[ev_cls].append(handler)
199
200 def unregister_handler(self, ev_cls, handler):
201 assert callable(handler)
202 self.event_handlers[ev_cls].remove(handler)
203 if not self.event_handlers[ev_cls]:
204 del self.event_handlers[ev_cls]
...省略發(fā)送相關(guān)函數(shù),以及協(xié)程相關(guān)函數(shù)
229 def get_handlers(self, ev, state=None):
230 """Returns a list of handlers for the specific event.
231
232 :param ev: The event to handle.
233 :param state: The current state. ("dispatcher")
234 If None is given, returns all handlers for the event.
235 Otherwise, returns only handlers that are interested
236 in the specified state.
237 The default is None.
238 """
239 ev_cls = ev.__class__
240 handlers = self.event_handlers.get(ev_cls, [])
241 if state is None:
242 return handlers
243
244 def test(h):
245 if not hasattr(h, 'callers') or ev_cls not in h.callers:
246 # dynamically registered handlers does not have
247 # h.callers element for the event.
248 return True
249 states = h.callers[ev_cls].dispatchers
250 if not states:
251 # empty states means all states
252 return True
253 return state in states
254
255 return filter(test, handlers)
- 接下來分析app基類中事件鉤子相關(guān)的函數(shù)
- 首先我們看到了前文提到的調(diào)用鏈,即instantiate_apps -> _instantiate -> register_app -> register_instance -> register_handler,中的最后一環(huán)register_handler
- register_handler做的事情也很簡單,把本app中各函數(shù)的callers收集起來,整理成一個字典,也就是self.event_handlers
- unregister_handler也就是相應(yīng)的刪除函數(shù)了
- 229行g(shù)et_handlers函數(shù),獲取相應(yīng)ev所對應(yīng)的handlers,并進行dispatcher階段的過濾
206 def register_observer(self, ev_cls, name, states=None):
207 states = states or set()
208 ev_cls_observers = self.observers.setdefault(ev_cls, {})
209 ev_cls_observers.setdefault(name, set()).update(states)
210
211 def unregister_observer(self, ev_cls, name):
212 observers = self.observers.get(ev_cls, {})
213 observers.pop(name)
214
215 def unregister_observer_all_event(self, name):
216 for observers in self.observers.values():
217 observers.pop(name, None)
218
219 def observe_event(self, ev_cls, states=None):
220 brick = _lookup_service_brick_by_ev_cls(ev_cls)
221 if brick is not None:
222 brick.register_observer(ev_cls, self.name, states)
223
224 def unobserve_event(self, ev_cls):
225 brick = _lookup_service_brick_by_ev_cls(ev_cls)
226 if brick is not None:
227 brick.unregister_observer(ev_cls, self.name)
...省略其他函數(shù)
256
257 def get_observers(self, ev, state):
258 observers = []
259 for k, v in self.observers.get(ev.__class__, {}).items():
260 if not state or not v or state in v:
261 observers.append(k)
262
263 return observers
264
...省略其他函數(shù)
320 def send_event_to_observers(self, ev, state=None):
321 """
322 Send the specified event to all observers of this RyuApp.
323 """
324
325 for observer in self.get_observers(ev, state):
326 self.send_event(observer, ev, state)
- 接下來分析app基類中注冊事件相關(guān)的函數(shù)
- self.observers是一個字典,key是本app能夠產(chǎn)生的事件類型,value還是一個字典,key是監(jiān)聽這個事件的app名字,value是他指定的監(jiān)聽生效的階段
- register_observer與unregister_observer函數(shù)被其他邏輯調(diào)用,從而向本app注冊或取消注冊事件的目的地
- observe_event和unobserve_event則被本app調(diào)用,調(diào)用對方app的register_observer,向其他app聲明自己的監(jiān)聽
- 320行send_event_to_observers被本app中產(chǎn)生事件的函數(shù)所調(diào)用,向已經(jīng)跟本app注冊了監(jiān)聽的app們發(fā)送事件
265 def send_request(self, req):
266 """
267 Make a synchronous request.
268 Set req.sync to True, send it to a Ryu application specified by
269 req.dst, and block until receiving a reply.
270 Returns the received reply.
271 The argument should be an instance of EventRequestBase.
272 """
273
274 assert isinstance(req, EventRequestBase)
275 req.sync = True
276 req.reply_q = hub.Queue()
277 self.send_event(req.dst, req)
278 # going to sleep for the reply
279 return req.reply_q.get()
...省略其他函數(shù)
301 def _send_event(self, ev, state):
302 self._events_sem.acquire()
303 self.events.put((ev, state))
304
305 def send_event(self, name, ev, state=None):
306 """
307 Send the specified event to the RyuApp instance specified by name.
308 """
309
310 if name in SERVICE_BRICKS:
311 if isinstance(ev, EventRequestBase):
312 ev.src = self.name
313 LOG.debug("EVENT %s->%s %s",
314 self.name, name, ev.__class__.__name__)
315 SERVICE_BRICKS[name]._send_event(ev, state)
316 else:
317 LOG.debug("EVENT LOST %s->%s %s",
318 self.name, name, ev.__class__.__name__)
319
320 def send_event_to_observers(self, ev, state=None):
321 """
322 Send the specified event to all observers of this RyuApp.
323 """
324
325 for observer in self.get_observers(ev, state):
326 self.send_event(observer, ev, state)
327
328 def reply_to_request(self, req, rep):
329 """
330 Send a reply for a synchronous request sent by send_request.
331 The first argument should be an instance of EventRequestBase.
332 The second argument should be an instance of EventReplyBase.
333 """
334
335 assert isinstance(req, EventRequestBase)
336 assert isinstance(rep, EventReplyBase)
337 rep.dst = req.src
338 if req.sync:
339 req.reply_q.put(rep)
340 else:
341 self.send_event(rep.dst, rep)
- 接下來分析app基類中事件發(fā)送相關(guān)的函數(shù)
- 305行是剛剛send_event_to_observers中調(diào)用的send_event函數(shù)
- send_event在設(shè)置了event的src后,調(diào)用了目的app的_send_event函數(shù)
- _send_event只需要在app的events列表中put相應(yīng)的事件
- 最后265和328行是ryu基于當(dāng)前even機制實現(xiàn)的同步的請求響應(yīng)機制
- 請求時發(fā)送的事件中包含了同步請求的標(biāo)識,還包含了取得結(jié)果的返回隊列
- 響應(yīng)時只需把向req帶來的隊列中put返回值即可
174 def start(self):
175 """
176 Hook that is called after startup initialization is done.
177 """
178 self.threads.append(hub.spawn(self._event_loop))
179
180 def stop(self):
181 if self.main_thread:
182 hub.kill(self.main_thread)
183 self.is_active = False
184 self._send_event(self._event_stop, None)
185 hub.joinall(self.threads)
186
187 def set_main_thread(self, thread):
188 """
189 Set self.main_thread so that stop() can terminate it.
190
191 Only AppManager.instantiate_apps should call this function.
192 """
193 self.main_thread = thread
...省略其他事件收發(fā)相關(guān)函數(shù)
281 def _event_loop(self):
282 while self.is_active or not self.events.empty():
283 ev, state = self.events.get()
284 self._events_sem.release()
285 if ev == self._event_stop:
286 continue
287 handlers = self.get_handlers(ev, state)
288 for handler in handlers:
289 try:
290 handler(ev)
291 except hub.TaskExit:
292 # Normal exit.
293 # Propagate upwards, so we leave the event loop.
294 raise
295 except:
296 LOG.exception('%s: Exception occurred during handler processing. '
297 'Backtrace from offending handler '
298 '[%s] servicing event [%s] follows.',
299 self.name, handler.__name__, ev.__class__.__name__)
- 接下來分析app基類中事件接收相關(guān)的函數(shù)
- 還記得我們在分析app_manager的instantiate_apps函數(shù)時,對每個app都調(diào)用了start,并將返回值作為協(xié)程句柄返回以供main函數(shù)join
- 174行可以看到start函數(shù)做的事情只有在hub中添加本app的事件循環(huán),并且如果不繼承覆蓋start函數(shù)的話,并不會返回協(xié)程句柄
- 180行stop函數(shù)則進行相應(yīng)的kill
- 281行就是本app的事件循環(huán)
- 282行可以看出本app處于活躍階段,并且events隊列中有事件時,才會進入循環(huán)
- 283行從隊列中取出事件,287行從本app的handlers變量取出ev事件對應(yīng)的事件處理函數(shù)
- 最后288行依次執(zhí)行這個event對應(yīng)的handler,完成事件的接收工作
451 def _update_bricks(self):
452 for i in SERVICE_BRICKS.values():
453 for _k, m in inspect.getmembers(i, inspect.ismethod):
454 if not hasattr(m, 'callers'):
455 continue
456 for ev_cls, c in m.callers.items():
457 if not c.ev_source:
458 continue
459
460 brick = _lookup_service_brick_by_mod_name(c.ev_source)
461 if brick:
462 brick.register_observer(ev_cls, i.name,
463 c.dispatchers)
464
465 # allow RyuApp and Event class are in different module
466 for brick in SERVICE_BRICKS.values():
467 if ev_cls in brick._EVENTS:
468 brick.register_observer(ev_cls, i.name,
469 c.dispatchers)
- 最后還有一個疑問是,我們知道了handler變量通過怎么樣的調(diào)用鏈完成初始化,那么observers呢?
- 前文提到了app_manager中instantiate_apps函數(shù)調(diào)用了_update_bricks,這里回顧_update_bricks函數(shù),前面我們沒有詳細(xì)分析,現(xiàn)在可以回頭再來看一下
- 452行遍歷了BRICK,即app
- 453行遍歷了app的所有方法
- 454行過濾留下有callers屬性,即監(jiān)聽了事件的函數(shù)
- 456行過濾掉沒有事件來源的事件
- 460行尋找產(chǎn)生這個事件的模塊
- 462行調(diào)用前文提到的register_observer,向?qū)Ψ絘pp表明自己的存在,聲明自己的監(jiān)聽事件和監(jiān)聽階段
- 最后466行,如果事件的產(chǎn)生者并不在event這個類的模塊里,ryu還會尋找所有brick(app)的_EVENTS屬性
- 如果app在_EVENTS中聲明了自己會產(chǎn)生這個事件的話,那么同樣在這個app進行注冊
- 這也就填坑了前面沒有細(xì)說的_EVENTS,就是在這里對其進行了讀取使用
- 因此如果我們自己要編寫新的event類型,同時event定義與事件產(chǎn)生的app在不同模塊,就需要要在_EVENTS指定事件信息
事件收發(fā)小結(jié)
- app的handler和observers兩個變量均在instantiate_apps階段完成變量的計算和賦值,為后續(xù)的事件路由提供信息
- 事件的接收是通過app基類中的事件循環(huán)完成的
- 事件的發(fā)送是通過本app的某函數(shù)主動調(diào)用send_event_to_observers等函數(shù)實現(xiàn)的
TODO