ryu源碼閱讀(一)

資料

  • ryu官方文檔
    • 英文資料可以從官方文檔中的各個資源鏈接深入
  • ryu的python文檔
    • 既可了解整體架構(gòu),又可查詢詳細(xì)方法和參數(shù)
  • ryubook
    • 主要講了ryu使用案例,路由器防火墻等等
  • sdnlab
    • sdnlab是比較好的中文資料來源,里面有很多源碼解讀、操作使用上的文章

起因

  • 在學(xué)校做網(wǎng)絡(luò)安全或者軟件定義網(wǎng)絡(luò)等領(lǐng)域的研究時,并不太關(guān)心所使用的sdn控制器的水平擴展、對數(shù)據(jù)中心的支持等
  • 更多的場景就是幾臺甚至一臺主機的ovs上對理論模型進行實驗仿真,得到仿真結(jié)果后就可以寫在論文里了
  • 我這次遇到的場景是需要在軟件交換機ovs中做802.1x認(rèn)證者的邏輯,不涉及什么創(chuàng)新
  • 但還是想把過程中對ryu的學(xué)習(xí)進行總結(jié),后面如果再用的話,爭取現(xiàn)在留下的文章能夠讓我快速重新?lián)炱饋?/li>
  • 本篇初探主要內(nèi)容是ryu主線源碼解讀,ryu的設(shè)計很優(yōu)美,但如果不去讀源碼的話,很多約定會覺得像魔法
  • 此外ryu并發(fā)模型使用的協(xié)程,導(dǎo)致不讀源碼的話,編寫的app如果使用多線程,調(diào)試的時候也可能會一頭霧水

功能

  • 在讀ryu源碼前,可以想象一下sdn控制器要做的事情都有哪些
  • 帶著ryu如何實現(xiàn)這些功能的疑問去讀源碼會更加清晰
  • 我認(rèn)為的幾個主要的功能點:
    1. 通過openflow協(xié)議管理多個交換機
    2. 通過netconf協(xié)議管理多個交換機
    3. 可選支持ovsdb等協(xié)議完善控制能力
    4. 支持用戶編寫數(shù)據(jù)包處理邏輯,并提供合適的框架簡化代碼
    5. 可以同時管理運行用戶編寫的多個sdn應(yīng)用,并使其協(xié)作
    6. 最好能對http restful api提供框架支持
    7. 統(tǒng)一的網(wǎng)絡(luò)并發(fā)模型管理上述涉及到的網(wǎng)絡(luò)通信
    • 對接交換機
    • 對接restapi調(diào)用者
    • 多應(yīng)用之間進行內(nèi)部通信協(xié)作
    • 通信方式擴展,如unix socket、AMPQ協(xié)議等
  1. 可選實現(xiàn)一些數(shù)據(jù)中心常見的控制協(xié)議,比如bgp

閱讀

一級文件目錄

╭─root@testdevice ~/ryu ?master?
╰─# tree -L 1
.
├── CONTRIBUTING.rst
├── LICENSE
├── MANIFEST.in
├── README.rst
├── bin
├── debian
├── doc
├── etc
├── run_tests.sh
├── ryu
├── setup.cfg
├── setup.py
├── tools
└── tox.ini
  • 先來看文件目錄,幾個rst我們不用關(guān)心
  • bin文件夾里面是ryu用來啟動的可執(zhí)行腳本
from ryu.cmd.manager import main
main()
  • 可以看下ryu-manager啟動腳本內(nèi)容,除了開源許可注釋,只有兩行,從ryu包的cmd.manager導(dǎo)入了main函數(shù)并執(zhí)行
  • 因此我們也就知道了,我們通過ryu-manager --verbose myapp.py命令啟動的控制器實例,是由cmd.manager.main開始的
  • bin文件夾中還有一個ryu可執(zhí)行腳本,允許通過run的subcommand執(zhí)行ryu-manager的邏輯ryu run myapp.py
    • 此外ryu命令通過調(diào)用yu.cmd路徑下的其他函數(shù),實現(xiàn)了[run|of-config-cli|rpc-cli]這三個子命令

ryu二級目錄

╭─root@testdevice ~/ryu/ryu ?master?
╰─# tree -L 1
.
├── __init__.py
├── app
├── base
├── cfg.py
├── cmd
├── contrib
├── controller
├── exception.py
├── flags.py
├── hooks.py
├── lib
├── log.py
├── ofproto
├── services
├── tests
├── topology
└── utils.py

10 directories, 7 files
  • 可以看到文件內(nèi)容不少,但都是實現(xiàn)ryu控制器的核心邏輯代碼
  • 接下來我們繼續(xù)跟啟動的主線

啟動主線

ryu/ryu/cmd/manager.py

包導(dǎo)入

    19  import os
    20  import sys
    21
    22  from ryu.lib import hub
    23  hub.patch(thread=False)
    24
    25  from ryu import cfg
    26
    27  import logging
    28  from ryu import log
    29  log.early_init_log(logging.DEBUG)
    30
    31  from ryu import flags
    32  from ryu import version
    33  from ryu.app import wsgi
    34  from ryu.base.app_manager import AppManager
    35  from ryu.controller import controller
    36  from ryu.topology import switches
  • 注意在入口文件的第四行就已經(jīng)通過ryu.lib.hub引入了協(xié)程庫,并對除線程以外的庫進行了patch
    • patch即為將目標(biāo)模塊,替換為協(xié)程形式的實現(xiàn),但實現(xiàn)的并不完美,有一些地方會導(dǎo)致bug
  • 中間幾行導(dǎo)入了配置管理和日志管理模塊
  • 最后幾行引入了
    • wsgi:用于提供web服務(wù)
    • AppManager:管理多app
    • controller:openflow控制器,使其能夠控制交換機的核心功能
    • switches:未知?。?!

配置管理

    37
    38
    39  CONF = cfg.CONF
    40  CONF.register_cli_opts([
    41      cfg.ListOpt('app-lists', default=[],
    42                  help='application module name to run'),
    43      cfg.MultiStrOpt('app', positional=True, default=[],
    44                      help='application module name to run'),
    45      cfg.StrOpt('pid-file', default=None, help='pid file name'),
    46      cfg.BoolOpt('enable-debugger', default=False,
    47                  help='don\'t overwrite Python standard threading library'
    48                  '(use only for debugging)'),
    49      cfg.StrOpt('user-flags', default=None,
    50                 help='Additional flags file for user applications'),
    51  ])
    52
    53
    54  def _parse_user_flags():
    55      """
    56      Parses user-flags file and loads it to register user defined options.
    57      """
    58      try:
    59          idx = list(sys.argv).index('--user-flags')
    60          user_flags_file = sys.argv[idx + 1]
    61      except (ValueError, IndexError):
    62          user_flags_file = ''
    63
    64      if user_flags_file and os.path.isfile(user_flags_file):
    65          from ryu.utils import _import_module_file
    66          _import_module_file(user_flags_file)
    67
    68
    69  def main(args=None, prog=None):
    70      _parse_user_flags()
    71      try:
    72          CONF(args=args, prog=prog,
    73               project='ryu', version='ryu-manager %s' % version,
    74               default_config_files=['/usr/local/etc/ryu/ryu.conf'])
    75      except cfg.ConfigFilesNotFoundError:
    76          CONF(args=args, prog=prog,
    77               project='ryu', version='ryu-manager %s' % version)
    78
    79      log.init_log()
    80      logger = logging.getLogger(__name__)
    81
  • 配置管理和日志管理不是本篇重點,就不贅述
  • 思考了一下還是把完整的內(nèi)容貼在這里,讓以后的我和讀者有個直觀的感受
  • 因為我在讀其他人的分析源碼的的文章的時候,經(jīng)常出現(xiàn)的疑問就是,這段代碼在哪?

