我們一般提到排序都是指內(nèi)排序,比如快速排序,堆排序,歸并排序等,所謂內(nèi)排序就是可以在內(nèi)存中完成的排序。RAM的訪問(wèn)速度大約是磁盤(pán)的25萬(wàn)倍,我們當(dāng)然希望如果可以的話都是內(nèi)排來(lái)完成。但對(duì)于大數(shù)據(jù)集來(lái)說(shuō),內(nèi)存是遠(yuǎn)遠(yuǎn)不夠的,這時(shí)候就涉及到外排序的知識(shí)了。
外部排序指的是大文件的排序,即待排序的記錄存儲(chǔ)在外存儲(chǔ)器上,待排序的文件無(wú)法一次裝入內(nèi)存,需要在內(nèi)存和外部存儲(chǔ)器之間進(jìn)行多次數(shù)據(jù)交換,以達(dá)到排序整個(gè)文件的目的。
多路歸并排序
基本思想
外部排序最常用的算法是多路歸并排序,即將原文件分解成多個(gè)能夠一次性裝人內(nèi)存的部分,分別把每一部分調(diào)入內(nèi)存完成排序。然后,對(duì)已經(jīng)排序的子文件進(jìn)行歸并排序。
一般來(lái)說(shuō)外排序分為兩個(gè)步驟:預(yù)處理和合并排序。即首先根據(jù)內(nèi)存的大小,將有n個(gè)記錄的磁盤(pán)文件分批讀入內(nèi)存,采用有效的內(nèi)存排序方法進(jìn)行排序,將其預(yù)處理為若干個(gè)有序的子文件,這些有序子文件就是初始順串,然后采用合并的方法將這些初始順串逐趟合并成一個(gè)有序文件。
兩兩歸并排序
假設(shè)有一個(gè)含10000 個(gè)記錄的文件,首先通過(guò)10 次內(nèi)部排序得到10 個(gè)初始?xì)w并段R1~R10 ,其中每一段都含1000 個(gè)記錄。然后兩兩歸并,直至得到一個(gè)有序文件為止
如R1與R2歸并排序成R12,R3與R4排序成R34,再R12與R34排序從R1234
這種方法要?dú)w并排序多次,時(shí)間耗費(fèi)多,不提倡
多路歸并排序
由于多路歸并,有k路,就要比較k-1次,所以有了減少比較次數(shù)的勝者樹(shù)與敗者樹(shù),而多路歸并常用敗者樹(shù)
多路歸并排序算法在常見(jiàn)數(shù)據(jù)結(jié)構(gòu)書(shū)中都有涉及。從2路到多路(k路),增大k可以減少外存信息讀寫(xiě)時(shí)間,但k個(gè)歸并段中選取最小的記錄需要比較k-1次, 為得到u個(gè)記錄的一個(gè)有序段共需要(u-1)(k-1)次,若歸并趟數(shù)為s次,那么對(duì)n個(gè)記錄的文件進(jìn)行外排時(shí),內(nèi)部歸并過(guò)程中進(jìn)行的總的比較次數(shù)為 s(n-1)(k-1),也即(向上取整)(logkm)(k-1)(n-1)=(向上取整)(log2m/log2k)(k-1)(n-1),而(k- 1)/log2k隨k增而增因此內(nèi)部歸并時(shí)間隨k增長(zhǎng)而增長(zhǎng)了,抵消了外存讀寫(xiě)減少的時(shí)間,這樣做不行,由此引出了“敗者樹(shù)”tree of loser的使用。在內(nèi)部歸并過(guò)程中利用敗者樹(shù)將k個(gè)歸并段中選取最小記錄比較的次數(shù)降為(向上取整)(log2k)次使總比較次數(shù)為(向上取整) (log2m)(n-1),與k無(wú)關(guān)。
敗者樹(shù)是完全二叉樹(shù), 因此數(shù)據(jù)結(jié)構(gòu)可以采用一維數(shù)組。其元素個(gè)數(shù)為k個(gè)葉子結(jié)點(diǎn)、k-1個(gè)比較結(jié)點(diǎn)、1個(gè)冠軍結(jié)點(diǎn)共2k個(gè)。ls[0]為冠軍結(jié)點(diǎn),ls[1]--ls[k- 1]為比較結(jié)點(diǎn),ls[k]--ls[2k-1]為葉子結(jié)點(diǎn)(同時(shí)用另外一個(gè)指針?biāo)饕齜[0]--b[k-1]指向)。另外bk為一個(gè)附加的輔助空間,不 屬于敗者樹(shù),初始化時(shí)存著MINKEY的值。
多路歸并排序算法的過(guò)程大致為:
1):首先將k個(gè)歸并段中的首元素關(guān)鍵字依次存入b[0]--b[k-1]的葉子結(jié)點(diǎn)空間里,然后調(diào)用CreateLoserTree創(chuàng)建敗者樹(shù),創(chuàng)建完畢之后最小的關(guān)鍵字下標(biāo)(即所在歸并段的序號(hào))便被存入ls[0]中。然后不斷循環(huán):
2)把ls[0]所存最小關(guān)鍵字來(lái)自于哪個(gè)歸并段的序號(hào)得到為q,將該歸并段的首元素輸出到有序歸并段里,然后把下一個(gè)元素關(guān)鍵字放入上一個(gè)元素本來(lái)所 在的葉子結(jié)點(diǎn)b[q]中,調(diào)用Adjust順著b[q]這個(gè)葉子結(jié)點(diǎn)往上調(diào)整敗者樹(shù)直到新的最小的關(guān)鍵字被選出來(lái),其下標(biāo)同樣存在ls[0]中。循環(huán)這個(gè) 操作過(guò)程直至所有元素被寫(xiě)到有序歸并段里。
偽代碼如下:
void Adjust(LoserTree &ls, int s)
/*從葉子結(jié)點(diǎn)b[s]到根結(jié)點(diǎn)的父結(jié)點(diǎn)ls[0]調(diào)整敗者樹(shù)*/
{ int t, temp;
t=(s+K)/2; /*t為b[s]的父結(jié)點(diǎn)在敗者樹(shù)中的下標(biāo),K是歸并段的個(gè)數(shù)*/
while(t>0) /*若沒(méi)有到達(dá)樹(shù)根,則繼續(xù)*/
{ if(b[s]>b[ls[t]]) /*與父結(jié)點(diǎn)指示的數(shù)據(jù)進(jìn)行比較*/
{ /*ls[t]記錄敗者所在的段號(hào),s指示新的勝者,勝者將去參加更上一層的比較*/
temp=s;
s=ls[t];
ls[t]=temp;
}
t=t/2; /*向樹(shù)根退一層,找到父結(jié)點(diǎn)*/
}
ls[0]=s; /*ls[0]記錄本趟最小關(guān)鍵字所在的段號(hào)*/
}
void K_merge( int ls[K])
/*ls[0]~ls[k-1]是敗者樹(shù)的內(nèi)部比較結(jié)點(diǎn)。b[0]~b[k-1]分別存儲(chǔ)k個(gè)初始?xì)w并段的當(dāng)前記錄*/
/*函數(shù)Get_next(i)用于從第i個(gè)歸并段讀取并返回當(dāng)前記錄*/
{ int b[K+1),i,q;
for(i=0; i<K;i++)
{ b[i]=Get_next(i); /*分別讀取K個(gè)歸并段的第一個(gè)關(guān)鍵字*/ }
b[K]=MINKEY; /*創(chuàng)建敗者樹(shù)*/
for(i=0; i<K ; i++) /*設(shè)置ls中的敗者初值*/
ls[i]=K;
for(i=K-1 ; i>=0 ; i--) /*依次從b[K-1]……b[0]出發(fā)調(diào)整敗者*/
Adjust(ls , i); /*敗者樹(shù)創(chuàng)建完畢,最小關(guān)鍵字序號(hào)存入ls[0]
while(b[ls[0]] !=MAXKEY )
{ q=ls[0]; /*q為當(dāng)前最小關(guān)鍵字所在的歸并段*/
prinftf("%d",b[q]);
b[q]=Get_next(q);
Adjust(ls,q); /*q為調(diào)整敗者樹(shù)后,選擇新的最小關(guān)鍵字*/
}
}
勝者樹(shù)
勝者樹(shù)的一個(gè)優(yōu)點(diǎn)是,如果一個(gè)選手的值改變了,可以很容易地修改這棵勝者樹(shù)。只需要沿著從該結(jié)點(diǎn)到根結(jié)點(diǎn)的路徑修改這棵二叉樹(shù),而不必改變其他比賽的結(jié)果。
注意:方塊,表示最底層要比較的東西,里面的值是要比較的數(shù)值,下面的是數(shù)值對(duì)應(yīng)的index。
圓圈,表示節(jié)點(diǎn)與節(jié)點(diǎn)之間比較的結(jié)果,里面的值是比較后結(jié)果,對(duì)應(yīng)的對(duì)象的index,這里是勝利的數(shù)的index,旁邊的是這個(gè)圓圈對(duì)應(yīng)的index。
Fig.1是一個(gè)勝者樹(shù)的示例。規(guī)定數(shù)值小者勝。
- b3 PK b4,b3勝b4負(fù),內(nèi)部結(jié)點(diǎn)ls[4]的值為3;
2.b3 PK b0,b3勝b0負(fù),內(nèi)部結(jié)點(diǎn)ls[2]的值為3;
3.b1 PK b2,b1勝b2負(fù),內(nèi)部結(jié)點(diǎn)ls[3]的值為1;
4.b3 PK b1,b3勝b1負(fù),內(nèi)部結(jié)點(diǎn)ls[1]的值為3。.
當(dāng)葉子結(jié)點(diǎn)b3的值變?yōu)?1時(shí),重構(gòu)的勝者樹(shù)所示。
1.b3 PK b4,b3勝b4負(fù),內(nèi)部結(jié)點(diǎn)ls[4]的值為3;
2.b3 PK b0,b0勝b3負(fù),內(nèi)部結(jié)點(diǎn)ls[2]的值為0;
3.b1 PK b2,b1勝b2負(fù),內(nèi)部結(jié)點(diǎn)ls[3]的值為1;
4.b0 PK b1,b1勝b0負(fù),內(nèi)部結(jié)點(diǎn)ls[1]的值為1。.
敗者樹(shù)
敗者樹(shù)是勝者樹(shù)的一種變體。在敗者樹(shù)中,用父結(jié)點(diǎn)記錄其左右子結(jié)點(diǎn)進(jìn)行比賽的敗者,而讓勝者參加下一輪的比賽。敗者樹(shù)的根結(jié)點(diǎn)記錄的是敗者,需要加一個(gè)結(jié)點(diǎn)來(lái)記錄整個(gè)比賽的勝利者。采用敗者樹(shù)可以簡(jiǎn)化重構(gòu)的過(guò)程。
注意:方塊,表示最底層要比較的東西,里面的值是要比較的數(shù)值,下面的是數(shù)值對(duì)應(yīng)的index。
圓圈,表示節(jié)點(diǎn)與節(jié)點(diǎn)之間比較的結(jié)果,里面的值是比較后結(jié)果,對(duì)應(yīng)的對(duì)象的index,這里是失敗的數(shù)的index,旁邊的是這個(gè)圓圈對(duì)應(yīng)的index。
線段上的數(shù)字,表示線段下面的圓圈,對(duì)應(yīng)的比較,勝利的數(shù)的index。
一棵敗者樹(shù)。規(guī)定數(shù)大者敗。
- b3 PK b4,b3勝b4負(fù),內(nèi)部結(jié)點(diǎn)ls[4]的值為4;
2.b3 PK b0,b3勝b0負(fù),內(nèi)部結(jié)點(diǎn)ls[2]的值為0;
3.b1 PK b2,b1勝b2負(fù),內(nèi)部結(jié)點(diǎn)ls[3]的值為2;
b3 PK b1,b3勝b1負(fù),內(nèi)部結(jié)點(diǎn)ls[1]的值為1;
在根結(jié)點(diǎn)ls[1]上又加了一個(gè)結(jié)點(diǎn)ls[0]=3,記錄的最后的勝者。
敗者樹(shù)重構(gòu)過(guò)程如下:
將新進(jìn)入選擇樹(shù)的結(jié)點(diǎn)與其父結(jié)點(diǎn)進(jìn)行比賽:將敗者存放在父結(jié)點(diǎn)中;而勝者再與上一級(jí)的父結(jié)點(diǎn)比較。
比賽沿著到根結(jié)點(diǎn)的路徑不斷進(jìn)行,直到ls[1]處。把敗者存放在結(jié)點(diǎn)ls[1]中,勝者存放在ls[0]中。
圖是當(dāng)b3變?yōu)?3時(shí),敗者樹(shù)的重構(gòu)圖。
注意,敗者樹(shù)的重構(gòu)跟勝者樹(shù)是不一樣的,敗者樹(shù)的重構(gòu)只需要與其父結(jié)點(diǎn)比較。對(duì)照Fig. 3來(lái)看,b3與結(jié)點(diǎn)ls[4]的原值比較,ls[4]中存放的原值是結(jié)點(diǎn)4,即b3與b4比較,b3負(fù)b4勝,則修改ls[4]的值為結(jié)點(diǎn)3。同理,以此類(lèi)推,沿著根結(jié)點(diǎn)不斷比賽,直至結(jié)束。
由上可知,敗者樹(shù)簡(jiǎn)化了重構(gòu)。敗者樹(shù)的重構(gòu)只是與該結(jié)點(diǎn)的父結(jié)點(diǎn)的記錄有關(guān),而勝者樹(shù)的重構(gòu)還與該結(jié)點(diǎn)的兄弟結(jié)點(diǎn)有關(guān)。
算法的步驟是:每次從k個(gè)組中的首元素中選一個(gè)最小的數(shù),加入到新組,這樣每次都要比較k-1次,故
算法復(fù)雜度為O((n-1)*(k-1)),而如果使用敗者樹(shù),可以在O(logk)的復(fù)雜度下得到最小的數(shù),算法復(fù)雜
度將為O((n-1)*logk), 對(duì)于外部排序這種數(shù)據(jù)量超大的排序來(lái)說(shuō),這是一個(gè)不小的提高。
敗者樹(shù)的建立與調(diào)整
敗者樹(shù)的java代碼
package algorithm.sort.losertreeSort;
import java.awt.Adjustable;
import java.util.ArrayList;
import java.util.Arrays;
/*
*/
import java.util.Iterator;
/**
* @author xusy
*
*/
import java.util.List;
public class LoserTree {
//數(shù)據(jù)源,為葉子節(jié)點(diǎn)提供數(shù)據(jù),iterator里的是有序的數(shù)據(jù),從小到大
private Iterator<Integer>[] data;
//總共有幾個(gè)數(shù)據(jù)源
private int size;
//葉子節(jié)點(diǎn),數(shù)據(jù)源中具體的數(shù)據(jù),一對(duì)一
private Integer[] leaves;
//非葉子節(jié)點(diǎn),記錄葉子節(jié)點(diǎn)的下標(biāo), 根據(jù)節(jié)點(diǎn)的值可以定位到葉子節(jié)點(diǎn)所指向的數(shù)據(jù)(就是圖里畫(huà)的敗者樹(shù))
//nodes[0]為最小值的索引
private int[] nodes;
/**根據(jù)data,構(gòu)建敗者樹(shù)
* @param data iterator里的是有序的數(shù)據(jù),從小到大
*/
public LoserTree(List<Iterator<Integer>> data){
//因?yàn)閕terator不能變成迭代器數(shù)組,只能變成迭代器列表
this.data=data.toArray(new Iterator[0]);
size=data.size();
leaves=new Integer[size];
nodes=new int[size];
//為葉子節(jié)點(diǎn),leaves數(shù)組賦值
for(int i=0;i<size;i++){
setLeavesValueFromData(i);
}
//找到葉子節(jié)點(diǎn)中最小值的下標(biāo)
int winner=0;
for(int i=1;i<size;i++){
//如果i位元素比winner位元素小
if(compareLeavesByIndex(i, winner)){
winner=i;
}
}
// 非葉子節(jié)點(diǎn)全部初始化為最小值對(duì)應(yīng)的葉子節(jié)點(diǎn)下標(biāo)
Arrays.fill(nodes, winner);
//從后向前依次調(diào)整非葉子節(jié)點(diǎn)
for(int i=size-1;i>=0;i--){
adjust(i);
}
}
/**根據(jù)數(shù)據(jù)源data,設(shè)置leaves[i]的值,如果對(duì)應(yīng)的data沒(méi)有值了,就設(shè)置null
* @param i 位置
*/
public void setLeavesValueFromData(int i){
Iterator<Integer> iterator=data[i];
if(iterator.hasNext()){
leaves[i]=iterator.next();
}
else{
leaves[i]=null;
}
}
/**比較leaves數(shù)組中位置為index1的元素是否小于index2的元素(因?yàn)槭且玫叫〉模? * @param index1
* @param index2
* @return
*/
public boolean compareLeavesByIndex(int index1,int index2){
Integer value1=leaves[index1];
Integer value2=leaves[index2];
if(value1==null){
return false;
}
if(value2==null){
return true;
}
//當(dāng)葉節(jié)點(diǎn)數(shù)據(jù)相等時(shí)比較分支索引是為了實(shí)現(xiàn)排序算法的穩(wěn)定性
if(value1==value2){
return index1<index2;
}
if(value1<value2){
return true;
}
else{
return false;
}
}
/**調(diào)整第index個(gè)葉子節(jié)點(diǎn)的值,在非葉子節(jié)點(diǎn)nodes(敗者樹(shù))中的位置,最后nodes[0]為最小的節(jié)點(diǎn)
* 具體調(diào)整過(guò)程為: 葉子節(jié)點(diǎn)和父節(jié)點(diǎn)比較, 敗者(較大值)留在父節(jié)點(diǎn)位置, 勝者(較小值)繼續(xù)和祖父節(jié)點(diǎn)比較,直至最終
* @param index
*/
public void adjust(int index){
int parent=(index+size)/2;
//直到parent變成0
while (parent>0) {
//如果父節(jié)點(diǎn)小于當(dāng)前值,敗者為當(dāng)前值,敗者留在父親節(jié)點(diǎn),index變成父親節(jié)點(diǎn)的值,相當(dāng)于父親節(jié)點(diǎn)與祖父節(jié)點(diǎn)繼續(xù)比較
//如果父節(jié)點(diǎn)大于當(dāng)前值,敗者為父節(jié)點(diǎn),父節(jié)點(diǎn)不變,繼續(xù)與祖父節(jié)點(diǎn)比較
if(compareLeavesByIndex(nodes[parent], index)){
int temp=nodes[parent];
nodes[parent]=index;
index=temp;
}
//祖父節(jié)點(diǎn)的位置
parent=parent/2;
//一套流程下來(lái),index成為勝者,小的
//parent放置著敗者,大的
//parent最后成為下一個(gè)比較的節(jié)點(diǎn)(祖父節(jié)點(diǎn))
}
//跳出循環(huán)后,index成為最小的節(jié)點(diǎn)
nodes[0]=index;
}
/**返回?cái)≌邩?shù)中data,經(jīng)過(guò)敗者樹(shù),多路歸并排序后的list
* @return
*/
public List<Integer> mergeSort(){
List<Integer> list=new ArrayList<>();
Integer smallest=null;
while (true) {
//得到最小值
smallest=leaves[nodes[0]];
if(smallest==null){
break;
}
list.add(smallest);
//由于leaves數(shù)組中的最小值,索引為nodes[0]的元素被拿走了,所以要重新讀入一個(gè)
setLeavesValueFromData(nodes[0]);
// 根據(jù)新插入的葉子節(jié)點(diǎn)重新調(diào)整樹(shù)
adjust(nodes[0]);
}
return list;
}
}
測(cè)試
package algorithm.sort.losertreeSort;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
public class Main {
public static void main(String[] args) {
//int[][] testTable = {{1,2,3,0},{1,2,3,4},{1,2567678,3,4,6767,45,12,345,3435,34,66666},
//{1,1,1,3,3,4,3,2,4,2},{1,6,2,7,1}};
List<Iterator<Integer>> list=new ArrayList<>();
list.add(createRandomIntArray(-100,100,50));
list.add(createRandomIntArray(-100,100,40));
list.add(createRandomIntArray(-100,100,50));
list.add(createRandomIntArray(-100,100,50));
test(list);
}
private static void test(List<Iterator<Integer>> ito) {
//開(kāi)始時(shí)打印數(shù)組
long begin = System.currentTimeMillis();
LoserTree tree=new LoserTree(ito);
List<Integer> result=tree.mergeSort();
long end = System.currentTimeMillis();
//System.out.println(ito + ": rtn=" +rtn);
for (int i = 0; i < result.size(); i++) {
System.out.print(result.get(i)+" ");
}//打印結(jié)果幾數(shù)組
System.out.println();
System.out.println("耗時(shí):" + (end - begin) + "ms");
System.out.println("-------------------");
System.out.println("-------------------");
}
public static Iterator<Integer> createRandomIntArray(int min,int max,int length){
List<Integer> result=new ArrayList<>(length);
for(int i=0;i<length;i++){
double rand=Math.random();
result.add((int)(min+(max-min)*rand));
}
Collections.sort(result);
return result.iterator();
}
}
敗者樹(shù)的效率
敗者樹(shù)可以在log(n)的時(shí)間內(nèi)找到最值。任何一個(gè)葉子結(jié)點(diǎn)的值改變后,利用中間結(jié)點(diǎn)的信息,還是能夠在log(n)的時(shí)間內(nèi)找到最值。(n為多路歸并的路數(shù))
設(shè),總共有k組,總共n個(gè)數(shù)據(jù),每組n/k個(gè)
一開(kāi)始對(duì)每組進(jìn)行排序的時(shí)間是 klog(n/k)n/k=n*log(n/k)
進(jìn)行敗者樹(shù)歸并排序的時(shí)間為n*log(k)
總共時(shí)間為n(log(n/k)+log(k))=nlog(n) 也就是最好的速度,速度為O(nlog(n))*
所需的空間為O(k)