精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

圖解 Kafka 源碼之服務端啟動流程

云計算 Kafka
從今天開始,我們來深度剖析 Kafka「Controller」的底層源碼實現,這是 Controller 系列第一篇,我們先回過頭來繼續來深度聊聊「Kafka 服務端啟動的流程」,看看 Kafka 服務端是如何啟動的。

前面「八篇」文章通過「場景驅動方式」帶你深度剖析了 Kafka「日志系統」源碼架構設計的方方面面,從今天開始,我們來深度剖析 Kafka「Controller」的底層源碼實現,這是 Controller 系列第一篇,我們先回過頭來繼續來深度聊聊「Kafka  服務端啟動的流程」,看看 Kafka 服務端是如何啟動的。

一、總體概述

在深入剖析Kafka「Controller」之前,我想你可能或多或少會有這樣的疑問:

Kafka  服務端都有哪些組件,這些組件又是通過哪個類來啟動的呢?

這里我們通過啟動 Kafka 來了解,大家都知道,啟動 Kafka 可以執行以下命令來啟動

# 1、啟動 kafka 服務命令:
bin/kafka-server-start.sh config/server.properties &

那么今天就來看看通過這個腳本 KafkaServer 初始化了哪些組件。

二、kafka-server-start.sh

我們來看下里面的 shell 內容,如下:

#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 1、注釋說明該腳本的版權信息和使用許可。
if [ $# -lt 1 ];
then
        echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
        exit 1
fi
# 2、檢查命令行參數的個數,若小于 1 則輸出腳本的使用方法并退出。
base_dir=$(dirname $0)
# 3、獲取當前腳本所在目錄的路徑,并將其賦值給 base_dir 變量。
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
    export KAFKA_LOG4J_OPTS="-Dlog4j.cnotallow=file:$base_dir/../config/log4j.properties"
fi
# 4、檢查 KAFKA_LOG4J_OPTS 環境變量是否設置,若未設置則設置該變量的值。
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    export JMX_PORT="9999"
    export JMX_RMI_PORT="10000"
fi
# 5、檢查 KAFKA_HEAP_OPTS 環境變量是否設置,若未設置則設置該變量的值,并設置 JMX_PORT 和 JMX_RMI_PORT 環境變量的值,將 EXTRA_ARGS 變量的值設置為字符串 -name kafkaServer -loggc。
EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}
# 6、檢查命令行參數中 COMMAND 變量的值是否為 -daemon,若是則將 EXTRA_ARGS 變量的值添加 -daemon 選項。同時將命令行參數向左移一位,即從 $2 開始計算參數。
COMMAND=$1
case $COMMAND in
  -daemon)
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac
# 7、調用 $base_dir/kafka-run-class.sh 腳本并傳遞相應的參數。其中 "@ 代表傳遞的為命令行參數。具體執行的封裝在 Kafka 客戶端庫中的 kafka.Kafka 類。整個腳本的作用是啟動 Kafka 服務。
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
esac
# 7、調用 $base_dir/kafka-run-class.sh 腳本并傳遞相應的參數。其中 "@ 代表傳遞的為命令行參數。具體執行的封裝在 Kafka 客戶端庫中的 kafka.Kafka 類。整個腳本的作用是啟動 Kafka 服務。
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

這里我們重點來看 「第 7 步」,它底層執行的是封裝在 Kafka 客戶端庫中的 kafka.Kafka 類。接下來我們來看下該類都做了什么。

三、kafka.Kafka 類

「Kafka.scala」類源碼在 Kafka 源碼包的 core 包下,具體的 github 源碼位置如下:

https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/Kafka.scala。

從整體上來看,該類就 3 個方法,相對比較簡單,我能來看下里面的重點。

這里我們通過「2.8.x」版本來講解,「2.7.x」還未增加 KafkaRaftServer 類。

1、getPropsFromArgs

def getPropsFromArgs(args: Array[String]): Properties = {
  // 創建一個命令行參數解析器
  val optionParser = new OptionParser(false)
  // 定義 --override 選項,用于覆蓋 server.properties 文件中的屬性
  val overrideOpt = optionParser.accepts("override", "Optional property that should override values set in server.properties file")
    .withRequiredArg()
    .ofType(classOf[String])
   
  // 定義 --version 選項,用于打印版本信息并退出
  optionParser.accepts("version", "Print version information and exit.")
  // 若沒有提供參數或者參數包含 --help 選項,則打印用法并退出
  if (args.length == 0 || args.contains("--help")) {
    CommandLineUtils.printUsageAndDie(optionParser, "USAGE: java [options] %s server.properties [--override property=value]*".format(classOf[KafkaServer].getSimpleName()))
  }
  // 若參數中包含 --version 選項,則打印版本信息并退出
  if (args.contains("--version")) {
    CommandLineUtils.printVersionAndDie()
  }
  // 加載 server.properties 文件中的屬性到 Properties 對象中
  val props = Utils.loadProps(args(0))
  // 若提供了其他參數,則解析這些參數
  if (args.length > 1) {
    // 解析參數中的選項和參數值
    val options = optionParser.parse(args.slice(1, args.length): _*)
    // 檢查是否有非選項參數
    if (options.nonOptionArguments().size() > 0) {
      CommandLineUtils.printUsageAndDie(optionParser, "Found non argument parameters: " + options.nonOptionArguments().toArray.mkString(","))
    }
    // 將解析得到的選項和參數值添加到 props 對象中
    props ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala)
  }
  // 返回解析得到的屬性集合
  props
}

