使用Golang在數(shù)秒內(nèi)讀取16GB的文件

By Ohm Patel

當(dāng)今世界的任何計(jì)算機(jī)系統(tǒng)每天都會(huì)生成大量的日志或數(shù)據(jù)。隨著系統(tǒng)的增長(zhǎng),將調(diào)試數(shù)據(jù)存儲(chǔ)到數(shù)據(jù)庫(kù)中是不可行的,因?yàn)樗鼈兪遣豢勺兊?,而且只用于分析和故障解決目的。因此,組織傾向于將其存儲(chǔ)在文件中,這些文件駐留在本地磁盤(pán)存儲(chǔ)中。

我們將使用Golang從16 GB的.txt或.log文件中提取數(shù)百萬(wàn)行日志。

Lets Code…! 開(kāi)始編碼...!

讓我們先打開(kāi)文件。我們將使用標(biāo)準(zhǔn)的Go os.File用于任何文件IO。

 f, err := os.Open(fileName) if err != nil {
       fmt.Println("cannot able to read the file", err)
       return
}// UPDATE: close after checking error
defer file.Close()  //Do not forget to close the file

一旦文件被打開(kāi),我們有以下兩個(gè)選項(xiàng)繼續(xù)進(jìn)行

  1. 逐行讀取文件,這有助于減少對(duì)內(nèi)存的壓力,但將花費(fèi)更多的時(shí)間在IO。
  2. 一次將整個(gè)文件讀入內(nèi)存并處理該文件,這會(huì)消耗更多內(nèi)存,但會(huì)顯著增加時(shí)間。

當(dāng)文件太大時(shí),比如16GB,我們無(wú)法將整個(gè)文件加載到內(nèi)存中。但是第一個(gè)選項(xiàng)對(duì)我們來(lái)說(shuō)也是不可行的,因?yàn)槲覀兿M趲酌腌妰?nèi)處理文件。

但你猜怎么著,還有第三種選擇。瞧…!在將整個(gè)文件加載到內(nèi)存時(shí),我們將使用bufio.NewReader()塊加載文件,在Go中可用。

r := bufio.NewReader(f)for {buf := make([]byte,4*1024) //the chunk sizen, err := r.Read(buf) //loading chunk into buffer
       buf = buf[:n]if n == 0 {
       
         if err != nil {
           fmt.Println(err)
           break
         }
         if err == io.EOF {
           break
         }
         return err
      }
}

一旦我們有了數(shù)據(jù)塊,我們將fork一個(gè)線程,即Go例程,來(lái)與其他數(shù)據(jù)塊并發(fā)地處理每個(gè)數(shù)據(jù)塊。以上代碼將更改為-

//sync pools to reuse the memory and decrease the preassure on //Garbage CollectorlinesPool := sync.Pool{New: func() interface{} {
            lines := make([]byte, 500*1024)
            return lines
    }}stringPool := sync.Pool{New: func() interface{} {
              lines := ""
              return lines
    }}slicePool := sync.Pool{New: func() interface{} {
               lines := make([]string, 100)
               return lines
    }}r := bufio.NewReader(f)var wg sync.WaitGroup //wait group to keep track off all threadsfor {
         
         buf := linesPool.Get().([]byte)
         n, err := r.Read(buf)
         buf = buf[:n]if n == 0 {
            if err != nil {
                fmt.Println(err)
                break
            }
            if err == io.EOF {
                break
            }
            return err
         }nextUntillNewline, err := r.ReadBytes('\n')//read entire line
         
         if err != io.EOF {
             buf = append(buf, nextUntillNewline...)
         }
         
         wg.Add(1)
         go func() { 
          
            //process each chunk concurrently
            //start -> log start time, end -> log end time
            
            ProcessChunk(buf, &linesPool, &stringPool, &slicePool,     start, end)wg.Done()
         
         }()
    }wg.Wait()}

上面的代碼引入了兩個(gè)新的優(yōu)化:-

  1. sync.Pool是一個(gè)強(qiáng)大的實(shí)例池,可以重用它來(lái)減少垃圾收集器的壓力。我們將重新使用分配給各個(gè)片的內(nèi)存。它幫助我們減少內(nèi)存消耗,使我們的工作速度顯著加快。
  2. 幫助我們并行處理緩沖區(qū)塊的 Go Routines ,大大提高了處理速度。