核心邏輯

    82      if CONF.enable_debugger:
    83          msg = 'debugging is available (--enable-debugger option is turned on)'
    84          logger.info(msg)
    85      else:
    86          hub.patch(thread=True)
    87
    88      if CONF.pid_file:
    89          with open(CONF.pid_file, 'w') as pid_file:
    90              pid_file.write(str(os.getpid()))
    91
    92      app_lists = CONF.app_lists + CONF.app
    93      # keep old behavior, run ofp if no application is specified.
    94      if not app_lists:
    95          app_lists = ['ryu.controller.ofp_handler']
    96
    97      app_mgr = AppManager.get_instance()
    98      app_mgr.load_apps(app_lists)
    99      contexts = app_mgr.create_contexts()
   100      services = []
   101      services.extend(app_mgr.instantiate_apps(**contexts))
   102
   103      webapp = wsgi.start_service(app_mgr)
   104      if webapp:
   105          thr = hub.spawn(webapp)
   106          services.append(thr)
   107
   108      try:
   109          hub.joinall(services)
   110      except KeyboardInterrupt:
   111          logger.debug("Keyboard Interrupt received. "
   112                       "Closing RYU application manager...")
   113      finally:
   114          app_mgr.close()
   115
   116
   117  if __name__ == "__main__":
   118      main()
  • 首先一個比較有意思的是82行會對是否啟動debugger調(diào)試模式進行判斷,如果啟動了的話,就不會對線程進行協(xié)程patch
    • 因此如果我們寫的app代碼調(diào)用的庫不得不調(diào)用多線程時,可以嘗試開啟debugger,看情況是否有所變化
  • 接下來92到95行,讀取配置文件或命令行中指定的app列表,如果沒有指定,那么會運行ryu.controller.ofp_handler
    • ryu.controller.ofp_handler就是控制器與交換機進行openflow協(xié)議交互的核心app
    • 那么如果我們指定了自己的app,就不會運行ryu.controller.ofp_handler這樣核心的邏輯了嗎?
    • 后面我們會發(fā)現(xiàn),就算app_lists不是空,我們依舊會在運行自己指定app的同時,運行起來ryu.controller.ofp_handler
    • 這就涉及到了ryu對app的依賴管理和導(dǎo)入機制的設(shè)計,如果不看源碼可能會很膜法
  • 下面97行,get_instance可以猜個大概app_manager使用的單例模式
  • 98行,調(diào)用manager的load_apps函數(shù)把要運行的app的py代碼都動態(tài)的加載進內(nèi)存
  • 99行創(chuàng)建并初始化了應(yīng)用上下文,后面會詳細(xì)說明
    • 這里可以簡單理解為我們編寫的app以來的其他的庫
    • ryu會為我們初始化好,并傳入供我們的app調(diào)用
    • context是個字典,注意相同名字的依賴會只初始化一次,可以存在多app都調(diào)用同一個context的情況,因此也可作為多app共享內(nèi)容的方式之一
  • 100-101行,services是一個列表,存儲了app_manager初始化所有app后返回的內(nèi)容
  • 103-106行,wsgi啟動了他自己的服務(wù),
    • 如果初始化成功,會通過hub.spawn注冊進協(xié)程管理
    • 并將返回結(jié)果append到了services里
    • 由此我們可以理解,services里其實都是hub.spawn返回的每個協(xié)程的句柄
  • 最后的108行開始
    • 嘗試hub.joinall等待所有的協(xié)程實例運行結(jié)束
    • 如果遇到了ctrl+c,或者遇到異常,無論如何調(diào)用app_manager的close方法進行清理
    • app_manager作為app們的管理者,同樣會調(diào)用各app的close方法,完成樹狀的資源清理
  • 92行到114行,這短短的二十多行就是ryu運行控制器核心邏輯的樹根,后面的一切邏輯上的樹枝,都由此而來
    • 其中的邏輯又可主要分為幾大塊:
      • app讀取、拉起,涉及到app間的依賴管理
      • app對其他類實例的依賴,涉及到app對其他類的依賴
      • wsgi對app中涉及restful應(yīng)用的管理
      • 協(xié)程貫穿在ryu中的使用
    • 接下來我們就對其逐一進行分析

app依賴管理

  • 啟動主線中的app_manager是從ryu/ryu/base/app_manager.py中實例化的
  • 而base這個文件夾中,只有一個app_manager.py,可見其重要性
  • 其中不止實現(xiàn)了app管理器,還規(guī)定了了我們編寫的app需要繼承的基類

ryu/ryu/base/app_manager.py

包導(dǎo)入

    26  import inspect
    27  import itertools
    28  import logging
    29  import sys
    30  import os
    31  import gc
    32
    33  from ryu import cfg
    34  from ryu import utils
    35  from ryu.app import wsgi
    36  from ryu.controller.handler import register_instance, get_dependent_services
    37  from ryu.controller.controller import Datapath
    38  from ryu.controller import event
    39  from ryu.controller.event import EventRequestBase, EventReplyBase
    40  from ryu.lib import hub
    41  from ryu.ofproto import ofproto_protocol
    42
    43  LOG = logging.getLogger('ryu.base.app_manager')
    44
    45  SERVICE_BRICKS = {}
  • app_manager引入的內(nèi)容與剛剛cmd中的manager類似
  • 需要注意37行導(dǎo)入了datapath,用于管理網(wǎng)橋的庫
  • 以及38-39行引入的ryu的事件管理庫
  • 42行進行了日志配置
  • 45行文件全局變量SERVICE_BRICKS這個字典,是一個app名稱映射到app實例的字典
    • 但為什么不叫app,突然改叫brick,是閱讀過程中比較困惑的地方
    • 猜測是app在event管理路由這個場景就給他換了個名字
    • 后面會多處出現(xiàn)對這個字典的操作

