Java Nio 之高級(jí)搬磚工(FileChannel)二

Java Nio 系列
Java Nio 之Buffer
Java Nio 之直接內(nèi)存
Java Nio 之高級(jí)搬磚工(FileChannel) 一
Java Nio 之高級(jí)搬磚工(FileChannel)二

前言

前段時(shí)間同事分享了一篇文章給我:為什么Kafka速度這么快? ,這篇文章相信大家也都看了。這篇文章說(shuō)Kafka 有個(gè)作弊的技能 :直接從文件某個(gè)位置處讀取某個(gè)長(zhǎng)度的字節(jié)直接發(fā)送給消費(fèi)者,不需要讀到應(yīng)用程序里然后緩存在ByteBuffer 然后再往 客戶端寫(xiě);當(dāng)時(shí)就對(duì)這項(xiàng)技術(shù)很著迷,上網(wǎng)搜了很多資料 很納悶它是怎么實(shí)現(xiàn)的;上周在介紹FileChannel 的時(shí)候本來(lái)想只寫(xiě)一篇文章的,后來(lái)看到了它的map 、transeferTo以及TranseferFrom 方法就覺(jué)得一篇文章寫(xiě)不完,因?yàn)槿唛L(zhǎng)的文章誰(shuí)都不想看,所以另寫(xiě)一篇來(lái)研究一下 FileChannel 的高性能之處,以及介紹下Kafka是怎么使用的。

談?wù)劻憧截?/h2>

牢騷一下

Kafka 的高性能的重要點(diǎn)之一就在零拷貝上。零拷貝不是真的零拷貝,只不過(guò)是減少了拷貝的次數(shù),為的不是減少DMA的拷貝次數(shù),而是CPU 的拷貝次數(shù),為啥呢?因?yàn)榭截愂莻€(gè)很簡(jiǎn)單的操作,占著CPU 的時(shí)間片簡(jiǎn)直就是高射炮打蚊子。

傳統(tǒng) Linux 服務(wù)器 傳輸數(shù)據(jù) 的流程

  • 1.應(yīng)用程序調(diào)用系統(tǒng)方法read(),切換上下文:用戶——內(nèi)核,操作系統(tǒng)會(huì)先檢查頁(yè)面緩存里是否有要read 的內(nèi)容,如果有則進(jìn)行第二步,如果沒(méi)有則需要讓DMA 從指定磁盤(pán)位置上拷貝數(shù)據(jù)到內(nèi)核緩沖區(qū)中,第一次拷貝由DMA 執(zhí)行
  • 2.CPU 將數(shù)據(jù)從內(nèi)核緩沖區(qū)拷貝到用戶緩沖區(qū),read 調(diào)用返回,切換上下文:內(nèi)核——用戶,第二次拷貝由CPU 執(zhí)行
  • 3.應(yīng)用程序調(diào)用系統(tǒng)write() 函數(shù) ,切換上下文:用戶——內(nèi)核,CPU 將用戶緩沖區(qū)的數(shù)據(jù)拷貝到socket 緩沖區(qū),第三次拷貝由CPU執(zhí)行
  • 4.write 調(diào)用返回,切換上下文:內(nèi)核——用戶,然后DMA 異步將socket 緩沖區(qū)的數(shù)據(jù)拷貝到協(xié)議引擎中
    總結(jié)一下,需要4次拷貝,其中有兩次是需要CPU的執(zhí)行,切換了4次上下文

零拷貝的兩種實(shí)現(xiàn)方式

mmap + write 方式

  • 何為mmap 呢—— 將一個(gè)文件或者其它對(duì)象映射進(jìn)內(nèi)存。映射到的這塊內(nèi)存區(qū)域在用戶程序使用的內(nèi)存空間 和 棧之間不在內(nèi)核內(nèi)存空間, 因此內(nèi)核程序和用戶程序都可以訪問(wèn),如下草圖:


    image.png
  • mmap 、 munmapmsync()函數(shù)

