Kafka Source Code Analysis (2): Log Analysis

The previous article covered the initialization of LogSegment and Log. This one covers the main operations on Log.

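Broadly speaking, the common operations on Log fall into four groups.

  1. High watermark management
  2. Log segment management
  3. Key offset management
  4. Read and write operations

Key offset management mainly covers the Log Start Offset and the LEO (log end offset).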

High Watermark (HighWatermark)

Initializing the high watermark

The high watermark is defined through the LogOffsetMetadata class:

@volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)

The initial value passed in is logStartOffset, so when the high watermark is first constructed it is set to the Log Start Offset.

Now let's look at the LogOffsetMetadata class:

case class LogOffsetMetadata(messageOffset: Long,
                             segmentBaseOffset: Long = Log.UnknownOffset,
                             relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition) {

  // check if this offset is already on an older segment compared with the given offset
  def onOlderSegment(that: LogOffsetMetadata): Boolean = {
    if (messageOffsetOnly)
      throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")

    this.segmentBaseOffset < that.segmentBaseOffset
  }
  ...
}

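LogOffsetMetadata has three fields:

messageOffset is the message offset;

segmentBaseOffset is the base offset of the log segment that contains the message offset. It is used to tell whether two messages are in the same log segment;

relativePositionInSegment is the physical byte position of that message offset within its log segment file;

The onOlderSegment method above shows that, to decide which log segment is older, it is enough to compare the segmentBaseOffset values.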
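
The comparison described above can be sketched in isolation. This is a minimal stand-in, not Kafka's class: OffsetMetadataSketch, OffsetMetadata, onSameSegment and the two -1 sentinel values are assumptions made for illustration.

```scala
// Simplified stand-in for the offset metadata described above (hypothetical names).
object OffsetMetadataSketch {
  val UnknownOffset: Long = -1L      // assumed sentinel: segment base offset not known
  val UnknownFilePosition: Int = -1  // assumed sentinel: position in segment not known

  final case class OffsetMetadata(messageOffset: Long,
                                  segmentBaseOffset: Long = UnknownOffset,
                                  relativePositionInSegment: Int = UnknownFilePosition) {

    // True when only the message offset is known and no segment information is attached.
    def messageOffsetOnly: Boolean =
      segmentBaseOffset == UnknownOffset && relativePositionInSegment == UnknownFilePosition

    // A segment is older when its base offset is smaller.
    def onOlderSegment(that: OffsetMetadata): Boolean = {
      require(!messageOffsetOnly && !that.messageOffsetOnly, "segment info is required on both sides")
      segmentBaseOffset < that.segmentBaseOffset
    }

    // Two offsets are in the same segment when their segment base offsets match.
    def onSameSegment(that: OffsetMetadata): Boolean = {
      require(!messageOffsetOnly && !that.messageOffsetOnly, "segment info is required on both sides")
      segmentBaseOffset == that.segmentBaseOffset
    }
  }
}
```

With this sketch, an offset stored in the segment starting at 100 counts as older than one stored in the segment starting at 200, even if both carry the same messageOffset.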

Setting and updating the high watermark

  private def updateHighWatermarkMetadata(newHighWatermark: LogOffsetMetadata): Unit = {
    // the high watermark can never be negative
    if (newHighWatermark.messageOffset < 0)
      throw new IllegalArgumentException("High watermark offset should be non-negative")

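    lock synchronized { // monitor lock protecting modifications to the Log object
      highWatermarkMetadata = newHighWatermark // assign the new high watermark value
      // transaction-related, ignored for now
      producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset)
      // transaction-related, ignored for now
      maybeIncrementFirstUnstableOffset()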
    }
    trace(s"Setting high watermark $newHighWatermark")
  }

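Setting the high watermark is simple: the method first checks that the value is not negative, then takes the lock and updates the high watermark.

Two methods update the high watermark: updateHighWatermark and maybeIncrementHighWatermark. We look at each in turn.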

updateHighWatermark

  def updateHighWatermark(hw: Long): Long = {
    // if the given high watermark is below logStartOffset, use logStartOffset
    val newHighWatermark = if (hw < logStartOffset)
      logStartOffset
    // if the given high watermark is above the LEO, use the LEO
    else if (hw > logEndOffset)
      logEndOffset
    else
      hw
    // wrap newHighWatermark in a LogOffsetMetadata and update the high watermark
    updateHighWatermarkMetadata(LogOffsetMetadata(newHighWatermark))
    // return the new high watermark value
    newHighWatermark
  }

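This method is also straightforward. The high watermark can never be larger than the LEO or smaller than logStartOffset, so the method clamps the given hw into that range and then calls the setter above.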
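
The range check amounts to a clamp. A minimal sketch, where HighWatermarkClamp and clamp are hypothetical names and the offsets are passed in as plain arguments rather than read from a Log object:

```scala
object HighWatermarkClamp {
  // Clamp a proposed high watermark into [logStartOffset, logEndOffset].
  def clamp(hw: Long, logStartOffset: Long, logEndOffset: Long): Long = {
    require(logStartOffset <= logEndOffset, "log start offset must not exceed log end offset")
    math.min(math.max(hw, logStartOffset), logEndOffset)
  }
}
```

For a log with logStartOffset 10 and LEO 20, a proposed value of 5 becomes 10, a value of 25 becomes 20, and a value of 15 is kept as it is.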

maybeIncrementHighWatermark

/**
 * Update the high watermark to a new value if and only if it is larger than the old value. It is
 * an error to update to a value which is larger than the log end offset.
 *
 * This method is intended to be used by the leader to update the high watermark after follower
 * fetch offsets have been updated.
 *
 * @return the old high watermark, if updated by the new value
 */
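// Update only when the new high watermark is larger than the old one; a value beyond the LEO is an error.
// The leader calls this method to update the high watermark after follower fetch offsets have been updated.
def maybeIncrementHighWatermark(newHighWatermark: LogOffsetMetadata): Option[LogOffsetMetadata] = {
  // a new high watermark beyond the LEO is an error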
  if (newHighWatermark.messageOffset > logEndOffset)
    throw new IllegalArgumentException(s"High watermark $newHighWatermark update exceeds current " +
      s"log end offset $logEndOffsetMetadata")

  lock.synchronized {
    // get the old high watermark
    val oldHighWatermark = fetchHighWatermarkMetadata

    // Ensure that the high watermark increases monotonically. We also update the high watermark when the new
    // offset metadata is on a newer segment, which occurs whenever the log is rolled to a new segment.
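    // update only when the new value is larger than the old one, which keeps the high watermark monotonic,
    // or when the two offsets are equal but the new high watermark is on a newer log segment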
    if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
      (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
      updateHighWatermarkMetadata(newHighWatermark)
      Some(oldHighWatermark) // return the old high watermark
    } else {
      None
    }
  }
}

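I have kept the method's original English comments and added my own notes next to them. The logic is clear, and the comments should be enough to follow it.

The main difference between the two methods is where they are used. updateHighWatermark is mainly used when a Follower replica updates its high watermark after fetching messages from the Leader replica, whereas maybeIncrementHighWatermark is mainly used to update the high watermark of the Leader replica.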
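
The leader-side rule (advance only, or move to a newer segment at the same offset) can be sketched on its own. MonotonicHighWatermark, Hw and HwTracker are hypothetical names, and the LEO is a fixed constructor argument here, which is a simplification.

```scala
object MonotonicHighWatermark {
  // Reduced offset metadata: just the message offset and the base offset of its segment.
  final case class Hw(messageOffset: Long, segmentBaseOffset: Long)

  final class HwTracker(initial: Hw, logEndOffset: Long) {
    private var current: Hw = initial

    def value: Hw = this.synchronized { current }

    // Returns the old value if the update was applied, None otherwise.
    def maybeIncrement(candidate: Hw): Option[Hw] = this.synchronized {
      if (candidate.messageOffset > logEndOffset)
        throw new IllegalArgumentException(
          s"High watermark ${candidate.messageOffset} exceeds log end offset $logEndOffset")

      val old = current
      val advances = old.messageOffset < candidate.messageOffset
      val sameOffsetNewerSegment =
        old.messageOffset == candidate.messageOffset &&
          old.segmentBaseOffset < candidate.segmentBaseOffset

      if (advances || sameOffsetNewerSegment) {
        current = candidate
        Some(old)
      } else {
        None
      }
    }
  }
}
```

A candidate with a smaller offset is ignored and the tracker keeps its current value, which is what keeps the high watermark from moving backwards.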

The method above obtains the high watermark by calling fetchHighWatermarkMetadata, so let's look at that method next:

fetchHighWatermarkMetadata

  private def fetchHighWatermarkMetadata: LogOffsetMetadata = {
    // make sure the log has not been closed before reading
    checkIfMemoryMappedBufferClosed()

    val offsetMetadata = highWatermarkMetadata
    if (offsetMetadata.messageOffsetOnly) { // the full high watermark metadata is not available yet
      lock.synchronized {
        // resolve the full high watermark metadata by reading the log
        val fullOffset = convertToOffsetMetadataOrThrow(highWatermark)
        updateHighWatermarkMetadata(fullOffset)
        fullOffset
      }
    } else {
      offsetMetadata
    }
  }

  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
    // look up the log data for the given offset by reading the log
    val fetchDataInfo = read(offset,
      maxLength = 1,
      isolation = FetchLogEnd,
      minOneMessage = false)
    fetchDataInfo.fetchOffsetMetadata
  }

Next, let's look ahead at the log's read method to see how it reads data for a given offset:

Log segment operations

Reading from the log

read

  def read(startOffset: Long,
           maxLength: Int,
           isolation: FetchIsolation,
           minOneMessage: Boolean): FetchDataInfo = {
    maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
      trace(s"Reading $maxLength bytes from offset $startOffset of length $size bytes")

      // convertToOffsetMetadataOrThrow passes FetchLogEnd, so on that path this is false
      val includeAbortedTxns = isolation == FetchTxnCommitted

      // no lock is taken here, so cache the current nextOffsetMetadata in a local variable
      val endOffsetMetadata = nextOffsetMetadata
      val endOffset = endOffsetMetadata.messageOffset
      // find the log segment with the largest base offset that is <= startOffset
      var segmentEntry = segments.floorEntry(startOffset)

      // return error on attempt to read beyond the log end offset or read below log start offset
      // the error cases are:
      // 1. the requested offset is beyond the log end offset;
      // 2. no log segment was found for the offset;
      // 3. the requested offset is below logStartOffset
      if (startOffset > endOffset || segmentEntry == null || startOffset < logStartOffset)
        throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
          s"but we only have log segments in the range $logStartOffset to $endOffset.")

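      // convertToOffsetMetadataOrThrow passes FetchLogEnd, so on that path the upper bound is endOffsetMetadata
      // Pick the upper bound according to the fetch isolation level:
      // FetchLogEnd (follower replicas) can read messages in [Log Start Offset, LEO)
      // FetchHighWatermark (ordinary consumers) can read messages in [Log Start Offset, high watermark)
      // FetchTxnCommitted (transactional consumers) can read messages in [Log Start Offset, LSO); the LSO (Last Stable Offset) is an offset no larger than the LEO, used by Kafka transactions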
      val maxOffsetMetadata = isolation match {
        case FetchLogEnd => endOffsetMetadata
        case FetchHighWatermark => fetchHighWatermarkMetadata
        case FetchTxnCommitted => fetchLastStableOffsetMetadata
      }
      // if startOffset equals the maximum readable offset, return an empty result right away
      if (startOffset == maxOffsetMetadata.messageOffset) {
        return emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
      // if startOffset is beyond the maximum readable offset, return an empty message set, since nothing can be read
      } else if (startOffset > maxOffsetMetadata.messageOffset) {
        val startOffsetMetadata = convertToOffsetMetadataOrThrow(startOffset)
        return emptyFetchDataInfo(startOffsetMetadata, includeAbortedTxns)
      }
 
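      // iterate over the log segments until something is read or the end of the log is reached
      while (segmentEntry != null) {
        val segment = segmentEntry.getValue
        // determine the maximum byte position that may be read in this segment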
        val maxPosition = { 
          if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) {
            maxOffsetMetadata.relativePositionInSegment
          } else {
            segment.size
          }
        }
        // read from the log segment using the offset information
        val fetchInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
        // if nothing was read, move on to the segment with the next higher base offset
        if (fetchInfo == null) {
          segmentEntry = segments.higherEntry(segmentEntry.getKey)
        } else {
          return if (includeAbortedTxns)
            addAbortedTransactions(startOffset, segmentEntry, fetchInfo)
          else
            fetchInfo
        }
      }

      // nothing was found in any segment, probably because all messages above the requested offset have been deleted; return an empty result
      FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
    }
  }

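The read method takes four parameters:

  • startOffset: the offset to start reading from.
  • maxLength: the maximum amount of data to read, in bytes.
  • isolation: the isolation level, mostly used for Kafka transactions.
  • minOneMessage: whether to return at least one message. If a message is larger than maxLength, read would normally never return it. With this parameter set to true, read guarantees that at least one message is returned.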
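
The effect of minOneMessage can be sketched as a widened byte budget. This is a simplified model under stated assumptions: MinOneMessageSketch, adjustedMaxSize and canReturnFirstBatch are hypothetical names, and the size of the first batch is passed in directly instead of being read from a segment.

```scala
object MinOneMessageSketch {
  // Byte budget for one read: widened to the first batch's size when minOneMessage is set.
  def adjustedMaxSize(maxLength: Int, firstBatchSize: Int, minOneMessage: Boolean): Int =
    if (minOneMessage) math.max(maxLength, firstBatchSize) else maxLength

  // Whether the first batch fits into the (possibly widened) budget.
  def canReturnFirstBatch(maxLength: Int, firstBatchSize: Int, minOneMessage: Boolean): Boolean =
    firstBatchSize <= adjustedMaxSize(maxLength, firstBatchSize, minOneMessage)
}
```

With maxLength = 100 and a first batch of 500 bytes, the batch does not fit unless minOneMessage is true, in which case the budget grows to 500.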

The code uses segments to look up the log segment for an offset:

  private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
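
The two lookups used by read, floorEntry and higherEntry, behave as follows on a ConcurrentSkipListMap keyed by base offset. In this sketch the values are plain strings standing in for LogSegment objects, and SegmentLookupSketch, segmentFor and nextSegmentAfter are hypothetical names.

```scala
import java.util.concurrent.ConcurrentSkipListMap

object SegmentLookupSketch {
  // Keys are segment base offsets; values are placeholder names instead of LogSegment objects.
  val segments = new ConcurrentSkipListMap[java.lang.Long, String]()
  segments.put(100L, "segment-100")
  segments.put(250L, "segment-250")
  segments.put(400L, "segment-400")

  // Segment with the largest base offset <= offset, if any.
  def segmentFor(offset: Long): Option[String] =
    Option(segments.floorEntry(offset)).map(_.getValue)

  // Segment whose base offset is strictly greater than the given base offset, if any.
  def nextSegmentAfter(baseOffset: Long): Option[String] =
    Option(segments.higherEntry(baseOffset)).map(_.getValue)
}
```

An offset of 150 maps to the segment starting at 100, while an offset of 50 maps to nothing because no segment starts at or below it. The second case is the `segmentEntry == null` check in read.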

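Let's walk through what the read method actually does:

  1. Because no lock is taken, it caches the current nextOffsetMetadata in a local variable and uses it as the upper bound, the LEO;
  2. It looks in the segment map for the segment with the largest base offset less than or equal to startOffset;
  3. It checks the error cases:
    1. startOffset is beyond the LEO;
    2. the segment map has no segment whose base offset is less than or equal to startOffset;
    3. startOffset is below the Log Start Offset;
  4. It picks the maximum readable offset according to the isolation level;
  5. If startOffset equals that maximum offset, it returns an empty result;
  6. If startOffset is greater than that maximum offset, it also returns an empty message set, since nothing can be read;
  7. It iterates over the log segments until something is read or the end of the log is reached:
    1. first it determines the maximum readable position in the segment;
    2. then it reads from the segment using the offset information (this segment-level read method was covered in the previous article);
    3. if nothing is found, it moves on to the next segment in the map;
  8. If nothing is found in any segment, probably because all messages above the requested offset have been deleted, it returns an empty result.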
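
Steps 3 to 6 can be sketched as a pure function over the key offsets. All names here (ReadBoundsSketch, Isolation, Outcome, Offsets, classify) are hypothetical; the missing-segment check and the segment iteration are left out.

```scala
object ReadBoundsSketch {
  sealed trait Isolation
  case object LogEnd extends Isolation        // upper bound: LEO
  case object HighWatermark extends Isolation // upper bound: high watermark
  case object TxnCommitted extends Isolation  // upper bound: last stable offset

  sealed trait Outcome
  case object OutOfRange extends Outcome // the read would be rejected with an error
  case object Empty extends Outcome      // valid request, but nothing readable yet
  case object Readable extends Outcome   // the segments would be scanned for data

  final case class Offsets(logStartOffset: Long,
                           lastStableOffset: Long,
                           highWatermark: Long,
                           logEndOffset: Long)

  // Maximum readable offset for the given isolation level.
  def maxReadableOffset(o: Offsets, isolation: Isolation): Long = isolation match {
    case LogEnd        => o.logEndOffset
    case HighWatermark => o.highWatermark
    case TxnCommitted  => o.lastStableOffset
  }

  // Range validation first, then the isolation-level bound.
  def classify(startOffset: Long, o: Offsets, isolation: Isolation): Outcome =
    if (startOffset > o.logEndOffset || startOffset < o.logStartOffset) OutOfRange
    else if (startOffset >= maxReadableOffset(o, isolation)) Empty
    else Readable
}
```

The same startOffset can be readable under one isolation level and empty under another: an offset between the high watermark and the LEO is readable only with the LogEnd bound.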

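As the read operation above shows, segments is used to locate log segments. Next we focus on the delete operations.

Deleting logs

The entry point for deleting logs is deleteOldSegments: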

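  // If the delete cleanup policy is enabled (config.delete), delete log segments that have expired and segments beyond the configured retention size
  // Whether or not that policy is enabled, always delete log segments that lie before the log start offset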
  def deleteOldSegments(): Int = {
    if (config.delete) {
      deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
    } else {
      deleteLogStartOffsetBreachedSegments()
    }
  }

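deleteOldSegments checks whether the delete rule is enabled. If it is, it calls all three of the following methods; otherwise it calls only the last one:

deleteRetentionMsBreachedSegments deletes log segments whose largest timestamp is older than the configured retention time;

deleteRetentionSizeBreachedSegments deletes log segments while the total log size exceeds the configured retention size;

deleteLogStartOffsetBreachedSegments deletes log segments that lie entirely before logStartOffset, that is, segments whose next segment has a baseOffset less than or equal to logStartOffset;

Here is how these three methods are implemented: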

  private def deleteRetentionMsBreachedSegments(): Int = {
    if (config.retentionMs < 0) return 0
    val startMs = time.milliseconds
    // call deleteOldSegments with an anonymous function that checks whether the segment's largest timestamp is older than the retention time
    deleteOldSegments((segment, _) => startMs - segment.largestTimestamp > config.retentionMs,
      reason = s"retention time ${config.retentionMs}ms breach")
  }
  
  private def deleteRetentionSizeBreachedSegments(): Int = {
    if (config.retentionSize < 0 || size < config.retentionSize) return 0
    var diff = size - config.retentionSize
    // check whether the log exceeds the configured retention size
    // shouldDelete subtracts each segment's size from diff for as long as diff stays non-negative
    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = {
      if (diff - segment.size >= 0) {
        diff -= segment.size
        true
      } else {
        false
      }
    }

    deleteOldSegments(shouldDelete, reason = s"retention size in bytes ${config.retentionSize} breach")
  }
  
  private def deleteLogStartOffsetBreachedSegments(): Int = {
    // shouldDelete checks whether the next segment's baseOffset is <= logStartOffset, i.e. whether this segment lies entirely before logStartOffset
    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) =
      nextSegmentOpt.exists(_.baseOffset <= logStartOffset)

    deleteOldSegments(shouldDelete, reason = s"log start offset $logStartOffset breach")
  }

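This style of code is very flexible: each method supplies a different function, so the shared logic is reused, and in the end they all delete log segments by calling deleteOldSegments.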
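
The pattern of passing a predicate into one shared driver can be sketched as follows. RetentionPredicateSketch, Segment, deletable, timeBreached and sizeBreached are hypothetical names, and the driver here is reduced to taking segments from the oldest end while the predicate holds.

```scala
object RetentionPredicateSketch {
  final case class Segment(baseOffset: Long, size: Long, largestTimestamp: Long)

  // Shared driver: take segments from the oldest end for as long as the predicate agrees.
  def deletable(segments: Seq[Segment])(predicate: Segment => Boolean): Seq[Segment] =
    segments.toList.takeWhile(predicate)

  // Time-based rule: the newest record in the segment is older than the retention time.
  def timeBreached(segments: Seq[Segment], nowMs: Long, retentionMs: Long): Seq[Segment] =
    if (retentionMs < 0) Seq.empty
    else deletable(segments)(s => nowMs - s.largestTimestamp > retentionMs)

  // Size-based rule: delete whole segments while the excess over retentionSize allows it.
  def sizeBreached(segments: Seq[Segment], retentionSize: Long): Seq[Segment] = {
    val total = segments.map(_.size).sum
    if (retentionSize < 0 || total < retentionSize) Seq.empty
    else {
      var diff = total - retentionSize // bytes that may still be removed
      deletable(segments) { s =>
        if (diff - s.size >= 0) { diff -= s.size; true } else false
      }
    }
  }
}
```

Note that the size rule never deletes a segment if doing so would take the log below the retention size, so a log of three 100-byte segments with a 150-byte limit loses only its first segment.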

Now let's look at what deleteOldSegments does:

deleteOldSegments

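This deleteOldSegments overload takes different parameters from the entry-point method above: a predicate function that decides which log segments may be deleted, and a reason that describes why they are deleted.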

  private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = {
    // delete every log segment that matches the predicate
    lock synchronized {
      val deletable = deletableSegments(predicate)
      if (deletable.nonEmpty)
        info(s"Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(",")}] due to $reason")
      deleteSegments(deletable)
    }
  }

This method calls two main methods: deletableSegments, which collects the log segments that can be deleted, and deleteSegments, which deletes them.

deletableSegments

  private def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
    // if there are no log segments, return right away
    if (segments.isEmpty) {
      Seq.empty
    } else {
      val deletable = ArrayBuffer.empty[LogSegment]
      var segmentEntry = segments.firstEntry
      // otherwise start from the first log segment and walk forward
      while (segmentEntry != null) {
        val segment = segmentEntry.getValue
        // get the next log segment
        val nextSegmentEntry = segments.higherEntry(segmentEntry.getKey)
        val (nextSegment, upperBoundOffset, isLastSegmentAndEmpty) = if (nextSegmentEntry != null)
          (nextSegmentEntry.getValue, nextSegmentEntry.getValue.baseOffset, false)
        else
          (null, logEndOffset, segment.size == 0)
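        // if the high watermark has reached upperBoundOffset (the next segment's base offset, or the LEO for the last segment),
        // the segment matches the predicate, and it is not an empty last segment, add it to the deletable list and move on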
        if (highWatermark >= upperBoundOffset && predicate(segment, Option(nextSegment)) && !isLastSegmentAndEmpty) {
          deletable += segment
          segmentEntry = nextSegmentEntry
        } else {
          segmentEntry = null
        }
      }
      deletable
    }
  }

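The logic of this method is clear. It does the following:

  1. If the segment map is empty, it returns an empty collection right away;

  2. Otherwise it iterates from the first log segment in the map;

  3. It decides whether the current segment can be deleted:

    1. the high watermark must be at or beyond the segment's upper bound, which is the next segment's base offset, or the LEO for the last segment;
    2. the segment must satisfy the predicate function;
    3. the segment must not be an empty last segment;
  4. It adds every qualifying segment to deletable and returns the collection. Iteration stops at the first segment that fails the checks.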
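
The walk described in these steps can be sketched over an in-memory vector. DeletableSegmentsSketch and its Segment type are hypothetical; the segments are assumed to be sorted by base offset, and the high watermark and LEO are passed in as arguments.

```scala
object DeletableSegmentsSketch {
  final case class Segment(baseOffset: Long, size: Int)

  // Collect the deletable prefix of `segments` (assumed sorted by baseOffset).
  def deletableSegments(segments: Vector[Segment], highWatermark: Long, logEndOffset: Long)
                       (predicate: (Segment, Option[Segment]) => Boolean): Vector[Segment] = {
    val deletable = Vector.newBuilder[Segment]
    var i = 0
    var keepGoing = true
    while (keepGoing && i < segments.length) {
      val segment = segments(i)
      val next = if (i + 1 < segments.length) Some(segments(i + 1)) else None
      // Upper bound of this segment: the next base offset, or the LEO for the last one.
      val upperBoundOffset = next.map(_.baseOffset).getOrElse(logEndOffset)
      val isLastSegmentAndEmpty = next.isEmpty && segment.size == 0
      if (highWatermark >= upperBoundOffset && predicate(segment, next) && !isLastSegmentAndEmpty) {
        deletable += segment
        i += 1
      } else {
        keepGoing = false // stop at the first segment that fails any check
      }
    }
    deletable.result()
  }
}
```

The high watermark check means that a segment holding messages that are not yet below the high watermark is never selected, whatever the predicate says.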

Next, deleteSegments is called:

  private def deleteSegments(deletable: Iterable[LogSegment]): Int = {
    maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
      val numToDelete = deletable.size
      if (numToDelete > 0) {
        // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
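        // at least one log segment must always exist, so if every segment is about to be deleted,
        // call roll() first to create a brand-new segment and close the one currently being written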
        if (segments.size == numToDelete)
          roll()
        lock synchronized {
          // make sure the Log object has not been closed
          checkIfMemoryMappedBufferClosed()
          // remove the segments for lookups
          // remove the given log segments and delete their underlying files
          removeAndDeleteSegments(deletable, asyncDelete = true)
          // try to advance the Log Start Offset
          maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset)
        }
      }
      numToDelete
    }
  }

Writing to the log

There are two main methods for writing to the log:

appendAsLeader

  def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, isFromClient: Boolean = true,
                     interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
    append(records, isFromClient, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch)
  }

appendAsFollower

  def appendAsFollower(records: MemoryRecords): LogAppendInfo = {
    append(records, isFromClient = false, interBrokerProtocolVersion = ApiVersion.latestVersion, assignOffsets = false, leaderEpoch = -1)
  }

appendAsLeader is used to write to the Leader replica, and appendAsFollower is used when a Follower replica synchronizes. Both call the append method underneath.

append

  private def append(records: MemoryRecords, isFromClient: Boolean, interBrokerProtocolVersion: ApiVersion, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
    maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
      // Step 1: analyze and validate the message set to be written and return the validation result
      val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)

      // return if we have no valid messages or if this is a duplicate of the last appended entry
      // if there is nothing to write, return right away
      if (appendInfo.shallowCount == 0)
        return appendInfo

      // trim any invalid bytes or partial messages before appending it to the on-disk log
      // Step 2: normalize the message set, i.e. drop invalid bytes or partial messages
      var validRecords = trimInvalidBytes(records, appendInfo)

      // they are valid, insert them in the log
      lock synchronized {
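        // make sure the Log object has not been closed
        checkIfMemoryMappedBufferClosed()
        // offsets need to be assigned
        if (assignOffsets) {
          // assign offsets to the message set
          // Step 3: use the current LEO as the offset of the first message in the set; nextOffsetMetadata holds the LEO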
          val offset = new LongRef(nextOffsetMetadata.messageOffset)
          appendInfo.firstOffset = Some(offset.value)
          val now = time.milliseconds
          val validateAndOffsetAssignResult = try {
            LogValidator.validateMessagesAndAssignOffsets(validRecords,
              topicPartition,
              offset,
              time,
              now,
              appendInfo.sourceCodec,
              appendInfo.targetCodec,
              config.compact,
              config.messageFormatVersion.recordVersion.value,
              config.messageTimestampType,
              config.messageTimestampDifferenceMaxMs,
              leaderEpoch,
              isFromClient,
              interBrokerProtocolVersion,
              brokerTopicStats)
          } catch {
            case e: IOException =>
              throw new KafkaException(s"Error validating messages while appending to log $name", e)
          }
          // update the LogAppendInfo validation result
          validRecords = validateAndOffsetAssignResult.validatedRecords
          appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
          appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
          appendInfo.lastOffset = offset.value - 1
          appendInfo.recordConversionStats = validateAndOffsetAssignResult.recordConversionStats
          if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
            appendInfo.logAppendTime = now

          // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
          // format conversion)
          // Step 4: validate the messages to make sure no batch exceeds the size limit
          if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
            for (batch <- validRecords.batches.asScala) {
              if (batch.sizeInBytes > config.maxMessageSize) {
                // we record the original message set size instead of the trimmed size
                // to be consistent with pre-compression bytesRejectedRate recording
                brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
                brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
                throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to " +
                  s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")
              }
            }
          }
          // otherwise use the given offsets as they are; nothing to assign
        } else {
          // we are taking the offsets we are given
          if (!appendInfo.offsetsMonotonic) // make sure message offsets increase monotonically
            throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " +
                                                 records.records.asScala.map(_.offset))

          if (appendInfo.firstOrLastOffsetOfFirstBatch < nextOffsetMetadata.messageOffset) {
            // we may still be able to recover if the log is empty
            // one example: fetching from log start offset on the leader which is not batch aligned,
            // which may happen as a result of AdminClient#deleteRecords()
            val firstOffset = appendInfo.firstOffset match {
              case Some(offset) => offset
              case None => records.batches.asScala.head.baseOffset()
            }

            val firstOrLast = if (appendInfo.firstOffset.isDefined) "First offset" else "Last offset of the first batch"
            throw new UnexpectedAppendOffsetException(
              s"Unexpected offset in append to $topicPartition. $firstOrLast " +
              s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${nextOffsetMetadata.messageOffset}. " +
              s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" +
              s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset",
              firstOffset, appendInfo.lastOffset)
          }
        }

        // update the epoch cache with the epoch stamped onto the message by the leader
        // Step 5: update the Leader Epoch cache
        validRecords.batches.asScala.foreach { batch =>
          if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
            maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)
          } else {
            // In partial upgrade scenarios, we may get a temporary regression to the message format. In
            // order to ensure the safety of leader election, we clear the epoch cache so that we revert
            // to truncation by high watermark after the next leader election.
            leaderEpochCache.filter(_.nonEmpty).foreach { cache =>
              warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}")
              cache.clearAndFlush()
            }
          }
        }

        // check messages set size may be exceed config.segmentSize
        // Step 6: make sure the message set does not exceed the segment size
        if (validRecords.sizeInBytes > config.segmentSize) {
          throw new RecordBatchTooLargeException(s"Message batch size is ${validRecords.sizeInBytes} bytes in append " +
            s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")
        }

        // maybe roll the log if this segment is full
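        // Step 7: roll the log if needed. The active segment may not have room for the new message set, so a new segment is created to hold all of it
        // The log is rolled in the following cases:
        // the active log segment is full
        // the time elapsed since the segment's first message exceeds the maximum segment age
        // the index is full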
        val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)

        val logOffsetMetadata = LogOffsetMetadata(
          messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
          segmentBaseOffset = segment.baseOffset,
          relativePositionInSegment = segment.size)

        // now that we have valid records, offsets assigned, and timestamps updated, we need to
        // validate the idempotent/transactional state of the producers and collect some metadata
        // Step 8: validate the producer (idempotence/transaction) state
        val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(
          logOffsetMetadata, validRecords, isFromClient)

        maybeDuplicate.foreach { duplicate =>
          appendInfo.firstOffset = Some(duplicate.firstOffset)
          appendInfo.lastOffset = duplicate.lastOffset
          appendInfo.logAppendTime = duplicate.timestamp
          appendInfo.logStartOffset = logStartOffset
          return appendInfo
        }
        // Step 9: perform the actual write, mainly by calling the log segment's append method
        segment.append(largestOffset = appendInfo.lastOffset,
          largestTimestamp = appendInfo.maxTimestamp,
          shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
          records = validRecords)

        // Increment the log end offset. We do this immediately after the append because a
        // write to the transaction index below may fail and we want to ensure that the offsets
        // of future appends still grow monotonically. The resulting transaction index inconsistency
        // will be cleaned up after the log directory is recovered. Note that the end offset of the
        // ProducerStateManager will not be updated and the last stable offset will not advance
        // if the append to the transaction index fails.
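        // Step 10: update the LEO, which is the offset of the last message in the set plus 1
        // as noted before, the LEO always points at the next message, which does not exist yet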
        updateLogEndOffset(appendInfo.lastOffset + 1)

        // update the producer state
        // Step 11: update the producer (transaction) state
        for (producerAppendInfo <- updatedProducers.values) {
          producerStateManager.update(producerAppendInfo)
        }

        // update the transaction index with the true last stable offset. The last offset visible
        // to consumers using READ_COMMITTED will be limited by this value and the high watermark.
        for (completedTxn <- completedTxns) {
          val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
          segment.updateTxnIndex(completedTxn, lastStableOffset)
          producerStateManager.completeTxn(completedTxn)
        }

        // always update the last producer id map offset so that the snapshot reflects the current offset
        // even if there isn't any idempotent data being written
        producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)

        // update the first unstable offset (which is used to compute LSO)
        maybeIncrementFirstUnstableOffset()

        trace(s"Appended message set with last offset: ${appendInfo.lastOffset}, " +
          s"first offset: ${appendInfo.firstOffset}, " +
          s"next offset: ${nextOffsetMetadata.messageOffset}, " +
          s"and messages: $validRecords")
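        // decide whether to flush to disk explicitly. Normally the broker parameter log.flush.interval.messages does not need to be set
        // and flushing is left to the operating system, but in some cases it can be set to ensure high reliability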
        if (unflushedMessages >= config.flushInterval)
          flush()
        // Step 12: return the append result
        appendInfo
      }
    }
  }

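The main steps of the code above, following its numbered comments, are:

  1. Analyze and validate the message set to be written and return the validation result;
  2. Trim invalid bytes or partial messages;
  3. If offsets need to be assigned, use the current LEO as the offset of the first message;
  4. Re-validate message sizes if they may have changed;
  5. Update the Leader Epoch cache;
  6. Make sure the message set does not exceed the segment size;
  7. Roll the log to a new segment if needed;
  8. Validate the producer (idempotence/transaction) state;
  9. Write the messages by calling the log segment's append method;
  10. Update the LEO to the last offset plus 1;
  11. Update the producer (transaction) state;
  12. Return the append result.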
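
The offset arithmetic used when the leader assigns offsets (first offset = current LEO, new LEO = last offset + 1) can be sketched as follows. OffsetAssignmentSketch, AppendResult and assignOffsets are hypothetical names, and the message set is reduced to a record count.

```scala
object OffsetAssignmentSketch {
  final case class AppendResult(firstOffset: Long, lastOffset: Long, nextLogEndOffset: Long)

  // Assign consecutive offsets to `recordCount` records, starting at the current LEO.
  def assignOffsets(logEndOffset: Long, recordCount: Int): AppendResult = {
    require(recordCount > 0, "nothing to append")
    val first = logEndOffset
    val last = first + recordCount - 1
    AppendResult(first, last, last + 1) // the new LEO points at the next, not yet written, offset
  }
}
```

Appending three records to a log whose LEO is 42 gives offsets 42 to 44 and a new LEO of 45, so consecutive appends never overlap.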

Next, let's see how analyzeAndValidateRecords validates the messages:

analyzeAndValidateRecords

  private def analyzeAndValidateRecords(records: MemoryRecords, isFromClient: Boolean): LogAppendInfo = {
    var shallowMessageCount = 0
    var validBytesCount = 0
    var firstOffset: Option[Long] = None
    var lastOffset = -1L
    var sourceCodec: CompressionCodec = NoCompressionCodec
    var monotonic = true
    var maxTimestamp = RecordBatch.NO_TIMESTAMP
    var offsetOfMaxTimestamp = -1L
    var readFirstMessage = false
    var lastOffsetOfFirstBatch = -1L

    for (batch <- records.batches.asScala) {
      // we only validate V2 and higher to avoid potential compatibility issues with older clients
      // for batches in message format V2 or later that come from a client, the base offset must be 0
      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && isFromClient && batch.baseOffset != 0)
        throw new InvalidRecordException(s"The baseOffset of the record batch in the append to $topicPartition should " +
          s"be 0, but it is ${batch.baseOffset}")

      // update the first offset if on the first message. For magic versions older than 2, we use the last offset
      // to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message).
      // For magic version 2, we can get the first offset directly from the batch header.
      // When appending to the leader, we will update LogAppendInfo.baseOffset with the correct value. In the follower
      // case, validation will be more lenient.
      // Also indicate whether we have the accurate first offset or not
      if (!readFirstMessage) {
        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
          firstOffset = Some(batch.baseOffset) // update firstOffset
        lastOffsetOfFirstBatch = batch.lastOffset // update lastOffsetOfFirstBatch
        readFirstMessage = true
      }

      // check that offsets are monotonically increasing
      // if the lastOffset seen so far is not smaller than this batch's lastOffset, an earlier batch holds an offset at or beyond this one,
      // which breaks the monotonic increase of offsets
      if (lastOffset >= batch.lastOffset)
        monotonic = false

      // update the last offset seen
      // update lastOffset with the offset of the current batch's last message
      lastOffset = batch.lastOffset

      // Check if the message sizes are valid.
      val batchSize = batch.sizeInBytes
      // check whether the batch's total size exceeds the limit, i.e. the topic-level max.message.bytes setting
      if (batchSize > config.maxMessageSize) {
        brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
        brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
        throw new RecordTooLargeException(s"The record batch size in the append to $topicPartition is $batchSize bytes " +
          s"which exceeds the maximum configured value of ${config.maxMessageSize}.")
      }

      // check the validity of the message by checking CRC
      // validate the batch, including its format and CRC
      if (!batch.isValid) {
        brokerTopicStats.allTopicsStats.invalidMessageCrcRecordsPerSec.mark()
        throw new CorruptRecordException(s"Record is corrupt (stored crc = ${batch.checksum()}) in topic partition $topicPartition.")
      }
      // update maxTimestamp and offsetOfMaxTimestamp
      if (batch.maxTimestamp > maxTimestamp) {
        maxTimestamp = batch.maxTimestamp
        offsetOfMaxTimestamp = lastOffset
      }
      // increment the batch counter (shallowMessageCount) and accumulate the valid byte count
      shallowMessageCount += 1
      validBytesCount += batchSize
      // get the compression codec from the batch
      val messageCodec = CompressionCodec.getCompressionCodec(batch.compressionType.id)
      if (messageCodec != NoCompressionCodec)
        sourceCodec = messageCodec
    }

    // Apply broker-side compression if any
    // get the compression codec configured on the broker side, i.e. the compression.type setting;
    // its default value is producer, meaning targetCodec uses whatever codec sourceCodec uses
    val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
    // finally build the LogAppendInfo object and return it
    LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset,
      RecordConversionStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic, lastOffsetOfFirstBatch)
  }
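
The bookkeeping in the loop above (first offset, last offset, monotonic check, max timestamp, counters) can be sketched without any Kafka types. BatchAnalysisSketch, Batch, Summary and analyze are hypothetical names; the CRC check and codec handling are left out, and an oversized batch is reported with a plain IllegalArgumentException.

```scala
object BatchAnalysisSketch {
  final case class Batch(baseOffset: Long, lastOffset: Long, maxTimestamp: Long, sizeInBytes: Int)

  final case class Summary(firstOffset: Option[Long],
                           lastOffset: Long,
                           maxTimestamp: Long,
                           offsetOfMaxTimestamp: Long,
                           batchCount: Int,
                           validBytes: Int,
                           offsetsMonotonic: Boolean)

  def analyze(batches: Seq[Batch], maxBatchSize: Int): Summary = {
    var firstOffset: Option[Long] = None
    var lastOffset = -1L
    var maxTimestamp = -1L
    var offsetOfMaxTimestamp = -1L
    var batchCount = 0
    var validBytes = 0
    var monotonic = true

    for (batch <- batches) {
      if (batch.sizeInBytes > maxBatchSize)
        throw new IllegalArgumentException(
          s"Batch of ${batch.sizeInBytes} bytes exceeds the limit of $maxBatchSize")
      // Remember the base offset of the first batch only.
      if (firstOffset.isEmpty) firstOffset = Some(batch.baseOffset)
      // Offsets must strictly increase from one batch to the next.
      if (lastOffset >= batch.lastOffset) monotonic = false
      lastOffset = batch.lastOffset
      if (batch.maxTimestamp > maxTimestamp) {
        maxTimestamp = batch.maxTimestamp
        offsetOfMaxTimestamp = lastOffset
      }
      batchCount += 1
      validBytes += batch.sizeInBytes
    }
    Summary(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, batchCount, validBytes, monotonic)
  }
}
```

Feeding the batches in the wrong order does not fail here; it only sets offsetsMonotonic to false, which mirrors how the check is deferred to the follower branch of append.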