該函數的作用是從命令行參數中解析出屬性集合。它內部使用了 OptionParser 類庫來解析命令行選項,并從 server.properties 文件中加載屬性。

如果提供了 override 選項,則它將覆蓋 server.properties 文件中的相應屬性。函數返回一個 Properties 對象,其中包含了解析得到的屬性。

如果沒有提供正確的命令行參數或者提供了 --help 或 --version 選項,函數會打印幫助信息或版本信息并退出。

2、buildServer

private def buildServer(props: Properties): Server = {
    val config = KafkaConfig.fromProps(props, false)
    // 直接啟動定時任務、網絡層、請求處理層
    if (config.requiresZookeeper) {
      new KafkaServer(
        config,
        Time.SYSTEM,
        threadNamePrefix = None,
        enableForwarding = false
      )
    } else {
      // 調用 BrokerServer 等來啟動網絡層和請求處理層
      new KafkaRaftServer(
        config,
        Time.SYSTEM,
        threadNamePrefix = None
      )
    }
}

在 kafka 2.8.x 版本中 新增了 raft 協議之后將 BrokerServer、ControllServer 使用了單獨的文件來啟動最終調用網絡層和請求處理層,如果還是使用 zk 的方式啟動則是 KafkaServer 啟動網絡層和請求處理層。

3、main

# 2.7.x 版本源碼
def main(args: Array[String]): Unit = {
  try {
    // 1、解析命令行參數,獲得屬性集合
    val serverProps = getPropsFromArgs(args)
    // 2、從屬性集合創建 KafkaServerStartable 對象
    val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
    try {
      // 如果不是 Windows 操作系統,并且不是 IBM JDK,則注冊 LoggingSignalHandler
      if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
        new LoggingSignalHandler().register()
    } catch {
      // 如果注冊 LoggingSignalHandler 失敗,則在日志中打印警告信息
      case e: ReflectiveOperationException =>
        warn("Failed to register optional signal handler that logs a message when the process is terminated " +
          s"by a signal. Reason for registration failure is: $e", e)
    }
    // 3、添加 shutdown hook,用于在程序結束時執行 KafkaServerStartable 的 shutdown 方法
    Exit.addShutdownHook("kafka-shutdown-hook", kafkaServerStartable.shutdown())
    // 4、啟動 KafkaServerStartable 實例
    kafkaServerStartable.startup()
    // 5、等待 KafkaServerStartable 實例終止
    kafkaServerStartable.awaitShutdown()
  }
  catch {
    // 如果有異常發生,則記錄日志并退出程序
    case e: Throwable =>
      fatal("Exiting Kafka due to fatal exception", e)
      Exit.exit(1)
  }
  // 6、正常終止程序
  Exit.exit(0)
}

該函數是 Kafka 服務進程的入口,它是整個 Kafka 運行過程的驅動程序。該函數首先通過調用 getPropsFromArgs 函數解析命令行參數并獲得屬性集合,然后使用這些屬性創建 KafkaServerStartable 實例。接著,它注冊一個 shutdown hook,用于在程序終止時執行 KafkaServerStartable 的 shutdown 方法。然后它啟動 KafkaServerStartable 實例,并等待該實例終止。如果發生異常,則記錄日志并退出程序。函數最后調用 Exit.exit 方法退出程序,返回 0 表示正常終止。

# 2.8.x 版本
def main(args: Array[String]): Unit = {
    // 獲取Kafka服務的配置信息
    val serverProps = getPropsFromArgs(args)
    // 根據配置信息構建Kafka服務
    val server = buildServer(serverProps)
    try {
      // 注冊用于記錄日志的信號處理器(若實現失敗則退出)
      if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
        new LoggingSignalHandler().register()
    } catch {
      case e: ReflectiveOperationException =>
        warn("Failed to register optional signal handler that logs a message when the process is terminated " +
          s"by a signal. Reason for registration failure is: $e", e)
    }
    // 掛載關閉處理器,用于捕獲終止信號和常規終止請求
    Exit.addShutdownHook("kafka-shutdown-hook", {
      try server.shutdown() // 關閉Kafka服務
      catch {
        case _: Throwable =>
          fatal("Halting Kafka.") // 日志記錄致命錯誤信息
          // 調用Exit.halt()強制退出,避免重復調用Exit.exit()引發死鎖
          Exit.halt(1)
      }
    })
    try server.startup() // 啟動Kafka服務
    catch {
      case _: Throwable =>
        // 調用Exit.exit()設置退出狀態碼,KafkaServer.startup()會在拋出異常時調用shutdown()
        fatal("Exiting Kafka.")
        Exit.exit(1)
    }
    server.awaitShutdown() // 等待Kafka服務關閉
    Exit.exit(0) // 調用Exit.exit()設置退出狀態碼
}

這里最重要的是 「第 4 步」,調用 kafkaServerStartable.startup() 或者 server.startup() 來啟動 kafka。

