目錄
前言
在上一篇文章中,我們認識了Spark管理磁盤塊的組件DiskBlockManager,本文接著來看真正負責磁盤存儲的組件DiskStore,以及與它相關的BlockData。這部分內(nèi)容會涉及到一點與Java NIO相關的東西,看官需要稍微注意一下。
磁盤存儲DiskStore
構造方法與屬性成員
代碼#28.1 - o.a.s.storage.DiskStore類的構造方法與屬性成員
private[spark] class DiskStore(
conf: SparkConf,
diskManager: DiskBlockManager,
securityManager: SecurityManager) extends Logging {
private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
private val maxMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests",
Int.MaxValue.toString)
private val blockSizes = new ConcurrentHashMap[BlockId, Long]()
// ......
}
DiskStore接受3個構造方法參數(shù),分別是SparkConf、DiskBlockManager和SecurityManager的實例,其中SecurityManager用于提供對數(shù)據(jù)加密的支持。3個屬性字段的含義如下:
- minMemoryMapBytes:使用內(nèi)存映射(memory map)讀取文件的最小閾值,由配置項spark.storage.memoryMapThreshold指定,默認值2M。當磁盤中的文件大小超過該值時,就不會直接讀取,而用內(nèi)存映射文件來讀取,提高效率。
- maxMemoryMapBytes:使用內(nèi)存映射讀取文件的最大閾值,由配置項spark.storage.memoryMapLimitForTests指定。它是個測試參數(shù),默認值為不限制。
- blockSizes:維護塊ID與其對應大小之間的映射關系的ConcurrentHashMap。
寫入塊
寫入塊的邏輯由put()方法來實現(xiàn)。
代碼#28.2 - o.a.s.storage.DiskStore.put()/contains()方法
def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit = {
if (contains(blockId)) {
throw new IllegalStateException(s"Block $blockId is already present in the disk store")
}
logDebug(s"Attempting to put block $blockId")
val startTime = System.currentTimeMillis
val file = diskManager.getFile(blockId)
val out = new CountingWritableChannel(openForWrite(file))
var threwException: Boolean = true
try {
writeFunc(out)
blockSizes.put(blockId, out.getCount)
threwException = false
} finally {
try {
out.close()
} catch {
case ioe: IOException =>
if (!threwException) {
threwException = true
throw ioe
}
} finally {
if (threwException) {
remove(blockId)
}
}
}
val finishTime = System.currentTimeMillis
logDebug("Block %s stored as %s file on disk in %d ms".format(
file.getName,
Utils.bytesToString(file.length()),
finishTime - startTime))
}
def contains(blockId: BlockId): Boolean = {
val file = diskManager.getFile(blockId.name)
file.exists()
}
put()方法首先調(diào)用contains()方法檢查塊是否已經(jīng)以文件的形式寫入了,只有沒有寫入才會繼續(xù)操作。然后,調(diào)用DiskBlockManager.getFile()方法打開塊ID對應的文件,然后獲取該文件的WritableByteChannel(NIO中的寫通道,表示可以通過調(diào)用write()方法向文件寫入數(shù)據(jù))。最后,調(diào)用參數(shù)中傳入的writeFunc函數(shù),操作WritableByteChannel將數(shù)據(jù)寫入,并將塊ID與其對應的字節(jié)數(shù)加入blockSizes映射。
接下來看一看代碼#28.2中調(diào)用的openForWrite()方法。
代碼#28.3 - o.a.s.storage.DiskStore.openForWrite()方法
private def openForWrite(file: File): WritableByteChannel = {
val out = new FileOutputStream(file).getChannel()
try {
securityManager.getIOEncryptionKey().map { key =>
CryptoStreamUtils.createWritableChannel(out, conf, key)
}.getOrElse(out)
} catch {
case e: Exception =>
Closeables.close(out, true)
file.delete()
throw e
}
}
可見,該方法就是通過文件對象構造了文件輸出流FileOutputStream,然后獲取它對應的Channel對象用于寫數(shù)據(jù)。特別地,如果I/O需要加密,就需要另外調(diào)用CryptoStreamUtils.createWritableChannel()方法包裝,本文就不涉及了。至于CountingWritableChannel,也只是基于WritableByteChannel接口擴展出來的一個簡單類,增加了統(tǒng)計字節(jié)數(shù)的方法,代碼也就不再列出。
寫入字節(jié)
代碼#28.4 - o.a.s.storage.DiskStore.putBytes()方法
def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
put(blockId) { channel =>
bytes.writeFully(channel)
}
}
可見,該方法除了塊ID外,還需要傳入封裝在ChunkedByteBuffer中的數(shù)據(jù)。調(diào)用上述put()方法時,傳入的writeFunc函數(shù)調(diào)用了ChunkedByteBuffer.writeFully()方法,負責將數(shù)據(jù)以一定的Chunk大小分塊寫入WritableByteChannel。
讀取字節(jié)
代碼#28.5 - o.a.s.storage.DiskStore.getBytes()方法
def getBytes(blockId: BlockId): BlockData = {
val file = diskManager.getFile(blockId.name)
val blockSize = getSize(blockId)
securityManager.getIOEncryptionKey() match {
case Some(key) =>
new EncryptedBlockData(file, blockSize, conf, key)
case _ =>
new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, file, blockSize)
}
}
這段代碼很簡單,但可以注意到,在加密環(huán)境下和非加密環(huán)境下返回的結果是不同的,前者是EncryptedBlockData對象,后者是DiskBlockData對象,而它們都是BlockData的子類。顧名思義,BlockData就是對磁盤塊數(shù)據(jù)的具體封裝,下面選擇最常見的DiskBlockData來看一看。
磁盤塊數(shù)據(jù)DiskBlockData
這個類是定義在DiskStore下方的私有類,比較短,因此直接全貼在下面。
代碼#28.6 - o.a.s.storage.DiskBlockData類
private class DiskBlockData(
minMemoryMapBytes: Long,
maxMemoryMapBytes: Long,
file: File,
blockSize: Long) extends BlockData {
override def toInputStream(): InputStream = new FileInputStream(file)
override def toNetty(): AnyRef = new DefaultFileRegion(file, 0, size)
override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): ChunkedByteBuffer = {
Utils.tryWithResource(open()) { channel =>
var remaining = blockSize
val chunks = new ListBuffer[ByteBuffer]()
while (remaining > 0) {
val chunkSize = math.min(remaining, maxMemoryMapBytes)
val chunk = allocator(chunkSize.toInt)
remaining -= chunkSize
JavaUtils.readFully(channel, chunk)
chunk.flip()
chunks += chunk
}
new ChunkedByteBuffer(chunks.toArray)
}
}
override def toByteBuffer(): ByteBuffer = {
require(blockSize < maxMemoryMapBytes,
s"can't create a byte buffer of size $blockSize" +
s" since it exceeds ${Utils.bytesToString(maxMemoryMapBytes)}.")
Utils.tryWithResource(open()) { channel =>
if (blockSize < minMemoryMapBytes) {
val buf = ByteBuffer.allocate(blockSize.toInt)
JavaUtils.readFully(channel, buf)
buf.flip()
buf
} else {
channel.map(MapMode.READ_ONLY, 0, file.length)
}
}
}
override def size: Long = blockSize
override def dispose(): Unit = {}
private def open() = new FileInputStream(file).getChannel
}
很久之前也已經(jīng)大概說過,BlockData特征只是定義了塊數(shù)據(jù)的轉(zhuǎn)化方式,具體的細節(jié)則留給各個實現(xiàn)類。我們具體看看toChunkedByteBuffer()和toByteBuffer()這兩個方法。
轉(zhuǎn)化為ChunkedByteBuffer
Utils.tryWithResource()方法實際上就是Java中try-with-resources的Scala實現(xiàn),因為Scala中并沒有這個語法糖。
toChunkedByteBuffer()方法會將文件轉(zhuǎn)化為輸入流FileInputStream,并獲取其ReadableFileChannel,再調(diào)用JavaUtils.readFully()方法將從Channel中取得的數(shù)據(jù)填充到ByteBuffer中。每個ByteBuffer即為一個Chunk,所有Chunk的數(shù)組形成最終的ChunkedByteBuffer。關于ChunkedByteBuffer在文章#21簡要提到過,之后會很快寫一篇番外文章專門講解它,因為有點意思。
轉(zhuǎn)化為ByteBuffer
toByteBuffer()方法會檢查塊大小是否小于spark.storage.memoryMapThreshold(終于出現(xiàn)了)。如果小于的話,就會采用與toChunkedByteBuffer()相同的方式直接填充ByteBuffer。反之,就調(diào)用ReadableFileChannel.map()方法將數(shù)據(jù)映射到MappedByteBuffer中,即進程的虛擬內(nèi)存中。不過,考慮到內(nèi)存映射的應用場景的話,2MB的閾值可能有點?。ūJ兀┝耍稽c碎碎念,請勿在意。
總結
本文研究了Spark磁盤存儲類DiskStore的具體實現(xiàn),主要是寫入塊/字節(jié)以及讀取字節(jié)的方法。另外,DiskStore讀取的字節(jié)會用BlockData來封裝,因此也順便了解了一下DiskBlockData的一點細節(jié)。