// MessageSet.scala — annotated excerpts from Kafka's log subsystem (Scala) and client record iterators (Java)

abstract class MessageSet extends Iterable[MessageAndOffset] {
  // Write this set's messages to the given channel; per the signature it takes a
  // starting offset and a byte cap, and returns the number of bytes written
  // (exact semantics are defined by each implementation).
  def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Int): Int
  // Iterate the messages in this MessageSet in sequential order.
  def iterator: Iterator[MessageAndOffset]
}

// file 指向日志文件
// channel 是FileChannel類型,讀寫文件用
// start,end 日志文件分片的起始位置和結(jié)束位置
// isSlice 表示FileMessageSet 是否為日志文件的分片,還是完整的日志文件
// file      -> the on-disk log file backing this message set
// channel   -> FileChannel used to read and write the file
// start/end -> byte range of this view within the file (for slices)
// isSlice   -> true when this FileMessageSet is a slice (sub-range) of a log
//              file rather than the complete file
class FileMessageSet private[kafka](@volatile var file: File,
                                    private[log] val channel: FileChannel,
                                    private[log] val start: Int,
                                    private[log] val end: Int,
                                    isSlice: Boolean) extends MessageSet with Logging {
  // Size in bytes of this file or slice.
  // Multiple handler threads may append to the same partition concurrently,
  // so the size is kept in an AtomicInteger.
  private val _size =
    if(isSlice)
      new AtomicInteger(end - start)
    else
      new AtomicInteger(math.min(channel.size.toInt, end) - start)

  // For a complete (non-slice) file, move the channel position to the end of
  // the existing data during construction, so writes after a broker restart
  // append rather than overwrite earlier messages.
  if (!isSlice)
    channel.position(math.min(channel.size.toInt, end))

  /**
   * Open a FileChannel for the given file.
   *
   * @param file              the file to open
   * @param mutable           true to open read-write; false opens read-only
   * @param fileAlreadyExists when mutable, whether the file already exists
   * @param initFileSize      target length in bytes when preallocating
   * @param preallocate       pre-size a newly created file to initFileSize
   */
  def openChannel(file: File, mutable: Boolean, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false): FileChannel = {
    if (mutable) { // mutable == true means the file is writable
      if (fileAlreadyExists)
        new RandomAccessFile(file, "rw").getChannel()
      else {
        if (preallocate) {
          val randomAccessFile = new RandomAccessFile(file, "rw")
          // Preallocate the file to its target size. This encourages the file
          // to occupy contiguous disk sectors, reducing seek overhead on later
          // reads and writes (see notes on file fragmentation:
          // https://zhuanlan.zhihu.com/p/34915311).
          randomAccessFile.setLength(initFileSize)
          randomAccessFile.getChannel()
        }
        else
          new RandomAccessFile(file, "rw").getChannel()
      }
    }
    else
      new FileInputStream(file).getChannel()
  }

  // Append a batch of messages to the log file and grow the recorded size.
  def append(messages: ByteBufferMessageSet) {
    val written = messages.writeFullyTo(channel)
    _size.getAndAdd(written)
  }

  // Find the first message whose offset is >= targetOffset, scanning forward
  // from startingPosition. Returns the matching OffsetPosition, or null when
  // no such message exists before the end of this set.
  def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
    var position = startingPosition
    // LogOverhead covers only the offset and size header fields, not the body.
    val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
    val size = sizeInBytes() // current size of this FileMessageSet
    // Walk the entries one message at a time starting from `position`.
    while(position + MessageSet.LogOverhead < size) {
      buffer.rewind()
      channel.read(buffer, position)
      buffer.rewind()
      val offset = buffer.getLong()
      if(offset >= targetOffset)
        return OffsetPosition(offset, position)
      val messageSize = buffer.getInt()
      // Advance to the next message: header plus payload.
      position += MessageSet.LogOverhead + messageSize
    }
    null
  }
}

class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Logging {

