E1.2 Go語言實現超大文本文件按行排序和去重復行

對超大文本文件進行排序(這里的排序一般指按行進行排序),是一種很特殊需求,這種“超大”的文本文件一般是指遠遠超出內存大小因而無法一次加載進內存來進行排序的文件,它的處理方式和一般的排序算法肯定會有所不同。另外,有時候還需要對文本進行去除重復行的任務。本文將給出一個算法思路和實際示例。


-> 首先用一個io.Reader對象來分段讀取文件中的內容,假設文件大小有10G(字節(jié))之大,而可用內存僅為4G,比較保守地我們可以每次讀取200M左右的數據寫入依次編號的文件中,也就是用200M字節(jié)大小的字節(jié)切片作為緩沖區(qū);這樣會產生50個新的文件,每個文件大小約為200M字節(jié);

-> 在寫入這50個文件時,可以先進行按行排序;這樣就得到50個內部已經排好序的文本文件;而由于每個文件的數據量遠遠小于內存總數量,所以這個排序在內存中是沒有問題的;

-> 新建一個用于寫入最終結果的文本文件,并用bufio.Writer來進行追加寫入,且可以每次只追加一行;

-> 然后同時打開前面那50個排好序的文件,用bufio.Reader來從每個文件讀取第一行,并比較這50個第一行中哪個排名最靠前(最大或最小,根據升序或降序的排序要求),假設第5個文件中的這行最靠前,則將這一行追加到結果文件,然后再從第5個文件中讀取下一行替代已被追加到結果的那行重新與其他行做比較;

-> 如此循環(huán)重復,直至所有的文件被讀取完畢;此時的輸出文件就是已經經過排序好的最終文件了。


這樣,循環(huán)執(zhí)行完畢之后,結果文件就將是完全排好序之后的超大文本文件了。


下面是使用這個思路實現的實際示例代碼:

本文中的代碼可以在https://github.com/topxeq/goexamples/tree/master/duplicate1中找到。

package?main

import?(

????"bufio"

????"io"

????"os"

????"path/filepath"

????"sort"

????"strings"

????"github.com/topxeq/tk"

)