這里我們還是以「ZK 模式」的方式來啟動,后面抽空再進行對 「Raft 模式」啟動進行補充。

四、KafkaServerStartable

「KafkaServerStartable.scala」類源碼在 Kafka 源碼包的 core 包下,具體的 github 源碼位置如下:

https://github.com/apache/kafka/blob/2.7.0/core/src/main/scala/kafka/server/KafkaServerStartable.scala。

在 Scala 語言里,在一個源代碼文件中同時定義相同名字的 class 和 object 的用法被稱為伴生(Companion)。Class 對象被稱為伴生類,它和 Java 中的類是一樣的;而 Object 對象是一個單例對象,用于保存一些靜態變量或靜態方法。

這里我們主要來看下 Class 類代碼。

class KafkaServerStartable(val staticServerConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter], threadNamePrefix: Option[String] = None) extends Logging {
  // 創建 KafkaServer 實例
  // 構造函數有兩個參數 —— staticServerConfig 表示靜態服務器配置,reporters 表示 Kafka 指標報告器。如果 threadNamePrefix 參數未用于構造函數,則默認值為 None。threadNamePrefix 參數表示線程名稱前綴,用于調試和維護目的。
  private val server = new KafkaServer(staticServerConfig, kafkaMetricsReporters = reporters, threadNamePrefix = threadNamePrefix)

  def this(serverConfig: KafkaConfig) = this(serverConfig, Seq.empty)
  // 啟動 KafkaServer
  // startup 方法嘗試啟動 Kafka 服務器。如果啟動 Kafka 服務器時發生異常,則記錄一條 fatal 錯誤日志并退出程序。對于成功啟動的 Kafka 服務器,它將開始監聽客戶端連接,并在收到消息時執行所需的操作。
  def startup(): Unit = {
    try server.startup()
    catch {
      // 如果出現異常,則記錄日志并退出程序
      case _: Throwable =>
        // KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
        fatal("Exiting Kafka.")
        Exit.exit(1)
    }
  }
  // 關閉 KafkaServer
  // shutdown 方法嘗試停止 Kafka 服務器。如果在停止服務器時出現異常,則記錄一條 fatal 錯誤日志并強制退出程序。調用 shutdown 方法后,服務器將不再接受新的請求,并開始等待當前進行中的請求完成。當所有處理中的請求都完成后,服務器將徹底停止。
  def shutdown(): Unit = {
    try server.shutdown()
    catch {
      // 如果出現異常,則記錄日志并強制退出程序
      case _: Throwable =>
        fatal("Halting Kafka.")
        // Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
        Exit.halt(1)
    }
  }
  // setServerState 方法允許從 KafkaServerStartable 對象中設置 broker 狀態。如果自定義 KafkaServerStartable 對象想要引入新的狀態,則此方法很有用。
  def setServerState(newState: Byte): Unit = {
    server.brokerState.newState(newState)
  }
  // 等待 KafkaServer 退出
  // awaitShutdown 方法等待 Kafka 服務器完全退出。在 Kafka 服務器執行 shutdown 方法后,它將不再接受新的請求。但是,服務器可能仍在處理一些已經接收的請求。awaitShutdown 方法將阻塞當前線程,直到服務器徹底停止。
  def awaitShutdown(): Unit = server.awaitShutdown()
}

KafkaServerStartable 類是一個可啟動和停止的 Kafka 服務器。類中的 server 成員變量是 KafkaServer 類的實例,它將在 KafkaServerStartable 類對象啟動時創建。該類提供了啟動和停止 Kafka 服務器的方法,以及設置 broker 狀態和等待 Kafka 服務器退出的方法。

跟本文有關系的是 「啟動」方法,它調用了 KafkaServer#startup 方法進行啟動。

五、KafkaServer 類

Kafka 集群由多個 Broker 節點構成,每個節點上都運行著一個 Kafka 實例,這些實例之間基于 ZK 來發現彼此,并由集群控制器 KafkaController 統籌協調運行,彼此之間基于 socket 連接進行通信。

「KafkaServer.scala」類源碼在 Kafka 源碼包的 core 包下,具體的 github 源碼位置如下:

https://github.com/apache/kafka/blob/2.7.0/core/src/main/scala/kafka/server/KafkaServer.scala。

KafkaServer 為 Kafka 的啟動類,其中包含了 Kafka 的所有組件,如 KafkaController、groupCoordinator、replicaManager 等。