  // Build the backing ByteBuffer for a set of messages, compressed or not.
  // NOTE(review): `magicAndTimestamp` is not defined anywhere in this excerpt;
  // presumably it is derived from wrapperMessageTimestamp/format version
  // elsewhere in the class — confirm against the full source.
  private def create(offsetAssigner: OffsetAssigner, compressionCodec: CompressionCodec,
                     wrapperMessageTimestamp: Option[Long], timestampType: TimestampType,
                     messages: Message*): ByteBuffer = {
    if (messages.isEmpty)
      MessageSet.Empty.buffer
    else if (compressionCodec == NoCompressionCodec) {
      // Uncompressed path: write each message directly into one buffer.
      val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
      // offsetAssigner.nextAbsoluteOffset() assigns each message its offset
      // as it is written into the buffer.
      for (message <- messages) writeMessage(buffer, message, offsetAssigner.nextAbsoluteOffset())
      buffer.rewind()
      buffer
    } else {
      var offset = -1L
      val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
      messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = magicAndTimestamp.magic) { outputStream =>
        // Wrap the raw output stream with the requested compression codec.
        val output = new DataOutputStream(CompressionFactory(compressionCodec, magicAndTimestamp.magic, outputStream))
        try {
          // Write each inner (to-be-compressed) message in turn.
          for (message <- messages) {
            offset = offsetAssigner.nextAbsoluteOffset()
            // With magic > v0 the inner messages store relative offsets...
            if (magicAndTimestamp.magic > Message.MagicValue_V0)
              output.writeLong(offsetAssigner.toInnerOffset(offset))
            else
              // ...with magic v0 they store absolute offsets.
              output.writeLong(offset)
            output.writeInt(message.size)
            output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
          }
        } finally {
          output.close()
        }
      }
      val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead)
      // Write the whole wrapper message in the on-disk message format; the
      // wrapper's offset is the offset of the last inner message (the final
      // value `offset` took in the loop above).
      writeMessage(buffer, messageWriter, offset)
      buffer.rewind()
      buffer
    }
  }

  // Write the entire buffer to the channel, looping until all sizeInBytes
  // bytes are written. The buffer position is saved with mark() and restored
  // with reset() so the set can be written again later.
  def writeFullyTo(channel: GatheringByteChannel): Int = {
    buffer.mark()
    var written = 0
    while (written < sizeInBytes)
      written += channel.write(buffer)
    buffer.reset()
    written
  }
}

public static class RecordsIterator extends AbstractIterator<LogEntry> {
    // Buffer holding the record bytes being iterated.
    private final ByteBuffer buffer;
    // Stream view over `buffer` used to read entries.
    private final DataInputStream stream;
    // Compression type of the records in `buffer`.
    private final CompressionType type;
    // When true, do not descend into compressed wrapper records.
    private final boolean shallow;
    // Inner iterator used while walking a compressed (wrapper) record.
    private RecordsIterator innerIter;

    // LogEntry: offset + record. Pre-decompressed inner entries (magic > v0 only).
    private final ArrayDeque<LogEntry> logEntries;
    // For inner iteration: the absolute offset of the first inner record,
    // used to convert the relative offsets stored inside the wrapper.
    private final long absoluteBaseOffset;

    // Public constructor: creates the outer (top-level) iterator.
    public RecordsIterator(ByteBuffer buffer, boolean shallow) {
        this.type = CompressionType.NONE;
        this.buffer = buffer;
        this.shallow = shallow;
        this.stream = new DataInputStream(new ByteBufferInputStream(buffer));
        this.logEntries = null;
        this.absoluteBaseOffset = -1;
    }

