// Base abstraction over an ordered sequence of messages paired with their offsets.
abstract class MessageSet extends Iterable[MessageAndOffset] {
  // Write up to maxSize bytes of this set, starting at the given offset, to the
  // channel; returns the number of bytes written.
  def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Int): Int
  // Iterate over the messages in this set in order.
  def iterator: Iterator[MessageAndOffset]
}
// file      – the log file on disk backing this message set
// channel   – FileChannel used to read from / write to the file
// start,end – byte range boundaries when this set is a slice of the log file
// isSlice   – true when this FileMessageSet is a slice of a log file rather than the whole file
class FileMessageSet private[kafka](@volatile var file: File,
                                    private[log] val channel: FileChannel,
                                    private[log] val start: Int,
                                    private[log] val end: Int,
                                    isSlice: Boolean) extends MessageSet with Logging {

  // Size in bytes of the file (or the slice). AtomicInteger because several handler
  // threads may append to the same partition concurrently.
  private val _size =
    if (isSlice)
      new AtomicInteger(end - start)
    else
      new AtomicInteger(math.min(channel.size.toInt, end) - start)

  // For a full (non-slice) file, move the channel position to the end of the data so
  // that appends after a broker restart do not overwrite existing messages.
  if (!isSlice)
    channel.position(math.min(channel.size.toInt, end))

  /**
   * Open a FileChannel for the given file.
   *
   * @param mutable           true to open the file read-write, false for read-only
   * @param fileAlreadyExists true when the file already exists (skip preallocation)
   * @param initFileSize      length to preallocate when creating a new file
   * @param preallocate       whether to preallocate the file up front; preallocation
   *                          helps the file occupy contiguous disk sectors, reducing
   *                          seek overhead on later reads and writes
   */
  def openChannel(file: File, mutable: Boolean, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false): FileChannel = {
    if (mutable) {
      if (fileAlreadyExists)
        new RandomAccessFile(file, "rw").getChannel()
      else if (preallocate) {
        val randomAccessFile = new RandomAccessFile(file, "rw")
        try {
          randomAccessFile.setLength(initFileSize)
        } catch {
          case e: Throwable =>
            // Fix: close the handle when preallocation fails, otherwise the file
            // descriptor leaks; the original exception is rethrown.
            try randomAccessFile.close() catch { case _: Throwable => }
            throw e
        }
        randomAccessFile.getChannel()
      } else
        new RandomAccessFile(file, "rw").getChannel()
    }
    else
      new FileInputStream(file).getChannel()
  }

  // Append a batch of messages to the log file and grow the recorded size.
  def append(messages: ByteBufferMessageSet) {
    val written = messages.writeFullyTo(channel)
    _size.getAndAdd(written)
  }

  // Locate the first message with offset >= targetOffset, scanning from startingPosition.
  // Returns null when no such message exists (callers must handle null).
  def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
    var position = startingPosition
    // LogOverhead = offset (8 bytes) + size (4 bytes); excludes the message payload.
    val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
    val size = sizeInBytes() // current size of this FileMessageSet
    // Walk message-by-message starting at `position`.
    while (position + MessageSet.LogOverhead < size) {
      buffer.rewind()
      channel.read(buffer, position)
      buffer.rewind()
      val offset = buffer.getLong()
      if (offset >= targetOffset)
        return OffsetPosition(offset, position)
      val messageSize = buffer.getInt()
      // Advance past this message's overhead and payload.
      position += MessageSet.LogOverhead + messageSize
    }
    null
  }
}
// MessageSet implementation backed by an in-memory ByteBuffer.
class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Logging {