class KafkaServer(val config: KafkaConfig, //配置信息
time: Time = Time.SYSTEM, threadNamePrefix: Option[String] = None,
                  kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List() //監控上報
                  ) extends Logging with KafkaMetricsGroup {
  //標識節點已經啟動完成
  private val startupComplete = new AtomicBoolean(false)
  //標識節點正在執行關閉操作
  private val isShuttingDown = new AtomicBoolean(false)
  //標識節點正在執行啟動操作
  private val isStartingUp = new AtomicBoolean(false)
  //阻塞主線程等待 KafkaServer 的關閉
  private var shutdownLatch = new CountDownLatch(1)
  //日志上下文
  private var logContext: LogContext = null
  var metrics: Metrics = null
  //記錄節點的當前狀態
  val brokerState: BrokerState = new BrokerState
  //API接口類,用于處理數據類請求
  var dataPlaneRequestProcessor: KafkaApis = null
  //API接口,用于處理控制類請求
  var controlPlaneRequestProcessor: KafkaApis = null
  //權限管理
  var authorizer: Option[Authorizer] = None
  //啟動socket,監聽9092端口,等待接收客戶端請求 
  var socketServer: SocketServer = null
  //數據類請求處理線程池
  var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
  //命令類處理線程池
  var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
  //日志管理器    
  var logDirFailureChannel: LogDirFailureChannel = null
  var logManager: LogManager = null
  //副本管理器
  var replicaManager: ReplicaManager = null
  //topic增刪管理器
  var adminManager: AdminManager = null
  //token管理器
  var tokenManager: DelegationTokenManager = null
  //動態配置管理器
  var dynamicConfigHandlers: Map[String, ConfigHandler] = null
  var dynamicConfigManager: DynamicConfigManager = null
  var credentialProvider: CredentialProvider = null
  var tokenCache: DelegationTokenCache = null
  //分組協調器
  var groupCoordinator: GroupCoordinator = null
  //事務協調器
  var transactionCoordinator: TransactionCoordinator = null
  //集群控制器
  var kafkaController: KafkaController = null
  //定時任務調度器
  var kafkaScheduler: KafkaScheduler = null
  //集群分區狀態信息緩存
  var metadataCache: MetadataCache = null
  //配額管理器
  var quotaManagers: QuotaFactory.QuotaManagers = null
  //zk客戶端配置
  val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config).getOrElse(new ZKClientConfig())
  private var _zkClient: KafkaZkClient = null
  val correlationId: AtomicInteger = new AtomicInteger(0)
  val brokerMetaPropsFile = "meta.properties"
  val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator + brokerMetaPropsFile)))).toMap
  private var _clusterId: String = null
  private var _brokerTopicStats: BrokerTopicStats = null
  def clusterId: String = _clusterId
  // Visible for testing
  private[kafka] def zkClient = _zkClient
  private[kafka] def brokerTopicStats = _brokerTopicStats
  ....
}

1、startup

該類方法很多,我們這里只看 startup 啟動方法,來看看其內部都啟動了哪些組件,來解決本文開頭提出的問題。