app管理器

  • 注意45行后到351行間是app基類與工具方法,此處跳躍進行介紹
   351  class AppManager(object):
   352      # singleton
   353      _instance = None
   354
   355      @staticmethod
   356      def run_apps(app_lists):
   357          """Run a set of Ryu applications
   358
   359          A convenient method to load and instantiate apps.
   360          This blocks until all relevant apps stop.
   361          """
   362          app_mgr = AppManager.get_instance()
   363          app_mgr.load_apps(app_lists)
   364          contexts = app_mgr.create_contexts()
   365          services = app_mgr.instantiate_apps(**contexts)
   366          webapp = wsgi.start_service(app_mgr)
   367          if webapp:
   368              services.append(hub.spawn(webapp))
   369          try:
   370              hub.joinall(services)
   371          finally:
   372              app_mgr.close()
   373              for t in services:
   374                  t.kill()
   375              hub.joinall(services)
   376              gc.collect()
   377
   378      @staticmethod
   379      def get_instance():
   380          if not AppManager._instance:
   381              AppManager._instance = AppManager()
   382          return AppManager._instance
   383
   384      def __init__(self):
   385          self.applications_cls = {}
   386          self.applications = {}
   387          self.contexts_cls = {}
   388          self.contexts = {}
   389          self.close_sem = hub.Semaphore()
  • 首先看379行即我們看到在上面的代碼中調(diào)用的實例獲取函數(shù),也確實是單實例模式
  • 再看384行的初始化函數(shù),不需要傳入任何參數(shù),僅對app和context的加載所需的變量進行初始化
  • 而這個run_apps函數(shù),與我們在manager.py中的run_apps函數(shù)名字都一樣,做的事情也一致
    • 經(jīng)過搜索調(diào)用此函數(shù)的文件后,發(fā)現(xiàn)此函數(shù)僅在ryu/ryu/cmd/ofa_neutron_agent.py被使用
    • 猜測當(dāng)openstack的neutron插件使用ryu控制器時,會與平時ryu-manager的邏輯有所不同
   391      def load_app(self, name):
   392          mod = utils.import_module(name)
   393          clses = inspect.getmembers(mod,
   394                                     lambda cls: (inspect.isclass(cls) and
   395                                                  issubclass(cls, RyuApp) and
   396                                                  mod.__name__ ==
   397                                                  cls.__module__))
   398          if clses:
   399              return clses[0][1]
   400          return None
   401
   402      def load_apps(self, app_lists):
   403          app_lists = [app for app
   404                       in itertools.chain.from_iterable(app.split(',')
   405                                                        for app in app_lists)]
   406          while len(app_lists) > 0:
   407              app_cls_name = app_lists.pop(0)
   408
   409              context_modules = [x.__module__ for x in self.contexts_cls.values()]
   410              if app_cls_name in context_modules:
   411                  continue
   412
   413              LOG.info('loading app %s', app_cls_name)
   414
   415              cls = self.load_app(app_cls_name)
   416              if cls is None:
   417                  continue
   418
   419              self.applications_cls[app_cls_name] = cls
   420
   421              services = []
   422              for key, context_cls in cls.context_iteritems():
   423                  v = self.contexts_cls.setdefault(key, context_cls)
   424                  assert v == context_cls
   425                  context_modules.append(context_cls.__module__)
   426
   427                  if issubclass(context_cls, RyuApp):
   428                      services.extend(get_dependent_services(context_cls))
   429
   430              # we can't load an app that will be initiataed for
   431              # contexts.
   432              for i in get_dependent_services(cls):
   433                  if i not in context_modules:
   434                      services.append(i)
   435              if services:
   436                  app_lists.extend([s for s in set(services)
   437                                    if s not in app_lists])
  • 接下來就是關(guān)鍵的app加載相關(guān)的函數(shù)
  • 上面的分析我們知道m(xù)anager.py調(diào)用app_manager的load_apps作為加載app的入口函數(shù)
  • 首先403行對app_list中的內(nèi)容進行扁平化處理,將逗號分隔的app都拆分成單獨的app重新存入列表
  • 接下的循環(huán)就是加載app類以及處理依賴的邏輯
    • 406行,可以看到循環(huán)的結(jié)束條件是app_list為空
    • 而在407行從app_list中取出一個app進行處理
    • 一般來說這樣使用while 列表不為空來處理列表中的內(nèi)容,都是列表的內(nèi)容會隨著處理而動態(tài)改變
    • 而動態(tài)變化的原因,正是因為只有加載了一個app類之后,才知道他的依賴是什么,才能將其動態(tài)的加入app_list中
      • ryu沒有強硬的規(guī)定依賴關(guān)系必須顯式的寫在配置中或者app類中(除了context)
      • 而是通過調(diào)用關(guān)系,隱含式的給我們實現(xiàn)的類中添加了對應(yīng)的依賴關(guān)系,存疑???
      • 這也是ryu實現(xiàn)非常優(yōu)雅的地方之一
    • 接下來409行確認(rèn)當(dāng)前處理的app是不是只是一個context而已
    • 415行調(diào)用load_app加載單個app
    • 419行可以看出,app_manager.applications_cls的鍵是類名,值是類,而不是實例
      • 注意此階段并沒有對app類進行實例化,只是記錄類的類別
    • 421行初始化了一個列表叫services
      • 這里的services與上文manage.py中用來hub.joinall的services含義完全不同
      • 這里的services僅用于存儲當(dāng)前正在處理的app所依賴的app們
    • 422行迭代app類所聲明的context
      • 如果是context只是一個其他的類,就加入到app_manager的contexts_cls字典中
      • 注意同樣僅存了類的類別,沒有初始化
    • 而425行的context_module列表既存儲了之前app聲明的context,也添加了當(dāng)前app聲明的context
    • 427行,如果context中的類同樣實現(xiàn)了ryu的app_base,那么把他添加到services里,后續(xù)添加到app_list里
      • 說明如果context中聲明的一個依賴同樣實現(xiàn)了ryu的app基類,那么同樣將其以app看待進行加載
    • 432行的get_dependent_services函數(shù)并不在本文件中
      • 但可以根據(jù)其名稱知其含義,即獲取本app所依賴的app們
      • 后續(xù)會對此函數(shù)詳細(xì)解釋
    • 433行的判斷邏輯避免了context中的app類被重復(fù)添加至app_list,而維護context_modules變量的意義即如此
    • 435行將services變量中的app,即本app所依賴的app,添加到待加載的app_list中去
  • 而前面391行的load_app函數(shù),也很簡單,利用python動態(tài)的特性,從對應(yīng)對的模塊中找繼承了app基類的那個用戶實現(xiàn)的app類
   439      def create_contexts(self):
   440          for key, cls in self.contexts_cls.items():
   441              if issubclass(cls, RyuApp):
   442                  # hack for dpset
   443                  context = self._instantiate(None, cls)
   444              else:
   445                  context = cls()
   446              LOG.info('creating context %s', key)
   447              assert key not in self.contexts
   448              self.contexts[key] = context
   449          return self.contexts
  • 接下來create_contexts,也是前文manage.py中調(diào)用的函數(shù)
  • 從內(nèi)容可以看出本函數(shù)對context中記錄的類進行是初始化
  • 將各類的實例以kv的形式存儲在app_manager.context中,并最后返回這個字典
  • 所以在load_apps中context進行了加載,而本函數(shù)進行了context的初始化,生成類的實例
   451      def _update_bricks(self):
   452          for i in SERVICE_BRICKS.values():
   453              for _k, m in inspect.getmembers(i, inspect.ismethod):
   454                  if not hasattr(m, 'callers'):
   455                      continue
   456                  for ev_cls, c in m.callers.items():
   457                      if not c.ev_source:
   458                          continue
   459
   460                      brick = _lookup_service_brick_by_mod_name(c.ev_source)
   461                      if brick:
   462                          brick.register_observer(ev_cls, i.name,
   463                                                  c.dispatchers)
   464
   465                      # allow RyuApp and Event class are in different module
   466                      for brick in SERVICE_BRICKS.values():
   467                          if ev_cls in brick._EVENTS:
   468                              brick.register_observer(ev_cls, i.name,
   469                                                      c.dispatchers)
   470
   471      @staticmethod
   472      def _report_brick(name, app):
   473          LOG.debug("BRICK %s", name)
   474          for ev_cls, list_ in app.observers.items():
   475              LOG.debug("  PROVIDES %s TO %s", ev_cls.__name__, list_)
   476          for ev_cls in app.event_handlers.keys():
   477              LOG.debug("  CONSUMES %s", ev_cls.__name__)
   478
   479      @staticmethod
   480      def report_bricks():
   481          for brick, i in SERVICE_BRICKS.items():
   482              AppManager._report_brick(brick, i)
   483
  • 上面的update_brickcs函數(shù)涉及到了ryu的一個魔法設(shè)計,即caller,后面還會詳細(xì)說
  • 下面的兩個report_bricks是用來打印當(dāng)前控制器所管理的bricks,而這個磚頭就是從文件開頭的全局變量取出
  • 而bricks磚頭,對應(yīng)著我們指定的app,以及我們的app所依賴的app
  • 如下是我運行ryu時verbose模式下打印的BRICK內(nèi)容
    • 其中openflow、hostapd_wif、hostapd_eth、pusher是我編寫的app
    • 而ofp_event對應(yīng)著我們的app依賴的app:ryu.controller.ofp_handler
  • 下面更重要的內(nèi)容是每個BRICK,即app,消費以及產(chǎn)生的事件類型,對于了解多app情況下的event路由情況非常有幫助
    • 其中PROVIDES EventX To {'openflow': {'main'}}中的openflow指的是app名字,main指定是當(dāng)前控制器所處于的狀態(tài)disaptcher,如config配置狀態(tài),main運行狀態(tài)
