Tape是一個(gè)隊(duì)列集合類庫(kù),主要包含內(nèi)存對(duì)象隊(duì)列,文件對(duì)象隊(duì)列和任務(wù)隊(duì)列,特別是文件對(duì)象隊(duì)列的設(shè)計(jì),使用了RandomAccessFile,可以在新增,移除對(duì)象時(shí)就把數(shù)據(jù)保存在文件里,使用起來非常方便。
使用方法
Tape的使用方法非常簡(jiǎn)單,和List差不多,包含方法size(),add(T entry),peek(),remove(),看字面意思就明白,我就不具體展開了,至于TaskQueue的使用,可以看下官方的介紹.
源碼分析
QueueFile(隊(duì)列文件)
內(nèi)存對(duì)象隊(duì)列和任務(wù)隊(duì)列都比較簡(jiǎn)單,主要是看到了QueueFile(隊(duì)列文件)的設(shè)計(jì)受到了一些啟發(fā),在這里和可以和大家聊聊。
數(shù)據(jù)結(jié)構(gòu)設(shè)計(jì)

看到上圖,我們可以清晰的看到,該數(shù)據(jù)結(jié)構(gòu)包含頭部信息(16 bytes)和元素列表(Length - 16 bytes)。頭部信息包含文件長(zhǎng)度(4 bytes),元素個(gè)數(shù)(4 bytes),第一個(gè)元素位置(4 bytes),最后一個(gè)元素位置(4 bytes)。元素列表里面的單個(gè)元素包含元素長(zhǎng)度(4 bytes)和元素內(nèi)容。
源碼設(shè)計(jì)
里面主要用到的是RandomAccessFile(隨機(jī)存儲(chǔ)文件),用seek( )方法來訪問記錄,只需要記住位置和大小即可。
RandomAccessFile file = new RandomAccessFile("file", "rw");
// 以下向file文件中寫數(shù)據(jù)
file.writeInt(20);
file.seek(0);// 把文件指針位置設(shè)置到文件起始處
// 以下從file文件中讀數(shù)據(jù),要注意文件指針的位置
file.readInt();
在QueueFile里面對(duì)RandomAccessFile使用主要就是用到上面這幾個(gè)方法,我們接下來看看QueueFile源碼。
QueueFile初始化
public QueueFile(File file) throws IOException {
if (!file.exists()) initialize(file);//如果文件不存在,則初始化
raf = open(file);//創(chuàng)建RandomAccessFile
readHeader();//讀取頭部信息
}
private static void initialize(File file) throws IOException {
// Use a temp file so we don't leave a partially-initialized file.
File tempFile = new File(file.getPath() + ".tmp");//創(chuàng)建文件*.tmp
RandomAccessFile raf = open(tempFile);//創(chuàng)建RandomAccessFile
try {
raf.setLength(INITIAL_LENGTH);//初始化RandomAccessFile文件長(zhǎng)度是4096
raf.seek(0);// 把文件指針位置設(shè)置到文件起始處
byte[] headerBuffer = new byte[16];//初始化頭部信息bytes
writeInts(headerBuffer, INITIAL_LENGTH, 0, 0, 0);//將文件長(zhǎng)度為4096寫入headerBuffer
raf.write(headerBuffer);//將headerBuffer寫入RandomAccessFile文件
} finally {
raf.close();
}
// A rename is atomic.
if (!tempFile.renameTo(file)) throw new IOException("Rename failed!");//將tempFile重命名成file
}
private void readHeader() throws IOException {
raf.seek(0);
raf.readFully(buffer);
fileLength = readInt(buffer, 0);
if (fileLength > raf.length()) {
throw new IOException("File is truncated. Expected length: " + fileLength + ", Actual length: " + raf.length());
} else if (fileLength == 0) {
throw new IOException("File is corrupt; length stored in header is 0.");
}
elementCount = readInt(buffer, 4);//讀取元素個(gè)數(shù)
int firstOffset = readInt(buffer, 8);//讀取第一個(gè)元素的位置
int lastOffset = readInt(buffer, 12);//讀取最后一個(gè)元素的位置
first = readElement(firstOffset);//讀取第一個(gè)元素
last = readElement(lastOffset);//讀取最后一個(gè)元素
}
至此,QueueFile初始化完成。
接下來看下add方法。
public void add(byte[] data) throws IOException {
add(data, 0, data.length);
}
public synchronized void add(byte[] data, int offset, int count) throws IOException {
nonNull(data, "buffer");
if ((offset | count) < 0 || count > data.length - offset) {
throw new IndexOutOfBoundsException();
}
expandIfNecessary(count);//判斷新增內(nèi)容之后文件長(zhǎng)度是否夠,如果不夠則進(jìn)行擴(kuò)大
// Insert a new element after the current last element.
boolean wasEmpty = isEmpty();
int position = wasEmpty ? HEADER_LENGTH : wrapPosition(last.position + Element.HEADER_LENGTH + last.length);
Element newLast = new Element(position, count);
// Write length.
writeInt(buffer, 0, count);//向buffer寫入新元素長(zhǎng)度
ringWrite(newLast.position, buffer, 0, Element.HEADER_LENGTH);//向RandomAccessFile寫入新元素長(zhǎng)度
// Write data.
ringWrite(newLast.position + Element.HEADER_LENGTH, data, offset, count);//向RandomAccessFile寫入新元素內(nèi)容
// Commit the addition. If wasEmpty, first == last.
int firstPosition = wasEmpty ? newLast.position : first.position;
writeHeader(fileLength, elementCount + 1, firstPosition, newLast.position);//寫入頭部信息
last = newLast;
elementCount++;
if (wasEmpty) first = last; // first element
}
peek方法,返回第一個(gè)元素內(nèi)容
public synchronized byte[] peek() throws IOException {
if (isEmpty()) return null;
int length = first.length;
byte[] data = new byte[length];
ringRead(first.position + Element.HEADER_LENGTH, data, 0, length);
return data;
}
remove方法
public synchronized void remove() throws IOException {
if (isEmpty()) throw new NoSuchElementException();
if (elementCount == 1) {//如果元素個(gè)數(shù)等于1,則清空全部?jī)?nèi)容
clear();
} else {
// assert elementCount > 1
int firstTotalLength = Element.HEADER_LENGTH + first.length;
ringErase(first.position, firstTotalLength);//清除第一個(gè)元素?cái)?shù)據(jù)
int newFirstPosition = wrapPosition(first.position + firstTotalLength);
ringRead(newFirstPosition, buffer, 0, Element.HEADER_LENGTH);
int length = readInt(buffer, 0);//新元素內(nèi)容長(zhǎng)度
writeHeader(fileLength, elementCount - 1, newFirstPosition, last.position);//寫入更新之后的頭部信息
elementCount--;
first = new Element(newFirstPosition, length);
}
}
private void ringErase(int position, int length) throws IOException {
while (length > 0) {
int chunk = min(length, ZEROES.length);
ringWrite(position, ZEROES, 0, chunk);
length -= chunk;
position += chunk;
}
}
需要注意的是,在清除第一個(gè)元素?cái)?shù)據(jù)時(shí),只是將第一個(gè)元素的數(shù)據(jù)設(shè)置成0。
總結(jié)
測(cè)試發(fā)現(xiàn),RandomAccessFile在處理大數(shù)據(jù)的時(shí)候讀寫性能很差,主要原因是RandomAccessFile每讀/寫一個(gè)字節(jié)就需對(duì)磁盤進(jìn)行一次I/O操作,所以QueueFile不適合大數(shù)據(jù)的讀寫。不過可以對(duì)RandomAccessFile進(jìn)行優(yōu)化,增加緩沖讀寫機(jī)制,看該文。
參考資料
Tape
花1K內(nèi)存實(shí)現(xiàn)高效I/O的RandomAccessFile類
可以隨意轉(zhuǎn)發(fā),也歡迎關(guān)注我的簡(jiǎn)書,我會(huì)堅(jiān)持給大家?guī)矸窒怼?/p>