4 cyber Reader接收數(shù)據(jù)
cyber收發(fā)數(shù)據(jù)有幾種類型:進程內(nèi),進程間,跨主機。進程間的收發(fā)基于共享內(nèi)存,跨主機的收發(fā)基于fastrtps網(wǎng)路庫。這里以進程間的場景為例說明。
ShmReceiver基于共享內(nèi)存實現(xiàn)Reciever接口,它委托ShmDispatcher從共享內(nèi)存讀取數(shù)據(jù)。
DataDispatcher/DataVisitor及相關(guān)類,實現(xiàn)一寫多讀的生產(chǎn)/消費模式。ShmReceiver讀取的數(shù)據(jù),會復(fù)制到DataDispatcher的環(huán)形緩存,這個緩存由DataVisitor提供。(ReceiverManager負責將DataDispatcher連接到receiver)。
每個Reader實例創(chuàng)建一個DataVisitor實例,并向DataDispatcher注冊它。DataVisitor的環(huán)形緩存是在構(gòu)造函數(shù)創(chuàng)建的。
Reader通過指定的回調(diào)函數(shù)接收消息。Reader通過RoutineFactory創(chuàng)建了一個協(xié)程,從DataVisitor讀取消息,然后調(diào)用這個回調(diào)函數(shù)。這樣Reader就收到消息了。

4.1 ConditionNotifier與MulticastNotifier
NotifierBase用于一組數(shù)據(jù)的生產(chǎn)/消費通知,這里一個數(shù)據(jù)單元稱作block。也
就是,當寫入一個block完成時,writer會通知一個或一組reader。
- 虛擬成員函數(shù) Notify()用于通知,Listen()用于得到通知。
作為NotfiBase的派生類,ConditionNotifier和MutlicastNotifier的區(qū)別是,前者通知一個消費者,后者通知所有消費者。
ConditionNotifier基于共享內(nèi)存實現(xiàn)。
- 成員key_ 是基于”/apollo/cyber/transport/shm/notifier”的hash值,用于創(chuàng)建共享內(nèi)存
- 成員managed_shm_指向創(chuàng)建的共享內(nèi)存,其中存放一組Indicator實例,也就是成員indicator_。Indicator保存block信息,如host_id_、channel_id、block_index_等。
MulticastNotifier基于udp組播實現(xiàn)。
+notifier_fd_是socket fd,使用組播地址239.255.0.100/8888。

4.2 Segment
基于共享內(nèi)存的通信使用Segment傳輸數(shù)據(jù),每個channel一個Segment實例。
- 成員channel_id_是channel id
- 成員managed_shm_ 是共享內(nèi)存塊。成員state_ 和blocks_在managed_shm_分配,分別保存meta信息和block數(shù)據(jù)。每個block一塊數(shù)據(jù)。
- 如果共享內(nèi)存塊還沒創(chuàng)建,OpenOrCreate()創(chuàng)建它,如果已經(jīng)創(chuàng)建了,OpenOnly()打開它。
- 成員函數(shù)AcquireBlockToWrite()/ReleaseWrittenBlocks()用于寫block,AcquireBlockToRead()/ReleaseReadBlock()用于讀block。
XsiSegment和PosixSegment的區(qū)別是創(chuàng)建共享內(nèi)存塊managed_shm_ 的方式不同。
SegmentFactory根據(jù)配置文件的選項創(chuàng)建XsiSegment/PosixSegment實例。

4.3 Dispatcher / ListenserHandler
ListenerHandlerBase定義了消息通知接口,ListenerHandler基于Signal實現(xiàn)這個接口。
- 成員signal_ 是Signal實例。成員函數(shù)Connect()將指定回調(diào)函數(shù)連接到signal_ 上,以便得到通知。
- 成員signal_conns_ 是從某種類型id到Signal實例的映射。
- 成員函數(shù)Run()使用給定的消息,調(diào)用signal_中的回調(diào)函數(shù)。
Dispatcher從傳輸層接收消息,并向上層派發(fā)。
- 成員msg_listeners_ 是從channel_id到ListenserHandleBase(實際上就是ListenerHandler)實例的映射。
- 成員函數(shù)AdListener()/RemoveListner()注冊指定的回調(diào)函數(shù)。它在msg_listeners_查找MessageListener實例,如果沒有就增加一個新的;將回調(diào)函數(shù)連接MessageListener的成員signal_上。這樣當MessageListener::Run()放入消息時,回調(diào)函數(shù)會被調(diào)用。