BRICK openflow
  CONSUMES EventOFPPacketIn
  CONSUMES EventJoin
  CONSUMES EventLeave
  CONSUMES EventOFPSwitchFeatures
BRICK hostapd_wif
  PROVIDES EventJoin TO {'openflow': {'main'}, 'pusher': {'main'}}
  PROVIDES EventLeave TO {'openflow': {'main'}, 'pusher': {'main'}}
BRICK hostapd_eth
  PROVIDES EventJoin TO {'openflow': {'main'}, 'pusher': {'main'}}
  PROVIDES EventLeave TO {'openflow': {'main'}, 'pusher': {'main'}}
BRICK pusher
  CONSUMES EventJoin
  CONSUMES EventLeave
BRICK ofp_event
  PROVIDES EventOFPPacketIn TO {'openflow': {'main'}}
  PROVIDES EventOFPSwitchFeatures TO {'openflow': {'config'}}
  CONSUMES EventOFPEchoReply
  CONSUMES EventOFPEchoRequest
  CONSUMES EventOFPErrorMsg
  CONSUMES EventOFPHello
  CONSUMES EventOFPPortDescStatsReply
  CONSUMES EventOFPPortStatus
  CONSUMES EventOFPSwitchFeatures
   484      def _instantiate(self, app_name, cls, *args, **kwargs):
   485          # for now, only single instance of a given module
   486          # Do we need to support multiple instances?
   487          # Yes, maybe for slicing.
   488          LOG.info('instantiating app %s of %s', app_name, cls.__name__)
   489
   490          if hasattr(cls, 'OFP_VERSIONS') and cls.OFP_VERSIONS is not None:
   491              ofproto_protocol.set_app_supported_versions(cls.OFP_VERSIONS)
   492
   493          if app_name is not None:
   494              assert app_name not in self.applications
   495          app = cls(*args, **kwargs)
   496          register_app(app)
   497          assert app.name not in self.applications
   498          self.applications[app.name] = app
   499          return app
   500
   501      def instantiate(self, cls, *args, **kwargs):
   502          app = self._instantiate(None, cls, *args, **kwargs)
   503          self._update_bricks()
   504          self._report_brick(app.name, app)
   505          return app
   506
   507      def instantiate_apps(self, *args, **kwargs):
   508          for app_name, cls in self.applications_cls.items():
   509              self._instantiate(app_name, cls, *args, **kwargs)
   510
   511          self._update_bricks()
   512          self.report_bricks()
   513
   514          threads = []
   515          for app in self.applications.values():
   516              t = app.start()
   517              if t is not None:
   518                  app.set_main_thread(t)
   519                  threads.append(t)
   520          return threads
  • 接下來是用來進行app初始化的函數(shù)
  • 首先manager.py調(diào)用的app初始化入口函數(shù)是instantiate_apps,
    • 在本函數(shù)里可以看到遍歷了app_manager的app class,并調(diào)用_instantiate實際初始化
    • 后面調(diào)用_update_bricks更新app間的事件依賴關(guān)系,并通過report打印結(jié)果
    • 最后如同我們上面分析manager.py中的猜測一樣,instantiate_apps函數(shù)返回了注冊的所有app的協(xié)程句柄,用于在main函數(shù)中統(tǒng)一join
  • 在_instantiate函數(shù)中實現(xiàn)了app類的動態(tài)的實例化,并在實例化后調(diào)用register_app注冊,最后在app_manager自身的application字典中注冊新初始化的app
   522      @staticmethod
   523      def _close(app):
   524          close_method = getattr(app, 'close', None)
   525          if callable(close_method):
   526              close_method()
   527
   528      def uninstantiate(self, name):
   529          app = self.applications.pop(name)
   530          unregister_app(app)
   531          for app_ in SERVICE_BRICKS.values():
   532              app_.unregister_observer_all_event(name)
   533          app.stop()
   534          self._close(app)
   535          events = app.events
   536          if not events.empty():
   537              app.logger.debug('%s events remains %d', app.name, events.qsize())
   538
   539      def close(self):
   540          def close_all(close_dict):
   541              for app in close_dict.values():
   542                  self._close(app)
   543              close_dict.clear()
   544
   545          # This semaphore prevents parallel execution of this function,
   546          # as run_apps's finally clause starts another close() call.
   547          with self.close_sem:
   548              for app_name in list(self.applications.keys()):
   549                  self.uninstantiate(app_name)
   550              assert not self.applications
   551              close_all(self.contexts)
  • 最后是一些關(guān)閉、清理、刪除當(dāng)前app的函數(shù)
  • app_manager至此就分析結(jié)束了

工具方法

    48  def lookup_service_brick(name):
    49      return SERVICE_BRICKS.get(name)
    50
    51
    52  def _lookup_service_brick_by_ev_cls(ev_cls):
    53      return _lookup_service_brick_by_mod_name(ev_cls.__module__)
    54
    55
    56  def _lookup_service_brick_by_mod_name(mod_name):
    57      return lookup_service_brick(mod_name.split('.')[-1])
    58
    59
    60  def register_app(app):
    61      assert isinstance(app, RyuApp)
    62      assert app.name not in SERVICE_BRICKS
    63      SERVICE_BRICKS[app.name] = app
    64      register_instance(app)
    65
    66
    67  def unregister_app(app):
    68      SERVICE_BRICKS.pop(app.name)
    69
    70
    71  def require_app(app_name, api_style=False):
    72      """
    73      Request the application to be automatically loaded.
    74
    75      If this is used for "api" style modules, which is imported by a client
    76      application, set api_style=True.
    77
    78      If this is used for client application module, set api_style=False.
    79      """
    80      iterable = (inspect.getmodule(frame[0]) for frame in inspect.stack())
    81      modules = [module for module in iterable if module is not None]
    82      if api_style:
    83          m = modules[2]  # skip a frame for "api" module
    84      else:
    85          m = modules[1]
    86      m._REQUIRED_APP = getattr(m, '_REQUIRED_APP', [])
    87      m._REQUIRED_APP.append(app_name)
    88      LOG.debug('require_app: %s is required by %s', app_name, m.__name__)
    89
    90
  • 這些方法在app_manager的函數(shù)中有所調(diào)用
  • 可以看出,注冊方法對應(yīng)字典的賦值,刪除方法對應(yīng)字典的刪除,搜索方法對應(yīng)字典的取值
  • 讀起來SERVICE_BRICKS與app_manager自身的appcalitions沒有什么區(qū)別
  • 可能隨著理解的加深能會慢慢了解這樣的用意叭
  • 最后require_app函數(shù)的邏輯是添加依賴app名字到某個module的_REQUIRED_APP列表
    • _REQUIRED_APP下文我們還會見到