/**
   * Start up API for bringing up a single instance of the Kafka server.
   * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
   */
  def startup(): Unit = {
    try {
      info("starting")
      // 是否已關閉
      if (isShuttingDown.get)
        throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
      // 是否已啟動
      if (startupComplete.get)
        return
      // 是否可以啟動
      val canStartup = isStartingUp.compareAndSet(false, true)
      if (canStartup) { // 設置broker狀態為Starting
        brokerState.newState(Starting)
        /* setup zookeeper */
        // 連接ZK,并創建根節點
        initZkClient(time)
        /* initialize features */
        _featureChangeListener = new FinalizedFeatureChangeListener(featureCache, _zkClient)
        if (config.isFeatureVersioningSupported) {
          _featureChangeListener.initOrThrow(config.zkConnectionTimeoutMs)
        }
        /* Get or create cluster_id */
        // 從ZK獲取或創建集群id,規則:UUID的mostSigBits、leastSigBits組合轉base64
        _clusterId = getOrGenerateClusterId(zkClient)
        info(s"Cluster ID = $clusterId")
        /* load metadata */
        // 獲取brokerId及log存儲路徑,brokerId通過zk生成或者server.properties配置broker.id
        // 規則:/brokers/seqid的version值 + maxReservedBrokerId(默認1000),保證唯一性
        val (preloadedBrokerMetadataCheckpoint, initialOfflineDirs) = getBrokerMetadataAndOfflineDirs
        /* check cluster id */
        if (preloadedBrokerMetadataCheckpoint.clusterId.isDefined && preloadedBrokerMetadataCheckpoint.clusterId.get != clusterId)
          throw new InconsistentClusterIdException(
            s"The Cluster ID ${clusterId} doesn't match stored clusterId ${preloadedBrokerMetadataCheckpoint.clusterId} in meta.properties. " +
            s"The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.")
        /* generate brokerId */
        config.brokerId = getOrGenerateBrokerId(preloadedBrokerMetadataCheckpoint)
        logContext = new LogContext(s"[KafkaServer id=${config.brokerId}] ")
        // 配置logger
        this.logIdent = logContext.logPrefix
        // initialize dynamic broker configs from ZooKeeper. Any updates made after this will be
        // applied after DynamicConfigManager starts.
        // 初始化AdminZkClient,支持動態修改配置 
        config.dynamicConfig.initialize(zkClient)
        /* start scheduler */
        // 初始化定時任務調度器
        kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
        kafkaScheduler.startup()
        /* create and configure metrics */
        // 創建及配置監控,默認使用JMX及Yammer Metrics
        kafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
        kafkaYammerMetrics.configure(config.originals)
        val jmxReporter = new JmxReporter()
        jmxReporter.configure(config.originals)
        val reporters = new util.ArrayList[MetricsReporter]
        reporters.add(jmxReporter)
        val metricConfig = KafkaServer.metricConfig(config)
        val metricsContext = createKafkaMetricsContext()
        metrics = new Metrics(metricConfig, reporters, time, true, metricsContext)
        /* register broker metrics */
        _brokerTopicStats = new BrokerTopicStats
        // 初始化配額管理器
        quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
        notifyClusterListeners(kafkaMetricsReporters ++ metrics.reporters.asScala)
        // 用于保證kafka-log數據目錄的存在
        logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
        /* start log manager */
        // 啟動日志管理器,kafka的消息以日志形式存儲
        logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
        // 啟動日志清理、刷新、校驗、恢復等的定時線程
        logManager.startup()
        metadataCache = new MetadataCache(config.brokerId)
        // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
        // This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
        // SCRAM認證方式的token緩存
        tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
        credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
        // Create and start the socket server acceptor threads so that the bound port is known.
        // Delay starting processors until the end of the initialization sequence to ensure
        // that credentials have been loaded before processing authentications.
        // 啟動socket,監聽9092端口,等待接收客戶端請求 
        socketServer = new SocketServer(config, metrics, time, credentialProvider)
        socketServer.startup(startProcessingRequests = false)
        /* start replica manager */
        brokerToControllerChannelManager = new BrokerToControllerChannelManagerImpl(metadataCache, time, metrics, config, threadNamePrefix)
        // 啟動副本管理器,高可用相關
        replicaManager = createReplicaManager(isShuttingDown)
        replicaManager.startup()
        brokerToControllerChannelManager.start()
        // 將broker信息注冊到ZK上
        val brokerInfo = createBrokerInfo
        val brokerEpoch = zkClient.registerBroker(brokerInfo)
        // Now that the broker is successfully registered, checkpoint its metadata
        // 校驗 broker 信息
        checkpointBrokerMetadata(BrokerMetadata(config.brokerId, Some(clusterId)))
        /* start token manager */
        // 啟動 token 管理器
        tokenManager = new DelegationTokenManager(config, tokenCache, time , zkClient)
        tokenManager.startup()
        /* start kafka controller */
        // 啟動Kafka控制器,只有 Leader 會與ZK建連
        kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, featureCache, threadNamePrefix)
        kafkaController.startup()
        // admin管理器
        adminManager = new AdminManager(config, metrics, metadataCache, zkClient)
        /* start group coordinator */
        // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
        // 啟動集群群組協調器
        groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM, metrics)
        groupCoordinator.startup()
        /* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */
        // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
        // 啟動事務協調器
        transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), zkClient, metrics, metadataCache, Time.SYSTEM)
        transactionCoordinator.startup()
        /* Get the authorizer and initialize it if one is specified.*/
        // ACL
        authorizer = config.authorizer
        authorizer.foreach(_.configure(config.originals))
        val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
          case Some(authZ) =>
            authZ.start(brokerInfo.broker.toServerInfo(clusterId, config)).asScala.map { case (ep, cs) =>
              ep -> cs.toCompletableFuture
            }
          case None =>
            brokerInfo.broker.endPoints.map { ep =>
              ep.toJava -> CompletableFuture.completedFuture[Void](null)
            }.toMap
        }
        // 創建拉取管理器
        val fetchManager = new FetchManager(Time.SYSTEM,
          new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
            KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
        /* start processing requests */
        // 初始化數據類請求的KafkaApis,負責數據類請求邏輯處理
        dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
        // 初始化數據類請求處理的線程池  
        dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
          config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)
        socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
          // 初始化控制類請求的 KafkaApis
          controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
            kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
            fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
          // 初始化控制類請求的線程池
          controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
            1, s"${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix)
        }
        Mx4jLoader.maybeLoad()
        /* Add all reconfigurables for config change notification before starting config handlers */
        config.dynamicConfig.addReconfigurables(this)
        /* start dynamic config manager */
        dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers, kafkaController),
                                                           ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
                                                           ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
                                                           ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))

        // Create the config manager. start listening to notifications
        // 啟動動態配置處理器
        dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
        dynamicConfigManager.startup()
        // 啟動請求處理線程
        socketServer.startProcessingRequests(authorizerFutures)
        // 更新broker狀態
        brokerState.newState(RunningAsBroker)
        shutdownLatch = new CountDownLatch(1)
        startupComplete.set(true)
        isStartingUp.set(false)
        AppInfoParser.registerAppInfo(metricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
        info("started")
      }
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
        isStartingUp.set(false)
        shutdown()
        throw e
    }
 }

這里總結下該方法都啟動了哪些組件:

  • initZkClient(time) 初始化 Zk。
  • kafkaScheduler  定時器。
  • logManager 日志模塊。
  • MetadataCache  元數據緩存。
  • socketServer 網絡服務器。
  • replicaManager 副本模塊。
  • kafkaController 控制器。
  • groupCoordinator 協調器用于和ConsumerCoordinator 交互
  • transactionCoordinator 事務相關
  • fetchManager  副本拉取管理器。
  • dynamicConfigManager 動態配置管理器。

2、Broker 狀態

這個是在 2.7.x 版本之前的狀態,在 2.8.x 之后版本進行了重構。