4.4 ShmDispatcher
ShmDispatcher基于共享內(nèi)存實現(xiàn)Dispatcher接口。
- 成員host_id_是本地主機id。
- 成員segments_ 是從channel_id到Segement實例的映射。從segments_獲取消息數(shù)據(jù),一個channel有一個Segment。成員函數(shù)AddSegment()向segments增加Segment實例。
- 當某個channel的Segment有消息到達時,可以從成員notifier_得到通知。notifier_負責多個channel。
- 注意ShmDispatcher有自己版本的AddListener(),使用模板參數(shù)Message。其中定義定義了一個將Message適配到ReadableBlock的函數(shù)。后面這個函數(shù)使用模板參數(shù)ReadableBlock調(diào)用Dispatcher::AddListener()。
- 成員thread_是接收消息的線程。它等待成員notifier_ 的通知;根據(jù)得到的channel調(diào)用ReadMessage()。
- 在ReadMessage()中,從相應(yīng)channel的Segment獲取ReadableBlock類型數(shù)據(jù),從數(shù)據(jù)中反序列化,得到MessageInfo信息(沒有數(shù)據(jù)部分),調(diào)用OnMessage()。
- 在OnMessage()中,從msg_listeners_中找到相應(yīng)channel_id的ListenerHandler實例,調(diào)用ListenerHandler::Run();從msg_listeners_找到對應(yīng)channel的ListenerHandler實例,然后調(diào)用功能ListenerHandler::Run()。這樣之前ShmDispatcher::AddListener()中定義的適配函數(shù)會被調(diào)用。
- 這個適配函數(shù)從ReadableBlock解析出Message,然后調(diào)用使用者的回調(diào)函數(shù)。

4.5 Receiver
Receiver負責接收消息。
- 成員函數(shù)OnNewMessage()在消息到達被調(diào)用,然后它會調(diào)用成員msg_listener_。后者由使用者通過Receiver構(gòu)造函數(shù)指定。
- 虛擬成員函數(shù)Enable()開始接收消息,Disable()停止接收消息。派生類一般在這里注冊/反注冊O(shè)nNewMessage()。
4.6 ShmReceiver
ShmReceiver基于共享內(nèi)存實現(xiàn)Receiver接口,它將主要工作委托給ShmDispatcher。
- 成員dispatcher_ 是ShmDispatcher實例。
- ShmReceiver定義了自己版本的Enable()/Disable(),以便把Receiver::OnNewMessage()加入到dispatcher_的監(jiān)聽者隊列中。當dispatcher_收到消息時,OnNewMessage()會調(diào)用使用者的回調(diào)函數(shù)。這個回調(diào)函數(shù)在ShmReceiver/Receiver的構(gòu)造函數(shù)中指定。

4.7 HybridReceiver
HybridReceiver是混合型Receiver實例,它實際上是將工作委托給自己的成員去處理,這些成員其他基本類型的Receiver,如ShmReceiver。

4.8 Transport
Transport負責根據(jù)指定的Mode創(chuàng)建Receiver/Transmiter實例。
4.9 cyber如何使用DataVisitor向多個Reader推送消息
DataVisitor在cyber中的使用場景如下:
- 在全局函數(shù)CreateRoutineFactory()中,創(chuàng)建RoutineFactory的實例。它的成員create_routine是函數(shù)指針,成員data_visitor_是DataVisitor實例。如前面介紹DataVisitor時所述,它的成員notifier_是一個回調(diào)函數(shù)。
- 用這個RoutineFactory實例作為參數(shù),調(diào)用Scheduler::CreateTask()。
- 創(chuàng)建CRoutine實例,協(xié)程處理函數(shù)設(shè)置為RoutineFactory::create_routine,
- 定義一個函數(shù),將它設(shè)置為RoutineFactory.data_visitor_.notifier_。