app基類

  • 由于app在app_manager視角只是一個被管理者,在下文中的事件收發(fā)小節(jié)中介紹app基類更好,這里暫且跳過

ryu/ryu/controller/handler.py

120 def get_dependent_services(cls):
121     services = []
122     for _k, m in inspect.getmembers(cls, _is_method):
123         if _has_caller(m):
124             for ev_cls, c in m.callers.items():
125                 service = getattr(sys.modules[ev_cls.__module__],
126                                   '_SERVICE_NAME', None)
127                 if service:
128                     # avoid cls that registers the own events (like
129                     # ofp_handler)
130                     if cls.__module__ != service:
131                         services.append(service)
132
133     m = sys.modules[cls.__module__]
134     services.extend(getattr(m, '_REQUIRED_APP', []))
135     services = list(set(services))
136     return services
  • 現(xiàn)在回來填坑上文所調(diào)用的獲取本app依賴關(guān)系的函數(shù)get_dependent_services
  • 這個函數(shù)位于controller/handler.py,原因是因為涉及到了caller這個ryu用來管理事件依賴的魔法
  • 簡單來說是我們通過set_ev_cls注冊監(jiān)聽事件的時候,ryu會通過caller機制計算哪些app會生產(chǎn)此事件,從而計算出當(dāng)前app所依賴的app有哪些
    • 例如我們指定了一個自己編寫的app,監(jiān)聽了ofp_event.EventOFPPacketIn這個openflow的packet in事件
    • 但這個事件是由ryu.controller.ofp_handler這個app產(chǎn)生的,所以我們的app就對這個app產(chǎn)生了依賴關(guān)系
    • 很好理解,本app使用了其他app產(chǎn)生了事件,自然我們就對其產(chǎn)生了依賴
    • 而ryu正是通過實現(xiàn)了他自己的caller機制,管理這種依賴
  • 134行,可以看出通過設(shè)置_REQUIRED_APP可以顯示的指定我們app所依賴的app
    • 前文提過,require_app函數(shù)是用于增加_REQUIRED_APP列表內(nèi)容的方法
    • 通過grep搜索require_app函數(shù)的調(diào)用,可以看出是用于api調(diào)用方被調(diào)用方之間的依賴指定
    • 所以也就好理解require_app的api_style參數(shù)了
╭─root@testdevice ~/ryu/ryu ?master?
╰─# grep -R require_app
app/gui_topology/gui_topology.py:app_manager.require_app('ryu.app.rest_topology')
app/gui_topology/gui_topology.py:app_manager.require_app('ryu.app.ws_topology')
app/gui_topology/gui_topology.py:app_manager.require_app('ryu.app.ofctl_rest')
app/ofctl/api.py:app_manager.require_app('ryu.app.ofctl.service', api_style=True)
base/app_manager.py:def require_app(app_name, api_style=False):
base/app_manager.py:    LOG.debug('require_app: %s is required by %s', app_name, m.__name__)
services/protocols/vrrp/api.py:app_manager.require_app('ryu.services.protocols.vrrp.manager', api_style=True)
topology/api.py:app_manager.require_app('ryu.topology.switches', api_style=True)
  • 除了get_dependent_services,ryu/ryu/controller/handler.py文件其他的函數(shù)或者類的定義,也都是圍繞著caller有關(guān)

app管理小總結(jié)

   420
   421              services = []
   422              for key, context_cls in cls.context_iteritems():
   423                  v = self.contexts_cls.setdefault(key, context_cls)
   424                  assert v == context_cls
   425                  context_modules.append(context_cls.__module__)
   426
   427                  if issubclass(context_cls, RyuApp):
   428                      services.extend(get_dependent_services(context_cls))
   429
   430              # we can't load an app that will be initiataed for
   431              # contexts.
   432              for i in get_dependent_services(cls):
   433                  if i not in context_modules:
   434                      services.append(i)
   435              if services:
   436                  app_lists.extend([s for s in set(services)
   437                                    if s not in app_lists])
  • 經(jīng)過閱讀后可以看出,整個app依賴管理的核心就在十多行行代碼
  • app的依賴來源也讀到了三種
    1. 通過context指定(如果context中是一個app的話)
    2. 通過事件生產(chǎn)消費關(guān)系隱式的指定
    3. 通過app_manager.require_app函數(shù)顯示指定
  • 第一點是在427行進行的判斷
  • 第二三點,是在432行進行的獲取
  • 同時會在433行進行避免第一點與第二三點之間依賴重復(fù)添加的邏輯

事件管理

  • 事件管理正是我們在前文不斷提到的caller機制所實現(xiàn)的

ryu/ryu/controller/handler.py

  • 前文提到的get_dependent_services函數(shù)同樣位于本文件中,現(xiàn)在開始分析caller相關(guān)機制

包引入

 17 import inspect
 18 import logging
 19 import sys
 20
 21 LOG = logging.getLogger('ryu.controller.handler')
 22
 23 # just represent OF datapath state. datapath specific so should be moved.
 24 HANDSHAKE_DISPATCHER = "handshake"
 25 CONFIG_DISPATCHER = "config"
 26 MAIN_DISPATCHER = "main"
 27 DEAD_DISPATCHER = "dead"
  • 導(dǎo)入了三個官方庫后,ryu規(guī)定了四個當(dāng)前運行階段
    • 握手階段、配置階段、運行階段、清理階段

caller

 30 class _Caller(object):
 31     """Describe how to handle an event class.
 32     """
 33
 34     def __init__(self, dispatchers, ev_source):
 35         """Initialize _Caller.
 36
 37         :param dispatchers: A list of states or a state, in which this
 38                             is in effect.
 39                             None and [] mean all states.
 40         :param ev_source: The module which generates the event.
 41                           ev_cls.__module__ for set_ev_cls.
 42                           None for set_ev_handler.
 43         """
 44         self.dispatchers = dispatchers
 45         self.ev_source = ev_source
 46
 47
 48 # should be named something like 'observe_event'
 49 def set_ev_cls(ev_cls, dispatchers=None):