func?main()?{

????var?errT?error

????//?獲取第1個命令行參數(實際上是第二個命令行參數,可執(zhí)行文件名是第1個,序號為0)

????fileNameT?:=?tk.GetParameterByIndexWithDefaultValue(os.Args,?1,?"")

????//?如果命令行參數中沒有指定文件名則報錯

????if?fileNameT?==?""?{

????????tk.Pl("文件名不能為空")

????????return

????}

????//?如果文件不存在也報錯

????if?!tk.IfFileExists(fileNameT)?{

????????tk.Pl("文件?%v?不存在。",?fileNameT)

????????return

????}

????//?limitLineCountT限制每個分塊文件的大?。ㄐ袛担?/p>

????//?從命令行參數中可以用-size=100000這樣的參數來設置,默認為5000000行

????limitLineCountT?:=?tk.GetSwitchWithDefaultIntValue(os.Args,?"-size=",?5000000)

????//?總行數

????lineCountT?:=?0

????//?分塊文件數

????fileCountT?:=?0

????//?打開原始文件準備進行讀取

????fileT,?errT?:=?os.Open(fileNameT)

????if?errT?!=?nil?{

????????tk.Pl("打開文件時發(fā)生錯誤:%v",?errT)

????????return

????}

????//?創(chuàng)建一個緩沖式讀取器對象

????readerT?:=?bufio.NewReader(fileT)

????//?ifEOFT用于判斷是否讀到了文件末尾

????ifEOFT?:=?false

????//?臨時變量,用于存儲字符串

????var?tmps?string

????//?反復循環(huán)從源文件中讀取行,直至讀到文件末尾

????//?每次讀取最多l(xiāng)imitLineCountT行,寫入臨時文件中,超出則繼續(xù)寫到下一個臨時文件中

????//?臨時文件名按數字進行排序,存于變量fileCountT中

????for?!ifEOFT?{

????????//?分配指定大小的切片(可以理解為Go語言中的可變長數組)準備放置讀取到的文本行

????????bufT?:=?make([]string,?0,?limitLineCountT)

????????fileCountT++

????????tk.Pl("正在讀取第%v組數據",?fileCountT)

????????//?臨時文件名,tk.Spr函數相當于fmt.Sprintf函數

????????//?本例中的臨時文件名將依次為sub00000001.txt、sub00000002.txt...

????????subFileNameT?:=?tk.Spr("sub%08d.txt",?fileCountT)

????????//?默認將臨時文件放在執(zhí)行時的當前目錄下

????????subPathT?:=?filepath.Join("./",?subFileNameT)

????????//?循環(huán)讀取limitLineCountT次,試圖讀取limitLineCountT行文本

????????for?j?:=?0;?j?<?limitLineCountT;?j++?{

????????????strT,?errT?:=?readerT.ReadString('\n')

????????????if?errT?!=?nil?{

????????????????//?讀到文件結尾時的處理

????????????????if?errT?==?io.EOF?{

????????????????????tmps?=?tk.Trim(strT)

????????????????????if?tmps?!=?""?{

????????????????????????bufT?=?append(bufT,?tmps)

????????????????????}

????????????????????ifEOFT?=?true

????????????????}?else?{

????????????????????tk.Pl("文件讀取失敗:%v",?errT)

????????????????????fileT.Close()

????????????????????os.Exit(1)

????????????????}

????????????????break

????????????}

????????????tmps?=?tk.Trim(strT)

????????????//?本例中空行將被丟棄,即不處理空行(包括含有空格等空白字符的行)

????????????if?tmps?!=?""?{

????????????????bufT?=?append(bufT,?tmps)

????????????}

????????}

????????//?對讀取到的最多l(xiāng)imitLineCountT行文本進行排序

????????tk.Pl("正在排序第%v組數據",?fileCountT)

????????sort.Sort(sort.StringSlice(bufT))

????????//?保存排序后的文本到臨時文件

????????tk.Pl("正在保存第%v組數據到臨時文件%v",?fileCountT,?subPathT)

????????rse?:=?tk.SaveStringListBuffered(bufT,?subPathT,?"\n")

????????if?tk.IsErrorString(rse)?{

????????????tk.Pl("保存臨時文件%v失敗:%v",?subPathT,?tk.GetErrorString(rse))

????????????fileT.Close()

????????????os.Exit(1)

????????}

????????//?記錄總共處理的行數

????????lineCountT?+=?len(bufT)

????}

????fileT.Close()

????tk.Pl("共讀取了%v行,寫入了%v個臨時文件",?lineCountT,?fileCountT)

????//?排序寫

????tk.Pl("進行多文件排序并去除重復行……")

????//?存放臨時文件讀取器的變量

????filesT?:=?make([]*os.File,?fileCountT)

????readersT?:=?make([]*bufio.Reader,?fileCountT)

????//?用于進行對多個文件讀取的第一行進行大小比對排序的變量

????strBufT?:=?make([]string,?fileCountT)

????compareBufT?:=?make([]int,?fileCountT)

????selIndexT?:=?0

????//?用于保存當前寫入的行,用于去除重復行

????currentLineT?:=?""

????//?統(tǒng)計整體讀取的行數和寫入的行數

????readCountT?:=?0

????writeCountT?:=?0

????//?打開多個臨時文件用于同時讀取

????for?i?:=?1;?i?<=?fileCountT;?i++?{

????????subPathT?:=?filepath.Join("./",?tk.Spr("sub%08d.txt",?i))

????????tk.Pl("打開臨時文件%v準備讀取",?subPathT)

????????filesT[i-1],?errT?=?os.Open(subPathT)

????????if?errT?!=?nil?{

????????????tk.Pl("打開文件時發(fā)生錯誤:%v",?errT)

????????????os.Exit(1)

????????}

????????readersT[i-1]?=?bufio.NewReader(filesT[i-1])

????}

????//?創(chuàng)建一個新文件用于寫入最終結果,默認為當前目錄下的output.txt文件

????outputFileT,?errT?:=?os.Create("./output.txt")

????if?errT?!=?nil?{

????????tk.Pl("創(chuàng)建輸出文件時發(fā)生錯誤:%v",?errT)

????????os.Exit(1)

????}

????//?創(chuàng)建寫入器

????outputWriterT?:=?bufio.NewWriter(outputFileT)

????//?用于判斷是否是寫入的第一行

????//?如果不是第一行,將再寫入每一行文本之前,先寫入一個回車換行符

????notFirstFlagT?:=?false

????//?循環(huán)讀取并寫入結果文件

????for?true?{

????????var?lineT?string

????????//?記錄一共被關閉了多少個臨時文件,表示已經有多少個臨時文件被讀取完畢

????????var?closedFileT?=?0

????????//?是否讀到文件結尾

????????var?eofT?bool

????????//?從各個文件中都讀取一行,空行將被丟棄

????????for?k?:=?0;?k?<?fileCountT;?k++?{

????????????if?readersT[k]?==?nil?{

????????????????closedFileT++

????????????????continue

????????????}

????????????//?如果某個文件對應的一行已空,則再讀一行

????????????if?strBufT[k]?==?""?{

????????????????foundT?:=?false

????????????????for?readersT[k]?!=?nil?{

????????????????????lineT,?eofT,?errT?=?tk.ReadLineFromBufioReader(readersT[k])

????????????????????if?errT?!=?nil?{

????????????????????????tk.Pl("從臨時文件%v中讀取數據時發(fā)生錯誤:%v",?k,?errT)

????????????????????????os.Exit(1)

????????????????????}

????????????????????lineT?=?tk.Trim(lineT)

????????????????????if?eofT?{

????????????????????????readersT[k]?=?nil

????????????????????????filesT[k].Close()

????????????????????}

????????????????????if?lineT?==?""?{

????????????????????????continue

????????????????????}

????????????????????foundT?=?true

????????????????????break

????????????????}

????????????????if?foundT?{

????????????????????strBufT[k]?=?lineT

????????????????}

????????????}

????????}

????????//?進行計數式比對,找出排名最靠前的一行

????????var?compareT?int

????????for?ii?:=?0;?ii?<?fileCountT;?ii++?{

????????????compareBufT[ii]?=?0

????????}

????????for?ii?:=?0;?ii?<?(fileCountT?-?1);?ii++?{

????????????if?strBufT[ii]?==?""?{

????????????????continue

????????????}

????????????for?jj?:=?ii?+?1;?jj?<?fileCountT;?jj++?{

????????????????if?strBufT[jj]?==?""?{

????????????????????compareBufT[ii]++

????????????????????continue

????????????????}

????????????????compareT?=?strings.Compare(strBufT[ii],?strBufT[jj])

????????????????if?compareT?>?0?{

????????????????????compareBufT[jj]++

????????????????}?else?if?compareT?<?0?{

????????????????????compareBufT[ii]++

????????????????}

????????????}

????????}

????????maxT?:=?0

????????for?kk?:=?0;?kk?<?fileCountT;?kk++?{

????????????if?compareBufT[kk]?>?maxT?{

????????????????maxT?=?compareBufT[kk]

????????????????selIndexT?=?kk

????????????}

????????}

????????//?處理只有一個文件時的比對

????????if?fileCountT?==?1?&&?strBufT[0]?!=?""?{

????????????maxT?=?1

????????????selIndexT?=?0

????????}

????????//?如果所有行都是空行,說明已經讀取完畢所有文件,將退出循環(huán)

????????if?maxT?<=?0?{

????????????tk.Pl("讀取緩沖區(qū)全部為空")

????????????break

????????}

????????readCountT++

????????//?如果將要寫入的一行與上一行一樣,說明是重復行,則丟棄

????????//?由此實現去除重復行的效果

????????//?注意此方法僅對排序后的文本才是正確的

????????if?currentLineT?!=?""?{

????????????if?strBufT[selIndexT]?==?currentLineT?{

????????????????//?tk.Pl("發(fā)現重復行:%v",?currentLineT)

????????????????strBufT[selIndexT]?=?""

????????????????continue

????????????}

????????}

????????currentLineT?=?strBufT[selIndexT]

????????strBufT[selIndexT]?=?""

????????if?notFirstFlagT?{

????????????outputWriterT.WriteString("\r\n")

????????}?else?{

????????????notFirstFlagT?=?true

????????}

????????//?將最終選出的文本行寫入結果文件

????????_,?errT?=?outputWriterT.WriteString(currentLineT)

????????if?errT?!=?nil?{

????????????tk.Pl("向輸出文件中寫入數據時發(fā)生錯誤:%v",?errT)

????????????os.Exit(1)

????????}

????????writeCountT++

????????//?所有文件如果都已關閉,說明都已讀取完,循環(huán)將終止

????????if?closedFileT?==?fileCountT?{

????????????break

????????}

????}

????//?由于使用的是bufio,即緩沖方式寫入文件,注意一定要用Flush來保證在內存中的數據被確保真正寫入文件中

????outputWriterT.Flush()

????outputFileT.Close()

????tk.Pl("處理完畢(共寫入%v行),按q鍵加回車退出……",?writeCountT)

}



下面是該代碼執(zhí)行的結果示例截圖:

其中,test1.txt是大小近1G,包含4千多萬行的文本文件。可以看出,最終去除重復后只剩下3千多萬行。處理過程中分割成為了10個臨時文件,每個文件最多5百萬行。最終的結果存放在output.txt中,我們可以用前文中的preview1程序對其進行預覽查看,可以看出其中的內容是經過排序的。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容