  // Build the backing ByteBuffer for a batch of messages, compressing when requested.
  // NOTE(review): `magicAndTimestamp` and `writeMessage` are referenced below but not
  // defined in this excerpt — presumably members of the enclosing scope; verify
  // against the full source.
  private def create(offsetAssigner: OffsetAssigner, compressionCodec: CompressionCodec,
  wrapperMessageTimestamp: Option[Long], timestampType: TimestampType,
  messages: Message*): ByteBuffer = {
    if (messages.isEmpty)
      // No messages: reuse the shared empty buffer.
      MessageSet.Empty.buffer
    else if (compressionCodec == NoCompressionCodec) {
      // Uncompressed path: write each message directly with its assigned absolute offset.
      val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
      for (message <- messages) writeMessage(buffer, message, offsetAssigner.nextAbsoluteOffset())
      buffer.rewind()
      buffer
    } else {
      // Compressed path: stream all messages through a compressing output into one wrapper message.
      var offset = -1L
      val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
      messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = magicAndTimestamp.magic) { outputStream =>
        // Output stream that compresses with the requested codec.
        val output = new DataOutputStream(CompressionFactory(compressionCodec, magicAndTimestamp.magic, outputStream))
        try {
          // Write every inner (to-be-compressed) message in turn.
          for (message <- messages) {
            offset = offsetAssigner.nextAbsoluteOffset()
            // With magic v1 the inner messages store RELATIVE offsets...
            if (magicAndTimestamp.magic > Message.MagicValue_V0)
              output.writeLong(offsetAssigner.toInnerOffset(offset))
            else
              // ...with magic v0 they store absolute offsets.
              output.writeLong(offset)
            output.writeInt(message.size)
            output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
          }
        } finally {
          output.close()
        }
      }
      val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead)
      // Write the wrapper message; its offset is that of the LAST inner message.
      writeMessage(buffer, messageWriter, offset)
      buffer.rewind()
      buffer
    }
  }

  // Write the entire buffer to the channel; restores the buffer position afterwards
  // via mark/reset. Returns the number of bytes written.
  def writeFullyTo(channel: GatheringByteChannel): Int = {
    buffer.mark()
    var written = 0
    while (written < sizeInBytes)
      written += channel.write(buffer)
    buffer.reset()
    written
  }
}
// Iterator over the LogEntry items of a records buffer. Supports shallow iteration
// (wrapper messages only) and deep iteration into compressed inner messages.
public static class RecordsIterator extends AbstractIterator<LogEntry> {
    // Buffer holding the serialized records being iterated.
    private final ByteBuffer buffer;
    // Stream used to read records out of the buffer (decompressing for inner iterators).
    private final DataInputStream stream;
    // Compression type of the records this iterator walks.
    private final CompressionType type;
    // true = do NOT descend into compressed (inner) messages.
    private final boolean shallow;
    // Iterator over the inner messages of the current compressed wrapper message.
    private RecordsIterator innerIter;
    // Pre-decompressed inner entries (offset + record); only used for magic > v0.
    private final ArrayDeque<LogEntry> logEntries;
    // Base used to convert inner relative offsets to absolute ones; -1 when unused.
    private final long absoluteBaseOffset;

    // Public constructor: creates the outer (top-level) iterator.
    public RecordsIterator(ByteBuffer buffer, boolean shallow) {
        this.type = CompressionType.NONE;
        this.buffer = buffer;
        this.shallow = shallow;
        this.stream = new DataInputStream(new ByteBufferInputStream(buffer));
        this.logEntries = null;
        this.absoluteBaseOffset = -1;
    }