.....省略文檔
 77     def _set_ev_cls_dec(handler):
 78         if 'callers' not in dir(handler):
 79             handler.callers = {}
 80         for e in _listify(ev_cls):
 81             handler.callers[e] = _Caller(_listify(dispatchers), e.__module__)
 82         return handler
 83     return _set_ev_cls_dec
 84
 85
 86 def set_ev_handler(ev_cls, dispatchers=None):
 87     def _set_ev_cls_dec(handler):
 88         if 'callers' not in dir(handler):
 89             handler.callers = {}
 90         for e in _listify(ev_cls):
 91             handler.callers[e] = _Caller(_listify(dispatchers), None)
 92         return handler
 93     return _set_ev_cls_dec
  • 30行見到了caller的真身,只有兩個參數(shù)階段和來源,也沒有任何方法
    • caller可以這樣理解:
      • 事件有生產(chǎn)者和消費者,當(dāng)生產(chǎn)者生產(chǎn)了對應(yīng)的事件后,消費者會觸發(fā)執(zhí)行相應(yīng)的處理函數(shù)
      • 因此生產(chǎn)者就好像消費者的函數(shù)的調(diào)用者一樣,作為一個caller,去call監(jiān)聽這個事件的那些函數(shù)
  • 49行見到了我們熟悉的set_ev_cls,他是一個帶參數(shù)的裝飾器
    • 帶參數(shù)的裝飾器返回值是裝飾器
    • 裝飾器返回值是一個新的函數(shù)
    • set_ev_cls這個裝飾器的作用很簡單
      • 78行判斷被裝飾的函數(shù)有沒有callers屬性
      • 80行遍歷本需要監(jiān)聽的事件列表,并在函數(shù)的caller屬性設(shè)置對應(yīng)的kv
      • k是事件類型,v是30行的Caller類的實例,其中包含了在哪個運行階段事件產(chǎn)生后會進行調(diào)用,和事件來源的信息
  • 86行的set_ev_handler函數(shù)與set_ev_cls非常相似,只有一點不同:
    • Caller類型的ev_source屬性是None
    • 那么這個ev_source屬性有什么用呢?與自動加載依賴有關(guān),繼續(xù)看代碼會有更多解釋
  • 于是我們明白了,set_ev_cls(handler)僅僅為我們的自定義函數(shù)添加了一個callers變量,記錄著有哪些事件產(chǎn)生時,要調(diào)用本函數(shù)
  • 但我們這時候只知道有這樣的事件存在,我要監(jiān)聽他,那么誰會產(chǎn)生這樣的事件呢?我們回顧一下get_dependent_services函數(shù):
120 def get_dependent_services(cls):
121     services = []
122     for _k, m in inspect.getmembers(cls, _is_method):
123         if _has_caller(m):
124             for ev_cls, c in m.callers.items():
125                 service = getattr(sys.modules[ev_cls.__module__],
126                                   '_SERVICE_NAME', None)
127                 if service:
128                     # avoid cls that registers the own events (like
129                     # ofp_handler)
130                     if cls.__module__ != service:
131                         services.append(service)
132
133     m = sys.modules[cls.__module__]
134     services.extend(getattr(m, '_REQUIRED_APP', []))
135     services = list(set(services))
136     return services
137
138
139 def register_service(service):
140     """
141     Register the ryu application specified by 'service' as
142     a provider of events defined in the calling module.
143
144     If an application being loaded consumes events (in the sense of
145     set_ev_cls) provided by the 'service' application, the latter
146     application will be automatically loaded.
147
148     This mechanism is used to e.g. automatically start ofp_handler if
149     there are applications consuming OFP events.
150     """
151     frame = inspect.currentframe()
152     m_name = frame.f_back.f_globals['__name__']
153     m = sys.modules[m_name]
154     m._SERVICE_NAME = service
  • 這次我們來詳細(xì)回顧get_dependent_services函數(shù):
    • 121行初始化services列表用于記錄當(dāng)前cls所依賴的app
    • 122行遍歷app的所有函數(shù),查看是否含有caller屬性,也就是是否被set_ev_xx修飾過
    • 124行遍歷當(dāng)前函數(shù)監(jiān)聽的事件
    • 125行再次出現(xiàn)service變量名,這里的service表示當(dāng)前app依賴的app
      • 獲取依賴app是通過查找事件類所在模塊的_SERVICE_NAME變量得到的
      • 恰好139行的register_service,本文件最后一個函數(shù),就是用來為模塊的_SERVICE_NAME變量賦值的
      • 賦值的value是通過參數(shù)指定的
      • 搜索調(diào)用register_service的函數(shù),會發(fā)現(xiàn)我們熟悉的ofp_event,而經(jīng)過查看發(fā)現(xiàn)ofp_event.py文件在文件的最末尾處直接調(diào)用了這句話
      • 也就是說,只要我們import了ofp_event模塊,就會自動執(zhí)行這條register語句,在ofp_event模塊自身中植入_SERVICE_NAME變量,指示當(dāng)前文件中的事件,是由哪個app產(chǎn)生的
      • 就解釋了我們沒有顯式指定ofp_handler這個app,他同樣會被拉起執(zhí)行
controller/dpset.py:handler.register_service('ryu.controller.dpset')
controller/handler.py:def register_service(service):
controller/ofp_event.py:handler.register_service('ryu.controller.ofp_handler')
services/protocols/ovsdb/event.py:handler.register_service('ryu.services.protocols.ovsdb.manager')
services/protocols/vrrp/event.py:handler.register_service('ryu.services.protocols.vrrp.manager')
topology/event.py:handler.register_service('ryu.topology.switches')
 94
 95
 96 def _has_caller(meth):
 97     return hasattr(meth, 'callers')
 98
 99
100 def _listify(may_list):
101     if may_list is None:
102         may_list = []
103     if not isinstance(may_list, list):
104         may_list = [may_list]
105     return may_list
106
107
108 def register_instance(i):
109     for _k, m in inspect.getmembers(i, inspect.ismethod):
110         # LOG.debug('instance %s k %s m %s', i, _k, m)
111         if _has_caller(m):
112             for ev_cls, c in m.callers.items():
113                 i.register_handler(ev_cls, m)
114
115
116 def _is_method(f):
117     return inspect.isfunction(f) or inspect.ismethod(f)
118
119
  • 最后是本文件剩余的部分,除了幾個工具函數(shù),還有一個register_instance函數(shù)
  • 根據(jù)傳參具有caller屬性的方法知道,入?yún)是一個app,而在113行調(diào)用了app的register_handler方法
  • 搜索后得知,app_manager會在_instantiate函數(shù),即對app進行初始化時,
  • 調(diào)用如下的register_app函數(shù),其中調(diào)用了我們現(xiàn)在關(guān)注的register_instance函數(shù)
  • 因此在本節(jié)事件管理分析完畢后,會接著分析事件路由
def register_app(app):
    assert isinstance(app, RyuApp)
    assert app.name not in SERVICE_BRICKS
    SERVICE_BRICKS[app.name] = app
    register_instance(app)