現(xiàn)在讓我們實(shí)現(xiàn)ProcessChunk函數(shù),它將處理日志行,這些日志行是這種格式的

2020-01-31T20:12:38.1234Z, Some Field, Other Field, And so on, Till new line,...\n

我們將根據(jù)命令行提供的時(shí)間戳提取日志。

func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool, start time.Time, end time.Time) {//another wait group to process every chunk further                             
          var wg2 sync.WaitGrouplogs := stringPool.Get().(string)logs = string(chunk)linesPool.Put(chunk) //put back the chunk in pool//split the string by "\n", so that we have slice of logs
          logsSlice := strings.Split(logs, "\n")stringPool.Put(logs) //put back the string poolchunkSize := 100 //process the bunch of 100 logs in threadn := len(logsSlice)noOfThread := n / chunkSizeif n%chunkSize != 0 { //check for overflow 
             noOfThread++
          }length := len(logsSlice)//traverse the chunk
         for i := 0; i < length; i += chunkSize {
             
             wg2.Add(1)//process each chunk in saperate chunk
             go func(s int, e int) {
                for i:= s; i<e;i++{
                   text := logsSlice[i]if len(text) == 0 {
                      continue
                   }
               
                logParts := strings.SplitN(text, ",", 2)
                logCreationTimeString := logParts[0]
                logCreationTime, err := time.Parse("2006-01-  02T15:04:05.0000Z", logCreationTimeString)if err != nil {
                     fmt.Printf("\n Could not able to parse the time :%s       for log : %v", logCreationTimeString, text)
                     return
                }// check if log's timestamp is inbetween our desired period
              if logCreationTime.After(start) && logCreationTime.Before(end) {
              
                fmt.Println(text)
               }
            }
            textSlice = nil
            wg2.Done()
         
         }(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
       //passing the indexes for processing}  
       wg2.Wait() //wait for a chunk to finish
       logsSlice = nil}

上面的代碼使用16GB的日志文件進(jìn)行基準(zhǔn)測(cè)試。

提取日志所需的時(shí)間約為25秒。

下面是整個(gè)項(xiàng)目的代碼.

func main() {
    
        s := time.Now()
        args := os.Args[1:]
        if len(args) != 6 { // for format  LogExtractor.exe -f "From Time" -t "To Time" -i "Log file directory location"
            fmt.Println("Please give proper command line arguments")
            return
        }
        startTimeArg := args[1]
        finishTimeArg := args[3]
        fileName := args[5]
    
        file, err := os.Open(fileName)
        
        if err != nil {
            fmt.Println("cannot able to read the file", err)
            return
        }
        
        defer file.Close() //close after checking err
        
        queryStartTime, err := time.Parse("2006-01-02T15:04:05.0000Z", startTimeArg)
        if err != nil {
            fmt.Println("Could not able to parse the start time", startTimeArg)
            return
        }
    
        queryFinishTime, err := time.Parse("2006-01-02T15:04:05.0000Z", finishTimeArg)
        if err != nil {
            fmt.Println("Could not able to parse the finish time", finishTimeArg)
            return
        }
    
        filestat, err := file.Stat()
        if err != nil {
            fmt.Println("Could not able to get the file stat")
            return
        }
    
        fileSize := filestat.Size()
        offset := fileSize - 1
        lastLineSize := 0
    
        for {
            b := make([]byte, 1)
            n, err := file.ReadAt(b, offset)
            if err != nil {
                fmt.Println("Error reading file ", err)
                break
            }
            char := string(b[0])
            if char == "\n" {
                break
            }
            offset--
            lastLineSize += n
        }
    
        lastLine := make([]byte, lastLineSize)
        _, err = file.ReadAt(lastLine, offset+1)
    
        if err != nil {
            fmt.Println("Could not able to read last line with offset", offset, "and lastline size", lastLineSize)
            return
        }
    
        logSlice := strings.SplitN(string(lastLine), ",", 2)
        logCreationTimeString := logSlice[0]
    
        lastLogCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
        if err != nil {
            fmt.Println("can not able to parse time : ", err)
        }
    
        if lastLogCreationTime.After(queryStartTime) && lastLogCreationTime.Before(queryFinishTime) {
            Process(file, queryStartTime, queryFinishTime)
        }
    
        fmt.Println("\nTime taken - ", time.Since(s))
    }
    
    func Process(f *os.File, start time.Time, end time.Time) error {
    
        linesPool := sync.Pool{New: func() interface{} {
            lines := make([]byte, 250*1024)
            return lines
        }}
    
        stringPool := sync.Pool{New: func() interface{} {
            lines := ""
            return lines
        }}
    
        r := bufio.NewReader(f)
    
        var wg sync.WaitGroup
    
        for {
            buf := linesPool.Get().([]byte)
    
            n, err := r.Read(buf)
            buf = buf[:n]
    
            if n == 0 {
                if err != nil {
                    fmt.Println(err)
                    break
                }
                if err == io.EOF {
                    break
                }
                return err
            }
    
            nextUntillNewline, err := r.ReadBytes('\n')
    
            if err != io.EOF {
                buf = append(buf, nextUntillNewline...)
            }
    
            wg.Add(1)
            go func() {
                ProcessChunk(buf, &linesPool, &stringPool, start, end)
                wg.Done()
            }()
    
        }
    
        wg.Wait()
        return nil
    }
    
    func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, start time.Time, end time.Time) {
    
        var wg2 sync.WaitGroup
    
        logs := stringPool.Get().(string)
        logs = string(chunk)
    
        linesPool.Put(chunk)
    
        logsSlice := strings.Split(logs, "\n")
    
        stringPool.Put(logs)
    
        chunkSize := 300
        n := len(logsSlice)
        noOfThread := n / chunkSize
    
        if n%chunkSize != 0 {
            noOfThread++
        }
    
        for i := 0; i < (noOfThread); i++ {
    
            wg2.Add(1)
            go func(s int, e int) {
                defer wg2.Done() //to avaoid deadlocks
                for i := s; i < e; i++ {
                    text := logsSlice[i]
                    if len(text) == 0 {
                        continue
                    }
                    logSlice := strings.SplitN(text, ",", 2)
                    logCreationTimeString := logSlice[0]
    
                    logCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
                    if err != nil {
                        fmt.Printf("\n Could not able to parse the time :%s for log : %v", logCreationTimeString, text)
                        return
                    }
    
                    if logCreationTime.After(start) && logCreationTime.Before(end) {
                        //fmt.Println(text)
                    }
                }
                
    
            }(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
        }
    
        wg2.Wait()
        logsSlice = nil
    }

你可以通過(guò)ohm.patel1997@gmail.com聯(lián)系我。

任何疑問(wèn)和改進(jìn)是最受歡迎的。??

你也可以發(fā)表評(píng)論下面進(jìn)一步懷疑和贊揚(yáng)總是受歡迎的。????

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 前言 前期誤操作,導(dǎo)致數(shù)據(jù)庫(kù)表刪除,雖然數(shù)據(jù)量不多,但是通過(guò)binlog恢復(fù)比較麻煩,通過(guò)備份文件來(lái)恢復(fù),備份文件...
    daoshud1閱讀 735評(píng)論 0 0
  • Golang 操作文件的讀取的方法很多,適用的場(chǎng)景也是各不相同,在此我們將文件的讀取分為如下幾種 :文件整體讀取文...
    楚江云閱讀 9,241評(píng)論 0 6
  • 參考Golang文件操作整理golang中的文件讀寫(xiě) 一、API 參考Go語(yǔ)言學(xué)習(xí)筆記(五)文件操作 1.os.F...
    合肥黑閱讀 15,723評(píng)論 0 8
  • 讀寫(xiě)文件是數(shù)據(jù)分析中常用的操作。Python內(nèi)置了讀寫(xiě)文件的函數(shù)。需要了解的是,在磁盤(pán)上讀寫(xiě)文件的功能都是由操作系...
    無(wú)敵的肉包閱讀 378評(píng)論 1 2
  • 從文件中讀取數(shù)據(jù)文本文件可存儲(chǔ)的數(shù)據(jù)量多得難以置信:天氣數(shù)據(jù)、交通數(shù)據(jù)、社會(huì)經(jīng)濟(jì)數(shù)據(jù)、文學(xué)作品等。每當(dāng)需要分析或修...
    阿耀王子閱讀 4,502評(píng)論 0 3

友情鏈接更多精彩內(nèi)容