    // Private constructor: creates an inner iterator over one compressed wrapper entry.
    private RecordsIterator(LogEntry entry) {
        this.type = entry.record().compressionType();
        this.buffer = entry.record().value();
        this.shallow = true;
        this.stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type, entry.record().magic());
        long wrapperRecordOffset = entry.offset(); // offset of the wrapper (outer) message
        if (entry.record().magic() > Record.MAGIC_VALUE_V0) {
            this.logEntries = new ArrayDeque<>();
            long wrapperRecordTimestamp = entry.record().timestamp();
            // Eagerly decompress all inner messages into logEntries.
            while (true) {
                try {
                    LogEntry logEntry = getNextEntryFromStream();
                    Record recordWithTimestamp = new Record(logEntry.record().buffer(),
                            wrapperRecordTimestamp,
                            entry.record().timestampType());
                    logEntries.add(new LogEntry(logEntry.offset(), recordWithTimestamp));
                } catch (EOFException e) {
                    // End of the compressed payload: all inner messages have been read.
                    break;
                } catch (IOException e) {
                    throw new KafkaException(e);
                }
            }
            // Inner offsets are relative; the wrapper carries the absolute offset of
            // the LAST inner message, so the base is derived from the last entry.
            this.absoluteBaseOffset = wrapperRecordOffset - logEntries.getLast().offset();
        } else {
            // Magic v0 inner messages already carry absolute offsets.
            this.logEntries = null;
            this.absoluteBaseOffset = -1;
        }
    }

    @Override
    protected LogEntry makeNext() {
        if (innerDone()) {
            try {
                LogEntry entry = getNextEntry();
                if (entry == null)
                    return allDone();
                // Convert a relative inner offset to an absolute one when needed.
                if (absoluteBaseOffset >= 0) {
                    long absoluteOffset = absoluteBaseOffset + entry.offset();
                    entry = new LogEntry(absoluteOffset, entry.record());
                }
                CompressionType compression = entry.record().compressionType();
                if (compression == CompressionType.NONE || shallow) {
                    return entry;
                } else {
                    // Compressed wrapper and deep iteration requested: descend into it.
                    innerIter = new RecordsIterator(entry);
                    return innerIter.next();
                }
            } catch (EOFException e) {
                return allDone();
            } catch (IOException e) {
                throw new KafkaException(e);
            }
        } else {
            return innerIter.next();
        }
    }
}
// Skeletal Iterator implementation used by RecordsIterator: subclasses supply
// makeNext() and the base class drives the READY / NOT_READY / DONE / FAILED
// state machine behind hasNext()/next().
public abstract class AbstractIterator<T> implements Iterator<T> {

    // Iterator state machine. (Nested enums are implicitly static.)
    private enum State {
        READY,     // next element computed and waiting to be returned
        NOT_READY, // next element not yet computed; maybeComputeNext() must run
        DONE,      // iteration finished
        FAILED     // makeNext() threw; the iterator is unusable
    }

    private State state = State.NOT_READY;
    private T next;

    @Override
    public boolean hasNext() {
        switch (state) {
            case FAILED:
                throw new IllegalStateException("Iterator is in failed state");
            case DONE:
                return false;
            case READY:
                return true;
            default:
                return maybeComputeNext();
        }
    }

    @Override
    public T next() {
        if (!hasNext())
            throw new NoSuchElementException();
        state = State.NOT_READY;
        if (next == null)
            throw new IllegalStateException("Expected item but none found.");
        return next;
    }

    /**
     * Compute and return the next element. Presumably subclasses signal end of
     * iteration via a helper that sets state to DONE (not shown in this excerpt) —
     * verify against the full source.
     */
    protected abstract T makeNext();