sealed trait BrokerStates { def state: Byte }
case object NotRunning extends BrokerStates { val state: Byte = 0 }
case object Starting extends BrokerStates { val state: Byte = 1 }
case object RecoveringFromUncleanShutdown extends BrokerStates { val state: Byte = 2 }
case object RunningAsBroker extends BrokerStates { val state: Byte = 3 }
case object PendingControlledShutdown extends BrokerStates { val state: Byte = 6 }
case object BrokerShuttingDown extends BrokerStates { val state: Byte = 7 }
  • NotRunning:初始狀態,標識當前 broker 節點未運行。
  • Starting:標識當前 broker 節點正在啟動中。
  • RecoveringFromUncleanShutdown:標識當前 broker 節點正在從上次非正常關閉中恢復。
  • RuningAsBroker:標識當前 broker 節點啟動成功,可以對外提供服務。
  • PendingControlledShutdown:標識當前 broker 節點正在等待 controlled shutdown 操作完成。
  • BrokerShuttingDown:標識當前 broker 節點正在執行 shutdown 操作。

這些就是 KafkaServer 中主要模塊的入口,接下來的文章會通過這些入口一一進行分析。

六、總結

這里,我們一起來總結一下這篇文章的重點。

  • 文章開頭通過對「kafka-server-start.sh」內容進行剖析,引出了 「kafka.Kafka」類。
  • 在「kafka.Kafka」的 main 方法中調用了「KafkaServerStartable」嘗試啟動 Kafka 服務器。
  • 接著在 「KafkaServerStartable」的 startup 方法中調用了 「KafkaServer」的 startup 方法啟動服務器需要的各種組件類。

下篇我們來深度剖析「Broker 啟動集群如何感知」,大家期待,我們下期見。

責任編輯:姜華 來源: 華仔聊技術
相關推薦

2019-09-20 08:54:38

KafkaBroker消息

2022-09-23 08:02:42

Kafka消息緩存

2023-12-26 08:16:56

Kafka緩存架構客戶端

2023-02-22 08:12:30

KafkaSender 線程

2022-05-08 17:53:38

Nacos服務端客戶端

2021-09-06 09:46:26

Dubbo 服務端開發

2021-06-11 06:54:34

Dubbo客戶端服務端

2016-03-18 09:04:42

swift服務端

2017-03-03 09:10:41

2023-03-15 08:17:27

Kafka網絡通信組件

2015-10-12 08:33:06

TCP網絡協議服務端

2021-04-16 08:54:03

CMS系統redisnode服務器

2021-08-10 20:41:33

AndroidApp流程

2022-03-06 12:15:38

NettyReactor線程

2013-03-25 10:08:44

PHPWeb

2012-03-02 10:38:33

MySQL

2019-09-23 10:47:52

Kafka架構微服務

2022-08-22 08:45:57

Kafka網絡層源碼實現

2021-06-30 06:59:47

Zabbix Server服務端MySQL

2010-08-03 09:59:30

NFS服務
點贊
收藏

51CTO技術棧公眾號