void *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset);
  • addr 映射區(qū)的開(kāi)始位置

  • length 映射區(qū)的長(zhǎng)度

  • prot 期望的內(nèi)存保護(hù)標(biāo)志,可由如下幾種方式組合:

    • PROT_EXEC 頁(yè)內(nèi)容可被執(zhí)行
    • PROT_READ 頁(yè)內(nèi)容可被讀取
    • PROT_WRITE 頁(yè)內(nèi)容可被寫(xiě)入
    • PROT_NONE 頁(yè)內(nèi)容不可以被訪問(wèn)
  • flags 影響內(nèi)存區(qū)域的各種特性,可由以下幾種方式組合:

    • MAP_FIXED 使用指定的起始位置,若是addr和length 重疊于現(xiàn)存的映射空間則重疊部分會(huì)丟失,不會(huì)對(duì)地址做出修正,不建議使用該選項(xiàng)。
    • MAP_SHARED 對(duì)該映射區(qū)域的更改會(huì)同步到文件里,而且允許其它映射該文件的進(jìn)程共享該映射區(qū)域
    • MAP_PRIVATE 對(duì)映射區(qū)域的寫(xiě)入操作會(huì)產(chǎn)生一個(gè)映射文件的復(fù)制,即私有的“寫(xiě)入時(shí)復(fù)制”(copy on write)對(duì)此區(qū)域作的任何修改都不會(huì)寫(xiě)回原來(lái)的文件內(nèi)容。
    • MAP_NORESERVE 不要為這個(gè)映射保留交換空間。當(dāng)交換空間被保留,對(duì)映射區(qū)修改的可能會(huì)得到保證。當(dāng)交換空間不被保留,同時(shí)內(nèi)存不足,對(duì)映射區(qū)的修改會(huì)引起段違例信號(hào)
    • MAP_ANONYMOUS 匿名映射,不與任何文件關(guān)聯(lián)
  • fd 被映射對(duì)象,若為匿名映射則為-1

  • offset 被映射對(duì)象的起始偏移

int munmap(void *addr, size_t length);
調(diào)用該函數(shù)可以解除 映射對(duì)象與addr 處開(kāi)始的length 長(zhǎng)度的內(nèi)存空間的映射關(guān)系

addr mmap 函數(shù)返回的映射區(qū)域首地址
length 映射區(qū)域的長(zhǎng)度

int msync ( void * addr , size_t len, int flags) ;
一般情況下 對(duì)映射空間的共享內(nèi)容更改不會(huì)直接寫(xiě)到文件里,當(dāng)然執(zhí)行完 munmap 函數(shù)也可以,除了執(zhí)行它,還可以執(zhí)行 msync 函數(shù)來(lái)將修改的共享內(nèi)容同步到文件

說(shuō)說(shuō)mmap + write 的 流程

  • 用戶程序調(diào)用mmap函數(shù),將 文件內(nèi)容映射到內(nèi)存映射區(qū)域。先由內(nèi)存空間切換到內(nèi)核空間,然后由內(nèi)核空間切換到用戶空間,完成兩次上下文切換,DMA 將文件內(nèi)容拷貝到內(nèi)存映射區(qū)域
  • 調(diào)用write 函數(shù),cpu將 內(nèi)存映射區(qū)域的內(nèi)容拷貝到 socket緩沖區(qū),程序調(diào)用返回然后DMA 異步從socket 緩沖區(qū)拷貝到協(xié)議引擎的緩沖區(qū)
    發(fā)生 1次cpu 拷貝,2次DMA 拷貝

來(lái)看看Java 下的mmap

抽象類(lèi) MappedByteBuffer

定義

直接字節(jié)緩沖區(qū),其內(nèi)容是文件的內(nèi)存映射區(qū)域。可由FileChannel#map 方法創(chuàng)建。該類(lèi)通過(guò)增加對(duì)內(nèi)存映射區(qū)域的特定操作擴(kuò)展了ByteFuffer 類(lèi)。映射字節(jié)緩沖區(qū)與它所映射的文件直到它自己被垃圾回收之前都是存在的。

tips