    // Fix: return primitive boolean instead of boxed Boolean — hasNext() declares
    // boolean, so the boxed return forced pointless autoboxing/unboxing.
    private boolean maybeComputeNext() {
        state = State.FAILED; // poison first so an exception from makeNext() leaves FAILED state
        next = makeNext();
        if (state == State.DONE) {
            return false;
        } else {
            state = State.READY;
            return true;
        }
    }
}
// _file      – the index file on disk
// baseOffset – offset of the first message of the corresponding log file
class OffsetIndex(@volatile private[this] var _file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {

  // Guards operations on mmap.
  private val lock = new ReentrantLock

  // MappedByteBuffer used to read/write the index file via mmap.
  @volatile
  private[this] var mmap: MappedByteBuffer = {
    // createNewFile() returns true if the file did not exist and was created.
    val newlyCreated = _file.createNewFile()
    val raf = new RandomAccessFile(_file, "rw")
    try {
      if (newlyCreated) {
        // Size a brand-new file to the largest multiple of 8 not exceeding
        // maxIndexSize (one entry = 8 bytes), e.g. roundToExactMultiple(67, 8) == 64.
        raf.setLength(roundToExactMultiple(maxIndexSize, 8))
      }
      val len = raf.length()
      // Memory-map the whole file read-write.
      val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
      if (newlyCreated)
        idx.position(0)
      else
        // Existing index: position at the end so earlier entries are not overwritten.
        idx.position(roundToExactMultiple(idx.limit, 8))
      idx
    } finally {
      CoreUtils.swallow(raf.close())
    }
  }

  // Number of entries currently in the index (8 bytes per entry).
  @volatile
  private[this] var _entries = mmap.position / 8
  // Maximum number of entries the index file can hold.
  @volatile
  private[this] var _maxEntries = mmap.limit / 8
  // Offset stored in the last index entry.
  @volatile
  private[this] var _lastOffset = readLastEntry.offset

  // Find the largest indexed offset <= targetOffset and return its (offset, position) pair.
  def lookup(targetOffset: Long): OffsetPosition = {
    val idx = mmap.duplicate // work on a duplicate so the shared buffer's position is untouched
    val slot = indexSlotFor(idx, targetOffset) // binary search over the entries
    if(slot == -1)
      // No entry at or below targetOffset: fall back to the segment's base offset.
      OffsetPosition(baseOffset, 0)
    else
      // The physical position is the message's absolute byte position in the log
      // file; seeking the log file to it reads the corresponding message directly.
      OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
  }
}
@nonthreadsafe
class LogSegment(val log: FileMessageSet,     // FileMessageSet backing the segment's log file
                 val index: OffsetIndex,      // OffsetIndex backing the segment's index file
                 val baseOffset: Long,        // offset of the first message in this segment
                 val indexIntervalBytes: Int, // minimum bytes between consecutive index entries
                 val rollJitterMs: Long,
                 time: Time) extends Logging {

  // Creation time of this LogSegment; reset when truncateTo empties the log file.
  var created = time.milliseconds

  // Bytes appended since the last index entry was written; decides when to index again.
  private var bytesSinceLastIndexEntry = 0

  /** Append the given messages at the given offset, writing a sparse index entry when due. */
  @nonthreadsafe
  def append(offset: Long, messages: ByteBufferMessageSet) {
    val incomingBytes = messages.sizeInBytes
    if (incomingBytes > 0) {
      if (bytesSinceLastIndexEntry > indexIntervalBytes) {
        // For appends, the physical position is simply the current file size.
        index.append(offset, log.sizeInBytes())
        this.bytesSinceLastIndexEntry = 0
      }
      log.append(messages)
      this.bytesSinceLastIndexEntry += incomingBytes
    }
  }

  /** Fetch up to maxSize bytes of messages in the offset range [startOffset, maxOffset). */
  @threadsafe
  def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size): FetchDataInfo = {
    val logSize = log.sizeInBytes
    val startPosition = translateOffset(startOffset)
    val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position)
    // Number of bytes to read, depending on whether an upper offset bound was given.
    val length = maxOffset.fold(min((maxPosition - startPosition.position).toInt, maxSize)) { offset =>
      val mapping = translateOffset(offset, startPosition.position)
      val endPosition = if (mapping == null) logSize else mapping.position
      min(min(maxPosition, endPosition) - startPosition.position, maxSize).toInt
    }
    FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))
  }
}
// An ordered sequence of LogSegments forming one logical, append-only log.
class Log(val dir: File, // directory holding each LogSegment's log and index files
          @volatile var config: LogConfig, // log-level configuration
          // Recovery starts from this offset: messages before recoveryPoint are known
          // to be flushed to disk; later ones may not be and could be lost on crash.
          @volatile var recoveryPoint: Long = 0L,
          scheduler: Scheduler, // background task pool, e.g. async flush / file deletion
          time: Time = SystemTime) extends Logging with KafkaMetricsGroup {

  // Serializes appends: several handler threads may append to the same Log.
  private val lock = new Object

  // Segments keyed by base offset, kept sorted in a skip-list map.
  private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]

  // Metadata of the next offset to be assigned to an incoming message.
  @volatile var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, activeSegment.size.toInt)

  // Only the last (active) segment accepts writes.
  def activeSegment = segments.lastEntry.getValue

  // Append producer-sent messages; returns info about what was appended.
  def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {
    val appendInfo = analyzeAndValidateMessageSet(messages)
    var validMessages = trimInvalidBytes(messages, appendInfo)
    lock synchronized {
      if (assignOffsets) {
        // Assign consecutive offsets starting from the current log end offset.
        val offset = new LongRef(nextOffsetMetadata.messageOffset)
        appendInfo.firstOffset = offset.value
        val now = time.milliseconds
        val (validatedMessages, messageSizesMaybeChanged) =
          validMessages.validateMessagesAndAssignOffsets(offset, now, appendInfo.sourceCodec, appendInfo.targetCodec,
            config.compact, config.messageFormatVersion.messageFormatVersion,
            config.messageTimestampType, config.messageTimestampDifferenceMaxMs)
        validMessages = validatedMessages
        // offset.value is now one past the last assigned offset.
        appendInfo.lastOffset = offset.value - 1
        if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
          appendInfo.timestamp = now
      }
      // Get the active segment, rolling a new one first if it cannot hold these messages.
      val segment = maybeRoll(validMessages.sizeInBytes)
      segment.append(appendInfo.firstOffset, validMessages)
      updateLogEndOffset(appendInfo.lastOffset + 1)
      if (unflushedMessages >= config.flushInterval)
        flush()
      appendInfo
    }
  }

  // Flush segments between the current recoveryPoint and the given offset to disk.
  def flush(offset: Long) : Unit = {
    if (offset <= this.recoveryPoint)
      // Everything up to this offset is already persisted.
      return
    for(segment <- logSegments(this.recoveryPoint, offset))
      segment.flush()
    lock synchronized {
      if(offset > this.recoveryPoint) {
        // Advance the recovery point after a successful flush.
        this.recoveryPoint = offset
        lastflushedTime.set(time.milliseconds)
      }
    }
  }

  // Read up to maxLength bytes starting at startOffset, optionally bounded by maxOffset.
  def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): FetchDataInfo = {
    val currentNextOffsetMetadata = nextOffsetMetadata
    val next = currentNextOffsetMetadata.messageOffset
    if(startOffset == next)
      // Reading exactly at the log end: nothing available yet, return empty.
      return FetchDataInfo(currentNextOffsetMetadata, MessageSet.Empty)
    // Segment with the greatest base offset <= startOffset.
    var entry = segments.floorEntry(startOffset)
    while(entry != null) {
      val maxPosition = {
        if (entry == segments.lastEntry) {
          // The active segment may be appended to (or rolled) concurrently, so cap
          // reads at the exposed position and re-check lastEntry to catch a roll.
          val exposedPos = nextOffsetMetadata.relativePositionInSegment.toLong
          if (entry != segments.lastEntry)
            entry.getValue.size
          else
            exposedPos
        } else {
          entry.getValue.size
        }
      }
      val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength, maxPosition)
      if(fetchInfo == null) {
        // Nothing at or after startOffset in this segment: try the next one.
        entry = segments.higherEntry(entry.getKey)
      } else {
        return fetchInfo
      }
    }
    FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)
  }

  // Rename the segment's log and index files to *.deleted, then delete them asynchronously.
  private def asyncDeleteSegment(segment: LogSegment) {
    segment.changeFileSuffixes("", Log.DeletedFileSuffix)
    def deleteSeg() {
      segment.delete()
    }
    // Schedule the actual file deletion on the background scheduler.
    scheduler.schedule("delete-file", deleteSeg, delay = config.fileDeleteDelayMs)
  }
}
/**
 * Scheduler implementation backed by a ScheduledThreadPoolExecutor. All public
 * operations are synchronized on this instance.
 */
