之前六月份的時(shí)候有參加過(guò)阿里舉辦的第四屆中間件性能大賽,學(xué)到了不少東西,所以之后經(jīng)常會(huì)關(guān)注一下天池那邊阿里舉辦的程序設(shè)計(jì)大賽,九月底的時(shí)候注意到了這一屆的POLARDB數(shù)據(jù)庫(kù)性能大賽,很早就報(bào)了名。預(yù)熱賽10月25日開(kāi)始了~,初賽11月5日正式開(kāi)始,11月19日結(jié)束,我這篇文章發(fā)布的時(shí)候初賽就已經(jīng)結(jié)束了。因?yàn)?1月之前一直在找實(shí)習(xí),所以一直沒(méi)有做什么準(zhǔn)備,11月10號(hào)左右開(kāi)始編寫(xiě)第一版的代碼,到11月18號(hào)晚上放棄繼續(xù)嘗試。最后初賽成績(jī)是42名,時(shí)間是240.69秒,和大佬們比起來(lái)還是有很大差距的。寫(xiě)這篇博客,主要還是想分享一下這段時(shí)間參賽的思路,和一點(diǎn)一點(diǎn)慢慢提升的經(jīng)歷。
GitHub: https://github.com/AlexZFX/engine 當(dāng)前只?更新了初賽代碼,復(fù)賽結(jié)束后會(huì)繼續(xù)更新。
題目介紹
PolarDB作為軟硬件結(jié)合的代表, 充分使用新硬件, 榨干硬件的紅利來(lái)為用戶獲取極致的數(shù)據(jù)性能, 其中在PolarDB 的設(shè)計(jì)中, 我們使用 Optane SSD作為所有熱數(shù)據(jù)的寫(xiě)入緩沖區(qū), 通過(guò)kernel bypass的方式, 實(shí)現(xiàn)了極致的性能。所以本次比賽就以O(shè)ptane SSD盤(pán)為背景,參賽者在其基礎(chǔ)之上探索實(shí)現(xiàn)一種高效的kv存儲(chǔ)引擎
以上是阿里云官方給的比賽背景,具體的題目?jī)?nèi)容如下
初賽賽題(完整請(qǐng)點(diǎn)擊查看): 實(shí)現(xiàn)一個(gè)簡(jiǎn)化、高效的kv存儲(chǔ)引擎,支持Write、Read接口。
程序評(píng)測(cè)邏輯 評(píng)測(cè)程序分為2個(gè)階段:
1)Recover正確性評(píng)測(cè) 此階段評(píng)測(cè)程序會(huì)并發(fā)寫(xiě)入特定數(shù)據(jù)(key 8B、value
4KB)同時(shí)進(jìn)行任意次kill
-9來(lái)模擬進(jìn)程意外退出(參賽引擎需要保證進(jìn)程意外退出時(shí)數(shù)據(jù)持久化不丟失),接著重新打開(kāi)DB,調(diào)用Read接口來(lái)進(jìn)行正確性校驗(yàn)2)性能評(píng)測(cè)
- 隨機(jī)寫(xiě)入:64個(gè)線程并發(fā)隨機(jī)寫(xiě)入,每個(gè)線程使用Write各寫(xiě)100萬(wàn)次隨機(jī)數(shù)據(jù)(key 8B、value 4KB)
- 隨機(jī)讀?。?4個(gè)線程并發(fā)隨機(jī)讀取,每個(gè)線程各使用Read讀取100萬(wàn)次隨機(jī)數(shù)據(jù) 注:2.2階段會(huì)對(duì)所有讀取的kv校驗(yàn)是否匹配,如不通過(guò)則終止,評(píng)測(cè)失敗
總體說(shuō)來(lái)我們能得到的要求和信息為以下幾點(diǎn):
- 實(shí)現(xiàn)一個(gè)KV型數(shù)據(jù)庫(kù)的核心邏輯,主要為open、read、write三個(gè)接口。
- 支持多線程并發(fā)讀寫(xiě)。
- 保證在方法成功返回的情況下數(shù)據(jù)不丟失(kill保證不丟失的前提是已經(jīng)正確返回了,如果沒(méi)有的話是不算做丟失的)。
- key和value的長(zhǎng)度是確定的 key為8B,value為4KB。
- 可以使用Java語(yǔ)言或者C++,Java內(nèi)存限制3G,C++限制2G。
- 磁盤(pán)占用不超過(guò) 320G
編寫(xiě)過(guò)程
完整的參賽過(guò)程大概是一周時(shí)間,這一周進(jìn)行了非常多的嘗試,成績(jī)也從第一次跑通時(shí)的900多s到最后穩(wěn)定在240s,接下來(lái)會(huì)細(xì)細(xì)的說(shuō)一說(shuō)每一版的思路和進(jìn)階過(guò)程。(下面的標(biāo)題寫(xiě)的key value分別表示采用的文件數(shù))
大體思路
先做一些簡(jiǎn)單的計(jì)算,
key + offset = ( 8 + 8 ) * 64000000 / 1024 / 1024 = 977M
value = 4096 * 64000000 / 1024 / 1024 / 1024 = 245G
可見(jiàn)磁盤(pán)和內(nèi)存的限制相對(duì)來(lái)說(shuō)不會(huì)造成很大的影響,對(duì)合理的設(shè)計(jì)來(lái)說(shuō)還是充足的。
因?yàn)閗ey是一個(gè)8B的byte數(shù)組,故轉(zhuǎn)化成一個(gè)long型的數(shù)字很簡(jiǎn)單并且非常有利于接下來(lái)計(jì)算的事情。所以下文討論的key都是建立在long型的基礎(chǔ)上的。
初始主體的思路是這樣的
- 用HashMap在內(nèi)存中維護(hù)所有的key-offset對(duì),數(shù)據(jù)庫(kù)open時(shí)完成文件中的key和offset的加載工作,read時(shí)只需要找到對(duì)應(yīng)key的offset,然后在相應(yīng)value文件中進(jìn)行讀取即可。這里的hashmap選取是一個(gè)很重要的事情,因?yàn)镴ava自帶的hashmap是對(duì)對(duì)象的存儲(chǔ),故一個(gè)Long型的KV對(duì)要占用約40B的內(nèi)存,這樣的話3G的內(nèi)存會(huì)爆掉,最后選擇的是HPPC開(kāi)源的基礎(chǔ)類型的HashMap,選擇的原因主要參考了群里大佬的文章《應(yīng)用JMH測(cè)試大型HashMap的性能》,他也寫(xiě)了一些關(guān)于本次比賽的總結(jié)與分析,推薦大家關(guān)注并學(xué)習(xí),我和大佬的差距還是很大的。
- 因?yàn)関alue共有250G左右的內(nèi)容,必然要進(jìn)行分片,初始打算是對(duì)key進(jìn)行hash,然后將key的hash結(jié)果相同的value存儲(chǔ)在同一個(gè)文件中,key和該value對(duì)應(yīng)的offset存在同一個(gè)文件中。
- 所有的key和value不論是否重復(fù),都直接在文件尾利用追加寫(xiě),這樣在加載的時(shí)候,后面出現(xiàn)的key必然會(huì)覆蓋掉出現(xiàn)過(guò)的key,可以不用考慮key的重復(fù)問(wèn)題。
- 因?yàn)橐紤]到在進(jìn)程被kill的時(shí)候能保證數(shù)據(jù)不丟失,故不能對(duì)key或者value進(jìn)行緩存或者異步寫(xiě)入,否則可能會(huì)導(dǎo)致校驗(yàn)階段的失敗,則write接口被調(diào)用的時(shí)候都會(huì)直接對(duì)數(shù)據(jù)進(jìn)行落盤(pán)操作。
第一版 FileChannel讀寫(xiě) 1 key + 128 value 381.79s
首先想的是要跑出成績(jī),把所有的key都寫(xiě)在了一個(gè)文件里,一開(kāi)始忽略了一個(gè)小點(diǎn),把key和offset分開(kāi)寫(xiě)入了文件,導(dǎo)致出現(xiàn)了一些key和value不匹配的問(wèn)題。很顯然的問(wèn)題是寫(xiě)key和寫(xiě)offset會(huì)出現(xiàn)線程問(wèn)題,可能導(dǎo)致本來(lái)應(yīng)該是KeyValueKeyValue形式的數(shù)據(jù),被寫(xiě)成KeyKeyValueValue的形式,所以出錯(cuò)之后直接加了個(gè)synchronized關(guān)鍵字,得出第一次的成績(jī)968s,很快修改了這個(gè)簡(jiǎn)單的小問(wèn)題,得到一個(gè)明顯有大幅提升的成績(jī)381.79s,這時(shí)的代碼主要是這樣的。
@Override
public void open(String path) throws EngineException {
File file = new File(path);
// 創(chuàng)建目錄
if (!file.exists()) {
if (!file.mkdir()) {
throw new EngineException(RetCodeEnum.IO_ERROR, "創(chuàng)建文件目錄失敗:" + path);
} else {
logger.info("創(chuàng)建文件目錄成功:" + path);
}
}
//創(chuàng)建 FILE_COUNT個(gè)FileChannel 順序?qū)懭? RandomAccessFile randomAccessFile;
if (file.isDirectory()) {
for (int i = 0; i < FILE_COUNT; i++) {
try {
randomAccessFile = new RandomAccessFile(path + File.separator + i + ".data", "rw");
FileChannel channel = randomAccessFile.getChannel();
fileChannels[i] = channel;
// 從 length處直接寫(xiě)入
offsets[i] = new AtomicLong(randomAccessFile.length());
} catch (IOException e) {
e.printStackTrace();
}
}
} else {
throw new EngineException(RetCodeEnum.IO_ERROR, "path不是一個(gè)目錄");
}
File keyFile = new File(path + File.separator + "key");
if (!keyFile.exists()) {
try {
keyFile.createNewFile();
} catch (IOException e) {
e.printStackTrace();
}
}
// 從 index 文件建立 hashmap
try {
randomAccessFile = new RandomAccessFile(keyFile, "rw");
keyFileChannel = randomAccessFile.getChannel();
ByteBuffer keyBuffer = ByteBuffer.allocate(KEY_LEN);
ByteBuffer offBuffer = ByteBuffer.allocate(KEY_LEN);
keyFileOffset = new AtomicLong(randomAccessFile.length());
long temp = 0, maxOff = keyFileOffset.get();
while (temp < maxOff) {
keyBuffer.position(0);
keyFileChannel.read(keyBuffer, temp);
temp += KEY_LEN;
offBuffer.position(0);
keyFileChannel.read(offBuffer, temp);
temp += KEY_LEN;
keyBuffer.position(0);
offBuffer.position(0);
keyMap.put(keyBuffer.getLong(), offBuffer.getLong());
}
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void write(byte[] key, byte[] value) throws EngineException {
long numkey = Util.bytes2long(key);
int hash = hash(numkey);
long off = offsets[hash].getAndAdd(VALUE_LEN);
keyMap.put(numkey, off + 1);
try {
//key和offset寫(xiě)入文件
localKey.get().putLong(0, numkey).putLong(8, off + 1);
localKey.get().position(0);
keyFileChannel.write(localKey.get(), keyFileOffset.getAndAdd(KEY_AND_OFF_LEN));
//將value寫(xiě)入buffer
localBufferValue.get().position(0);
localBufferValue.get().put(value, 0, VALUE_LEN);
//buffer寫(xiě)入文件
localBufferValue.get().position(0);
fileChannels[hash].write(localBufferValue.get(), off);
} catch (IOException e) {
throw new EngineException(RetCodeEnum.IO_ERROR, "寫(xiě)入數(shù)據(jù)出錯(cuò)");
}
}
@Override
public byte[] read(byte[] key) throws EngineException {
long numkey = Util.bytes2long(key);
int hash = hash(numkey);
// key 不存在會(huì)返回0,避免跟位置0混淆,off寫(xiě)加一,讀減一
long off = keyMap.get(numkey);
if (off == 0) {
throw new EngineException(RetCodeEnum.NOT_FOUND, numkey + "不存在");
}
try {
localBufferValue.get().position(0);
fileChannels[hash].read(localBufferValue.get(), off - 1);
} catch (IOException e) {
throw new EngineException(RetCodeEnum.IO_ERROR, "讀取數(shù)據(jù)出錯(cuò)");
}
localBufferValue.get().position(0);
localBufferValue.get().get(localByteValue.get(), 0, VALUE_LEN);
return localByteValue.get();
}
- 這里的localKey 和 localBufferValue 都是 ThreadLocal 的 DirectByteBuffer(關(guān)于HeapByteBuffer和Direct的的一些差別后面會(huì)提到一些),用于作為 FileChannel(FileChannel參數(shù)都是在open的時(shí)候進(jìn)行的初始化) 寫(xiě)入的參數(shù),避免了每次寫(xiě)入都要allocate一塊新內(nèi)存的消耗。
- 這里的Hash采用的方法是與 0x7F 進(jìn)行求 & ,得到的結(jié)果劃分為128個(gè)value文件(這種hash的很簡(jiǎn)單且高效,帶來(lái)的隱患是在key的分布不均勻情況下可能導(dǎo)致某個(gè)文件非常大之類的現(xiàn)象,在后面也會(huì)提及)。
這個(gè)時(shí)候open的時(shí)間將近90s,很顯然是一個(gè)超出可承受范圍的結(jié)果。所以接下來(lái)很快對(duì)這一部分進(jìn)行了優(yōu)化。
第二版 FileChannel 64線程open 64 key 128 value 260.96s
open時(shí)間過(guò)長(zhǎng),所以這成了我們關(guān)注的一個(gè)重點(diǎn),這段時(shí)間我們做了很多改動(dòng),改動(dòng)的過(guò)程主要是這樣的。
-
單key文件,單個(gè)map,將完整的offset分為64份讀取 無(wú)成績(jī)
這一做法其實(shí)還沒(méi)跑出成績(jī)就被我們過(guò)渡掉了,因?yàn)楸镜剡M(jìn)行的測(cè)試一直過(guò)不去,我們第一時(shí)間想到的原因是因?yàn)?,單個(gè)key文件并發(fā)初始化的時(shí)候,后面出現(xiàn)的相等的key不一定會(huì)把前面的key覆蓋掉,所以會(huì)出現(xiàn)值不對(duì)的狀況。所以解決方案只能是所有相同的key必須要嚴(yán)格有序的讀取。 -
64個(gè)key文件,單個(gè)map 301.49s
因?yàn)樯厦嫠龅脑?,所以選擇對(duì)key也進(jìn)行一次hash,按照hash的結(jié)果將key劃分在了64個(gè)不同的key文件中,這樣的結(jié)果是相同的key一定會(huì)在相同文件中按照先后順序被寫(xiě)入,故讀取的時(shí)候一定是嚴(yán)格有序的。
這個(gè)版本本地的小量測(cè)試也通過(guò)了,以為沒(méi)有問(wèn)題,但線上失敗,這時(shí)我們才開(kāi)始關(guān)注hppc的map本身的線程安全性,給map的put加鎖后提交,果然通過(guò)了,得分301.49s。
簡(jiǎn)單看了一下源碼,顯然是線程不安全的,所以促使我們接下來(lái)的一次分map改動(dòng)。 -
64個(gè)key文件,64個(gè)map 260.96s
于是我們進(jìn)行了一次map的拆分,根據(jù)key的文件個(gè)數(shù)直接拆為了64個(gè)hashmap,差別是這樣拆分讓我們的map容量無(wú)法確定,簡(jiǎn)單線上用log測(cè)試了一下之后完成了64map的版本。并發(fā)問(wèn)題解決好之后,這一版本的分?jǐn)?shù)又有了不少提升,260.96s。這時(shí)候的open已經(jīng)被壓到了10s以內(nèi),但其實(shí)還是有提升的空間。
這版的主要改動(dòng)在open地方,下面貼出了這版的open方法。
@Override
public void open(String path) throws EngineException {
File file = new File(path);
// 創(chuàng)建目錄
if (!file.exists()) {
if (!file.mkdir()) {
throw new EngineException(RetCodeEnum.IO_ERROR, "創(chuàng)建文件目錄失敗:" + path);
} else {
logger.info("創(chuàng)建文件目錄成功:" + path);
}
}
RandomAccessFile randomAccessFile;
// file是一個(gè)目錄時(shí)進(jìn)行接下來(lái)的操作
if (file.isDirectory()) {
try {
//先構(gòu)建keyFileChannel 和 初始化 map
for (int i = 0; i < THREAD_NUM; i++) {
randomAccessFile = new RandomAccessFile(path + File.separator + i + ".key", "rw");
FileChannel channel = randomAccessFile.getChannel();
keyFileChannels[i] = channel;
keyOffsets[i] = new AtomicInteger((int) randomAccessFile.length());
}
ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUM);
CountDownLatch countDownLatch = new CountDownLatch(THREAD_NUM);
for (int i = 0; i < THREAD_NUM; i++) {
if (!(keyOffsets[i].get() == 0)) {
final long off = keyOffsets[i].get();
final int finalI = i;
executor.execute(() -> {
int start = 0;
long key;
int keyHash;
while (start < off) {
try {
localKey.get().position(0);
keyFileChannels[finalI].read(localKey.get(), start);
start += KEY_AND_OFF_LEN;
localKey.get().position(0);
key = localKey.get().getLong();
keyHash = keyFileHash(key);
keyMap[keyHash].put(key, localKey.get().getInt());
} catch (IOException e) {
e.printStackTrace();
}
}
countDownLatch.countDown();
});
} else {
countDownLatch.countDown();
}
}
countDownLatch.await();
executor.shutdownNow();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
//創(chuàng)建 FILE_COUNT個(gè)FileChannel 供write順序?qū)懭? for (int i = 0; i < FILE_COUNT; i++) {
try {
randomAccessFile = new RandomAccessFile(path + File.separator + i + ".data", "rw");
FileChannel channel = randomAccessFile.getChannel();
fileChannels[i] = channel;
// 從 length處直接寫(xiě)入
valueOffsets[i] = new AtomicInteger((int) (randomAccessFile.length() >>> SHIFT_NUM));
} catch (IOException e) {
e.printStackTrace();
}
}
} else {
throw new EngineException(RetCodeEnum.IO_ERROR, "path不是一個(gè)目錄");
}
}
這一部分里我們還做了一些小事
- 通過(guò)在close方法中添加log,統(tǒng)計(jì)了一些線上key和value的數(shù)量,發(fā)現(xiàn)key和value都非常均勻,每個(gè)key和value文件的大小都很相近(其實(shí)這里有個(gè)讓我們后面踩到的小坑,偷偷的感受到了測(cè)評(píng)程序的一些陰險(xiǎn))。
- 將offset替換為了int型的數(shù)值,每次讀取的時(shí)候進(jìn)行一次12位的移位操作之后再?gòu)膙alue文件中讀取,這樣的好處是節(jié)約了一些內(nèi)存,可以用來(lái)做一些其他的事情。
第三版 用mmap讀open,64key 64value 245.18s
之前一直考慮著用mmap,在java里面對(duì)應(yīng)的就是MappedByteBuffer,因?yàn)槲也淮_定mmap能不能在kill -9 被殺進(jìn)程的情況保證數(shù)據(jù)的完整性,同時(shí),如果都用mmap寫(xiě)入的話,會(huì)讓我無(wú)法確定文件的大?。╩map映射時(shí)要預(yù)先指定文件大?。瑹o(wú)法在kill之后能從指定的位置追加寫(xiě)入。所以打算一步一步,最后再考慮使用這個(gè)。
但是open的時(shí)候使用mmap讀一定是沒(méi)有風(fēng)險(xiǎn)的,所以又進(jìn)行了一次對(duì)open的改動(dòng),這時(shí)還是64個(gè)key文件和128個(gè)value文件,得到的跑分是248.58,open過(guò)程被壓縮在了1s以內(nèi),大約600ms左右,這個(gè)open速度我們就基本已經(jīng)滿足了。
后來(lái)改成了64個(gè)value文件,每次只進(jìn)行一次hash就可以確定key和value文件的位置,并且讀寫(xiě)速度似乎都有略微進(jìn)步,達(dá)到了245.18s。
這時(shí)的open代碼如下
@Override
public void open(String path) throws EngineException {
File file = new File(path);
// 創(chuàng)建目錄
if (!file.exists()) {
if (!file.mkdir()) {
throw new EngineException(RetCodeEnum.IO_ERROR, "創(chuàng)建文件目錄失敗:" + path);
} else {
logger.info("創(chuàng)建文件目錄成功:" + path);
}
}
RandomAccessFile randomAccessFile;
// file是一個(gè)目錄時(shí)進(jìn)行接下來(lái)的操作
if (file.isDirectory()) {
try {
//先構(gòu)建keyFileChannel 和 初始化 map
for (int i = 0; i < THREAD_NUM; i++) {
randomAccessFile = new RandomAccessFile(path + File.separator + i + ".key", "rw");
FileChannel channel = randomAccessFile.getChannel();
keyFileChannels[i] = channel;
keyOffsets[i] = new AtomicInteger((int) randomAccessFile.length());
}
ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUM);
CountDownLatch countDownLatch = new CountDownLatch(THREAD_NUM);
for (int i = 0; i < THREAD_NUM; i++) {
if (!(keyOffsets[i].get() == 0)) {
final long off = keyOffsets[i].get();
final int finalI = i;
executor.execute(() -> {
int start = 0;
try {
MappedByteBuffer mappedByteBuffer = keyFileChannels[finalI].map(FileChannel.MapMode.READ_ONLY, 0, off);
while (start < off) {
start += KEY_AND_OFF_LEN;
keyMap[finalI].put(mappedByteBuffer.getLong(), mappedByteBuffer.getInt());
}
unmap(mappedByteBuffer);
countDownLatch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
});
} else {
countDownLatch.countDown();
}
}
countDownLatch.await();
executor.shutdownNow();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
//創(chuàng)建 FILE_COUNT個(gè)FileChannel 供write順序?qū)懭? for (int i = 0; i < FILE_COUNT; i++) {
try {
randomAccessFile = new RandomAccessFile(path + File.separator + i + ".data", "rw");
FileChannel channel = randomAccessFile.getChannel();
fileChannels[i] = channel;
// 從 length處直接寫(xiě)入
valueOffsets[i] = new AtomicInteger((int) (randomAccessFile.length() >>> SHIFT_NUM));
} catch (IOException e) {
e.printStackTrace();
}
}
} else {
throw new EngineException(RetCodeEnum.IO_ERROR, "path不是一個(gè)目錄");
}
}
這一版當(dāng)中我們也發(fā)現(xiàn)了一些問(wèn)題,閱讀了許多文章,總結(jié)主要如下:
- 在open過(guò)程中遇到過(guò)一次OOM,但是按照我的JVM參數(shù)和預(yù)計(jì)是不會(huì)出現(xiàn)這一現(xiàn)象,同時(shí)在這期間我有測(cè)試過(guò)使用mmap讀256個(gè)文件的value,但是卻出現(xiàn)了MappedByteBuffer爆掉的情況(Java的MappedByteBuffer有限制一次映射不能超過(guò)2G內(nèi)存)。通過(guò)線上的log測(cè)試,發(fā)現(xiàn)測(cè)評(píng)時(shí)最開(kāi)始的一部分是寫(xiě)入了大量的hash值相同的key,value也超過(guò)了2G的大小。但這一部分的測(cè)評(píng)其實(shí)沒(méi)有體現(xiàn)在官方的log中,這個(gè)發(fā)現(xiàn)也為最后的改動(dòng)提供了一點(diǎn)幫助。
- 閱讀了千里碼賽碼會(huì)的總結(jié)分享,學(xué)到了一些mmap的內(nèi)容,知道了在程序異常退出的時(shí)候,哪怕mmap的內(nèi)存數(shù)據(jù)并沒(méi)有落盤(pán),kernel也會(huì)在你的進(jìn)程被kill之后,回寫(xiě)到磁盤(pán)。這里就已經(jīng)是內(nèi)核態(tài)的操作了,只要服務(wù)器不真正的斷電,數(shù)據(jù)的安全性是有保證的,這還是非常有幫助的。
第四版 mmap讀寫(xiě)key,F(xiàn)ileChannel讀寫(xiě)value,64 + 64 240.69s
有第三版最后發(fā)現(xiàn)的內(nèi)容,我們打算再對(duì)key的寫(xiě)入做一些改動(dòng),也就是將fileChannel寫(xiě)入key的方式改動(dòng)為mmap寫(xiě)入。而mmap映射的文件大小選擇一個(gè)稍大的值,open之后的寫(xiě)入offset通過(guò)value文件的大小來(lái)確定(valuelen / 4096 * 12),這一優(yōu)化帶來(lái)的大約2~3s的提升。
除此之外,還進(jìn)行了簡(jiǎn)單的jvm調(diào)優(yōu)工作,將新生代和老年代的比例進(jìn)行了調(diào)整,將原來(lái)1:1的比例調(diào)整為了6:1,這部分優(yōu)化帶來(lái)了大約2s的性能提升。
最后完整的代碼這一塊我就直接貼在下面,對(duì)整個(gè)過(guò)程有興趣的也可以去我的github上clone下來(lái)查看。
package com.alibabacloud.polar_race.engine.common;
import com.alibabacloud.polar_race.engine.common.exceptions.EngineException;
import com.alibabacloud.polar_race.engine.common.exceptions.RetCodeEnum;
import com.carrotsearch.hppc.LongIntHashMap;
import io.netty.util.concurrent.FastThreadLocal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
public class EngineRace extends AbstractEngine {
private static Logger logger = LoggerFactory.getLogger(EngineRace.class);
// key+offset 長(zhǎng)度 16B
private static final int KEY_AND_OFF_LEN = 12;
// 線程數(shù)量
private static final int THREAD_NUM = 64;
// value 長(zhǎng)度 4K
private static final int VALUE_LEN = 4096;
//每個(gè)map存儲(chǔ)的key數(shù)量
private static final int PER_MAP_COUNT = 1024000;
private static final int SHIFT_NUM = 12;
// 存放 value 的文件數(shù)量 128
private static final int FILE_COUNT = 64;
private static final int HASH_VALUE = 0x3F;
private static final LongIntHashMap[] keyMap = new LongIntHashMap[THREAD_NUM];
static {
for (int i = 0; i < THREAD_NUM; i++) {
keyMap[i] = new LongIntHashMap(PER_MAP_COUNT, 0.98);
}
}
//key 文件的fileChannel
private static FileChannel[] keyFileChannels = new FileChannel[THREAD_NUM];
private static AtomicInteger[] keyOffsets = new AtomicInteger[THREAD_NUM];
private static MappedByteBuffer[] keyMappedByteBuffers = new MappedByteBuffer[THREAD_NUM];
//value 文件的fileChannel
private static FileChannel[] fileChannels = new FileChannel[FILE_COUNT];
private static AtomicInteger[] valueOffsets = new AtomicInteger[FILE_COUNT];
private static FastThreadLocal<ByteBuffer> localBufferValue = new FastThreadLocal<ByteBuffer>() {
@Override
protected ByteBuffer initialValue() throws Exception {
return ByteBuffer.allocate(VALUE_LEN);
}
};
@Override
public void open(String path) throws EngineException {
File file = new File(path);
// 創(chuàng)建目錄
if (!file.exists()) {
if (!file.mkdir()) {
throw new EngineException(RetCodeEnum.IO_ERROR, "創(chuàng)建文件目錄失?。? + path);
} else {
logger.info("創(chuàng)建文件目錄成功:" + path);
}
}
RandomAccessFile randomAccessFile;
// file是一個(gè)目錄時(shí)進(jìn)行接下來(lái)的操作
if (file.isDirectory()) {
try {
//先 創(chuàng)建 FILE_COUNT個(gè)FileChannel 供write順序?qū)懭?,并由此文件獲取value文件的大小
for (int i = 0; i < FILE_COUNT; i++) {
try {
randomAccessFile = new RandomAccessFile(path + File.separator + i + ".data", "rw");
FileChannel channel = randomAccessFile.getChannel();
fileChannels[i] = channel;
// 從 length處直接寫(xiě)入
valueOffsets[i] = new AtomicInteger((int) (randomAccessFile.length() >>> SHIFT_NUM));
keyOffsets[i] = new AtomicInteger(valueOffsets[i].get() * KEY_AND_OFF_LEN);
} catch (IOException e) {
e.printStackTrace();
}
}
//先構(gòu)建keyFileChannel 和 初始化 map
for (int i = 0; i < THREAD_NUM; i++) {
randomAccessFile = new RandomAccessFile(path + File.separator + i + ".key", "rw");
FileChannel channel = randomAccessFile.getChannel();
keyFileChannels[i] = channel;
keyMappedByteBuffers[i] = channel.map(FileChannel.MapMode.READ_WRITE, 0, PER_MAP_COUNT * 20);
}
CountDownLatch countDownLatch = new CountDownLatch(THREAD_NUM);
for (int i = 0; i < THREAD_NUM; i++) {
if (!(keyOffsets[i].get() == 0)) {
final long off = keyOffsets[i].get();
final int finalI = i;
final MappedByteBuffer buffer = keyMappedByteBuffers[i];
new Thread(() -> {
int start = 0;
while (start < off) {
start += KEY_AND_OFF_LEN;
keyMap[finalI].put(buffer.getLong(), buffer.getInt());
}
countDownLatch.countDown();
}).start();
} else {
countDownLatch.countDown();
}
}
countDownLatch.await();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
} else {
throw new EngineException(RetCodeEnum.IO_ERROR, "path不是一個(gè)目錄");
}
}
@Override
public void write(byte[] key, byte[] value) throws EngineException {
long numkey = Util.bytes2long(key);
int hash = valueFileHash(numkey);
int off = valueOffsets[hash].getAndIncrement();
try {
ByteBuffer keyBuffer = keyMappedByteBuffers[hash].slice();
keyBuffer.position(keyOffsets[hash].getAndAdd(KEY_AND_OFF_LEN));
keyBuffer.putLong(numkey).putInt(off);
//將value寫(xiě)入buffer
ByteBuffer valueBuffer = localBufferValue.get();
valueBuffer.clear();
valueBuffer.put(value);
valueBuffer.flip();
fileChannels[hash].write(valueBuffer, ((long) off) << SHIFT_NUM);
} catch (IOException e) {
throw new EngineException(RetCodeEnum.IO_ERROR, "寫(xiě)入數(shù)據(jù)出錯(cuò)");
}
}
@Override
public byte[] read(byte[] key) throws EngineException {
long numkey = Util.bytes2long(key);
int hash = valueFileHash(numkey);
long off = keyMap[hash].getOrDefault(numkey, -1);
ByteBuffer buffer = localBufferValue.get();
if (off == -1) {
throw new EngineException(RetCodeEnum.NOT_FOUND, numkey + "不存在");
}
try {
buffer.clear();
fileChannels[hash].read(buffer, off << SHIFT_NUM);
} catch (IOException e) {
throw new EngineException(RetCodeEnum.IO_ERROR, "讀取數(shù)據(jù)出錯(cuò)");
}
return buffer.array();
}
@Override
public void range(byte[] lower, byte[] upper, AbstractVisitor visitor) throws EngineException {
}
@Override
public void close() {
for (int i = 0; i < FILE_COUNT; i++) {
try {
keyFileChannels[i].close();
fileChannels[i].close();
} catch (IOException e) {
logger.error("close error");
}
}
}
private static int valueFileHash(long key) {
return (int) (key & HASH_VALUE);
}
}
這一版寫(xiě)的代碼和之前有點(diǎn)不同如下:
- 這里value讀寫(xiě)的時(shí)候用的ByteBuffer是HeapByteBuffer,本來(lái)用DirectByteBuffer是想著對(duì)外內(nèi)存相對(duì)來(lái)說(shuō)寫(xiě)入會(huì)更快,但實(shí)際上將byte[] value寫(xiě)入buffer的時(shí)候避免不了將 其從堆內(nèi)拷貝到堆外的過(guò)程。而查看了FileChannel的write方法源碼時(shí),發(fā)現(xiàn)其對(duì)文件的寫(xiě)入都是基于DirectByteBuffer進(jìn)行的,其本身會(huì)維護(hù)一個(gè)堆外內(nèi)存的緩存,測(cè)試之后發(fā)現(xiàn)兩者的性能相差無(wú)幾,所以也沒(méi)有再關(guān)注這個(gè)
- mmap寫(xiě)入key的時(shí)候調(diào)用了slice方法,目的是獲取MappedByteBuffer的一個(gè)切片,因?yàn)槠鋵?xiě)入是非線程安全的,實(shí)質(zhì)上內(nèi)部是通過(guò)調(diào)用unsafe的putByte實(shí)現(xiàn)的。
總結(jié)
初賽剛寫(xiě)的時(shí)候有一點(diǎn)中間件性能大賽復(fù)賽類似的地方,不過(guò)相比來(lái)說(shuō)還是多學(xué)會(huì)了許多知識(shí)。我其實(shí)也嘗試了利用unsafe來(lái)實(shí)現(xiàn)內(nèi)存拷貝的一部分,但是似乎并沒(méi)有起到一個(gè)好的效果,感覺(jué)主要還是我的使用姿勢(shì)有些不正確,我把這一部分的有關(guān)代碼放在了github的unsafe分支中,有興趣也可以簡(jiǎn)單查看一下。
正在進(jìn)行的是復(fù)賽,相比初賽來(lái)說(shuō)增加了一個(gè)全量順序遍歷的需求,難度更大,也更有意思了,感覺(jué)復(fù)賽更考的是一部分設(shè)計(jì)方面的東西了,接下來(lái)還是會(huì)使用Java繼續(xù)參加,如果有所收獲的話,還會(huì)再寫(xiě)一篇博客進(jìn)行相應(yīng)的總結(jié)。