事件管理以及依賴計算小結(jié)

  1. 編寫特定協(xié)議的event模塊時,在文件中執(zhí)行register_service函數(shù),表明當(dāng)前模塊中的event們是由哪個app產(chǎn)生
  2. 當(dāng)我們的app導(dǎo)入了對應(yīng)的event模塊時,就已經(jīng)執(zhí)行了register_service
  3. 當(dāng)我們的函數(shù)使用set_ev_cls修飾,并指定監(jiān)聽特定的event時,修飾器會在這個函數(shù)的callers變量中添加相應(yīng)的監(jiān)聽事件信息
  4. 在run我們編寫的app時,app_manager會使用get_dependent_services函數(shù)尋找當(dāng)前app監(jiān)聽的事件
  5. 從而找到這個事件所屬的event模塊,再找到event模塊的_SERVICE_NAME變量
  6. 最終得知如果我要監(jiān)聽在這個事件的話,需要拉起這個app,從而讓其產(chǎn)生這樣的事件以供我消費
  7. 如果我們沒有在event模塊中對事件生產(chǎn)者進行注冊,或者使用的是set_ev_handler修飾器,那么ryu也就不會自動導(dǎo)入拉起對應(yīng)的app了

事件收發(fā)

  • 事件的收發(fā)是多個app在運行階段,進行的事件接收發(fā)送

ryu/ryu/base/app_manager.py

 91 class RyuApp(object):
...省略文檔
105
106     _CONTEXTS = {}
...省略文檔
124
125     _EVENTS = []
...省略文檔
132     OFP_VERSIONS = None
...省略文檔
146     @classmethod
147     def context_iteritems(cls):
148         """
149         Return iterator over the (key, contxt class) of application context
150         """
151         return iter(cls._CONTEXTS.items())
153     def __init__(self, *_args, **_kwargs):
154         super(RyuApp, self).__init__()
155         self.name = self.__class__.__name__
156         self.event_handlers = {}        # ev_cls -> handlers:list
157         self.observers = {}     # ev_cls -> observer-name -> states:set
158         self.threads = []
159         self.main_thread = None
160         self.events = hub.Queue(128)
161         self._events_sem = hub.BoundedSemaphore(self.events.maxsize)
162         if hasattr(self.__class__, 'LOGGER_NAME'):
163             self.logger = logging.getLogger(self.__class__.LOGGER_NAME)
164         else:
165             self.logger = logging.getLogger(self.name)
166         self.CONF = cfg.CONF
167
168         # prevent accidental creation of instances of this class outside RyuApp
169         class _EventThreadStop(event.EventBase):
170             pass
171         self._event_stop = _EventThreadStop()
172         self.is_active = True
  • 涉及到了app之間的事件收發(fā),就不得不補一下前面剩下還沒有分析的RyuAPP的app基類
  • app基類中定義了app與app間收、發(fā)事件的函數(shù),也定義了app在協(xié)程中運行的事件循環(huán)
  • 在app基類的開頭首先定義了類屬性_CONTEXT,我們已經(jīng)很熟悉了,是用來定義本app所依賴的外部類的字典
  • 還定義了_EVENTS類屬性用來定義本app能夠產(chǎn)生的事件,后面還會詳細(xì)分析
  • OFP_VERSIONS是本app工作于的openflow版本
  • app基類初始化函數(shù)中
    • event_handlers變量存儲接事件到函數(shù)的映射
    • observers是對本app產(chǎn)生的事件監(jiān)聽的app們,也就是本app的消費者
    • threads是本app產(chǎn)生的多個協(xié)程
    • main_thread用于main函數(shù)join,main_thread退出了才是真推出
    • events變量是個隊列,用于存儲其他app發(fā)送來的事件
    • 其他變量見名知意
194
195     def register_handler(self, ev_cls, handler):
196         assert callable(handler)
197         self.event_handlers.setdefault(ev_cls, [])
198         self.event_handlers[ev_cls].append(handler)
199
200     def unregister_handler(self, ev_cls, handler):
201         assert callable(handler)
202         self.event_handlers[ev_cls].remove(handler)
203         if not self.event_handlers[ev_cls]:
204             del self.event_handlers[ev_cls]
...省略發(fā)送相關(guān)函數(shù),以及協(xié)程相關(guān)函數(shù)
229     def get_handlers(self, ev, state=None):
230         """Returns a list of handlers for the specific event.
231
232         :param ev: The event to handle.
233         :param state: The current state. ("dispatcher")
234                       If None is given, returns all handlers for the event.
235                       Otherwise, returns only handlers that are interested
236                       in the specified state.
237                       The default is None.
238         """
239         ev_cls = ev.__class__
240         handlers = self.event_handlers.get(ev_cls, [])
241         if state is None:
242             return handlers
243
244         def test(h):
245             if not hasattr(h, 'callers') or ev_cls not in h.callers:
246                 # dynamically registered handlers does not have
247                 # h.callers element for the event.
248                 return True
249             states = h.callers[ev_cls].dispatchers
250             if not states:
251                 # empty states means all states
252                 return True
253             return state in states
254
255         return filter(test, handlers)
  • 接下來分析app基類中事件鉤子相關(guān)的函數(shù)
  • 首先我們看到了前文提到的調(diào)用鏈,即instantiate_apps -> _instantiate -> register_app -> register_instance -> register_handler,中的最后一環(huán)register_handler
  • register_handler做的事情也很簡單,把本app中各函數(shù)的callers收集起來,整理成一個字典,也就是self.event_handlers
  • unregister_handler也就是相應(yīng)的刪除函數(shù)了
  • 229行g(shù)et_handlers函數(shù),獲取相應(yīng)ev所對應(yīng)的handlers,并進行dispatcher階段的過濾
206     def register_observer(self, ev_cls, name, states=None):
207         states = states or set()
208         ev_cls_observers = self.observers.setdefault(ev_cls, {})
209         ev_cls_observers.setdefault(name, set()).update(states)
210
211     def unregister_observer(self, ev_cls, name):
212         observers = self.observers.get(ev_cls, {})
213         observers.pop(name)
214
215     def unregister_observer_all_event(self, name):
216         for observers in self.observers.values():
217             observers.pop(name, None)
218
219     def observe_event(self, ev_cls, states=None):
220         brick = _lookup_service_brick_by_ev_cls(ev_cls)
221         if brick is not None:
222             brick.register_observer(ev_cls, self.name, states)
223
224     def unobserve_event(self, ev_cls):
225         brick = _lookup_service_brick_by_ev_cls(ev_cls)
226         if brick is not None:
227             brick.unregister_observer(ev_cls, self.name)
...省略其他函數(shù)
256
257     def get_observers(self, ev, state):
258         observers = []
259         for k, v in self.observers.get(ev.__class__, {}).items():
260             if not state or not v or state in v:
261                 observers.append(k)
262
263         return observers
264
...省略其他函數(shù)
320     def send_event_to_observers(self, ev, state=None):
321         """
322         Send the specified event to all observers of this RyuApp.
323         """
324
325         for observer in self.get_observers(ev, state):
326             self.send_event(observer, ev, state)
  • 接下來分析app基類中注冊事件相關(guān)的函數(shù)
  • self.observers是一個字典,key是本app能夠產(chǎn)生的事件類型,value還是一個字典,key是監(jiān)聽這個事件的app名字,value是他指定的監(jiān)聽生效的階段
  • register_observer與unregister_observer函數(shù)被其他邏輯調(diào)用,從而向本app注冊或取消注冊事件的目的地
  • observe_event和unobserve_event則被本app調(diào)用,調(diào)用對方app的register_observer,向其他app聲明自己的監(jiān)聽
  • 320行send_event_to_observers被本app中產(chǎn)生事件的函數(shù)所調(diào)用,向已經(jīng)跟本app注冊了監(jiān)聽的app們發(fā)送事件