@threadsafe
class KafkaScheduler(val threads: Int, val threadNamePrefix: String = "kafka-scheduler-",
                     daemon: Boolean = true) extends Scheduler with Logging {

  // Underlying pool; remains null until startup() runs.
  private var executor: ScheduledThreadPoolExecutor = null
  // Monotonic counter used to build unique worker-thread names.
  private val schedulerThreadId = new AtomicInteger(0)

  /** Create and configure the thread pool; throws if already started. */
  override def startup() {
    this synchronized {
      if(isStarted)
        throw new IllegalStateException("This scheduler has already been started!")
      val pool = new ScheduledThreadPoolExecutor(threads)
      // Do not keep running queued/periodic work after shutdown is requested.
      pool.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
      pool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
      pool.setThreadFactory(new ThreadFactory() {
        def newThread(runnable: Runnable): Thread =
          Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
      })
      executor = pool
    }
  }

  /** Schedule `fun`: once after `delay` when period < 0, else at a fixed rate. */
  def schedule(name: String, fun: ()=>Unit, delay: Long, period: Long, unit: TimeUnit) = {
    this synchronized {
      ensureRunning
      val task = CoreUtils.runnable {
        fun()
      }
      if(period < 0)
        executor.schedule(task, delay, unit)
      else
        executor.scheduleAtFixedRate(task, delay, period, unit)
    }
  }
}
@threadsafe
// server.properties may list several log directories, each holding many Logs; when
// creating a Log, LogManager picks the directory currently holding the fewest Logs.
class LogManager(val logDirs: Array[File],
                 val topicConfigs: Map[String, LogConfig], val defaultConfig: LogConfig, val cleanerConfig: CleanerConfig,
                 // number of threads used to load logs
                 ioThreads: Int,
                 val flushCheckMs: Long, val flushCheckpointMs: Long, val retentionCheckMs: Long, scheduler: Scheduler,
                 val brokerState: BrokerState, private val time: Time) extends Logging {

  // Guards Log creation and deletion.
  private val logCreationOrDeletionLock = new Object

  // TopicAndPartition -> Log (backed by a ConcurrentHashMap).
  private val logs = new Pool[TopicAndPartition, Log]()

  // One lock per log directory.
  private val dirLocks = lockLogDirs(logDirs)

  // Maps each log directory to its RecoveryPointCheckpoint file, which records the
  // recoveryPoint of every Log under that directory.
  private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap

  loadLogs()

  // Register the periodic background tasks.
  def startup() {
    if(scheduler != null) {
      // Periodic log retention (cleanup) task.
      scheduler.schedule("kafka-log-retention",
        cleanupLogs,
        delay = InitialTaskDelayMs,
        period = retentionCheckMs,
        TimeUnit.MILLISECONDS)
      // Periodic flush-to-disk task.
      scheduler.schedule("kafka-log-flusher",
        flushDirtyLogs,
        delay = InitialTaskDelayMs,
        period = flushCheckMs,
        TimeUnit.MILLISECONDS)
      // Periodic task persisting recovery points into the RecoveryPointCheckpoint files.
      scheduler.schedule("kafka-recovery-point-checkpoint",
        checkpointRecoveryPointOffsets,
        delay = InitialTaskDelayMs,
        period = flushCheckpointMs,
        TimeUnit.MILLISECONDS)
    }
    if(cleanerConfig.enableCleaner)
      cleaner.startup()
  }

  // Delete segments based on retention time or total log size.
  // NOTE(review): `total` and `startMs` are computed but never used here — presumably
  // this excerpt omits a debug-log line from the full source; verify before removing.
  def cleanupLogs() {
    var total = 0
    val startMs = time.milliseconds
    for(log <- allLogs; if !log.config.compact) {
      total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
    }
  }
}
// Drives log compaction across the configured log directories.
class LogCleaner(val config: CleanerConfig,
                 val logDirs: Array[File],
                 val logs: Pool[TopicAndPartition, Log],
                 time: Time = SystemTime) extends Logging with KafkaMetricsGroup {

  // Tracks per-partition cleaning state across all log directories.
  private[log] val cleanerManager = new LogCleanerManager(logDirs, logs)

  // Cleaner worker threads, one per configured thread slot.
  private val cleaners = (0 until config.numThreads).map(new CleanerThread(_))
}
// Tracks which logs are eligible for compaction and which are currently being cleaned.
private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging with KafkaMetricsGroup {

  // Each log directory mapped to its cleaner offset-checkpoint file.
  private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, offsetCheckpointFile)))).toMap

  // Cleaning state of partitions whose compaction is in progress.
  private val inProgress = mutable.HashMap[TopicAndPartition, LogCleaningState]()

  // Protects checkpoints and inProgress.
  private val lock = new ReentrantLock

  // Signalled when a cleaner transitions from LogCleaningAborted to LogCleaningPaused.
  private val pausedCleaningCond = lock.newCondition()

  // Merge `update` (if any) into the given directory's cleaner checkpoint file.
  def updateCheckpoints(dataDir: File, update: Option[(TopicAndPartition,Long)]) {
    inLock(lock) {
      val checkpoint = checkpoints(dataDir)
      // Keep only entries for logs that still exist, then apply the update.
      val existing = checkpoint.read().filterKeys(logs.keys) ++ update
      checkpoint.write(existing)
    }
  }

  // Pick the log most in need of compaction, mark it in-progress, or return None.
  def grabFilthiestLog(): Option[LogToClean] = {
    inLock(lock) {
      // Last-cleaned offsets for every partition, across all directories.
      val lastClean = allCleanerCheckpoints()
      val dirtyLogs = logs.filter {
        // Keep only compacted logs (drop logs with cleanup.policy = delete).
        case (topicAndPartition, log) => log.config.compact
      }.filterNot {
        // Drop logs whose compaction is already in progress.
        case (topicAndPartition, log) => inProgress.contains(topicAndPartition)
      }.map {
        case (topicAndPartition, log) =>
          val logStartOffset = log.logSegments.head.baseOffset
          val firstDirtyOffset = {
            // Cleaning resumes from the checkpoint, or from the first message when
            // the checkpoint falls before the current log start.
            val offset = lastClean.getOrElse(topicAndPartition, logStartOffset)
            if (offset < logStartOffset) {
              logStartOffset
            } else {
              offset
            }
          }
          LogToClean(topicAndPartition, log, firstDirtyOffset)
      }.filter(ltc => ltc.totalBytes > 0) // skip any empty logs
      // NOTE(review): dirtiestLogCleanableRatio is assigned but not declared in this
      // excerpt — presumably a class field omitted here; verify in the full source.
      this.dirtiestLogCleanableRatio = if (!dirtyLogs.isEmpty) dirtyLogs.max.cleanableRatio else 0
      // and must meet the minimum threshold for dirty byte ratio
      val cleanableLogs = dirtyLogs.filter(ltc => ltc.cleanableRatio > ltc.log.config.minCleanableRatio)
      if(cleanableLogs.isEmpty) {
        None
      } else {
        val filthiest = cleanableLogs.max
        inProgress.put(filthiest.topicPartition, LogCleaningInProgress)
        Some(filthiest)
      }
    }
  }
}
MessageSet.scala
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。
相關(guān)閱讀更多精彩內(nèi)容
- 在創(chuàng)建scala項(xiàng)目的時(shí)候create Scala SDK: 這里選擇bin上一級(jí)目錄,然后點(diǎn)擊OK 這樣就出現(xiàn)了...
- 多維數(shù)組:數(shù)組的元素,還是數(shù)組,數(shù)組套數(shù)組,就是多維數(shù)組 構(gòu)造指定行與列的二維數(shù)組:Array.ofDim方法 構(gòu)...
- http://www.cnblogs.com/cbscan/articles/4147709.html
- Scala 篇 單例對(duì)象 在 Java 中實(shí)現(xiàn)單例對(duì)象通常需要自己實(shí)現(xiàn)一個(gè)類并創(chuàng)建 getInstance() 的...