文章裝載自:http://blog.csdn.net/juvxiao/article/details/23532617
在Icehouse中, rpc消息隊列相關處理從OpenStack.common.rpc慢慢的都轉移到oslo.messaging上, 現在僅有幾個項目沒有轉移, 將來也會轉, 這個架構更合理, 代碼結構清晰, 且弱耦合.
本文章主要針對rpc, notify基本沒涉及, 有時間總結總結.
Server: 為各個Client提供RPC接口,它是消息的最終處理者;
Client: RPC接口的調用端, 我們常見的cast和call方法就是在這端調用;
Exchange: 理解為一個消息交換機, 把消息分類,告訴何種路由到何種queue;
Topic: 是一個RPC消息的唯一標識; servers監(jiān)聽這個topic的消息; client負責發(fā)出這個topic的消息;
Namespace: servers可以在一個topic上,提供多種方法集合, 這些方法集合通過namespace來分開管理;
Method: 這個慨念很簡單, 就是函數, 即遠程方法調用中的方法;
API version: 也就是server上提供的RPC api接口集合的版本號,openstack中1.0起步, servers可以一次提供多種api version,client每次請求時只需描述它所需要的最低version就ok;
Transport: 可以理解為傳輸載體,這個很好理解, 就是我們使用的消息隊列中間件RabbitMQ, Qpid, ZeroMQ等等, 是負責整個消息處理的系統, 它負責消息傳輸直到提供給clients返回, 使用此系統者, 不用了解細節(jié),? Openstack中實現的主要有這三種, AMQP標準下的rabbitMQ和Qpid, 和非AMQP的ZeroMQ, ZeroMQ更底層, 速度更快, 據說快10倍。
Target這是個很重要的概念, 它描述了信息的處理方式, 該發(fā)哪里去(server屬性)和消息處理端(server)監(jiān)聽什么信息(topic 屬性)。以下是Target的屬性
exchange (defaults to CONF.control_exchange)
topic
server (optional) 它會使server的標示, 如host or host@backend 等等
fanout (defaults to False)這種模式類似于廣播, 符合條件的server都要監(jiān)聽并做處理
namespace (optional)
API version (optional)
1.存在多個接口版本的server, 隨機選擇一個處理遠程調用的方法
比如我們有多個接口版本的servers在監(jiān)聽某個exchange上某個topic的方法調用, 一旦方法調用, 就會隨機選擇一個版本的server來監(jiān)聽。
nova-api 調用'nova' exchange上 topic為'scheduler' 的'run_instance'方法, 然后,會有一個‘nova-scheduler’服務捕獲請求
2.存在多個接口版本的server, 特定server處理遠程調用的方法
比如我們有多種接口版本的servers在監(jiān)聽某個exchange上某個topic的方法調用, 不同于之前的是, 它要求選擇一個特定版本的server來監(jiān)聽。
nova-scheduler 選擇 'foobar' host去運行一個instance,那么就調用'nova' exchange上 topic為'scheduler' 的'run_instance'方法, 它同時指定要在“foobar”這個server上run
3.存在多個接口版本的server, 每個server都要處理遠程調用的方法, 即所謂的fanout
比如我們有多種接口版本的servers在監(jiān)聽某個exchange上某個topic的方法調用, 不同于之前的是, 它要求每個server都要監(jiān)聽并運行這個方法。
nova-compute 周期性的在faout模式調用 'update_service_capabilities' 方法, topic為 'scheduler',exchange為'nova' , 這樣每個nova-scheduler 服務都要捕獲并處理它
Server端:消息隊列的監(jiān)聽會在service啟動的時候開啟, 比如cinder-volume啟動時,會啟動MessageHandlingServer( 下面具體介紹),來監(jiān)聽消息并把消息dispatch到距離的Manager方法中做消息處理.
Client端: 負責消息發(fā)出,方法調用的code是在具體API中,如本例的VolumeAPI, 一般存放在rpcapi.py中.
為了進行詳細解釋,先畫出整體的對象依賴圖:
MessageHandlingServer
這個就是server端,是消息的最終處理者。
Server端的實現有個兩個重要的內部概念:dispatchers 和executors, dispatcher專注在消息的加載并調度到合適的方法。 executor是專注在怎樣從transport中獲得消息并把它分配給dispatcher的策略,是重開一個線程還是在現有線程上繼續(xù)處理。實現dispatchers可以為rpc或者notification, executors可以為block或者eventlet
transport:消息中間件,rabbitMQ or Qpid or ZeroMQ
dispatcher:rpc或者notification
executor:block或者eventlet,默認block, 它描述的是I/O策略,是重開一個線程(EventletExecutor類實現)還是在現有線程上繼續(xù)處理(BlockingExecutor實現)。
MessageHandlingServer的start()方法開始后 , 就會一直獲取消息隊列中信息,并把消息分配給dispatcher, 直到stop()方法被調用。
RPCDispatcher
RPC消息調度器
MessageHandlingServer就依賴它完成從消息到具體處理消息的方法的調度。
它主要干了 兩件事
1) 它利用Transport類(Transport又利用AMQPDriverBase的listen()) 獲得監(jiān)聽器, 并報告給Executor
[python]view plaincopy
deflisten(self,?target):
conn?=self._get_connection(pooled=False)
listener?=?AMQPListener(self,?conn)
conn.declare_topic_consumer(target.topic,?listener)
conn.declare_topic_consumer('%s.%s'%?(target.topic,?target.server),
listener)
conn.declare_fanout_consumer(target.topic,?listener)
returnlistener
以上代碼可以看到listener里做了什么, declare了三個consumer, topic, topic.host, fanout, 正好呼應上面介紹的三種user case。關于consumer, 將會在另一遍博文(介紹amqp的publisher/consumer機制)中來介紹
2) 把監(jiān)聽器poll出來的消息通過這個dispatcher傳到XXXManager中的具體處理方法上
關于connectionpool不詳細介紹了,它是與具體Transport的連接池。
主要都寫在rpcapi.py 中, 本文以Cinder項目為例,講述delete_volume消息發(fā)出的過程。
[python]view plaincopy
classVolumeAPI(object):
BASE_RPC_API_VERSION?='1.0'
def__init__(self,?topic=None):
super(VolumeAPI,self).__init__()
target?=?messaging.Target(topic=CONF.volume_topic,
version=self.BASE_RPC_API_VERSION)
self.client?=?rpc.get_client(target,'1.15')
....
defdelete_volume(self,?ctxt,?volume,?unmanage_only=False):
cctxt?=self.client.prepare(server=volume['host'],?version='1.15')
cctxt.cast(ctxt,'delete_volume',
volume_id=volume['id'],
unmanage_only=unmanage_only)
RPCClient
負責通過消息隊列發(fā)消息到消息隊列中,兩種方法調用的方式:cast? & call。關于這兩種方法也將會在另一遍博文(介紹amqp的publisher/consumer機制)中來介紹
會利用Transport的send方法。