映射字節(jié)緩沖區(qū)的內(nèi)容任何時(shí)候都可以被修改,例如,映射文件對(duì)應(yīng)的區(qū)域被當(dāng)前程序或者其他程序所更改。至于是否發(fā)生或者什么時(shí)候發(fā)生,都由操作系統(tǒng)來(lái)決定。
映射字節(jié)緩沖區(qū)的部分或者全部在任何時(shí)候都會(huì)變得不可訪問(wèn),例如,映射的文件被截?cái)嗔?。嘗試訪問(wèn)不可訪問(wèn)的映射字節(jié)的緩沖區(qū)的那一部分,將會(huì)有不友好的異常拋出。需要強(qiáng)烈的提醒,避免讓當(dāng)前程序或者其他程序?qū)@個(gè)映射文件進(jìn)行操作,除了讀或者寫(xiě)它的內(nèi)容。

方法

load() 該方法會(huì)盡最大可能將映射文件里的內(nèi)容加載到物理內(nèi)存中,可能會(huì)在加載的時(shí)候?qū)е乱恍╉?yè)面錯(cuò)誤和IO操作。
isLoad() 返回 映射文件內(nèi)容是否駐留在物理內(nèi)存中。
force() 對(duì)映射內(nèi)存區(qū)域的寫(xiě)入,并不會(huì)直接同步到文件中, 在解除映射關(guān)系的時(shí)候修改的內(nèi)容才會(huì)同步到文件中。 調(diào)用該方法會(huì)將對(duì)映射區(qū)域的修改同步到磁盤(pán),這就與上面的方法msync方法對(duì)應(yīng)。

FileChannel # map 方法

方法簽名

public abstract MappedByteBuffer map(MapMode mode,long position, long size)  throws IOException;

參數(shù)小解

mode 為MapMode 中的READ_ONLY,READ_WRITE,PRIVATE中的其中一個(gè),分別表示 只讀,可讀可寫(xiě)和寫(xiě)時(shí)復(fù)制與上述 mmap 方法中flags參數(shù)對(duì)應(yīng)
position 從文件的哪里開(kāi)始映射,對(duì)應(yīng)上述 mmap 方法 中的offset參數(shù)
size 從文件position處開(kāi)始映射多少個(gè)字節(jié)

Java 語(yǔ)言實(shí)現(xiàn) mmap+write

簡(jiǎn)述:將文件a.txt 中的0到14個(gè)字節(jié)發(fā)給服務(wù)端

package zym.netty.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

/**
 * file channel map study
 *
 * @author 24160
 */
public class FileChannelMapStudy {

    public static final String FILE_CHANNEL_MAP_STUDY_TXT = "a.txt";
    public static final int INT_BYTES_LENGTH = 4;