    // Private constructor: creates an inner iterator over one wrapper entry.
    private RecordsIterator(LogEntry entry) {
        this.type = entry.record().compressionType();
        this.buffer = entry.record().value();
        this.shallow = true;
        this.stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type, entry.record().magic());
        long wrapperRecordOffset = entry.offset(); // offset of the wrapper record
        if (entry.record().magic() > Record.MAGIC_VALUE_V0) {
            this.logEntries = new ArrayDeque<>();
            long wrapperRecordTimestamp = entry.record().timestamp();
            // Eagerly decompress all inner entries into logEntries; the loop
            // terminates when the stream is exhausted (EOFException).
            while (true) {
                try {
                    LogEntry logEntry = getNextEntryFromStream();
                    Record recordWithTimestamp = new Record(logEntry.record().buffer(),
                                                            wrapperRecordTimestamp,
                                                            entry.record().timestampType());
                    logEntries.add(new LogEntry(logEntry.offset(), recordWithTimestamp));
                } catch (EOFException e) {
                    break;
                } catch (IOException e) {
                    throw new KafkaException(e);
                }
            }
            // Wrapper offset equals the last inner (relative) offset, so the
            // base is their difference.
            this.absoluteBaseOffset = wrapperRecordOffset - logEntries.getLast().offset();
        } else {
            // magic v0 stores absolute offsets inside the wrapper; no
            // pre-decompression or base-offset conversion needed.
            this.logEntries = null;
            this.absoluteBaseOffset = -1;
        }

    }

    @Override
    protected LogEntry makeNext() {
        if (innerDone()) {
            try {
                LogEntry entry = getNextEntry();
                if (entry == null)
                    return allDone();

                // Convert a relative inner offset to an absolute one (only
                // set when iterating a magic > v0 wrapper).
                if (absoluteBaseOffset >= 0) {
                    long absoluteOffset = absoluteBaseOffset + entry.offset();
                    entry = new LogEntry(absoluteOffset, entry.record());
                }

                CompressionType compression = entry.record().compressionType();
                if (compression == CompressionType.NONE || shallow) {
                    return entry;
                } else {
                    // Compressed wrapper in deep mode: descend via an inner iterator.
                    innerIter = new RecordsIterator(entry);
                    return innerIter.next();
                }
            } catch (EOFException e) {
                return allDone();
            } catch (IOException e) {
                throw new KafkaException(e);
            }
        } else {
            return innerIter.next();
        }
    }
}

// MemoryRecords的迭代器RecordsIterator抽象類
public abstract class AbstractIterator<T> implements Iterator<T> {

    private static enum State {
        READY,
        NOT_READY, // 迭代器未準(zhǔn)備好迭代下一項(xiàng),需要調(diào)用maybeComputeNext
        DONE, FAILED
    };

    private State state = State.NOT_READY;
    private T next;

    @Override
    public boolean hasNext() {
        switch (state) {
            case FAILED:
                throw new IllegalStateException("Iterator is in failed state");
            case DONE:
                return false;
            case READY:
                return true;
            default:
                return maybeComputeNext();
        }
    }

    @Override
    public T next() {
        if (!hasNext())
            throw new NoSuchElementException();
        state = State.NOT_READY;
        if (next == null)
            throw new IllegalStateException("Expected item but none found.");
        return next;
    }

    protected abstract T makeNext();

    private Boolean maybeComputeNext() {
        state = State.FAILED;
        next = makeNext();
        if (state == State.DONE) {
            return false;
        } else {
            state = State.READY;
            return true;
        }
    }

}

