題目
題目?jī)?nèi)容
Topic類似于水壩(蓄積功能,消峰填谷之利器),Queue類似于水渠;每當(dāng)新建一個(gè)Queue的時(shí)候,可以選擇綁定到幾個(gè)Topic,類似于水渠從水壩引水; 每個(gè)Topic可以被任意多個(gè)Queue綁定,這點(diǎn)與現(xiàn)實(shí)生活不太一樣,因?yàn)閿?shù)據(jù)可以多次拷貝; 在發(fā)送的時(shí)候,可以選擇發(fā)送到Topic,也可以選擇直接發(fā)送到Queue;直接發(fā)送到Queue的數(shù)據(jù)只能被對(duì)應(yīng)Queue消費(fèi),不能被其他Queue讀取到; 在消費(fèi)的時(shí)候,除了要讀取綁定的Topic的數(shù)據(jù),還要去取直接發(fā)送到該Queue的數(shù)據(jù)。
程序目標(biāo)
實(shí)現(xiàn)以下接口:
- Producer的createBytesMessageToTopic(topic, body)
- Producer的createBytesMessageToQueue(queue, body)
- Producer的send(message)
- PullConsumer的attachQueue(queue, topics)
- PullConsumer的poll()
程序校驗(yàn)邏輯
- 10~20個(gè)線程(位于同一進(jìn)程中)各自獨(dú)立調(diào)用Producer發(fā)送消息(每個(gè)線程啟動(dòng)一個(gè)Producer,每條消息隨機(jī)發(fā)送到某個(gè)Topic或者Queue),持續(xù)時(shí)間T1,請(qǐng)注意把消息數(shù)據(jù)寫(xiě)入磁盤中
- 強(qiáng)行kill Producer進(jìn)程,未寫(xiě)入磁盤的消息都會(huì)丟失
- 10~20個(gè)線程(位于同一進(jìn)程中)獨(dú)立調(diào)用Consumer收取消息(每個(gè)線程啟動(dòng)一個(gè)Consumer,attach到指定的Queue,不同的Consumer不會(huì)attach同一個(gè)Queue),驗(yàn)證消息順序準(zhǔn)確性,可靠性,消費(fèi)持續(xù)的時(shí)間為T2,消費(fèi)到的總消息數(shù)目為N
- 以N/(t1+t2)來(lái)衡量性能
補(bǔ)充說(shuō)明
- 測(cè)試時(shí),topic和queue的數(shù)目大約是100個(gè)(其中queue的數(shù)目與消費(fèi)者線程數(shù)相等);
- 測(cè)試時(shí),消息大小不會(huì)超過(guò)256K;
- 可靠性是指,消息不能丟失,且消息的內(nèi)容不能被篡改;在測(cè)試消費(fèi)的時(shí)候,會(huì)對(duì)消息的body,headers,properties的內(nèi)容進(jìn)行校驗(yàn);
- header與properties中key和value都不會(huì)插入null或空值;
- 僅允許依賴JavaSE8包含的lib;
消息順序的說(shuō)明
- 順序只針對(duì)單個(gè)topic或者queue,不同topic,不同queue,topic與queue之間都不用考慮順序;
- 消息產(chǎn)品的一個(gè)重要特性是順序保證,也就是消息消費(fèi)的順序要與發(fā)送的時(shí)間順序保持一致;
- 在多發(fā)送端的情況下,保證全局順序代價(jià)比較大,只要求各個(gè)發(fā)送端的順序有保障即可; 舉個(gè)例子P1發(fā)送M11,M12,M13,P2發(fā)送M21,M22,M23,在消費(fèi)的時(shí)候,只要求保證M11,M12,M13(M21,M22,M23)的順序,也就是說(shuō)實(shí)際消費(fèi)順序?yàn)镸11,M21,M12,M13,M22,M23 正確,M11,M21,M22,M12,M13,M23 正確,M11,M13,M21,M22,M23,M12 錯(cuò)誤,M12與M13的順序顛倒了;
題目解讀
- 題目要求實(shí)現(xiàn)五個(gè)接口,分別對(duì)應(yīng)于Producer生產(chǎn)消息、發(fā)送消息,Consumer綁定topic和queue、消費(fèi)消息;
- topic可以存儲(chǔ)數(shù)據(jù),queue也可以存儲(chǔ)數(shù)據(jù);
- Producer可以把消息發(fā)送到任意的topic和queue中,但是一條消息只能發(fā)送到一個(gè)topic或queue中;
- Consumer和queue數(shù)量相等,兩者一一對(duì)應(yīng),一個(gè)Consumer綁定一個(gè)queue和多個(gè)topic,不同的Consumer綁定不同的queue,topic可以相同;
- Consumer消費(fèi)數(shù)據(jù)時(shí),只保證對(duì)應(yīng)Producer局部有序,即Consumer消費(fèi)某topic/queue的消息時(shí),來(lái)自同一Producer的數(shù)據(jù)其接收順序與發(fā)送順序相同;
答辯資料解讀
初賽第六名代碼鏈接(本人非作者):
https://github.com/whutjs/MessageSystem
生產(chǎn)者架構(gòu)