當消息到達時,DataDispatcher::dispatch()被調(diào)用,消息保存到成員buffer_中;同時成員notifier_被調(diào)用,這個回調(diào)函數(shù)會通知Scheduler更新當前協(xié)程的等待狀態(tài),進入可調(diào)度狀態(tài);如果協(xié)程的處理線程處于等待狀態(tài),還會喚醒它。
-
回到RoutineFactory::create_routine的實現(xiàn)。這個函數(shù)在一個for()循環(huán)中,
- 調(diào)用DataVisitor::TryFetch(),嘗試從成員buffer_ 獲取消息。如果有消息,則調(diào)用用戶提供的回調(diào)函數(shù);如果沒有,則調(diào)用CRoutine::Yield(),協(xié)程放棄剩余的時間片,進入等待。
對于Reader,它在Reader::Init()中定義了一個回調(diào)函數(shù)。這樣Reader實例就得到了消息。

4.10 ReceiverManager
ReceieverManager基于Transport創(chuàng)建Receiver實例。這里定義了一個函數(shù)綁定到Receiver實例上。當消息到達時,調(diào)用DataDispatcher::Dispatch()推送它,保存到DataVisitor實例中。

5 cyber Writer發(fā)送數(shù)據(jù)
cyber收發(fā)數(shù)據(jù)有幾種類型:進程內(nèi),進程間,跨主機。進程間的收發(fā)基于共享內(nèi)存,跨主機的收發(fā)基于fastrtps網(wǎng)路庫。這里以進程間的場景為例說明。
5.1 Transmitter
Transmitter負責發(fā)送消息。
- 成員函數(shù)Transmit()發(fā)送消息。
- 虛擬成員函數(shù)Enable()使能發(fā)送消息,Disable()停止發(fā)送消息。

5.2 ShmTrasmitter
ShmTransmitter基于共享內(nèi)存實現(xiàn)Transmitter接口。
- 成員channel_id_ 是channel id
- 成員segment_ 是Segment實例,數(shù)據(jù)寫入這里。
- 成員notifer_是Notifier實例,用于通知對端數(shù)據(jù)準備好了。
5.3 HybridTransmitter
HybridTransmitter是混合型Transmitter實例,它實際上是將工作委托給自己的成員去處理,這些成員其他基本類型的Transmitter,如ShmTransmitter。
6 cyber 跨主機收發(fā)數(shù)據(jù)的場景
前面以進程間的場景說明了cyber如何收發(fā)數(shù)據(jù),這里說明跨主機收發(fā)數(shù)據(jù)的場景。
RtpsDispatcher派生自Dispatcher,負責從fastrtps接收指定channel的消息,然后派發(fā)給上層的DataDispatcher/DataVisitor實例,進而被Reader接收。
- 成員participant是Participant實例。
- 成員 subs_是一個從channel id到Subcriber實例的映射。Subriber的成員包括fastrtps_Subscriber和SubListener實例,用于監(jiān)聽fastrtps上指定channel。

RtpsTransmitter派生自Transmitter,負責通過fastrtps發(fā)送指定channel的消息。消息發(fā)送之前,會先序列化成字符串。
- 成員participant是Participant實例。
- 成員publisher_ 是fastrtps_Publisher實例。

7 cyber 收發(fā)消息的底層消息
進程間通信時,ShmTransmitter/ShmDispatcher傳輸?shù)南卧荝eadableBlock/WritableBlock;跨主機通信時,RtpsTranmistter/RtpsDispatcher傳輸?shù)南卧荱nderlayMessage。
MessageInfo中保存發(fā)送端的Id,消息序列號等。傳輸時需要它來標記消息。

8 cyber跨主機收發(fā)數(shù)據(jù)的qos
cyber可以在配置文件cyber/conf/cyber.conf中,給跨主機通信設(shè)置質(zhì)量保證選項。
這些選項讀入后保存在RoleAttributes的成員qos_profle中,這是一個QosProfile實例。
當創(chuàng)建fastrtps_Subscriber/fastrtps_Publisher時,這些選項會作為創(chuàng)建的參數(shù)傳入。

相關(guān)鏈接
百度 Apollo 8.0 Cyber 源代碼分析(一)
百度 Apollo 8.0 Cyber 源代碼分析(二)
百度 Apollo 8.0 Cyber 源代碼分析(三)
百度 Apollo 8.0 Cyber 源代碼分析(四)
百度 Apollo 8.0 Cyber 源代碼分析(五)