// _file 指向磁盤上的索引文件
// baseOffset 對(duì)應(yīng)日志文件中第一個(gè)消息的offset
//
// _file      -> the index file on disk
// baseOffset -> offset of the first message in the corresponding log file;
//               index entries store offsets relative to this base
class OffsetIndex(@volatile private[this] var _file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {

  // Guards mutations of mmap.
  private val lock = new ReentrantLock

  // MappedByteBuffer used to read and write the index file.
  @volatile
  private[this] var mmap: MappedByteBuffer = {
    // createNewFile() returns true when the index file did not exist and was
    // created, false when it already existed.
    val newlyCreated = _file.createNewFile()
    val raf = new RandomAccessFile(_file, "rw")
    try {
      if (newlyCreated) {
        // Grow a brand-new file to the largest multiple of 8 (the entry size)
        // not exceeding maxIndexSize, e.g. roundToExactMultiple(67, 8) = 64.
        raf.setLength(roundToExactMultiple(maxIndexSize, 8))
      }

      val len = raf.length()
      // Memory-map the whole file read-write.
      val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)

      if (newlyCreated)
        idx.position(0)
      else
        // Existing index file: move position past the existing entries so new
        // writes do not clobber them.
        idx.position(roundToExactMultiple(idx.limit, 8))
      idx
    } finally {
      // Closing the RandomAccessFile does not invalidate the mapping.
      CoreUtils.swallow(raf.close())
    }
  }

  @volatile
  private[this] var _entries = mmap.position / 8 // number of entries currently in the index
  @volatile
  private[this] var _maxEntries = mmap.limit / 8 // maximum number of entries the index can hold
  @volatile
  private[this] var _lastOffset = readLastEntry.offset // offset of the last index entry

  // Find the largest indexed offset <= targetOffset and its file position.
  def lookup(targetOffset: Long): OffsetPosition = {
      val idx = mmap.duplicate // duplicate so concurrent lookups don't share position state
      val slot = indexSlotFor(idx, targetOffset) // binary search over the entries
      if(slot == -1)
        OffsetPosition(baseOffset, 0)
      else
        // Package the offset and physical position into an OffsetPosition.
        // The physical position is the message's absolute byte position in the
        // log file: seeking a file handle there reads the message directly.
        OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
  }
}

@nonthreadsafe
class LogSegment(val log: FileMessageSet, // FileMessageSet over this segment's log file
                 val index: OffsetIndex, // OffsetIndex over this segment's index file
                 val baseOffset: Long, // offset of the first message in this segment
                 val indexIntervalBytes: Int, // minimum bytes between index entries
                 val rollJitterMs: Long,
                 time: Time) extends Logging {

  // Creation time of this LogSegment; reset when truncateTo empties the file.
  var created = time.milliseconds
  // Bytes appended since the last index entry was added; used to decide when
  // the next index entry is due.
  private var bytesSinceLastIndexEntry = 0

  // Append a message set at the given (first) offset, adding a sparse index
  // entry when enough bytes have accumulated since the previous one.
  @nonthreadsafe
  def append(offset: Long, messages: ByteBufferMessageSet) {
    if (messages.sizeInBytes > 0) {
      // Check whether enough bytes have accumulated to warrant an index entry.
      if(bytesSinceLastIndexEntry > indexIntervalBytes) {
        // Add the entry and reset the counter. Because we always append, the
        // physical position is simply the current size of the log file.
        index.append(offset, log.sizeInBytes())
        this.bytesSinceLastIndexEntry = 0
      }
      log.append(messages)
      this.bytesSinceLastIndexEntry += messages.sizeInBytes
    }
  }

  // Fetch messages in the requested offset range from this segment.
  @threadsafe
  def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size): FetchDataInfo = {
    val logSize = log.sizeInBytes
    val startPosition = translateOffset(startOffset)

    val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position)

    // Derive the number of bytes to read from maxOffset (if any), maxSize,
    // and the maxPosition cap.
    val length = maxOffset match {
      case None =>
        min((maxPosition - startPosition.position).toInt, maxSize)
      case Some(offset) =>
        val mapping = translateOffset(offset, startPosition.position)
        val endPosition =
          if(mapping == null)
            logSize
          else
            mapping.position
        min(min(maxPosition, endPosition) - startPosition.position, maxSize).toInt
    }

    FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))
  }
}

