此系列用來記錄學(xué)習(xí)kungfu系統(tǒng)的點點滴滴
介紹
kungfu系統(tǒng)是知乎大神董可人團隊做的高頻交易系統(tǒng),后端使用c++11/17,前端使用vue/nodejs開發(fā),主要特點包括低延遲交易、開放的策略編寫方式、友好的使用方式、跨平臺運行、靈活的擴展接口,具體介紹不多說,可以看github上的介紹。
探索低延遲 從易筋經(jīng)開始
接下來分析yijinjing的源碼
1、/yijingjing/include/kungfu/common.h
這里定義了FORWARD_DECLARE_PTR和DECLARE_PTR作為smart ptr的宏定義,在其他代碼中,大量使用到這個宏定義。從代碼上看,F(xiàn)ORWARD_DECLARE_PTR引用了DECLARE_PTR,并未做太多處理。而DECLARE_PTR定義了typedef std::shared_ptr<X> X##_ptr。接下來分析其他代碼時要檢查兩者的區(qū)別。
2、/yijingjing/include/kungfu/yijinjing/common.h
- class event,namespace yijinjing
基類,所有成員函數(shù)都是虛函數(shù)
有個data()的內(nèi)聯(lián)函數(shù),使用了reinterpret_cast,做類型的轉(zhuǎn)換。
同時聲明了DECLARE_PTR(event),即event_ptr相當于std::shared_ptr<event>的定義。 - class publisher,namespace yijinjing
跟發(fā)布相關(guān)的接口基類
也聲明了DECLARE_PTR(publisher) - class observer,namespace yijinjing
跟觀察者相關(guān)的接口基類
也聲明了DECLARE_PTR(observer) - 幾個枚舉類型,namespace data
mode:應(yīng)該是處理模型,live是實盤,data是查詢數(shù)據(jù),replay是回看,backtest是回測
category:MD是行情,TD是交易,STRATEGY是策略,SYSTEM是系統(tǒng)
layout:數(shù)據(jù)存儲方式?JOURNAL,SQLITE,NANOMSG,LOG - class locator,namespace data
數(shù)據(jù)定位器的基類
包括了get_env(獲取環(huán)境變量),layout_dir(數(shù)據(jù)存儲目錄),layout_file(數(shù)據(jù)存儲的文件名,參數(shù)包括了下面的location類),list_page_id(從page列表中獲取id?) - class location,namespace data
使用了std::enable_shared_from_this,具體分析參見知乎上的一個問題,大概意思是防止循環(huán)引用,因為class location包含了class locator的智能指針,而該指針的成員函數(shù)參數(shù)包括location的智能指針,例如layout_dir。
屬性包括了mode,category兩個枚舉類型值,也有g(shù)roup,name兩個string值,還有uname,賦值方式是category/group/name/mode,uid是把uname進行hash得出來的字符串。
當然也包含了locator數(shù)據(jù)定位器的智能指針。 - namespace rx
應(yīng)該是跟rxcpp相關(guān)的信息
ReactiveX就是”觀察者模式+迭代器模式+函數(shù)式編程”,它擴展了觀察者模式,通過使用可觀察的對象序列流來表述一系列事件,訂閱者進行占點觀察并對序列流做出反應(yīng)(或持久化或輸出顯示等等);借鑒迭代器模式,對多個對象序列進行迭代輸出,訂閱者可以依次處理不同的對象序列;使用函數(shù)式編程思想(functional programming),極大簡化問題解決的步驟。
rxcpp是ReactiveX用c++語言實現(xiàn)的版本
3、/yijingjing/include/kungfu/yijinjing/journal/common.h
接下來到j(luò)ournal了。journal是易筋經(jīng)的數(shù)據(jù)模型,在知乎文章有簡單介紹,接下來幾個源碼分析都是圍繞這個來進行。首先是common.h。
定義了frame,page,journal,reader,writer的智能指針。
4、/yijingjing/include/kungfu/yijinjing/journal/frame.h
struct frame_header
定義frame的頭部信息,包括length(整個frame的總長度,包括frame的頭部信息和數(shù)據(jù)體的長度),header_length(頭部信息長度),gen_time(該frame的生成時間),trigger_time(該frame的觸發(fā)時間,用于延遲統(tǒng)計),msg_type(數(shù)據(jù)體的類型),source(該frame的來源),dest(該frame的目的地)class frame
繼承了event,可以看出class event的大多數(shù)方法跟frame_header的屬性有對應(yīng)關(guān)系。
包含frame_header屬性
友元類是journal和writer,因為private里包含了很多set屬性方法
注意frame.h并沒有frame.cc
5、/yijingjing/include/kungfu/yijinjing/journal/page.h
struct page_header
同樣,定了一個struct page_header結(jié)構(gòu)體,包括了version(版本),page_header_length(page頭部信息長度),page_size(page大?。?,frame_header_length(frame頭部信息長度,為什么這里也要有這個信息?不是在frame里定義了嗎?),last_frame_position(最后一個frame的位置)等屬性-
class page
包含了location的智能指針,page_id,lazy_(false代表低延遲?),size_(跟page_header的page_size是什么關(guān)系?),page_header(頭部信息),還有dest_id_(跟frame的dest是什么關(guān)系?)
以下是page的結(jié)構(gòu)圖,摘自知乎文章
image
可以看到,page有連續(xù)的frame組成,所以class page里包含了對frame的操作,注意這里連續(xù)的frame并沒有形成一個鏈表。
接下來是跟mmap相關(guān)的操作。 page::load
page_ptr page::load(const data::location_ptr &location, uint32_t dest_id, int page_id, bool is_writing, bool lazy)
首先根據(jù)location數(shù)據(jù)定位器的數(shù)據(jù)類型和dest_id來獲取該page的大小page_size,然后通過get_page_path獲取mmap映射文件的路徑,注意調(diào)用了python代碼
def layout_dir(self, location, layout):
mode = pyyjj.get_mode_name(location.mode)
category = pyyjj.get_category_name(location.category)
p = os.path.join(self._home, category, location.group, location.name, pyyjj.get_layout_name(layout), mode)
if not os.path.exists(p):
os.makedirs(p)
return p
def layout_file(self, location, layout, name):
return os.path.join(self.layout_dir(location, layout), "{}.{}".format(name, pyyjj.get_layout_name(layout)))
然后根據(jù)上述獲取的page_size,path,還有參數(shù)is_writing,lazy去調(diào)用util/mmap.cpp的load_mmap_buffer代碼,申請mmap。這里進入到load_mmap_buffer代碼中。首先根據(jù)如果is_writing為真或lazy為false,則說明是master。通過open返回文件句柄,master的話則是read and write,否則是只讀。同時,如果是master,則通過lseek(fd, size - 1, SEEK_SET)把讀寫位置移到末尾,同時測試是否能寫入。根據(jù)mmap開辟共享內(nèi)存空間,將首指針返回。
這里就回到了page.cpp的load方法中,返回的首指針即address。
接下來把address地址賦給page_header結(jié)構(gòu)體,作為page對象的屬性。auto header = reinterpret_cast<page_header *>(address);
接下來初始化page_header。
返回將上述元素new一個page對象,返回該page對象的智能指針。
- page::find_page_id
int page::find_page_id(const data::location_ptr &location, uint32_t dest_id, int64_t time)
首先根據(jù)location定位器和dest_id去列舉所有page_ids。這里同樣是通過journal.py的list_page_id獲取。包括了文件名正則表達式。
def list_page_id(self, location, dest_id):
page_ids = []
for journal in glob.glob(os.path.join(self.layout_dir(location, pyyjj.layout.JOURNAL), hex(dest_id)[2:] + '.*.journal')):
match = JOURNAL_LOCATION_PATTERN.match(journal[len(self._home) + 1:])
if match:
page_id = match.group(6)
page_ids.append(int(page_id))
return page_ids
接下來就逐個加載page,如果該page的begin_time(也就是該page第一個frame的gen_time)小于參數(shù)time,則返回該page的id(為什么是id,這個id用在什么地方?)
6、/yijingjing/include/kungfu/yijinjing/journal/journal.h
class journal
包括了location定位器的智能指針,dest_id_(又來了,這里也有dest_id_),is_writing_(對應(yīng)page?),lazy_(對應(yīng)page?),current_page_,frame_(當前的frame?),page_frame_nb_(不知,需要看代碼)class reader
包括了lazy_屬性,journal的普通指針*current_,還有一個journal_ptr智能指針的vector journals_。
class reader有join方法,此方法是把多個journal加入到class reader的journals_里,根據(jù)location數(shù)據(jù)定位器的uid和dest_id確定唯一的journal。如果沒有重復(fù)的journal,則把新建一個journal對象放到j(luò)ournals_最后,同時對這個journal戶必須進行seek_to_time操作,在seek_to_time里加載page。最后是sort。實際上并沒有排序,只是從journals_里選出當前frame的生成時間最小的journal,賦給current_屬性。-
class writer
包含了一個mutex鎖writer_mtx_(為什么會有鎖,低延遲系統(tǒng)是不會有鎖的,接下來會研究),一個journal智能指針journal_,frame_id_base_(不清楚?),publisher_ptr智能指針publisher_(終于看到跟通信相關(guān)的類了,接下來會研究。),size_to_write_(待寫入的數(shù)據(jù)大小)writer::writer(const data::location_ptr& location, uint32_t dest_id, bool lazy, publisher_ptr publisher) : publisher_(std::move(publisher)), size_to_write_(0)
根據(jù)location數(shù)據(jù)定位器的uid和dest_id計算出frame_id_base_,然后新建一個journal對象的智能指針,通過journal::seek_to_time加載page和定位current_page和current_frame等信息。注意seek_to_time的time是當前時間。void writer::close_page(int64_t trigger_time)
在writer::open_frame中調(diào)用。參數(shù)是trigger_time。首先獲取當前page的指針,然后加載下一個page,最后根據(jù)之前獲取的當前page的指針信息新建一個frame對象last_page_frame,作為上一個page的last_frame,注意這個frame的data_length為零。然后把這個frame set到past_page里。template<typename T> T &open_data(int64_t trigger_time, int32_t msg_type)
為了研究open_data是如何使用的,隨便找一段代碼。這里就從wingchun里獲取一段代碼,/wingchun/include/kungfu/wingchun/msg.h的class MsgWriter類的void write_data(int msg_type, const std::string &json_str)方法。
-> 這里調(diào)用了writer_->open_data方法,trigger_time為零,msg_type是msg::type::Quote,相當于獲取了journal_的current_page_的current_frame_(注意,本次沒有新增frame,新的frame在新建page或填完frame的data后會自動新建,接下來會說。)。并在data方法中將當前frame地址加上data_frame大小后的地址開始開辟大小為Quote大小的空間,通過reinterpret_cast。這樣就在當前frame開辟了一個空間存放Quote數(shù)據(jù)。
->調(diào)用from_json方法,把json數(shù)據(jù)一點點的寫入到Quote對象里
->然后調(diào)用writer::close_data方法,重置size_to_write屬性為零。然后調(diào)用writer::close_frame方法,參數(shù)是Quote的大小。
->進入writer::close_frame方法,第一步是獲取當前的frame,然后獲取下一個frame的地址next_frame_address,該地址是當前frame的地址加上data_frame的大小和Quote對象的大小,并判斷是否超出current_page_的邊界。然后在next_frame_address開始開辟一段大小為frame_header大小的空間。然后操作當前frame,也就是已寫入數(shù)據(jù)的最新的frame,更新gen_time為當前時間,更新data_length為Quote大小。然后把這個frame的地址作為current_page_的上一個frame的地址。接下來調(diào)用journal::next方法,通過frame::move_to_next方法更新journal的當前frame current_frame_為上述開辟的空間首地址。
->接著解鎖writer_mtx,因為再open_data時通過open_frame加了鎖。
->最后通過publisher發(fā)布出去。這里待研究,發(fā)布了什么東西?怎么發(fā)布?有什么接收者?publisher實例在哪里創(chuàng)建?frame_ptr writer::open_frame(int64_t trigger_time, int32_t msg_type, uint32_t data_length)
首先判斷兩個frame_header的大小加上data_length必須小于當前page current_page_的大小。接著通過try_lock來判斷鎖的情況,如果沒有獲得鎖,則計時,超時退出。否則獲得鎖。進入下一步操作。如果當前journal_的當前frame加上將要加上的frame_header和data大小大于journal_當前page的邊界,則調(diào)用close_page,翻到新的page,并給上一個page set_last_frame。然后獲取最新的frame,更新屬性,注意source是journal_的location數(shù)據(jù)定位器的uid,dest是journal_的dest_id,保持一致。但是這里沒有set data_length。
7、/yijingjing/src/journal/journal.cpp
最重要的方法是void journal::seek_to_time(int64_t nanotime)這里給定一個納秒時間nanotime,然后根據(jù)location_,dest_id_獲取第一個比nanotime的page,如果當前的page不等于獲取出來的page(通過page_id確定比較),則把當前page current_page_加載為獲取出來的page,同時把frame_設(shè)置為當前page的第一個frame的address,也把page_frame_nb_設(shè)置為零。
然后判斷current_page_是否為滿和時間是否小于nanotime,如果都滿足則不斷的把frame_換到下一個,或者加載下一個page。從而實現(xiàn)這篇文章里提到的(可能是吧?)
在每次讀寫進行到 Page 邊界的時候,易筋經(jīng)需要小心進行一系列細節(jié)操作,使得訪問者可以無縫的切換到下一個 Page,涉及到諸如調(diào)用 mmap 創(chuàng)建新的內(nèi)存映射文件,對內(nèi)存映射文件進行預(yù)處理等,這些操作往往相當耗時(毫秒級別),我們無法在實時讀寫數(shù)據(jù)時承受這個負擔。易筋經(jīng)的解決方案,是在后臺啟動并維護一個 PageEngine 進程,該進程負責提前載入緩存一些備用 Page,在需要時便可以立刻交付使用。在這種結(jié)構(gòu)設(shè)計下,由于我們是夸進程的申請、分配、釋放資源,所以需要多做一些安保工作,確保資源不會泄漏。PageEngine 內(nèi)部會小心的記錄申請者的進程編號 pid ,定期查詢客戶進程是否仍然健康存在,發(fā)現(xiàn)僵尸進程則會對其申請的資源進行釋放操作。
