DirectByteBuffer解析和文件IO詳解

java.nio 包里,是java用于處理IO的新的API,它使用channel、select等模型,重新對(duì)IO操作進(jìn)行了新的實(shí)現(xiàn)。

DirectByteBuffer就是nio包下面的一個(gè)類。這個(gè)類用于保存byte數(shù)組,其特別之處在于:他將數(shù)據(jù)保存在堆外內(nèi)存。不像傳統(tǒng)的對(duì)象,對(duì)象都在堆中。這樣的好處就是對(duì)于 IO操作,減少了內(nèi)存copy次數(shù),從而增加效率。這里以文件IO進(jìn)行講解

在這里我們先把結(jié)論說一下:

a. 傳統(tǒng)的IO操作(就是使用java.io包的api)訪問磁盤文件,數(shù)據(jù)需要copy的次數(shù):

1. 磁盤文件的數(shù)據(jù) copy 內(nèi)核page cache 

2. 內(nèi)核的數(shù)據(jù) copy  應(yīng)用程序空間(即:jvm 堆外內(nèi)存)

3. jvm堆外內(nèi)存  copy  jvm堆內(nèi) 內(nèi)存

為什么2、和3 不合并,將內(nèi)核數(shù)據(jù) copy jvm堆內(nèi)內(nèi)存。 因?yàn)閖vm進(jìn)行系統(tǒng)調(diào)用進(jìn)行讀文件時(shí)候,此時(shí)發(fā)生gc,那么堆內(nèi)存的對(duì)應(yīng)地址就會(huì)移動(dòng),所以直接copy到堆內(nèi)是有問題的。

b. 使用DirectByteBuffer訪問磁盤文件,數(shù)據(jù)需要copy的次數(shù):

   1. 磁盤文件的數(shù)據(jù) copy 內(nèi)核page cache 

   2. 內(nèi)核的數(shù)據(jù) copy  應(yīng)用程序空間(即:DirectByteBuffer)

所以DirectByteBuffer減少了內(nèi)存copy次數(shù)。

1.傳統(tǒng)文件IO解析

文件讀取示例:

 FileInputStream input = new FileInputStream("/data");
 byte[] b = new byte[SIZE]; 
 input.read(b);

byte數(shù)組示堆內(nèi)存對(duì)象,此處將數(shù)據(jù)copy 到j(luò)vm堆內(nèi)存。我們看一下read函數(shù)內(nèi)部實(shí)現(xiàn)

 public int read(byte b[]) throws IOException {  
     return readBytes(b, 0, b.length);
 }
 private native int readBytes(byte b[], int off, int len) throws IOException;

我們看到 read函數(shù)最終調(diào)用 native函數(shù) readBytes。

jint readBytes(JNIEnv *env, jobject this, jbyteArray bytes, jint off, jint len, jfieldID fid) {
    jint nread;
    char stackBuf[ BUF_SIZE];
    char *buf = NULL;
    FD fd;
    if (IS_NULL(bytes)) {
        JNU_ThrowNullPointerException(env, NULL);
        return -1;
    }
    if (outOfBounds(env, off, len, bytes)) {
        JNU_ThrowByName(env, "java/lang/IndexOutOfBoundsException", NULL);
        return -1;
    }
    if (len == 0) {
        return 0;
    } else if (len > BUF_SIZE) {
        buf = malloc(len);
        if (buf == NULL) {
            JNU_ThrowOutOfMemoryError(env, NULL);
            return 0;
        }
    } else {
        buf = stackBuf;
    }
    fd = GET_FD(this, fid);
    if (fd == -1) {
        JNU_ThrowIOException(env, "Stream Closed");
        nread = -1;
    } else {
        nread = IO_Read(fd, buf, len);
        if (nread > 0) {
            ( * env)->SetByteArrayRegion(env, bytes, off, nread, (jbyte *)buf);
        } else if (nread == -1) {
            JNU_ThrowIOExceptionWithLastError(env, "Read error");
        } else { /* EOF */
            nread = -1;
        }
    }
    if (buf != stackBuf) {
        free(buf);
    }
    return nread;
}

我們看到最終通過IO_Read將緩沖數(shù)據(jù)讀到buf中去,這個(gè)IO_Read其實(shí)是一個(gè)宏定義:

define IO_Read handleRead

handleRead函數(shù)實(shí)現(xiàn)如下,這里你可以看到這里進(jìn)行了read系統(tǒng)調(diào)用:

 ssize_t handleRead(FD fd, void *buf, jint len)  { 
     ssize_t result; 
     RESTARTABLE(read(fd, buf, len), result); 
     return result; 
 }

buf返回之后,由SetByteArrayRegion這個(gè)JNI函數(shù)拷貝到了bytes,它的具體實(shí)現(xiàn)如下(下面定義了一個(gè)通用的宏函數(shù)來表示各種數(shù)據(jù)類型數(shù)組區(qū)域的設(shè)置,可以將Result宏替換成Byte即可理解):

 JNI_ENTRY(void,  jni_Set##Result##ArrayRegion(JNIEnv *env, ElementType##Array array, jsize start,  jsize len, const ElementType *buf))  
   JNIWrapper("Set" XSTR(Result) "ArrayRegion");  
   DTRACE_PROBE5(hotspot_jni, Set##Result##ArrayRegion__entry, env, array, start, len, buf); 
   DT_VOID_RETURN_MARK(Set##Result##ArrayRegion);  
   typeArrayOop dst = typeArrayOop(JNIHandles::resolve_non_null(array));  
   if (start < 0 || len < 0 || ((unsigned int)start + (unsigned int)len > (unsigned int)dst->length())) {  
     THROW(vmSymbols::java_lang_ArrayIndexOutOfBoundsException());  
   } else { 
     if (len > 0) {  
       int sc = TypeArrayKlass::cast(dst->klass())->log2_element_size();  
       memcpy((u_char*) dst->Tag##_at_addr(start), 
              (u_char*) buf,  
              len << sc);    
     } 
  }  
 JNI_END

(以上內(nèi)容部門來源:https://www.zhihu.com/question/65415926

由此可見,native方法,readBytes而采用了C Heap - JVM Heap進(jìn)行內(nèi)存拷貝的方式進(jìn)行數(shù)據(jù)傳遞。

而readBytes 通過調(diào)用 handleRead 進(jìn)行讀寫。handleRead就是讀取內(nèi)核緩存區(qū)數(shù)據(jù)。內(nèi)核數(shù)據(jù)來源文件。

2. DirectByteBuffer

DirectByteBuffer 是構(gòu)建在堆外的內(nèi)存的對(duì)象。

DirectByteBuffer是包級(jí)別可訪問的,通過 ByteBuffer.allocateDirect(int capacity) 進(jìn)行構(gòu)造。


 public static ByteBuffer allocateDirect(int capacity) { 
      return new DirectByteBuffer(capacity);
 }

我們看一下DirectByteBuffer 構(gòu)造函數(shù)實(shí)現(xiàn)


 DirectByteBuffer(int cap) {// package-private 
     super(-1,0, cap, cap); 
     boolean pa = VM.isDirectMemoryPageAligned(); 
     int ps = Bits.pageSize(); 
     long size = Math.max(1L, (long)cap + (pa ? ps :0)); 
     Bits.reserveMemory(size, cap); 
     long base =0; 
     try { 
         base =unsafe.allocateMemory(size); 
     }catch (OutOfMemoryError x) { 
         Bits.unreserveMemory(size, cap); 
         throw x;
     }
 
     unsafe.setMemory(base, size, (byte)0); 
     if (pa && (base % ps !=0)) { 
         // Round up to page boundary
         address = base + ps - (base & (ps -1)); 
     }else { 
         address = base; 
     }
 
     cleaner = Cleaner.create(this,new Deallocator(base, size, cap));    
     att =null;
 }

這里我們主要關(guān)注這幾個(gè)地方:

1.unsafe.allocateMemory(size);

利用 unsafe 類在堆外內(nèi)存(C_HEAP)中分配了一塊空間,這是一個(gè) native 函數(shù),轉(zhuǎn)到進(jìn)行堆外內(nèi)存分配的 C/C++ 代碼

 inline char* AllocateHeap( size_t size, MEMFLAGS flags, address pc = 0, AllocFailType alloc_failmode = AllocFailStrategy::EXIT_OOM){ 
    // ... 省略 
   char*p=(char*)os::malloc(size, flags, pc); 
   // 分配在 C_HEAP 上并返回指向內(nèi)存區(qū)域的指針 
   // ... 省略 
   return p; 
 }

2.cleaner = Cleaner.create(this,new Deallocator(base, size, cap));

cleaner對(duì)象是對(duì)DirectByteBuffer占用對(duì)堆外內(nèi)存進(jìn)行清理。DirectByteBuffer.cleaner().clean() 進(jìn)行手動(dòng)清理。我們看一下clean() 函數(shù)


 public void clean() { 
     //....省略 
     this.thunk.run(); 
     //....省略 
 }

其中 thunk就是我們 Cleaner.create(this,new Deallocator(base, size, cap)); 中的Deallocator??匆幌翫eallocator。

 private static class Deallocator implements Runnable  { 
 //。。。省略 
     public void run() { 
         if (address ==0) { 
             // Paranoia 
             return; 
            } 
         unsafe.freeMemory(address); 
         address =0; 
         Bits.unreserveMemory(size,capacity); 
     } 
 }

可以看到其是一個(gè)線程進(jìn)行 堆外內(nèi)存的釋放動(dòng)作。

cleaner是PhantomReference的子類。

PhantomReference它其實(shí)主要是用來跟蹤對(duì)象何時(shí)被回收的,它不能影響gc決策,但是gc過程中如果發(fā)現(xiàn)某個(gè)對(duì)象除了只有PhantomReference引用它之外,并沒有其他的地方引用它了,那將會(huì)把這個(gè)引用放到j(luò)ava.lang.ref.Reference.pending隊(duì)列里,在gc完畢的時(shí)候通知ReferenceHandler這個(gè)守護(hù)線程去執(zhí)行一些后置處理。這個(gè)處理方法中,就會(huì)判斷是否是cleaner對(duì)象,如果是,就執(zhí)行clean()函數(shù)。

因此DirectByteBuffer并不需要我們手動(dòng)清理內(nèi)存。當(dāng)jvm進(jìn)行g(shù)c(oldgc)的時(shí)候,就會(huì)清理沒有引用的 dirctByteBuffer。

當(dāng)我們一直申請(qǐng)DirectByteBuffer。其實(shí)占用的是堆外內(nèi)存,堆內(nèi)內(nèi)存只是占用一個(gè)引用。如果一直觸發(fā)不了gc,那么堆外內(nèi)存就不會(huì)回收,導(dǎo)致jvm進(jìn)程占用內(nèi)存很大。我們可以通過-XX:MaxDirectMemorySize限制DirecByteBuffer占用堆外內(nèi)存的大小

3.Bits.reserveMemory(size, cap);

 static void reserveMemory(long size,int cap) { 
     synchronized (Bits.class) { 
         if (!memoryLimitSet && VM.isBooted()) { 
             maxMemory = VM.maxDirectMemory(); 
             memoryLimitSet =true; 
         } 
         // -XX:MaxDirectMemorySize limits the total capacity rather than the 
         // actual memory usage, which will differ when buffers are page 
         // aligned. 
         if (cap <=maxMemory -totalCapacity) { 
             reservedMemory += size; 
             totalCapacity += cap; 
             count++; 
             return; 
         } 
     } 
     System.gc(); 
     try { 
         Thread.sleep(100); 
     }catch (InterruptedException x) { 
         // Restore interrupt status 
         Thread.currentThread().interrupt(); 
     } 
     synchronized (Bits.class) { 
         if (totalCapacity + cap >maxMemory) 
             throw new OutOfMemoryError("Direct buffer memory"); 
         reservedMemory += size; 
         totalCapacity += cap; 
         count++; 
         } 
 }

該函數(shù)用于統(tǒng)計(jì)DirectByteBuffer占用的大小。VM.maxDirectMemory()是jvm允許申請(qǐng)的最大DirectBuffer的大?。╔X:MaxDirectMemorySize 通過這個(gè)參數(shù)設(shè)置)

如果發(fā)現(xiàn)當(dāng)前申請(qǐng)的空間,大于限制的空間,就會(huì)觸發(fā)一次gc,上面說過gc會(huì)回收哪些之前不使用的directBuffer。然后再次申請(qǐng)。

VM.maxDirectMemory() 大小是如何設(shè)置的內(nèi),在VM類有這樣一段代碼

 public static void saveAndRemoveProperties(Properties var0) { 
     //....
     String var1 = (String)var0.remove("sun.nio.MaxDirectMemorySize"); 
     if (var1 !=null) { 
         if (var1.equals("-1")) { 
             directMemory = Runtime.getRuntime().maxMemory(); 
         }else {
             long var2 = Long.parseLong(var1); 
             if (var2 > -1L) { 
             directMemory = var2; 
             } 
     } 
     //...
 }

"sun.nio.MaxDirectMemorySize" 這個(gè)屬性就是通過 -XX:MaxDirectMemorySize 這個(gè)參數(shù)設(shè)置的。如果我們不指定這個(gè)jvm參數(shù),筆者在jdk8中測(cè)試了一下,默認(rèn)是-1,這樣就導(dǎo)致directBufffer內(nèi)存限制為進(jìn)程最大內(nèi)存。當(dāng)然這也是一個(gè)潛在風(fēng)險(xiǎn)。

風(fēng)險(xiǎn)案例:

筆者曾在線上運(yùn)行一個(gè)應(yīng)用。該應(yīng)用就是從消息隊(duì)列中消費(fèi)數(shù)據(jù),然后將數(shù)據(jù)處理后存到Hbase中。但是應(yīng)用運(yùn)行每次運(yùn)行2周左右,機(jī)器就會(huì)出現(xiàn)swap占用過大。經(jīng)過分析,是jvm進(jìn)程占用內(nèi)存太大,但是分析jvm相關(guān)參數(shù)(堆、線程大?。?,并沒有設(shè)置的很大。最后發(fā)現(xiàn)原來是directBuffer占用達(dá)到了10G。后面通過-XX:MaxDirectMemorySize=2048m 限制directbuffer使用量,解決了問題。每次directBuffer占用達(dá)到2G,就會(huì)觸發(fā)一次fullgc,將之前的無用directbuffer回收掉。hbase一個(gè)坑,有時(shí)間筆者會(huì)整理這個(gè)案例。

3.DirectByteBuffer文件IO

文件讀取示例:

FileChannel filechannel=new RandomAccessFile("/data/appdatas/cat/mmm","rw").getChannel(); 
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(SIZE); 
filechannel.read(byteBuffer)

我們看一下read函數(shù)

 public int read(ByteBuffer var1)throws IOException { 
    //。。。。 
    var3 = IOUtil.read(this.fd, var1, -1L,this.nd); 
    //。。。。 
 }

主要邏輯調(diào)用IOUtil.read。我們看一下這個(gè)函數(shù)

 static int read(FileDescriptor var0, ByteBuffer var1,long var2, NativeDispatcher var4)throws IOException { 
     if (var1.isReadOnly()) { 
         throw new IllegalArgumentException("Read-only buffer");
     }else if (var1instanceof DirectBuffer) { 
         return readIntoNativeBuffer(var0, var1, var2, var4); 
     }else { 
         ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining()); 
         int var7; 
     try { 
         int var6 = readIntoNativeBuffer(var0, var5, var2, var4); 
         var5.flip(); 
         if (var6 >0) { 
             var1.put(var5); 
         } 
         var7 = var6; 
     }finally { 
         Util.offerFirstTemporaryDirectBuffer(var5); 
     } 
     return var7; 
     } 
 }

主要方法就是通過 readIntoNativeBuffer 這個(gè)函數(shù)將數(shù)據(jù)讀入 directBuffer中,其中readIntoNativeBuffer也是調(diào)用一個(gè)native方法。

通過上面的代碼,我們會(huì)看到,如果fielchannel.read(ByteBuffer) 也可以傳入一個(gè)HeapByteBuffer,這個(gè)類是堆中。如果是這個(gè)類,那么內(nèi)部讀取的時(shí)候,會(huì)把數(shù)據(jù)先讀到DirectByteBuffer中,然后在copy到HeapByteBuffer中。Util.getTemporaryDirectBuffer(var1.remaining());就是獲取一個(gè)DirectBuffer對(duì)像。因?yàn)镈irectBuffer創(chuàng)建的時(shí)候,開銷比較大,所以使用的時(shí)候一般會(huì)用一個(gè)池子來管理。有興趣可以看一下Util這個(gè)類里面的實(shí)現(xiàn)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容