// An ordered sequence of LogSegments forming one logical log (one partition).
class Log(val dir: File, // directory holding each LogSegment's log and index files
          @volatile var config: LogConfig, // configuration for this Log
          // Start offset for recovery: messages before recoveryPoint are known
          // to be flushed to disk; messages after it may not be and could be
          // lost on a crash.
          @volatile var recoveryPoint: Long = 0L,
          scheduler: Scheduler, // async task pool, e.g. for async flush/segment deletion
          time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
  // Multiple handler threads may append to the same Log; appends are serialized
  // on this lock.
  private val lock = new Object
  // LogSegments kept in a skip list keyed by base offset.
  private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]

  // Source of the offsets assigned to incoming messages.
  @volatile var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, activeSegment.size.toInt)

  // Only the last LogSegment accepts writes.
  def activeSegment = segments.lastEntry.getValue

  // Append messages sent by a producer; returns metadata about the append.
  def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {
    val appendInfo = analyzeAndValidateMessageSet(messages)
    var validMessages = trimInvalidBytes(messages, appendInfo)

      lock synchronized {
        if (assignOffsets) {
          val offset = new LongRef(nextOffsetMetadata.messageOffset)
          appendInfo.firstOffset = offset.value
          val now = time.milliseconds
          val (validatedMessages, messageSizesMaybeChanged) =
            validMessages.validateMessagesAndAssignOffsets(offset, now, appendInfo.sourceCodec, appendInfo.targetCodec,
                                                           config.compact, config.messageFormatVersion.messageFormatVersion,
                                                           config.messageTimestampType, config.messageTimestampDifferenceMaxMs)
          validMessages = validatedMessages
          // Record the offset of the last appended message (offset.value is
          // now one past the last assigned offset).
          appendInfo.lastOffset = offset.value - 1
          if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
            appendInfo.timestamp = now
        }
        // Get the active segment, rolling a new one first if this append
        // would not fit.
        val segment = maybeRoll(validMessages.sizeInBytes)
        segment.append(appendInfo.firstOffset, validMessages)
        updateLogEndOffset(appendInfo.lastOffset + 1)

        if (unflushedMessages >= config.flushInterval)
          flush()

        appendInfo
      }
  }

  // Flush all segments with data in [recoveryPoint, offset) and advance the
  // recovery point.
  def flush(offset: Long) : Unit = {
    if (offset <= this.recoveryPoint)
      // Everything up to offset is already persisted.
      return
    for(segment <- logSegments(this.recoveryPoint, offset))
      segment.flush()
    lock synchronized {
      if(offset > this.recoveryPoint) {
        // After persisting, advance the recovery point.
        this.recoveryPoint = offset
        lastflushedTime.set(time.milliseconds)
      }
    }
  }

  // Read up to maxLength bytes starting at startOffset, optionally bounded by
  // maxOffset.
  def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): FetchDataInfo = {
    val currentNextOffsetMetadata = nextOffsetMetadata
    val next = currentNextOffsetMetadata.messageOffset
    if(startOffset == next)
      // Reading exactly at the log end: return an empty result.
      return FetchDataInfo(currentNextOffsetMetadata, MessageSet.Empty)

    var entry = segments.floorEntry(startOffset)
    while(entry != null) {
      val maxPosition = {
        if (entry == segments.lastEntry) {
          // The active (last) segment may be appended to concurrently, so cap
          // reads at the exposed position; re-check lastEntry afterwards in
          // case the log rolled between the two reads.
          val exposedPos = nextOffsetMetadata.relativePositionInSegment.toLong
          if (entry != segments.lastEntry)
            entry.getValue.size
          else
            exposedPos
        } else {
          entry.getValue.size
        }
      }
      val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength, maxPosition)
      if(fetchInfo == null) {
        // Nothing readable in this segment; try the next higher one.
        entry = segments.higherEntry(entry.getKey)
      } else {
        return fetchInfo
      }
    }

    FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)
  }

  // Rename the segment's files to *.deleted, then delete them asynchronously.
  private def asyncDeleteSegment(segment: LogSegment) {
    // Give the log and index files the .deleted suffix first.
    segment.changeFileSuffixes("", Log.DeletedFileSuffix)
    def deleteSeg() {
      segment.delete()
    }
    // Schedule the actual file deletion on the background scheduler.
    scheduler.schedule("delete-file", deleteSeg, delay = config.fileDeleteDelayMs)
  }
}