国产免费播放一区二区| 欧美videosex性欧美黑吊| 久久久999| 色99之美女主播在线视频| 亚洲成人天堂网| 青草视频在线免费直播| 久久久久久久一区| 成人欧美在线观看| 久久久久久久久久影院| 久久人人99| 日韩精品日韩在线观看| 国产成年人视频网站| 成人高潮aa毛片免费| 国产视频一区在线播放| 亚洲最大成人在线| 夜夜爽妓女8888视频免费观看| 91精品国产调教在线观看| 日韩电影大片中文字幕| 国产传媒免费观看| 一本大道色婷婷在线| 一区二区三区在线免费视频| 欧美日韩国产综合视频在线| 亚洲xxxx天美| 久久福利资源站| 日av在线播放中文不卡| 久久精品99国产精| 忘忧草精品久久久久久久高清| 日韩精品高清在线| 久久久久亚洲av无码网站| 岛国精品在线| 欧美在线三级电影| 欧美日韩亚洲一| 后进极品白嫩翘臀在线播放| 国产精品网曝门| 欧美二区三区| 亚洲 精品 综合 精品 自拍| 国产成人综合网站| 91精品久久久久久久久久久久久久 | a一区二区三区亚洲| 91精品福利在线| 能在线观看的av| 国产白浆在线免费观看| 亚洲午夜在线电影| 国产传媒久久久| av毛片在线免费看| 综合久久综合久久| 色撸撸在线观看| 国产在线观看91| 亚洲婷婷综合久久一本伊一区| 色一情一乱一伦一区二区三区丨| 国产资源在线播放| 国产三级精品三级| 亚洲国产婷婷香蕉久久久久久99| 国产在线播放av| 欧美国产欧美综合| 在线视频欧美一区| 麻豆网站在线看| 中文字幕亚洲欧美在线不卡| 一区二区成人国产精品| 国产视频在线播放| 亚洲一区二区三区在线播放| www.av91| 爱草tv视频在线观看992| 激情亚洲一区二区三区四区| 欧美爱爱视频免费看| 在线观看爽视频| 91国偷自产一区二区开放时间 | www.久久热| 精品久久一二三区| 色噜噜在线观看| 国内亚洲精品| 精品国产一区二区三区四区在线观看 | 欧美二区不卡| 久久久久在线观看| 国产精品久久久久久久久久久久久久久久久 | 欧美变态挠脚心| 精品亚洲一区二区| av永久免费观看| 中文字幕一区二区三三| 久久久久久com| 亚洲天堂视频网站| 久久99国内精品| 国产精品久久久久久久天堂第1集| 四虎永久在线精品免费网址| 国产偷国产偷精品高清尤物| 国产精品h视频| 国产丝袜在线播放| 色噜噜狠狠成人中文综合| 日本高清久久久| 91欧美日韩在线| 亚洲片在线资源| 日韩欧美综合视频| 99riav1国产精品视频| 国产精品高清免费在线观看| a级片免费观看| 久久久欧美精品sm网站| www亚洲国产| 忘忧草在线日韩www影院| 欧美偷拍一区二区| 国产精品久久久久久亚洲色 | 色乱码一区二区三区熟女| 波多野结衣在线高清| 欧美三级电影网| 日本性生活一级片| 欧美jizz| 国产成人一区二区三区小说| 国产高清第一页| 国产欧美日韩另类一区| 台湾无码一区二区| 国产日本久久| 日韩成人av在线| 国产精品丝袜一区二区| 天堂va蜜桃一区二区三区漫画版| 97视频资源在线观看| 成人欧美一区| 欧美日韩亚洲国产一区| 久草福利在线观看| 91亚洲一区| 国产精品久久久久7777婷婷| 欧美一级在线免费观看| 亚洲欧美日本在线| 亚洲成人天堂网| 国产a久久精品一区二区三区| 欧美激情小视频| 国产又粗又黄又爽| 中文天堂在线一区| 国产精品亚洲αv天堂无码| 4438全国亚洲精品观看视频| 久久色免费在线视频| 中文字幕你懂的| 国产亚洲一区二区三区在线观看| 欧美不卡在线播放| japanese色系久久精品| 久精品免费视频| 99久久国产免费| 国产精品国产馆在线真实露脸| 欧美综合在线观看视频| 久久99偷拍| 97视频免费在线看| 五月天久久久久久| 午夜影院久久久| 久久人妻少妇嫩草av无码专区 | 欧美视频免费播放| 亚洲国产国产| 日本精品视频网站| 九色在线免费| 日本精品视频一区二区三区| 精品无码一区二区三区| 亚洲欧美日韩一区在线观看| 久久亚洲综合网| 成人av观看| 亚洲视频在线观看免费| 天天射天天干天天| 欧美激情中文字幕一区二区| 亚洲综合婷婷久久| 久久久9色精品国产一区二区三区| 成人午夜一级二级三级| gogo在线高清视频| 精品国产乱码久久久久久牛牛| 日韩精品一区二区三| 久久久久国产精品厨房| 欧美成人黄色网址| 亚洲有吗中文字幕| 国产精品日韩二区| 日韩av影片| 在线观看精品自拍私拍| 国产女同91疯狂高潮互磨| 亚洲自拍偷拍图区| 国产三级国产精品| 免费成人美女在线观看.| 99re8这里只有精品| 粉嫩一区二区三区四区公司1| 欧美亚洲激情在线| 在线激情小视频| 日韩午夜在线观看| 少妇一级淫片免费放中国| 欧美国产一区二区| 国产老头和老头xxxx×| 国产农村妇女毛片精品久久莱园子| 日韩国产一区久久| 欧美日韩午夜电影网| 欧美一级成年大片在线观看| 91精彩视频在线播放| 日韩免费成人网| 精品不卡一区二区| 亚洲女与黑人做爰| 老熟妇精品一区二区三区| 久久久国产亚洲精品| 超碰97免费观看| 欧美一区 二区| 国产一区二区在线播放| wwww亚洲| 久久香蕉频线观| 四虎成人免费在线| 日韩一区二区三区视频在线| 国产性猛交╳xxx乱大交| 亚洲视频狠狠干| a毛片毛片av永久免费| 国产成人8x视频一区二区| 青青草av网站| 亚洲黄色影院| 成年人免费观看的视频| 久久av电影| 国产精品免费一区二区三区四区 | 91精品国产一区二区三区| 成年人视频在线免费看| 亚洲黄色在线视频| 天天干天天操天天拍| 99精品桃花视频在线观看| 亚洲色图偷拍视频| 久久久人人人| 天堂…中文在线最新版在线| 久久久久亚洲| 亚洲欧洲久久| 免费精品国产| 国产一区二区不卡视频| 疯狂欧洲av久久成人av电影| 国产精品∨欧美精品v日韩精品| 国产传媒在线| 欧美疯狂xxxx大交乱88av| 欧美日本一道| 在线日韩日本国产亚洲| 亚洲人午夜射精精品日韩| 日韩欧美你懂的| 国产女人18毛片水18精| 欧美精选在线播放| 在线视频1卡二卡三卡| 日本电影亚洲天堂一区| 无码人妻精品一区二| 天天综合网天天综合色| 国产无套内射又大又猛又粗又爽 | 在线观看污视频| 98精品视频| 亚洲乱码一区二区三区| 欧美日韩有码| 五月天丁香综合久久国产| 精品国产乱码久久久久久果冻传媒| 精品乱码一区二区三区| 老司机凹凸av亚洲导航| 国产女人水真多18毛片18精品 | 一区二区三区免费看| 精品国产乱码久久久| 日韩精品最新在线观看| 欧洲福利电影| 亚洲一区精品视频| 98精品久久久久久久| 国产精品av免费| 亚洲一区欧美| 成人免费在线视频播放| 欧美日韩专区| 欧美人成在线观看| 亚洲免费精品| 丝袜老师办公室里做好紧好爽 | 亚洲综合另类小说| 精品一区免费观看| 福利视频导航一区| 无码人妻久久一区二区三区| 欧美性受xxxx| 99er热精品视频| 日韩免费看网站| 色视频精品视频在线观看| 亚洲视频网站在线观看| 免费在线观看黄| 久久99久久99精品中文字幕| av小说在线播放| 青草青草久热精品视频在线网站 | 欧美视频一区二区三区在线观看 | 国产精品资源网站| 97人妻精品一区二区三区免费| 99久久免费视频.com| 日本成人免费视频| 亚洲欧美日韩国产综合在线| 久久国产精品波多野结衣| 黑人巨大精品欧美一区二区| 中国老头性行为xxxx| 91精品国产手机| 亚洲 欧美 激情 另类| 正在播放欧美一区| 欧美精品videossex少妇| 日本免费一区二区三区视频观看 | 国产欧美日本在线| 免费看成人哺乳视频网站| 中文字幕一区二区三区在线乱码 | 1769国内精品视频在线播放| 日韩精品一区二区三区av| 91久久精品一区二区别| 四虎5151久久欧美毛片| 亚洲永久激情精品| 国产欧美精品久久| 成人日韩在线视频| 92国产精品观看| 日韩激情综合网| 一本色道久久综合亚洲精品按摩| 国产精品人妻一区二区三区| 精品无人区太爽高潮在线播放| 日本电影在线观看网站| 国内精品国产三级国产在线专| 在线成人av观看| 成人动漫在线观看视频| 欧美三级情趣内衣| 91好吊色国产欧美日韩在线| 国产精品一区二区男女羞羞无遮挡| 国产精品1000部啪视频| 久久国产综合| 九九热只有这里有精品| 久久精品免费观看| 欧美熟妇一区二区| 亚洲精品国产第一综合99久久| 无码视频在线观看| 日韩av在线直播| 日本高清在线观看| 国产一区二区色| 欧美日韩亚洲在线观看| 欧美 国产 日本| 99久久精品免费看| 1024手机在线视频| 欧美电影一区二区| 999国产在线视频| 日本一区二区在线播放| 美女视频亚洲色图| 成人午夜视频免费观看| 美女脱光内衣内裤视频久久网站 | 国产成人av影视| 91视频免费观看| 精品一区免费观看| 欧美xxxx老人做受| 婷婷在线播放| yy111111少妇影院日韩夜片| 91视频精品| 亚洲欧美日韩综合网| 国产农村妇女毛片精品久久麻豆| 国产成人免费看| 亚洲成人久久电影| 青青在线视频| 国产富婆一区二区三区| 午夜久久久久| 2018国产精品| 亚洲va天堂va国产va久| 亚洲精品字幕在线| 国内成人精品视频| 黄色欧美在线| www.爱色av.com| 91一区在线观看| aaa人片在线| 亚洲欧美三级在线| 日韩在线免费| 亚洲一区二区精品在线| 九色综合狠狠综合久久| 秋霞欧美一区二区三区视频免费| 欧美欧美欧美欧美| 中中文字幕av在线| 成人在线观看91| 一本不卡影院| 欧美特级黄色录像| 欧美视频一区二区三区在线观看 | 国产91精品网站| 欧美亚洲激情| 天天久久综合网| 亚洲国产精品久久不卡毛片| 亚洲av激情无码专区在线播放| 奇米一区二区三区四区久久| 成人影院天天5g天天爽无毒影院| 不卡的在线视频| 亚洲影视在线播放| 亚洲欧美一区二区三| 国产91色在线|免| 久久久久久久久久久9不雅视频| 中文字幕无码毛片免费看| 精品福利在线视频| yw193.com尤物在线| 91黄色精品| 亚洲专区一区二区三区| 欧美激情 一区| 日韩一区二区三区免费看| 九九精品调教| 欧美精品123| 韩国三级在线一区| 中文字幕一区二区三区手机版 | 午夜小视频福利在线观看| 国产免费一区| 麻豆成人在线观看| 国产精品成人久久| 在线日韩欧美视频| jizz国产精品| 色综合色综合色综合色综合| 亚洲自拍另类综合| 大乳在线免费观看| 古典武侠综合av第一页| 日韩av一级电影| 中文字幕一区二区三区手机版| 揄拍成人国产精品视频| 99精品在免费线中文字幕网站一区 | 天天干天天舔天天操| 日韩欧美一区二区久久婷婷| 亚洲人体视频| 欧妇女乱妇女乱视频| 国产欧美一区二区精品性色 | 久久久极品av| 亚欧日韩另类中文欧美| 亚洲国产综合av|