Java Nio 系列
Java Nio 之Buffer
Java Nio 之直接內(nèi)存
Java Nio 之高級(jí)搬磚工(FileChannel) 一
Java Nio 之高級(jí)搬磚工(FileChannel)二
前言
前段時(shí)間同事分享了一篇文章給我:為什么Kafka速度這么快? ,這篇文章相信大家也都看了。這篇文章說(shuō)Kafka 有個(gè)作弊的技能 :直接從文件某個(gè)位置處讀取某個(gè)長(zhǎng)度的字節(jié)直接發(fā)送給消費(fèi)者,不需要讀到應(yīng)用程序里然后緩存在ByteBuffer 然后再往 客戶端寫(xiě);當(dāng)時(shí)就對(duì)這項(xiàng)技術(shù)很著迷,上網(wǎng)搜了很多資料 很納悶它是怎么實(shí)現(xiàn)的;上周在介紹FileChannel 的時(shí)候本來(lái)想只寫(xiě)一篇文章的,后來(lái)看到了它的map 、transeferTo以及TranseferFrom 方法就覺(jué)得一篇文章寫(xiě)不完,因?yàn)槿唛L(zhǎng)的文章誰(shuí)都不想看,所以另寫(xiě)一篇來(lái)研究一下 FileChannel 的高性能之處,以及介紹下Kafka是怎么使用的。
談?wù)劻憧截?/h2>
牢騷一下
Kafka 的高性能的重要點(diǎn)之一就在零拷貝上。零拷貝不是真的零拷貝,只不過(guò)是減少了拷貝的次數(shù),為的不是減少DMA的拷貝次數(shù),而是CPU 的拷貝次數(shù),為啥呢?因?yàn)榭截愂莻€(gè)很簡(jiǎn)單的操作,占著CPU 的時(shí)間片簡(jiǎn)直就是高射炮打蚊子。
傳統(tǒng) Linux 服務(wù)器 傳輸數(shù)據(jù) 的流程
- 1.應(yīng)用程序調(diào)用系統(tǒng)方法read(),切換上下文:用戶——內(nèi)核,操作系統(tǒng)會(huì)先檢查頁(yè)面緩存里是否有要read 的內(nèi)容,如果有則進(jìn)行第二步,如果沒(méi)有則需要讓DMA 從指定磁盤(pán)位置上拷貝數(shù)據(jù)到內(nèi)核緩沖區(qū)中,第一次拷貝由DMA 執(zhí)行
- 2.CPU 將數(shù)據(jù)從內(nèi)核緩沖區(qū)拷貝到用戶緩沖區(qū),read 調(diào)用返回,切換上下文:內(nèi)核——用戶,第二次拷貝由CPU 執(zhí)行
- 3.應(yīng)用程序調(diào)用系統(tǒng)write() 函數(shù) ,切換上下文:用戶——內(nèi)核,CPU 將用戶緩沖區(qū)的數(shù)據(jù)拷貝到socket 緩沖區(qū),第三次拷貝由CPU執(zhí)行
- 4.write 調(diào)用返回,切換上下文:內(nèi)核——用戶,然后DMA 異步將socket 緩沖區(qū)的數(shù)據(jù)拷貝到協(xié)議引擎中
總結(jié)一下,需要4次拷貝,其中有兩次是需要CPU的執(zhí)行,切換了4次上下文
零拷貝的兩種實(shí)現(xiàn)方式
mmap + write 方式
-
何為mmap 呢—— 將一個(gè)文件或者其它對(duì)象映射進(jìn)內(nèi)存。映射到的這塊內(nèi)存區(qū)域在用戶程序使用的內(nèi)存空間 和 棧之間不在內(nèi)核內(nèi)存空間, 因此內(nèi)核程序和用戶程序都可以訪問(wèn),如下草圖:
image.png mmap、munmap和msync()函數(shù)
void *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset);
addr映射區(qū)的開(kāi)始位置length映射區(qū)的長(zhǎng)度-
prot期望的內(nèi)存保護(hù)標(biāo)志,可由如下幾種方式組合:- PROT_EXEC 頁(yè)內(nèi)容可被執(zhí)行
- PROT_READ 頁(yè)內(nèi)容可被讀取
- PROT_WRITE 頁(yè)內(nèi)容可被寫(xiě)入
- PROT_NONE 頁(yè)內(nèi)容不可以被訪問(wèn)
-
flags影響內(nèi)存區(qū)域的各種特性,可由以下幾種方式組合:- MAP_FIXED 使用指定的起始位置,若是addr和length 重疊于現(xiàn)存的映射空間則重疊部分會(huì)丟失,不會(huì)對(duì)地址做出修正,不建議使用該選項(xiàng)。
- MAP_SHARED 對(duì)該映射區(qū)域的更改會(huì)同步到文件里,而且允許其它映射該文件的進(jìn)程共享該映射區(qū)域
- MAP_PRIVATE 對(duì)映射區(qū)域的寫(xiě)入操作會(huì)產(chǎn)生一個(gè)映射文件的復(fù)制,即私有的“寫(xiě)入時(shí)復(fù)制”(copy on write)對(duì)此區(qū)域作的任何修改都不會(huì)寫(xiě)回原來(lái)的文件內(nèi)容。
- MAP_NORESERVE 不要為這個(gè)映射保留交換空間。當(dāng)交換空間被保留,對(duì)映射區(qū)修改的可能會(huì)得到保證。當(dāng)交換空間不被保留,同時(shí)內(nèi)存不足,對(duì)映射區(qū)的修改會(huì)引起段違例信號(hào)
- MAP_ANONYMOUS 匿名映射,不與任何文件關(guān)聯(lián)
fd被映射對(duì)象,若為匿名映射則為-1offset被映射對(duì)象的起始偏移
int munmap(void *addr, size_t length);
調(diào)用該函數(shù)可以解除 映射對(duì)象與addr 處開(kāi)始的length 長(zhǎng)度的內(nèi)存空間的映射關(guān)系
addr mmap 函數(shù)返回的映射區(qū)域首地址
length 映射區(qū)域的長(zhǎng)度
int msync ( void * addr , size_t len, int flags) ;
一般情況下 對(duì)映射空間的共享內(nèi)容更改不會(huì)直接寫(xiě)到文件里,當(dāng)然執(zhí)行完 munmap 函數(shù)也可以,除了執(zhí)行它,還可以執(zhí)行 msync 函數(shù)來(lái)將修改的共享內(nèi)容同步到文件
說(shuō)說(shuō)mmap + write 的 流程
- 用戶程序調(diào)用mmap函數(shù),將 文件內(nèi)容映射到內(nèi)存映射區(qū)域。先由內(nèi)存空間切換到內(nèi)核空間,然后由內(nèi)核空間切換到用戶空間,完成兩次上下文切換,DMA 將文件內(nèi)容拷貝到內(nèi)存映射區(qū)域
- 調(diào)用write 函數(shù),cpu將 內(nèi)存映射區(qū)域的內(nèi)容拷貝到 socket緩沖區(qū),程序調(diào)用返回然后DMA 異步從socket 緩沖區(qū)拷貝到協(xié)議引擎的緩沖區(qū)
發(fā)生 1次cpu 拷貝,2次DMA 拷貝
來(lái)看看Java 下的mmap
抽象類(lèi) MappedByteBuffer
定義
直接字節(jié)緩沖區(qū),其內(nèi)容是文件的內(nèi)存映射區(qū)域。可由FileChannel#map 方法創(chuàng)建。該類(lèi)通過(guò)增加對(duì)內(nèi)存映射區(qū)域的特定操作擴(kuò)展了ByteFuffer 類(lèi)。映射字節(jié)緩沖區(qū)與它所映射的文件直到它自己被垃圾回收之前都是存在的。
tips
映射字節(jié)緩沖區(qū)的內(nèi)容任何時(shí)候都可以被修改,例如,映射文件對(duì)應(yīng)的區(qū)域被當(dāng)前程序或者其他程序所更改。至于是否發(fā)生或者什么時(shí)候發(fā)生,都由操作系統(tǒng)來(lái)決定。
映射字節(jié)緩沖區(qū)的部分或者全部在任何時(shí)候都會(huì)變得不可訪問(wèn),例如,映射的文件被截?cái)嗔?。嘗試訪問(wèn)不可訪問(wèn)的映射字節(jié)的緩沖區(qū)的那一部分,將會(huì)有不友好的異常拋出。需要強(qiáng)烈的提醒,避免讓當(dāng)前程序或者其他程序?qū)@個(gè)映射文件進(jìn)行操作,除了讀或者寫(xiě)它的內(nèi)容。
方法
load() 該方法會(huì)盡最大可能將映射文件里的內(nèi)容加載到物理內(nèi)存中,可能會(huì)在加載的時(shí)候?qū)е乱恍╉?yè)面錯(cuò)誤和IO操作。
isLoad() 返回 映射文件內(nèi)容是否駐留在物理內(nèi)存中。
force() 對(duì)映射內(nèi)存區(qū)域的寫(xiě)入,并不會(huì)直接同步到文件中, 在解除映射關(guān)系的時(shí)候修改的內(nèi)容才會(huì)同步到文件中。 調(diào)用該方法會(huì)將對(duì)映射區(qū)域的修改同步到磁盤(pán),這就與上面的方法msync方法對(duì)應(yīng)。
FileChannel # map 方法
方法簽名
public abstract MappedByteBuffer map(MapMode mode,long position, long size) throws IOException;
參數(shù)小解
mode 為MapMode 中的READ_ONLY,READ_WRITE,PRIVATE中的其中一個(gè),分別表示 只讀,可讀可寫(xiě)和寫(xiě)時(shí)復(fù)制與上述 mmap 方法中flags參數(shù)對(duì)應(yīng)
position 從文件的哪里開(kāi)始映射,對(duì)應(yīng)上述 mmap 方法 中的offset參數(shù)
size 從文件position處開(kāi)始映射多少個(gè)字節(jié)
Java 語(yǔ)言實(shí)現(xiàn) mmap+write
簡(jiǎn)述:將文件a.txt 中的0到14個(gè)字節(jié)發(fā)給服務(wù)端
package zym.netty.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
/**
* file channel map study
*
* @author 24160
*/
public class FileChannelMapStudy {
public static final String FILE_CHANNEL_MAP_STUDY_TXT = "a.txt";
public static final int INT_BYTES_LENGTH = 4;
public static void main(String[] args) {
prepareEnviroment();
try (FileChannel fileChannel = FileChannel.open(Paths.get(FILE_CHANNEL_MAP_STUDY_TXT), StandardOpenOption.READ)) {
long size = fileChannel.size();
//將a.txt 文件映射到內(nèi)存緩沖區(qū),從0位置處映射,映射10個(gè)字節(jié)長(zhǎng)度,該映射內(nèi)存緩沖區(qū)只可讀
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, 14);
//創(chuàng)建一個(gè)SocketChannel實(shí)例
SocketChannel client = SocketChannel.open();
//連接服務(wù)端
client.connect(new InetSocketAddress("127.0.0.1", 8080));
//寫(xiě)文件內(nèi)容到服務(wù)端
client.write(mappedByteBuffer);
//讀取文件內(nèi)容 網(wǎng)絡(luò)協(xié)議為 head + body 如6zengyi
ByteBuffer head = ByteBuffer.allocate(INT_BYTES_LENGTH);
while (client.read(head) != 0) {}
//切換讀寫(xiě)模式
head.flip();
//讀取body
ByteBuffer body = ByteBuffer.allocate(head.getInt());
while (client.read(body) != 0) {}
//切換讀寫(xiě)模式
body.flip();
System.out.println(String.format("發(fā)送字節(jié)成功,服務(wù)端返回:%s", new String(body.array())));
} catch (IOException e) {
e.printStackTrace();
}
}
private static void prepareEnviroment() {
try (FileChannel fileChannel = FileChannel.open(Paths.get(FILE_CHANNEL_MAP_STUDY_TXT), StandardOpenOption.CREATE,StandardOpenOption.READ, StandardOpenOption.WRITE)) {
//將a.txt 映射文件到內(nèi)存映射區(qū)域,模式為可讀可寫(xiě)
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 14);
//放進(jìn)去一個(gè)int 為10
mappedByteBuffer.putInt(10);
mappedByteBuffer.put("zengyiming".getBytes());
//強(qiáng)制刷盤(pán)
mappedByteBuffer.force();
} catch (IOException e) {
e.printStackTrace();
}
}
}
服務(wù)端代碼詳見(jiàn):NioServer.java
下面我們來(lái)看看kafka 是如何使用mmap,kafka AbstractIndex.scala 代碼片段
@volatile
protected var mmap: MappedByteBuffer = {
val newlyCreated = file.createNewFile()
val raf = if (writable) new RandomAccessFile(file, "rw") else new RandomAccessFile(file, "r")
try {
/* 如果是新創(chuàng)建則給file 預(yù)留分配空間 maxIndexSize 不超過(guò)50MB 單位為字節(jié) */
if(newlyCreated) {
if(maxIndexSize < entrySize)
throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize))
}
/* memory-map the file */
/* 開(kāi)始內(nèi)存映射文件*/
_length = raf.length()
val idx = {
if (writable)
/*如果可寫(xiě),則映射模式為可讀可寫(xiě)*/
raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, _length)
else
/*若可讀,則映射模式為可讀*/
raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, _length)
}
/* set the position in the index for the next entry */
/*為下一個(gè)條目 設(shè)置 buffer 中的position值*/
if(newlyCreated)
idx.position(0)
else
// if this is a pre-existing index, assume it is valid and set position to last entry
//如果這是一個(gè)預(yù)先存在的索引,則假設(shè)它有效并將位置設(shè)置為最后一個(gè)條目
idx.position(roundDownToExactMultiple(idx.limit(), entrySize))
idx
} finally {
CoreUtils.swallow(raf.close(), AbstractIndex)
}
}
kafka 的索引文件是映射到內(nèi)存映射區(qū)域的,對(duì)消息偏移量的讀寫(xiě)都是基于MappedByteBuffer 之上,當(dāng)然牛逼的kafka 作者們 發(fā)明了一個(gè)簡(jiǎn)單且緩存命中友好的二叉查找算法,這個(gè)算法有機(jī)會(huì)和大家聊下。