@threadsafe
class KafkaScheduler(val threads: Int, val threadNamePrefix: String = "kafka-scheduler-",
                     daemon: Boolean = true) extends Scheduler with Logging {
  // Underlying JDK scheduled pool; created in startup(), guarded by `this`.
  private var executor: ScheduledThreadPoolExecutor = null
  // Monotonic counter giving each pool thread a unique name suffix.
  private val schedulerThreadId = new AtomicInteger(0)

  /** Create and configure the thread pool; may only be called once. */
  override def startup() {
    this synchronized {
      if (isStarted)
        throw new IllegalStateException("This scheduler has already been started!")
      val pool = new ScheduledThreadPoolExecutor(threads)
      // Drop any remaining tasks once the pool is shut down.
      pool.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
      pool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
      pool.setThreadFactory(new ThreadFactory() {
        def newThread(runnable: Runnable): Thread =
          Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
      })
      executor = pool
    }
  }

  /**
   * Schedule `fun` to run after `delay`. A non-negative `period` makes it a
   * fixed-rate repeating task; a negative `period` runs it exactly once.
   */
  def schedule(name: String, fun: ()=>Unit, delay: Long, period: Long, unit: TimeUnit) = {
    this synchronized {
      ensureRunning
      val task = CoreUtils.runnable {
        fun()
      }
      if (period < 0)
        executor.schedule(task, delay, unit)
      else
        executor.scheduleAtFixedRate(task, delay, period, unit)
    }
  }
}

@threadsafe
// server.properties may list several log directories, each holding many Logs;
// when creating a Log, LogManager picks the directory with the fewest Logs.
class LogManager(val logDirs: Array[File],
                 val topicConfigs: Map[String, LogConfig], val defaultConfig: LogConfig, val cleanerConfig: CleanerConfig,
                 // number of threads used to load logs at startup
                 ioThreads: Int,
                 val flushCheckMs: Long, val flushCheckpointMs: Long, val retentionCheckMs: Long, scheduler: Scheduler,
                 val brokerState: BrokerState, private val time: Time) extends Logging {

  // Guards Log creation and deletion.
  private val logCreationOrDeletionLock = new Object
  // Backed by a ConcurrentHashMap.
  private val logs = new Pool[TopicAndPartition, Log]()

  // One file lock per log directory.
  private val dirLocks = lockLogDirs(logDirs)
  // Maps each log directory to its RecoveryPointCheckpoint file, which records
  // the recoveryPoint of every Log under that directory.
  private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
  loadLogs()

  // Register the three periodic background tasks and start the cleaner.
  def startup() {
    if(scheduler != null) {
      // Periodic log-retention cleanup.
      scheduler.schedule("kafka-log-retention",
                         cleanupLogs,
                         delay = InitialTaskDelayMs,
                         period = retentionCheckMs,
                         TimeUnit.MILLISECONDS)
      // Periodic flush of dirty logs to disk.
      scheduler.schedule("kafka-log-flusher",
                         flushDirtyLogs,
                         delay = InitialTaskDelayMs,
                         period = flushCheckMs,
                         TimeUnit.MILLISECONDS)
      // Periodic write of the RecoveryPointCheckpoint files.
      scheduler.schedule("kafka-recovery-point-checkpoint",
                         checkpointRecoveryPointOffsets,
                         delay = InitialTaskDelayMs,
                         period = flushCheckpointMs,
                         TimeUnit.MILLISECONDS)
    }
    if(cleanerConfig.enableCleaner)
      cleaner.startup()
  }

  // Delete eligible segments of every non-compacted log.
  // NOTE(review): `total` and `startMs` are unused in this excerpt —
  // presumably a trailing debug/log statement was elided; confirm.
  def cleanupLogs() {
    var total = 0
    val startMs = time.milliseconds
    for(log <- allLogs; if !log.config.compact) {
      // Retention time and total log size decide what gets deleted.
      total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
    }
  }
}