265     def send_request(self, req):
266         """
267         Make a synchronous request.
268         Set req.sync to True, send it to a Ryu application specified by
269         req.dst, and block until receiving a reply.
270         Returns the received reply.
271         The argument should be an instance of EventRequestBase.
272         """
273
274         assert isinstance(req, EventRequestBase)
275         req.sync = True
276         req.reply_q = hub.Queue()
277         self.send_event(req.dst, req)
278         # going to sleep for the reply
279         return req.reply_q.get()
...省略其他函數(shù)
301     def _send_event(self, ev, state):
302         self._events_sem.acquire()
303         self.events.put((ev, state))
304
305     def send_event(self, name, ev, state=None):
306         """
307         Send the specified event to the RyuApp instance specified by name.
308         """
309
310         if name in SERVICE_BRICKS:
311             if isinstance(ev, EventRequestBase):
312                 ev.src = self.name
313             LOG.debug("EVENT %s->%s %s",
314                       self.name, name, ev.__class__.__name__)
315             SERVICE_BRICKS[name]._send_event(ev, state)
316         else:
317             LOG.debug("EVENT LOST %s->%s %s",
318                       self.name, name, ev.__class__.__name__)
319
320     def send_event_to_observers(self, ev, state=None):
321         """
322         Send the specified event to all observers of this RyuApp.
323         """
324
325         for observer in self.get_observers(ev, state):
326             self.send_event(observer, ev, state)
327
328     def reply_to_request(self, req, rep):
329         """
330         Send a reply for a synchronous request sent by send_request.
331         The first argument should be an instance of EventRequestBase.
332         The second argument should be an instance of EventReplyBase.
333         """
334
335         assert isinstance(req, EventRequestBase)
336         assert isinstance(rep, EventReplyBase)
337         rep.dst = req.src
338         if req.sync:
339             req.reply_q.put(rep)
340         else:
341             self.send_event(rep.dst, rep)
  • 接下來分析app基類中事件發(fā)送相關(guān)的函數(shù)
  • 305行是剛剛send_event_to_observers中調(diào)用的send_event函數(shù)
  • send_event在設(shè)置了event的src后,調(diào)用了目的app的_send_event函數(shù)
  • _send_event只需要在app的events列表中put相應(yīng)的事件
  • 最后265和328行是ryu基于當(dāng)前even機制實現(xiàn)的同步的請求響應(yīng)機制
    • 請求時發(fā)送的事件中包含了同步請求的標(biāo)識,還包含了取得結(jié)果的返回隊列
    • 響應(yīng)時只需把向req帶來的隊列中put返回值即可
174     def start(self):
175         """
176         Hook that is called after startup initialization is done.
177         """
178         self.threads.append(hub.spawn(self._event_loop))
179
180     def stop(self):
181         if self.main_thread:
182             hub.kill(self.main_thread)
183         self.is_active = False
184         self._send_event(self._event_stop, None)
185         hub.joinall(self.threads)
186
187     def set_main_thread(self, thread):
188         """
189         Set self.main_thread so that stop() can terminate it.
190
191         Only AppManager.instantiate_apps should call this function.
192         """
193         self.main_thread = thread
...省略其他事件收發(fā)相關(guān)函數(shù)
281     def _event_loop(self):
282         while self.is_active or not self.events.empty():
283             ev, state = self.events.get()
284             self._events_sem.release()
285             if ev == self._event_stop:
286                 continue
287             handlers = self.get_handlers(ev, state)
288             for handler in handlers:
289                 try:
290                     handler(ev)
291                 except hub.TaskExit:
292                     # Normal exit.
293                     # Propagate upwards, so we leave the event loop.
294                     raise
295                 except:
296                     LOG.exception('%s: Exception occurred during handler processing. '
297                                   'Backtrace from offending handler '
298                                   '[%s] servicing event [%s] follows.',
299                                   self.name, handler.__name__, ev.__class__.__name__)
  • 接下來分析app基類中事件接收相關(guān)的函數(shù)
  • 還記得我們在分析app_manager的instantiate_apps函數(shù)時,對每個app都調(diào)用了start,并將返回值作為協(xié)程句柄返回以供main函數(shù)join
  • 174行可以看到start函數(shù)做的事情只有在hub中添加本app的事件循環(huán),并且如果不繼承覆蓋start函數(shù)的話,并不會返回協(xié)程句柄
  • 180行stop函數(shù)則進行相應(yīng)的kill
  • 281行就是本app的事件循環(huán)
    • 282行可以看出本app處于活躍階段,并且events隊列中有事件時,才會進入循環(huán)
    • 283行從隊列中取出事件,287行從本app的handlers變量取出ev事件對應(yīng)的事件處理函數(shù)
    • 最后288行依次執(zhí)行這個event對應(yīng)的handler,完成事件的接收工作
451     def _update_bricks(self):
452         for i in SERVICE_BRICKS.values():
453             for _k, m in inspect.getmembers(i, inspect.ismethod):
454                 if not hasattr(m, 'callers'):
455                     continue
456                 for ev_cls, c in m.callers.items():
457                     if not c.ev_source:
458                         continue
459
460                     brick = _lookup_service_brick_by_mod_name(c.ev_source)
461                     if brick:
462                         brick.register_observer(ev_cls, i.name,
463                                                 c.dispatchers)
464
465                     # allow RyuApp and Event class are in different module
466                     for brick in SERVICE_BRICKS.values():
467                         if ev_cls in brick._EVENTS:
468                             brick.register_observer(ev_cls, i.name,
469                                                     c.dispatchers)
  • 最后還有一個疑問是,我們知道了handler變量通過怎么樣的調(diào)用鏈完成初始化,那么observers呢?
  • 前文提到了app_manager中instantiate_apps函數(shù)調(diào)用了_update_bricks,這里回顧_update_bricks函數(shù),前面我們沒有詳細(xì)分析,現(xiàn)在可以回頭再來看一下
  • 452行遍歷了BRICK,即app
  • 453行遍歷了app的所有方法
  • 454行過濾留下有callers屬性,即監(jiān)聽了事件的函數(shù)
  • 456行過濾掉沒有事件來源的事件
  • 460行尋找產(chǎn)生這個事件的模塊
  • 462行調(diào)用前文提到的register_observer,向?qū)Ψ絘pp表明自己的存在,聲明自己的監(jiān)聽事件和監(jiān)聽階段
  • 最后466行,如果事件的產(chǎn)生者并不在event這個類的模塊里,ryu還會尋找所有brick(app)的_EVENTS屬性
    • 如果app在_EVENTS中聲明了自己會產(chǎn)生這個事件的話,那么同樣在這個app進行注冊
    • 這也就填坑了前面沒有細(xì)說的_EVENTS,就是在這里對其進行了讀取使用
    • 因此如果我們自己要編寫新的event類型,同時event定義與事件產(chǎn)生的app在不同模塊,就需要要在_EVENTS指定事件信息

事件收發(fā)小結(jié)

  • app的handler和observers兩個變量均在instantiate_apps階段完成變量的計算和賦值,為后續(xù)的事件路由提供信息
  • 事件的接收是通過app基類中的事件循環(huán)完成的
  • 事件的發(fā)送是通過本app的某函數(shù)主動調(diào)用send_event_to_observers等函數(shù)實現(xiàn)的

TODO

  • ryu的wsgi
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容