目前來說,kafka的日志中記錄的內(nèi)容比較多,具體的存儲內(nèi)容見這篇博客,寫的比較好??梢钥吹剑鎯Φ膬?nèi)容還是比較多的,當(dāng)存儲文件比較大的時候,我們應(yīng)該如何處理這些日志?下面我們通過kafka啟動過程的源碼,分析下kafka的日志處理過程。
一、入口方法
在kafkaServer.scala中的start方法中,有一個這樣的調(diào)用:
/* start log manager */
logManager = createLogManager(zkUtils.zkClient, brokerState)
logManager.startup()
二、定時任務(wù)總方法
這塊就是啟動了日志相關(guān)的定時任務(wù),具體都有哪些內(nèi)容?我們跟進去看一下:
def startup() {
/* Schedule the cleanup task to delete old logs */
if(scheduler != null) {
info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
scheduler.schedule("kafka-log-retention",
cleanupLogs,
delay = InitialTaskDelayMs,
period = retentionCheckMs,
TimeUnit.MILLISECONDS)
info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
scheduler.schedule("kafka-log-flusher",
flushDirtyLogs,
delay = InitialTaskDelayMs,
period = flushCheckMs,
TimeUnit.MILLISECONDS)
scheduler.schedule("kafka-recovery-point-checkpoint",
checkpointRecoveryPointOffsets,
delay = InitialTaskDelayMs,
period = flushCheckpointMs,
TimeUnit.MILLISECONDS)
}
if(cleanerConfig.enableCleaner)
cleaner.startup()
}
可以看到,這塊主要使用了一個定時任務(wù)線程池,來處理任務(wù)的定時執(zhí)行。具體包括兩塊,一部分是清理日志,另一部分是將日志寫入文件。
2.1 清理日志
首先是cleanupLogs,這塊涉及到配置,log.retention.check.interval.ms,也就是多長時間執(zhí)行一次日志清理。我們看下具體的方法:
/**
* Delete any eligible logs. Return the number of segments deleted.
*/
def cleanupLogs() {
debug("Beginning log cleanup...")
var total = 0
val startMs = time.milliseconds
for(log <- allLogs; if !log.config.compact) {
debug("Garbage collecting '" + log.name + "'")
total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
}
debug("Log cleanup completed. " + total + " files deleted in " +
(time.milliseconds - startMs) / 1000 + " seconds")
}
這塊還涉及到另一個配置:cleanup.policy,也就是清理的策略,目前有幾種,一種是compact,也就是日志壓縮,不會清理掉日志文件;還有一種就是delete,也就是刪除。這塊主要有兩個方法,我們分別看下:
2.1.1 清理過期日志
/**
* Runs through the log removing segments older than a certain age
*/
private def cleanupExpiredSegments(log: Log): Int = {
if (log.config.retentionMs < 0)
return 0
val startMs = time.milliseconds
log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs)
}
這塊又涉及到一個配置:retention.ms,這個參數(shù)表示日志保存的時間。如果小于0,表示永不失效,也就沒有了刪除這一說。
當(dāng)然,如果文件的修改時間跟當(dāng)前時間差,大于設(shè)置的日志保存時間,就要執(zhí)行刪除動作了。具體的刪除方法為:
/**
* Delete any log segments matching the given predicate function,
* starting with the oldest segment and moving forward until a segment doesn't match.
* @param predicate A function that takes in a single log segment and returns true iff it is deletable
* @return The number of segments deleted
*/
def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
lock synchronized {
//find any segments that match the user-supplied predicate UNLESS it is the final segment
//and it is empty (since we would just end up re-creating it)
val lastEntry = segments.lastEntry
val deletable =
if (lastEntry == null) Seq.empty
else logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastEntry.getValue.baseOffset || s.size > 0))
val numToDelete = deletable.size
if (numToDelete > 0) {
// we must always have at least one segment, so if we are going to delete all the segments, create a new one first
if (segments.size == numToDelete)
roll()
// remove the segments for lookups
deletable.foreach(deleteSegment(_))
}
numToDelete
}
}
這塊的邏輯是:根據(jù)傳入的predicate來判斷哪些日志符合被刪除的要求,放入到deletable中,最后遍歷deletable,進行刪除操作。
private def deleteSegment(segment: LogSegment) {
info("Scheduling log segment %d for log %s for deletion.".format(segment.baseOffset, name))
lock synchronized {
segments.remove(segment.baseOffset)
asyncDeleteSegment(segment)
}
}
private def asyncDeleteSegment(segment: LogSegment) {
segment.changeFileSuffixes("", Log.DeletedFileSuffix)
def deleteSeg() {
info("Deleting segment %d from log %s.".format(segment.baseOffset, name))
segment.delete()
}
scheduler.schedule("delete-file", deleteSeg, delay = config.fileDeleteDelayMs)
}
這塊是一個異步刪除文件的過程,包含一個配置:file.delete.delay.ms。表示每隔多久刪除一次日志文件。刪除的過程是先把日志的后綴改為.delete,然后定時刪除。
2.1.2 清理過大日志
/**
* Runs through the log removing segments until the size of the log
* is at least logRetentionSize bytes in size
*/
private def cleanupSegmentsToMaintainSize(log: Log): Int = {
if(log.config.retentionSize < 0 || log.size < log.config.retentionSize)
return 0
var diff = log.size - log.config.retentionSize
def shouldDelete(segment: LogSegment) = {
if(diff - segment.size >= 0) {
diff -= segment.size
true
} else {
false
}
}
log.deleteOldSegments(shouldDelete)
}
這塊代碼比較清晰,如果日志大小大于retention.bytes,那么就會被標(biāo)記為待刪除,然后調(diào)用的方法是一樣的,也是deleteOldSegments。就不贅述了。
2.2 日志刷到硬盤
這塊有兩個定時任務(wù)。
scheduler.schedule("kafka-log-flusher",
flushDirtyLogs,
delay = InitialTaskDelayMs,
period = flushCheckMs,
TimeUnit.MILLISECONDS)
scheduler.schedule("kafka-recovery-point-checkpoint",
checkpointRecoveryPointOffsets,
delay = InitialTaskDelayMs,
period = flushCheckpointMs,
TimeUnit.MILLISECONDS)
涉及到兩個配置:
- log.flush.scheduler.interval.ms:檢查是否需要固化到硬盤的時間間隔
- log.flush.offset.checkpoint.interval.ms:控制上次固化硬盤的時間點,以便于數(shù)據(jù)恢復(fù)一般不需要去修改
我們分別看下兩個任務(wù)做了啥。
2.2.1 flushDirtyLogs
/**
* Flush any log which has exceeded its flush interval and has unwritten messages.
*/
private def flushDirtyLogs() = {
debug("Checking for dirty logs to flush...")
for ((topicAndPartition, log) <- logs) {
try {
val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
debug("Checking if flush is needed on " + topicAndPartition.topic + " flush interval " + log.config.flushMs +
" last flushed " + log.lastFlushTime + " time since last flush: " + timeSinceLastFlush)
if(timeSinceLastFlush >= log.config.flushMs)
log.flush
} catch {
case e: Throwable =>
error("Error flushing topic " + topicAndPartition.topic, e)
}
}
}
這個方法的目的是把日志刷新到硬盤中,保證數(shù)據(jù)不丟。
這塊設(shè)計到一個配置:flush.ms。當(dāng)日志的刷新時間與當(dāng)前時間差,大于配置的值時,就會執(zhí)行flush操作。
/**
* Flush all log segments
*/
def flush(): Unit = flush(this.logEndOffset)
/**
* Flush log segments for all offsets up to offset-1
* @param offset The offset to flush up to (non-inclusive); the new recovery point
*/
def flush(offset: Long) : Unit = {
if (offset <= this.recoveryPoint)
return
debug("Flushing log '" + name + " up to offset " + offset + ", last flushed: " + lastFlushTime + " current time: " +
time.milliseconds + " unflushed = " + unflushedMessages)
for(segment <- logSegments(this.recoveryPoint, offset))
segment.flush()
lock synchronized {
if(offset > this.recoveryPoint) {
this.recoveryPoint = offset
lastflushedTime.set(time.milliseconds)
}
}
}
/**
* Flush this log segment to disk
*/
@threadsafe
def flush() {
LogFlushStats.logFlushTimer.time {
log.flush()
index.flush()
}
}
找到當(dāng)前segment的最后一個offset,即logEndOffset,然后調(diào)用flush方法,刷新到日志文件中。首先判斷,當(dāng)前offset是否小于recoveryPoint,也就是第一個需要刷新到硬盤的offset,如果小于的話,直接返回,否則繼續(xù)flush操作。
將日志中從recoveryPoint到offset的所有日志,刷新到日志文件中,調(diào)用segment.flush()方法上。刷新log文件和index文件。
2.2.2 checkpointRecoveryPointOffsets
/**
* Write out the current recovery point for all logs to a text file in the log directory
* to avoid recovering the whole log on startup.
*/
def checkpointRecoveryPointOffsets() {
this.logDirs.foreach(checkpointLogsInDir)
}
/**
* Make a checkpoint for all logs in provided directory.
*/
private def checkpointLogsInDir(dir: File): Unit = {
val recoveryPoints = this.logsByDir.get(dir.toString)
if (recoveryPoints.isDefined) {
this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint))
}
}
這塊主要是用于寫一些恢復(fù)點的數(shù)據(jù)到文件中去,文件名是recovery-point-offset-checkpoint,里面的內(nèi)容是:
- 第一行是當(dāng)前的版本version
- 第二行是所有偏移量的數(shù)字和,每個topic和partition的組合的數(shù)量
- 之后會遍歷所有的topic和partition組合,每行展示的內(nèi)容是:topic partition offset
但是這塊的寫文件不是直接向目標(biāo)文件寫入,而是先寫一個臨時文件,然后再將臨時文件移動到目標(biāo)文件中。
三、總結(jié)
以上就是kafka中日志處理的一些源碼,我們總結(jié)一下,其中涉及到的配置項有:
- log.retention.check.interval.ms
- cleanup.policy
- retention.ms
- file.delete.delay.ms
- retention.bytes
- log.flush.scheduler.interval.ms
- log.flush.offset.checkpoint.interval.ms
- flush.ms
可能還有其他的一些配置,這塊沒有涉及到。當(dāng)然,這些參數(shù)如何配置,才能使性能達到最優(yōu),也需要不斷地進行測試和探索,目前只能依靠默認的參數(shù)來進行配置,這顯然是不夠的。