class LogCleaner(val config: CleanerConfig,
                 val logDirs: Array[File],
                 val logs: Pool[TopicAndPartition, Log],
                 time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
  // Tracks cleaning state across all log directories.
  private[log] val cleanerManager = new LogCleanerManager(logDirs, logs)
  // One cleaner thread per configured slot, numbered 0 .. numThreads-1.
  private val cleaners = (0 until config.numThreads).map(id => new CleanerThread(id))
}

private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging with KafkaMetricsGroup {
  // Maps each log directory to its cleaner offset-checkpoint file.
  private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, offsetCheckpointFile)))).toMap
  // Cleaning state of each topic-partition currently being worked on.
  private val inProgress = mutable.HashMap[TopicAndPartition, LogCleaningState]()
  // Guards `checkpoints` and `inProgress`.
  private val lock = new ReentrantLock
  // Signalled when a cleaner moves from LogCleaningAborted to LogCleaningPaused.
  private val pausedCleaningCond = lock.newCondition()

  // Update the cleaner checkpoint file of the given directory, merging in the
  // optional (partition, offset) entry and dropping partitions whose logs no
  // longer exist.
  def updateCheckpoints(dataDir: File, update: Option[(TopicAndPartition,Long)]) {
    inLock(lock) {
      val checkpoint = checkpoints(dataDir)
      val existing = checkpoint.read().filterKeys(logs.keys) ++ update
      checkpoint.write(existing)
    }
  }

  // Pick the dirtiest eligible log to clean next, mark it in progress, and
  // return it; None when nothing qualifies.
  def grabFilthiestLog(): Option[LogToClean] = {
    inLock(lock) {
      // Cleaner checkpoints across all directories.
      val lastClean = allCleanerCheckpoints()
      val dirtyLogs = logs.filter {
        // Drop logs whose cleanup.policy is delete (not compact).
        case (topicAndPartition, log) => log.config.compact
      }.filterNot {
        // Drop logs that are already being cleaned.
        case (topicAndPartition, log) => inProgress.contains(topicAndPartition)
      }.map {
        case (topicAndPartition, log) =>
          val logStartOffset = log.logSegments.head.baseOffset
          val firstDirtyOffset = {
            // Cleaning starts at the checkpoint, or at the first message if
            // the checkpoint has fallen before the log start.
            val offset = lastClean.getOrElse(topicAndPartition, logStartOffset)
            if (offset < logStartOffset) {
              logStartOffset
            } else {
              offset
            }
          }
          LogToClean(topicAndPartition, log, firstDirtyOffset)
      }.filter(ltc => ltc.totalBytes > 0) // skip any empty logs

      // NOTE(review): dirtiestLogCleanableRatio is declared outside this excerpt.
      this.dirtiestLogCleanableRatio = if (!dirtyLogs.isEmpty) dirtyLogs.max.cleanableRatio else 0
      // and must meet the minimum threshold for dirty byte ratio
      val cleanableLogs = dirtyLogs.filter(ltc => ltc.cleanableRatio > ltc.log.config.minCleanableRatio)
      if(cleanableLogs.isEmpty) {
        None
      } else {
        val filthiest = cleanableLogs.max
        inProgress.put(filthiest.topicPartition, LogCleaningInProgress)
        Some(filthiest)
      }
    }
  }
}
// ---------------------------------------------------------------------------
// Source-article footer (platform boilerplate, kept for attribution):
// ?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
//   (Copyright belongs to the author; contact the author for reprints or collaboration.)
// 【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
//   (Community notice: some content may be AI-assisted; verify against other sources.)
// 平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),
// 簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。
//   (Platform statement: content was uploaded by the author and represents the author's views only.)
// ---------------------------------------------------------------------------