    public static void main(String[] args) {
        prepareEnviroment();
        try (FileChannel fileChannel = FileChannel.open(Paths.get(FILE_CHANNEL_MAP_STUDY_TXT), StandardOpenOption.READ)) {
            long size = fileChannel.size();
            //將a.txt 文件映射到內(nèi)存緩沖區(qū),從0位置處映射,映射10個(gè)字節(jié)長(zhǎng)度,該映射內(nèi)存緩沖區(qū)只可讀
            MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, 14);
            //創(chuàng)建一個(gè)SocketChannel實(shí)例
            SocketChannel client = SocketChannel.open();
            //連接服務(wù)端
            client.connect(new InetSocketAddress("127.0.0.1", 8080));
            //寫(xiě)文件內(nèi)容到服務(wù)端
            client.write(mappedByteBuffer);
            //讀取文件內(nèi)容 網(wǎng)絡(luò)協(xié)議為 head + body  如6zengyi
            ByteBuffer head = ByteBuffer.allocate(INT_BYTES_LENGTH);
            while (client.read(head) != 0) {}
            //切換讀寫(xiě)模式
            head.flip();
            //讀取body
            ByteBuffer body = ByteBuffer.allocate(head.getInt());
            while (client.read(body) != 0) {}
            //切換讀寫(xiě)模式
            body.flip();
            System.out.println(String.format("發(fā)送字節(jié)成功,服務(wù)端返回:%s", new String(body.array())));

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static void prepareEnviroment() {
        try (FileChannel fileChannel = FileChannel.open(Paths.get(FILE_CHANNEL_MAP_STUDY_TXT), StandardOpenOption.CREATE,StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            //將a.txt 映射文件到內(nèi)存映射區(qū)域,模式為可讀可寫(xiě)
            MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 14);
            //放進(jìn)去一個(gè)int 為10
            mappedByteBuffer.putInt(10);
            mappedByteBuffer.put("zengyiming".getBytes());
            //強(qiáng)制刷盤(pán)
            mappedByteBuffer.force();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

服務(wù)端代碼詳見(jiàn):NioServer.java
下面我們來(lái)看看kafka 是如何使用mmap,kafka AbstractIndex.scala 代碼片段

 @volatile
  protected var mmap: MappedByteBuffer = {
    val newlyCreated = file.createNewFile()
    val raf = if (writable) new RandomAccessFile(file, "rw") else new RandomAccessFile(file, "r")
    try {
      /* 如果是新創(chuàng)建則給file 預(yù)留分配空間 maxIndexSize 不超過(guò)50MB 單位為字節(jié) */
      if(newlyCreated) {
        if(maxIndexSize < entrySize)
          throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
        raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize))
      }

      /* memory-map the file */
    /* 開(kāi)始內(nèi)存映射文件*/
      _length = raf.length()
      val idx = {
        if (writable)
          /*如果可寫(xiě),則映射模式為可讀可寫(xiě)*/
          raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, _length)
        else
        /*若可讀,則映射模式為可讀*/
          raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, _length)
      }
      /* set the position in the index for the next entry */
    /*為下一個(gè)條目 設(shè)置 buffer 中的position值*/
      if(newlyCreated)
        idx.position(0)
      else
        // if this is a pre-existing index, assume it is valid and set position to last entry
        //如果這是一個(gè)預(yù)先存在的索引,則假設(shè)它有效并將位置設(shè)置為最后一個(gè)條目
        idx.position(roundDownToExactMultiple(idx.limit(), entrySize))
      idx
    } finally {
      CoreUtils.swallow(raf.close(), AbstractIndex)
    }
  }

kafka 的索引文件是映射到內(nèi)存映射區(qū)域的,對(duì)消息偏移量的讀寫(xiě)都是基于MappedByteBuffer 之上,當(dāng)然牛逼的kafka 作者們 發(fā)明了一個(gè)簡(jiǎn)單且緩存命中友好的二叉查找算法,這個(gè)算法有機(jī)會(huì)和大家聊下。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 健康運(yùn)動(dòng) 本周運(yùn)動(dòng)五天,其中兩天快走,三天Keep塑形。 減重達(dá)到51.6公斤,結(jié)果還算滿意但仍需繼續(xù)努力減到51...
    主豐閱讀 249評(píng)論 0 1
  • 繼續(xù)昨天,今天情緒升級(jí),爆發(fā),爭(zhēng)吵,埋怨,哭泣,絕望。但是作為一家人。最奇妙的地方就是吵過(guò)就好,我是沒(méi)心沒(méi)肺的不知...
    文倩_1e24閱讀 78評(píng)論 0 0
  • 張愛(ài)玲是一個(gè)不老的話題,張愛(ài)玲的小說(shuō)是讀不盡的蒼涼,“張迷”是不斷涌現(xiàn)的群體。 著名作家張愛(ài)玲的短篇小說(shuō)《傾城之戀...
    李連十三閱讀 528評(píng)論 0 1
  • 今天是小年,吃了餃子,去游泳,在不到一米深的水里,游了幾個(gè)回合,突然發(fā)現(xiàn)自己會(huì)游泳了,當(dāng)然還不會(huì)換氣。雖說(shuō)量變引...
    sl6503閱讀 515評(píng)論 0 0
  • 001 在終極模塊,我們啃了8本書(shū),你收獲最大的10個(gè)點(diǎn)是什么? 1、生命有無(wú)限可能 我生活在一家老企業(yè)里,有很多...
    小草杜杜閱讀 363評(píng)論 3 4

友情鏈接更多精彩內(nèi)容