生產(chǎn)者架構(gòu)圖
- 每個(gè)生產(chǎn)者對(duì)應(yīng)一個(gè)輸出文件;
- 生產(chǎn)者每生產(chǎn)一條消息,就把消息編碼后寫(xiě)入ByteBuffer中,再把ByteBuffer中二進(jìn)制數(shù)據(jù)寫(xiě)入對(duì)應(yīng)topic或queue的Cache中,然后清空ByteBuffer;
- 當(dāng)topic/queue對(duì)應(yīng)的Cache存滿后,就把這個(gè)Cache中所有二進(jìn)制數(shù)據(jù)寫(xiě)到輸出文件對(duì)應(yīng)的ByteBuffer中;
- 寫(xiě)入ByteBuffer時(shí)依次寫(xiě)入topic/queue的編號(hào)、數(shù)據(jù)長(zhǎng)度的類型、數(shù)據(jù)長(zhǎng)度和多條消息的二進(jìn)制數(shù)據(jù);
- ByteBuffer存滿后就把數(shù)據(jù)刷到輸出文件中;
消費(fèi)者架構(gòu)

消費(fèi)者架構(gòu)圖
- 一個(gè)文件對(duì)應(yīng)一個(gè)FileReadCache,一個(gè)消費(fèi)者對(duì)應(yīng)一個(gè)FileReadCache;
- 每個(gè)消費(fèi)者擁有一個(gè)ConcurrentLinkedQueue,調(diào)用poll()方法讀取消息;
- 若queue中沒(méi)有消息,且對(duì)應(yīng)的FileReadCache沒(méi)有讀完,則通過(guò)FileReadCache解碼出一條消息,然后MessageLoader把消息分發(fā)到訂閱了此topic/queue的Consumer的queue中;
- 若queue中沒(méi)有消息,且對(duì)應(yīng)的FileReadCache已經(jīng)讀完,則降低該消費(fèi)者線程的優(yōu)先級(jí),這樣其它Consumer就會(huì)多占用CPU來(lái)解碼消息,并把該消費(fèi)者需要的消息發(fā)過(guò)來(lái),存到queue中;
- 所有的FileReadCache都把對(duì)應(yīng)的文件讀完了,MessageLoader發(fā)送endMsg到所有Consumer的queue中,這樣所有Consumer就消費(fèi)結(jié)束;
消息存儲(chǔ)結(jié)構(gòu)

消息存儲(chǔ)結(jié)構(gòu)
- 每個(gè)Producer對(duì)應(yīng)一個(gè)輸出文件,文件中存儲(chǔ)二進(jìn)制數(shù)據(jù);
- 文件中數(shù)據(jù)的結(jié)構(gòu):topic/queue的編號(hào)、數(shù)據(jù)長(zhǎng)度的類型、數(shù)據(jù)長(zhǎng)度、多條消息的二進(jìn)制數(shù)據(jù),然后是下一個(gè)topic/queue的數(shù)據(jù);
- 消息的結(jié)構(gòu):消息頭標(biāo)識(shí)、消息id的長(zhǎng)度、消息id的二進(jìn)制數(shù)據(jù)、key的長(zhǎng)度、key的二進(jìn)制數(shù)據(jù)、val的長(zhǎng)度、val的二進(jìn)制數(shù)據(jù)、消息體開(kāi)頭標(biāo)識(shí)、消息體的長(zhǎng)度、消息體的二進(jìn)制數(shù)據(jù),然后是下一條消息的數(shù)據(jù);
如何解碼二進(jìn)制數(shù)據(jù)
- 因?yàn)槲募亲芳訉?xiě)入的,因此最開(kāi)始的消息寫(xiě)在文件的最前面;
- 讀取數(shù)據(jù)時(shí),把當(dāng)前topic/queue對(duì)應(yīng)的整塊Cache數(shù)據(jù)加載到ByteBuffer中,一次解析出一條消息,再把該消息push給訂閱了該topic/queue的Consumer;
- 當(dāng)前topic/queue的消息被解析完后,再加載下一個(gè)topic/queue對(duì)應(yīng)的Cache數(shù)據(jù);
如何保證數(shù)據(jù)的有序性

有序性示例
圖中producer1發(fā)送3條數(shù)據(jù)給topic1、2條數(shù)據(jù)給topic2,consumer1訂閱了topic1和topic2;需保證consumer1中來(lái)自topic1的3條數(shù)據(jù)其順序與其在producer1中相同,來(lái)自topic2的2條數(shù)據(jù)其順序與其在producer1相同,來(lái)自不同topic的數(shù)據(jù)沒(méi)有順序要求;
原理:File中消息的順序與Producer一致,F(xiàn)ileReadCache從前往后依次解析File中的消息,然后把消息push給Consumer,這樣Consumer就保證了消息局部有順;